372 lines
11 KiB
Rust

//! Tauri commands for sync queue operations
//!
//! The sync queue stores mutations (favorites, playback progress, etc.)
//! that need to be synced to the Jellyfin server when connectivity is restored.
//! TRACES: UR-002, UR-017, UR-025 | DR-014
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tauri::State;
use super::storage::DatabaseWrapper;
use crate::storage::db_service::{DatabaseService, Query, QueryParam};
/// Sync queue item returned to frontend
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncQueueItem {
pub id: i64,
pub user_id: String,
pub operation: String,
pub item_id: Option<String>,
pub payload: Option<String>,
pub status: String,
pub retry_count: i32,
pub created_at: Option<String>,
pub error_message: Option<String>,
}
/// Queue a mutation for sync to server
#[tauri::command]
pub async fn sync_queue_mutation(
db: State<'_, DatabaseWrapper>,
user_id: String,
operation: String,
item_id: Option<String>,
payload: Option<String>,
) -> Result<i64, String> {
let db_service = {
let database = db.0.lock().map_err(|e| e.to_string())?;
Arc::new(database.service())
};
let query = Query::with_params(
"INSERT INTO sync_queue (user_id, operation, item_id, payload, status, created_at)
VALUES (?, ?, ?, ?, 'pending', CURRENT_TIMESTAMP)",
vec![
QueryParam::String(user_id),
QueryParam::String(operation),
item_id.map(QueryParam::String).unwrap_or(QueryParam::Null),
payload.map(QueryParam::String).unwrap_or(QueryParam::Null),
],
);
db_service.execute(query).await.map_err(|e| e.to_string())?;
let id = db_service.last_insert_rowid().await.map_err(|e| e.to_string())?;
Ok(id)
}
/// Get all pending sync operations for a user
#[tauri::command]
pub async fn sync_get_pending(
db: State<'_, DatabaseWrapper>,
user_id: String,
limit: Option<i32>,
) -> Result<Vec<SyncQueueItem>, String> {
let db_service = {
let database = db.0.lock().map_err(|e| e.to_string())?;
Arc::new(database.service())
};
let sql = if let Some(l) = limit {
format!(
"SELECT id, user_id, operation, item_id, payload, status, retry_count, created_at, error_message
FROM sync_queue
WHERE user_id = ? AND status IN ('pending', 'failed')
ORDER BY created_at ASC
LIMIT {}",
l
)
} else {
"SELECT id, user_id, operation, item_id, payload, status, retry_count, created_at, error_message
FROM sync_queue
WHERE user_id = ? AND status IN ('pending', 'failed')
ORDER BY created_at ASC".to_string()
};
let query = Query::with_params(sql, vec![QueryParam::String(user_id)]);
db_service
.query_many(query, |row| {
Ok(SyncQueueItem {
id: row.get(0)?,
user_id: row.get(1)?,
operation: row.get(2)?,
item_id: row.get(3)?,
payload: row.get(4)?,
status: row.get(5)?,
retry_count: row.get(6)?,
created_at: row.get(7)?,
error_message: row.get(8)?,
})
})
.await
.map_err(|e| e.to_string())
}
/// Mark a sync operation as in progress
#[tauri::command]
pub async fn sync_mark_processing(
db: State<'_, DatabaseWrapper>,
id: i64,
) -> Result<(), String> {
let db_service = {
let database = db.0.lock().map_err(|e| e.to_string())?;
Arc::new(database.service())
};
let query = Query::with_params(
"UPDATE sync_queue SET status = 'processing' WHERE id = ?",
vec![QueryParam::Int64(id)],
);
db_service.execute(query).await.map_err(|e| e.to_string())?;
Ok(())
}
/// Mark a sync operation as completed
#[tauri::command]
pub async fn sync_mark_completed(
db: State<'_, DatabaseWrapper>,
id: i64,
) -> Result<(), String> {
let db_service = {
let database = db.0.lock().map_err(|e| e.to_string())?;
Arc::new(database.service())
};
let query = Query::with_params(
"UPDATE sync_queue SET status = 'completed', processed_at = CURRENT_TIMESTAMP WHERE id = ?",
vec![QueryParam::Int64(id)],
);
db_service.execute(query).await.map_err(|e| e.to_string())?;
Ok(())
}
/// Mark a sync operation as failed with error message
#[tauri::command]
pub async fn sync_mark_failed(
db: State<'_, DatabaseWrapper>,
id: i64,
error: String,
) -> Result<(), String> {
let db_service = {
let database = db.0.lock().map_err(|e| e.to_string())?;
Arc::new(database.service())
};
let query = Query::with_params(
"UPDATE sync_queue
SET status = 'failed',
retry_count = retry_count + 1,
error_message = ?,
processed_at = CURRENT_TIMESTAMP
WHERE id = ?",
vec![QueryParam::String(error), QueryParam::Int64(id)],
);
db_service.execute(query).await.map_err(|e| e.to_string())?;
Ok(())
}
/// Get count of pending sync operations for a user
#[tauri::command]
pub async fn sync_get_pending_count(
db: State<'_, DatabaseWrapper>,
user_id: String,
) -> Result<i32, String> {
let db_service = {
let database = db.0.lock().map_err(|e| e.to_string())?;
Arc::new(database.service())
};
let query = Query::with_params(
"SELECT COUNT(*) FROM sync_queue WHERE user_id = ? AND status IN ('pending', 'failed')",
vec![QueryParam::String(user_id)],
);
db_service
.query_one(query, |row| row.get(0))
.await
.map_err(|e| e.to_string())
}
/// Delete completed sync operations older than specified days
#[tauri::command]
pub async fn sync_cleanup_completed(
db: State<'_, DatabaseWrapper>,
days_old: i32,
) -> Result<i32, String> {
let db_service = {
let database = db.0.lock().map_err(|e| e.to_string())?;
Arc::new(database.service())
};
let query = Query::with_params(
"DELETE FROM sync_queue
WHERE status = 'completed'
AND processed_at < datetime('now', ?)",
vec![QueryParam::String(format!("-{} days", days_old))],
);
let deleted = db_service.execute(query).await.map_err(|e| e.to_string())?;
Ok(deleted as i32)
}
/// Delete all sync operations for a user (used during logout)
#[tauri::command]
pub async fn sync_clear_user(
db: State<'_, DatabaseWrapper>,
user_id: String,
) -> Result<(), String> {
let db_service = {
let database = db.0.lock().map_err(|e| e.to_string())?;
Arc::new(database.service())
};
let query = Query::with_params(
"DELETE FROM sync_queue WHERE user_id = ?",
vec![QueryParam::String(user_id)],
);
db_service.execute(query).await.map_err(|e| e.to_string())?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sync_queue_item_serialization() {
let item = SyncQueueItem {
id: 1,
user_id: "user-123".to_string(),
operation: "favorite".to_string(),
item_id: Some("item-456".to_string()),
payload: Some(r#"{"isFavorite": true}"#.to_string()),
status: "pending".to_string(),
retry_count: 0,
created_at: Some("2024-02-14T08:00:00Z".to_string()),
error_message: None,
};
// Should serialize successfully
let json = serde_json::to_string(&item);
assert!(json.is_ok());
let serialized = json.unwrap();
assert!(serialized.contains("user-123"));
assert!(serialized.contains("favorite"));
assert!(serialized.contains("pending"));
}
#[test]
fn test_sync_queue_item_with_error() {
let item = SyncQueueItem {
id: 2,
user_id: "user-789".to_string(),
operation: "update_progress".to_string(),
item_id: Some("item-999".to_string()),
payload: None,
status: "failed".to_string(),
retry_count: 3,
created_at: Some("2024-02-14T07:00:00Z".to_string()),
error_message: Some("Connection timeout".to_string()),
};
let json = serde_json::to_string(&item).unwrap();
assert!(json.contains("failed"));
assert!(json.contains("Connection timeout"));
assert!(json.contains("3")); // retry_count
}
#[test]
fn test_sync_queue_item_without_optional_fields() {
let item = SyncQueueItem {
id: 3,
user_id: "user-000".to_string(),
operation: "clear_progress".to_string(),
item_id: None,
payload: None,
status: "completed".to_string(),
retry_count: 0,
created_at: None,
error_message: None,
};
let json = serde_json::to_string(&item).unwrap();
assert!(json.contains("completed"));
assert!(json.contains("null") || json.contains("\"itemId\":null"));
}
#[test]
fn test_sync_status_values() {
// Verify all expected status values
let valid_statuses = vec!["pending", "processing", "completed", "failed"];
for status in valid_statuses {
let item = SyncQueueItem {
id: 1,
user_id: "test".to_string(),
operation: "test".to_string(),
item_id: None,
payload: None,
status: status.to_string(),
retry_count: 0,
created_at: None,
error_message: None,
};
let json = serde_json::to_string(&item).unwrap();
assert!(json.contains(status));
}
}
#[test]
fn test_query_param_generation() {
// Test QueryParam generation for sync operations
let user_id = "user-123".to_string();
let operation = "favorite".to_string();
let params: Vec<QueryParam> = vec![
QueryParam::String(user_id.clone()),
QueryParam::String(operation.clone()),
QueryParam::Null,
QueryParam::Null,
];
assert_eq!(params.len(), 4);
assert!(matches!(params[0], QueryParam::String(_)));
assert!(matches!(params[1], QueryParam::String(_)));
assert!(matches!(params[2], QueryParam::Null));
assert!(matches!(params[3], QueryParam::Null));
}
#[test]
fn test_retry_count_increment() {
// Verify retry count management
let mut item = SyncQueueItem {
id: 1,
user_id: "user-123".to_string(),
operation: "favorite".to_string(),
item_id: None,
payload: None,
status: "pending".to_string(),
retry_count: 0,
created_at: None,
error_message: None,
};
// Simulate retries
for i in 1..=5 {
item.retry_count = i;
item.status = if i < 3 { "pending" } else { "failed" }.to_string();
assert!(item.retry_count == i);
}
}
}