package client import ( "bufio" "context" "fmt" "log" "net" "sync" "time" "skyview/internal/config" "skyview/internal/parser" ) type Dump1090Client struct { config *config.Config aircraftMap map[string]*parser.Aircraft mutex sync.RWMutex subscribers []chan parser.AircraftData subMutex sync.RWMutex } func NewDump1090Client(cfg *config.Config) *Dump1090Client { return &Dump1090Client{ config: cfg, aircraftMap: make(map[string]*parser.Aircraft), subscribers: make([]chan parser.AircraftData, 0), } } func (c *Dump1090Client) Start(ctx context.Context) error { go c.startDataStream(ctx) go c.startPeriodicBroadcast(ctx) go c.startCleanup(ctx) return nil } func (c *Dump1090Client) startDataStream(ctx context.Context) { for { select { case <-ctx.Done(): return default: if err := c.connectAndRead(ctx); err != nil { log.Printf("Connection error: %v, retrying in 5s", err) time.Sleep(5 * time.Second) } } } } func (c *Dump1090Client) connectAndRead(ctx context.Context) error { address := fmt.Sprintf("%s:%d", c.config.Dump1090.Host, c.config.Dump1090.DataPort) conn, err := net.Dial("tcp", address) if err != nil { return fmt.Errorf("failed to connect to %s: %w", address, err) } defer conn.Close() log.Printf("Connected to dump1090 at %s", address) scanner := bufio.NewScanner(conn) for scanner.Scan() { select { case <-ctx.Done(): return nil default: line := scanner.Text() c.processLine(line) } } return scanner.Err() } func (c *Dump1090Client) processLine(line string) { aircraft, err := parser.ParseSBS1Line(line) if err != nil || aircraft == nil { return } c.mutex.Lock() if existing, exists := c.aircraftMap[aircraft.Hex]; exists { c.updateExistingAircraft(existing, aircraft) } else { c.aircraftMap[aircraft.Hex] = aircraft } c.mutex.Unlock() } func (c *Dump1090Client) updateExistingAircraft(existing, update *parser.Aircraft) { existing.LastSeen = update.LastSeen existing.Messages++ if update.Flight != "" { existing.Flight = update.Flight } if update.Altitude != 0 { existing.Altitude = update.Altitude } if update.GroundSpeed != 0 { existing.GroundSpeed = update.GroundSpeed } if update.Track != 0 { existing.Track = update.Track } if update.Latitude != 0 && update.Longitude != 0 { existing.Latitude = update.Latitude existing.Longitude = update.Longitude // Add to track history if position changed significantly if c.shouldAddTrackPoint(existing, update) { trackPoint := parser.TrackPoint{ Timestamp: update.LastSeen, Latitude: update.Latitude, Longitude: update.Longitude, Altitude: update.Altitude, Speed: update.GroundSpeed, Track: update.Track, } existing.TrackHistory = append(existing.TrackHistory, trackPoint) // Keep only last 200 points (about 3-4 hours at 1 point/minute) if len(existing.TrackHistory) > 200 { existing.TrackHistory = existing.TrackHistory[1:] } } } if update.VertRate != 0 { existing.VertRate = update.VertRate } if update.Squawk != "" { existing.Squawk = update.Squawk } existing.OnGround = update.OnGround } func (c *Dump1090Client) shouldAddTrackPoint(existing, update *parser.Aircraft) bool { // Add track point if: // 1. No history yet if len(existing.TrackHistory) == 0 { return true } lastPoint := existing.TrackHistory[len(existing.TrackHistory)-1] // 2. At least 30 seconds since last point if time.Since(lastPoint.Timestamp) < 30*time.Second { return false } // 3. Position changed by at least 0.001 degrees (~100m) latDiff := existing.Latitude - lastPoint.Latitude lonDiff := existing.Longitude - lastPoint.Longitude distanceChange := latDiff*latDiff + lonDiff*lonDiff return distanceChange > 0.000001 // ~0.001 degrees squared } func (c *Dump1090Client) GetAircraftData() parser.AircraftData { c.mutex.RLock() defer c.mutex.RUnlock() aircraftMap := make(map[string]parser.Aircraft) totalMessages := 0 for hex, aircraft := range c.aircraftMap { aircraftMap[hex] = *aircraft totalMessages += aircraft.Messages } return parser.AircraftData{ Now: time.Now().Unix(), Messages: totalMessages, Aircraft: aircraftMap, } } func (c *Dump1090Client) Subscribe() <-chan parser.AircraftData { c.subMutex.Lock() defer c.subMutex.Unlock() ch := make(chan parser.AircraftData, 10) c.subscribers = append(c.subscribers, ch) return ch } func (c *Dump1090Client) startPeriodicBroadcast(ctx context.Context) { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: data := c.GetAircraftData() c.broadcastToSubscribers(data) } } } func (c *Dump1090Client) broadcastToSubscribers(data parser.AircraftData) { c.subMutex.RLock() defer c.subMutex.RUnlock() for i, ch := range c.subscribers { select { case ch <- data: default: close(ch) c.subMutex.RUnlock() c.subMutex.Lock() c.subscribers = append(c.subscribers[:i], c.subscribers[i+1:]...) c.subMutex.Unlock() c.subMutex.RLock() } } } func (c *Dump1090Client) startCleanup(ctx context.Context) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: c.cleanupStaleAircraft() } } } func (c *Dump1090Client) cleanupStaleAircraft() { c.mutex.Lock() defer c.mutex.Unlock() cutoff := time.Now().Add(-2 * time.Minute) trackCutoff := time.Now().Add(-24 * time.Hour) for hex, aircraft := range c.aircraftMap { if aircraft.LastSeen.Before(cutoff) { delete(c.aircraftMap, hex) } else { // Clean up old track points (keep last 24 hours) validTracks := make([]parser.TrackPoint, 0) for _, point := range aircraft.TrackHistory { if point.Timestamp.After(trackCutoff) { validTracks = append(validTracks, point) } } aircraft.TrackHistory = validTracks } } }