package main import ( "fmt" "image" "net/url" "sync" "time" "gocv.io/x/gocv" ) // FrameSource provides frames to process type FrameSource interface { // Next returns the next frame and whether to continue processing // Returns (frame, shouldContinue, error) Next() (gocv.Mat, bool, error) // NextImmediate returns the very next frame (ignoring skip count) // Used for immediate retry after low confidence/corruption NextImmediate() (gocv.Mat, bool, error) // Close cleans up resources Close() error // Name returns a description of the source Name() string // IsActive returns false if source has been closed IsActive() bool } // RTSPSource reads from an RTSP stream type RTSPSource struct { url string stream *gocv.VideoCapture frameNum int minFrameInterval time.Duration // Minimum time between processed frames lastProcessedTime time.Time // Last time we processed a frame lastAcquiredTime time.Time // Last time we acquired a frame (for logging) closed bool closeOnce sync.Once } // buildGStreamerPipeline constructs a GStreamer pipeline from an RTSP URL // Using explicit user-id/user-pw parameters works better than embedding in URL func buildGStreamerPipeline(rtspURL string) (string, error) { u, err := url.Parse(rtspURL) if err != nil { return "", fmt.Errorf("invalid RTSP URL: %w", err) } // Extract credentials var userID, userPW string if u.User != nil { userID = u.User.Username() userPW, _ = u.User.Password() } // Build location without credentials locationURL := fmt.Sprintf("rtsp://%s%s", u.Host, u.Path) // Build GStreamer pipeline with explicit credentials pipeline := fmt.Sprintf( "rtspsrc location=%s user-id=%s user-pw=%s latency=0 ! decodebin ! videoconvert ! appsink", locationURL, userID, userPW, ) return pipeline, nil } // NewRTSPSource creates a new RTSP frame source // Processes frames twice per second func NewRTSPSource(rtspURL string) (*RTSPSource, error) { // Build GStreamer pipeline with explicit credentials pipeline, err := buildGStreamerPipeline(rtspURL) if err != nil { return nil, err } logMessage(Console, Info, "📡 Connecting via GStreamer pipeline...") stream, err := gocv.OpenVideoCapture(pipeline) if err != nil { return nil, fmt.Errorf("failed to connect to RTSP stream: %w", err) } logMessage(Console, Info, "📊 Processing frames at 2 per second") return &RTSPSource{ url: rtspURL, stream: stream, minFrameInterval: 500 * time.Millisecond, lastProcessedTime: time.Time{}, // Zero time = process first frame immediately }, nil } // readFrame reads and validates the next frame from the stream func (s *RTSPSource) readFrame() (gocv.Mat, bool, error) { // Check if source was closed if s.closed { return gocv.NewMat(), false, nil } // Check if stream is valid if s.stream == nil { return gocv.NewMat(), false, fmt.Errorf("stream is null") } frame := gocv.NewMat() if ok := s.stream.Read(&frame); !ok { frame.Close() // Check again before reconnecting if s.closed { return gocv.NewMat(), false, nil } // Try to reconnect s.stream.Close() time.Sleep(5 * time.Second) // Check again after sleep if s.closed { return gocv.NewMat(), false, nil } // Rebuild GStreamer pipeline for reconnection pipeline, err := buildGStreamerPipeline(s.url) if err != nil { s.stream = nil s.closed = true return gocv.NewMat(), false, fmt.Errorf("reconnect failed: %w", err) } newStream, err := gocv.OpenVideoCapture(pipeline) if err != nil { s.stream = nil // Set to nil to prevent further read attempts s.closed = true // Mark as closed return gocv.NewMat(), false, fmt.Errorf("reconnect failed: %w", err) } s.stream = newStream return gocv.NewMat(), true, nil // Signal to retry } s.frameNum++ // Check frame validity if frame.Empty() || frame.Cols() < 640 || frame.Rows() < 480 { frame.Close() return gocv.NewMat(), true, nil // Signal to retry } return frame, true, nil } func (s *RTSPSource) Next() (gocv.Mat, bool, error) { for { frame, shouldContinue, err := s.readFrame() if err != nil || !shouldContinue { return frame, shouldContinue, err } if frame.Empty() { // readFrame returned empty (reconnecting or invalid), retry continue } // Check if enough time has passed since last processed frame was ACQUIRED now := time.Now() if !s.lastProcessedTime.IsZero() { elapsed := now.Sub(s.lastProcessedTime) if elapsed < s.minFrameInterval { // Not enough time passed, skip this frame frame.Close() continue } } // Enough time passed, mark acquisition time NOW (before processing) s.lastProcessedTime = now // Log frame acquisition interval if !s.lastAcquiredTime.IsZero() { interval := now.Sub(s.lastAcquiredTime).Milliseconds() logMessage(LogFile, Debug, " [TIMING] Frame acquired: +%dms since last (target: 500ms)", interval) } s.lastAcquiredTime = now return frame, true, nil } } func (s *RTSPSource) NextImmediate() (gocv.Mat, bool, error) { // Get the very next frame (no skip count) for { frame, shouldContinue, err := s.readFrame() if err != nil || !shouldContinue { return frame, shouldContinue, err } if frame.Empty() { // readFrame returned empty (reconnecting or invalid), retry continue } return frame, true, nil } } func (s *RTSPSource) Close() error { s.closeOnce.Do(func() { s.closed = true // DON'T close the stream here - it causes segfault if read is in progress // The stream will be cleaned up when main() exits }) return nil } func (s *RTSPSource) IsActive() bool { return !s.closed } func (s *RTSPSource) Name() string { return fmt.Sprintf("RTSP stream: %s", s.url) } // FileSource reads a single frame from a file type FileSource struct { path string done bool } // NewFileSource creates a new file frame source func NewFileSource(path string) *FileSource { return &FileSource{ path: path, done: false, } } func (s *FileSource) Next() (gocv.Mat, bool, error) { if s.done { return gocv.NewMat(), false, nil } frame := gocv.IMRead(s.path, gocv.IMReadColor) if frame.Empty() { return gocv.NewMat(), false, fmt.Errorf("failed to load frame: %s", s.path) } s.done = true return frame, false, nil // Return frame, but signal to stop after this } func (s *FileSource) NextImmediate() (gocv.Mat, bool, error) { // For file source, NextImmediate is same as Next return s.Next() } func (s *FileSource) Close() error { return nil } func (s *FileSource) IsActive() bool { return !s.done } func (s *FileSource) Name() string { return fmt.Sprintf("File: %s", s.path) } // preprocessFrame crops timestamp and rotates func preprocessFrame(frame gocv.Mat) gocv.Mat { // Crop timestamp (top 68 pixels) noTs := frame.Region(image.Rect(0, 68, frame.Cols(), frame.Rows())) rotated := gocv.NewMat() gocv.Rotate(noTs, &rotated, gocv.Rotate90Clockwise) noTs.Close() return rotated }