mail2couch/rust/src/couch.rs

476 lines
17 KiB
Rust
Raw Normal View History

//! CouchDB client integration for mail2couch
//!
//! This module provides a CouchDB client that handles database operations
//! for storing email messages and sync metadata.
use crate::config::CouchDbConfig;
use crate::schemas::{MailDocument, SyncMetadata};
use anyhow::{anyhow, Result};
use reqwest::{Client, StatusCode};
use serde_json::Value;
use std::time::Duration;
use thiserror::Error;
/// Errors raised by the CouchDB client in this module.
///
/// `retry_operation` inspects these variants to decide whether a failed
/// operation is worth repeating.
#[derive(Error, Debug)]
pub enum CouchError {
/// The HTTP request itself failed; wraps the underlying `reqwest::Error`.
/// Treated as retryable by `retry_operation`.
#[error("HTTP request failed: {0}")]
Http(#[from] reqwest::Error),
/// CouchDB answered with an unexpected HTTP status. `status` is the numeric
/// status code and `message` the response body text. Retried only when
/// `status >= 500`.
#[error("CouchDB error: {status} - {message}")]
CouchDb { status: u16, message: String },
/// The document with the given `id` does not exist (returned by
/// `get_sync_metadata` on HTTP 404).
#[error("Document not found: {id}")]
NotFound { id: String },
/// JSON (de)serialization failed; wraps the underlying `serde_json::Error`.
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// Free-form database error message.
/// NOTE(review): not constructed anywhere in this file — confirm it is used elsewhere.
#[error("Database error: {0}")]
Database(String),
}
/// CouchDB client for mail2couch operations
///
/// Holds one reusable HTTP client plus the connection settings taken from
/// `CouchDbConfig` in `CouchClient::new`.
pub struct CouchClient {
// HTTP client shared by every request this type sends.
client: Client,
// Server URL with trailing slashes removed, so paths can be appended as "/{db}".
base_url: String,
// (username, password) for HTTP basic auth; `None` when the configured user is empty.
auth: Option<(String, String)>,
}
/// Response from CouchDB for document operations
///
/// Deserialized from the JSON body of PUT responses. Every field is optional,
/// so a body that lacks any of them still deserializes.
#[derive(Debug, serde::Deserialize)]
pub struct CouchResponse {
/// Presumably CouchDB's success flag — not read by this file.
pub ok: Option<bool>,
/// Document ID reported by the server; callers fall back to the ID they sent.
pub id: Option<String>,
/// Revision reported by the server after the write.
pub rev: Option<String>,
/// Presumably CouchDB's short error name — not read by this file.
pub error: Option<String>,
/// Presumably CouchDB's human-readable error reason — not read by this file.
pub reason: Option<String>,
}
impl CouchClient {
/// Generic retry helper for CouchDB operations
async fn retry_operation<F, Fut, T>(&self, operation_name: &str, operation: F) -> Result<T>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
const MAX_RETRIES: u32 = 3;
const RETRY_DELAY_MS: u64 = 1000;
let mut last_error = None;
for attempt in 1..=MAX_RETRIES {
match operation().await {
Ok(result) => {
if attempt > 1 {
log::debug!("✅ CouchDB {} successful on attempt {}", operation_name, attempt);
}
return Ok(result);
}
Err(e) => {
// Check if this is a retryable error
let is_retryable = match &e.downcast_ref::<CouchError>() {
Some(CouchError::Http(_)) => true, // Network errors are retryable
Some(CouchError::CouchDb { status, .. }) => {
// Retry on server errors (5xx) but not client errors (4xx)
*status >= 500
}
_ => false, // Other errors are not retryable
};
last_error = Some(e);
if is_retryable && attempt < MAX_RETRIES {
log::warn!(
"🔄 CouchDB {} attempt {} failed, retrying in {}ms: {}",
operation_name,
attempt,
RETRY_DELAY_MS,
last_error.as_ref().unwrap()
);
tokio::time::sleep(Duration::from_millis(RETRY_DELAY_MS)).await;
} else {
break;
}
}
}
}
Err(anyhow!(
"CouchDB {} failed after {} attempts. Last error: {}",
operation_name,
MAX_RETRIES,
last_error.unwrap()
))
}
/// Create a new CouchDB client
pub fn new(config: &CouchDbConfig) -> Result<Self> {
let client = Client::new();
let base_url = config.url.trim_end_matches('/').to_string();
let auth = if !config.user.is_empty() {
Some((config.user.clone(), config.password.clone()))
} else {
None
};
Ok(CouchClient {
client,
base_url,
auth,
})
}
/// Verify that the CouchDB server answers on its root URL.
///
/// # Errors
///
/// Fails when the request cannot be sent or the server replies with a
/// non-success status.
pub async fn test_connection(&self) -> Result<()> {
    let root_url = format!("{}/", self.base_url);
    let request = match &self.auth {
        Some((username, password)) => self
            .client
            .get(&root_url)
            .basic_auth(username, Some(password)),
        None => self.client.get(&root_url),
    };

    let response = request.send().await?;
    if !response.status().is_success() {
        return Err(anyhow!("CouchDB connection failed: {}", response.status()));
    }
    Ok(())
}
/// Create a database if it doesn't exist.
///
/// The existence check and the `PUT` are two separate requests, so another
/// client may create the database in between. CouchDB answers such a `PUT`
/// with 412 Precondition Failed, which is treated as success here because the
/// desired end state (the database exists) has been reached.
///
/// # Errors
///
/// Fails when a request cannot be sent or the server replies with any status
/// other than 201, 202 or 412.
pub async fn create_database(&self, db_name: &str) -> Result<()> {
    // First check if database exists
    if self.database_exists(db_name).await? {
        return Ok(());
    }

    let url = format!("{}/{}", self.base_url, db_name);
    let mut request = self.client.put(&url);
    if let Some((username, password)) = &self.auth {
        request = request.basic_auth(username, Some(password));
    }

    let response = request.send().await?;
    match response.status() {
        StatusCode::CREATED | StatusCode::ACCEPTED => Ok(()),
        // Lost the race against a concurrent creator: the database exists now.
        StatusCode::PRECONDITION_FAILED => Ok(()),
        status => {
            // Keep the status in the error even if the body cannot be read.
            let error_text = response
                .text()
                .await
                .unwrap_or_else(|_| "Failed to read error response".to_string());
            Err(anyhow!("Failed to create database {}: {} - {}", db_name, status, error_text))
        }
    }
}
/// Check if a database exists
pub async fn database_exists(&self, db_name: &str) -> Result<bool> {
let url = format!("{}/{}", self.base_url, db_name);
let mut request = self.client.head(&url);
if let Some((username, password)) = &self.auth {
request = request.basic_auth(username, Some(password));
}
let response = request.send().await?;
Ok(response.status().is_success())
}
/// Store a mail document in CouchDB with optional attachments and retry logic
pub async fn store_mail_document(&self, db_name: &str, mut document: MailDocument) -> Result<String> {
// Set the document ID if not already set
if document.id.is_none() {
document.set_id();
}
let doc_id = document.id.as_ref().unwrap().clone();
// Check if document already exists to avoid duplicates
if self.document_exists(db_name, &doc_id).await? {
return Ok(doc_id);
}
self.retry_operation("store_mail_document", || async {
let url = format!("{}/{}/{}", self.base_url, db_name, doc_id);
let mut request = self.client.put(&url).json(&document);
if let Some((username, password)) = &self.auth {
request = request.basic_auth(username, Some(password));
}
let response = request.send().await
.map_err(|e| CouchError::Http(e))?;
match response.status() {
StatusCode::CREATED | StatusCode::ACCEPTED => {
let couch_response: CouchResponse = response.json().await
.map_err(|e| CouchError::Http(e))?;
Ok(couch_response.id.unwrap_or_else(|| doc_id.clone()))
}
status => {
let error_text = response.text().await
.unwrap_or_else(|_| "Failed to read error response".to_string());
Err(CouchError::CouchDb {
status: status.as_u16(),
message: error_text,
}.into())
}
}
}).await
}
/// Store an attachment for a document in CouchDB.
///
/// Looks up the document's current revision, then `PUT`s `data` as the
/// attachment `attachment_name` with the given `content_type`. Returns the
/// document revision after the upload.
///
/// `attachment_name` is percent-encoded before it is placed in the URL, so
/// file names containing spaces, `/`, `?`, `#` or non-ASCII characters address
/// the intended attachment instead of corrupting the path or the `rev` query
/// parameter. `db_name` and `doc_id` are inserted as given, like everywhere
/// else in this client.
///
/// # Errors
///
/// Fails when the document does not exist, a request cannot be sent, or the
/// server replies with a status other than 201 or 202.
pub async fn store_attachment(
    &self,
    db_name: &str,
    doc_id: &str,
    attachment_name: &str,
    content_type: &str,
    data: &[u8],
) -> Result<String> {
    /// Percent-encodes every byte except the RFC 3986 unreserved characters.
    fn encode_segment(segment: &str) -> String {
        const HEX: &[u8; 16] = b"0123456789ABCDEF";
        let mut encoded = String::with_capacity(segment.len());
        for byte in segment.bytes() {
            if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
                encoded.push(char::from(byte));
            } else {
                encoded.push('%');
                encoded.push(char::from(HEX[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
            }
        }
        encoded
    }

    // First get the current document revision
    let rev = self
        .get_document_rev(db_name, doc_id)
        .await?
        .ok_or_else(|| anyhow!("Document {} not found", doc_id))?;

    // Upload the attachment; the revision goes through `query` so it is
    // encoded by the HTTP client rather than spliced into the URL by hand.
    let url = format!(
        "{}/{}/{}/{}",
        self.base_url,
        db_name,
        doc_id,
        encode_segment(attachment_name)
    );
    let mut request = self
        .client
        .put(&url)
        .query(&[("rev", rev.as_str())])
        .header("Content-Type", content_type)
        .body(data.to_vec());
    if let Some((username, password)) = &self.auth {
        request = request.basic_auth(username, Some(password));
    }

    let response = request.send().await?;
    match response.status() {
        StatusCode::CREATED | StatusCode::ACCEPTED => {
            let couch_response: CouchResponse = response.json().await?;
            Ok(couch_response.rev.unwrap_or(rev))
        }
        status => {
            let error_text = response.text().await?;
            Err(anyhow!("Failed to store attachment {}: {} - {}", attachment_name, status, error_text))
        }
    }
}
/// Get document revision
async fn get_document_rev(&self, db_name: &str, doc_id: &str) -> Result<Option<String>> {
let url = format!("{}/{}/{}", self.base_url, db_name, doc_id);
let mut request = self.client.get(&url);
if let Some((username, password)) = &self.auth {
request = request.basic_auth(username, Some(password));
}
let response = request.send().await?;
match response.status() {
StatusCode::OK => {
let doc: Value = response.json().await?;
Ok(doc["_rev"].as_str().map(|s| s.to_string()))
}
StatusCode::NOT_FOUND => Ok(None),
status => {
let error_text = response.text().await?;
Err(anyhow!("Failed to get document {}: {} - {}", doc_id, status, error_text))
}
}
}
/// Store sync metadata in CouchDB
pub async fn store_sync_metadata(&self, db_name: &str, metadata: &SyncMetadata) -> Result<String> {
let doc_id = metadata.id.as_ref().unwrap();
// Try to get existing document first to get the revision
let mut metadata_to_store = metadata.clone();
if let Ok(existing) = self.get_sync_metadata(db_name, &metadata.mailbox).await {
metadata_to_store.rev = existing.rev;
}
let url = format!("{}/{}/{}", self.base_url, db_name, doc_id);
let mut request = self.client.put(&url).json(&metadata_to_store);
if let Some((username, password)) = &self.auth {
request = request.basic_auth(username, Some(password));
}
let response = request.send().await?;
match response.status() {
StatusCode::CREATED | StatusCode::ACCEPTED => {
let couch_response: CouchResponse = response.json().await?;
Ok(couch_response.id.unwrap_or_else(|| doc_id.clone()))
}
status => {
let error_text = response.text().await?;
Err(anyhow!("Failed to store sync metadata {}: {} - {}", doc_id, status, error_text))
}
}
}
/// Load the sync metadata document of a mailbox.
///
/// The document ID is `sync_metadata_{mailbox}`.
///
/// # Errors
///
/// Returns [`CouchError::NotFound`] (wrapped in `anyhow`) when the server
/// answers 404. Also fails when the request cannot be sent, the body cannot be
/// deserialized, or the server replies with any other non-200 status.
pub async fn get_sync_metadata(&self, db_name: &str, mailbox: &str) -> Result<SyncMetadata> {
    let doc_id = format!("sync_metadata_{}", mailbox);
    let url = format!("{}/{}/{}", self.base_url, db_name, doc_id);
    let request = match &self.auth {
        Some((username, password)) => self.client.get(&url).basic_auth(username, Some(password)),
        None => self.client.get(&url),
    };

    let response = request.send().await?;
    let status = response.status();
    if status == StatusCode::OK {
        return Ok(response.json::<SyncMetadata>().await?);
    }
    if status == StatusCode::NOT_FOUND {
        return Err(CouchError::NotFound { id: doc_id }.into());
    }

    let error_text = response.text().await?;
    Err(anyhow!("Failed to get sync metadata {}: {} - {}", doc_id, status, error_text))
}
/// Report whether a document is present in the database.
///
/// Sends a `HEAD` request for the document and returns `true` only for a
/// 200 OK reply; every other status yields `false`.
///
/// # Errors
///
/// Fails only when the request itself cannot be sent.
pub async fn document_exists(&self, db_name: &str, doc_id: &str) -> Result<bool> {
    let url = format!("{}/{}/{}", self.base_url, db_name, doc_id);
    let request = match &self.auth {
        Some((username, password)) => self.client.head(&url).basic_auth(username, Some(password)),
        None => self.client.head(&url),
    };

    let status = request.send().await?.status();
    Ok(status == StatusCode::OK)
}
/// Fetch the database information document as raw JSON.
///
/// # Errors
///
/// Fails when the request cannot be sent, the body cannot be read or parsed,
/// or the server replies with a status other than 200.
pub async fn get_database_info(&self, db_name: &str) -> Result<Value> {
    let url = format!("{}/{}", self.base_url, db_name);
    let request = match &self.auth {
        Some((username, password)) => self.client.get(&url).basic_auth(username, Some(password)),
        None => self.client.get(&url),
    };

    let response = request.send().await?;
    let status = response.status();
    if status != StatusCode::OK {
        let error_text = response.text().await?;
        return Err(anyhow!("Failed to get database info for {}: {} - {}", db_name, status, error_text));
    }

    Ok(response.json::<Value>().await?)
}
/// Get all message UIDs for a specific mailbox from CouchDB.
///
/// Queries `_all_docs` for the ID range starting at `mailbox` and keeps the
/// rows whose ID has the form `{mailbox}_{uid}` with `uid` parsing as `u32`.
/// Rows with any other ID shape are skipped.
///
/// The range keys are serialized with `serde_json`, so mailbox names that
/// contain `"` or `\` still produce valid JSON keys.
///
/// # Errors
///
/// Fails when the request cannot be sent, the server replies with a
/// non-success status, or the response body is not valid JSON.
pub async fn get_mailbox_uids(&self, db_name: &str, mailbox: &str) -> Result<Vec<u32>> {
    let url = format!("{}/{}/_all_docs", self.base_url, db_name);

    // CouchDB expects JSON-encoded keys; U+FFF0 is a high code point that
    // closes the range after every ID starting with the mailbox name.
    let start_key = serde_json::to_string(mailbox)?;
    let end_key = serde_json::to_string(&format!("{}\u{fff0}", mailbox))?;
    let query_params = [
        ("startkey", start_key),
        ("endkey", end_key),
        ("include_docs", "false".to_string()),
    ];

    let mut request = self.client.get(&url).query(&query_params);
    if let Some((username, password)) = &self.auth {
        request = request.basic_auth(username, Some(password));
    }

    let response = request.send().await?;
    if !response.status().is_success() {
        return Err(anyhow!("Failed to query stored messages: {}", response.status()));
    }

    let result: serde_json::Value = response.json().await?;

    // Document ID format: {mailbox}_{uid}. Build the prefix once, not per row.
    let id_prefix = format!("{}_", mailbox);
    let uids: Vec<u32> = result["rows"]
        .as_array()
        .map(|rows| {
            rows.iter()
                .filter_map(|row| row["id"].as_str())
                .filter_map(|id| id.strip_prefix(id_prefix.as_str()))
                .filter_map(|uid| uid.parse::<u32>().ok())
                .collect()
        })
        .unwrap_or_default();

    Ok(uids)
}
/// Delete a document (used in sync mode for deleted messages).
///
/// Fetches the document to learn its current revision, then issues a
/// `DELETE` for that revision. A document that is already gone counts as
/// success, whether the 404 comes from the initial fetch or from the `DELETE`
/// itself (the document was removed concurrently).
///
/// # Errors
///
/// Fails when a request cannot be sent, when the fetch replies with a status
/// other than 200 or 404, when the fetched document has no `_rev`, or when the
/// `DELETE` replies with a status other than 200, 202 or 404.
pub async fn delete_document(&self, db_name: &str, doc_id: &str) -> Result<()> {
    // First get the document to get its revision
    let url = format!("{}/{}/{}", self.base_url, db_name, doc_id);
    let mut request = self.client.get(&url);
    if let Some((username, password)) = &self.auth {
        request = request.basic_auth(username, Some(password));
    }

    let response = request.send().await?;
    let rev = match response.status() {
        StatusCode::OK => {
            let doc: Value = response.json().await?;
            doc["_rev"]
                .as_str()
                .map(str::to_owned)
                .ok_or_else(|| anyhow!("Document {} has no _rev field", doc_id))?
        }
        // Document already doesn't exist
        StatusCode::NOT_FOUND => return Ok(()),
        // Report the real failure (auth, server error, ...) instead of trying
        // to parse an error body as a document.
        status => {
            let error_text = response
                .text()
                .await
                .unwrap_or_else(|_| "Failed to read error response".to_string());
            return Err(anyhow!("Failed to get document {}: {} - {}", doc_id, status, error_text));
        }
    };

    // Now delete the document
    let mut delete_request = self.client.delete(&url).query(&[("rev", rev.as_str())]);
    if let Some((username, password)) = &self.auth {
        delete_request = delete_request.basic_auth(username, Some(password));
    }

    let delete_response = delete_request.send().await?;
    match delete_response.status() {
        StatusCode::OK | StatusCode::ACCEPTED => Ok(()),
        // Removed by someone else between the fetch and the delete.
        StatusCode::NOT_FOUND => Ok(()),
        status => {
            let error_text = delete_response.text().await?;
            Err(anyhow!("Failed to delete document {}: {} - {}", doc_id, status, error_text))
        }
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::CouchDbConfig;

    /// Configuration pointing at a local CouchDB with fixed test credentials.
    fn test_config() -> CouchDbConfig {
        CouchDbConfig {
            url: String::from("http://localhost:5984"),
            user: String::from("admin"),
            password: String::from("password"),
        }
    }

    /// Constructing a client from a valid configuration must succeed.
    #[tokio::test]
    async fn test_client_creation() {
        let outcome = CouchClient::new(&test_config());
        assert!(outcome.is_ok());
    }

    // Note: Additional integration tests would require a running CouchDB instance
    // These would be similar to the Go implementation tests
}