package merger import ( "math" "sync" "time" "skyview/internal/modes" ) // Source represents a data source (dump1090 receiver) type Source struct { ID string `json:"id"` Name string `json:"name"` Host string `json:"host"` Port int `json:"port"` Latitude float64 `json:"latitude"` Longitude float64 `json:"longitude"` Altitude float64 `json:"altitude"` Active bool `json:"active"` LastSeen time.Time `json:"last_seen"` Messages int64 `json:"messages"` Aircraft int `json:"aircraft"` } // AircraftState represents merged aircraft state from all sources type AircraftState struct { *modes.Aircraft Sources map[string]*SourceData `json:"sources"` LastUpdate time.Time `json:"last_update"` FirstSeen time.Time `json:"first_seen"` TotalMessages int64 `json:"total_messages"` PositionHistory []PositionPoint `json:"position_history"` SignalHistory []SignalPoint `json:"signal_history"` AltitudeHistory []AltitudePoint `json:"altitude_history"` SpeedHistory []SpeedPoint `json:"speed_history"` Distance float64 `json:"distance"` // Distance from closest receiver Bearing float64 `json:"bearing"` // Bearing from closest receiver Age float64 `json:"age"` // Seconds since last update MLATSources []string `json:"mlat_sources"` // Sources providing MLAT data PositionSource string `json:"position_source"` // Source providing current position UpdateRate float64 `json:"update_rate"` // Updates per second } // SourceData represents data from a specific source type SourceData struct { SourceID string `json:"source_id"` SignalLevel float64 `json:"signal_level"` Messages int64 `json:"messages"` LastSeen time.Time `json:"last_seen"` Distance float64 `json:"distance"` Bearing float64 `json:"bearing"` UpdateRate float64 `json:"update_rate"` } // Position/Signal/Altitude/Speed history points type PositionPoint struct { Time time.Time `json:"time"` Latitude float64 `json:"lat"` Longitude float64 `json:"lon"` Source string `json:"source"` } type SignalPoint struct { Time time.Time `json:"time"` Signal float64 `json:"signal"` Source string `json:"source"` } type AltitudePoint struct { Time time.Time `json:"time"` Altitude int `json:"altitude"` VRate int `json:"vrate"` } type SpeedPoint struct { Time time.Time `json:"time"` GroundSpeed float64 `json:"ground_speed"` Track float64 `json:"track"` } // Merger handles merging aircraft data from multiple sources type Merger struct { aircraft map[uint32]*AircraftState sources map[string]*Source mu sync.RWMutex historyLimit int staleTimeout time.Duration updateMetrics map[uint32]*updateMetric } type updateMetric struct { lastUpdate time.Time updates []time.Time } // NewMerger creates a new aircraft data merger func NewMerger() *Merger { return &Merger{ aircraft: make(map[uint32]*AircraftState), sources: make(map[string]*Source), historyLimit: 500, staleTimeout: 60 * time.Second, updateMetrics: make(map[uint32]*updateMetric), } } // AddSource registers a new data source func (m *Merger) AddSource(source *Source) { m.mu.Lock() defer m.mu.Unlock() m.sources[source.ID] = source } // UpdateAircraft merges new aircraft data from a source func (m *Merger) UpdateAircraft(sourceID string, aircraft *modes.Aircraft, signal float64, timestamp time.Time) { m.mu.Lock() defer m.mu.Unlock() // Get or create aircraft state state, exists := m.aircraft[aircraft.ICAO24] if !exists { state = &AircraftState{ Aircraft: aircraft, Sources: make(map[string]*SourceData), FirstSeen: timestamp, PositionHistory: make([]PositionPoint, 0), SignalHistory: make([]SignalPoint, 0), AltitudeHistory: make([]AltitudePoint, 0), SpeedHistory: make([]SpeedPoint, 0), } m.aircraft[aircraft.ICAO24] = state m.updateMetrics[aircraft.ICAO24] = &updateMetric{ updates: make([]time.Time, 0), } } // Update or create source data srcData, srcExists := state.Sources[sourceID] if !srcExists { srcData = &SourceData{ SourceID: sourceID, } state.Sources[sourceID] = srcData } // Update source data srcData.SignalLevel = signal srcData.Messages++ srcData.LastSeen = timestamp // Calculate distance and bearing from source if source, ok := m.sources[sourceID]; ok && aircraft.Latitude != 0 && aircraft.Longitude != 0 { srcData.Distance, srcData.Bearing = calculateDistanceBearing( source.Latitude, source.Longitude, aircraft.Latitude, aircraft.Longitude, ) } // Update merged aircraft data (use best/newest data) m.mergeAircraftData(state, aircraft, sourceID, timestamp) // Update histories m.updateHistories(state, aircraft, sourceID, signal, timestamp) // Update metrics m.updateUpdateRate(aircraft.ICAO24, timestamp) // Update source statistics if source, ok := m.sources[sourceID]; ok { source.LastSeen = timestamp source.Messages++ source.Active = true } state.LastUpdate = timestamp state.TotalMessages++ } // mergeAircraftData intelligently merges data from multiple sources func (m *Merger) mergeAircraftData(state *AircraftState, new *modes.Aircraft, sourceID string, timestamp time.Time) { // Position - use source with best signal or most recent if new.Latitude != 0 && new.Longitude != 0 { updatePosition := false if state.Latitude == 0 { updatePosition = true } else if srcData, ok := state.Sources[sourceID]; ok { // Use position from source with strongest signal currentBest := m.getBestSignalSource(state) if currentBest == "" || srcData.SignalLevel > state.Sources[currentBest].SignalLevel { updatePosition = true } } if updatePosition { state.Latitude = new.Latitude state.Longitude = new.Longitude state.PositionSource = sourceID } } // Altitude - use most recent if new.Altitude != 0 { state.Altitude = new.Altitude } if new.BaroAltitude != 0 { state.BaroAltitude = new.BaroAltitude } if new.GeomAltitude != 0 { state.GeomAltitude = new.GeomAltitude } // Speed and track - use most recent if new.GroundSpeed != 0 { state.GroundSpeed = new.GroundSpeed } if new.Track != 0 { state.Track = new.Track } if new.Heading != 0 { state.Heading = new.Heading } // Vertical rate - use most recent if new.VerticalRate != 0 { state.VerticalRate = new.VerticalRate } // Identity - use most recent non-empty if new.Callsign != "" { state.Callsign = new.Callsign } if new.Squawk != "" { state.Squawk = new.Squawk } if new.Category != "" { state.Category = new.Category } // Status - use most recent if new.Emergency != "" { state.Emergency = new.Emergency } state.OnGround = new.OnGround state.Alert = new.Alert state.SPI = new.SPI // Navigation accuracy - use best available if new.NACp > state.NACp { state.NACp = new.NACp } if new.NACv > state.NACv { state.NACv = new.NACv } if new.SIL > state.SIL { state.SIL = new.SIL } // Selected values - use most recent if new.SelectedAltitude != 0 { state.SelectedAltitude = new.SelectedAltitude } if new.SelectedHeading != 0 { state.SelectedHeading = new.SelectedHeading } if new.BaroSetting != 0 { state.BaroSetting = new.BaroSetting } } // updateHistories adds data points to history arrays func (m *Merger) updateHistories(state *AircraftState, aircraft *modes.Aircraft, sourceID string, signal float64, timestamp time.Time) { // Position history if aircraft.Latitude != 0 && aircraft.Longitude != 0 { state.PositionHistory = append(state.PositionHistory, PositionPoint{ Time: timestamp, Latitude: aircraft.Latitude, Longitude: aircraft.Longitude, Source: sourceID, }) } // Signal history if signal != 0 { state.SignalHistory = append(state.SignalHistory, SignalPoint{ Time: timestamp, Signal: signal, Source: sourceID, }) } // Altitude history if aircraft.Altitude != 0 { state.AltitudeHistory = append(state.AltitudeHistory, AltitudePoint{ Time: timestamp, Altitude: aircraft.Altitude, VRate: aircraft.VerticalRate, }) } // Speed history if aircraft.GroundSpeed != 0 { state.SpeedHistory = append(state.SpeedHistory, SpeedPoint{ Time: timestamp, GroundSpeed: aircraft.GroundSpeed, Track: aircraft.Track, }) } // Trim histories if they exceed limit if len(state.PositionHistory) > m.historyLimit { state.PositionHistory = state.PositionHistory[len(state.PositionHistory)-m.historyLimit:] } if len(state.SignalHistory) > m.historyLimit { state.SignalHistory = state.SignalHistory[len(state.SignalHistory)-m.historyLimit:] } if len(state.AltitudeHistory) > m.historyLimit { state.AltitudeHistory = state.AltitudeHistory[len(state.AltitudeHistory)-m.historyLimit:] } if len(state.SpeedHistory) > m.historyLimit { state.SpeedHistory = state.SpeedHistory[len(state.SpeedHistory)-m.historyLimit:] } } // updateUpdateRate calculates message update rate func (m *Merger) updateUpdateRate(icao uint32, timestamp time.Time) { metric := m.updateMetrics[icao] metric.updates = append(metric.updates, timestamp) // Keep only last 30 seconds of updates cutoff := timestamp.Add(-30 * time.Second) for len(metric.updates) > 0 && metric.updates[0].Before(cutoff) { metric.updates = metric.updates[1:] } if len(metric.updates) > 1 { duration := metric.updates[len(metric.updates)-1].Sub(metric.updates[0]).Seconds() if duration > 0 { if state, ok := m.aircraft[icao]; ok { state.UpdateRate = float64(len(metric.updates)) / duration } } } } // getBestSignalSource returns the source ID with the strongest signal func (m *Merger) getBestSignalSource(state *AircraftState) string { var bestSource string var bestSignal float64 = -999 for srcID, srcData := range state.Sources { if srcData.SignalLevel > bestSignal { bestSignal = srcData.SignalLevel bestSource = srcID } } return bestSource } // GetAircraft returns current aircraft states func (m *Merger) GetAircraft() map[uint32]*AircraftState { m.mu.RLock() defer m.mu.RUnlock() // Create copy and calculate ages result := make(map[uint32]*AircraftState) now := time.Now() for icao, state := range m.aircraft { // Skip stale aircraft if now.Sub(state.LastUpdate) > m.staleTimeout { continue } // Calculate age stateCopy := *state stateCopy.Age = now.Sub(state.LastUpdate).Seconds() // Find closest receiver distance minDistance := float64(999999) for _, srcData := range state.Sources { if srcData.Distance > 0 && srcData.Distance < minDistance { minDistance = srcData.Distance stateCopy.Distance = srcData.Distance stateCopy.Bearing = srcData.Bearing } } result[icao] = &stateCopy } return result } // GetSources returns all registered sources func (m *Merger) GetSources() []*Source { m.mu.RLock() defer m.mu.RUnlock() sources := make([]*Source, 0, len(m.sources)) for _, src := range m.sources { sources = append(sources, src) } return sources } // GetStatistics returns merger statistics func (m *Merger) GetStatistics() map[string]interface{} { m.mu.RLock() defer m.mu.RUnlock() totalMessages := int64(0) activeSources := 0 aircraftBySources := make(map[int]int) // Count by number of sources for _, state := range m.aircraft { totalMessages += state.TotalMessages numSources := len(state.Sources) aircraftBySources[numSources]++ } for _, src := range m.sources { if src.Active { activeSources++ } } return map[string]interface{}{ "total_aircraft": len(m.aircraft), "total_messages": totalMessages, "active_sources": activeSources, "aircraft_by_sources": aircraftBySources, } } // CleanupStale removes stale aircraft func (m *Merger) CleanupStale() { m.mu.Lock() defer m.mu.Unlock() now := time.Now() for icao, state := range m.aircraft { if now.Sub(state.LastUpdate) > m.staleTimeout { delete(m.aircraft, icao) delete(m.updateMetrics, icao) } } } // Helper functions func calculateDistanceBearing(lat1, lon1, lat2, lon2 float64) (float64, float64) { // Haversine formula for distance const R = 6371.0 // Earth radius in km dLat := (lat2 - lat1) * math.Pi / 180 dLon := (lon2 - lon1) * math.Pi / 180 a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(lat1*math.Pi/180)*math.Cos(lat2*math.Pi/180)* math.Sin(dLon/2)*math.Sin(dLon/2) c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a)) distance := R * c // Bearing calculation y := math.Sin(dLon) * math.Cos(lat2*math.Pi/180) x := math.Cos(lat1*math.Pi/180)*math.Sin(lat2*math.Pi/180) - math.Sin(lat1*math.Pi/180)*math.Cos(lat2*math.Pi/180)*math.Cos(dLon) bearing := math.Atan2(y, x) * 180 / math.Pi if bearing < 0 { bearing += 360 } return distance, bearing }