skyview/internal/database/loader.go
Ole-Morten Duesund 0f16748224 feat: Enhance core database functionality and optimization
- Add comprehensive database optimization management
- Enhance external data source loading with progress tracking
- Add optimization statistics and efficiency calculations
- Update Go module dependencies for database operations
- Implement database size and performance monitoring

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-31 19:43:24 +02:00

526 lines
No EOL
16 KiB
Go

// Package database - Data loader for external sources
//
// This module handles loading aviation data from external sources at runtime,
// maintaining license compliance by not embedding any AGPL or restricted data
// in the SkyView binary.
package database
import (
"crypto/tls"
"database/sql"
"encoding/csv"
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"time"
)
// DataLoader imports external aviation data sets into the database. It pairs
// the target database handle with the HTTP client used for downloads.
type DataLoader struct {
	conn   *sql.DB      // database that receives the imported rows
	client *http.Client // client used to download the source files
}
// DataSource describes one external aviation data set that can be imported,
// together with the licensing details that are stored alongside it.
type DataSource struct {
	// Name identifies the source; imported rows are tagged with it.
	Name string `json:"name"`
	// License is the license the upstream data is published under.
	License string `json:"license"`
	// URL is the location the data is downloaded from.
	URL string `json:"url"`
	// RequiresConsent marks sources that may only be loaded after the user
	// has accepted the license.
	RequiresConsent bool `json:"requires_consent"`
	// UserAcceptedLicense reports whether the user has accepted the license.
	UserAcceptedLicense bool `json:"user_accepted_license"`
	// Format selects the parser: "openflights", "ourairports", "csv".
	Format string `json:"format"`
	// Version is the version label recorded for the data set.
	Version string `json:"version"`
}
// LoadResult summarises the outcome of importing a single data source.
type LoadResult struct {
	// Source is the name of the data source that was loaded.
	Source string `json:"source"`
	// RecordsTotal is the number of source records considered for import.
	RecordsTotal int `json:"records_total"`
	// RecordsNew is the number of records inserted successfully.
	RecordsNew int `json:"records_new"`
	// RecordsError is the number of records that were skipped or rejected.
	RecordsError int `json:"records_error"`
	// Duration is how long the load took.
	Duration time.Duration `json:"duration"`
	// Errors holds one message per problem encountered; omitted from JSON
	// when empty.
	Errors []string `json:"errors,omitempty"`
}
// NewDataLoader builds a DataLoader that writes to conn and downloads through
// its own HTTP client (30-second overall timeout, up to 10 idle connections
// kept for 90 seconds).
//
// Setting the environment variable SKYVIEW_INSECURE_TLS to "1" turns off TLS
// certificate verification for every download made by the returned loader.
func NewDataLoader(conn *sql.DB) *DataLoader {
	httpTransport := &http.Transport{
		MaxIdleConns:    10,
		IdleConnTimeout: 90 * time.Second,
	}

	// Certificate checks are skipped only on explicit opt-in.
	if os.Getenv("SKYVIEW_INSECURE_TLS") == "1" {
		httpTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
	}

	loader := &DataLoader{conn: conn}
	loader.client = &http.Client{
		Timeout:   30 * time.Second,
		Transport: httpTransport,
	}
	return loader
}
// GetAvailableDataSources lists every data source this package knows how to
// import, including the license each one is published under. A freshly built
// slice is returned on every call.
func GetAvailableDataSources() []DataSource {
	// Runtime data consumption doesn't require explicit consent, so the
	// AGPL-licensed OpenFlights sources are not consent-gated.
	openFlightsAirlines := DataSource{
		Name:            "OpenFlights Airlines",
		License:         "AGPL-3.0",
		URL:             "https://raw.githubusercontent.com/jpatokal/openflights/master/data/airlines.dat",
		RequiresConsent: false,
		Format:          "openflights",
		Version:         "latest",
	}
	openFlightsAirports := DataSource{
		Name:            "OpenFlights Airports",
		License:         "AGPL-3.0",
		URL:             "https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat",
		RequiresConsent: false,
		Format:          "openflights",
		Version:         "latest",
	}
	ourAirports := DataSource{
		Name:            "OurAirports",
		License:         "Public Domain",
		URL:             "https://raw.githubusercontent.com/davidmegginson/ourairports-data/main/airports.csv",
		RequiresConsent: false,
		Format:          "ourairports",
		Version:         "latest",
	}

	return []DataSource{openFlightsAirlines, openFlightsAirports, ourAirports}
}
// LoadDataSource downloads the file at source.URL and imports it into the
// database with the loader matching source.Format ("openflights" or
// "ourairports"). OpenFlights sources are further split into airlines and
// airports by looking for "Airlines" / "Airports" in source.Name.
//
// It refuses to run when source.RequiresConsent is set but
// source.UserAcceptedLicense is not. Download failures, non-200 responses and
// unknown formats return a nil result and an error; a download failure wraps
// the underlying cause so callers can inspect it with errors.Is / errors.As.
// Otherwise the format loader's result and error are returned, with
// result.Duration covering the whole call.
func (dl *DataLoader) LoadDataSource(source DataSource) (*LoadResult, error) {
	result := &LoadResult{Source: source.Name}

	// The format loaders hand back this same pointer, so stamping the
	// duration in a defer also updates the value the caller receives.
	startTime := time.Now()
	defer func() {
		result.Duration = time.Since(startTime)
	}()

	// Check license acceptance if required
	if source.RequiresConsent && !source.UserAcceptedLicense {
		return nil, fmt.Errorf("user has not accepted license for source: %s (%s)", source.Name, source.License)
	}

	// Download data
	resp, err := dl.client.Get(source.URL)
	if err != nil {
		return nil, fmt.Errorf("failed to download data from %s: %w", source.URL, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("HTTP error downloading data: %s", resp.Status)
	}

	// Parse and load data based on format. The body is streamed straight
	// into the loader rather than buffered first.
	switch source.Format {
	case "openflights":
		switch {
		case strings.Contains(source.Name, "Airlines"):
			return dl.loadOpenFlightsAirlines(resp.Body, source, result)
		case strings.Contains(source.Name, "Airports"):
			return dl.loadOpenFlightsAirports(resp.Body, source, result)
		default:
			return nil, fmt.Errorf("unknown OpenFlights data type: %s", source.Name)
		}
	case "ourairports":
		return dl.loadOurAirports(resp.Body, source, result)
	default:
		return nil, fmt.Errorf("unsupported data format: %s", source.Format)
	}
}
// loadOpenFlightsAirlines replaces all airline rows tagged with source.Name by
// the records read from reader, which is expected to be in the OpenFlights
// airline layout:
//
//	ID, Name, Alias, IATA, ICAO, Callsign, Country, Active
//
// The delete, the inserts and the data_sources bookkeeping share one
// transaction that is rolled back on any early return. Individual bad records
// (malformed CSV, fewer than 7 fields, non-numeric ID, rejected insert) are
// counted in result.RecordsError, described in result.Errors and skipped. Any
// other read error aborts the import.
//
// It returns result on success (and alongside a commit error); every other
// failure returns a nil result.
func (dl *DataLoader) loadOpenFlightsAirlines(reader io.Reader, source DataSource, result *LoadResult) (*LoadResult, error) {
	tx, err := dl.conn.Begin()
	if err != nil {
		return nil, fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Undoes everything on early return; has no effect after a successful Commit.
	defer tx.Rollback()

	// Record data source
	if err := dl.recordDataSource(tx, source); err != nil {
		return nil, err
	}

	// Clear existing data from this source
	if _, err := tx.Exec(`DELETE FROM airlines WHERE data_source = ?`, source.Name); err != nil {
		return nil, fmt.Errorf("failed to clear existing airline data: %w", err)
	}

	csvReader := csv.NewReader(reader)
	csvReader.FieldsPerRecord = -1 // Variable number of fields

	insertStmt, err := tx.Prepare(`
INSERT OR REPLACE INTO airlines (id, name, alias, iata_code, icao_code, callsign, country, active, data_source)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
	if err != nil {
		return nil, fmt.Errorf("failed to prepare insert statement: %w", err)
	}
	defer insertStmt.Close()

	// nullable strips stray quotes and maps the \N placeholder to "".
	nullable := func(raw string) string {
		value := strings.Trim(raw, `"`)
		if value == `\N` {
			return ""
		}
		return value
	}

	for {
		record, err := csvReader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			// A malformed line is skipped and reading resumes with the next
			// one. Anything else comes from the underlying stream and may be
			// returned again on every call, which would spin this loop
			// forever, so the import is abandoned instead.
			if _, ok := err.(*csv.ParseError); !ok {
				return nil, fmt.Errorf("failed to read airline data: %w", err)
			}
			result.RecordsError++
			result.Errors = append(result.Errors, fmt.Sprintf("CSV parse error: %v", err))
			continue
		}
		if len(record) < 7 {
			result.RecordsError++
			result.Errors = append(result.Errors, "insufficient fields in record")
			continue
		}
		result.RecordsTotal++

		// A non-numeric ID must not be stored as 0: INSERT OR REPLACE would
		// let such rows overwrite each other.
		id, err := strconv.Atoi(record[0])
		if err != nil {
			result.RecordsError++
			result.Errors = append(result.Errors, fmt.Sprintf("invalid airline id: %v", err))
			continue
		}

		name := strings.Trim(record[1], `"`)
		alias := nullable(record[2])
		iata := nullable(record[3])
		icao := nullable(record[4])
		callsign := nullable(record[5])
		country := nullable(record[6])
		// The Active column is optional; a missing column means inactive.
		active := len(record) > 7 && strings.Trim(record[7], `"`) == "Y"

		_, err = insertStmt.Exec(id, name, alias, iata, icao, callsign, country, active, source.Name)
		if err != nil {
			result.RecordsError++
			result.Errors = append(result.Errors, fmt.Sprintf("insert error for airline %s: %v", name, err))
			continue
		}
		result.RecordsNew++
	}

	// Update record count
	_, err = tx.Exec(`UPDATE data_sources SET record_count = ? WHERE name = ?`, result.RecordsNew, source.Name)
	if err != nil {
		return nil, fmt.Errorf("failed to update record count: %w", err)
	}

	return result, tx.Commit()
}
// loadOpenFlightsAirports replaces all airport rows tagged with source.Name by
// the records read from reader, which is expected to be in the OpenFlights
// airport layout (first twelve columns):
//
//	ID, Name, City, Country, IATA, ICAO, Latitude, Longitude,
//	Altitude, UTC offset, DST, Timezone
//
// The delete, the inserts and the data_sources bookkeeping share one
// transaction that is rolled back on any early return. Individual bad records
// (malformed CSV, fewer than 12 fields, unparsable ID or coordinates, rejected
// insert) are counted in result.RecordsError, described in result.Errors and
// skipped. Any other read error aborts the import.
//
// It returns result on success (and alongside a commit error); every other
// failure returns a nil result.
func (dl *DataLoader) loadOpenFlightsAirports(reader io.Reader, source DataSource, result *LoadResult) (*LoadResult, error) {
	tx, err := dl.conn.Begin()
	if err != nil {
		return nil, fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Undoes everything on early return; has no effect after a successful Commit.
	defer tx.Rollback()

	// Record data source
	if err := dl.recordDataSource(tx, source); err != nil {
		return nil, err
	}

	// Clear existing data from this source
	if _, err := tx.Exec(`DELETE FROM airports WHERE data_source = ?`, source.Name); err != nil {
		return nil, fmt.Errorf("failed to clear existing airport data: %w", err)
	}

	csvReader := csv.NewReader(reader)
	csvReader.FieldsPerRecord = -1 // Variable number of fields

	insertStmt, err := tx.Prepare(`
INSERT OR REPLACE INTO airports (id, name, city, country, iata_code, icao_code, latitude, longitude,
elevation_ft, timezone_offset, dst_type, timezone, data_source)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
	if err != nil {
		return nil, fmt.Errorf("failed to prepare insert statement: %w", err)
	}
	defer insertStmt.Close()

	// nullable strips stray quotes and maps the \N placeholder to "".
	nullable := func(raw string) string {
		value := strings.Trim(raw, `"`)
		if value == `\N` {
			return ""
		}
		return value
	}

	for {
		record, err := csvReader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			// A malformed line is skipped and reading resumes with the next
			// one. Anything else comes from the underlying stream and may be
			// returned again on every call, which would spin this loop
			// forever, so the import is abandoned instead.
			if _, ok := err.(*csv.ParseError); !ok {
				return nil, fmt.Errorf("failed to read airport data: %w", err)
			}
			result.RecordsError++
			result.Errors = append(result.Errors, fmt.Sprintf("CSV parse error: %v", err))
			continue
		}
		if len(record) < 12 {
			result.RecordsError++
			result.Errors = append(result.Errors, "insufficient fields in airport record")
			continue
		}
		result.RecordsTotal++

		name := strings.Trim(record[1], `"`)

		// ID and coordinates are mandatory. Storing an unparsable ID as 0
		// would let rows overwrite each other through INSERT OR REPLACE, and
		// an unparsable coordinate would place the airport at 0,0.
		id, err := strconv.Atoi(record[0])
		if err != nil {
			result.RecordsError++
			result.Errors = append(result.Errors, fmt.Sprintf("invalid airport id: %v", err))
			continue
		}
		lat, err := strconv.ParseFloat(record[6], 64)
		if err != nil {
			result.RecordsError++
			result.Errors = append(result.Errors, fmt.Sprintf("invalid latitude for airport %s: %v", name, err))
			continue
		}
		lon, err := strconv.ParseFloat(record[7], 64)
		if err != nil {
			result.RecordsError++
			result.Errors = append(result.Errors, fmt.Sprintf("invalid longitude for airport %s: %v", name, err))
			continue
		}

		// Altitude and UTC offset are best-effort: an unparsable value (for
		// example the \N placeholder) is stored as zero.
		alt, err := strconv.Atoi(record[8])
		if err != nil {
			alt = 0
		}
		tzOffset, err := strconv.ParseFloat(record[9], 64)
		if err != nil {
			tzOffset = 0
		}

		city := strings.Trim(record[2], `"`)
		country := strings.Trim(record[3], `"`)
		iata := nullable(record[4])
		icao := nullable(record[5])
		dst := nullable(record[10])
		timezone := nullable(record[11])

		_, err = insertStmt.Exec(id, name, city, country, iata, icao, lat, lon, alt, tzOffset, dst, timezone, source.Name)
		if err != nil {
			result.RecordsError++
			result.Errors = append(result.Errors, fmt.Sprintf("insert error for airport %s: %v", name, err))
			continue
		}
		result.RecordsNew++
	}

	// Update record count
	_, err = tx.Exec(`UPDATE data_sources SET record_count = ? WHERE name = ?`, result.RecordsNew, source.Name)
	if err != nil {
		return nil, fmt.Errorf("failed to update record count: %w", err)
	}

	return result, tx.Commit()
}
// loadOurAirports replaces all airport rows tagged with source.Name by the
// records read from reader, a CSV stream whose first row names the columns.
// Fields are looked up by header name, so column order does not matter;
// columns that are absent yield empty values.
//
// The delete, the inserts and the data_sources bookkeeping share one
// transaction that is rolled back on any early return. Individual bad records
// (malformed CSV, too few fields, rejected insert) are counted in
// result.RecordsError, described in result.Errors and skipped; accepted
// records are counted in result.RecordsTotal. Blank or unparsable coordinates
// and elevation are stored as zero. Any other read error aborts the import.
//
// A failure to begin the transaction returns a nil result; every later failure
// returns result together with the error.
func (dl *DataLoader) loadOurAirports(reader io.Reader, source DataSource, result *LoadResult) (*LoadResult, error) {
	// Start database transaction
	tx, err := dl.conn.Begin()
	if err != nil {
		return nil, fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Undoes everything on early return; has no effect after a successful Commit.
	defer tx.Rollback()

	csvReader := csv.NewReader(reader)

	// Read header row
	headers, err := csvReader.Read()
	if err != nil {
		result.RecordsError = 1
		result.Errors = []string{fmt.Sprintf("Failed to read CSV header: %v", err)}
		return result, err
	}

	// Create header index map for easier field access
	headerIndex := make(map[string]int, len(headers))
	for i, header := range headers {
		headerIndex[strings.TrimSpace(header)] = i
	}

	// Clear existing data from this source, as the OpenFlights loaders do, so
	// that a re-import replaces the previous rows instead of adding to them.
	if _, err := tx.Exec(`DELETE FROM airports WHERE data_source = ?`, source.Name); err != nil {
		return result, fmt.Errorf("failed to clear existing airport data: %w", err)
	}

	// Prepare statement for airports
	stmt, err := tx.Prepare(`
INSERT OR REPLACE INTO airports (
source_id, name, ident, type, icao_code, iata_code,
latitude, longitude, elevation_ft, country_code,
municipality, continent, scheduled_service,
home_link, wikipedia_link, keywords, data_source
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
	if err != nil {
		result.RecordsError = 1
		result.Errors = []string{fmt.Sprintf("Failed to prepare statement: %v", err)}
		return result, err
	}
	defer stmt.Close()

	// Numeric columns are best-effort: anything that fails to parse is zero.
	floatOrZero := func(text string) float64 {
		value, err := strconv.ParseFloat(text, 64)
		if err != nil {
			return 0
		}
		return value
	}
	intOrZero := func(text string) int {
		value, err := strconv.Atoi(text)
		if err != nil {
			return 0
		}
		return value
	}

	// Process each row
	for {
		record, err := csvReader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			// A malformed line is skipped and reading resumes with the next
			// one. Anything else comes from the underlying stream and may be
			// returned again on every call, which would spin this loop
			// forever, so the import is abandoned instead.
			if _, ok := err.(*csv.ParseError); !ok {
				return result, fmt.Errorf("failed to read airport data: %w", err)
			}
			result.RecordsError++
			result.Errors = append(result.Errors, fmt.Sprintf("CSV read error: %v", err))
			continue
		}

		// Skip rows with insufficient fields
		if len(record) < len(headers) {
			result.RecordsError++
			result.Errors = append(result.Errors, "insufficient fields in airport record")
			continue
		}
		result.RecordsTotal++

		// Extract fields using header index
		field := func(column string) string {
			return getFieldByHeader(record, headerIndex, column)
		}

		sourceID := field("id")
		ident := field("ident")
		name := field("name")
		icaoCode := field("icao_code")
		iataCode := field("iata_code")
		airportType := field("type")
		countryCode := field("iso_country")
		municipality := field("municipality")
		continent := field("continent")
		homeLink := field("home_link")
		wikipediaLink := field("wikipedia_link")
		keywords := field("keywords")

		latitude := floatOrZero(field("latitude_deg"))
		longitude := floatOrZero(field("longitude_deg"))
		elevation := intOrZero(field("elevation_ft"))
		scheduledService := field("scheduled_service") == "yes"

		// Insert airport record
		_, err = stmt.Exec(
			sourceID, name, ident, airportType, icaoCode, iataCode,
			latitude, longitude, elevation, countryCode, municipality, continent,
			scheduledService, homeLink, wikipediaLink, keywords, source.Name,
		)
		if err != nil {
			result.RecordsError++
			result.Errors = append(result.Errors, fmt.Sprintf("Insert error for %s: %v", ident, err))
			continue
		}
		result.RecordsNew++
	}

	// Update data source tracking
	_, err = tx.Exec(`
INSERT OR REPLACE INTO data_sources (name, license, url, imported_at, record_count, user_accepted_license)
VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?)
`, source.Name, source.License, source.URL, result.RecordsNew, source.UserAcceptedLicense)
	if err != nil {
		return result, fmt.Errorf("failed to update data source tracking: %w", err)
	}

	return result, tx.Commit()
}
// getFieldByHeader looks up the column named fieldName in headerIndex and
// returns that field of record with surrounding whitespace removed. It returns
// "" when the column is unknown or lies beyond the end of record.
func getFieldByHeader(record []string, headerIndex map[string]int, fieldName string) string {
	pos, known := headerIndex[fieldName]
	if !known || pos >= len(record) {
		return ""
	}
	return strings.TrimSpace(record[pos])
}
// GetLoadedDataSources lists the data sources recorded in the data_sources
// table, ordered by name. Only name, license, URL, version and the
// license-acceptance flag are filled in; a NULL version is reported as
// "latest". The slice is nil when nothing has been imported.
func (dl *DataLoader) GetLoadedDataSources() ([]DataSource, error) {
	const listQuery = `
SELECT name, license, url, COALESCE(version, 'latest'), user_accepted_license
FROM data_sources
ORDER BY name
`
	rows, err := dl.conn.Query(listQuery)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var loaded []DataSource
	for rows.Next() {
		var entry DataSource
		if scanErr := rows.Scan(
			&entry.Name,
			&entry.License,
			&entry.URL,
			&entry.Version,
			&entry.UserAcceptedLicense,
		); scanErr != nil {
			return nil, scanErr
		}
		loaded = append(loaded, entry)
	}

	// Surface any error that ended the iteration early.
	return loaded, rows.Err()
}
// recordDataSource writes the name, license, URL, version and
// license-acceptance flag of source to the data_sources table inside tx,
// replacing any conflicting row.
func (dl *DataLoader) recordDataSource(tx *sql.Tx, source DataSource) error {
	const upsert = `
INSERT OR REPLACE INTO data_sources
(name, license, url, version, user_accepted_license)
VALUES (?, ?, ?, ?, ?)
`
	if _, err := tx.Exec(upsert, source.Name, source.License, source.URL, source.Version, source.UserAcceptedLicense); err != nil {
		return err
	}
	return nil
}
// ClearDataSource deletes every airline and airport row tagged with
// sourceName, together with the matching data_sources entry. The three deletes
// run in one transaction that is rolled back if any of them fails. Returned
// errors wrap the underlying database error.
func (dl *DataLoader) ClearDataSource(sourceName string) error {
	tx, err := dl.conn.Begin()
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Undoes everything on early return; has no effect after a successful Commit.
	defer tx.Rollback()

	// Clear from all tables
	if _, err := tx.Exec(`DELETE FROM airlines WHERE data_source = ?`, sourceName); err != nil {
		return fmt.Errorf("failed to clear airlines: %w", err)
	}
	if _, err := tx.Exec(`DELETE FROM airports WHERE data_source = ?`, sourceName); err != nil {
		return fmt.Errorf("failed to clear airports: %w", err)
	}
	if _, err := tx.Exec(`DELETE FROM data_sources WHERE name = ?`, sourceName); err != nil {
		return fmt.Errorf("failed to clear data source record: %w", err)
	}

	return tx.Commit()
}