// Package client keeps an in-memory table of aircraft fed by a dump1090 TCP
// stream and periodically pushes snapshots of that table to subscribers.
package client

import (
	"bufio"
	"context"
	"fmt"
	"log"
	"net"
	"sync"
	"time"

	"skyview/internal/config"
	"skyview/internal/parser"
)

// Dump1090Client reads lines from a dump1090 data port, merges them into a
// per-aircraft table keyed by Hex, and fans snapshots out to subscribers.
//
// A Dump1090Client contains mutexes and must not be copied; always use the
// pointer returned by NewDump1090Client.
type Dump1090Client struct {
	config *config.Config

	// aircraftMap is guarded by mutex.
	aircraftMap map[string]*parser.Aircraft
	mutex       sync.RWMutex

	// subscribers is guarded by subMutex.
	subscribers []chan parser.AircraftData
	subMutex    sync.RWMutex
}

// NewDump1090Client returns a client that will connect to the host and data
// port given in cfg.Dump1090 once Start is called.
func NewDump1090Client(cfg *config.Config) *Dump1090Client {
	return &Dump1090Client{
		config:      cfg,
		aircraftMap: make(map[string]*parser.Aircraft),
	}
}

// Start launches the background goroutines (stream reader, periodic
// broadcaster, stale-entry cleanup). All of them stop when ctx is cancelled.
// Start does not block and currently always returns nil.
func (c *Dump1090Client) Start(ctx context.Context) error {
	go c.startDataStream(ctx)
	go c.startPeriodicBroadcast(ctx)
	go c.startCleanup(ctx)
	return nil
}

// startDataStream keeps a connection to dump1090 alive until ctx is
// cancelled, waiting between attempts whenever the connection fails or ends.
func (c *Dump1090Client) startDataStream(ctx context.Context) {
	const retryDelay = 5 * time.Second

	for {
		if ctx.Err() != nil {
			return
		}

		err := c.connectAndRead(ctx)
		if ctx.Err() != nil {
			// Shutdown was requested; whatever connectAndRead returned is
			// just a consequence of the cancellation.
			return
		}
		if err != nil {
			log.Printf("Connection error: %v, retrying in 5s", err)
		} else {
			// The peer closed the stream cleanly. Back off here as well so a
			// server that accepts and immediately closes cannot make this
			// loop spin.
			log.Printf("Connection to dump1090 closed, retrying in 5s")
		}

		// Wait before reconnecting, but wake up immediately on cancellation
		// (a plain time.Sleep would delay shutdown by up to retryDelay).
		timer := time.NewTimer(retryDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// connectAndRead dials the configured dump1090 data port and feeds every
// received line to processLine until the stream ends, a read fails, or ctx is
// cancelled.
//
// It returns nil when ctx was cancelled or the peer closed the stream cleanly,
// and a non-nil error when dialing or reading failed.
func (c *Dump1090Client) connectAndRead(ctx context.Context) error {
	address := fmt.Sprintf("%s:%d", c.config.Dump1090.Host, c.config.Dump1090.DataPort)

	// DialContext lets cancellation abort a slow or hanging connect.
	var dialer net.Dialer
	conn, err := dialer.DialContext(ctx, "tcp", address)
	if err != nil {
		return fmt.Errorf("failed to connect to %s: %w", address, err)
	}
	defer conn.Close()

	// scanner.Scan blocks in conn.Read and cannot observe ctx by itself.
	// Closing the connection from a watcher goroutine unblocks the read as
	// soon as ctx is cancelled; done stops the watcher on normal return.
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			conn.Close()
		case <-done:
		}
	}()

	log.Printf("Connected to dump1090 at %s", address)

	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		if ctx.Err() != nil {
			return nil
		}
		c.processLine(scanner.Text())
	}

	if ctx.Err() != nil {
		// The read error (if any) was caused by the watcher closing conn.
		return nil
	}
	return scanner.Err()
}

// processLine parses one line with parser.ParseSBS1Line and merges the result
// into the aircraft table. Lines that fail to parse, or that yield no
// aircraft, are silently dropped.
func (c *Dump1090Client) processLine(line string) {
	aircraft, err := parser.ParseSBS1Line(line)
	if err != nil || aircraft == nil {
		return
	}

	c.mutex.Lock()
	defer c.mutex.Unlock()

	if existing, ok := c.aircraftMap[aircraft.Hex]; ok {
		c.updateExistingAircraft(existing, aircraft)
		return
	}
	c.aircraftMap[aircraft.Hex] = aircraft
}

// updateExistingAircraft merges update into existing. The caller must hold
// c.mutex for writing.
//
// LastSeen and OnGround are always overwritten and Messages is incremented.
// Every other field is copied only when update carries a non-zero value.
//
// NOTE(review): presumably a zero value means "field absent from this
// message"; the consequence is that a genuine zero (e.g. altitude 0 or
// track 0) can never replace a previously stored non-zero value. Confirm
// against the parser.
func (c *Dump1090Client) updateExistingAircraft(existing, update *parser.Aircraft) {
	existing.LastSeen = update.LastSeen
	existing.Messages++

	if update.Flight != "" {
		existing.Flight = update.Flight
	}
	if update.Altitude != 0 {
		existing.Altitude = update.Altitude
	}
	if update.GroundSpeed != 0 {
		existing.GroundSpeed = update.GroundSpeed
	}
	if update.Track != 0 {
		existing.Track = update.Track
	}
	if update.Latitude != 0 {
		existing.Latitude = update.Latitude
	}
	if update.Longitude != 0 {
		existing.Longitude = update.Longitude
	}
	if update.VertRate != 0 {
		existing.VertRate = update.VertRate
	}
	if update.Squawk != "" {
		existing.Squawk = update.Squawk
	}

	existing.OnGround = update.OnGround
}

// GetAircraftData returns a snapshot of the aircraft table: a copy of every
// tracked aircraft keyed by Hex, the sum of their Messages counters, and the
// current Unix time in Now.
func (c *Dump1090Client) GetAircraftData() parser.AircraftData {
	c.mutex.RLock()
	defer c.mutex.RUnlock()

	snapshot := make(map[string]parser.Aircraft, len(c.aircraftMap))
	totalMessages := 0
	for hex, aircraft := range c.aircraftMap {
		// Copy the struct so callers never share memory with the live table.
		snapshot[hex] = *aircraft
		totalMessages += aircraft.Messages
	}

	return parser.AircraftData{
		Now:      time.Now().Unix(),
		Messages: totalMessages,
		Aircraft: snapshot,
	}
}

// Subscribe registers a new subscriber and returns the channel on which it
// will receive periodic snapshots. The channel buffers up to 10 snapshots; if
// the buffer is full when a broadcast happens, the channel is closed and the
// subscriber is dropped.
func (c *Dump1090Client) Subscribe() <-chan parser.AircraftData {
	c.subMutex.Lock()
	defer c.subMutex.Unlock()

	ch := make(chan parser.AircraftData, 10)
	c.subscribers = append(c.subscribers, ch)
	return ch
}

// startPeriodicBroadcast sends a fresh snapshot to all subscribers once per
// second until ctx is cancelled.
func (c *Dump1090Client) startPeriodicBroadcast(ctx context.Context) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.broadcastToSubscribers(c.GetAircraftData())
		}
	}
}

// broadcastToSubscribers offers data to every subscriber without blocking.
// A subscriber whose buffer is full has its channel closed and is removed
// from the subscriber list.
func (c *Dump1090Client) broadcastToSubscribers(data parser.AircraftData) {
	// The list may be modified below, so take the write lock for the whole
	// pass. sync.RWMutex cannot be upgraded from read to write, and editing
	// the slice while ranging over it would skip or revisit entries.
	c.subMutex.Lock()
	defer c.subMutex.Unlock()

	// Filter in place: kept reuses the backing array of c.subscribers.
	kept := c.subscribers[:0]
	for _, ch := range c.subscribers {
		select {
		case ch <- data:
			kept = append(kept, ch)
		default:
			close(ch)
		}
	}

	// Drop references left in the unused tail so closed channels can be
	// garbage collected.
	for i := len(kept); i < len(c.subscribers); i++ {
		c.subscribers[i] = nil
	}
	c.subscribers = kept
}

// startCleanup removes stale aircraft every 30 seconds until ctx is
// cancelled.
func (c *Dump1090Client) startCleanup(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.cleanupStaleAircraft()
		}
	}
}

// cleanupStaleAircraft deletes every aircraft whose LastSeen is more than two
// minutes in the past.
func (c *Dump1090Client) cleanupStaleAircraft() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	cutoff := time.Now().Add(-2 * time.Minute)
	for hex, aircraft := range c.aircraftMap {
		if aircraft.LastSeen.Before(cutoff) {
			delete(c.aircraftMap, hex)
		}
	}
}