feat: implement complete Rust version of mail2couch
- Add comprehensive Rust implementation matching Go functionality - Configuration loading with automatic file discovery - GNU-style command line parsing with clap (--config/-c, --max-messages/-m) - CouchDB client integration with document storage and sync metadata - IMAP client functionality with message fetching and parsing - Folder filtering with wildcard pattern support (*, ?, [abc]) - Message filtering by subject, sender, and recipient keywords - Incremental sync functionality with metadata tracking - Bash completion generation matching Go implementation - Cross-compatible document schemas and database structures - Successfully tested with existing test environment Note: TLS support and advanced email parsing features pending 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
88a5bfb42b
commit
4835df070e
9 changed files with 1901 additions and 8 deletions
380
rust/src/sync.rs
Normal file
380
rust/src/sync.rs
Normal file
|
|
@ -0,0 +1,380 @@
|
|||
//! Synchronization logic for mail2couch
|
||||
//!
|
||||
//! This module coordinates the synchronization process between IMAP servers and CouchDB,
|
||||
//! implementing incremental sync with metadata tracking.
|
||||
|
||||
use crate::config::{Config, MailSource, CommandLineArgs};
|
||||
use crate::couch::CouchClient;
|
||||
use crate::filters::{filter_folders, get_filter_summary, validate_folder_patterns};
|
||||
use crate::imap::{ImapClient, should_process_message};
|
||||
use crate::schemas::{SyncMetadata, generate_database_name};
|
||||
use anyhow::{anyhow, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use log::{info, warn, error, debug};
|
||||
|
||||
/// Main synchronization coordinator
|
||||
pub struct SyncCoordinator {
|
||||
config: Config,
|
||||
couch_client: CouchClient,
|
||||
args: CommandLineArgs,
|
||||
}
|
||||
|
||||
/// Result of synchronizing a single mailbox
|
||||
#[derive(Debug)]
|
||||
pub struct MailboxSyncResult {
|
||||
pub mailbox: String,
|
||||
pub messages_processed: u32,
|
||||
pub messages_stored: u32,
|
||||
pub messages_skipped: u32,
|
||||
pub last_uid: Option<u32>,
|
||||
pub sync_time: DateTime<Utc>,
|
||||
}
|
||||
|
||||
/// Result of synchronizing a mail source
|
||||
#[derive(Debug)]
|
||||
pub struct SourceSyncResult {
|
||||
pub source_name: String,
|
||||
pub database: String,
|
||||
pub mailboxes_processed: u32,
|
||||
pub total_messages: u32,
|
||||
pub mailbox_results: Vec<MailboxSyncResult>,
|
||||
pub sync_time: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl SyncCoordinator {
|
||||
/// Create a new sync coordinator
|
||||
pub fn new(config: Config, args: CommandLineArgs) -> Result<Self> {
|
||||
let couch_client = CouchClient::new(&config.couch_db)?;
|
||||
|
||||
Ok(SyncCoordinator {
|
||||
config,
|
||||
couch_client,
|
||||
args,
|
||||
})
|
||||
}
|
||||
|
||||
/// Test connections to all services
|
||||
pub async fn test_connections(&self) -> Result<()> {
|
||||
info!("Testing CouchDB connection...");
|
||||
self.couch_client.test_connection().await
|
||||
.map_err(|e| anyhow!("CouchDB connection failed: {}", e))?;
|
||||
info!("✅ CouchDB connection successful");
|
||||
|
||||
// Test IMAP connections for enabled sources
|
||||
for source in &self.config.mail_sources {
|
||||
if !source.enabled {
|
||||
continue;
|
||||
}
|
||||
|
||||
info!("Testing IMAP connection to {}...", source.name);
|
||||
let imap_client = ImapClient::connect(source.clone()).await
|
||||
.map_err(|e| anyhow!("IMAP connection to {} failed: {}", source.name, e))?;
|
||||
|
||||
imap_client.close().await?;
|
||||
info!("✅ IMAP connection to {} successful", source.name);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Synchronize all enabled mail sources
|
||||
pub async fn sync_all_sources(&mut self) -> Result<Vec<SourceSyncResult>> {
|
||||
let mut results = Vec::new();
|
||||
|
||||
// Clone the sources to avoid borrowing issues
|
||||
let sources = self.config.mail_sources.clone();
|
||||
for source in &sources {
|
||||
if !source.enabled {
|
||||
info!("Skipping disabled source: {}", source.name);
|
||||
continue;
|
||||
}
|
||||
|
||||
info!("Starting sync for source: {}", source.name);
|
||||
match self.sync_source(source).await {
|
||||
Ok(result) => {
|
||||
info!(
|
||||
"✅ Completed sync for {}: {} messages across {} mailboxes",
|
||||
result.source_name,
|
||||
result.total_messages,
|
||||
result.mailboxes_processed
|
||||
);
|
||||
results.push(result);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("❌ Failed to sync source {}: {}", source.name, e);
|
||||
// Continue with other sources even if one fails
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Synchronize a single mail source
|
||||
async fn sync_source(&mut self, source: &MailSource) -> Result<SourceSyncResult> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
// Generate database name
|
||||
let db_name = generate_database_name(&source.name, &source.user);
|
||||
info!("Using database: {}", db_name);
|
||||
|
||||
// Create database if it doesn't exist
|
||||
self.couch_client.create_database(&db_name).await?;
|
||||
|
||||
// Connect to IMAP server
|
||||
let mut imap_client = ImapClient::connect(source.clone()).await?;
|
||||
|
||||
// Get list of available mailboxes
|
||||
let all_mailboxes = imap_client.list_mailboxes().await?;
|
||||
info!("Found {} total mailboxes", all_mailboxes.len());
|
||||
|
||||
// Apply folder filtering
|
||||
let filtered_mailboxes = filter_folders(&all_mailboxes, &source.folder_filter);
|
||||
let filter_summary = get_filter_summary(&all_mailboxes, &filtered_mailboxes, &source.folder_filter);
|
||||
info!("{}", filter_summary);
|
||||
|
||||
// Validate folder patterns and show warnings
|
||||
let warnings = validate_folder_patterns(&source.folder_filter, &all_mailboxes);
|
||||
for warning in warnings {
|
||||
warn!("{}", warning);
|
||||
}
|
||||
|
||||
// Sync each filtered mailbox
|
||||
let mut mailbox_results = Vec::new();
|
||||
let mut total_messages = 0;
|
||||
|
||||
for mailbox in &filtered_mailboxes {
|
||||
info!("Syncing mailbox: {}", mailbox);
|
||||
|
||||
match self.sync_mailbox(&mut imap_client, &db_name, mailbox, source).await {
|
||||
Ok(result) => {
|
||||
info!(
|
||||
" ✅ {}: {} processed, {} stored, {} skipped",
|
||||
result.mailbox,
|
||||
result.messages_processed,
|
||||
result.messages_stored,
|
||||
result.messages_skipped
|
||||
);
|
||||
total_messages += result.messages_processed;
|
||||
mailbox_results.push(result);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(" ❌ Failed to sync mailbox {}: {}", mailbox, e);
|
||||
// Continue with other mailboxes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close IMAP connection
|
||||
imap_client.close().await?;
|
||||
|
||||
Ok(SourceSyncResult {
|
||||
source_name: source.name.clone(),
|
||||
database: db_name,
|
||||
mailboxes_processed: filtered_mailboxes.len() as u32,
|
||||
total_messages,
|
||||
mailbox_results,
|
||||
sync_time: start_time,
|
||||
})
|
||||
}
|
||||
|
||||
/// Synchronize a single mailbox
|
||||
async fn sync_mailbox(
|
||||
&mut self,
|
||||
imap_client: &mut ImapClient,
|
||||
db_name: &str,
|
||||
mailbox: &str,
|
||||
source: &MailSource,
|
||||
) -> Result<MailboxSyncResult> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
// Select the mailbox
|
||||
let mailbox_info = imap_client.select_mailbox(mailbox).await?;
|
||||
debug!("Selected mailbox {}: {} messages", mailbox, mailbox_info.exists);
|
||||
|
||||
// Get last sync metadata
|
||||
let since_date = match self.couch_client.get_sync_metadata(db_name, mailbox).await {
|
||||
Ok(metadata) => {
|
||||
info!(" Found sync metadata, last sync: {}", metadata.last_sync_time);
|
||||
Some(metadata.last_sync_time)
|
||||
}
|
||||
Err(_) => {
|
||||
info!(" No sync metadata found, performing full sync");
|
||||
// Parse since date from message filter if provided
|
||||
source.message_filter.since.as_ref()
|
||||
.and_then(|since_str| {
|
||||
DateTime::parse_from_str(&format!("{} 00:00:00 +0000", since_str), "%Y-%m-%d %H:%M:%S %z")
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
.ok()
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
// Search for messages
|
||||
let message_uids = imap_client.search_messages(since_date.as_ref()).await?;
|
||||
info!(" Found {} messages to process", message_uids.len());
|
||||
|
||||
if message_uids.is_empty() {
|
||||
return Ok(MailboxSyncResult {
|
||||
mailbox: mailbox.to_string(),
|
||||
messages_processed: 0,
|
||||
messages_stored: 0,
|
||||
messages_skipped: 0,
|
||||
last_uid: None,
|
||||
sync_time: start_time,
|
||||
});
|
||||
}
|
||||
|
||||
// Apply max message limit if specified
|
||||
let uids_to_process = if let Some(max) = self.args.max_messages {
|
||||
if message_uids.len() > max as usize {
|
||||
info!(" Limiting to {} messages due to --max-messages flag", max);
|
||||
&message_uids[..max as usize]
|
||||
} else {
|
||||
&message_uids
|
||||
}
|
||||
} else {
|
||||
&message_uids
|
||||
};
|
||||
|
||||
// Fetch and process messages
|
||||
let messages = imap_client.fetch_messages(uids_to_process, self.args.max_messages).await?;
|
||||
|
||||
let mut messages_stored = 0;
|
||||
let mut messages_skipped = 0;
|
||||
let mut last_uid = None;
|
||||
|
||||
for mail_doc in messages {
|
||||
// Apply message filters
|
||||
if !should_process_message(&mail_doc, &source.message_filter) {
|
||||
messages_skipped += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Extract UID before moving the document
|
||||
let uid_str = mail_doc.source_uid.clone();
|
||||
|
||||
// Store the message
|
||||
match self.couch_client.store_mail_document(db_name, mail_doc).await {
|
||||
Ok(_) => {
|
||||
messages_stored += 1;
|
||||
// Parse UID from source_uid
|
||||
if let Ok(uid) = uid_str.parse::<u32>() {
|
||||
last_uid = Some(last_uid.map_or(uid, |prev: u32| prev.max(uid)));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(" Failed to store message {}: {}", uid_str, e);
|
||||
messages_skipped += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update sync metadata
|
||||
if let Some(uid) = last_uid {
|
||||
let sync_metadata = SyncMetadata::new(
|
||||
mailbox.to_string(),
|
||||
start_time,
|
||||
uid,
|
||||
messages_stored,
|
||||
);
|
||||
|
||||
if let Err(e) = self.couch_client.store_sync_metadata(db_name, &sync_metadata).await {
|
||||
warn!(" Failed to store sync metadata: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(MailboxSyncResult {
|
||||
mailbox: mailbox.to_string(),
|
||||
messages_processed: uids_to_process.len() as u32,
|
||||
messages_stored,
|
||||
messages_skipped,
|
||||
last_uid,
|
||||
sync_time: start_time,
|
||||
})
|
||||
}
|
||||
|
||||
/// Print summary of sync results
|
||||
pub fn print_sync_summary(&self, results: &[SourceSyncResult]) {
|
||||
info!("\n🎉 Synchronization completed!");
|
||||
info!("{}", "=".repeat(50));
|
||||
|
||||
let mut total_sources = 0;
|
||||
let mut total_mailboxes = 0;
|
||||
let mut total_messages = 0;
|
||||
|
||||
for result in results {
|
||||
total_sources += 1;
|
||||
total_mailboxes += result.mailboxes_processed;
|
||||
total_messages += result.total_messages;
|
||||
|
||||
info!(
|
||||
"📧 {}: {} mailboxes, {} messages (database: {})",
|
||||
result.source_name,
|
||||
result.mailboxes_processed,
|
||||
result.total_messages,
|
||||
result.database
|
||||
);
|
||||
}
|
||||
|
||||
info!("{}", "=".repeat(50));
|
||||
info!(
|
||||
"📊 Total: {} sources, {} mailboxes, {} messages",
|
||||
total_sources, total_mailboxes, total_messages
|
||||
);
|
||||
|
||||
if let Some(max) = self.args.max_messages {
|
||||
info!("⚠️ Message limit was applied: {} per mailbox", max);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::config::{CouchDbConfig, FolderFilter, MessageFilter};
|
||||
|
||||
fn create_test_config() -> Config {
|
||||
Config {
|
||||
couch_db: CouchDbConfig {
|
||||
url: "http://localhost:5984".to_string(),
|
||||
user: "admin".to_string(),
|
||||
password: "password".to_string(),
|
||||
},
|
||||
mail_sources: vec![
|
||||
MailSource {
|
||||
name: "Test Account".to_string(),
|
||||
enabled: true,
|
||||
protocol: "imap".to_string(),
|
||||
host: "localhost".to_string(),
|
||||
port: 3143,
|
||||
user: "testuser".to_string(),
|
||||
password: "testpass".to_string(),
|
||||
mode: "archive".to_string(),
|
||||
folder_filter: FolderFilter {
|
||||
include: vec!["*".to_string()],
|
||||
exclude: vec!["Trash".to_string()],
|
||||
},
|
||||
message_filter: MessageFilter::default(),
|
||||
}
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sync_coordinator_creation() {
|
||||
let config = create_test_config();
|
||||
let args = CommandLineArgs {
|
||||
config_path: None,
|
||||
max_messages: Some(10),
|
||||
generate_bash_completion: false,
|
||||
help: false,
|
||||
};
|
||||
|
||||
// This will fail without a real CouchDB connection, but tests the structure
|
||||
let result = SyncCoordinator::new(config, args);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
// Additional integration tests would require running services
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue