skyview/internal/merger/merger.go
Ole-Morten Duesund 2bffa2c418 style: Apply code formatting with go fmt
- Run 'make format' to ensure all Go code follows standard formatting
- Maintains consistent code style across the entire codebase
- No functional changes, only whitespace and formatting improvements
2025-09-01 10:05:29 +02:00

1128 lines
41 KiB
Go

// Package merger provides multi-source aircraft data fusion and conflict resolution.
//
// This package is the core of SkyView's multi-source capability, handling the complex
// task of merging aircraft data from multiple ADS-B receivers. It provides:
// - Intelligent data fusion based on signal strength and recency
// - Historical tracking of aircraft positions, altitudes, and speeds
// - Per-source signal quality and update rate tracking
// - Automatic conflict resolution when sources disagree
// - Comprehensive aircraft state management
// - Distance and bearing calculations from receivers
//
// The merger uses several strategies for data fusion:
// - Position data: Uses source with strongest signal
// - Recent data: Prefers newer information for dynamic values
// - Best quality: Prioritizes higher accuracy navigation data
// - History tracking: Maintains trails for visualization and analysis
//
// All data structures are designed for concurrent access and JSON serialization
// for web API consumption.
package merger
import (
"encoding/json"
"fmt"
"log"
"math"
"sync"
"time"
"skyview/internal/database"
"skyview/internal/icao"
"skyview/internal/modes"
"skyview/internal/squawk"
)
const (
	// MaxDistance is a sentinel distance larger than any real measurement.
	// GetAircraft uses it as the starting value when searching for the
	// closest receiver.
	MaxDistance = float64(999999)

	// Position validation limits - deliberately relaxed for better track propagation.
	MaxSpeedKnots        = 2000.0 // Maximum plausible aircraft speed in knots (roughly Mach 3 at cruise altitude)
	MaxDistanceNautMiles = 1000.0 // Maximum position jump distance in nautical miles (raised from an earlier value of 500)
	MaxAltitudeFeet      = 60000  // Maximum altitude in feet (about FL600)
	MinAltitudeFeet      = -500   // Minimum altitude in feet (slightly below sea level, e.g. around the Dead Sea)

	// Earth coordinate bounds in decimal degrees.
	MinLatitude  = -90.0
	MaxLatitude  = 90.0
	MinLongitude = -180.0
	MaxLongitude = 180.0

	// Unit conversion factors: multiply knots by KnotsToKmh to get km/h and
	// nautical miles by NmToKm to get kilometres.
	KnotsToKmh = 1.852
	NmToKm     = 1.852
)
// ValidationResult represents the result of position validation checks.
//
// Valid reports the overall outcome. Errors and Warnings carry human-readable
// explanations; the merger currently logs only the first entry of each list.
type ValidationResult struct {
	Valid    bool     // Whether the position passed all validation checks
	Errors   []string // Validation failures (a position with Valid == false is not merged)
	Warnings []string // Potential issues that are logged but do not block the update
}
// Source represents a data source (dump1090 receiver or similar ADS-B source).
// It contains both static configuration and dynamic status information used
// for data fusion decisions and source monitoring.
//
// LastSeen, Messages and Active are refreshed by Merger.UpdateAircraft every
// time a message attributed to this source is processed. Latitude and
// Longitude are the reference point for per-source distance and bearing.
type Source struct {
	ID        string    `json:"id"`        // Unique identifier for this source (key in the merger's source registry)
	Name      string    `json:"name"`      // Human-readable name
	Host      string    `json:"host"`      // Hostname or IP address
	Port      int       `json:"port"`      // TCP port number
	Format    string    `json:"format"`    // Data format: "beast" or "vrs" (default: "beast")
	Latitude  float64   `json:"latitude"`  // Receiver location latitude (decimal degrees)
	Longitude float64   `json:"longitude"` // Receiver location longitude (decimal degrees)
	Altitude  float64   `json:"altitude"`  // Receiver altitude above sea level
	Active    bool      `json:"active"`    // Currently connected and receiving data
	LastSeen  time.Time `json:"last_seen"` // Timestamp of last received message
	Messages  int64     `json:"messages"`  // Total messages processed from this source
	Aircraft  int       `json:"aircraft"`  // Current aircraft count from this source
}
// AircraftState represents the complete merged aircraft state from all sources.
//
// This structure combines the basic aircraft data from Mode S decoding with
// multi-source metadata, historical tracking, and derived information:
//   - Embedded modes.Aircraft with all decoded ADS-B data
//   - Per-source signal and quality information
//   - Historical trails for position, altitude, speed, and signal strength
//   - Distance and bearing from receivers
//   - Update rate and age calculations
//   - Data provenance tracking
//
// Distance, Bearing and Age are filled in on the snapshot copies produced by
// Merger.GetAircraft. JSON encoding goes through the custom MarshalJSON method
// on this type.
type AircraftState struct {
	*modes.Aircraft                            // Embedded decoded aircraft data
	Sources         map[string]*SourceData `json:"sources"`          // Per-source information, keyed by source ID
	LastUpdate      time.Time              `json:"last_update"`      // Last update from any source
	FirstSeen       time.Time              `json:"first_seen"`       // First time this aircraft was seen
	TotalMessages   int64                  `json:"total_messages"`   // Total messages received for this aircraft
	PositionHistory []PositionPoint        `json:"position_history"` // Trail of validated position updates
	SignalHistory   []SignalPoint          `json:"signal_history"`   // Signal strength over time
	AltitudeHistory []AltitudePoint        `json:"altitude_history"` // Altitude and vertical rate history
	SpeedHistory    []SpeedPoint           `json:"speed_history"`    // Speed and track history
	Distance        float64                `json:"distance"`         // Distance from closest receiver (km)
	Bearing         float64                `json:"bearing"`          // Bearing from closest receiver (degrees)
	Age             float64                `json:"age"`              // Seconds since last update
	MLATSources     []string               `json:"mlat_sources"`     // Sources providing MLAT position data
	PositionSource  string                 `json:"position_source"`  // Source providing current position
	UpdateRate      float64                `json:"update_rate"`      // Recent updates per second
	Country         string                 `json:"country"`          // Country of registration ("Unknown" if lookup failed)
	CountryCode     string                 `json:"country_code"`     // ISO country code ("XX" if lookup failed)
	Flag            string                 `json:"flag"`             // Country flag emoji
}
// MarshalJSON implements json.Marshaler for AircraftState.
//
// The embedded aircraft fields and the merger-level fields are flattened into
// one JSON object. ICAO24 is rendered as a six-digit upper-case hexadecimal
// string rather than as a number. The embedded Aircraft pointer must be
// non-nil.
func (a *AircraftState) MarshalJSON() ([]byte, error) {
	// wireState is the flattened JSON shape; field order defines output order.
	type wireState struct {
		// From embedded modes.Aircraft
		ICAO24                string  `json:"ICAO24"`
		Callsign              string  `json:"Callsign"`
		Latitude              float64 `json:"Latitude"`
		Longitude             float64 `json:"Longitude"`
		Altitude              int     `json:"Altitude"`
		BaroAltitude          int     `json:"BaroAltitude"`
		GeomAltitude          int     `json:"GeomAltitude"`
		VerticalRate          int     `json:"VerticalRate"`
		GroundSpeed           int     `json:"GroundSpeed"`
		Track                 int     `json:"Track"`
		Heading               int     `json:"Heading"`
		Category              string  `json:"Category"`
		Squawk                string  `json:"Squawk"`
		SquawkDescription     string  `json:"SquawkDescription"`
		Emergency             string  `json:"Emergency"`
		OnGround              bool    `json:"OnGround"`
		Alert                 bool    `json:"Alert"`
		SPI                   bool    `json:"SPI"`
		NACp                  uint8   `json:"NACp"`
		NACv                  uint8   `json:"NACv"`
		SIL                   uint8   `json:"SIL"`
		TransponderCapability string  `json:"TransponderCapability"`
		TransponderLevel      uint8   `json:"TransponderLevel"`
		SignalQuality         string  `json:"SignalQuality"`
		SelectedAltitude      int     `json:"SelectedAltitude"`
		SelectedHeading       float64 `json:"SelectedHeading"`
		BaroSetting           float64 `json:"BaroSetting"`
		// From AircraftState
		Sources         map[string]*SourceData `json:"sources"`
		LastUpdate      time.Time              `json:"last_update"`
		FirstSeen       time.Time              `json:"first_seen"`
		TotalMessages   int64                  `json:"total_messages"`
		PositionHistory []PositionPoint        `json:"position_history"`
		SignalHistory   []SignalPoint          `json:"signal_history"`
		AltitudeHistory []AltitudePoint        `json:"altitude_history"`
		SpeedHistory    []SpeedPoint           `json:"speed_history"`
		Distance        float64                `json:"distance"`
		Bearing         float64                `json:"bearing"`
		Age             float64                `json:"age"`
		MLATSources     []string               `json:"mlat_sources"`
		PositionSource  string                 `json:"position_source"`
		UpdateRate      float64                `json:"update_rate"`
		Country         string                 `json:"country"`
		CountryCode     string                 `json:"country_code"`
		Flag            string                 `json:"flag"`
	}

	ac := a.Aircraft
	var out wireState

	// Decoded aircraft data; the address is the only field that is reformatted.
	out.ICAO24 = fmt.Sprintf("%06X", ac.ICAO24)
	out.Callsign = ac.Callsign
	out.Latitude = ac.Latitude
	out.Longitude = ac.Longitude
	out.Altitude = ac.Altitude
	out.BaroAltitude = ac.BaroAltitude
	out.GeomAltitude = ac.GeomAltitude
	out.VerticalRate = ac.VerticalRate
	out.GroundSpeed = ac.GroundSpeed
	out.Track = ac.Track
	out.Heading = ac.Heading
	out.Category = ac.Category
	out.Squawk = ac.Squawk
	out.SquawkDescription = ac.SquawkDescription
	out.Emergency = ac.Emergency
	out.OnGround = ac.OnGround
	out.Alert = ac.Alert
	out.SPI = ac.SPI
	out.NACp = ac.NACp
	out.NACv = ac.NACv
	out.SIL = ac.SIL
	out.TransponderCapability = ac.TransponderCapability
	out.TransponderLevel = ac.TransponderLevel
	out.SignalQuality = ac.SignalQuality
	out.SelectedAltitude = ac.SelectedAltitude
	out.SelectedHeading = ac.SelectedHeading
	out.BaroSetting = ac.BaroSetting

	// Merger-level metadata, copied through unchanged.
	out.Sources = a.Sources
	out.LastUpdate = a.LastUpdate
	out.FirstSeen = a.FirstSeen
	out.TotalMessages = a.TotalMessages
	out.PositionHistory = a.PositionHistory
	out.SignalHistory = a.SignalHistory
	out.AltitudeHistory = a.AltitudeHistory
	out.SpeedHistory = a.SpeedHistory
	out.Distance = a.Distance
	out.Bearing = a.Bearing
	out.Age = a.Age
	out.MLATSources = a.MLATSources
	out.PositionSource = a.PositionSource
	out.UpdateRate = a.UpdateRate
	out.Country = a.Country
	out.CountryCode = a.CountryCode
	out.Flag = a.Flag

	return json.Marshal(&out)
}
// SourceData represents data quality and statistics for a specific source-aircraft pair.
// This information is used for data fusion decisions and signal quality analysis.
//
// SignalLevel, Messages and LastSeen are refreshed on every message handled by
// Merger.UpdateAircraft; Distance and Bearing are only recomputed when the
// message carries a non-zero latitude and longitude and the source is registered.
type SourceData struct {
	SourceID    string    `json:"source_id"`    // Unique identifier of the source
	SignalLevel float64   `json:"signal_level"` // Most recent signal strength (dBFS)
	Messages    int64     `json:"messages"`     // Messages received from this source for this aircraft
	LastSeen    time.Time `json:"last_seen"`    // Last message timestamp from this source
	Distance    float64   `json:"distance"`     // Distance from receiver to aircraft (km)
	Bearing     float64   `json:"bearing"`      // Bearing from receiver to aircraft (degrees)
	UpdateRate  float64   `json:"update_rate"`  // Updates per second from this source
}
// PositionPoint represents a timestamped position update in aircraft history.
// Used to build position trails for visualization and track analysis.
// Points are appended by the merger only for positions that pass validation.
type PositionPoint struct {
	Time      time.Time `json:"time"`   // Timestamp when position was received
	Latitude  float64   `json:"lat"`    // Latitude in decimal degrees
	Longitude float64   `json:"lon"`    // Longitude in decimal degrees
	Source    string    `json:"source"` // ID of the source that provided this position
}
// SignalPoint represents a timestamped signal strength measurement.
// Used to track signal quality over time and analyze receiver performance.
// The merger records a point for every message with a non-zero signal value.
type SignalPoint struct {
	Time   time.Time `json:"time"`   // Timestamp when signal was measured
	Signal float64   `json:"signal"` // Signal strength in dBFS
	Source string    `json:"source"` // ID of the source that measured this signal
}
// AltitudePoint represents a timestamped altitude measurement.
// Includes vertical rate for flight profile analysis.
// The merger records a point for every message with a non-zero altitude.
type AltitudePoint struct {
	Time     time.Time `json:"time"`     // Timestamp when altitude was received
	Altitude int       `json:"altitude"` // Altitude in feet
	VRate    int       `json:"vrate"`    // Vertical rate in feet per minute
}
// SpeedPoint represents a timestamped speed and track measurement.
// Used for aircraft performance analysis and track prediction.
// The merger records a point for every message with a non-zero ground speed.
type SpeedPoint struct {
	Time        time.Time `json:"time"`         // Timestamp when speed was received
	GroundSpeed int       `json:"ground_speed"` // Ground speed in knots (integer)
	Track       int       `json:"track"`        // Track angle in degrees (0-359)
}
// Merger handles merging aircraft data from multiple sources with intelligent conflict resolution.
//
// The merger maintains:
//   - Complete aircraft states with multi-source data fusion
//   - Source registry with connection status and statistics
//   - Historical data with configurable retention limits
//   - Update rate metrics for performance monitoring
//   - Automatic stale aircraft cleanup
//
// Thread safety is provided by RWMutex for concurrent read access while
// maintaining write consistency during updates. Exported methods take the
// lock themselves; unexported helpers expect the caller to already hold it.
type Merger struct {
	aircraft      map[uint32]*AircraftState // ICAO24 -> merged aircraft state
	sources       map[string]*Source        // Source ID -> source information
	icaoDB        *icao.Database            // ICAO country lookup database
	squawkDB      *squawk.Database          // Transponder code lookup database
	db            *database.Database        // Optional persistent database (nil disables persistence)
	mu            sync.RWMutex              // Protects all maps and slices
	historyLimit  int                       // Maximum history points to retain per history slice
	staleTimeout  time.Duration             // Time without updates before an aircraft is considered stale (default 15 seconds)
	updateMetrics map[uint32]*updateMetric  // ICAO24 -> update rate calculation data
	validationLog map[uint32]time.Time      // ICAO24 (or ICAO24+0x10000000 for warnings) -> last validation log time (rate limiting)
}
// updateMetric tracks recent update times for calculating update rates.
// Used internally to provide real-time update frequency information.
// Entries older than 30 seconds are dropped each time a new update is recorded.
type updateMetric struct {
	updates []time.Time // Recent update timestamps (last 30 seconds), oldest first
}
// NewMerger creates a new aircraft data merger with default configuration
// and no persistent database.
//
// It is shorthand for NewMergerWithDatabase(nil) and therefore shares its
// defaults:
//   - History limit: 500 points per aircraft
//   - Stale timeout: 15 seconds
//   - Empty aircraft and source maps
//   - Update metrics tracking enabled
//
// An error is returned when NewMergerWithDatabase fails, i.e. when the ICAO
// country database cannot be initialized. Otherwise the merger is ready for
// immediate use.
func NewMerger() (*Merger, error) {
	return NewMergerWithDatabase(nil)
}
// NewMergerWithDatabase builds a Merger, optionally backed by a persistent
// database.
//
// When db is non-nil, aircraft updates that carry a position are also written
// to it; passing nil turns persistence off.
//
// Defaults applied to every merger:
//   - History limit: 500 points per aircraft
//   - Stale timeout: 15 seconds
//   - Empty aircraft and source maps
//   - Update metrics tracking enabled
//
// An error is returned only when the ICAO country database cannot be
// initialized; otherwise the merger is ready for immediate use.
func NewMergerWithDatabase(db *database.Database) (*Merger, error) {
	countries, err := icao.NewDatabase()
	if err != nil {
		return nil, fmt.Errorf("failed to initialize ICAO database: %w", err)
	}

	merger := new(Merger)

	// Lookup databases and optional persistence
	merger.icaoDB = countries
	merger.squawkDB = squawk.NewDatabase()
	merger.db = db

	// Tunables
	merger.historyLimit = 500
	merger.staleTimeout = 15 * time.Second // Aircraft timeout - reasonable for ADS-B tracking

	// Runtime state
	merger.aircraft = map[uint32]*AircraftState{}
	merger.sources = map[string]*Source{}
	merger.updateMetrics = map[uint32]*updateMetric{}
	merger.validationLog = map[uint32]time.Time{}

	return merger, nil
}
// AddSource registers a new data source with the merger.
//
// The source is stored by pointer under source.ID, so registering a second
// source with the same ID replaces the earlier one. Sources can be added at
// any time, even after the merger is actively processing data; the write
// lock is held for the duration of the registration.
//
// Parameters:
//   - source: Source configuration with ID, location, and connection details;
//     must not be nil
func (m *Merger) AddSource(source *Source) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.sources[source.ID] = source
}
// UpdateAircraft folds one decoded message from a source into the merged
// state for that aircraft.
//
// Steps, all performed under the write lock:
//  1. Create the aircraft state (including country lookup and update-rate
//     bookkeeping) the first time an ICAO24 address is seen
//  2. Refresh the per-source signal level, message count and last-seen time
//  3. Recompute distance/bearing from the source when the message has a position
//  4. Merge the decoded fields into the shared state (see mergeAircraftData)
//  5. Append to the history trails and refresh the update rate
//  6. Refresh the source's own statistics and mark it active
//  7. Persist the message when a database is configured and a position is present
//
// Parameters:
//   - sourceID: Identifier of the source providing this data
//   - aircraft: Decoded Mode S/ADS-B aircraft data; must not be nil
//   - signal: Signal strength in dBFS
//   - timestamp: When this data was received
func (m *Merger) UpdateAircraft(sourceID string, aircraft *modes.Aircraft, signal float64, timestamp time.Time) {
	m.mu.Lock()
	defer m.mu.Unlock()

	addr := aircraft.ICAO24
	// A zero latitude or longitude is treated as "no position in this message".
	hasPosition := aircraft.Latitude != 0 && aircraft.Longitude != 0

	state, known := m.aircraft[addr]
	if !known {
		// First sighting: the state adopts the supplied aircraft pointer as-is.
		state = &AircraftState{
			Aircraft:        aircraft,
			Sources:         map[string]*SourceData{},
			FirstSeen:       timestamp,
			PositionHistory: []PositionPoint{},
			SignalHistory:   []SignalPoint{},
			AltitudeHistory: []AltitudePoint{},
			SpeedHistory:    []SpeedPoint{},
		}

		// Start from the "unknown" placeholders and overwrite on a successful lookup.
		state.Country, state.CountryCode, state.Flag = "Unknown", "XX", "🏳️"
		if info, err := m.icaoDB.LookupCountry(fmt.Sprintf("%06X", addr)); err == nil {
			state.Country = info.Country
			state.CountryCode = info.CountryCode
			state.Flag = info.Flag
		}

		m.aircraft[addr] = state
		m.updateMetrics[addr] = &updateMetric{updates: []time.Time{}}
	}
	// For an already-known aircraft state.Aircraft is left alone here;
	// mergeAircraftData below applies the selective field updates.

	// Per-source bookkeeping for this aircraft
	perSource, tracked := state.Sources[sourceID]
	if !tracked {
		perSource = &SourceData{SourceID: sourceID}
		state.Sources[sourceID] = perSource
	}
	perSource.SignalLevel = signal
	perSource.Messages++
	perSource.LastSeen = timestamp

	// Distance and bearing relative to the reporting receiver
	if receiver, registered := m.sources[sourceID]; registered && hasPosition {
		perSource.Distance, perSource.Bearing = calculateDistanceBearing(
			receiver.Latitude, receiver.Longitude,
			aircraft.Latitude, aircraft.Longitude,
		)
	}

	// Fusion, history trails and update-rate metric, in that order
	m.mergeAircraftData(state, aircraft, sourceID, timestamp)
	m.updateHistories(state, aircraft, sourceID, signal, timestamp)
	m.updateUpdateRate(addr, timestamp)

	// Source-level statistics
	if receiver, registered := m.sources[sourceID]; registered {
		receiver.LastSeen = timestamp
		receiver.Messages++
		receiver.Active = true
	}

	state.LastUpdate = timestamp
	state.TotalMessages++

	// Persistence is optional and only applies to messages with a position
	if m.db == nil || !hasPosition {
		return
	}
	m.saveAircraftToDatabase(aircraft, sourceID, signal, timestamp)
}
// mergeAircraftData intelligently merges data from multiple sources with conflict resolution.
//
// The caller must hold the merger's write lock and must already have recorded
// the new message in state.Sources[sourceID] (signal level and last-seen time).
//
// Position Data:
//   - Every candidate position is validated first; rejected positions are
//     logged (rate-limited) and never merged
//   - The first valid position is always accepted
//   - Afterwards a position is accepted when it comes from the source with the
//     strongest signal, or when that strongest source has not reported for
//     longer than staleTimeout (otherwise a receiver that lost the aircraft
//     would freeze the merged position forever)
//   - The accepting source is recorded in PositionSource, and PositionValid is
//     latched to true once an accepted position reports it
//
// Dynamic Data (altitude, speed, heading, vertical rate):
//   - Most recent non-zero value wins; zero is treated as "not present"
//
// Identity Data (callsign, squawk, category):
//   - Most recent non-empty value wins; existing values survive empty input
//
// Status flags (OnGround, Alert, SPI):
//   - Always overwritten by the newest message
//
// Quality Indicators (NACp, NACv, SIL):
//   - Highest value seen across all sources is kept
//
// Parameters:
//   - state: Current merged aircraft state to update
//   - new: New aircraft data from a source
//   - sourceID: Identifier of source providing new data
//   - timestamp: Timestamp of new data
func (m *Merger) mergeAircraftData(state *AircraftState, new *modes.Aircraft, sourceID string, timestamp time.Time) {
	// Position - use source with best signal, but validate first
	if new.Latitude != 0 && new.Longitude != 0 {
		validation := m.validatePosition(new, state, timestamp)
		if !validation.Valid {
			// Rate-limited logging: only log once every 10 seconds per aircraft
			if lastLog, exists := m.validationLog[new.ICAO24]; !exists || timestamp.Sub(lastLog) > 10*time.Second {
				icaoHex := fmt.Sprintf("%06X", new.ICAO24)
				// Only log first error to reduce spam
				if len(validation.Errors) > 0 {
					log.Printf("[POSITION_VALIDATION] ICAO %s: REJECTED - %s", icaoHex, validation.Errors[0])
					m.validationLog[new.ICAO24] = timestamp
				}
			}
		} else {
			// Position is valid, decide whether this source may set it
			updatePosition := false
			if state.Latitude == 0 {
				// First position update
				updatePosition = true
			} else if srcData, ok := state.Sources[sourceID]; ok {
				currentBest := m.getBestSignalSource(state)
				switch {
				case currentBest == "" || currentBest == sourceID:
					// No ranked source yet, or we are the strongest source:
					// keep following it so moving aircraft stay up to date.
					updatePosition = true
				case srcData.SignalLevel > state.Sources[currentBest].SignalLevel:
					updatePosition = true
				case timestamp.Sub(state.Sources[currentBest].LastSeen) > m.staleTimeout:
					// The strongest source has gone silent for this aircraft;
					// its last signal level must not block live sources.
					updatePosition = true
				}
			}
			if updatePosition {
				state.Latitude = new.Latitude
				state.Longitude = new.Longitude
				state.PositionSource = sourceID
				// GetStatistics relies on PositionValid; without this the flag
				// would only ever reflect the very first message.
				if new.PositionValid {
					state.PositionValid = true
				}
			}
		}

		// Rate-limited warning logging
		if len(validation.Warnings) > 0 {
			// Only log warnings once every 30 seconds per aircraft.
			// ICAO addresses are 24-bit, so the offset key cannot collide
			// with the error-log key of another aircraft.
			warningKey := new.ICAO24 + 0x10000000
			if lastLog, exists := m.validationLog[warningKey]; !exists || timestamp.Sub(lastLog) > 30*time.Second {
				icaoHex := fmt.Sprintf("%06X", new.ICAO24)
				log.Printf("[POSITION_VALIDATION] ICAO %s: WARNING - %s", icaoHex, validation.Warnings[0])
				m.validationLog[warningKey] = timestamp
			}
		}
	}

	// Altitude - use most recent
	if new.Altitude != 0 {
		state.Altitude = new.Altitude
	}
	if new.BaroAltitude != 0 {
		state.BaroAltitude = new.BaroAltitude
	}
	if new.GeomAltitude != 0 {
		state.GeomAltitude = new.GeomAltitude
	}

	// Speed and track - use most recent
	if new.GroundSpeed != 0 {
		state.GroundSpeed = new.GroundSpeed
	}
	if new.Track != 0 {
		state.Track = new.Track
	}
	if new.Heading != 0 {
		state.Heading = new.Heading
	}

	// Vertical rate - use most recent
	if new.VerticalRate != 0 {
		state.VerticalRate = new.VerticalRate
	}

	// Identity - use most recent non-empty
	if new.Callsign != "" {
		state.Callsign = new.Callsign
	}
	if new.Squawk != "" {
		state.Squawk = new.Squawk
		// Look up squawk description
		state.SquawkDescription = m.squawkDB.FormatSquawkWithDescription(new.Squawk)
	}
	if new.Category != "" {
		state.Category = new.Category
	}

	// Status - use most recent
	if new.Emergency != "" {
		state.Emergency = new.Emergency
	}
	state.OnGround = new.OnGround
	state.Alert = new.Alert
	state.SPI = new.SPI

	// Navigation accuracy - use best available
	if new.NACp > state.NACp {
		state.NACp = new.NACp
	}
	if new.NACv > state.NACv {
		state.NACv = new.NACv
	}
	if new.SIL > state.SIL {
		state.SIL = new.SIL
	}

	// Selected values - use most recent
	if new.SelectedAltitude != 0 {
		state.SelectedAltitude = new.SelectedAltitude
	}
	if new.SelectedHeading != 0 {
		state.SelectedHeading = new.SelectedHeading
	}
	if new.BaroSetting != 0 {
		state.BaroSetting = new.BaroSetting
	}

	// Transponder information - use most recent non-empty
	if new.TransponderCapability != "" {
		state.TransponderCapability = new.TransponderCapability
	}
	if new.TransponderLevel > 0 {
		state.TransponderLevel = new.TransponderLevel
	}

	// Signal quality - only ever move towards a better assessment
	if new.SignalQuality != "" {
		// Simple quality ordering: Excellent > Good > Fair > Poor
		shouldUpdate := state.SignalQuality == "" ||
			(new.SignalQuality == "Excellent") ||
			(new.SignalQuality == "Good" && state.SignalQuality != "Excellent") ||
			(new.SignalQuality == "Fair" && state.SignalQuality == "Poor")
		if shouldUpdate {
			state.SignalQuality = new.SignalQuality
		}
	}
}
// updateHistories appends the values carried by one message to the aircraft's
// time-series trails and caps each trail at historyLimit entries.
//
// Trails maintained:
//   - Position (only when latitude and longitude are non-zero and the position
//     passes validatePosition)
//   - Signal strength (non-zero signal only)
//   - Altitude with vertical rate (non-zero altitude only)
//   - Ground speed with track (non-zero ground speed only)
//
// When a trail grows past the limit the oldest entries are dropped. Validation
// failures are not logged here; mergeAircraftData already reports them.
//
// Parameters:
//   - state: Aircraft state to update histories for
//   - aircraft: New aircraft data containing values to record
//   - sourceID: Source providing this data point
//   - signal: Signal strength measurement
//   - timestamp: When this data was received
func (m *Merger) updateHistories(state *AircraftState, aircraft *modes.Aircraft, sourceID string, signal float64, timestamp time.Time) {
	limit := m.historyLimit

	// Position trail
	if aircraft.Latitude != 0 && aircraft.Longitude != 0 {
		if m.validatePosition(aircraft, state, timestamp).Valid {
			point := PositionPoint{
				Time:      timestamp,
				Latitude:  aircraft.Latitude,
				Longitude: aircraft.Longitude,
				Source:    sourceID,
			}
			state.PositionHistory = append(state.PositionHistory, point)
		}
	}
	if excess := len(state.PositionHistory) - limit; excess > 0 {
		state.PositionHistory = state.PositionHistory[excess:]
	}

	// Signal trail
	if signal != 0 {
		point := SignalPoint{Time: timestamp, Signal: signal, Source: sourceID}
		state.SignalHistory = append(state.SignalHistory, point)
	}
	if excess := len(state.SignalHistory) - limit; excess > 0 {
		state.SignalHistory = state.SignalHistory[excess:]
	}

	// Altitude trail
	if aircraft.Altitude != 0 {
		point := AltitudePoint{Time: timestamp, Altitude: aircraft.Altitude, VRate: aircraft.VerticalRate}
		state.AltitudeHistory = append(state.AltitudeHistory, point)
	}
	if excess := len(state.AltitudeHistory) - limit; excess > 0 {
		state.AltitudeHistory = state.AltitudeHistory[excess:]
	}

	// Speed trail
	if aircraft.GroundSpeed != 0 {
		point := SpeedPoint{Time: timestamp, GroundSpeed: aircraft.GroundSpeed, Track: aircraft.Track}
		state.SpeedHistory = append(state.SpeedHistory, point)
	}
	if excess := len(state.SpeedHistory) - limit; excess > 0 {
		state.SpeedHistory = state.SpeedHistory[excess:]
	}
}
// updateUpdateRate records one update for an aircraft and refreshes its
// UpdateRate.
//
// The timestamp is appended to the aircraft's metric, entries older than 30
// seconds (relative to this timestamp) are discarded from the front, and the
// rate is set to the number of remaining entries divided by the time between
// the oldest and newest of them. The rate is left untouched when fewer than
// two entries remain or when they span no time.
//
// A metric entry for icao must already exist; UpdateAircraft creates it
// together with the aircraft state.
//
// Parameters:
//   - icao: ICAO24 address of the aircraft
//   - timestamp: Timestamp of this update
func (m *Merger) updateUpdateRate(icao uint32, timestamp time.Time) {
	metric := m.updateMetrics[icao]
	window := append(metric.updates, timestamp)

	// Skip everything that fell out of the 30-second window
	oldestAllowed := timestamp.Add(-30 * time.Second)
	first := 0
	for first < len(window) && window[first].Before(oldestAllowed) {
		first++
	}
	window = window[first:]
	metric.updates = window

	if len(window) < 2 {
		return
	}
	span := window[len(window)-1].Sub(window[0]).Seconds()
	if span <= 0 {
		return
	}
	if state, tracked := m.aircraft[icao]; tracked {
		state.UpdateRate = float64(len(window)) / span
	}
}
// getBestSignalSource picks the source that currently reports the strongest
// signal for an aircraft.
//
// Position fusion uses the result to decide which source is authoritative.
// Only signal levels strictly above -999 are considered, so an aircraft with
// no sources (or only sources at or below that floor) yields "". When several
// sources tie, whichever the map iteration visits first wins.
//
// Parameters:
//   - state: Aircraft state containing per-source signal data
//
// Returns the source ID with the highest signal level, or empty string if none.
func (m *Merger) getBestSignalSource(state *AircraftState) string {
	winner := ""
	strongest := float64(-999)
	for id, data := range state.Sources {
		if !(data.SignalLevel > strongest) {
			continue
		}
		winner, strongest = id, data.SignalLevel
	}
	return winner
}
// GetAircraft returns a snapshot of all current aircraft states.
//
// This method:
//  1. Filters out stale aircraft (older than staleTimeout)
//  2. Calculates current age for each aircraft
//  3. Determines closest receiver distance and bearing
//  4. Returns copies to prevent external modification
//
// Each returned state owns its own copy of the embedded aircraft data and of
// the per-source map, so callers can read or marshal the snapshot after the
// lock is released while UpdateAircraft keeps mutating the live state. The
// history and MLAT slices still share backing storage with the live state and
// must be treated as read-only.
//
// Returns a map of ICAO24 -> AircraftState for all non-stale aircraft.
func (m *Merger) GetAircraft() map[uint32]*AircraftState {
	m.mu.RLock()
	defer m.mu.RUnlock()

	now := time.Now()
	result := make(map[uint32]*AircraftState, len(m.aircraft))
	for icao, state := range m.aircraft {
		// Skip stale aircraft
		age := now.Sub(state.LastUpdate)
		if age > m.staleTimeout {
			continue
		}

		stateCopy := *state
		stateCopy.Age = age.Seconds()

		// Detach the embedded aircraft: the struct copy above only copied the
		// pointer, which would leave the snapshot aliasing fields that
		// mergeAircraftData rewrites under the write lock.
		if state.Aircraft != nil {
			aircraftCopy := *state.Aircraft
			stateCopy.Aircraft = &aircraftCopy
		}

		// Detach the per-source data for the same reason (a shared map would be
		// iterated by the JSON encoder while UpdateAircraft may insert into it),
		// and pick the closest receiver while walking it.
		stateCopy.Sources = make(map[string]*SourceData, len(state.Sources))
		minDistance := MaxDistance
		for srcID, srcData := range state.Sources {
			if srcData == nil {
				continue
			}
			srcCopy := *srcData
			stateCopy.Sources[srcID] = &srcCopy

			// A distance of 0 means "never computed" and is ignored
			if srcCopy.Distance > 0 && srcCopy.Distance < minDistance {
				minDistance = srcCopy.Distance
				stateCopy.Distance = srcCopy.Distance
				stateCopy.Bearing = srcCopy.Bearing
			}
		}

		result[icao] = &stateCopy
	}
	return result
}
// GetSources lists every registered data source, active or not.
//
// The web API uses this to show source configuration, connection status and
// statistics. The slice itself is freshly allocated, but its elements are the
// merger's own *Source values, not copies. Order follows map iteration and is
// therefore unspecified.
func (m *Merger) GetSources() []*Source {
	m.mu.RLock()
	defer m.mu.RUnlock()

	list := make([]*Source, len(m.sources))
	next := 0
	for _, registered := range m.sources {
		list[next] = registered
		next++
	}
	return list
}
// GetStatistics summarises the merger's current contents.
//
// Keys in the returned map:
//   - total_aircraft: Number of aircraft currently held by the merger
//   - aircraft_with_position: Aircraft whose PositionValid flag is set and
//     whose latitude and longitude are both non-zero
//   - aircraft_without_position: All remaining aircraft
//   - total_messages: Sum of TotalMessages over all aircraft
//   - active_sources: Number of sources currently flagged Active
//   - aircraft_by_sources: Number of aircraft per count of reporting sources
//
// Aircraft that are stale but not yet removed by CleanupStale are included.
//
// Returns a map suitable for JSON serialization and web display.
func (m *Merger) GetStatistics() map[string]interface{} {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var messageSum int64
	positioned := 0
	bySourceCount := map[int]int{} // number of sources -> number of aircraft
	for _, state := range m.aircraft {
		messageSum += state.TotalMessages
		bySourceCount[len(state.Sources)]++

		ac := state.Aircraft
		if ac.PositionValid && ac.Latitude != 0.0 && ac.Longitude != 0.0 {
			positioned++
		}
	}

	connected := 0
	for _, src := range m.sources {
		if src.Active {
			connected++
		}
	}

	tracked := len(m.aircraft)
	stats := make(map[string]interface{}, 6)
	stats["total_aircraft"] = tracked
	stats["aircraft_with_position"] = positioned
	// Every aircraft falls into exactly one of the two position buckets
	stats["aircraft_without_position"] = tracked - positioned
	stats["total_messages"] = messageSum
	stats["active_sources"] = connected
	stats["aircraft_by_sources"] = bySourceCount
	return stats
}
// CleanupStale drops every aircraft whose last update is older than
// staleTimeout (default 15 seconds).
//
// Removing an aircraft also removes its update-rate metric and both of its
// validation-log rate-limit entries (errors and warnings), so memory does not
// grow with aircraft that have left coverage or stopped transmitting.
// This method is typically called periodically by the client manager.
func (m *Merger) CleanupStale() {
	m.mu.Lock()
	defer m.mu.Unlock()

	now := time.Now()
	for icao, state := range m.aircraft {
		if now.Sub(state.LastUpdate) <= m.staleTimeout {
			continue // still fresh
		}

		delete(m.aircraft, icao)
		delete(m.updateMetrics, icao)

		// Error entries are keyed by the address, warning entries by the
		// address plus a fixed offset.
		for _, logKey := range [...]uint32{icao, icao + 0x10000000} {
			delete(m.validationLog, logKey)
		}
	}
}
// calculateDistanceBearing computes great circle distance and bearing between two points.
//
// Uses the Haversine formula for distance and the forward azimuth formula for
// the initial bearing, both on a spherical Earth of radius 6371 km.
//
// The haversine term is clamped to [0, 1] before taking square roots:
// floating-point rounding can push it marginally above 1 for (near-)antipodal
// points, which would otherwise turn the distance into NaN. The bearing is
// normalised to the half-open range [0, 360).
//
// Parameters:
//   - lat1, lon1: First point (receiver) coordinates in decimal degrees
//   - lat2, lon2: Second point (aircraft) coordinates in decimal degrees
//
// Returns:
//   - distance: Great circle distance in kilometers
//   - bearing: Forward azimuth in degrees (0° = North, 90° = East)
func calculateDistanceBearing(lat1, lon1, lat2, lon2 float64) (float64, float64) {
	const R = 6371.0 // Earth radius in km

	phi1 := lat1 * math.Pi / 180
	phi2 := lat2 * math.Pi / 180
	dLat := (lat2 - lat1) * math.Pi / 180
	dLon := (lon2 - lon1) * math.Pi / 180

	// Haversine formula for distance
	sinHalfLat := math.Sin(dLat / 2)
	sinHalfLon := math.Sin(dLon / 2)
	a := sinHalfLat*sinHalfLat +
		math.Cos(phi1)*math.Cos(phi2)*
			sinHalfLon*sinHalfLon
	// Guard against rounding (or out-of-range latitudes) leaving [0, 1]
	if a > 1 {
		a = 1
	} else if a < 0 {
		a = 0
	}
	c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
	distance := R * c

	// Bearing calculation
	y := math.Sin(dLon) * math.Cos(phi2)
	x := math.Cos(phi1)*math.Sin(phi2) -
		math.Sin(phi1)*math.Cos(phi2)*math.Cos(dLon)
	bearing := math.Atan2(y, x) * 180 / math.Pi
	if bearing < 0 {
		bearing += 360
	}
	// A tiny negative angle rounds to exactly 360 after the wrap above
	if bearing >= 360 {
		bearing -= 360
	}
	return distance, bearing
}
// validatePosition checks a newly reported aircraft position for plausibility,
// both on its own and against the most recent entry in the aircraft's position
// history.
//
// A report whose latitude and longitude are both exactly zero is treated as
// carrying no position and is accepted without running any check.
//
// Checks that mark the result invalid (each appends to Errors):
//
//  1. Coordinate range: latitude within [MinLatitude, MaxLatitude] and
//     longitude within [MinLongitude, MaxLongitude].
//  2. Altitude range: a non-zero altitude must lie within
//     [MinAltitudeFeet, MaxAltitudeFeet].
//  3. Position jump: the distance from the last recorded position must not
//     exceed MaxDistanceNautMiles. This is checked whenever the report is not
//     older than the last position, including when both carry the same
//     timestamp.
//  4. Implied speed: distance divided by elapsed time must not exceed
//     MaxSpeedKnots (only when elapsed time is positive).
//  5. Timestamp order: the report must not be older than the last recorded
//     position.
//
// Checks that only append to Warnings:
//
//   - Implied speed above 1000 knots but within MaxSpeedKnots.
//   - Reported ground speed (above 50 knots) differing from the implied speed
//     by more than 100 knots.
//
// Checks 3-5 and the high-speed warning run only when the state has position
// history and a non-zero current latitude and longitude. The speed-consistency
// warning only needs position history and a positive reported ground speed.
//
// Parameters:
//   - aircraft: New aircraft position data to validate
//   - state: Current aircraft state with position history
//   - timestamp: Timestamp of the new position data
//
// Returns:
//   - ValidationResult with the Valid flag and the collected error and warning
//     messages
func (m *Merger) validatePosition(aircraft *modes.Aircraft, state *AircraftState, timestamp time.Time) *ValidationResult {
	result := &ValidationResult{
		Valid:    true,
		Errors:   make([]string, 0),
		Warnings: make([]string, 0),
	}

	// Skip validation if no position data
	if aircraft.Latitude == 0 && aircraft.Longitude == 0 {
		return result // No position to validate
	}

	// 1. Geographic coordinate validation
	if aircraft.Latitude < MinLatitude || aircraft.Latitude > MaxLatitude {
		result.Valid = false
		result.Errors = append(result.Errors, fmt.Sprintf("Invalid latitude: %.6f (must be between %.1f and %.1f)",
			aircraft.Latitude, MinLatitude, MaxLatitude))
	}
	if aircraft.Longitude < MinLongitude || aircraft.Longitude > MaxLongitude {
		result.Valid = false
		result.Errors = append(result.Errors, fmt.Sprintf("Invalid longitude: %.6f (must be between %.1f and %.1f)",
			aircraft.Longitude, MinLongitude, MaxLongitude))
	}

	// 2. Altitude validation
	if aircraft.Altitude != 0 { // Only validate non-zero altitudes
		if aircraft.Altitude < MinAltitudeFeet {
			result.Valid = false
			result.Errors = append(result.Errors, fmt.Sprintf("Impossible altitude: %d feet (below minimum %d)",
				aircraft.Altitude, MinAltitudeFeet))
		}
		if aircraft.Altitude > MaxAltitudeFeet {
			result.Valid = false
			result.Errors = append(result.Errors, fmt.Sprintf("Impossible altitude: %d feet (above maximum %d)",
				aircraft.Altitude, MaxAltitudeFeet))
		}
	}

	// Everything below compares against the last recorded position.
	if len(state.PositionHistory) == 0 {
		return result
	}

	// Derive the movement figures once; both the hard limits and the
	// speed-consistency warning use them.
	lastPos := state.PositionHistory[len(state.PositionHistory)-1]
	distanceKm, _ := calculateDistanceBearing(lastPos.Latitude, lastPos.Longitude,
		aircraft.Latitude, aircraft.Longitude)
	distanceNm := distanceKm / NmToKm
	timeDiff := timestamp.Sub(lastPos.Time).Seconds()

	// Implied speed in knots (nautical miles per hour); only meaningful when
	// time has actually advanced.
	var impliedSpeed float64
	if timeDiff > 0 {
		impliedSpeed = (distanceNm / timeDiff) * 3600
	}

	// 3. Jump, speed and timestamp-order validation
	if state.Latitude != 0 && state.Longitude != 0 {
		if timeDiff < 0 {
			// Reject out-of-order timestamps
			result.Valid = false
			result.Errors = append(result.Errors, fmt.Sprintf("Out-of-order timestamp: %.1f seconds in the past", -timeDiff))
		} else {
			// Distance validation: reject jumps beyond MaxDistanceNautMiles.
			// Applied for zero elapsed time too, so a far-away position with
			// the same timestamp as the last one cannot bypass the limit.
			if distanceNm > MaxDistanceNautMiles {
				result.Valid = false
				result.Errors = append(result.Errors, fmt.Sprintf("Impossible position jump: %.1f nm in %.1f seconds (max allowed: %.1f nm)",
					distanceNm, timeDiff, MaxDistanceNautMiles))
			}

			if timeDiff > 0 {
				// Speed validation: reject anything above MaxSpeedKnots
				if impliedSpeed > MaxSpeedKnots {
					result.Valid = false
					result.Errors = append(result.Errors, fmt.Sprintf("Impossible speed: %.0f knots (max allowed: %.0f knots)",
						impliedSpeed, MaxSpeedKnots))
				}

				// Warning for high but possible speeds (>1000 knots)
				if impliedSpeed > 1000 && impliedSpeed <= MaxSpeedKnots {
					result.Warnings = append(result.Warnings, fmt.Sprintf("High speed detected: %.0f knots", impliedSpeed))
				}
			}
		}
	}

	// 4. Reported ground speed vs. speed implied by the position change
	if aircraft.GroundSpeed > 0 && timeDiff > 0 {
		reportedSpeed := float64(aircraft.GroundSpeed)

		// Warning if speeds differ significantly (>100 knots difference);
		// reports at or below 50 knots are not compared.
		if math.Abs(impliedSpeed-reportedSpeed) > 100 && reportedSpeed > 50 {
			result.Warnings = append(result.Warnings,
				fmt.Sprintf("Speed inconsistency: reported %d knots, implied %.0f knots",
					aircraft.GroundSpeed, impliedSpeed))
		}
	}

	return result
}
// saveAircraftToDatabase persists one aircraft observation through the
// database's history manager.
//
// The record always carries the ICAO address (six upper-case hex digits), the
// timestamp, latitude, longitude, source ID and signal strength. Optional
// fields are attached only when they hold a usable value:
//   - Altitude and Speed: greater than zero
//   - Track: within [0, 360)
//   - VerticalRate: non-zero
//   - Squawk: non-empty and not "0000"
//   - Callsign: non-empty
//
// The write runs in its own goroutine so real-time processing is not blocked;
// a failed write is logged and otherwise ignored.
//
// Parameters:
//   - aircraft: Observation to persist
//   - sourceID: Identifier of the source that produced the observation
//   - signal: Signal strength of the observation
//   - timestamp: Time of the observation
func (m *Merger) saveAircraftToDatabase(aircraft *modes.Aircraft, sourceID string, signal float64, timestamp time.Time) {
	// Convert ICAO24 to hex string
	icaoHex := fmt.Sprintf("%06X", aircraft.ICAO24)

	// The record stores pointers and is read by a goroutine after this
	// function has returned. Copy every value out of the caller's struct
	// first, so a later change to *aircraft can neither race with the
	// background write nor alter what gets stored.
	latitude := aircraft.Latitude
	longitude := aircraft.Longitude
	altitude := aircraft.Altitude
	groundSpeed := aircraft.GroundSpeed
	track := aircraft.Track
	verticalRate := aircraft.VerticalRate
	squawk := aircraft.Squawk
	callsign := aircraft.Callsign

	// Prepare database record
	record := database.AircraftHistoryRecord{
		ICAO:           icaoHex,
		Timestamp:      timestamp,
		Latitude:       &latitude,
		Longitude:      &longitude,
		SourceID:       sourceID,
		SignalStrength: &signal,
	}

	// Add optional fields if available
	if altitude > 0 {
		record.Altitude = &altitude
	}
	if groundSpeed > 0 {
		record.Speed = &groundSpeed
	}
	if track >= 0 && track < 360 {
		record.Track = &track
	}
	if verticalRate != 0 {
		record.VerticalRate = &verticalRate
	}
	if squawk != "" && squawk != "0000" {
		record.Squawk = &squawk
	}
	if callsign != "" {
		record.Callsign = &callsign
	}

	// Save to database (non-blocking to avoid slowing down real-time processing)
	go func() {
		if err := m.db.GetHistoryManager().RecordAircraft(&record); err != nil {
			log.Printf("Warning: Failed to save aircraft %s to database: %v", icaoHex, err)
		}
	}()
}
// Close shuts the merger down by closing its ICAO database, if one is set.
//
// The merger's mutex is held for the duration of the call. The error from the
// ICAO database's Close is returned unchanged; nil is returned when no ICAO
// database is attached.
func (m *Merger) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.icaoDB == nil {
		return nil
	}
	return m.icaoDB.Close()
}