go-astiav icon indicating copy to clipboard operation
go-astiav copied to clipboard

Getting the specified video and audio data from the RTSP stream may cause a memory leak

Open asphyxia-Github opened this issue 1 year ago • 10 comments

I referred to issue https://github.com/asticode/go-astiav/issues/115 and implemented this functionality. However, in my testing, the memory used by the Docker container grows slowly, while the Go heap reported in the Go GC log stays relatively stable. So my guess is that the C-side code is not freeing memory in time. Here's my code.

package main

import (
	"errors"
	"flag"
	"fmt"
	"hiyan_capture/buffer"
	"log"
	"os"
	"strings"
	"time"

	"github.com/asticode/go-astiav"
)

var (
	inputUrl = flag.String("i", "", "the input path")
)

// init forwards FFmpeg log messages at info level to the standard logger.
func init() {
	// The format-string argument supplied by astiav is not needed here, so it
	// is discarded; only the already-formatted message is logged.
	onLog := func(classer astiav.Classer, level astiav.LogLevel, _, message string) {
		suffix := ""
		if classer != nil {
			if class := classer.Class(); class != nil {
				suffix = " - class: " + class.String()
			}
		}
		log.Printf("ffmpeg log: %s%s - level: %d\n", strings.TrimSpace(message), suffix, level)
	}

	astiav.SetLogLevel(astiav.LogLevelInfo)
	astiav.SetLogCallback(onLog)
}

// main runs demo forever, pausing one second between consecutive runs.
func main() {
	for {
		demo()
		time.Sleep(time.Second)
	}
}

// AV_NOPTS_VALUE is the smallest int64 (-1 << 63). demo compares packet PTS and
// DTS values against it to detect timestamps that have not been set.
// NOTE(review): presumably mirrors FFmpeg's AV_NOPTS_VALUE sentinel — confirm.
const AV_NOPTS_VALUE = int64(-1 << 63)

// demo remuxes roughly five seconds of the input given by -i into an MP4 that
// is assembled in an in-memory buffer, then passes the bytes to save2.
//
// Both audio and video streams are copied; streams of any other media type are
// skipped. No decoding or encoding takes place: packets are read, retimed and
// written as they are.
//
// Failures of astiav calls (other than options.Set, whose errors are ignored)
// go through log.Fatal, which exits the process without running the deferred
// cleanups registered below.
//
// NOTE(review): flag.Parse runs on every call because main invokes demo in a
// loop — presumably harmless, but parsing once at startup would be cleaner.
func demo() {
	// Parse flags
	flag.Parse()

	// Usage
	// NOTE(review): the message mentions -o, but only -i is declared in the
	// visible code.
	if *inputUrl == "" {
		log.Println("Usage: <binary path> -i <input path> -o <output path>")
		return
	}

	// Allocate packet (reused for every ReadFrame call; unreferenced per iteration)
	pkt := astiav.AllocPacket()
	defer pkt.Free()

	// Allocate input format context
	inputFormatContext := astiav.AllocFormatContext()
	if inputFormatContext == nil {
		log.Fatal(errors.New("main: input format context is nil"))
	}
	defer inputFormatContext.Free()

	// Options handed to OpenInput. Errors from Set are deliberately discarded.
	// NOTE(review): the dictionary is built as a zero-value struct literal;
	// confirm go-astiav supports this rather than requiring its constructor.
	options := &astiav.Dictionary{}
	defer options.Free()
	_ = options.Set("rtsp_transport", "tcp", astiav.DictionaryFlags(0)) 
	_ = options.Set("buffer_size", "8192", astiav.DictionaryFlags(0))     
	_ = options.Set("max_delay", "5000", astiav.DictionaryFlags(0))   
	_ = options.Set("max_bandwidth", "1000000", astiav.DictionaryFlags(0)) 
	_ = options.Set("fifo_size", "1", astiav.DictionaryFlags(0))       
	_ = options.Set("threads", "1", astiav.DictionaryFlags(0))

	// Open input
	if err := inputFormatContext.OpenInput(*inputUrl, nil, options); err != nil {
		log.Fatal(fmt.Errorf("main: opening input failed: %w", err))
	}
	defer inputFormatContext.CloseInput()

	// Find stream info
	if err := inputFormatContext.FindStreamInfo(nil); err != nil {
		log.Fatal(fmt.Errorf("main: finding stream info failed: %w", err))
	}

	// Allocate output format context for the "mp4" format
	outputFormatContext, err := astiav.AllocOutputFormatContext(nil, "mp4", "")
	if err != nil {
		log.Fatal(fmt.Errorf("main: allocating output format context failed: %w", err))
	}
	if outputFormatContext == nil {
		log.Fatal(errors.New("main: output format context is nil"))
	}
	defer outputFormatContext.Free()

	// In-memory destination for the muxed output; no file is opened here.
	// The deferred Clear drops the buffer's contents when demo returns.
	videoBuf := buffer.NewEmptyBuffer()
	defer func() {
		videoBuf.Clear()
		videoBuf = nil
	}()

	// Allocate io context whose seek and write callbacks operate on videoBuf.
	// NOTE(review): 4096 / true / nil are presumably buffer size, writable flag
	// and an absent read callback — confirm against the go-astiav signature.
	ioContext, err := astiav.AllocIOContext(
		4096,
		true,
		nil,
		func(offset int64, whence int) (n int64, err error) {
			return videoBuf.Seek(offset, whence)
		},
		func(b []byte) (n int, err error) {
			return videoBuf.Write(b)
		},
	)
	if err != nil {
		log.Fatal(fmt.Errorf("main: allocating io context failed: %w", err))
	}
	defer ioContext.Free()

	// Store io context
	outputFormatContext.SetPb(ioContext)

	// Loop through streams
	inputStreams := make(map[int]*astiav.Stream)  // Indexed by input stream index
	outputStreams := make(map[int]*astiav.Stream) // Indexed by input stream index
	for _, is := range inputFormatContext.Streams() {
		// Only process audio or video
		if is.CodecParameters().MediaType() != astiav.MediaTypeAudio &&
			is.CodecParameters().MediaType() != astiav.MediaTypeVideo {
			continue
		}

		// Add input stream
		inputStreams[is.Index()] = is

		// Add stream to output format context
		// NOTE(review): the local name os shadows the imported os package
		// inside this loop body.
		os := outputFormatContext.NewStream(nil)
		if os == nil {
			log.Fatal(errors.New("main: output stream is nil"))
		}

		// Copy codec parameters
		if err = is.CodecParameters().Copy(os.CodecParameters()); err != nil {
			log.Fatal(fmt.Errorf("main: copying codec parameters failed: %w", err))
		}

		// Reset codec tag
		os.CodecParameters().SetCodecTag(0)

		// Add output stream
		outputStreams[is.Index()] = os
	}

	// Write header
	if err = outputFormatContext.WriteHeader(nil); err != nil {
		log.Fatal(fmt.Errorf("main: writing header failed: %w", err))
	}

	// Loop through packets until EOF or until five seconds of wall-clock time
	// have elapsed since startTime. frameCount counts written packets across
	// all copied streams, not per stream.
	startTime := time.Now()
	frameCount := 0
	for {
		// We use a closure to ease unreferencing packet
		if stop := func() bool {
			// Read frame
			if err = inputFormatContext.ReadFrame(pkt); err != nil {
				if errors.Is(err, astiav.ErrEof) {
					return true
				}
				log.Fatal(fmt.Errorf("main: reading frame failed: %w", err))
			}

			// Make sure to unreference packet
			defer pkt.Unref()

			// Get input stream (packets from streams that were not copied are dropped)
			inputStream, ok := inputStreams[pkt.StreamIndex()]
			if !ok {
				return false
			}

			// Get output stream
			outputStream, ok := outputStreams[pkt.StreamIndex()]
			if !ok {
				return false
			}

			// Make sure the timestamp is set: a missing PTS is synthesized from
			// the packet counter and the stream's average frame rate, and a
			// missing DTS falls back to the PTS.
			// NOTE(review): if the average frame rate is 0 (possibly the case
			// for audio streams — confirm) the division yields +Inf and the
			// int64 conversion is not well defined.
			avgFrameRate := inputStream.AvgFrameRate().Float64()
			if pkt.Pts() == AV_NOPTS_VALUE {
				frameTime := float64(frameCount*1000) / avgFrameRate 
				pkt.SetPts(int64(frameTime))                         
			}
			if pkt.Dts() == AV_NOPTS_VALUE {
				pkt.SetDts(pkt.Pts())
			}

			// Update packet: retarget it to the output stream and convert its
			// timestamps from the input to the output time base
			pkt.SetStreamIndex(outputStream.Index())
			pkt.RescaleTs(inputStream.TimeBase(), outputStream.TimeBase())
			pkt.SetPos(-1)

			// Write frame
			if err = outputFormatContext.WriteInterleavedFrame(pkt); err != nil {
				log.Fatal(fmt.Errorf("main: writing interleaved frame failed: %w", err))
			}
			// Stop once more than five seconds have passed; otherwise count
			// the packet and keep going.
			if time.Since(startTime) > 5*time.Second {
				return true
			} else {
				frameCount++
				return false
			}
		}(); stop {
			break
		}
	}

	// Write trailer
	if err = outputFormatContext.WriteTrailer(); err != nil {
		log.Fatal(fmt.Errorf("main: writing trailer failed: %w", err))
	}

	// Save the buffered output locally (demo only). save2 is not defined in
	// the code shown here.
	save2(videoBuf.Bytes(), "aaa.mp4")

	// Success
	log.Println("success")
}

I think I've freed all the memory I can on the Go side, which is why I'm asking for your help.

Image

asphyxia-Github avatar Apr 03 '25 05:04 asphyxia-Github

Oh, I forgot to mention that the memory leak does not occur on Windows. So far the problem only appears in an Ubuntu Docker container; the container version is Ubuntu 24.04.1 LTS.

asphyxia-Github avatar Apr 03 '25 05:04 asphyxia-Github

I may have misunderstood your code but aren't you copying all data to a buffer in memory (hence the memory increasing over time) before writing all data to a file but only at the end ? 🤔

If I'm right and you're looking to avoid the increased memory over time, you could try writing to a file directly instead of using a memory buffer in between 👍

asticode avatar Apr 03 '25 07:04 asticode

@asticode Thank you for taking the time to reply. My goal is not to write the acquired data to a file. Like the author of the issue I referred to, I need to write the data into Redis for later use, mainly to reduce I/O; this demo saves to a file for testing purposes only.

asphyxia-Github avatar Apr 03 '25 07:04 asphyxia-Github

Got it 👍

I don't know how videoBuf := buffer.NewEmptyBuffer() works internally, but if writing into it append data to a []byte then it explains the increase in memory over time.

Let me know whether that answers your question 👍

asticode avatar Apr 03 '25 07:04 asticode

This is the code for the buffer package

package buffer

import (
	"errors"
	"io"
)

// Buffer is an in-memory, seekable byte store with a single cursor shared by
// Read, Write and Seek. It performs no locking, so it must not be used from
// multiple goroutines without external synchronization.
type Buffer struct {
	data   []byte // bytes stored so far
	offset int64  // current cursor position; may lie beyond len(data) after Seek
}

// Compile-time check that *Buffer satisfies the standard seekable stream interface.
var _ io.ReadWriteSeeker = (*Buffer)(nil)

// NewEmptyBuffer returns a Buffer holding no data with the cursor at 0.
func NewEmptyBuffer() *Buffer {
	return &Buffer{
		data:   make([]byte, 0),
		offset: 0,
	}
}

// NewBuffer returns a Buffer that starts out with b as its contents and the
// cursor at 0. The slice is used directly, not copied, so overwrites through
// the Buffer may be visible in b.
func NewBuffer(b []byte) *Buffer {
	return &Buffer{
		data:   b,
		offset: 0,
	}
}

// Write stores p at the current cursor position and advances the cursor by
// len(p). Bytes already present in the written range are overwritten; the
// buffer grows when the write extends past its end. If the cursor was moved
// beyond the end with Seek, the gap is filled with zero bytes.
//
// Write always returns len(p) and a nil error.
func (b *Buffer) Write(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}

	start := int(b.offset)
	end := start + len(p)

	// Grow to the final size first. Appending a zeroed slice both extends the
	// data (with amortized reallocation) and zero-fills any gap left by a
	// Seek past the end.
	if grow := end - len(b.data); grow > 0 {
		b.data = append(b.data, make([]byte, grow)...)
	}

	copy(b.data[start:end], p)
	b.offset = int64(end)
	return len(p), nil
}

// Read copies up to len(p) bytes starting at the cursor into p and advances
// the cursor by the number of bytes copied. It returns 0 and io.EOF when the
// cursor is at or beyond the end of the data and p is not empty.
func (b *Buffer) Read(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	if b.offset >= int64(len(b.data)) {
		return 0, io.EOF
	}

	n = copy(p, b.data[b.offset:])
	b.offset += int64(n)
	return n, nil
}

// Bytes returns the buffer's underlying data slice without copying it.
func (b *Buffer) Bytes() []byte {
	return b.data
}

// Seek moves the cursor according to whence (io.SeekStart, io.SeekCurrent or
// io.SeekEnd) and returns the new absolute position. Positions beyond the end
// of the data are allowed. An unknown whence or a negative resulting position
// yields an error and leaves the cursor unchanged.
func (b *Buffer) Seek(offset int64, whence int) (int64, error) {
	var newOffset int64
	switch whence {
	case io.SeekStart:
		newOffset = offset
	case io.SeekCurrent:
		newOffset = b.offset + offset
	case io.SeekEnd:
		newOffset = int64(len(b.data)) + offset
	default:
		return 0, errors.New("invalid whence value")
	}

	if newOffset < 0 {
		return 0, errors.New("negative offset")
	}

	b.offset = newOffset
	return b.offset, nil
}

// Clear discards the contents and resets the cursor to 0. A fresh empty slice
// is installed, so the Buffer no longer references the previous backing array.
func (b *Buffer) Clear() {
	b.data = make([]byte, 0)
	b.offset = 0
}

asphyxia-Github avatar Apr 03 '25 07:04 asphyxia-Github

Since writing in the buffer appends data to a []byte in memory, it seems logical memory usage increases over time 🤔

asticode avatar Apr 03 '25 07:04 asticode

I'm sorry, I don't quite understand what you mean. My logic will free videoBuf's memory every five seconds.

	videoBuf := buffer.NewEmptyBuffer()
	defer func() {
		videoBuf.Clear()
		videoBuf = nil
	}()

	// Allocate io context
	ioContext, err := astiav.AllocIOContext(
		4096,
		true,
		nil,
		func(offset int64, whence int) (n int64, err error) {
			return videoBuf.Seek(offset, whence)
		},
		func(b []byte) (n int, err error) {
			return videoBuf.Write(b)
		},
	)

asphyxia-Github avatar Apr 03 '25 08:04 asphyxia-Github

My logic will free videoBuf's memory every five seconds.

Where is it done? I don't see it in the code you've shared.

What I see is that the videoBuf's memory is cleared at the end of the process 🤔

asticode avatar Apr 03 '25 08:04 asticode

Sorry, I thought videoBuf.Clear() would let its memory be reclaimed on the next GC, which is why I said videoBuf's memory was freed. Is there something wrong with my thinking?

asphyxia-Github avatar Apr 03 '25 08:04 asphyxia-Github

When reaching the end of your code, memory should be fine. However, I don't really get your "every five seconds", since I don't see anything related to a 5-second interval in the code you've shared.

Also, memory might not be released to the OS on the next GC, it depends on the allocator which is used. Therefore memory should be released properly in the in_use space but may not be released in the alloc space.

asticode avatar Apr 03 '25 12:04 asticode

OK. After my modifications, I found that there is no memory leak. Previously I used multiple concurrent workers with several astiav.FormatContext objects; even though resources were released normally, memory still grew slowly at first, but once usage reached a plateau it stopped growing. The overall effect was simply higher memory usage than running the ffmpeg command directly. After tweaking and testing, using only one astiav.FormatContext object for each route (stream) uses the same amount of memory as using ffmpeg directly. Thank you again for your advice, and I am sorry that I was not clear before. :blush:

asphyxia-Github avatar Apr 08 '25 01:04 asphyxia-Github

No worries, glad everything worked out fine ❤

asticode avatar Apr 12 '25 13:04 asticode