// Package client connects to dump1090 Beast TCP streams, decodes the Mode S
// messages they carry and feeds the result into a merger.
package client

import (
	"context"
	"fmt"
	"net"
	"sync"
	"time"

	"skyview/internal/beast"
	"skyview/internal/merger"
	"skyview/internal/modes"
)

// BeastClient handles the connection to a single dump1090 Beast TCP stream.
//
// A client is started with Start and runs until its context is cancelled or
// Stop is called, reconnecting with exponential backoff whenever the
// connection cannot be established or is lost.
type BeastClient struct {
	source  *merger.Source
	merger  *merger.Merger
	decoder *modes.Decoder

	// connMu guards conn, which is written by run and closed by Stop from a
	// different goroutine.
	connMu sync.Mutex
	conn   net.Conn

	// parser, msgChan, errChan and connDone describe the current connection.
	// They are replaced by serve for every new connection, and only after the
	// goroutines of the previous connection have exited.
	parser   *beast.Parser
	msgChan  chan *beast.Message
	errChan  chan error
	connDone chan struct{} // closed when the current connection is torn down

	stopChan chan struct{}
	stopOnce sync.Once // makes Stop safe to call more than once
	wg       sync.WaitGroup

	reconnectDelay time.Duration // initial delay between connection attempts
	maxReconnect   time.Duration // upper bound for the backoff delay
}

// NewBeastClient creates a new Beast format TCP client for source that
// reports decoded aircraft to merger. The client does nothing until Start is
// called.
func NewBeastClient(source *merger.Source, merger *merger.Merger) *BeastClient {
	return &BeastClient{
		source:         source,
		merger:         merger,
		decoder:        modes.NewDecoder(),
		msgChan:        make(chan *beast.Message, 1000),
		errChan:        make(chan error, 10),
		stopChan:       make(chan struct{}),
		reconnectDelay: 5 * time.Second,
		maxReconnect:   60 * time.Second,
	}
}

// Start begins the client connection and processing in a background
// goroutine. The client stops when ctx is cancelled or Stop is called.
func (c *BeastClient) Start(ctx context.Context) {
	c.wg.Add(1)
	go c.run(ctx)
}

// Stop gracefully stops the client and waits for all of its goroutines to
// exit. It is safe to call Stop more than once.
func (c *BeastClient) Stop() {
	c.stopOnce.Do(func() { close(c.stopChan) })
	// Closing the connection unblocks a reader that is waiting on the socket.
	c.closeConn()
	c.wg.Wait()
}

// setConn records conn as the current connection.
func (c *BeastClient) setConn(conn net.Conn) {
	c.connMu.Lock()
	c.conn = conn
	c.connMu.Unlock()
}

// closeConn closes the current connection, if there is one.
func (c *BeastClient) closeConn() {
	c.connMu.Lock()
	defer c.connMu.Unlock()
	if c.conn == nil {
		return
	}
	// Best effort: the connection is being discarded, so a close error is not
	// actionable.
	_ = c.conn.Close()
	c.conn = nil
}

// wait blocks for d and reports true, or reports false as soon as ctx is
// cancelled or the client is stopped.
func (c *BeastClient) wait(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-c.stopChan:
		return false
	case <-timer.C:
		return true
	}
}

// run is the main client loop: it connects, serves the connection until it
// fails and then reconnects, until ctx is cancelled or Stop is called.
func (c *BeastClient) run(ctx context.Context) {
	defer c.wg.Done()

	const (
		dialTimeout    = 10 * time.Second
		reconnectPause = 1 * time.Second
	)

	// Fold Stop into the context so that a single signal interrupts dialing,
	// backoff waits and the connection itself.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		select {
		case <-c.stopChan:
			cancel()
		case <-ctx.Done():
		}
	}()

	dialer := net.Dialer{Timeout: dialTimeout}
	delay := c.reconnectDelay

	for ctx.Err() == nil {
		// Connect to Beast TCP stream.
		addr := fmt.Sprintf("%s:%d", c.source.Host, c.source.Port)
		fmt.Printf("Connecting to Beast stream at %s (%s)...\n", addr, c.source.Name)

		conn, err := dialer.DialContext(ctx, "tcp", addr)
		if err != nil {
			// NOTE(review): source.Active is written here and read by
			// GetStatistics without synchronization; the field belongs to
			// merger.Source, so it cannot be protected from this file alone.
			c.source.Active = false
			if ctx.Err() != nil {
				return // shutting down, not a real connection failure
			}
			fmt.Printf("Failed to connect to %s: %v\n", c.source.Name, err)

			// Exponential backoff, capped at maxReconnect.
			if !c.wait(ctx, delay) {
				return
			}
			delay *= 2
			if delay > c.maxReconnect {
				delay = c.maxReconnect
			}
			continue
		}

		delay = c.reconnectDelay // reset backoff
		fmt.Printf("Connected to %s at %s\n", c.source.Name, addr)

		c.serve(ctx, conn)

		// Short pause so a peer that accepts and immediately drops
		// connections does not cause a tight reconnect loop.
		if !c.wait(ctx, reconnectPause) {
			return
		}
	}
}

// serve runs one connection: it starts the reader and the message processor,
// waits for the connection to fail or for shutdown, then tears the connection
// down and waits for both goroutines to exit before returning.
func (c *BeastClient) serve(ctx context.Context, conn net.Conn) {
	c.setConn(conn)

	// Use fresh channels for every connection so that errors or messages left
	// over from a previous connection cannot affect this one. This also keeps
	// reconnecting safe whether or not the parser closes the channels it is
	// given (its implementation is not visible from this file).
	c.parser = beast.NewParser(conn, c.source.ID)
	c.msgChan = make(chan *beast.Message, cap(c.msgChan))
	c.errChan = make(chan error, cap(c.errChan))
	c.connDone = make(chan struct{})
	c.source.Active = true

	readerDone := make(chan struct{})
	workerDone := make(chan struct{})
	c.wg.Add(2)
	go func() {
		defer close(readerDone)
		c.readMessages()
	}()
	go func() {
		defer close(workerDone)
		c.processMessages()
	}()

	// Wait for disconnect or shutdown.
	select {
	case <-ctx.Done():
	case err := <-c.errChan:
		if err != nil {
			fmt.Printf("Error from %s: %v\n", c.source.Name, err)
		}
	case <-readerDone:
		// The reader returned; report its error if it left one behind,
		// otherwise treat it as a plain disconnect.
		select {
		case err := <-c.errChan:
			if err != nil {
				fmt.Printf("Error from %s: %v\n", c.source.Name, err)
			}
		default:
			fmt.Printf("Connection to %s closed\n", c.source.Name)
		}
	}

	c.closeConn()
	c.source.Active = false
	close(c.connDone) // tells processMessages to exit

	// Keep draining both channels until the reader has exited so that it can
	// never stay blocked on a send nobody is receiving.
	for {
		select {
		case <-readerDone:
			<-workerDone
			return
		case <-c.msgChan:
		case <-c.errChan:
		}
	}
}

// readMessages reads Beast messages from the TCP stream by running the
// parser until it returns, delivering messages on msgChan and errors on
// errChan.
func (c *BeastClient) readMessages() {
	defer c.wg.Done()
	c.parser.ParseStream(c.msgChan, c.errChan)
}

// processMessages decodes received messages and merges the aircraft data. It
// returns when the client is stopped, when the current connection is torn
// down, or when a nil message is received.
func (c *BeastClient) processMessages() {
	defer c.wg.Done()

	for {
		select {
		case <-c.stopChan:
			return
		case <-c.connDone:
			return
		case msg := <-c.msgChan:
			if msg == nil {
				return
			}

			// Decode Mode S message.
			aircraft, err := c.decoder.Decode(msg.Data)
			if err != nil {
				continue // skip invalid messages
			}

			// Update merger with new data.
			c.merger.UpdateAircraft(
				c.source.ID,
				aircraft,
				msg.GetSignalStrength(),
				msg.ReceivedAt,
			)

			// Update source statistics.
			// NOTE(review): unsynchronized write to a field of merger.Source;
			// confirm how readers outside this file access it.
			c.source.Messages++
		}
	}
}

// MultiSourceClient manages multiple Beast TCP clients that share one merger.
type MultiSourceClient struct {
	clients []*BeastClient
	merger  *merger.Merger
	mu      sync.RWMutex // guards clients
}

// NewMultiSourceClient creates a client that connects to multiple Beast
// sources and reports all of them to merger.
func NewMultiSourceClient(merger *merger.Merger) *MultiSourceClient {
	return &MultiSourceClient{
		clients: make([]*BeastClient, 0),
		merger:  merger,
	}
}

// AddSource registers a new Beast TCP source with the merger and creates a
// client for it. The client is not started here: only clients that exist when
// Start is called are started.
func (m *MultiSourceClient) AddSource(source *merger.Source) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Register source with merger.
	m.merger.AddSource(source)

	// Create the client; Start is responsible for running it.
	client := NewBeastClient(source, m.merger)
	m.clients = append(m.clients, client)
}

// Start begins all client connections and launches the periodic cleanup of
// stale aircraft. The cleanup routine runs until ctx is cancelled.
func (m *MultiSourceClient) Start(ctx context.Context) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	for _, client := range m.clients {
		client.Start(ctx)
	}

	// Start cleanup routine.
	go m.cleanupRoutine(ctx)
}

// Stop gracefully stops all clients and waits for each of them to finish.
func (m *MultiSourceClient) Stop() {
	// Snapshot the clients so the lock is not held while waiting for them to
	// shut down.
	m.mu.RLock()
	clients := make([]*BeastClient, len(m.clients))
	copy(clients, m.clients)
	m.mu.RUnlock()

	for _, client := range clients {
		client.Stop()
	}
}

// cleanupRoutine asks the merger to remove stale aircraft every 30 seconds
// until ctx is cancelled.
func (m *MultiSourceClient) cleanupRoutine(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			m.merger.CleanupStale()
		}
	}
}

// GetStatistics returns the merger statistics extended with the keys
// "active_clients" (clients whose source is currently marked active) and
// "total_clients".
func (m *MultiSourceClient) GetStatistics() map[string]interface{} {
	m.mu.RLock()
	defer m.mu.RUnlock()

	stats := m.merger.GetStatistics()
	if stats == nil {
		// Writing to a nil map panics; start from an empty one instead.
		stats = make(map[string]interface{})
	}

	// Add client-specific stats.
	activeClients := 0
	for _, client := range m.clients {
		if client.source.Active {
			activeClients++
		}
	}

	stats["active_clients"] = activeClients
	stats["total_clients"] = len(m.clients)

	return stats
}