mail2couch/rust/src/sync.rs
Ole-Morten Duesund ee236db3c1 feat: implement server-side IMAP LIST and SEARCH filtering in Rust
Add server-side folder filtering using IMAP LIST patterns and enhance
message filtering to use IMAP SEARCH with keyword filters when available.

Key improvements:
- Add list_filtered_mailboxes() method using IMAP LIST with patterns
- Use server-side filtering instead of client-side folder filtering
- Enhance message search to use IMAP SEARCH for subject/sender keywords
- Add has_keyword_filters() method to MessageFilter
- Reduce network traffic by leveraging IMAP server capabilities
- Remove dependency on client-side filter_folders function

This achieves full feature parity with the updated Go implementation
and ensures both versions use IMAP standards optimally.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-03 14:29:49 +02:00

506 lines
No EOL
18 KiB
Rust

//! Synchronization logic for mail2couch
//!
//! This module coordinates the synchronization process between IMAP servers and CouchDB,
//! implementing incremental sync with metadata tracking.
use crate::config::{Config, MailSource, CommandLineArgs};
use crate::couch::CouchClient;
use crate::filters::{get_filter_summary, validate_folder_patterns};
use crate::imap::{ImapClient, should_process_message};
use crate::schemas::{SyncMetadata, generate_database_name};
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use log::{info, warn, error, debug};
/// Main synchronization coordinator.
///
/// Holds everything a sync run needs: the loaded configuration, the CouchDB
/// client built from that configuration, and the parsed command-line arguments.
pub struct SyncCoordinator {
/// Application configuration; `couch_db` and `mail_sources` are read from it.
config: Config,
/// Client used for all CouchDB operations (databases, documents, metadata).
couch_client: CouchClient,
/// Command-line arguments; `max_messages` caps the messages handled per mailbox.
args: CommandLineArgs,
}
/// Result of synchronizing a single mailbox.
#[derive(Debug)]
pub struct MailboxSyncResult {
/// Name of the mailbox that was synchronized.
pub mailbox: String,
/// Number of message UIDs selected for fetching (after any `--max-messages` limit).
pub messages_processed: u32,
/// Number of message documents successfully stored in CouchDB.
pub messages_stored: u32,
/// Number of messages rejected by the message filter or that failed to store.
pub messages_skipped: u32,
/// Number of documents removed from CouchDB by deletion handling (only in "sync" mode).
pub messages_deleted: u32,
/// Highest UID among stored messages whose UID parsed as `u32`; `None` if nothing was stored.
pub last_uid: Option<u32>,
/// Timestamp taken when the mailbox sync started.
pub sync_time: DateTime<Utc>,
}
/// Result of synchronizing a mail source.
#[derive(Debug)]
pub struct SourceSyncResult {
/// Name of the mail source, copied from its configuration.
pub source_name: String,
/// Name of the CouchDB database the source was synchronized into.
pub database: String,
/// Number of mailboxes returned by the filtered mailbox listing
/// (this counts mailboxes whose sync failed as well).
pub mailboxes_processed: u32,
/// Sum of `messages_processed` over the mailboxes that synced without error.
pub total_messages: u32,
/// Per-mailbox results; mailboxes whose sync returned an error are not included.
pub mailbox_results: Vec<MailboxSyncResult>,
/// Timestamp taken when the source sync started.
pub sync_time: DateTime<Utc>,
}
impl SyncCoordinator {
/// Create a new sync coordinator
pub fn new(config: Config, args: CommandLineArgs) -> Result<Self> {
let couch_client = CouchClient::new(&config.couch_db)?;
Ok(SyncCoordinator {
config,
couch_client,
args,
})
}
/// Verify that CouchDB and every enabled IMAP source can be reached.
///
/// CouchDB is checked first; then each enabled mail source is connected to
/// and closed again. Disabled sources are not contacted.
///
/// # Errors
/// Returns on the first failure: a CouchDB connection error, an IMAP
/// connection error (both wrapped with a descriptive message), or an error
/// from closing an IMAP connection.
pub async fn test_connections(&self) -> Result<()> {
    info!("Testing CouchDB connection...");
    if let Err(e) = self.couch_client.test_connection().await {
        return Err(anyhow!("CouchDB connection failed: {}", e));
    }
    info!("✅ CouchDB connection successful");

    // Only enabled sources are probed.
    for source in self.config.mail_sources.iter().filter(|s| s.enabled) {
        info!("Testing IMAP connection to {}...", source.name);
        let client = match ImapClient::connect(source.clone()).await {
            Ok(connected) => connected,
            Err(e) => {
                return Err(anyhow!("IMAP connection to {} failed: {}", source.name, e));
            }
        };
        client.close().await?;
        info!("✅ IMAP connection to {} successful", source.name);
    }

    Ok(())
}
/// Run a sync for every enabled mail source, in configuration order.
///
/// A source that fails is logged and skipped, so the returned vector holds
/// only the results of sources that completed; this function itself always
/// returns `Ok`.
pub async fn sync_all_sources(&mut self) -> Result<Vec<SourceSyncResult>> {
    // Work on a snapshot of the sources: `sync_source` needs `&mut self`,
    // which cannot coexist with a borrow of `self.config`.
    let sources = self.config.mail_sources.clone();
    let mut completed = Vec::new();

    for source in sources.iter() {
        if !source.enabled {
            info!("Skipping disabled source: {}", source.name);
            continue;
        }

        info!("Starting sync for source: {}", source.name);
        let summary = match self.sync_source(source).await {
            Ok(summary) => summary,
            Err(e) => {
                // One broken source must not stop the remaining ones.
                error!("❌ Failed to sync source {}: {}", source.name, e);
                continue;
            }
        };

        info!(
            "✅ Completed sync for {}: {} messages across {} mailboxes",
            summary.source_name,
            summary.total_messages,
            summary.mailboxes_processed
        );
        completed.push(summary);
    }

    Ok(completed)
}
/// Synchronize a single mail source into its CouchDB database.
///
/// The database name is derived from the source name and user, and the
/// database is created if needed. Mailboxes are selected with the source's
/// folder filter via the server-side filtered listing, then synced one by
/// one. A mailbox that fails is logged and skipped.
///
/// # Errors
/// Fails if the database cannot be created, the IMAP connection cannot be
/// established, or the filtered mailbox listing fails. A failure to close the
/// IMAP connection after all mailboxes were handled is only logged: the data
/// is already stored, so the result is still returned.
async fn sync_source(&mut self, source: &MailSource) -> Result<SourceSyncResult> {
    let start_time = Utc::now();

    // Generate database name and make sure the database exists.
    let db_name = generate_database_name(&source.name, &source.user);
    info!("Using database: {}", db_name);
    self.couch_client.create_database(&db_name).await?;

    // Connect to IMAP server
    let mut imap_client = ImapClient::connect(source.clone()).await?;

    // Use IMAP LIST with patterns for server-side filtering
    let filtered_mailboxes = imap_client
        .list_filtered_mailboxes(&source.folder_filter)
        .await?;
    info!(
        "Found {} matching mailboxes after server-side filtering",
        filtered_mailboxes.len()
    );

    // The unfiltered list is only needed for the summary and pattern
    // validation, so it is fetched only when filters are configured.
    let has_folder_filters =
        !source.folder_filter.include.is_empty() || !source.folder_filter.exclude.is_empty();
    let all_mailboxes = if has_folder_filters {
        match imap_client.list_mailboxes().await {
            Ok(mailboxes) => mailboxes,
            Err(e) => {
                // Best effort: the sync itself does not depend on this list,
                // but make the missing validation visible in the log.
                warn!(
                    "Could not list all mailboxes, skipping folder filter validation: {}",
                    e
                );
                Vec::new()
            }
        }
    } else {
        filtered_mailboxes.clone()
    };

    if !all_mailboxes.is_empty() {
        let filter_summary =
            get_filter_summary(&all_mailboxes, &filtered_mailboxes, &source.folder_filter);
        info!("{}", filter_summary);

        // Validate folder patterns and show warnings
        for warning in validate_folder_patterns(&source.folder_filter, &all_mailboxes) {
            warn!("{}", warning);
        }
    }

    // Sync each filtered mailbox
    let mut mailbox_results = Vec::new();
    let mut total_messages = 0;
    for mailbox in &filtered_mailboxes {
        info!("Syncing mailbox: {}", mailbox);
        match self
            .sync_mailbox(&mut imap_client, &db_name, mailbox, source)
            .await
        {
            Ok(result) => {
                if result.messages_deleted > 0 {
                    info!(
                        " ✅ {}: {} processed, {} stored, {} skipped, {} deleted",
                        result.mailbox,
                        result.messages_processed,
                        result.messages_stored,
                        result.messages_skipped,
                        result.messages_deleted
                    );
                } else {
                    info!(
                        " ✅ {}: {} processed, {} stored, {} skipped",
                        result.mailbox,
                        result.messages_processed,
                        result.messages_stored,
                        result.messages_skipped
                    );
                }
                total_messages += result.messages_processed;
                mailbox_results.push(result);
            }
            Err(e) => {
                // Continue with other mailboxes
                error!(" ❌ Failed to sync mailbox {}: {}", mailbox, e);
            }
        }
    }

    // Close IMAP connection. All work is done at this point, so a failed
    // close must not throw away the completed result.
    if let Err(e) = imap_client.close().await {
        warn!(
            "Failed to close IMAP connection to {}: {}",
            source.name, e
        );
    }

    Ok(SourceSyncResult {
        source_name: source.name.clone(),
        database: db_name,
        mailboxes_processed: filtered_mailboxes.len() as u32,
        total_messages,
        mailbox_results,
        sync_time: start_time,
    })
}
/// Synchronize a single mailbox
async fn sync_mailbox(
&mut self,
imap_client: &mut ImapClient,
db_name: &str,
mailbox: &str,
source: &MailSource,
) -> Result<MailboxSyncResult> {
let start_time = Utc::now();
// Select the mailbox
let mailbox_info = imap_client.select_mailbox(mailbox).await?;
debug!("Selected mailbox {}: {} messages", mailbox, mailbox_info.exists);
// Get last sync metadata
let since_date = match self.couch_client.get_sync_metadata(db_name, mailbox).await {
Ok(metadata) => {
info!(" Found sync metadata, last sync: {}", metadata.last_sync_time);
Some(metadata.last_sync_time)
}
Err(_) => {
info!(" No sync metadata found, performing full sync");
// Parse since date from message filter if provided
source.message_filter.since.as_ref()
.and_then(|since_str| {
DateTime::parse_from_str(&format!("{} 00:00:00 +0000", since_str), "%Y-%m-%d %H:%M:%S %z")
.map(|dt| dt.with_timezone(&Utc))
.ok()
})
}
};
// Search for messages using server-side IMAP SEARCH with keyword filtering when possible
let message_uids = if source.message_filter.has_keyword_filters() {
// Use advanced IMAP SEARCH with keyword filtering
let subject_keywords = if source.message_filter.subject_keywords.is_empty() {
None
} else {
Some(source.message_filter.subject_keywords.as_slice())
};
let from_keywords = if source.message_filter.sender_keywords.is_empty() {
None
} else {
Some(source.message_filter.sender_keywords.as_slice())
};
info!(" Using IMAP SEARCH with keyword filters");
imap_client.search_messages_advanced(
since_date.as_ref(),
None, // before_date
subject_keywords,
from_keywords,
).await?
} else {
// Use simple date-based search
imap_client.search_messages(since_date.as_ref()).await?
};
info!(" Found {} messages matching search criteria", message_uids.len());
// Handle sync mode - check for deleted messages
let mut messages_deleted = 0;
if source.mode == "sync" {
messages_deleted = self.handle_deleted_messages(db_name, mailbox, &message_uids).await
.unwrap_or_else(|e| {
warn!(" Failed to handle deleted messages: {}", e);
0
});
if messages_deleted > 0 {
info!(" 🗑️ Deleted {} messages that no longer exist on server", messages_deleted);
}
}
if message_uids.is_empty() {
return Ok(MailboxSyncResult {
mailbox: mailbox.to_string(),
messages_processed: 0,
messages_stored: 0,
messages_skipped: 0,
messages_deleted,
last_uid: None,
sync_time: start_time,
});
}
// Apply max message limit if specified
let uids_to_process = if let Some(max) = self.args.max_messages {
if message_uids.len() > max as usize {
info!(" Limiting to {} messages due to --max-messages flag", max);
&message_uids[..max as usize]
} else {
&message_uids
}
} else {
&message_uids
};
// Fetch and process messages
let messages = imap_client.fetch_messages(uids_to_process, self.args.max_messages, mailbox).await?;
let mut messages_stored = 0;
let mut messages_skipped = 0;
let mut last_uid = None;
for (mail_doc, attachments) in messages {
// Apply message filters
if !should_process_message(&mail_doc, &source.message_filter) {
messages_skipped += 1;
continue;
}
// Extract UID before moving the document
let uid_str = mail_doc.source_uid.clone();
// Store the message document first
match self.couch_client.store_mail_document(db_name, mail_doc).await {
Ok(doc_id) => {
messages_stored += 1;
// Store attachments if any exist
if !attachments.is_empty() {
for (filename, content_type, data) in attachments {
match self.couch_client.store_attachment(
db_name,
&doc_id,
&filename,
&content_type,
&data,
).await {
Ok(_) => {
debug!(" Stored attachment: {}", filename);
}
Err(e) => {
warn!(" Failed to store attachment {}: {}", filename, e);
}
}
}
}
// Parse UID from source_uid
if let Ok(uid) = uid_str.parse::<u32>() {
last_uid = Some(last_uid.map_or(uid, |prev: u32| prev.max(uid)));
}
}
Err(e) => {
warn!(" Failed to store message {}: {}", uid_str, e);
messages_skipped += 1;
}
}
}
// Update sync metadata
if let Some(uid) = last_uid {
let sync_metadata = SyncMetadata::new(
mailbox.to_string(),
start_time,
uid,
messages_stored,
);
if let Err(e) = self.couch_client.store_sync_metadata(db_name, &sync_metadata).await {
warn!(" Failed to store sync metadata: {}", e);
}
}
Ok(MailboxSyncResult {
mailbox: mailbox.to_string(),
messages_processed: uids_to_process.len() as u32,
messages_stored,
messages_skipped,
messages_deleted,
last_uid,
sync_time: start_time,
})
}
/// Remove CouchDB documents for messages that are no longer on the server.
///
/// Every UID stored for `mailbox` that is absent from `current_server_uids`
/// has its document (`<mailbox>_<uid>`) deleted. Because absence means
/// deletion, `current_server_uids` is treated as the authoritative list of
/// UIDs present on the server.
///
/// Returns the number of documents actually deleted; individual delete
/// failures are logged and not counted.
///
/// # Errors
/// Fails only if the stored UIDs cannot be read from CouchDB.
async fn handle_deleted_messages(
    &mut self,
    db_name: &str,
    mailbox: &str,
    current_server_uids: &[u32],
) -> Result<u32> {
    use std::collections::HashSet;

    let stored = self.get_stored_message_uids(db_name, mailbox).await?;
    if stored.is_empty() {
        // Nothing is stored, so nothing can be stale.
        return Ok(0);
    }

    let on_server: HashSet<u32> = current_server_uids.iter().copied().collect();
    let mut removed: u32 = 0;

    // Walk only the UIDs that exist in CouchDB but not on the server.
    for uid in stored.into_iter().filter(|uid| !on_server.contains(uid)) {
        let doc_id = format!("{}_{}", mailbox, uid);
        if let Err(e) = self.couch_client.delete_document(db_name, &doc_id).await {
            warn!(" Failed to delete document {}: {}", doc_id, e);
            continue;
        }
        debug!(" Deleted document: {}", doc_id);
        removed += 1;
    }

    Ok(removed)
}
/// Fetch the UIDs of all messages stored for `mailbox` in database `db_name`.
///
/// Thin wrapper that delegates to the CouchDB client's `get_mailbox_uids`.
async fn get_stored_message_uids(&self, db_name: &str, mailbox: &str) -> Result<Vec<u32>> {
    let uids = self.couch_client.get_mailbox_uids(db_name, mailbox).await?;
    Ok(uids)
}
/// Log a human-readable summary of a completed sync run.
///
/// Emits one line per source, followed by totals over all sources, and a
/// note when a `--max-messages` limit was in effect.
pub fn print_sync_summary(&self, results: &[SourceSyncResult]) {
    let separator = "=".repeat(50);

    info!("\n🎉 Synchronization completed!");
    info!("{}", separator);

    for entry in results {
        info!(
            "📧 {}: {} mailboxes, {} messages (database: {})",
            entry.source_name,
            entry.mailboxes_processed,
            entry.total_messages,
            entry.database
        );
    }

    // Totals over every reported source.
    let total_sources = results.len();
    let total_mailboxes: u32 = results.iter().map(|r| r.mailboxes_processed).sum();
    let total_messages: u32 = results.iter().map(|r| r.total_messages).sum();

    info!("{}", separator);
    info!(
        "📊 Total: {} sources, {} mailboxes, {} messages",
        total_sources, total_mailboxes, total_messages
    );

    if let Some(max) = self.args.max_messages {
        info!("⚠️ Message limit was applied: {} per mailbox", max);
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{CouchDbConfig, FolderFilter, MessageFilter};

    /// Build a configuration with one enabled IMAP source pointing at
    /// localhost services.
    fn create_test_config() -> Config {
        let couch_db = CouchDbConfig {
            url: String::from("http://localhost:5984"),
            user: String::from("admin"),
            password: String::from("password"),
        };

        let folder_filter = FolderFilter {
            include: vec![String::from("*")],
            exclude: vec![String::from("Trash")],
        };

        let test_account = MailSource {
            name: String::from("Test Account"),
            enabled: true,
            protocol: String::from("imap"),
            host: String::from("localhost"),
            port: 3143,
            user: String::from("testuser"),
            password: String::from("testpass"),
            mode: String::from("archive"),
            folder_filter,
            message_filter: MessageFilter::default(),
        };

        Config {
            couch_db,
            mail_sources: vec![test_account],
        }
    }

    #[test]
    fn test_sync_coordinator_creation() {
        let config = create_test_config();
        let args = CommandLineArgs {
            config_path: None,
            max_messages: Some(10),
            generate_bash_completion: false,
            help: false,
        };

        // Only construction is exercised here, and it is asserted to succeed.
        // NOTE(review): presumably `CouchClient::new` does not contact the
        // server, otherwise this test would need a running CouchDB — confirm.
        let coordinator = SyncCoordinator::new(config, args);
        assert!(coordinator.is_ok());
    }

    // Additional integration tests would require running services
}