Major features implemented:
- Beast binary format parser with full Mode S/ADS-B decoding
- Multi-source data merger with intelligent signal-based fusion
- Advanced web frontend with 5 view modes (Map, Table, Stats, Coverage, 3D)
- Real-time WebSocket updates with sub-second latency
- Signal strength analysis and coverage heatmaps
- Debian packaging with systemd integration
- Production-ready deployment with security hardening

Technical highlights:
- Concurrent TCP clients with auto-reconnection
- CPR position decoding and aircraft identification
- Historical flight tracking with position trails
- Range circles and receiver location visualization
- Mobile-responsive design with professional UI
- REST API and WebSocket real-time updates
- Comprehensive build system and documentation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
249 lines
No EOL
5.2 KiB
Go
249 lines
No EOL
5.2 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"skyview/internal/beast"
|
|
"skyview/internal/merger"
|
|
"skyview/internal/modes"
|
|
)
|
|
|
|
// BeastClient handles connection to a single dump1090 Beast TCP stream
|
|
type BeastClient struct {
|
|
source *merger.Source
|
|
merger *merger.Merger
|
|
decoder *modes.Decoder
|
|
conn net.Conn
|
|
parser *beast.Parser
|
|
msgChan chan *beast.Message
|
|
errChan chan error
|
|
stopChan chan struct{}
|
|
wg sync.WaitGroup
|
|
|
|
reconnectDelay time.Duration
|
|
maxReconnect time.Duration
|
|
}
|
|
|
|
// NewBeastClient creates a new Beast format TCP client
|
|
func NewBeastClient(source *merger.Source, merger *merger.Merger) *BeastClient {
|
|
return &BeastClient{
|
|
source: source,
|
|
merger: merger,
|
|
decoder: modes.NewDecoder(),
|
|
msgChan: make(chan *beast.Message, 1000),
|
|
errChan: make(chan error, 10),
|
|
stopChan: make(chan struct{}),
|
|
reconnectDelay: 5 * time.Second,
|
|
maxReconnect: 60 * time.Second,
|
|
}
|
|
}
|
|
|
|
// Start begins the client connection and processing
|
|
func (c *BeastClient) Start(ctx context.Context) {
|
|
c.wg.Add(1)
|
|
go c.run(ctx)
|
|
}
|
|
|
|
// Stop gracefully stops the client
|
|
func (c *BeastClient) Stop() {
|
|
close(c.stopChan)
|
|
if c.conn != nil {
|
|
c.conn.Close()
|
|
}
|
|
c.wg.Wait()
|
|
}
|
|
|
|
// run is the main client loop
|
|
func (c *BeastClient) run(ctx context.Context) {
|
|
defer c.wg.Done()
|
|
|
|
reconnectDelay := c.reconnectDelay
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-c.stopChan:
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Connect to Beast TCP stream
|
|
addr := fmt.Sprintf("%s:%d", c.source.Host, c.source.Port)
|
|
fmt.Printf("Connecting to Beast stream at %s (%s)...\n", addr, c.source.Name)
|
|
|
|
conn, err := net.DialTimeout("tcp", addr, 10*time.Second)
|
|
if err != nil {
|
|
fmt.Printf("Failed to connect to %s: %v\n", c.source.Name, err)
|
|
c.source.Active = false
|
|
|
|
// Exponential backoff
|
|
time.Sleep(reconnectDelay)
|
|
if reconnectDelay < c.maxReconnect {
|
|
reconnectDelay *= 2
|
|
}
|
|
continue
|
|
}
|
|
|
|
c.conn = conn
|
|
c.source.Active = true
|
|
reconnectDelay = c.reconnectDelay // Reset backoff
|
|
|
|
fmt.Printf("Connected to %s at %s\n", c.source.Name, addr)
|
|
|
|
// Create parser for this connection
|
|
c.parser = beast.NewParser(conn, c.source.ID)
|
|
|
|
// Start processing messages
|
|
c.wg.Add(2)
|
|
go c.readMessages()
|
|
go c.processMessages()
|
|
|
|
// Wait for disconnect
|
|
select {
|
|
case <-ctx.Done():
|
|
c.conn.Close()
|
|
return
|
|
case <-c.stopChan:
|
|
c.conn.Close()
|
|
return
|
|
case err := <-c.errChan:
|
|
fmt.Printf("Error from %s: %v\n", c.source.Name, err)
|
|
c.conn.Close()
|
|
c.source.Active = false
|
|
}
|
|
|
|
// Wait for goroutines to finish
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
|
|
// readMessages reads Beast messages from the TCP stream
|
|
func (c *BeastClient) readMessages() {
|
|
defer c.wg.Done()
|
|
c.parser.ParseStream(c.msgChan, c.errChan)
|
|
}
|
|
|
|
// processMessages decodes and merges aircraft data
|
|
func (c *BeastClient) processMessages() {
|
|
defer c.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case <-c.stopChan:
|
|
return
|
|
case msg := <-c.msgChan:
|
|
if msg == nil {
|
|
return
|
|
}
|
|
|
|
// Decode Mode S message
|
|
aircraft, err := c.decoder.Decode(msg.Data)
|
|
if err != nil {
|
|
continue // Skip invalid messages
|
|
}
|
|
|
|
// Update merger with new data
|
|
c.merger.UpdateAircraft(
|
|
c.source.ID,
|
|
aircraft,
|
|
msg.GetSignalStrength(),
|
|
msg.ReceivedAt,
|
|
)
|
|
|
|
// Update source statistics
|
|
c.source.Messages++
|
|
}
|
|
}
|
|
}
|
|
|
|
// MultiSourceClient manages multiple Beast TCP clients
|
|
type MultiSourceClient struct {
|
|
clients []*BeastClient
|
|
merger *merger.Merger
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewMultiSourceClient creates a client that connects to multiple Beast sources
|
|
func NewMultiSourceClient(merger *merger.Merger) *MultiSourceClient {
|
|
return &MultiSourceClient{
|
|
clients: make([]*BeastClient, 0),
|
|
merger: merger,
|
|
}
|
|
}
|
|
|
|
// AddSource adds a new Beast TCP source
|
|
func (m *MultiSourceClient) AddSource(source *merger.Source) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
// Register source with merger
|
|
m.merger.AddSource(source)
|
|
|
|
// Create and start client
|
|
client := NewBeastClient(source, m.merger)
|
|
m.clients = append(m.clients, client)
|
|
}
|
|
|
|
// Start begins all client connections
|
|
func (m *MultiSourceClient) Start(ctx context.Context) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
for _, client := range m.clients {
|
|
client.Start(ctx)
|
|
}
|
|
|
|
// Start cleanup routine
|
|
go m.cleanupRoutine(ctx)
|
|
}
|
|
|
|
// Stop gracefully stops all clients
|
|
func (m *MultiSourceClient) Stop() {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
for _, client := range m.clients {
|
|
client.Stop()
|
|
}
|
|
}
|
|
|
|
// cleanupRoutine periodically removes stale aircraft
|
|
func (m *MultiSourceClient) cleanupRoutine(ctx context.Context) {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
m.merger.CleanupStale()
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetStatistics returns client statistics
|
|
func (m *MultiSourceClient) GetStatistics() map[string]interface{} {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
stats := m.merger.GetStatistics()
|
|
|
|
// Add client-specific stats
|
|
activeClients := 0
|
|
for _, client := range m.clients {
|
|
if client.source.Active {
|
|
activeClients++
|
|
}
|
|
}
|
|
|
|
stats["active_clients"] = activeClients
|
|
stats["total_clients"] = len(m.clients)
|
|
|
|
return stats
|
|
} |