getting the specified video and audio data from the rtsp stream can be a memory leak issue
I referred to the question https://github.com/asticode/go-astiav/issues/115 .This function is implemented, but after my testing, I found that the memory running in the docker container will grow slowly, and the go heap memory is relatively stable in the go gc log. So my guess is that the C-side code didn't free up memory in time. Here's my code.
package main
import (
"errors"
"flag"
"fmt"
"hiyan_capture/buffer"
"log"
"os"
"strings"
"time"
"github.com/asticode/go-astiav"
)
var (
inputUrl = flag.String("i", "", "the input path")
)
func init() {
// Handle ffmpeg logs
astiav.SetLogLevel(astiav.LogLevelInfo)
astiav.SetLogCallback(func(c astiav.Classer, l astiav.LogLevel, fmt, msg string) {
var cs string
if c != nil {
if cl := c.Class(); cl != nil {
cs = " - class: " + cl.String()
}
}
log.Printf("ffmpeg log: %s%s - level: %d\n", strings.TrimSpace(msg), cs, l)
})
}
func main() {
for true {
demo()
time.Sleep(1 * time.Second)
}
}
const AV_NOPTS_VALUE = int64(-1 << 63)
// only video
func demo() {
// Parse flags
flag.Parse()
// Usage
if *inputUrl == "" {
log.Println("Usage: <binary path> -i <input path> -o <output path>")
return
}
// Allocate packet
pkt := astiav.AllocPacket()
defer pkt.Free()
// Allocate input format context
inputFormatContext := astiav.AllocFormatContext()
if inputFormatContext == nil {
log.Fatal(errors.New("main: input format context is nil"))
}
defer inputFormatContext.Free()
options := &astiav.Dictionary{}
defer options.Free()
_ = options.Set("rtsp_transport", "tcp", astiav.DictionaryFlags(0))
_ = options.Set("buffer_size", "8192", astiav.DictionaryFlags(0))
_ = options.Set("max_delay", "5000", astiav.DictionaryFlags(0))
_ = options.Set("max_bandwidth", "1000000", astiav.DictionaryFlags(0))
_ = options.Set("fifo_size", "1", astiav.DictionaryFlags(0))
_ = options.Set("threads", "1", astiav.DictionaryFlags(0))
// Open input
if err := inputFormatContext.OpenInput(*inputUrl, nil, options); err != nil {
log.Fatal(fmt.Errorf("main: opening input failed: %w", err))
}
defer inputFormatContext.CloseInput()
// Find stream info
if err := inputFormatContext.FindStreamInfo(nil); err != nil {
log.Fatal(fmt.Errorf("main: finding stream info failed: %w", err))
}
// Allocate output format context
outputFormatContext, err := astiav.AllocOutputFormatContext(nil, "mp4", "")
if err != nil {
log.Fatal(fmt.Errorf("main: allocating output format context failed: %w", err))
}
if outputFormatContext == nil {
log.Fatal(errors.New("main: output format context is nil"))
}
defer outputFormatContext.Free()
// Open file
videoBuf := buffer.NewEmptyBuffer()
defer func() {
videoBuf.Clear()
videoBuf = nil
}()
// Allocate io context
ioContext, err := astiav.AllocIOContext(
4096,
true,
nil,
func(offset int64, whence int) (n int64, err error) {
return videoBuf.Seek(offset, whence)
},
func(b []byte) (n int, err error) {
return videoBuf.Write(b)
},
)
if err != nil {
log.Fatal(fmt.Errorf("main: allocating io context failed: %w", err))
}
defer ioContext.Free()
// Store io context
outputFormatContext.SetPb(ioContext)
// Loop through streams
inputStreams := make(map[int]*astiav.Stream) // Indexed by input stream index
outputStreams := make(map[int]*astiav.Stream) // Indexed by input stream index
for _, is := range inputFormatContext.Streams() {
// Only process audio or video
if is.CodecParameters().MediaType() != astiav.MediaTypeAudio &&
is.CodecParameters().MediaType() != astiav.MediaTypeVideo {
continue
}
// Add input stream
inputStreams[is.Index()] = is
// Add stream to output format context
os := outputFormatContext.NewStream(nil)
if os == nil {
log.Fatal(errors.New("main: output stream is nil"))
}
// Copy codec parameters
if err = is.CodecParameters().Copy(os.CodecParameters()); err != nil {
log.Fatal(fmt.Errorf("main: copying codec parameters failed: %w", err))
}
// Reset codec tag
os.CodecParameters().SetCodecTag(0)
// Add output stream
outputStreams[is.Index()] = os
}
// Write header
if err = outputFormatContext.WriteHeader(nil); err != nil {
log.Fatal(fmt.Errorf("main: writing header failed: %w", err))
}
// Loop through packets
startTime := time.Now()
frameCount := 0
for {
// We use a closure to ease unreferencing packet
if stop := func() bool {
// Read frame
if err = inputFormatContext.ReadFrame(pkt); err != nil {
if errors.Is(err, astiav.ErrEof) {
return true
}
log.Fatal(fmt.Errorf("main: reading frame failed: %w", err))
}
// Make sure to unreference packet
defer pkt.Unref()
// Get input stream
inputStream, ok := inputStreams[pkt.StreamIndex()]
if !ok {
return false
}
// Get output stream
outputStream, ok := outputStreams[pkt.StreamIndex()]
if !ok {
return false
}
// Make sure the timestamp is set
avgFrameRate := inputStream.AvgFrameRate().Float64()
if pkt.Pts() == AV_NOPTS_VALUE {
frameTime := float64(frameCount*1000) / avgFrameRate
pkt.SetPts(int64(frameTime))
}
if pkt.Dts() == AV_NOPTS_VALUE {
pkt.SetDts(pkt.Pts())
}
// Update packet
pkt.SetStreamIndex(outputStream.Index())
pkt.RescaleTs(inputStream.TimeBase(), outputStream.TimeBase())
pkt.SetPos(-1)
// Write frame
if err = outputFormatContext.WriteInterleavedFrame(pkt); err != nil {
log.Fatal(fmt.Errorf("main: writing interleaved frame failed: %w", err))
}
if time.Since(startTime) > 5*time.Second {
return true
} else {
frameCount++
return false
}
}(); stop {
break
}
}
// Write trailer
if err = outputFormatContext.WriteTrailer(); err != nil {
log.Fatal(fmt.Errorf("main: writing trailer failed: %w", err))
}
// Save the data to the local PC in demo
save2(videoBuf.Bytes(), "aaa.mp4")
// Success
log.Println("success")
}
I think I've freed up all the memory I can with go. That's why I'm asking for your help
Oh, I forgot to mention that the memory leak does not occur on windows systems, for the time being, the problem only appears in ubuntu docker container, container version is: Ubuntu 24.04.1 LTS
I may have misunderstood your code but aren't you copying all data to a buffer in memory (hence the memory increasing over time) before writing all data to a file but only at the end ? 🤔
If I'm right and you're looking to avoid the increased memory over time, you could try writing to a file directly instead of using a memory buffer in between 👍
@asticode Thank you for your busy reply. My purpose is not to write the acquired data into a file, just like the brother I referred to, I need to write the data into redis for the convenience of subsequent use, the main purpose is to reduce io, and this demo is saved as a file for testing purposes only.
Got it 👍
I don't know how videoBuf := buffer.NewEmptyBuffer() works internally, but if writing into it append data to a []byte then it explains the increase in memory over time.
Let me know whether that answers your question 👍
This is the code for the buffer package
package buffer
import (
"errors"
"io"
)
type Buffer struct {
data []byte
offset int64
}
func NewEmptyBuffer() *Buffer {
return &Buffer{
data: make([]byte, 0),
offset: 0,
}
}
func NewBuffer(b []byte) *Buffer {
return &Buffer{
data: b,
offset: 0,
}
}
func (b *Buffer) Write(p []byte) (n int, err error) {
bufSize := len(p)
if int(b.offset) < len(b.data) {
b.data = append(append(b.data[:b.offset], p...), b.data[int(b.offset)+bufSize:]...)
} else {
b.data = append(b.data, p...)
}
n = bufSize
b.offset += int64(n)
return
}
func (b *Buffer) Read(p []byte) (n int, err error) {
bufSize := len(p)
if int(b.offset) > len(b.data) {
return
}
if int(b.offset)+bufSize < len(b.data) {
copy(p, b.data[b.offset:int(b.offset)+bufSize])
} else {
copy(p, b.data[b.offset:])
}
n = bufSize
b.offset += int64(n)
return
}
func (b *Buffer) Bytes() []byte {
return b.data
}
func (b *Buffer) Seek(offset int64, whence int) (int64, error) {
var newOffset int64
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = b.offset + offset
case io.SeekEnd:
newOffset = int64(len(b.data)) + offset
default:
return 0, errors.New("invalid whence value")
}
if newOffset < 0 {
return 0, errors.New("negative offset")
}
b.offset = newOffset
return b.offset, nil
}
func (b *Buffer) Clear() {
b.data = make([]byte, 0)
b.offset = 0
}
Since writing in the buffer appends data to a []byte in memory, it seems logical memory usage increases over time 🤔
I'm sorry, I don't quite understand what you mean. My logic will free videoBuf's memory every five seconds.
videoBuf := buffer.NewEmptyBuffer()
defer func() {
videoBuf.Clear()
videoBuf = nil
}()
// Allocate io context
ioContext, err := astiav.AllocIOContext(
4096,
true,
nil,
func(offset int64, whence int) (n int64, err error) {
return videoBuf.Seek(offset, whence)
},
func(b []byte) (n int, err error) {
return videoBuf.Write(b)
},
)
My logic will free videoBuf's memory every five seconds.
Where is it done? I don't see it in the code you've shared.
What I see is that the videoBuf's memory is cleared at the end of the process 🤔
Sorry, I thought Videobuf.clear () would reclaim its memory on the next gc, so I said videoBuf's memory was free. Is there something wrong with my thinking?
When reaching the end your code, memory should be fine. However I don't really get your "every five seconds", since I don't see anything related to a 5 seconds interval in the code you've shared.
Also, memory might not be released to the OS on the next GC, it depends on the allocator which is used. Therefore memory should be released properly in the in_use space but may not be released in the alloc space.
ok After my modification, it is found that there is no risk of memory leakage. Before I used multiple concurrent processing and a number of astiav.FormatContext, in the normal release of resources in the early stage will still occur slow memory growth, but the memory usage to a bottleneck will not continue to grow, generally reflected in the memory usage than the direct use of ffmpeg command to use the memory is higher. Using only one astiav.FormatContext object for each route, as I've tweaked and tested, uses the same amount of memory as using ffmpeg directly. Thank you again for your advice, and I am sorry that I was not clear before. :blush:
No worries, glad everything worked out fine ❤