skyview/internal/database/migrations.go
Ole-Morten Duesund 2bffa2c418 style: Apply code formatting with go fmt
- Run 'make format' to ensure all Go code follows standard formatting
- Maintains consistent code style across the entire codebase
- No functional changes, only whitespace and formatting improvements
2025-09-01 10:05:29 +02:00

419 lines
12 KiB
Go

package database
import (
"database/sql"
"fmt"
"sort"
"strings"
"time"
)
// Migrator applies and rolls back database schema migrations over a
// database/sql handle. Applied versions are tracked in the schema_info table.
type Migrator struct {
	// conn is the database handle all migration statements are executed on.
	conn *sql.DB
}
// Migration describes one versioned schema change together with the script
// that undoes it.
type Migration struct {
	Version     int    // version number; migrations are ordered and selected by it
	Description string // human-readable summary, stored in schema_info when applied
	Up          string // SQL script that applies the change
	Down        string // SQL script that reverts the change; empty means no rollback script
	DataLoss    bool   // data-loss flag; NOTE(review): nothing in this file acts on it
	Checksum    string // populated by GetMigrations from the Up script
}
// MigrationRecord is one row of the schema_info table, i.e. a migration that
// has already been applied to the database.
type MigrationRecord struct {
	Version     int       `json:"version"`     // version of the applied migration
	Description string    `json:"description"` // description recorded when it was applied
	AppliedAt   time.Time `json:"applied_at"`  // value of the applied_at column
	Checksum    string    `json:"checksum"`    // checksum recorded when it was applied
}
// NewMigrator returns a Migrator that runs its migrations against conn.
func NewMigrator(conn *sql.DB) *Migrator {
	migrator := Migrator{conn: conn}
	return &migrator
}
// GetMigrations returns every known migration sorted by ascending version,
// with each Checksum derived from the migration's Up script.
//
// The SQL literals are whitespace-sensitive: calculateChecksum is computed
// over the exact bytes of Up, so reformatting a script changes its checksum.
func GetMigrations() []Migration {
	all := []Migration{
		{
			Version:     1,
			Description: "Initial schema with aircraft history",
			Up: `
-- Schema metadata table
CREATE TABLE IF NOT EXISTS schema_info (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
description TEXT NOT NULL,
checksum TEXT NOT NULL
);
-- Aircraft position history
CREATE TABLE IF NOT EXISTS aircraft_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
icao TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL,
latitude REAL,
longitude REAL,
altitude INTEGER,
speed INTEGER,
track INTEGER,
vertical_rate INTEGER,
squawk TEXT,
callsign TEXT,
source_id TEXT NOT NULL,
signal_strength REAL
);
-- Indexes for aircraft history
CREATE INDEX IF NOT EXISTS idx_aircraft_history_icao_time ON aircraft_history(icao, timestamp);
CREATE INDEX IF NOT EXISTS idx_aircraft_history_timestamp ON aircraft_history(timestamp);
CREATE INDEX IF NOT EXISTS idx_aircraft_history_callsign ON aircraft_history(callsign);
`,
			Down: `
DROP TABLE IF EXISTS aircraft_history;
DROP TABLE IF EXISTS schema_info;
`,
			DataLoss: true,
		},
		{
			Version:     2,
			Description: "Add callsign enhancement tables",
			Up: `
-- Airlines data table (unified schema for all sources)
CREATE TABLE IF NOT EXISTS airlines (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
alias TEXT,
iata_code TEXT,
icao_code TEXT,
callsign TEXT,
country TEXT,
country_code TEXT,
active BOOLEAN DEFAULT 1,
data_source TEXT NOT NULL DEFAULT 'unknown',
source_id TEXT, -- Original ID from source
imported_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Airports data table (unified schema for all sources)
CREATE TABLE IF NOT EXISTS airports (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
ident TEXT, -- Airport identifier (ICAO, FAA, etc.)
type TEXT, -- Airport type (large_airport, medium_airport, etc.)
city TEXT,
municipality TEXT, -- More specific than city
region TEXT, -- State/province
country TEXT,
country_code TEXT, -- ISO country code
continent TEXT,
iata_code TEXT,
icao_code TEXT,
local_code TEXT,
gps_code TEXT,
latitude REAL,
longitude REAL,
elevation_ft INTEGER,
scheduled_service BOOLEAN DEFAULT 0,
home_link TEXT,
wikipedia_link TEXT,
keywords TEXT,
timezone_offset REAL,
timezone TEXT,
dst_type TEXT,
data_source TEXT NOT NULL DEFAULT 'unknown',
source_id TEXT, -- Original ID from source
imported_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- External API cache for callsign lookups
CREATE TABLE IF NOT EXISTS callsign_cache (
callsign TEXT PRIMARY KEY,
airline_icao TEXT,
airline_iata TEXT,
airline_name TEXT,
airline_country TEXT,
flight_number TEXT,
origin_iata TEXT,
destination_iata TEXT,
aircraft_type TEXT,
route TEXT,
status TEXT,
source TEXT NOT NULL DEFAULT 'local',
cached_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL
);
-- Data source tracking
CREATE TABLE IF NOT EXISTS data_sources (
name TEXT PRIMARY KEY,
license TEXT NOT NULL,
url TEXT,
version TEXT,
imported_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
record_count INTEGER DEFAULT 0,
user_accepted_license BOOLEAN DEFAULT 0
);
-- Indexes for airlines
CREATE INDEX IF NOT EXISTS idx_airlines_icao_code ON airlines(icao_code);
CREATE INDEX IF NOT EXISTS idx_airlines_iata_code ON airlines(iata_code);
CREATE INDEX IF NOT EXISTS idx_airlines_callsign ON airlines(callsign);
CREATE INDEX IF NOT EXISTS idx_airlines_country_code ON airlines(country_code);
CREATE INDEX IF NOT EXISTS idx_airlines_active ON airlines(active);
CREATE INDEX IF NOT EXISTS idx_airlines_source ON airlines(data_source);
-- Indexes for airports
CREATE INDEX IF NOT EXISTS idx_airports_icao_code ON airports(icao_code);
CREATE INDEX IF NOT EXISTS idx_airports_iata_code ON airports(iata_code);
CREATE INDEX IF NOT EXISTS idx_airports_ident ON airports(ident);
CREATE INDEX IF NOT EXISTS idx_airports_country_code ON airports(country_code);
CREATE INDEX IF NOT EXISTS idx_airports_type ON airports(type);
CREATE INDEX IF NOT EXISTS idx_airports_coords ON airports(latitude, longitude);
CREATE INDEX IF NOT EXISTS idx_airports_source ON airports(data_source);
-- Indexes for callsign cache
CREATE INDEX IF NOT EXISTS idx_callsign_cache_expires ON callsign_cache(expires_at);
CREATE INDEX IF NOT EXISTS idx_callsign_cache_airline ON callsign_cache(airline_icao);
`,
			Down: `
DROP TABLE IF EXISTS callsign_cache;
DROP TABLE IF EXISTS airports;
DROP TABLE IF EXISTS airlines;
DROP TABLE IF EXISTS data_sources;
`,
			DataLoss: true,
		},
		// Append new migrations below, each with the next version number.
	}

	// Put the list in ascending version order.
	sort.Slice(all, func(a, b int) bool {
		return all[a].Version < all[b].Version
	})

	// Fill in each checksum from the corresponding Up script.
	for idx := 0; idx < len(all); idx++ {
		all[idx].Checksum = calculateChecksum(all[idx].Up)
	}

	return all
}
// MigrateToLatest applies, in ascending version order, every migration whose
// version is greater than the highest version recorded in schema_info.
//
// It stops at the first failure and returns an error wrapping the underlying
// cause (usable with errors.Is / errors.As). Migrations applied before the
// failure remain applied, since applyMigration commits each one separately.
func (m *Migrator) MigrateToLatest() error {
	currentVersion, err := m.getCurrentVersion()
	if err != nil {
		return fmt.Errorf("failed to get current version: %w", err)
	}

	for _, migration := range GetMigrations() {
		// Skip everything that is already recorded as applied.
		if migration.Version <= currentVersion {
			continue
		}
		if err := m.applyMigration(migration); err != nil {
			return fmt.Errorf("failed to apply migration %d: %w", migration.Version, err)
		}
	}
	return nil
}
// MigrateTo moves the schema to targetVersion.
//
// If targetVersion is above the current version, migrations in the range
// (current, targetVersion] are applied in ascending order. If it is below,
// migrations in the range (targetVersion, current] are rolled back in
// descending order. If it equals the current version nothing is done.
//
// targetVersion is not validated against the list of known migrations: a
// value beyond the newest migration simply applies everything available, and
// a value below 1 rolls back everything.
//
// The first failure aborts the run and is returned wrapped, so the
// underlying cause stays reachable through errors.Is / errors.As.
func (m *Migrator) MigrateTo(targetVersion int) error {
	currentVersion, err := m.getCurrentVersion()
	if err != nil {
		return fmt.Errorf("failed to get current version: %w", err)
	}
	if targetVersion == currentVersion {
		return nil // Already at target version
	}

	// GetMigrations returns the list sorted by ascending version.
	migrations := GetMigrations()

	if targetVersion > currentVersion {
		// Forward migration.
		for _, migration := range migrations {
			if migration.Version <= currentVersion || migration.Version > targetVersion {
				continue
			}
			if err := m.applyMigration(migration); err != nil {
				return fmt.Errorf("failed to apply migration %d: %w", migration.Version, err)
			}
		}
		return nil
	}

	// Rollback: walk the ascending list backwards so the newest migration is
	// undone first.
	for i := len(migrations) - 1; i >= 0; i-- {
		migration := migrations[i]
		if migration.Version > currentVersion || migration.Version <= targetVersion {
			continue
		}
		if err := m.rollbackMigration(migration); err != nil {
			return fmt.Errorf("failed to rollback migration %d: %w", migration.Version, err)
		}
	}
	return nil
}
// GetAppliedMigrations returns one MigrationRecord per row of schema_info,
// ordered by ascending version. The schema_info table is created first if it
// does not exist, so a fresh database yields an empty (nil) slice.
//
// An error is returned if the table cannot be created, the query fails, a row
// cannot be scanned, or iteration ends early with an error; in every case the
// returned slice is nil.
func (m *Migrator) GetAppliedMigrations() ([]MigrationRecord, error) {
	// Ensure schema_info table exists
	if err := m.ensureSchemaInfoTable(); err != nil {
		return nil, err
	}

	query := `
SELECT version, description, applied_at, checksum
FROM schema_info
ORDER BY version
`
	rows, err := m.conn.Query(query)
	if err != nil {
		return nil, fmt.Errorf("failed to query applied migrations: %w", err)
	}
	defer rows.Close()

	var migrations []MigrationRecord
	for rows.Next() {
		var migration MigrationRecord
		err := rows.Scan(
			&migration.Version,
			&migration.Description,
			&migration.AppliedAt,
			&migration.Checksum,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to scan migration record: %w", err)
		}
		migrations = append(migrations, migration)
	}

	// rows.Next returns false both at end-of-data and on error; without this
	// check a failed iteration would be reported as a shorter, "successful" list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate migration records: %w", err)
	}
	return migrations, nil
}
// getCurrentVersion returns the highest version recorded in schema_info, or 0
// when no migration has been recorded. The schema_info table is created first
// if it does not exist.
//
// On failure it returns 0 together with an error; query errors are wrapped so
// the underlying cause stays reachable through errors.Is / errors.As.
func (m *Migrator) getCurrentVersion() (int, error) {
	if err := m.ensureSchemaInfoTable(); err != nil {
		return 0, err
	}

	var version int
	// COALESCE turns the NULL that MAX yields on an empty table into 0.
	err := m.conn.QueryRow(`SELECT COALESCE(MAX(version), 0) FROM schema_info`).Scan(&version)
	if err != nil {
		return 0, fmt.Errorf("failed to get current version: %w", err)
	}
	return version, nil
}
// applyMigration runs migration.Up and records the migration in schema_info,
// all inside a single transaction: either every statement and the record are
// committed, or nothing is.
//
// The Up script is split on ";" and each non-empty piece is executed
// separately. The split is purely textual, so a script must not contain a
// semicolon inside a string literal, comment, or compound statement.
//
// migration.DataLoss is not consulted here; no warning or confirmation is
// issued before a migration flagged as data-losing is applied.
//
// Returned errors wrap the underlying cause (errors.Is / errors.As work).
func (m *Migrator) applyMigration(migration Migration) error {
	tx, err := m.conn.Begin()
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Undoes the work on any early return. The result is ignored, as in the
	// original code; after a successful Commit the transaction is already finished.
	defer tx.Rollback()

	// Execute migration SQL
	for _, stmt := range strings.Split(migration.Up, ";") {
		stmt = strings.TrimSpace(stmt)
		if stmt == "" {
			continue
		}
		if _, err := tx.Exec(stmt); err != nil {
			return fmt.Errorf("failed to execute migration statement: %w", err)
		}
	}

	// Record migration
	_, err = tx.Exec(`
INSERT INTO schema_info (version, description, checksum)
VALUES (?, ?, ?)
`, migration.Version, migration.Description, migration.Checksum)
	if err != nil {
		return fmt.Errorf("failed to record migration: %w", err)
	}

	return tx.Commit()
}
// rollbackMigration removes the migration's record from schema_info and runs
// migration.Down, all inside a single transaction: either both take effect
// or neither does.
//
// The record is deleted BEFORE the Down script runs. A Down script may drop
// schema_info itself (migration 1 does), and deleting afterwards would then
// fail on the missing table, making that migration impossible to roll back.
//
// The Down script is split on ";" and each non-empty piece is executed
// separately. The split is purely textual, so a script must not contain a
// semicolon inside a string literal, comment, or compound statement.
//
// It returns an error if the migration has no Down script. Other errors wrap
// the underlying cause (errors.Is / errors.As work).
func (m *Migrator) rollbackMigration(migration Migration) error {
	if migration.Down == "" {
		return fmt.Errorf("migration %d has no rollback script", migration.Version)
	}

	tx, err := m.conn.Begin()
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Undoes the work on any early return. The result is ignored, as in the
	// original code; after a successful Commit the transaction is already finished.
	defer tx.Rollback()

	// Remove migration record first, while schema_info is guaranteed to exist.
	_, err = tx.Exec(`DELETE FROM schema_info WHERE version = ?`, migration.Version)
	if err != nil {
		return fmt.Errorf("failed to remove migration record: %w", err)
	}

	// Execute rollback SQL
	for _, stmt := range strings.Split(migration.Down, ";") {
		stmt = strings.TrimSpace(stmt)
		if stmt == "" {
			continue
		}
		if _, err := tx.Exec(stmt); err != nil {
			return fmt.Errorf("failed to execute rollback statement: %w", err)
		}
	}

	return tx.Commit()
}
// ensureSchemaInfoTable makes sure the schema_info bookkeeping table is
// present, creating it when missing. The Exec error, if any, is returned
// unchanged.
func (m *Migrator) ensureSchemaInfoTable() error {
	const createSchemaInfo = `
CREATE TABLE IF NOT EXISTS schema_info (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
description TEXT NOT NULL,
checksum TEXT NOT NULL
)
`
	if _, execErr := m.conn.Exec(createSchemaInfo); execErr != nil {
		return execErr
	}
	return nil
}
// calculateChecksum returns the byte length of content formatted as
// lowercase hexadecimal.
//
// This is a placeholder rather than a real content hash: any two scripts of
// the same length get the same checksum. A proper hash function should be
// used in production.
func calculateChecksum(content string) string {
	size := len(content)
	return fmt.Sprintf("%x", size)
}