skyview/internal/client/beast.go
Ole-Morten Duesund 1425f0a018 Restructure assets to top-level package and add Reset Map button
- Move assets from internal/assets to top-level assets/ package for clean embed directive
- Consolidate all static files in single location (assets/static/)
- Remove duplicate static file locations to maintain single source of truth
- Add Reset Map button to map controls with full functionality
- Implement resetMap() method to return map to calculated origin position
- Store origin in this.mapOrigin for reset functionality
- Fix go:embed pattern to work without parent directory references

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-24 00:57:49 +02:00

249 lines
5.1 KiB
Go

package client
import (
"context"
"fmt"
"net"
"sync"
"time"
"skyview/internal/beast"
"skyview/internal/merger"
"skyview/internal/modes"
)
// BeastClient handles connection to a single dump1090 Beast TCP stream
type BeastClient struct {
source *merger.Source
merger *merger.Merger
decoder *modes.Decoder
conn net.Conn
parser *beast.Parser
msgChan chan *beast.Message
errChan chan error
stopChan chan struct{}
wg sync.WaitGroup
reconnectDelay time.Duration
maxReconnect time.Duration
}
// NewBeastClient creates a new Beast format TCP client
func NewBeastClient(source *merger.Source, merger *merger.Merger) *BeastClient {
return &BeastClient{
source: source,
merger: merger,
decoder: modes.NewDecoder(),
msgChan: make(chan *beast.Message, 1000),
errChan: make(chan error, 10),
stopChan: make(chan struct{}),
reconnectDelay: 5 * time.Second,
maxReconnect: 60 * time.Second,
}
}
// Start begins the client connection and processing
func (c *BeastClient) Start(ctx context.Context) {
c.wg.Add(1)
go c.run(ctx)
}
// Stop gracefully stops the client
func (c *BeastClient) Stop() {
close(c.stopChan)
if c.conn != nil {
c.conn.Close()
}
c.wg.Wait()
}
// run is the main client loop
func (c *BeastClient) run(ctx context.Context) {
defer c.wg.Done()
reconnectDelay := c.reconnectDelay
for {
select {
case <-ctx.Done():
return
case <-c.stopChan:
return
default:
}
// Connect to Beast TCP stream
addr := fmt.Sprintf("%s:%d", c.source.Host, c.source.Port)
fmt.Printf("Connecting to Beast stream at %s (%s)...\n", addr, c.source.Name)
conn, err := net.DialTimeout("tcp", addr, 10*time.Second)
if err != nil {
fmt.Printf("Failed to connect to %s: %v\n", c.source.Name, err)
c.source.Active = false
// Exponential backoff
time.Sleep(reconnectDelay)
if reconnectDelay < c.maxReconnect {
reconnectDelay *= 2
}
continue
}
c.conn = conn
c.source.Active = true
reconnectDelay = c.reconnectDelay // Reset backoff
fmt.Printf("Connected to %s at %s\n", c.source.Name, addr)
// Create parser for this connection
c.parser = beast.NewParser(conn, c.source.ID)
// Start processing messages
c.wg.Add(2)
go c.readMessages()
go c.processMessages()
// Wait for disconnect
select {
case <-ctx.Done():
c.conn.Close()
return
case <-c.stopChan:
c.conn.Close()
return
case err := <-c.errChan:
fmt.Printf("Error from %s: %v\n", c.source.Name, err)
c.conn.Close()
c.source.Active = false
}
// Wait for goroutines to finish
time.Sleep(1 * time.Second)
}
}
// readMessages reads Beast messages from the TCP stream
func (c *BeastClient) readMessages() {
defer c.wg.Done()
c.parser.ParseStream(c.msgChan, c.errChan)
}
// processMessages decodes and merges aircraft data
func (c *BeastClient) processMessages() {
defer c.wg.Done()
for {
select {
case <-c.stopChan:
return
case msg := <-c.msgChan:
if msg == nil {
return
}
// Decode Mode S message
aircraft, err := c.decoder.Decode(msg.Data)
if err != nil {
continue // Skip invalid messages
}
// Update merger with new data
c.merger.UpdateAircraft(
c.source.ID,
aircraft,
msg.GetSignalStrength(),
msg.ReceivedAt,
)
// Update source statistics
c.source.Messages++
}
}
}
// MultiSourceClient manages multiple Beast TCP clients
type MultiSourceClient struct {
clients []*BeastClient
merger *merger.Merger
mu sync.RWMutex
}
// NewMultiSourceClient creates a client that connects to multiple Beast sources
func NewMultiSourceClient(merger *merger.Merger) *MultiSourceClient {
return &MultiSourceClient{
clients: make([]*BeastClient, 0),
merger: merger,
}
}
// AddSource adds a new Beast TCP source
func (m *MultiSourceClient) AddSource(source *merger.Source) {
m.mu.Lock()
defer m.mu.Unlock()
// Register source with merger
m.merger.AddSource(source)
// Create and start client
client := NewBeastClient(source, m.merger)
m.clients = append(m.clients, client)
}
// Start begins all client connections
func (m *MultiSourceClient) Start(ctx context.Context) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, client := range m.clients {
client.Start(ctx)
}
// Start cleanup routine
go m.cleanupRoutine(ctx)
}
// Stop gracefully stops all clients
func (m *MultiSourceClient) Stop() {
m.mu.RLock()
defer m.mu.RUnlock()
for _, client := range m.clients {
client.Stop()
}
}
// cleanupRoutine periodically removes stale aircraft
func (m *MultiSourceClient) cleanupRoutine(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.merger.CleanupStale()
}
}
}
// GetStatistics returns client statistics
func (m *MultiSourceClient) GetStatistics() map[string]interface{} {
m.mu.RLock()
defer m.mu.RUnlock()
stats := m.merger.GetStatistics()
// Add client-specific stats
activeClients := 0
for _, client := range m.clients {
if client.source.Active {
activeClients++
}
}
stats["active_clients"] = activeClients
stats["total_clients"] = len(m.clients)
return stats
}