Add VRS JSON format support for readsb integration #28
8 changed files with 874 additions and 40 deletions
42
README.md
42
README.md
|
|
@ -1,12 +1,13 @@
|
||||||
# SkyView - Multi-Source ADS-B Aircraft Tracker
|
# SkyView - Multi-Source ADS-B Aircraft Tracker
|
||||||
|
|
||||||
A high-performance, multi-source ADS-B aircraft tracking application that connects to multiple dump1090 Beast format TCP streams and provides a modern web interface with advanced visualization capabilities.
|
A high-performance, multi-source ADS-B aircraft tracking application that connects to multiple dump1090/readsb receivers using Beast binary or VRS JSON formats and provides a modern web interface with advanced visualization capabilities.
|
||||||
|
|
||||||
## ✨ Features
|
## ✨ Features
|
||||||
|
|
||||||
### Multi-Source Data Fusion
|
### Multi-Source Data Fusion
|
||||||
- **Beast Binary Format**: Native support for dump1090 Beast format (port 30005)
|
- **Multiple Formats**: Support for both Beast binary (port 30005) and VRS JSON (port 33005) protocols
|
||||||
- **Multiple Receivers**: Connect to unlimited dump1090 sources simultaneously
|
- **Multiple Receivers**: Connect to unlimited dump1090/readsb sources simultaneously
|
||||||
|
- **Mixed Sources**: Use Beast and VRS sources together in the same system
|
||||||
- **Intelligent Merging**: Smart data fusion with signal strength-based source selection
|
- **Intelligent Merging**: Smart data fusion with signal strength-based source selection
|
||||||
- **High-throughput Processing**: High-performance concurrent message processing
|
- **High-throughput Processing**: High-performance concurrent message processing
|
||||||
|
|
||||||
|
|
@ -75,13 +76,25 @@ sudo systemctl enable skyview
|
||||||
"sources": [
|
"sources": [
|
||||||
{
|
{
|
||||||
"id": "primary",
|
"id": "primary",
|
||||||
"name": "Primary Receiver",
|
"name": "Primary Receiver (Beast)",
|
||||||
"host": "localhost",
|
"host": "localhost",
|
||||||
"port": 30005,
|
"port": 30005,
|
||||||
|
"format": "beast",
|
||||||
"latitude": 51.4700,
|
"latitude": 51.4700,
|
||||||
"longitude": -0.4600,
|
"longitude": -0.4600,
|
||||||
"altitude": 50.0,
|
"altitude": 50.0,
|
||||||
"enabled": true
|
"enabled": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "secondary",
|
||||||
|
"name": "Secondary Receiver (VRS JSON)",
|
||||||
|
"host": "192.168.1.100",
|
||||||
|
"port": 33005,
|
||||||
|
"format": "vrs",
|
||||||
|
"latitude": 51.4800,
|
||||||
|
"longitude": -0.4500,
|
||||||
|
"altitude": 75.0,
|
||||||
|
"enabled": true
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"settings": {
|
"settings": {
|
||||||
|
|
@ -97,6 +110,27 @@ sudo systemctl enable skyview
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Data Format Configuration
|
||||||
|
|
||||||
|
SkyView supports two ADS-B data formats:
|
||||||
|
|
||||||
|
#### Beast Binary Format
|
||||||
|
- **Port**: Typically 30005
|
||||||
|
- **Format**: Binary protocol developed by FlightAware
|
||||||
|
- **Source**: dump1090, dump1090-fa, readsb with `--net-bo-port 30005`
|
||||||
|
- **Configuration**: `"format": "beast"` (default if not specified)
|
||||||
|
- **Advantages**: Compact binary format, includes precise timestamps and signal strength
|
||||||
|
|
||||||
|
#### VRS JSON Format
|
||||||
|
- **Port**: Typically 33005
|
||||||
|
- **Format**: JSON objects with aircraft arrays
|
||||||
|
- **Source**: readsb with `--net-vrs-port 33005`
|
||||||
|
- **Configuration**: `"format": "vrs"`
|
||||||
|
- **Advantages**: Human-readable JSON, simpler to parse and debug
|
||||||
|
- **Update Interval**: Configurable with `--net-vrs-interval` (default 5 seconds)
|
||||||
|
|
||||||
|
Both formats can be used simultaneously in the same SkyView instance for maximum flexibility and redundancy.
|
||||||
|
|
||||||
### Command Line Options
|
### Command Line Options
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
|
|
||||||
133
cmd/vrs-test/main.go
Normal file
133
cmd/vrs-test/main.go
Normal file
|
|
@ -0,0 +1,133 @@
|
||||||
|
// Package main implements a simple VRS JSON parser test utility.
|
||||||
|
//
|
||||||
|
// This utility connects to a VRS JSON source and displays the parsed
|
||||||
|
// aircraft data for testing and debugging purposes.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"skyview/internal/vrs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
host := flag.String("host", "svovel", "VRS host to connect to")
|
||||||
|
port := flag.Int("port", 33005, "VRS port to connect to")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
fmt.Printf("Connecting to VRS JSON source at %s:%d...\n", *host, *port)
|
||||||
|
|
||||||
|
// Connect to VRS source
|
||||||
|
addr := fmt.Sprintf("%s:%d", *host, *port)
|
||||||
|
conn, err := net.DialTimeout("tcp", addr, 30*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to %s: %v", addr, err)
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
fmt.Printf("Connected to %s\n", addr)
|
||||||
|
|
||||||
|
// Create VRS parser
|
||||||
|
parser := vrs.NewParser(conn, "test")
|
||||||
|
|
||||||
|
// Set up channels for messages and errors
|
||||||
|
msgChan := make(chan *vrs.VRSMessage, 100)
|
||||||
|
errChan := make(chan error, 10)
|
||||||
|
|
||||||
|
// Set up signal handling
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
sigChan := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
||||||
|
// Start parsing in background
|
||||||
|
go parser.ParseStream(msgChan, errChan)
|
||||||
|
|
||||||
|
// Statistics tracking
|
||||||
|
messageCount := 0
|
||||||
|
aircraftCount := 0
|
||||||
|
startTime := time.Now()
|
||||||
|
|
||||||
|
fmt.Println("Receiving VRS JSON data... (Press Ctrl+C to stop)")
|
||||||
|
fmt.Println("----------------------------------------")
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
fmt.Println("\nShutting down...")
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-sigChan:
|
||||||
|
fmt.Println("\nReceived interrupt signal")
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
case err := <-errChan:
|
||||||
|
log.Printf("Parser error: %v", err)
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
case msg := <-msgChan:
|
||||||
|
if msg == nil {
|
||||||
|
cancel()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
messageCount++
|
||||||
|
aircraftCount += len(msg.AcList)
|
||||||
|
|
||||||
|
fmt.Printf("Message %d: %d aircraft\n", messageCount, len(msg.AcList))
|
||||||
|
|
||||||
|
// Display first few aircraft for debugging
|
||||||
|
for i, ac := range msg.AcList {
|
||||||
|
if i >= 3 { // Only show first 3 aircraft per message
|
||||||
|
fmt.Printf(" ... and %d more aircraft\n", len(msg.AcList)-i)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
icao, _ := ac.GetICAO24()
|
||||||
|
fmt.Printf(" ICAO: %06X", icao)
|
||||||
|
|
||||||
|
if ac.Call != "" {
|
||||||
|
fmt.Printf(" Call: %-8s", ac.Call)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ac.HasPosition() {
|
||||||
|
fmt.Printf(" Pos: %7.3f°, %8.3f°", ac.Lat, ac.Long)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ac.HasAltitude() {
|
||||||
|
fmt.Printf(" Alt: %5d ft", ac.GetAltitude())
|
||||||
|
}
|
||||||
|
|
||||||
|
if ac.Spd > 0 {
|
||||||
|
fmt.Printf(" Spd: %3.0f kt", ac.Spd)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ac.Trak > 0 {
|
||||||
|
fmt.Printf(" Hdg: %3.0f°", ac.Trak)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ac.Gnd {
|
||||||
|
fmt.Printf(" [GND]")
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf(" Src: %s\n", ac.GetPositionSource())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Show statistics every 10 messages
|
||||||
|
if messageCount%10 == 0 {
|
||||||
|
elapsed := time.Since(startTime)
|
||||||
|
fmt.Printf("Stats: %d messages, %d total aircraft, %.1fs elapsed\n",
|
||||||
|
messageCount, aircraftCount, elapsed.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("----------------------------------------")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -52,21 +52,28 @@ SkyView is a high-performance, multi-source ADS-B aircraft tracking system built
|
||||||
|
|
||||||
## Core Components
|
## Core Components
|
||||||
|
|
||||||
### 1. Beast Format Clients (`internal/client/`)
|
### 1. Multi-Format Clients (`internal/client/`)
|
||||||
|
|
||||||
**Purpose**: Manages TCP connections to dump1090 receivers
|
**Purpose**: Manages TCP connections to dump1090/readsb receivers using multiple protocols
|
||||||
|
|
||||||
**Key Features**:
|
**Key Features**:
|
||||||
- Concurrent connection handling for multiple sources
|
- Concurrent connection handling for multiple sources
|
||||||
- Automatic reconnection with exponential backoff
|
- Automatic reconnection with exponential backoff
|
||||||
- Beast binary format parsing
|
- Support for Beast binary and VRS JSON formats
|
||||||
- Per-source connection monitoring and statistics
|
- Per-source connection monitoring and statistics
|
||||||
|
- Mixed-format multi-source support
|
||||||
|
|
||||||
**Files**:
|
**Files**:
|
||||||
- `beast.go`: Main client implementation
|
- `beast.go`: Beast binary client and multi-source manager
|
||||||
|
- `vrs.go`: VRS JSON format client
|
||||||
|
|
||||||
### 2. Mode S/ADS-B Decoder (`internal/modes/`)
|
**Supported Formats**:
|
||||||
|
- **Beast Binary**: Traditional binary format from dump1090 (port 30005)
|
||||||
|
- **VRS JSON**: JSON format from readsb VRS output (port 33005)
|
||||||
|
|
||||||
|
### 2. Data Format Processors
|
||||||
|
|
||||||
|
#### Mode S/ADS-B Decoder (`internal/modes/`)
|
||||||
**Purpose**: Decodes raw Mode S and ADS-B messages into structured aircraft data
|
**Purpose**: Decodes raw Mode S and ADS-B messages into structured aircraft data
|
||||||
|
|
||||||
**Key Features**:
|
**Key Features**:
|
||||||
|
|
@ -78,6 +85,18 @@ SkyView is a high-performance, multi-source ADS-B aircraft tracking system built
|
||||||
**Files**:
|
**Files**:
|
||||||
- `decoder.go`: Core decoding logic
|
- `decoder.go`: Core decoding logic
|
||||||
|
|
||||||
|
#### VRS JSON Parser (`internal/vrs/`)
|
||||||
|
**Purpose**: Parses Virtual Radar Server JSON format aircraft data
|
||||||
|
|
||||||
|
**Key Features**:
|
||||||
|
- Newline-delimited JSON stream parsing
|
||||||
|
- Direct aircraft data extraction (no Mode S decoding required)
|
||||||
|
- Support for VRS-specific fields (registration, aircraft type, operator)
|
||||||
|
- Position source identification (ADS-B, MLAT, TIS-B, Satellite)
|
||||||
|
|
||||||
|
**Files**:
|
||||||
|
- `parser.go`: VRS JSON message parsing
|
||||||
|
|
||||||
### 3. Data Merger (`internal/merger/`)
|
### 3. Data Merger (`internal/merger/`)
|
||||||
|
|
||||||
**Purpose**: Fuses aircraft data from multiple sources using intelligent conflict resolution
|
**Purpose**: Fuses aircraft data from multiple sources using intelligent conflict resolution
|
||||||
|
|
@ -144,9 +163,9 @@ SkyView is a high-performance, multi-source ADS-B aircraft tracking system built
|
||||||
## Data Flow
|
## Data Flow
|
||||||
|
|
||||||
### 1. Data Ingestion
|
### 1. Data Ingestion
|
||||||
1. **Beast Clients** connect to dump1090 receivers via TCP
|
1. **Multi-format Clients** connect to dump1090/readsb receivers via TCP
|
||||||
2. **Beast Parser** processes binary message stream
|
2. **Format Parsers** process binary Beast or JSON VRS message streams
|
||||||
3. **Mode S Decoder** converts raw messages to structured aircraft data
|
3. **Data Converters** convert messages to structured aircraft data (Mode S decoding for Beast, direct mapping for VRS)
|
||||||
4. **Data Merger** receives aircraft updates with source attribution
|
4. **Data Merger** receives aircraft updates with source attribution
|
||||||
|
|
||||||
### 2. Data Fusion
|
### 2. Data Fusion
|
||||||
|
|
|
||||||
|
|
@ -250,10 +250,10 @@ func (c *BeastClient) processMessages() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MultiSourceClient manages multiple Beast TCP clients for multi-receiver setups.
|
// MultiSourceClient manages multiple Beast and VRS clients for multi-receiver setups.
|
||||||
//
|
//
|
||||||
// This client coordinator:
|
// This client coordinator:
|
||||||
// - Manages connections to multiple Beast format sources simultaneously
|
// - Manages connections to multiple Beast and VRS format sources simultaneously
|
||||||
// - Provides unified control for starting and stopping all clients
|
// - Provides unified control for starting and stopping all clients
|
||||||
// - Runs periodic cleanup tasks for stale aircraft data
|
// - Runs periodic cleanup tasks for stale aircraft data
|
||||||
// - Aggregates statistics from all managed clients
|
// - Aggregates statistics from all managed clients
|
||||||
|
|
@ -262,21 +262,23 @@ func (c *BeastClient) processMessages() {
|
||||||
// All clients share the same data merger, enabling automatic data fusion
|
// All clients share the same data merger, enabling automatic data fusion
|
||||||
// and conflict resolution across multiple receivers.
|
// and conflict resolution across multiple receivers.
|
||||||
type MultiSourceClient struct {
|
type MultiSourceClient struct {
|
||||||
clients []*BeastClient // Managed Beast clients
|
beastClients []*BeastClient // Managed Beast clients
|
||||||
merger *merger.Merger // Shared data merger for all sources
|
vrsClients []*VRSClient // Managed VRS JSON clients
|
||||||
mu sync.RWMutex // Protects clients slice
|
merger *merger.Merger // Shared data merger for all sources
|
||||||
|
mu sync.RWMutex // Protects client slices
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMultiSourceClient creates a client manager for multiple Beast format sources.
|
// NewMultiSourceClient creates a client manager for multiple Beast and VRS format sources.
|
||||||
//
|
//
|
||||||
// The multi-source client enables connecting to multiple dump1090 instances
|
// The multi-source client enables connecting to multiple dump1090/readsb instances
|
||||||
// or other Beast format sources simultaneously. All sources feed into the
|
// using either Beast binary or VRS JSON formats simultaneously. All sources feed into
|
||||||
// same data merger, which handles automatic data fusion and conflict resolution.
|
// the same data merger, which handles automatic data fusion and conflict resolution.
|
||||||
//
|
//
|
||||||
// This is essential for:
|
// This is essential for:
|
||||||
// - Improved coverage from multiple receivers
|
// - Improved coverage from multiple receivers
|
||||||
// - Redundancy in case of individual receiver failures
|
// - Redundancy in case of individual receiver failures
|
||||||
// - Data quality improvement through signal strength comparison
|
// - Data quality improvement through signal strength comparison
|
||||||
|
// - Support for different receiver output formats
|
||||||
//
|
//
|
||||||
// Parameters:
|
// Parameters:
|
||||||
// - merger: Shared data merger instance for all sources
|
// - merger: Shared data merger instance for all sources
|
||||||
|
|
@ -284,18 +286,23 @@ type MultiSourceClient struct {
|
||||||
// Returns a configured multi-source client ready for source addition.
|
// Returns a configured multi-source client ready for source addition.
|
||||||
func NewMultiSourceClient(merger *merger.Merger) *MultiSourceClient {
|
func NewMultiSourceClient(merger *merger.Merger) *MultiSourceClient {
|
||||||
return &MultiSourceClient{
|
return &MultiSourceClient{
|
||||||
clients: make([]*BeastClient, 0),
|
beastClients: make([]*BeastClient, 0),
|
||||||
merger: merger,
|
vrsClients: make([]*VRSClient, 0),
|
||||||
|
merger: merger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddSource registers and configures a new Beast format data source.
|
// AddSource registers and configures a new data source (Beast or VRS format).
|
||||||
//
|
//
|
||||||
// This method:
|
// This method:
|
||||||
// 1. Registers the source with the data merger
|
// 1. Registers the source with the data merger
|
||||||
// 2. Creates a new BeastClient for the source
|
// 2. Creates appropriate client based on source format
|
||||||
// 3. Adds the client to the managed clients list
|
// 3. Adds the client to the managed clients list
|
||||||
//
|
//
|
||||||
|
// The source format is determined by the source.Format field:
|
||||||
|
// - "beast": Creates a BeastClient for Beast binary protocol (default)
|
||||||
|
// - "vrs": Creates a VRSClient for VRS JSON protocol
|
||||||
|
//
|
||||||
// The source is not automatically started; call Start() to begin connections.
|
// The source is not automatically started; call Start() to begin connections.
|
||||||
// Sources can be added before or after starting the multi-source client.
|
// Sources can be added before or after starting the multi-source client.
|
||||||
//
|
//
|
||||||
|
|
@ -305,18 +312,31 @@ func (m *MultiSourceClient) AddSource(source *merger.Source) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
// Default to Beast format if not specified
|
||||||
|
if source.Format == "" {
|
||||||
|
source.Format = "beast"
|
||||||
|
}
|
||||||
|
|
||||||
// Register source with merger
|
// Register source with merger
|
||||||
m.merger.AddSource(source)
|
m.merger.AddSource(source)
|
||||||
|
|
||||||
// Create and start client
|
// Create appropriate client based on format
|
||||||
client := NewBeastClient(source, m.merger)
|
switch source.Format {
|
||||||
m.clients = append(m.clients, client)
|
case "vrs":
|
||||||
|
client := NewVRSClient(source, m.merger)
|
||||||
|
m.vrsClients = append(m.vrsClients, client)
|
||||||
|
case "beast":
|
||||||
|
fallthrough
|
||||||
|
default:
|
||||||
|
client := NewBeastClient(source, m.merger)
|
||||||
|
m.beastClients = append(m.beastClients, client)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start begins connections to all configured Beast sources.
|
// Start begins connections to all configured sources (Beast and VRS).
|
||||||
//
|
//
|
||||||
// This method:
|
// This method:
|
||||||
// - Starts all managed BeastClient instances in parallel
|
// - Starts all managed BeastClient and VRSClient instances in parallel
|
||||||
// - Begins the periodic cleanup routine for stale aircraft data
|
// - Begins the periodic cleanup routine for stale aircraft data
|
||||||
// - Uses the provided context for cancellation control
|
// - Uses the provided context for cancellation control
|
||||||
//
|
//
|
||||||
|
|
@ -330,7 +350,13 @@ func (m *MultiSourceClient) Start(ctx context.Context) {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
defer m.mu.RUnlock()
|
defer m.mu.RUnlock()
|
||||||
|
|
||||||
for _, client := range m.clients {
|
// Start Beast clients
|
||||||
|
for _, client := range m.beastClients {
|
||||||
|
client.Start(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start VRS clients
|
||||||
|
for _, client := range m.vrsClients {
|
||||||
client.Start(ctx)
|
client.Start(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -338,7 +364,7 @@ func (m *MultiSourceClient) Start(ctx context.Context) {
|
||||||
go m.cleanupRoutine(ctx)
|
go m.cleanupRoutine(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop gracefully shuts down all managed Beast clients.
|
// Stop gracefully shuts down all managed clients (Beast and VRS).
|
||||||
//
|
//
|
||||||
// This method stops all clients in parallel and waits for their
|
// This method stops all clients in parallel and waits for their
|
||||||
// goroutines to complete. The shutdown is coordinated to ensure
|
// goroutines to complete. The shutdown is coordinated to ensure
|
||||||
|
|
@ -347,7 +373,13 @@ func (m *MultiSourceClient) Stop() {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
defer m.mu.RUnlock()
|
defer m.mu.RUnlock()
|
||||||
|
|
||||||
for _, client := range m.clients {
|
// Stop Beast clients
|
||||||
|
for _, client := range m.beastClients {
|
||||||
|
client.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop VRS clients
|
||||||
|
for _, client := range m.vrsClients {
|
||||||
client.Stop()
|
client.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -382,8 +414,8 @@ func (m *MultiSourceClient) cleanupRoutine(ctx context.Context) {
|
||||||
//
|
//
|
||||||
// The statistics include:
|
// The statistics include:
|
||||||
// - All merger statistics (aircraft count, message rates, etc.)
|
// - All merger statistics (aircraft count, message rates, etc.)
|
||||||
// - Number of active client connections
|
// - Number of active client connections (Beast and VRS)
|
||||||
// - Total number of configured clients
|
// - Total number of configured clients by type
|
||||||
// - Per-source connection status and message counts
|
// - Per-source connection status and message counts
|
||||||
//
|
//
|
||||||
// This information is useful for monitoring system health, diagnosing
|
// This information is useful for monitoring system health, diagnosing
|
||||||
|
|
@ -397,15 +429,26 @@ func (m *MultiSourceClient) GetStatistics() map[string]interface{} {
|
||||||
stats := m.merger.GetStatistics()
|
stats := m.merger.GetStatistics()
|
||||||
|
|
||||||
// Add client-specific stats
|
// Add client-specific stats
|
||||||
activeClients := 0
|
activeBeastClients := 0
|
||||||
for _, client := range m.clients {
|
for _, client := range m.beastClients {
|
||||||
if client.source.Active {
|
if client.source.Active {
|
||||||
activeClients++
|
activeBeastClients++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stats["active_clients"] = activeClients
|
activeVRSClients := 0
|
||||||
stats["total_clients"] = len(m.clients)
|
for _, client := range m.vrsClients {
|
||||||
|
if client.source.Active {
|
||||||
|
activeVRSClients++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stats["active_beast_clients"] = activeBeastClients
|
||||||
|
stats["active_vrs_clients"] = activeVRSClients
|
||||||
|
stats["active_clients"] = activeBeastClients + activeVRSClients
|
||||||
|
stats["total_beast_clients"] = len(m.beastClients)
|
||||||
|
stats["total_vrs_clients"] = len(m.vrsClients)
|
||||||
|
stats["total_clients"] = len(m.beastClients) + len(m.vrsClients)
|
||||||
|
|
||||||
return stats
|
return stats
|
||||||
}
|
}
|
||||||
|
|
|
||||||
350
internal/client/vrs.go
Normal file
350
internal/client/vrs.go
Normal file
|
|
@ -0,0 +1,350 @@
|
||||||
|
// Package client provides VRS JSON format client implementation for connecting to readsb receivers.
|
||||||
|
//
|
||||||
|
// This file implements a VRS (Virtual Radar Server) JSON format client that connects
|
||||||
|
// to readsb instances with --net-vrs-port enabled. VRS JSON is a simpler alternative
|
||||||
|
// to the Beast binary protocol, providing aircraft data as JSON over TCP.
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"skyview/internal/merger"
|
||||||
|
"skyview/internal/modes"
|
||||||
|
"skyview/internal/vrs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// VRSClient handles connection to a single readsb VRS JSON TCP stream
|
||||||
|
//
|
||||||
|
// The VRS client provides robust connectivity with:
|
||||||
|
// - Automatic reconnection with exponential backoff
|
||||||
|
// - JSON message parsing and processing
|
||||||
|
// - Integration with data merger
|
||||||
|
// - Source status tracking and statistics
|
||||||
|
// - Graceful shutdown handling
|
||||||
|
//
|
||||||
|
// VRS JSON is simpler than Beast format but provides the same aircraft data
|
||||||
|
type VRSClient struct {
|
||||||
|
source *merger.Source // Source configuration and status
|
||||||
|
merger *merger.Merger // Data merger for multi-source fusion
|
||||||
|
conn net.Conn // TCP connection to VRS source
|
||||||
|
parser *vrs.Parser // VRS JSON format parser
|
||||||
|
msgChan chan *vrs.VRSMessage // Buffered channel for parsed messages
|
||||||
|
errChan chan error // Error reporting channel
|
||||||
|
stopChan chan struct{} // Shutdown signal channel
|
||||||
|
wg sync.WaitGroup // Wait group for goroutine coordination
|
||||||
|
|
||||||
|
// Reconnection parameters
|
||||||
|
reconnectDelay time.Duration // Initial reconnect delay
|
||||||
|
maxReconnect time.Duration // Maximum reconnect delay (for backoff cap)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewVRSClient creates a new VRS JSON format TCP client for a specific data source
|
||||||
|
//
|
||||||
|
// The client is configured with:
|
||||||
|
// - Buffered message channel (100 messages) for handling updates
|
||||||
|
// - Error channel for connection and parsing issues
|
||||||
|
// - Initial reconnect delay of 5 seconds
|
||||||
|
// - Maximum reconnect delay of 60 seconds (exponential backoff cap)
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - source: Source configuration including host, port, and metadata
|
||||||
|
// - merger: Data merger instance for aircraft state management
|
||||||
|
//
|
||||||
|
// Returns a configured but not yet started VRSClient
|
||||||
|
func NewVRSClient(source *merger.Source, merger *merger.Merger) *VRSClient {
|
||||||
|
return &VRSClient{
|
||||||
|
source: source,
|
||||||
|
merger: merger,
|
||||||
|
msgChan: make(chan *vrs.VRSMessage, 100),
|
||||||
|
errChan: make(chan error, 10),
|
||||||
|
stopChan: make(chan struct{}),
|
||||||
|
reconnectDelay: 5 * time.Second,
|
||||||
|
maxReconnect: 60 * time.Second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start begins the client connection and message processing in the background
|
||||||
|
//
|
||||||
|
// The client will:
|
||||||
|
// - Attempt to connect to the configured VRS source
|
||||||
|
// - Handle connection failures with exponential backoff
|
||||||
|
// - Start message reading and processing goroutines
|
||||||
|
// - Continuously reconnect if the connection is lost
|
||||||
|
//
|
||||||
|
// The method returns immediately; the client runs in background goroutines
|
||||||
|
// until Stop() is called or the context is cancelled
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - ctx: Context for cancellation and timeout control
|
||||||
|
func (c *VRSClient) Start(ctx context.Context) {
|
||||||
|
c.wg.Add(1)
|
||||||
|
go c.run(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop gracefully shuts down the client and all associated goroutines
|
||||||
|
//
|
||||||
|
// The shutdown process:
|
||||||
|
// 1. Signals all goroutines to stop via stopChan
|
||||||
|
// 2. Closes the TCP connection if active
|
||||||
|
// 3. Waits for all goroutines to complete
|
||||||
|
//
|
||||||
|
// This method blocks until the shutdown is complete
|
||||||
|
func (c *VRSClient) Stop() {
|
||||||
|
close(c.stopChan)
|
||||||
|
if c.conn != nil {
|
||||||
|
c.conn.Close()
|
||||||
|
}
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// run implements the main client connection and reconnection loop
|
||||||
|
//
|
||||||
|
// This method handles the complete client lifecycle:
|
||||||
|
// 1. Connection establishment with timeout
|
||||||
|
// 2. Exponential backoff on connection failures
|
||||||
|
// 3. Message parsing and processing goroutine management
|
||||||
|
// 4. Connection monitoring and failure detection
|
||||||
|
// 5. Automatic reconnection on disconnection
|
||||||
|
//
|
||||||
|
// The exponential backoff starts at reconnectDelay (5s) and doubles on each
|
||||||
|
// failure up to maxReconnect (60s), then resets on successful connection
|
||||||
|
//
|
||||||
|
// Source status is updated to reflect connection state for monitoring
|
||||||
|
func (c *VRSClient) run(ctx context.Context) {
|
||||||
|
defer c.wg.Done()
|
||||||
|
|
||||||
|
reconnectDelay := c.reconnectDelay
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-c.stopChan:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect to VRS JSON stream
|
||||||
|
addr := fmt.Sprintf("%s:%d", c.source.Host, c.source.Port)
|
||||||
|
fmt.Printf("Connecting to VRS JSON stream at %s (%s)...\n", addr, c.source.Name)
|
||||||
|
|
||||||
|
conn, err := net.DialTimeout("tcp", addr, 30*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Failed to connect to VRS source %s: %v\n", c.source.Name, err)
|
||||||
|
c.source.Active = false
|
||||||
|
|
||||||
|
// Exponential backoff
|
||||||
|
time.Sleep(reconnectDelay)
|
||||||
|
if reconnectDelay < c.maxReconnect {
|
||||||
|
reconnectDelay *= 2
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c.conn = conn
|
||||||
|
c.source.Active = true
|
||||||
|
reconnectDelay = c.reconnectDelay // Reset backoff
|
||||||
|
|
||||||
|
fmt.Printf("Connected to VRS source %s at %s\n", c.source.Name, addr)
|
||||||
|
|
||||||
|
// Create parser for this connection
|
||||||
|
c.parser = vrs.NewParser(conn, c.source.ID)
|
||||||
|
|
||||||
|
// Start processing messages
|
||||||
|
c.wg.Add(2)
|
||||||
|
go c.readMessages()
|
||||||
|
go c.processMessages()
|
||||||
|
|
||||||
|
// Wait for disconnect
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
c.conn.Close()
|
||||||
|
return
|
||||||
|
case <-c.stopChan:
|
||||||
|
c.conn.Close()
|
||||||
|
return
|
||||||
|
case err := <-c.errChan:
|
||||||
|
fmt.Printf("Error from VRS source %s: %v\n", c.source.Name, err)
|
||||||
|
c.conn.Close()
|
||||||
|
c.source.Active = false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for goroutines to finish
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// readMessages runs in a dedicated goroutine to read VRS JSON messages
|
||||||
|
//
|
||||||
|
// This method:
|
||||||
|
// - Continuously reads from the TCP connection
|
||||||
|
// - Parses VRS JSON data into Message structs
|
||||||
|
// - Queues parsed messages for processing
|
||||||
|
// - Reports parsing errors to the error channel
|
||||||
|
//
|
||||||
|
// The method blocks on the parser's ParseStream call and exits when
|
||||||
|
// the connection is closed or an unrecoverable error occurs
|
||||||
|
func (c *VRSClient) readMessages() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
c.parser.ParseStream(c.msgChan, c.errChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// processMessages runs in a dedicated goroutine to process aircraft data
|
||||||
|
//
|
||||||
|
// For each received VRS message, this method:
|
||||||
|
// 1. Iterates through all aircraft in the message
|
||||||
|
// 2. Converts VRS format to internal aircraft representation
|
||||||
|
// 3. Updates the data merger with new aircraft state
|
||||||
|
// 4. Updates source statistics (message count)
|
||||||
|
//
|
||||||
|
// Invalid or unparseable aircraft data is silently discarded to maintain
|
||||||
|
// system stability. The merger handles data fusion from multiple sources
|
||||||
|
// and conflict resolution based on signal strength
|
||||||
|
func (c *VRSClient) processMessages() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.stopChan:
|
||||||
|
return
|
||||||
|
case msg := <-c.msgChan:
|
||||||
|
if msg == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process each aircraft in the message
|
||||||
|
for _, vrsAircraft := range msg.AcList {
|
||||||
|
// Convert VRS aircraft to internal format
|
||||||
|
aircraft := c.convertVRSToAircraft(&vrsAircraft)
|
||||||
|
if aircraft == nil {
|
||||||
|
continue // Skip invalid aircraft
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update merger with new data
|
||||||
|
// Note: VRS doesn't provide signal strength, so we use a default value
|
||||||
|
c.merger.UpdateAircraft(
|
||||||
|
c.source.ID,
|
||||||
|
aircraft,
|
||||||
|
-30.0, // Default signal strength for VRS sources
|
||||||
|
vrsAircraft.GetTimestamp(),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Update source statistics
|
||||||
|
c.source.Messages++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// convertVRSToAircraft converts VRS JSON aircraft to internal aircraft format
|
||||||
|
//
|
||||||
|
// This method maps VRS JSON fields to the internal aircraft structure used
|
||||||
|
// by the merger. It handles:
|
||||||
|
// - ICAO address extraction and validation
|
||||||
|
// - Position data (lat/lon)
|
||||||
|
// - Altitude (barometric and geometric)
|
||||||
|
// - Speed and heading
|
||||||
|
// - Callsign and squawk code
|
||||||
|
// - Ground status
|
||||||
|
//
|
||||||
|
// Returns nil if the aircraft data is invalid or incomplete
|
||||||
|
func (c *VRSClient) convertVRSToAircraft(vrs *vrs.VRSAircraft) *modes.Aircraft {
|
||||||
|
// Get ICAO address
|
||||||
|
icao, err := vrs.GetICAO24()
|
||||||
|
if err != nil {
|
||||||
|
return nil // Invalid ICAO, skip this aircraft
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create aircraft structure
|
||||||
|
aircraft := &modes.Aircraft{
|
||||||
|
ICAO24: icao,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set position if available
|
||||||
|
if vrs.HasPosition() {
|
||||||
|
aircraft.Latitude = vrs.Lat
|
||||||
|
aircraft.Longitude = vrs.Long
|
||||||
|
aircraft.PositionValid = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set altitude if available
|
||||||
|
if vrs.HasAltitude() {
|
||||||
|
aircraft.Altitude = vrs.GetAltitude()
|
||||||
|
aircraft.AltitudeValid = true
|
||||||
|
|
||||||
|
// Set barometric altitude specifically
|
||||||
|
if vrs.Alt != 0 {
|
||||||
|
aircraft.BaroAltitude = vrs.Alt
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set geometric altitude if different from barometric
|
||||||
|
if vrs.GAlt != 0 {
|
||||||
|
aircraft.GeomAltitude = vrs.GAlt
|
||||||
|
aircraft.GeomAltitudeValid = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set speed if available
|
||||||
|
if vrs.Spd > 0 {
|
||||||
|
aircraft.GroundSpeed = int(vrs.Spd)
|
||||||
|
aircraft.GroundSpeedValid = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set heading/track if available
|
||||||
|
if vrs.Trak > 0 {
|
||||||
|
aircraft.Track = int(vrs.Trak)
|
||||||
|
aircraft.TrackValid = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set vertical rate if available
|
||||||
|
if vrs.Vsi != 0 {
|
||||||
|
aircraft.VerticalRate = vrs.Vsi
|
||||||
|
aircraft.VerticalRateValid = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set callsign if available
|
||||||
|
if vrs.Call != "" {
|
||||||
|
aircraft.Callsign = vrs.Call
|
||||||
|
aircraft.CallsignValid = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set squawk if available
|
||||||
|
if squawk, err := vrs.GetSquawk(); err == nil {
|
||||||
|
aircraft.Squawk = fmt.Sprintf("%04X", squawk) // Convert to hex string
|
||||||
|
aircraft.SquawkValid = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set ground status
|
||||||
|
aircraft.OnGround = vrs.IsOnGround()
|
||||||
|
aircraft.OnGroundValid = true
|
||||||
|
|
||||||
|
// Set selected altitude if available
|
||||||
|
if vrs.TAlt != 0 {
|
||||||
|
aircraft.SelectedAltitude = vrs.TAlt
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set position source
|
||||||
|
switch vrs.GetPositionSource() {
|
||||||
|
case "MLAT":
|
||||||
|
aircraft.PositionMLAT = true
|
||||||
|
case "TIS-B":
|
||||||
|
aircraft.PositionTISB = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set additional metadata if available
|
||||||
|
if vrs.Reg != "" {
|
||||||
|
aircraft.Registration = vrs.Reg
|
||||||
|
}
|
||||||
|
if vrs.Type != "" {
|
||||||
|
aircraft.AircraftType = vrs.Type
|
||||||
|
}
|
||||||
|
if vrs.Op != "" {
|
||||||
|
aircraft.Operator = vrs.Op
|
||||||
|
}
|
||||||
|
|
||||||
|
return aircraft
|
||||||
|
}
|
||||||
|
|
@ -67,6 +67,7 @@ type Source struct {
|
||||||
Name string `json:"name"` // Human-readable name
|
Name string `json:"name"` // Human-readable name
|
||||||
Host string `json:"host"` // Hostname or IP address
|
Host string `json:"host"` // Hostname or IP address
|
||||||
Port int `json:"port"` // TCP port number
|
Port int `json:"port"` // TCP port number
|
||||||
|
Format string `json:"format"` // Data format: "beast" or "vrs" (default: "beast")
|
||||||
Latitude float64 `json:"latitude"` // Receiver location latitude
|
Latitude float64 `json:"latitude"` // Receiver location latitude
|
||||||
Longitude float64 `json:"longitude"` // Receiver location longitude
|
Longitude float64 `json:"longitude"` // Receiver location longitude
|
||||||
Altitude float64 `json:"altitude"` // Receiver altitude above sea level
|
Altitude float64 `json:"altitude"` // Receiver altitude above sea level
|
||||||
|
|
|
||||||
|
|
@ -155,6 +155,26 @@ type Aircraft struct {
|
||||||
SelectedAltitude int // MCP/FCU selected altitude in feet
|
SelectedAltitude int // MCP/FCU selected altitude in feet
|
||||||
SelectedHeading float64 // MCP/FCU selected heading in degrees
|
SelectedHeading float64 // MCP/FCU selected heading in degrees
|
||||||
BaroSetting float64 // Barometric pressure setting (QNH) in millibars
|
BaroSetting float64 // Barometric pressure setting (QNH) in millibars
|
||||||
|
|
||||||
|
// Additional fields from VRS JSON and extended sources
|
||||||
|
Registration string // Aircraft registration (e.g., "N12345")
|
||||||
|
AircraftType string // Aircraft type (e.g., "B738")
|
||||||
|
Operator string // Airline or operator name
|
||||||
|
|
||||||
|
// Validity flags for optional fields (used by VRS and other sources)
|
||||||
|
CallsignValid bool // Whether callsign is valid
|
||||||
|
PositionValid bool // Whether position is valid
|
||||||
|
AltitudeValid bool // Whether altitude is valid
|
||||||
|
GeomAltitudeValid bool // Whether geometric altitude is valid
|
||||||
|
GroundSpeedValid bool // Whether ground speed is valid
|
||||||
|
TrackValid bool // Whether track is valid
|
||||||
|
VerticalRateValid bool // Whether vertical rate is valid
|
||||||
|
SquawkValid bool // Whether squawk code is valid
|
||||||
|
OnGroundValid bool // Whether on-ground status is valid
|
||||||
|
|
||||||
|
// Position source indicators
|
||||||
|
PositionMLAT bool // Position derived from MLAT
|
||||||
|
PositionTISB bool // Position from TIS-B
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decoder handles Mode S and ADS-B message decoding with CPR position resolution.
|
// Decoder handles Mode S and ADS-B message decoding with CPR position resolution.
|
||||||
|
|
|
||||||
234
internal/vrs/parser.go
Normal file
234
internal/vrs/parser.go
Normal file
|
|
@ -0,0 +1,234 @@
|
||||||
|
// Package vrs provides Virtual Radar Server JSON format parsing for ADS-B message streams.
|
||||||
|
//
|
||||||
|
// The VRS JSON format is a simplified alternative to the Beast binary protocol,
|
||||||
|
// providing aircraft data as newline-delimited JSON objects over TCP connections
|
||||||
|
// (typically port 33005 when using readsb --net-vrs-port).
|
||||||
|
//
|
||||||
|
// VRS JSON Format Structure:
|
||||||
|
// - Single-line JSON object per update: {"acList":[{aircraft1},{aircraft2},...]}
|
||||||
|
// - Updates sent at configurable intervals (default 5 seconds)
|
||||||
|
// - Each aircraft object contains ICAO, position, altitude, speed, etc.
|
||||||
|
// - Simpler parsing compared to binary Beast format
|
||||||
|
//
|
||||||
|
// This package handles:
|
||||||
|
// - JSON message parsing and validation
|
||||||
|
// - Aircraft data extraction from VRS format
|
||||||
|
// - Continuous stream processing with error recovery
|
||||||
|
// - Conversion to internal aircraft data structures
|
||||||
|
package vrs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// VRSMessage represents a complete VRS JSON message containing multiple aircraft
|
||||||
|
type VRSMessage struct {
|
||||||
|
AcList []VRSAircraft `json:"acList"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// VRSAircraft represents a single aircraft in VRS JSON format
|
||||||
|
type VRSAircraft struct {
|
||||||
|
Icao string `json:"Icao"` // ICAO hex address (may have ~ prefix for non-ICAO)
|
||||||
|
Lat float64 `json:"Lat"` // Latitude
|
||||||
|
Long float64 `json:"Long"` // Longitude
|
||||||
|
Alt int `json:"Alt"` // Barometric altitude in feet
|
||||||
|
GAlt int `json:"GAlt"` // Geometric altitude in feet
|
||||||
|
Spd float64 `json:"Spd"` // Speed in knots
|
||||||
|
Trak float64 `json:"Trak"` // Track/heading in degrees
|
||||||
|
Vsi int `json:"Vsi"` // Vertical speed in feet/min
|
||||||
|
Sqk string `json:"Sqk"` // Squawk code
|
||||||
|
Call string `json:"Call"` // Callsign
|
||||||
|
Gnd bool `json:"Gnd"` // On ground flag
|
||||||
|
TAlt int `json:"TAlt"` // Target altitude
|
||||||
|
Mlat bool `json:"Mlat"` // MLAT position flag
|
||||||
|
Tisb bool `json:"Tisb"` // TIS-B flag
|
||||||
|
Sat bool `json:"Sat"` // Satellite (JAERO) position flag
|
||||||
|
|
||||||
|
// Additional fields that may be present
|
||||||
|
Reg string `json:"Reg"` // Registration
|
||||||
|
Type string `json:"Type"` // Aircraft type
|
||||||
|
Mdl string `json:"Mdl"` // Model
|
||||||
|
Op string `json:"Op"` // Operator
|
||||||
|
From string `json:"From"` // Departure airport
|
||||||
|
To string `json:"To"` // Destination airport
|
||||||
|
|
||||||
|
// Timing fields
|
||||||
|
PosTime int64 `json:"PosTime"` // Position timestamp (milliseconds)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parser handles VRS JSON format parsing from a stream
|
||||||
|
type Parser struct {
|
||||||
|
reader *bufio.Reader
|
||||||
|
sourceID string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewParser creates a new VRS JSON format parser for a data stream
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - r: Input stream containing VRS JSON data
|
||||||
|
// - sourceID: Identifier for this data source
|
||||||
|
//
|
||||||
|
// Returns a configured parser ready for message parsing
|
||||||
|
func NewParser(r io.Reader, sourceID string) *Parser {
|
||||||
|
return &Parser{
|
||||||
|
reader: bufio.NewReader(r),
|
||||||
|
sourceID: sourceID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadMessage reads and parses a single VRS JSON message from the stream
|
||||||
|
//
|
||||||
|
// The VRS format sends complete JSON objects on single lines, with each
|
||||||
|
// object containing an array of aircraft updates. This is much simpler
|
||||||
|
// than Beast binary parsing.
|
||||||
|
//
|
||||||
|
// Returns the parsed message or an error if the stream is closed or corrupted
|
||||||
|
func (p *Parser) ReadMessage() (*VRSMessage, error) {
|
||||||
|
// Read complete line (VRS sends one JSON object per line)
|
||||||
|
line, err := p.reader.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trim whitespace
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
if line == "" {
|
||||||
|
// Empty line, try again
|
||||||
|
return p.ReadMessage()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse JSON
|
||||||
|
var msg VRSMessage
|
||||||
|
if err := json.Unmarshal([]byte(line), &msg); err != nil {
|
||||||
|
// Invalid JSON, skip and continue
|
||||||
|
return p.ReadMessage()
|
||||||
|
}
|
||||||
|
|
||||||
|
return &msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseStream continuously reads messages from the stream until an error occurs
|
||||||
|
//
|
||||||
|
// This method runs in a loop, parsing messages and sending them to the provided
|
||||||
|
// channel. It handles various error conditions gracefully:
|
||||||
|
// - EOF and closed pipe errors terminate normally (expected on disconnect)
|
||||||
|
// - JSON parsing errors are recovered from automatically
|
||||||
|
// - Other errors are reported via the error channel
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - msgChan: Channel for sending successfully parsed messages
|
||||||
|
// - errChan: Channel for reporting parsing errors
|
||||||
|
func (p *Parser) ParseStream(msgChan chan<- *VRSMessage, errChan chan<- error) {
|
||||||
|
for {
|
||||||
|
msg, err := p.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
errChan <- fmt.Errorf("VRS parser error from %s: %w", p.sourceID, err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
msgChan <- msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetICAO24 extracts and cleans the ICAO 24-bit address from VRS format
|
||||||
|
//
|
||||||
|
// VRS format may prefix non-ICAO addresses with '~'. This method:
|
||||||
|
// - Removes the '~' prefix if present
|
||||||
|
// - Converts hex string to uint32
|
||||||
|
// - Returns 0 for invalid addresses
|
||||||
|
func (a *VRSAircraft) GetICAO24() (uint32, error) {
|
||||||
|
// Remove non-ICAO prefix if present
|
||||||
|
icaoStr := strings.TrimPrefix(a.Icao, "~")
|
||||||
|
|
||||||
|
// Parse hex string
|
||||||
|
icao64, err := strconv.ParseUint(icaoStr, 16, 24)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("invalid ICAO address: %s", a.Icao)
|
||||||
|
}
|
||||||
|
|
||||||
|
return uint32(icao64), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasPosition returns true if the aircraft has valid position data
|
||||||
|
func (a *VRSAircraft) HasPosition() bool {
|
||||||
|
// Check if we have non-zero lat/lon
|
||||||
|
return a.Lat != 0 || a.Long != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasAltitude returns true if the aircraft has valid altitude data
|
||||||
|
func (a *VRSAircraft) HasAltitude() bool {
|
||||||
|
return a.Alt != 0 || a.GAlt != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAltitude returns the best available altitude (barometric preferred)
|
||||||
|
func (a *VRSAircraft) GetAltitude() int {
|
||||||
|
if a.Alt != 0 {
|
||||||
|
return a.Alt
|
||||||
|
}
|
||||||
|
return a.GAlt
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSpeed returns the speed in knots
|
||||||
|
func (a *VRSAircraft) GetSpeed() float64 {
|
||||||
|
return a.Spd
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetHeading returns the track/heading in degrees
|
||||||
|
func (a *VRSAircraft) GetHeading() float64 {
|
||||||
|
return a.Trak
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetVerticalRate returns the vertical speed in feet/min
|
||||||
|
func (a *VRSAircraft) GetVerticalRate() int {
|
||||||
|
return a.Vsi
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSquawk returns the squawk code as an integer
|
||||||
|
func (a *VRSAircraft) GetSquawk() (uint16, error) {
|
||||||
|
if a.Sqk == "" {
|
||||||
|
return 0, fmt.Errorf("no squawk code")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse hex squawk code
|
||||||
|
squawk64, err := strconv.ParseUint(a.Sqk, 16, 16)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("invalid squawk code: %s", a.Sqk)
|
||||||
|
}
|
||||||
|
|
||||||
|
return uint16(squawk64), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPositionSource returns the source of position data
|
||||||
|
func (a *VRSAircraft) GetPositionSource() string {
|
||||||
|
if a.Mlat {
|
||||||
|
return "MLAT"
|
||||||
|
}
|
||||||
|
if a.Tisb {
|
||||||
|
return "TIS-B"
|
||||||
|
}
|
||||||
|
if a.Sat {
|
||||||
|
return "Satellite"
|
||||||
|
}
|
||||||
|
return "ADS-B"
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTimestamp returns the position timestamp as time.Time
|
||||||
|
func (a *VRSAircraft) GetTimestamp() time.Time {
|
||||||
|
if a.PosTime > 0 {
|
||||||
|
return time.Unix(0, a.PosTime*int64(time.Millisecond))
|
||||||
|
}
|
||||||
|
// If no timestamp provided, use current time
|
||||||
|
return time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsOnGround returns true if the aircraft is on the ground
|
||||||
|
func (a *VRSAircraft) IsOnGround() bool {
|
||||||
|
return a.Gnd
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue