package main import ( "fmt" "net/url" "os" "sync" "syscall" "time" "gocv.io/x/gocv" ) // suppressStderr temporarily redirects stderr to /dev/null func suppressStderr() (restore func()) { stderr := os.Stderr devNull, _ := os.Open(os.DevNull) os.Stderr = devNull // Also redirect the actual file descriptor for C code oldFd, _ := syscall.Dup(2) syscall.Dup2(int(devNull.Fd()), 2) return func() { syscall.Dup2(oldFd, 2) syscall.Close(oldFd) devNull.Close() os.Stderr = stderr } } // Capture handles RTSP frame capture via GStreamer type Capture struct { stream *gocv.VideoCapture url string closed bool closeOnce sync.Once mu sync.Mutex } // NewCapture creates a new RTSP capture from URL func NewCapture(rtspURL string) (*Capture, error) { pipeline, err := buildPipeline(rtspURL) if err != nil { return nil, err } restore := suppressStderr() stream, err := gocv.OpenVideoCapture(pipeline) restore() if err != nil { return nil, fmt.Errorf("failed to connect: %w", err) } return &Capture{ stream: stream, url: rtspURL, }, nil } // buildPipeline creates GStreamer pipeline with credentials extracted func buildPipeline(rtspURL string) (string, error) { u, err := url.Parse(rtspURL) if err != nil { return "", fmt.Errorf("invalid URL: %w", err) } var userID, userPW string if u.User != nil { userID = u.User.Username() userPW, _ = u.User.Password() } location := fmt.Sprintf("rtsp://%s%s", u.Host, u.Path) return fmt.Sprintf( "rtspsrc location=%s user-id=%s user-pw=%s latency=0 ! decodebin ! videoconvert ! appsink", location, userID, userPW, ), nil } // NextFrame returns the next frame from the stream // Returns nil if stream is closed or error occurs func (c *Capture) NextFrame() gocv.Mat { c.mu.Lock() defer c.mu.Unlock() if c.closed || c.stream == nil { return gocv.NewMat() } frame := gocv.NewMat() if ok := c.stream.Read(&frame); !ok { frame.Close() c.reconnect() return gocv.NewMat() } if frame.Empty() { frame.Close() return gocv.NewMat() } return frame } // reconnect attempts to reconnect to the stream func (c *Capture) reconnect() { if c.closed { return } fmt.Println("Reconnecting to stream...") c.stream.Close() time.Sleep(5 * time.Second) pipeline, err := buildPipeline(c.url) if err != nil { fmt.Printf("Reconnect failed: %v\n", err) return } restore := suppressStderr() newStream, err := gocv.OpenVideoCapture(pipeline) restore() if err != nil { fmt.Printf("Reconnect failed: %v\n", err) return } c.stream = newStream } // Close closes the capture func (c *Capture) Close() { c.closeOnce.Do(func() { c.mu.Lock() c.closed = true c.mu.Unlock() }) } // IsActive returns true if capture is still active func (c *Capture) IsActive() bool { c.mu.Lock() defer c.mu.Unlock() return !c.closed }