// Package client provides Beast format TCP client implementations for connecting to ADS-B receivers. // // This package handles the network connectivity and data streaming from dump1090 or similar // Beast format sources. It provides: // - Single-source Beast TCP client with automatic reconnection // - Multi-source client manager for handling multiple receivers // - Exponential backoff for connection failures // - Message parsing and Mode S decoding integration // - Automatic stale aircraft cleanup // // The Beast format is a binary protocol commonly used by dump1090 and other ADS-B // software to stream real-time aircraft data over TCP port 30005. This package // abstracts the connection management and integrates with the merger for // multi-source data fusion. package client import ( "context" "fmt" "net" "sync" "time" "skyview/internal/beast" "skyview/internal/merger" "skyview/internal/modes" ) // BeastClient handles connection to a single dump1090 Beast format TCP stream. // // The client provides robust connectivity with: // - Automatic reconnection with exponential backoff // - Concurrent message reading and processing // - Integration with Mode S decoder and data merger // - Source status tracking and statistics // - Graceful shutdown handling // // Each client maintains a persistent connection to one Beast source and // continuously processes incoming messages until stopped or the source // becomes unavailable. type BeastClient struct { source *merger.Source // Source configuration and status merger *merger.Merger // Data merger for multi-source fusion decoder *modes.Decoder // Mode S/ADS-B message decoder conn net.Conn // TCP connection to Beast source parser *beast.Parser // Beast format message parser msgChan chan *beast.Message // Buffered channel for parsed messages errChan chan error // Error reporting channel stopChan chan struct{} // Shutdown signal channel wg sync.WaitGroup // Wait group for goroutine coordination // Reconnection parameters reconnectDelay time.Duration // Initial reconnect delay maxReconnect time.Duration // Maximum reconnect delay (for backoff cap) } // NewBeastClient creates a new Beast format TCP client for a specific data source. // // The client is configured with: // - Buffered message channel (1000 messages) to handle burst traffic // - Error channel for connection and parsing issues // - Initial reconnect delay of 5 seconds // - Maximum reconnect delay of 60 seconds (exponential backoff cap) // - Fresh Mode S decoder instance // // Parameters: // - source: Source configuration including host, port, and metadata // - merger: Data merger instance for aircraft state management // // Returns a configured but not yet started BeastClient. func NewBeastClient(source *merger.Source, merger *merger.Merger) *BeastClient { return &BeastClient{ source: source, merger: merger, decoder: modes.NewDecoder(source.Latitude, source.Longitude), msgChan: make(chan *beast.Message, 5000), errChan: make(chan error, 10), stopChan: make(chan struct{}), reconnectDelay: 5 * time.Second, maxReconnect: 60 * time.Second, } } // Start begins the client connection and message processing in the background. // // The client will: // - Attempt to connect to the configured Beast source // - Handle connection failures with exponential backoff // - Start message reading and processing goroutines // - Continuously reconnect if the connection is lost // // The method returns immediately; the client runs in background goroutines // until Stop() is called or the context is cancelled. // // Parameters: // - ctx: Context for cancellation and timeout control func (c *BeastClient) Start(ctx context.Context) { c.wg.Add(1) go c.run(ctx) } // Stop gracefully shuts down the client and all associated goroutines. // // The shutdown process: // 1. Signals all goroutines to stop via stopChan // 2. Closes the TCP connection if active // 3. Waits for all goroutines to complete // // This method blocks until the shutdown is complete. func (c *BeastClient) Stop() { close(c.stopChan) if c.conn != nil { c.conn.Close() } c.wg.Wait() } // run implements the main client connection and reconnection loop. // // This method handles the complete client lifecycle: // 1. Connection establishment with timeout // 2. Exponential backoff on connection failures // 3. Message parsing and processing goroutine management // 4. Connection monitoring and failure detection // 5. Automatic reconnection on disconnection // // The exponential backoff starts at reconnectDelay (5s) and doubles on each // failure up to maxReconnect (60s), then resets on successful connection. // // Source status is updated to reflect connection state for monitoring. func (c *BeastClient) run(ctx context.Context) { defer c.wg.Done() reconnectDelay := c.reconnectDelay for { select { case <-ctx.Done(): return case <-c.stopChan: return default: } // Connect to Beast TCP stream addr := fmt.Sprintf("%s:%d", c.source.Host, c.source.Port) fmt.Printf("Connecting to Beast stream at %s (%s)...\n", addr, c.source.Name) conn, err := net.DialTimeout("tcp", addr, 30*time.Second) if err != nil { fmt.Printf("Failed to connect to %s: %v\n", c.source.Name, err) c.source.Active = false // Exponential backoff time.Sleep(reconnectDelay) if reconnectDelay < c.maxReconnect { reconnectDelay *= 2 } continue } c.conn = conn c.source.Active = true reconnectDelay = c.reconnectDelay // Reset backoff fmt.Printf("Connected to %s at %s\n", c.source.Name, addr) // Create parser for this connection c.parser = beast.NewParser(conn, c.source.ID) // Start processing messages c.wg.Add(2) go c.readMessages() go c.processMessages() // Wait for disconnect select { case <-ctx.Done(): c.conn.Close() return case <-c.stopChan: c.conn.Close() return case err := <-c.errChan: fmt.Printf("Error from %s: %v\n", c.source.Name, err) c.conn.Close() c.source.Active = false } // Wait for goroutines to finish time.Sleep(1 * time.Second) } } // readMessages runs in a dedicated goroutine to read Beast format messages. // // This method: // - Continuously reads from the TCP connection // - Parses Beast format binary data into Message structs // - Queues parsed messages for processing // - Reports parsing errors to the error channel // // The method blocks on the parser's ParseStream call and exits when // the connection is closed or an unrecoverable error occurs. func (c *BeastClient) readMessages() { defer c.wg.Done() c.parser.ParseStream(c.msgChan, c.errChan) } // processMessages runs in a dedicated goroutine to decode and merge aircraft data. // // For each received Beast message, this method: // 1. Decodes the Mode S/ADS-B message payload // 2. Extracts aircraft information (position, altitude, speed, etc.) // 3. Updates the data merger with new aircraft state // 4. Updates source statistics (message count) // // Invalid or unparseable messages are silently discarded to maintain // system stability. The merger handles data fusion from multiple sources // and conflict resolution based on signal strength. func (c *BeastClient) processMessages() { defer c.wg.Done() for { select { case <-c.stopChan: return case msg := <-c.msgChan: if msg == nil { return } // Decode Mode S message aircraft, err := c.decoder.Decode(msg.Data) if err != nil { continue // Skip invalid messages } // Update merger with new data c.merger.UpdateAircraft( c.source.ID, aircraft, msg.GetSignalStrength(), msg.ReceivedAt, ) // Update source statistics c.source.Messages++ } } } // MultiSourceClient manages multiple Beast TCP clients for multi-receiver setups. // // This client coordinator: // - Manages connections to multiple Beast format sources simultaneously // - Provides unified control for starting and stopping all clients // - Runs periodic cleanup tasks for stale aircraft data // - Aggregates statistics from all managed clients // - Handles dynamic source addition and management // // All clients share the same data merger, enabling automatic data fusion // and conflict resolution across multiple receivers. type MultiSourceClient struct { clients []*BeastClient // Managed Beast clients merger *merger.Merger // Shared data merger for all sources mu sync.RWMutex // Protects clients slice } // NewMultiSourceClient creates a client manager for multiple Beast format sources. // // The multi-source client enables connecting to multiple dump1090 instances // or other Beast format sources simultaneously. All sources feed into the // same data merger, which handles automatic data fusion and conflict resolution. // // This is essential for: // - Improved coverage from multiple receivers // - Redundancy in case of individual receiver failures // - Data quality improvement through signal strength comparison // // Parameters: // - merger: Shared data merger instance for all sources // // Returns a configured multi-source client ready for source addition. func NewMultiSourceClient(merger *merger.Merger) *MultiSourceClient { return &MultiSourceClient{ clients: make([]*BeastClient, 0), merger: merger, } } // AddSource registers and configures a new Beast format data source. // // This method: // 1. Registers the source with the data merger // 2. Creates a new BeastClient for the source // 3. Adds the client to the managed clients list // // The source is not automatically started; call Start() to begin connections. // Sources can be added before or after starting the multi-source client. // // Parameters: // - source: Source configuration including connection details and metadata func (m *MultiSourceClient) AddSource(source *merger.Source) { m.mu.Lock() defer m.mu.Unlock() // Register source with merger m.merger.AddSource(source) // Create and start client client := NewBeastClient(source, m.merger) m.clients = append(m.clients, client) } // Start begins connections to all configured Beast sources. // // This method: // - Starts all managed BeastClient instances in parallel // - Begins the periodic cleanup routine for stale aircraft data // - Uses the provided context for cancellation control // // Each client will independently attempt connections with their own // reconnection logic. The method returns immediately; all clients // operate in background goroutines. // // Parameters: // - ctx: Context for cancellation and timeout control func (m *MultiSourceClient) Start(ctx context.Context) { m.mu.RLock() defer m.mu.RUnlock() for _, client := range m.clients { client.Start(ctx) } // Start cleanup routine go m.cleanupRoutine(ctx) } // Stop gracefully shuts down all managed Beast clients. // // This method stops all clients in parallel and waits for their // goroutines to complete. The shutdown is coordinated to ensure // clean termination of all network connections and processing routines. func (m *MultiSourceClient) Stop() { m.mu.RLock() defer m.mu.RUnlock() for _, client := range m.clients { client.Stop() } } // cleanupRoutine runs periodic maintenance tasks in a background goroutine. // // Currently performs: // - Stale aircraft cleanup every 30 seconds // - Removal of aircraft that haven't been updated recently // // The cleanup frequency is designed to balance memory usage with // the typical aircraft update rates in ADS-B systems. Aircraft // typically update their position every few seconds when in range. // // Parameters: // - ctx: Context for cancellation when the client shuts down func (m *MultiSourceClient) cleanupRoutine(ctx context.Context) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: m.merger.CleanupStale() } } } // GetStatistics returns comprehensive statistics from all managed clients. // // The statistics include: // - All merger statistics (aircraft count, message rates, etc.) // - Number of active client connections // - Total number of configured clients // - Per-source connection status and message counts // // This information is useful for monitoring system health, diagnosing // connectivity issues, and understanding data quality across sources. // // Returns a map of statistics suitable for JSON serialization and web display. func (m *MultiSourceClient) GetStatistics() map[string]interface{} { m.mu.RLock() defer m.mu.RUnlock() stats := m.merger.GetStatistics() // Add client-specific stats activeClients := 0 for _, client := range m.clients { if client.source.Active { activeClients++ } } stats["active_clients"] = activeClients stats["total_clients"] = len(m.clients) return stats }