Cleanup: - Remove unused aircraft-icon.svg (replaced by type-specific icons) - Remove test files: beast-dump-with-heli.bin, beast.test, main, old.json, ux.png - Remove duplicate config.json.example (kept config.example.json) - Remove empty internal/coverage/ directory - Move CLAUDE.md to project root - Update assets.go documentation to reflect current icon structure - Format all Go code with gofmt Server Host Binding Fix: - Fix critical bug where server host configuration was ignored - Add host parameter to Server struct and NewWebServer constructor - Rename NewServer to NewWebServer for better clarity - Fix IPv6 address formatting in server binding (wrap in brackets) - Update startup message to show correct bind address format - Support localhost-only, IPv4, IPv6, and interface-specific binding This resolves the "too many colons in address" error for IPv6 hosts like ::1 and enables proper localhost-only deployment as configured. Closes #15 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
411 lines
13 KiB
Go
411 lines
13 KiB
Go
// Package client provides Beast format TCP client implementations for connecting to ADS-B receivers.
|
|
//
|
|
// This package handles the network connectivity and data streaming from dump1090 or similar
|
|
// Beast format sources. It provides:
|
|
// - Single-source Beast TCP client with automatic reconnection
|
|
// - Multi-source client manager for handling multiple receivers
|
|
// - Exponential backoff for connection failures
|
|
// - Message parsing and Mode S decoding integration
|
|
// - Automatic stale aircraft cleanup
|
|
//
|
|
// The Beast format is a binary protocol commonly used by dump1090 and other ADS-B
|
|
// software to stream real-time aircraft data over TCP port 30005. This package
|
|
// abstracts the connection management and integrates with the merger for
|
|
// multi-source data fusion.
|
|
package client
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"skyview/internal/beast"
|
|
"skyview/internal/merger"
|
|
"skyview/internal/modes"
|
|
)
|
|
|
|
// BeastClient handles connection to a single dump1090 Beast format TCP stream.
|
|
//
|
|
// The client provides robust connectivity with:
|
|
// - Automatic reconnection with exponential backoff
|
|
// - Concurrent message reading and processing
|
|
// - Integration with Mode S decoder and data merger
|
|
// - Source status tracking and statistics
|
|
// - Graceful shutdown handling
|
|
//
|
|
// Each client maintains a persistent connection to one Beast source and
|
|
// continuously processes incoming messages until stopped or the source
|
|
// becomes unavailable.
|
|
type BeastClient struct {
|
|
source *merger.Source // Source configuration and status
|
|
merger *merger.Merger // Data merger for multi-source fusion
|
|
decoder *modes.Decoder // Mode S/ADS-B message decoder
|
|
conn net.Conn // TCP connection to Beast source
|
|
parser *beast.Parser // Beast format message parser
|
|
msgChan chan *beast.Message // Buffered channel for parsed messages
|
|
errChan chan error // Error reporting channel
|
|
stopChan chan struct{} // Shutdown signal channel
|
|
wg sync.WaitGroup // Wait group for goroutine coordination
|
|
|
|
// Reconnection parameters
|
|
reconnectDelay time.Duration // Initial reconnect delay
|
|
maxReconnect time.Duration // Maximum reconnect delay (for backoff cap)
|
|
}
|
|
|
|
// NewBeastClient creates a new Beast format TCP client for a specific data source.
|
|
//
|
|
// The client is configured with:
|
|
// - Buffered message channel (1000 messages) to handle burst traffic
|
|
// - Error channel for connection and parsing issues
|
|
// - Initial reconnect delay of 5 seconds
|
|
// - Maximum reconnect delay of 60 seconds (exponential backoff cap)
|
|
// - Fresh Mode S decoder instance
|
|
//
|
|
// Parameters:
|
|
// - source: Source configuration including host, port, and metadata
|
|
// - merger: Data merger instance for aircraft state management
|
|
//
|
|
// Returns a configured but not yet started BeastClient.
|
|
func NewBeastClient(source *merger.Source, merger *merger.Merger) *BeastClient {
|
|
return &BeastClient{
|
|
source: source,
|
|
merger: merger,
|
|
decoder: modes.NewDecoder(source.Latitude, source.Longitude),
|
|
msgChan: make(chan *beast.Message, 5000),
|
|
errChan: make(chan error, 10),
|
|
stopChan: make(chan struct{}),
|
|
reconnectDelay: 5 * time.Second,
|
|
maxReconnect: 60 * time.Second,
|
|
}
|
|
}
|
|
|
|
// Start begins the client connection and message processing in the background.
|
|
//
|
|
// The client will:
|
|
// - Attempt to connect to the configured Beast source
|
|
// - Handle connection failures with exponential backoff
|
|
// - Start message reading and processing goroutines
|
|
// - Continuously reconnect if the connection is lost
|
|
//
|
|
// The method returns immediately; the client runs in background goroutines
|
|
// until Stop() is called or the context is cancelled.
|
|
//
|
|
// Parameters:
|
|
// - ctx: Context for cancellation and timeout control
|
|
func (c *BeastClient) Start(ctx context.Context) {
|
|
c.wg.Add(1)
|
|
go c.run(ctx)
|
|
}
|
|
|
|
// Stop gracefully shuts down the client and all associated goroutines.
|
|
//
|
|
// The shutdown process:
|
|
// 1. Signals all goroutines to stop via stopChan
|
|
// 2. Closes the TCP connection if active
|
|
// 3. Waits for all goroutines to complete
|
|
//
|
|
// This method blocks until the shutdown is complete.
|
|
func (c *BeastClient) Stop() {
|
|
close(c.stopChan)
|
|
if c.conn != nil {
|
|
c.conn.Close()
|
|
}
|
|
c.wg.Wait()
|
|
}
|
|
|
|
// run implements the main client connection and reconnection loop.
|
|
//
|
|
// This method handles the complete client lifecycle:
|
|
// 1. Connection establishment with timeout
|
|
// 2. Exponential backoff on connection failures
|
|
// 3. Message parsing and processing goroutine management
|
|
// 4. Connection monitoring and failure detection
|
|
// 5. Automatic reconnection on disconnection
|
|
//
|
|
// The exponential backoff starts at reconnectDelay (5s) and doubles on each
|
|
// failure up to maxReconnect (60s), then resets on successful connection.
|
|
//
|
|
// Source status is updated to reflect connection state for monitoring.
|
|
func (c *BeastClient) run(ctx context.Context) {
|
|
defer c.wg.Done()
|
|
|
|
reconnectDelay := c.reconnectDelay
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-c.stopChan:
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Connect to Beast TCP stream
|
|
addr := fmt.Sprintf("%s:%d", c.source.Host, c.source.Port)
|
|
fmt.Printf("Connecting to Beast stream at %s (%s)...\n", addr, c.source.Name)
|
|
|
|
conn, err := net.DialTimeout("tcp", addr, 30*time.Second)
|
|
if err != nil {
|
|
fmt.Printf("Failed to connect to %s: %v\n", c.source.Name, err)
|
|
c.source.Active = false
|
|
|
|
// Exponential backoff
|
|
time.Sleep(reconnectDelay)
|
|
if reconnectDelay < c.maxReconnect {
|
|
reconnectDelay *= 2
|
|
}
|
|
continue
|
|
}
|
|
|
|
c.conn = conn
|
|
c.source.Active = true
|
|
reconnectDelay = c.reconnectDelay // Reset backoff
|
|
|
|
fmt.Printf("Connected to %s at %s\n", c.source.Name, addr)
|
|
|
|
// Create parser for this connection
|
|
c.parser = beast.NewParser(conn, c.source.ID)
|
|
|
|
// Start processing messages
|
|
c.wg.Add(2)
|
|
go c.readMessages()
|
|
go c.processMessages()
|
|
|
|
// Wait for disconnect
|
|
select {
|
|
case <-ctx.Done():
|
|
c.conn.Close()
|
|
return
|
|
case <-c.stopChan:
|
|
c.conn.Close()
|
|
return
|
|
case err := <-c.errChan:
|
|
fmt.Printf("Error from %s: %v\n", c.source.Name, err)
|
|
c.conn.Close()
|
|
c.source.Active = false
|
|
}
|
|
|
|
// Wait for goroutines to finish
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
|
|
// readMessages runs in a dedicated goroutine to read Beast format messages.
|
|
//
|
|
// This method:
|
|
// - Continuously reads from the TCP connection
|
|
// - Parses Beast format binary data into Message structs
|
|
// - Queues parsed messages for processing
|
|
// - Reports parsing errors to the error channel
|
|
//
|
|
// The method blocks on the parser's ParseStream call and exits when
|
|
// the connection is closed or an unrecoverable error occurs.
|
|
func (c *BeastClient) readMessages() {
|
|
defer c.wg.Done()
|
|
c.parser.ParseStream(c.msgChan, c.errChan)
|
|
}
|
|
|
|
// processMessages runs in a dedicated goroutine to decode and merge aircraft data.
|
|
//
|
|
// For each received Beast message, this method:
|
|
// 1. Decodes the Mode S/ADS-B message payload
|
|
// 2. Extracts aircraft information (position, altitude, speed, etc.)
|
|
// 3. Updates the data merger with new aircraft state
|
|
// 4. Updates source statistics (message count)
|
|
//
|
|
// Invalid or unparseable messages are silently discarded to maintain
|
|
// system stability. The merger handles data fusion from multiple sources
|
|
// and conflict resolution based on signal strength.
|
|
func (c *BeastClient) processMessages() {
|
|
defer c.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case <-c.stopChan:
|
|
return
|
|
case msg := <-c.msgChan:
|
|
if msg == nil {
|
|
return
|
|
}
|
|
|
|
// Decode Mode S message
|
|
aircraft, err := c.decoder.Decode(msg.Data)
|
|
if err != nil {
|
|
continue // Skip invalid messages
|
|
}
|
|
|
|
// Update merger with new data
|
|
c.merger.UpdateAircraft(
|
|
c.source.ID,
|
|
aircraft,
|
|
msg.GetSignalStrength(),
|
|
msg.ReceivedAt,
|
|
)
|
|
|
|
// Update source statistics
|
|
c.source.Messages++
|
|
}
|
|
}
|
|
}
|
|
|
|
// MultiSourceClient manages multiple Beast TCP clients for multi-receiver setups.
|
|
//
|
|
// This client coordinator:
|
|
// - Manages connections to multiple Beast format sources simultaneously
|
|
// - Provides unified control for starting and stopping all clients
|
|
// - Runs periodic cleanup tasks for stale aircraft data
|
|
// - Aggregates statistics from all managed clients
|
|
// - Handles dynamic source addition and management
|
|
//
|
|
// All clients share the same data merger, enabling automatic data fusion
|
|
// and conflict resolution across multiple receivers.
|
|
type MultiSourceClient struct {
|
|
clients []*BeastClient // Managed Beast clients
|
|
merger *merger.Merger // Shared data merger for all sources
|
|
mu sync.RWMutex // Protects clients slice
|
|
}
|
|
|
|
// NewMultiSourceClient creates a client manager for multiple Beast format sources.
|
|
//
|
|
// The multi-source client enables connecting to multiple dump1090 instances
|
|
// or other Beast format sources simultaneously. All sources feed into the
|
|
// same data merger, which handles automatic data fusion and conflict resolution.
|
|
//
|
|
// This is essential for:
|
|
// - Improved coverage from multiple receivers
|
|
// - Redundancy in case of individual receiver failures
|
|
// - Data quality improvement through signal strength comparison
|
|
//
|
|
// Parameters:
|
|
// - merger: Shared data merger instance for all sources
|
|
//
|
|
// Returns a configured multi-source client ready for source addition.
|
|
func NewMultiSourceClient(merger *merger.Merger) *MultiSourceClient {
|
|
return &MultiSourceClient{
|
|
clients: make([]*BeastClient, 0),
|
|
merger: merger,
|
|
}
|
|
}
|
|
|
|
// AddSource registers and configures a new Beast format data source.
|
|
//
|
|
// This method:
|
|
// 1. Registers the source with the data merger
|
|
// 2. Creates a new BeastClient for the source
|
|
// 3. Adds the client to the managed clients list
|
|
//
|
|
// The source is not automatically started; call Start() to begin connections.
|
|
// Sources can be added before or after starting the multi-source client.
|
|
//
|
|
// Parameters:
|
|
// - source: Source configuration including connection details and metadata
|
|
func (m *MultiSourceClient) AddSource(source *merger.Source) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
// Register source with merger
|
|
m.merger.AddSource(source)
|
|
|
|
// Create and start client
|
|
client := NewBeastClient(source, m.merger)
|
|
m.clients = append(m.clients, client)
|
|
}
|
|
|
|
// Start begins connections to all configured Beast sources.
|
|
//
|
|
// This method:
|
|
// - Starts all managed BeastClient instances in parallel
|
|
// - Begins the periodic cleanup routine for stale aircraft data
|
|
// - Uses the provided context for cancellation control
|
|
//
|
|
// Each client will independently attempt connections with their own
|
|
// reconnection logic. The method returns immediately; all clients
|
|
// operate in background goroutines.
|
|
//
|
|
// Parameters:
|
|
// - ctx: Context for cancellation and timeout control
|
|
func (m *MultiSourceClient) Start(ctx context.Context) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
for _, client := range m.clients {
|
|
client.Start(ctx)
|
|
}
|
|
|
|
// Start cleanup routine
|
|
go m.cleanupRoutine(ctx)
|
|
}
|
|
|
|
// Stop gracefully shuts down all managed Beast clients.
|
|
//
|
|
// This method stops all clients in parallel and waits for their
|
|
// goroutines to complete. The shutdown is coordinated to ensure
|
|
// clean termination of all network connections and processing routines.
|
|
func (m *MultiSourceClient) Stop() {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
for _, client := range m.clients {
|
|
client.Stop()
|
|
}
|
|
}
|
|
|
|
// cleanupRoutine runs periodic maintenance tasks in a background goroutine.
|
|
//
|
|
// Currently performs:
|
|
// - Stale aircraft cleanup every 30 seconds
|
|
// - Removal of aircraft that haven't been updated recently
|
|
//
|
|
// The cleanup frequency is designed to balance memory usage with
|
|
// the typical aircraft update rates in ADS-B systems. Aircraft
|
|
// typically update their position every few seconds when in range.
|
|
//
|
|
// Parameters:
|
|
// - ctx: Context for cancellation when the client shuts down
|
|
func (m *MultiSourceClient) cleanupRoutine(ctx context.Context) {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
m.merger.CleanupStale()
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetStatistics returns comprehensive statistics from all managed clients.
|
|
//
|
|
// The statistics include:
|
|
// - All merger statistics (aircraft count, message rates, etc.)
|
|
// - Number of active client connections
|
|
// - Total number of configured clients
|
|
// - Per-source connection status and message counts
|
|
//
|
|
// This information is useful for monitoring system health, diagnosing
|
|
// connectivity issues, and understanding data quality across sources.
|
|
//
|
|
// Returns a map of statistics suitable for JSON serialization and web display.
|
|
func (m *MultiSourceClient) GetStatistics() map[string]interface{} {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
stats := m.merger.GetStatistics()
|
|
|
|
// Add client-specific stats
|
|
activeClients := 0
|
|
for _, client := range m.clients {
|
|
if client.source.Active {
|
|
activeClients++
|
|
}
|
|
}
|
|
|
|
stats["active_clients"] = activeClients
|
|
stats["total_clients"] = len(m.clients)
|
|
|
|
return stats
|
|
}
|