pulse-monitor-v2/capture.go

145 lines
2.8 KiB
Go

package main
import (
"fmt"
"net/url"
"os"
"sync"
"syscall"
"time"
"gocv.io/x/gocv"
)
// suppressStderr temporarily redirects stderr to the null device and returns
// a function that puts it back.
//
// Both the Go-level os.Stderr variable and file descriptor 2 are redirected,
// so output written by C code through the descriptor is silenced as well.
// The returned restore function is never nil. If any step of the redirection
// fails, nothing is changed and restore is a no-op, so callers can always
// invoke it unconditionally.
func suppressStderr() (restore func()) {
	noop := func() {}

	// Open for writing: stderr is a write-only stream, and writes to a
	// read-only descriptor would fail instead of being discarded.
	devNull, err := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
	if err != nil {
		return noop
	}

	// Keep a duplicate of the real stderr descriptor so it can be restored.
	// Without a valid duplicate there is no way back, so bail out before
	// touching descriptor 2.
	savedFd, err := syscall.Dup(2)
	if err != nil {
		devNull.Close()
		return noop
	}

	if err := syscall.Dup2(int(devNull.Fd()), 2); err != nil {
		syscall.Close(savedFd)
		devNull.Close()
		return noop
	}

	savedStderr := os.Stderr
	os.Stderr = devNull

	return func() {
		// Errors are ignored here on purpose: there is no channel left to
		// report them on, and the remaining cleanup should still run.
		syscall.Dup2(savedFd, 2)
		syscall.Close(savedFd)
		// Swap the Go-level variable back before closing the file it
		// currently points at.
		os.Stderr = savedStderr
		devNull.Close()
	}
}
// Capture reads frames from an RTSP source through a GStreamer pipeline
// opened with gocv.
type Capture struct {
	// url is the original RTSP URL; reconnect rebuilds the pipeline from it.
	url string
	// closeOnce makes the body of Close run only once.
	closeOnce sync.Once

	// mu is held by NextFrame, Close and IsActive while they touch the
	// fields below.
	mu     sync.Mutex
	stream *gocv.VideoCapture
	closed bool
}
// NewCapture opens the RTSP stream identified by rtspURL and wraps it in a
// Capture.
//
// The URL is first converted into a GStreamer pipeline description; an error
// from that step is returned unchanged. Stderr is silenced only for the
// duration of the open call. A failure to open the stream is returned wrapped
// with a "failed to connect" prefix.
func NewCapture(rtspURL string) (*Capture, error) {
	pipeline, err := buildPipeline(rtspURL)
	if err != nil {
		return nil, err
	}

	// Silence stderr just around the open call, then put it back right away.
	unmute := suppressStderr()
	vc, openErr := gocv.OpenVideoCapture(pipeline)
	unmute()

	if openErr != nil {
		return nil, fmt.Errorf("failed to connect: %w", openErr)
	}

	c := new(Capture)
	c.stream = vc
	c.url = rtspURL
	return c, nil
}
// buildPipeline converts an RTSP URL into a GStreamer pipeline description.
//
// Credentials embedded in the URL are removed from the location and passed to
// rtspsrc through its user-id and user-pw properties instead. The location
// keeps the URL's host, its path in escaped form, and its query string. The
// scheme is always written as rtsp://.
//
// It returns an error wrapping the url.Parse failure when rtspURL cannot be
// parsed.
//
// NOTE(review): property values are inserted unquoted, so credentials that
// contain whitespace or '!' would presumably break the pipeline description —
// confirm against the GStreamer pipeline syntax before relying on such values.
func buildPipeline(rtspURL string) (string, error) {
	u, err := url.Parse(rtspURL)
	if err != nil {
		return "", fmt.Errorf("invalid URL: %w", err)
	}

	// EscapedPath keeps percent-encoding intact; the decoded path could
	// contain spaces, which would split the pipeline description.
	location := fmt.Sprintf("rtsp://%s%s", u.Host, u.EscapedPath())
	if u.RawQuery != "" {
		// Stream selectors are commonly carried in the query string.
		location += "?" + u.RawQuery
	}

	src := "rtspsrc location=" + location

	// Emit credential properties only when there is a value to assign, so
	// the description never contains an assignment with nothing after '='.
	if u.User != nil {
		if userID := u.User.Username(); userID != "" {
			src += " user-id=" + userID
		}
		if userPW, ok := u.User.Password(); ok && userPW != "" {
			src += " user-pw=" + userPW
		}
	}

	return src + " latency=0 ! decodebin ! videoconvert ! appsink", nil
}
// NextFrame returns the next frame from the stream.
//
// The result is always a valid gocv.Mat. An empty Mat (check with Empty) is
// returned when the capture has been closed, when reading fails, or when the
// frame that was read is empty. A failed read, or a stream left unopened by an
// earlier failed reconnect, triggers a reconnect attempt before returning.
//
// The mutex is held for the whole call, including the reconnect attempt, so
// Close and IsActive block until NextFrame returns.
func (c *Capture) NextFrame() gocv.Mat {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return gocv.NewMat()
	}
	if c.stream == nil {
		// The previous reconnect attempt failed; try again instead of
		// reading from a stream that no longer exists.
		c.reconnect()
		return gocv.NewMat()
	}

	frame := gocv.NewMat()
	if ok := c.stream.Read(&frame); !ok {
		frame.Close()
		c.reconnect()
		return gocv.NewMat()
	}
	if frame.Empty() {
		frame.Close()
		return gocv.NewMat()
	}
	return frame
}

// reconnect closes the current stream, waits five seconds, and tries to open
// a fresh one from the stored URL.
//
// It must be called with c.mu held. On failure the reason is printed and
// c.stream is left nil, so that no closed handle is ever read from or closed
// a second time; NextFrame retries on its next call.
func (c *Capture) reconnect() {
	if c.closed {
		return
	}
	fmt.Println("Reconnecting to stream...")

	if c.stream != nil {
		c.stream.Close()
		// Drop the handle immediately: if reopening fails below, the field
		// must not keep pointing at a closed capture.
		c.stream = nil
	}

	time.Sleep(5 * time.Second)

	pipeline, err := buildPipeline(c.url)
	if err != nil {
		fmt.Printf("Reconnect failed: %v\n", err)
		return
	}

	restore := suppressStderr()
	newStream, err := gocv.OpenVideoCapture(pipeline)
	restore()
	if err != nil {
		fmt.Printf("Reconnect failed: %v\n", err)
		return
	}
	c.stream = newStream
}
// Close marks the capture as closed and releases the underlying stream.
//
// It is safe to call more than once; only the first call does any work. Close
// takes the same mutex as NextFrame, so it waits for an in-flight NextFrame
// call to return before the stream is released.
func (c *Capture) Close() {
	c.closeOnce.Do(func() {
		c.mu.Lock()
		defer c.mu.Unlock()

		c.closed = true
		if c.stream != nil {
			// Release the capture here; nothing else in this type does so
			// once closed is set.
			c.stream.Close()
			c.stream = nil
		}
	})
}
// IsActive reports whether the capture has not yet been closed.
// The closed flag is read under the mutex.
func (c *Capture) IsActive() bool {
	c.mu.Lock()
	active := !c.closed
	c.mu.Unlock()
	return active
}