309 lines
10 KiB
Rust
309 lines
10 KiB
Rust
|
|
//! CouchDB client integration for mail2couch
|
||
|
|
//!
|
||
|
|
//! This module provides a CouchDB client that handles database operations
|
||
|
|
//! for storing email messages and sync metadata.
|
||
|
|
|
||
|
|
use crate::config::CouchDbConfig;
|
||
|
|
use crate::schemas::{MailDocument, SyncMetadata};
|
||
|
|
use anyhow::{anyhow, Result};
|
||
|
|
use reqwest::{Client, StatusCode};
|
||
|
|
use serde_json::Value;
|
||
|
|
use thiserror::Error;
|
||
|
|
|
||
|
|
#[derive(Error, Debug)]
|
||
|
|
pub enum CouchError {
|
||
|
|
#[error("HTTP request failed: {0}")]
|
||
|
|
Http(#[from] reqwest::Error),
|
||
|
|
#[error("CouchDB error: {status} - {message}")]
|
||
|
|
CouchDb { status: u16, message: String },
|
||
|
|
#[error("Document not found: {id}")]
|
||
|
|
NotFound { id: String },
|
||
|
|
#[error("Serialization error: {0}")]
|
||
|
|
Serialization(#[from] serde_json::Error),
|
||
|
|
#[error("Database error: {0}")]
|
||
|
|
Database(String),
|
||
|
|
}
|
||
|
|
|
||
|
|
/// CouchDB client for mail2couch operations
|
||
|
|
pub struct CouchClient {
|
||
|
|
client: Client,
|
||
|
|
base_url: String,
|
||
|
|
auth: Option<(String, String)>,
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Response from CouchDB for document operations
|
||
|
|
#[derive(Debug, serde::Deserialize)]
|
||
|
|
pub struct CouchResponse {
|
||
|
|
pub ok: Option<bool>,
|
||
|
|
pub id: Option<String>,
|
||
|
|
pub rev: Option<String>,
|
||
|
|
pub error: Option<String>,
|
||
|
|
pub reason: Option<String>,
|
||
|
|
}
|
||
|
|
|
||
|
|
impl CouchClient {
|
||
|
|
/// Create a new CouchDB client
|
||
|
|
pub fn new(config: &CouchDbConfig) -> Result<Self> {
|
||
|
|
let client = Client::new();
|
||
|
|
let base_url = config.url.trim_end_matches('/').to_string();
|
||
|
|
let auth = if !config.user.is_empty() {
|
||
|
|
Some((config.user.clone(), config.password.clone()))
|
||
|
|
} else {
|
||
|
|
None
|
||
|
|
};
|
||
|
|
|
||
|
|
Ok(CouchClient {
|
||
|
|
client,
|
||
|
|
base_url,
|
||
|
|
auth,
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Test connection to CouchDB
|
||
|
|
pub async fn test_connection(&self) -> Result<()> {
|
||
|
|
let url = format!("{}/", self.base_url);
|
||
|
|
let mut request = self.client.get(&url);
|
||
|
|
|
||
|
|
if let Some((username, password)) = &self.auth {
|
||
|
|
request = request.basic_auth(username, Some(password));
|
||
|
|
}
|
||
|
|
|
||
|
|
let response = request.send().await?;
|
||
|
|
|
||
|
|
if response.status().is_success() {
|
||
|
|
Ok(())
|
||
|
|
} else {
|
||
|
|
Err(anyhow!("CouchDB connection failed: {}", response.status()))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Create a database if it doesn't exist
|
||
|
|
pub async fn create_database(&self, db_name: &str) -> Result<()> {
|
||
|
|
// First check if database exists
|
||
|
|
if self.database_exists(db_name).await? {
|
||
|
|
return Ok(());
|
||
|
|
}
|
||
|
|
|
||
|
|
let url = format!("{}/{}", self.base_url, db_name);
|
||
|
|
let mut request = self.client.put(&url);
|
||
|
|
|
||
|
|
if let Some((username, password)) = &self.auth {
|
||
|
|
request = request.basic_auth(username, Some(password));
|
||
|
|
}
|
||
|
|
|
||
|
|
let response = request.send().await?;
|
||
|
|
|
||
|
|
match response.status() {
|
||
|
|
StatusCode::CREATED | StatusCode::ACCEPTED => Ok(()),
|
||
|
|
status => {
|
||
|
|
let error_text = response.text().await?;
|
||
|
|
Err(anyhow!("Failed to create database {}: {} - {}", db_name, status, error_text))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Check if a database exists
|
||
|
|
pub async fn database_exists(&self, db_name: &str) -> Result<bool> {
|
||
|
|
let url = format!("{}/{}", self.base_url, db_name);
|
||
|
|
let mut request = self.client.head(&url);
|
||
|
|
|
||
|
|
if let Some((username, password)) = &self.auth {
|
||
|
|
request = request.basic_auth(username, Some(password));
|
||
|
|
}
|
||
|
|
|
||
|
|
let response = request.send().await?;
|
||
|
|
Ok(response.status().is_success())
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Store a mail document in CouchDB
|
||
|
|
pub async fn store_mail_document(&self, db_name: &str, mut document: MailDocument) -> Result<String> {
|
||
|
|
// Set the document ID if not already set
|
||
|
|
if document.id.is_none() {
|
||
|
|
document.set_id();
|
||
|
|
}
|
||
|
|
|
||
|
|
let doc_id = document.id.as_ref().unwrap();
|
||
|
|
|
||
|
|
// Check if document already exists to avoid duplicates
|
||
|
|
if self.document_exists(db_name, doc_id).await? {
|
||
|
|
return Ok(doc_id.clone());
|
||
|
|
}
|
||
|
|
|
||
|
|
let url = format!("{}/{}/{}", self.base_url, db_name, doc_id);
|
||
|
|
let mut request = self.client.put(&url).json(&document);
|
||
|
|
|
||
|
|
if let Some((username, password)) = &self.auth {
|
||
|
|
request = request.basic_auth(username, Some(password));
|
||
|
|
}
|
||
|
|
|
||
|
|
let response = request.send().await?;
|
||
|
|
|
||
|
|
match response.status() {
|
||
|
|
StatusCode::CREATED | StatusCode::ACCEPTED => {
|
||
|
|
let couch_response: CouchResponse = response.json().await?;
|
||
|
|
Ok(couch_response.id.unwrap_or_else(|| doc_id.clone()))
|
||
|
|
}
|
||
|
|
status => {
|
||
|
|
let error_text = response.text().await?;
|
||
|
|
Err(anyhow!("Failed to store document {}: {} - {}", doc_id, status, error_text))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Store sync metadata in CouchDB
|
||
|
|
pub async fn store_sync_metadata(&self, db_name: &str, metadata: &SyncMetadata) -> Result<String> {
|
||
|
|
let doc_id = metadata.id.as_ref().unwrap();
|
||
|
|
|
||
|
|
// Try to get existing document first to get the revision
|
||
|
|
let mut metadata_to_store = metadata.clone();
|
||
|
|
if let Ok(existing) = self.get_sync_metadata(db_name, &metadata.mailbox).await {
|
||
|
|
metadata_to_store.rev = existing.rev;
|
||
|
|
}
|
||
|
|
|
||
|
|
let url = format!("{}/{}/{}", self.base_url, db_name, doc_id);
|
||
|
|
let mut request = self.client.put(&url).json(&metadata_to_store);
|
||
|
|
|
||
|
|
if let Some((username, password)) = &self.auth {
|
||
|
|
request = request.basic_auth(username, Some(password));
|
||
|
|
}
|
||
|
|
|
||
|
|
let response = request.send().await?;
|
||
|
|
|
||
|
|
match response.status() {
|
||
|
|
StatusCode::CREATED | StatusCode::ACCEPTED => {
|
||
|
|
let couch_response: CouchResponse = response.json().await?;
|
||
|
|
Ok(couch_response.id.unwrap_or_else(|| doc_id.clone()))
|
||
|
|
}
|
||
|
|
status => {
|
||
|
|
let error_text = response.text().await?;
|
||
|
|
Err(anyhow!("Failed to store sync metadata {}: {} - {}", doc_id, status, error_text))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Get sync metadata for a mailbox
|
||
|
|
pub async fn get_sync_metadata(&self, db_name: &str, mailbox: &str) -> Result<SyncMetadata> {
|
||
|
|
let doc_id = format!("sync_metadata_{}", mailbox);
|
||
|
|
let url = format!("{}/{}/{}", self.base_url, db_name, doc_id);
|
||
|
|
let mut request = self.client.get(&url);
|
||
|
|
|
||
|
|
if let Some((username, password)) = &self.auth {
|
||
|
|
request = request.basic_auth(username, Some(password));
|
||
|
|
}
|
||
|
|
|
||
|
|
let response = request.send().await?;
|
||
|
|
|
||
|
|
match response.status() {
|
||
|
|
StatusCode::OK => {
|
||
|
|
let metadata: SyncMetadata = response.json().await?;
|
||
|
|
Ok(metadata)
|
||
|
|
}
|
||
|
|
StatusCode::NOT_FOUND => {
|
||
|
|
Err(CouchError::NotFound { id: doc_id }.into())
|
||
|
|
}
|
||
|
|
status => {
|
||
|
|
let error_text = response.text().await?;
|
||
|
|
Err(anyhow!("Failed to get sync metadata {}: {} - {}", doc_id, status, error_text))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Check if a document exists
|
||
|
|
pub async fn document_exists(&self, db_name: &str, doc_id: &str) -> Result<bool> {
|
||
|
|
let url = format!("{}/{}/{}", self.base_url, db_name, doc_id);
|
||
|
|
let mut request = self.client.head(&url);
|
||
|
|
|
||
|
|
if let Some((username, password)) = &self.auth {
|
||
|
|
request = request.basic_auth(username, Some(password));
|
||
|
|
}
|
||
|
|
|
||
|
|
let response = request.send().await?;
|
||
|
|
Ok(response.status() == StatusCode::OK)
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Get database information
|
||
|
|
pub async fn get_database_info(&self, db_name: &str) -> Result<Value> {
|
||
|
|
let url = format!("{}/{}", self.base_url, db_name);
|
||
|
|
let mut request = self.client.get(&url);
|
||
|
|
|
||
|
|
if let Some((username, password)) = &self.auth {
|
||
|
|
request = request.basic_auth(username, Some(password));
|
||
|
|
}
|
||
|
|
|
||
|
|
let response = request.send().await?;
|
||
|
|
|
||
|
|
match response.status() {
|
||
|
|
StatusCode::OK => {
|
||
|
|
let info: Value = response.json().await?;
|
||
|
|
Ok(info)
|
||
|
|
}
|
||
|
|
status => {
|
||
|
|
let error_text = response.text().await?;
|
||
|
|
Err(anyhow!("Failed to get database info for {}: {} - {}", db_name, status, error_text))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Delete a document (used in sync mode for deleted messages)
|
||
|
|
pub async fn delete_document(&self, db_name: &str, doc_id: &str) -> Result<()> {
|
||
|
|
// First get the document to get its revision
|
||
|
|
let url = format!("{}/{}/{}", self.base_url, db_name, doc_id);
|
||
|
|
let mut request = self.client.get(&url);
|
||
|
|
|
||
|
|
if let Some((username, password)) = &self.auth {
|
||
|
|
request = request.basic_auth(username, Some(password));
|
||
|
|
}
|
||
|
|
|
||
|
|
let response = request.send().await?;
|
||
|
|
|
||
|
|
if response.status() == StatusCode::NOT_FOUND {
|
||
|
|
return Ok(()); // Document already doesn't exist
|
||
|
|
}
|
||
|
|
|
||
|
|
let doc: Value = response.json().await?;
|
||
|
|
let rev = doc["_rev"].as_str()
|
||
|
|
.ok_or_else(|| anyhow!("Document {} has no _rev field", doc_id))?;
|
||
|
|
|
||
|
|
// Now delete the document
|
||
|
|
let delete_url = format!("{}/{}/{}?rev={}", self.base_url, db_name, doc_id, rev);
|
||
|
|
let mut delete_request = self.client.delete(&delete_url);
|
||
|
|
|
||
|
|
if let Some((username, password)) = &self.auth {
|
||
|
|
delete_request = delete_request.basic_auth(username, Some(password));
|
||
|
|
}
|
||
|
|
|
||
|
|
let delete_response = delete_request.send().await?;
|
||
|
|
|
||
|
|
match delete_response.status() {
|
||
|
|
StatusCode::OK | StatusCode::ACCEPTED => Ok(()),
|
||
|
|
status => {
|
||
|
|
let error_text = delete_response.text().await?;
|
||
|
|
Err(anyhow!("Failed to delete document {}: {} - {}", doc_id, status, error_text))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::CouchDbConfig;

    /// Build a configuration that targets a local CouchDB with fixed
    /// placeholder credentials.
    fn test_config() -> CouchDbConfig {
        let url = String::from("http://localhost:5984");
        let user = String::from("admin");
        let password = String::from("password");

        CouchDbConfig {
            url,
            user,
            password,
        }
    }

    /// Constructing a client from a valid configuration succeeds.
    #[tokio::test]
    async fn test_client_creation() {
        let created = CouchClient::new(&test_config());
        assert!(created.is_ok());
    }

    // Note: Additional integration tests would require a running CouchDB instance
    // These would be similar to the Go implementation tests
}
|