Add VRS JSON format support for readsb integration
Added comprehensive support for VRS (Virtual Radar Server) JSON format
as a simpler alternative to Beast binary protocol, enabling integration
with readsb --net-vrs-port output.
## Key Features:
- **VRS JSON Parser**: Stream parsing of newline-delimited JSON aircraft data
- **VRS Client**: TCP client with automatic reconnection and error recovery
- **Mixed Format Support**: Use Beast and VRS sources simultaneously
- **Enhanced Aircraft Data**: Added VRS-specific fields (registration, type, operator)
- **Position Source Tracking**: Identifies ADS-B, MLAT, TIS-B, and satellite positions
## Implementation:
- `internal/vrs/parser.go`: VRS JSON message parsing and validation
- `internal/client/vrs.go`: VRS TCP client implementation
- Enhanced `MultiSourceClient` to support both Beast and VRS formats
- Extended `Aircraft` struct with validity flags and additional metadata
- Updated configuration to include `format` field ("beast" or "vrs")
## Testing:
- Successfully tested against svovel:33005 VRS JSON stream
- Verified aircraft data parsing and position tracking
- Confirmed mixed-format operation with existing Beast clients
## Documentation:
- Updated README.md with VRS format configuration examples
- Enhanced ARCHITECTURE.md with VRS parser documentation
- Added data format comparison and configuration guide
This enables simpler integration with modern readsb installations while
maintaining full backward compatibility with existing Beast deployments.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
3d92ce4481
commit
073acb7304
10 changed files with 903 additions and 40 deletions
350
internal/client/vrs.go
Normal file
350
internal/client/vrs.go
Normal file
|
|
@ -0,0 +1,350 @@
|
|||
// Package client provides VRS JSON format client implementation for connecting to readsb receivers.
|
||||
//
|
||||
// This file implements a VRS (Virtual Radar Server) JSON format client that connects
|
||||
// to readsb instances with --net-vrs-port enabled. VRS JSON is a simpler alternative
|
||||
// to the Beast binary protocol, providing aircraft data as JSON over TCP.
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"skyview/internal/merger"
|
||||
"skyview/internal/modes"
|
||||
"skyview/internal/vrs"
|
||||
)
|
||||
|
||||
// VRSClient handles connection to a single readsb VRS JSON TCP stream
|
||||
//
|
||||
// The VRS client provides robust connectivity with:
|
||||
// - Automatic reconnection with exponential backoff
|
||||
// - JSON message parsing and processing
|
||||
// - Integration with data merger
|
||||
// - Source status tracking and statistics
|
||||
// - Graceful shutdown handling
|
||||
//
|
||||
// VRS JSON is simpler than Beast format but provides the same aircraft data
|
||||
type VRSClient struct {
|
||||
source *merger.Source // Source configuration and status
|
||||
merger *merger.Merger // Data merger for multi-source fusion
|
||||
conn net.Conn // TCP connection to VRS source
|
||||
parser *vrs.Parser // VRS JSON format parser
|
||||
msgChan chan *vrs.VRSMessage // Buffered channel for parsed messages
|
||||
errChan chan error // Error reporting channel
|
||||
stopChan chan struct{} // Shutdown signal channel
|
||||
wg sync.WaitGroup // Wait group for goroutine coordination
|
||||
|
||||
// Reconnection parameters
|
||||
reconnectDelay time.Duration // Initial reconnect delay
|
||||
maxReconnect time.Duration // Maximum reconnect delay (for backoff cap)
|
||||
}
|
||||
|
||||
// NewVRSClient creates a new VRS JSON format TCP client for a specific data source
|
||||
//
|
||||
// The client is configured with:
|
||||
// - Buffered message channel (100 messages) for handling updates
|
||||
// - Error channel for connection and parsing issues
|
||||
// - Initial reconnect delay of 5 seconds
|
||||
// - Maximum reconnect delay of 60 seconds (exponential backoff cap)
|
||||
//
|
||||
// Parameters:
|
||||
// - source: Source configuration including host, port, and metadata
|
||||
// - merger: Data merger instance for aircraft state management
|
||||
//
|
||||
// Returns a configured but not yet started VRSClient
|
||||
func NewVRSClient(source *merger.Source, merger *merger.Merger) *VRSClient {
|
||||
return &VRSClient{
|
||||
source: source,
|
||||
merger: merger,
|
||||
msgChan: make(chan *vrs.VRSMessage, 100),
|
||||
errChan: make(chan error, 10),
|
||||
stopChan: make(chan struct{}),
|
||||
reconnectDelay: 5 * time.Second,
|
||||
maxReconnect: 60 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the client connection and message processing in the background
|
||||
//
|
||||
// The client will:
|
||||
// - Attempt to connect to the configured VRS source
|
||||
// - Handle connection failures with exponential backoff
|
||||
// - Start message reading and processing goroutines
|
||||
// - Continuously reconnect if the connection is lost
|
||||
//
|
||||
// The method returns immediately; the client runs in background goroutines
|
||||
// until Stop() is called or the context is cancelled
|
||||
//
|
||||
// Parameters:
|
||||
// - ctx: Context for cancellation and timeout control
|
||||
func (c *VRSClient) Start(ctx context.Context) {
|
||||
c.wg.Add(1)
|
||||
go c.run(ctx)
|
||||
}
|
||||
|
||||
// Stop gracefully shuts down the client and all associated goroutines
|
||||
//
|
||||
// The shutdown process:
|
||||
// 1. Signals all goroutines to stop via stopChan
|
||||
// 2. Closes the TCP connection if active
|
||||
// 3. Waits for all goroutines to complete
|
||||
//
|
||||
// This method blocks until the shutdown is complete
|
||||
func (c *VRSClient) Stop() {
|
||||
close(c.stopChan)
|
||||
if c.conn != nil {
|
||||
c.conn.Close()
|
||||
}
|
||||
c.wg.Wait()
|
||||
}
|
||||
|
||||
// run implements the main client connection and reconnection loop
|
||||
//
|
||||
// This method handles the complete client lifecycle:
|
||||
// 1. Connection establishment with timeout
|
||||
// 2. Exponential backoff on connection failures
|
||||
// 3. Message parsing and processing goroutine management
|
||||
// 4. Connection monitoring and failure detection
|
||||
// 5. Automatic reconnection on disconnection
|
||||
//
|
||||
// The exponential backoff starts at reconnectDelay (5s) and doubles on each
|
||||
// failure up to maxReconnect (60s), then resets on successful connection
|
||||
//
|
||||
// Source status is updated to reflect connection state for monitoring
|
||||
func (c *VRSClient) run(ctx context.Context) {
|
||||
defer c.wg.Done()
|
||||
|
||||
reconnectDelay := c.reconnectDelay
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-c.stopChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Connect to VRS JSON stream
|
||||
addr := fmt.Sprintf("%s:%d", c.source.Host, c.source.Port)
|
||||
fmt.Printf("Connecting to VRS JSON stream at %s (%s)...\n", addr, c.source.Name)
|
||||
|
||||
conn, err := net.DialTimeout("tcp", addr, 30*time.Second)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to connect to VRS source %s: %v\n", c.source.Name, err)
|
||||
c.source.Active = false
|
||||
|
||||
// Exponential backoff
|
||||
time.Sleep(reconnectDelay)
|
||||
if reconnectDelay < c.maxReconnect {
|
||||
reconnectDelay *= 2
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
c.conn = conn
|
||||
c.source.Active = true
|
||||
reconnectDelay = c.reconnectDelay // Reset backoff
|
||||
|
||||
fmt.Printf("Connected to VRS source %s at %s\n", c.source.Name, addr)
|
||||
|
||||
// Create parser for this connection
|
||||
c.parser = vrs.NewParser(conn, c.source.ID)
|
||||
|
||||
// Start processing messages
|
||||
c.wg.Add(2)
|
||||
go c.readMessages()
|
||||
go c.processMessages()
|
||||
|
||||
// Wait for disconnect
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
c.conn.Close()
|
||||
return
|
||||
case <-c.stopChan:
|
||||
c.conn.Close()
|
||||
return
|
||||
case err := <-c.errChan:
|
||||
fmt.Printf("Error from VRS source %s: %v\n", c.source.Name, err)
|
||||
c.conn.Close()
|
||||
c.source.Active = false
|
||||
}
|
||||
|
||||
// Wait for goroutines to finish
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// readMessages runs in a dedicated goroutine to read VRS JSON messages
|
||||
//
|
||||
// This method:
|
||||
// - Continuously reads from the TCP connection
|
||||
// - Parses VRS JSON data into Message structs
|
||||
// - Queues parsed messages for processing
|
||||
// - Reports parsing errors to the error channel
|
||||
//
|
||||
// The method blocks on the parser's ParseStream call and exits when
|
||||
// the connection is closed or an unrecoverable error occurs
|
||||
func (c *VRSClient) readMessages() {
|
||||
defer c.wg.Done()
|
||||
c.parser.ParseStream(c.msgChan, c.errChan)
|
||||
}
|
||||
|
||||
// processMessages runs in a dedicated goroutine to process aircraft data
|
||||
//
|
||||
// For each received VRS message, this method:
|
||||
// 1. Iterates through all aircraft in the message
|
||||
// 2. Converts VRS format to internal aircraft representation
|
||||
// 3. Updates the data merger with new aircraft state
|
||||
// 4. Updates source statistics (message count)
|
||||
//
|
||||
// Invalid or unparseable aircraft data is silently discarded to maintain
|
||||
// system stability. The merger handles data fusion from multiple sources
|
||||
// and conflict resolution based on signal strength
|
||||
func (c *VRSClient) processMessages() {
|
||||
defer c.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.stopChan:
|
||||
return
|
||||
case msg := <-c.msgChan:
|
||||
if msg == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Process each aircraft in the message
|
||||
for _, vrsAircraft := range msg.AcList {
|
||||
// Convert VRS aircraft to internal format
|
||||
aircraft := c.convertVRSToAircraft(&vrsAircraft)
|
||||
if aircraft == nil {
|
||||
continue // Skip invalid aircraft
|
||||
}
|
||||
|
||||
// Update merger with new data
|
||||
// Note: VRS doesn't provide signal strength, so we use a default value
|
||||
c.merger.UpdateAircraft(
|
||||
c.source.ID,
|
||||
aircraft,
|
||||
-30.0, // Default signal strength for VRS sources
|
||||
vrsAircraft.GetTimestamp(),
|
||||
)
|
||||
|
||||
// Update source statistics
|
||||
c.source.Messages++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// convertVRSToAircraft converts VRS JSON aircraft to internal aircraft format
|
||||
//
|
||||
// This method maps VRS JSON fields to the internal aircraft structure used
|
||||
// by the merger. It handles:
|
||||
// - ICAO address extraction and validation
|
||||
// - Position data (lat/lon)
|
||||
// - Altitude (barometric and geometric)
|
||||
// - Speed and heading
|
||||
// - Callsign and squawk code
|
||||
// - Ground status
|
||||
//
|
||||
// Returns nil if the aircraft data is invalid or incomplete
|
||||
func (c *VRSClient) convertVRSToAircraft(vrs *vrs.VRSAircraft) *modes.Aircraft {
|
||||
// Get ICAO address
|
||||
icao, err := vrs.GetICAO24()
|
||||
if err != nil {
|
||||
return nil // Invalid ICAO, skip this aircraft
|
||||
}
|
||||
|
||||
// Create aircraft structure
|
||||
aircraft := &modes.Aircraft{
|
||||
ICAO24: icao,
|
||||
}
|
||||
|
||||
// Set position if available
|
||||
if vrs.HasPosition() {
|
||||
aircraft.Latitude = vrs.Lat
|
||||
aircraft.Longitude = vrs.Long
|
||||
aircraft.PositionValid = true
|
||||
}
|
||||
|
||||
// Set altitude if available
|
||||
if vrs.HasAltitude() {
|
||||
aircraft.Altitude = vrs.GetAltitude()
|
||||
aircraft.AltitudeValid = true
|
||||
|
||||
// Set barometric altitude specifically
|
||||
if vrs.Alt != 0 {
|
||||
aircraft.BaroAltitude = vrs.Alt
|
||||
}
|
||||
|
||||
// Set geometric altitude if different from barometric
|
||||
if vrs.GAlt != 0 {
|
||||
aircraft.GeomAltitude = vrs.GAlt
|
||||
aircraft.GeomAltitudeValid = true
|
||||
}
|
||||
}
|
||||
|
||||
// Set speed if available
|
||||
if vrs.Spd > 0 {
|
||||
aircraft.GroundSpeed = int(vrs.Spd)
|
||||
aircraft.GroundSpeedValid = true
|
||||
}
|
||||
|
||||
// Set heading/track if available
|
||||
if vrs.Trak > 0 {
|
||||
aircraft.Track = int(vrs.Trak)
|
||||
aircraft.TrackValid = true
|
||||
}
|
||||
|
||||
// Set vertical rate if available
|
||||
if vrs.Vsi != 0 {
|
||||
aircraft.VerticalRate = vrs.Vsi
|
||||
aircraft.VerticalRateValid = true
|
||||
}
|
||||
|
||||
// Set callsign if available
|
||||
if vrs.Call != "" {
|
||||
aircraft.Callsign = vrs.Call
|
||||
aircraft.CallsignValid = true
|
||||
}
|
||||
|
||||
// Set squawk if available
|
||||
if squawk, err := vrs.GetSquawk(); err == nil {
|
||||
aircraft.Squawk = fmt.Sprintf("%04X", squawk) // Convert to hex string
|
||||
aircraft.SquawkValid = true
|
||||
}
|
||||
|
||||
// Set ground status
|
||||
aircraft.OnGround = vrs.IsOnGround()
|
||||
aircraft.OnGroundValid = true
|
||||
|
||||
// Set selected altitude if available
|
||||
if vrs.TAlt != 0 {
|
||||
aircraft.SelectedAltitude = vrs.TAlt
|
||||
}
|
||||
|
||||
// Set position source
|
||||
switch vrs.GetPositionSource() {
|
||||
case "MLAT":
|
||||
aircraft.PositionMLAT = true
|
||||
case "TIS-B":
|
||||
aircraft.PositionTISB = true
|
||||
}
|
||||
|
||||
// Set additional metadata if available
|
||||
if vrs.Reg != "" {
|
||||
aircraft.Registration = vrs.Reg
|
||||
}
|
||||
if vrs.Type != "" {
|
||||
aircraft.AircraftType = vrs.Type
|
||||
}
|
||||
if vrs.Op != "" {
|
||||
aircraft.Operator = vrs.Op
|
||||
}
|
||||
|
||||
return aircraft
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue