// Package client provides VRS JSON format client implementation for connecting to readsb receivers. // // This file implements a VRS (Virtual Radar Server) JSON format client that connects // to readsb instances with --net-vrs-port enabled. VRS JSON is a simpler alternative // to the Beast binary protocol, providing aircraft data as JSON over TCP. package client import ( "context" "fmt" "net" "sync" "time" "skyview/internal/merger" "skyview/internal/modes" "skyview/internal/vrs" ) // VRSClient handles connection to a single readsb VRS JSON TCP stream // // The VRS client provides robust connectivity with: // - Automatic reconnection with exponential backoff // - JSON message parsing and processing // - Integration with data merger // - Source status tracking and statistics // - Graceful shutdown handling // // VRS JSON is simpler than Beast format but provides the same aircraft data type VRSClient struct { source *merger.Source // Source configuration and status merger *merger.Merger // Data merger for multi-source fusion conn net.Conn // TCP connection to VRS source parser *vrs.Parser // VRS JSON format parser msgChan chan *vrs.VRSMessage // Buffered channel for parsed messages errChan chan error // Error reporting channel stopChan chan struct{} // Shutdown signal channel wg sync.WaitGroup // Wait group for goroutine coordination // Reconnection parameters reconnectDelay time.Duration // Initial reconnect delay maxReconnect time.Duration // Maximum reconnect delay (for backoff cap) } // NewVRSClient creates a new VRS JSON format TCP client for a specific data source // // The client is configured with: // - Buffered message channel (100 messages) for handling updates // - Error channel for connection and parsing issues // - Initial reconnect delay of 5 seconds // - Maximum reconnect delay of 60 seconds (exponential backoff cap) // // Parameters: // - source: Source configuration including host, port, and metadata // - merger: Data merger instance for aircraft state management // // Returns a configured but not yet started VRSClient func NewVRSClient(source *merger.Source, merger *merger.Merger) *VRSClient { return &VRSClient{ source: source, merger: merger, msgChan: make(chan *vrs.VRSMessage, 100), errChan: make(chan error, 10), stopChan: make(chan struct{}), reconnectDelay: 5 * time.Second, maxReconnect: 60 * time.Second, } } // Start begins the client connection and message processing in the background // // The client will: // - Attempt to connect to the configured VRS source // - Handle connection failures with exponential backoff // - Start message reading and processing goroutines // - Continuously reconnect if the connection is lost // // The method returns immediately; the client runs in background goroutines // until Stop() is called or the context is cancelled // // Parameters: // - ctx: Context for cancellation and timeout control func (c *VRSClient) Start(ctx context.Context) { c.wg.Add(1) go c.run(ctx) } // Stop gracefully shuts down the client and all associated goroutines // // The shutdown process: // 1. Signals all goroutines to stop via stopChan // 2. Closes the TCP connection if active // 3. Waits for all goroutines to complete // // This method blocks until the shutdown is complete func (c *VRSClient) Stop() { close(c.stopChan) if c.conn != nil { c.conn.Close() } c.wg.Wait() } // run implements the main client connection and reconnection loop // // This method handles the complete client lifecycle: // 1. Connection establishment with timeout // 2. Exponential backoff on connection failures // 3. Message parsing and processing goroutine management // 4. Connection monitoring and failure detection // 5. Automatic reconnection on disconnection // // The exponential backoff starts at reconnectDelay (5s) and doubles on each // failure up to maxReconnect (60s), then resets on successful connection // // Source status is updated to reflect connection state for monitoring func (c *VRSClient) run(ctx context.Context) { defer c.wg.Done() reconnectDelay := c.reconnectDelay for { select { case <-ctx.Done(): return case <-c.stopChan: return default: } // Connect to VRS JSON stream addr := fmt.Sprintf("%s:%d", c.source.Host, c.source.Port) fmt.Printf("Connecting to VRS JSON stream at %s (%s)...\n", addr, c.source.Name) conn, err := net.DialTimeout("tcp", addr, 30*time.Second) if err != nil { fmt.Printf("Failed to connect to VRS source %s: %v\n", c.source.Name, err) c.source.Active = false // Exponential backoff time.Sleep(reconnectDelay) if reconnectDelay < c.maxReconnect { reconnectDelay *= 2 } continue } c.conn = conn c.source.Active = true reconnectDelay = c.reconnectDelay // Reset backoff fmt.Printf("Connected to VRS source %s at %s\n", c.source.Name, addr) // Create parser for this connection c.parser = vrs.NewParser(conn, c.source.ID) // Start processing messages c.wg.Add(2) go c.readMessages() go c.processMessages() // Wait for disconnect select { case <-ctx.Done(): c.conn.Close() return case <-c.stopChan: c.conn.Close() return case err := <-c.errChan: fmt.Printf("Error from VRS source %s: %v\n", c.source.Name, err) c.conn.Close() c.source.Active = false } // Wait for goroutines to finish time.Sleep(1 * time.Second) } } // readMessages runs in a dedicated goroutine to read VRS JSON messages // // This method: // - Continuously reads from the TCP connection // - Parses VRS JSON data into Message structs // - Queues parsed messages for processing // - Reports parsing errors to the error channel // // The method blocks on the parser's ParseStream call and exits when // the connection is closed or an unrecoverable error occurs func (c *VRSClient) readMessages() { defer c.wg.Done() c.parser.ParseStream(c.msgChan, c.errChan) } // processMessages runs in a dedicated goroutine to process aircraft data // // For each received VRS message, this method: // 1. Iterates through all aircraft in the message // 2. Converts VRS format to internal aircraft representation // 3. Updates the data merger with new aircraft state // 4. Updates source statistics (message count) // // Invalid or unparseable aircraft data is silently discarded to maintain // system stability. The merger handles data fusion from multiple sources // and conflict resolution based on signal strength func (c *VRSClient) processMessages() { defer c.wg.Done() for { select { case <-c.stopChan: return case msg := <-c.msgChan: if msg == nil { return } // Process each aircraft in the message for _, vrsAircraft := range msg.AcList { // Convert VRS aircraft to internal format aircraft := c.convertVRSToAircraft(&vrsAircraft) if aircraft == nil { continue // Skip invalid aircraft } // Update merger with new data // Note: VRS doesn't provide signal strength, so we use a default value c.merger.UpdateAircraft( c.source.ID, aircraft, -30.0, // Default signal strength for VRS sources vrsAircraft.GetTimestamp(), ) // Update source statistics c.source.Messages++ } } } } // convertVRSToAircraft converts VRS JSON aircraft to internal aircraft format // // This method maps VRS JSON fields to the internal aircraft structure used // by the merger. It handles: // - ICAO address extraction and validation // - Position data (lat/lon) // - Altitude (barometric and geometric) // - Speed and heading // - Callsign and squawk code // - Ground status // // Returns nil if the aircraft data is invalid or incomplete func (c *VRSClient) convertVRSToAircraft(vrs *vrs.VRSAircraft) *modes.Aircraft { // Get ICAO address icao, err := vrs.GetICAO24() if err != nil { return nil // Invalid ICAO, skip this aircraft } // Create aircraft structure aircraft := &modes.Aircraft{ ICAO24: icao, } // Set position if available if vrs.HasPosition() { aircraft.Latitude = vrs.Lat aircraft.Longitude = vrs.Long aircraft.PositionValid = true } // Set altitude if available if vrs.HasAltitude() { aircraft.Altitude = vrs.GetAltitude() aircraft.AltitudeValid = true // Set barometric altitude specifically if vrs.Alt != 0 { aircraft.BaroAltitude = vrs.Alt } // Set geometric altitude if different from barometric if vrs.GAlt != 0 { aircraft.GeomAltitude = vrs.GAlt aircraft.GeomAltitudeValid = true } } // Set speed if available if vrs.Spd > 0 { aircraft.GroundSpeed = int(vrs.Spd) aircraft.GroundSpeedValid = true } // Set heading/track if available if vrs.Trak > 0 { aircraft.Track = int(vrs.Trak) aircraft.TrackValid = true } // Set vertical rate if available if vrs.Vsi != 0 { aircraft.VerticalRate = vrs.Vsi aircraft.VerticalRateValid = true } // Set callsign if available if vrs.Call != "" { aircraft.Callsign = vrs.Call aircraft.CallsignValid = true } // Set squawk if available if squawk, err := vrs.GetSquawk(); err == nil { aircraft.Squawk = fmt.Sprintf("%04X", squawk) // Convert to hex string aircraft.SquawkValid = true } // Set ground status aircraft.OnGround = vrs.IsOnGround() aircraft.OnGroundValid = true // Set selected altitude if available if vrs.TAlt != 0 { aircraft.SelectedAltitude = vrs.TAlt } // Set position source switch vrs.GetPositionSource() { case "MLAT": aircraft.PositionMLAT = true case "TIS-B": aircraft.PositionTISB = true } // Set additional metadata if available if vrs.Reg != "" { aircraft.Registration = vrs.Reg } if vrs.Type != "" { aircraft.AircraftType = vrs.Type } if vrs.Op != "" { aircraft.Operator = vrs.Op } return aircraft }