// NOTE: source-listing metadata (1053 lines, 37 KiB, Rust) — not part of the program.
// Hybrid repository - parallel racing between cache and server
|
|
//
|
|
// @req: UR-002 - Access media when online or offline
|
|
// @req: IR-013 - SQLite integration for local database
|
|
// @req: DR-012 - Local database for media metadata cache
|
|
// @req: DR-013 - Repository pattern for online/offline data access
|
|
|
|
use std::sync::Arc;
|
|
|
|
use async_trait::async_trait;
|
|
use log::{debug, warn};
|
|
use tokio::time::{timeout, Duration};
|
|
|
|
use super::{MediaRepository, OnlineRepository, OfflineRepository, types::*};
|
|
|
|
/// Hybrid repository combining online and offline data sources
///
/// Uses cache-first parallel racing strategy:
/// - Runs SQLite cache and HTTP server queries in parallel
/// - Cache has 100ms timeout for fast feedback
/// - Returns cache result if it has meaningful content
/// - Falls back to server result if cache is empty/stale
///
/// @req: UR-002 - Access media when online or offline
/// @req: DR-012 - Local database for media metadata cache
/// @req: DR-013 - Repository pattern for online/offline data access
pub struct HybridRepository {
    /// Server-backed (HTTP) source; authoritative when reachable.
    /// Wrapped in `Arc` so futures/spawned tasks can own a handle.
    online: Arc<OnlineRepository>,
    /// Local cache source; fast reads and offline fallback.
    offline: Arc<OfflineRepository>,
}
|
|
|
|
impl HybridRepository {
|
|
pub fn new(online: OnlineRepository, offline: OfflineRepository) -> Self {
|
|
Self {
|
|
online: Arc::new(online),
|
|
offline: Arc::new(offline),
|
|
}
|
|
}
|
|
|
|
/// Download raw bytes from a URL using the shared authenticated HTTP client.
/// Delegates to online repository for connection reuse and proper auth.
///
/// NOTE(review): returns `Result<_, String>` (not `RepoError`), mirroring
/// the online repository's signature — online-only, no cache involvement.
pub async fn download_bytes(&self, url: &str) -> Result<Vec<u8>, String> {
    self.online.download_bytes(url).await
}
|
|
|
|
/// Get video stream URL with optional seeking support.
/// This method is online-only since offline playback uses local file paths.
///
/// * `item_id` - server id of the video item
/// * `media_source_id` - optional specific media source/version
/// * `start_time_seconds` - optional seek position for the stream
/// * `audio_stream_index` - optional audio track selection
pub async fn get_video_stream_url(
    &self,
    item_id: &str,
    media_source_id: Option<&str>,
    start_time_seconds: Option<f64>,
    audio_stream_index: Option<i32>,
) -> Result<String, RepoError> {
    self.online.get_video_stream_url(item_id, media_source_id, start_time_seconds, audio_stream_index).await
}
|
|
|
|
/// Race cache vs server, return first valid result
|
|
/// Prefer cache if it has meaningful content, otherwise use server
|
|
///
|
|
/// Core algorithm of the cache-first parallel racing strategy.
|
|
/// Runs both cache and server queries concurrently, then:
|
|
/// 1. If cache has meaningful content → return cache (fast path)
|
|
/// 2. If cache is empty/stale → return server (fresh data)
|
|
/// 3. If server fails → return cache even if empty (offline fallback)
|
|
///
|
|
/// @req: UR-002 - Access media when online or offline
|
|
/// @req: DR-013 - Repository pattern for online/offline data access
|
|
async fn parallel_race<T, F1, F2>(
|
|
&self,
|
|
cache_future: F1,
|
|
server_future: F2,
|
|
) -> Result<T, RepoError>
|
|
where
|
|
T: MeaningfulContent + Clone + Send + 'static,
|
|
F1: std::future::Future<Output = Result<T, RepoError>> + Send,
|
|
F2: std::future::Future<Output = Result<T, RepoError>> + Send,
|
|
{
|
|
// Wait for both to complete (cache has 100ms timeout)
|
|
let (cache_result, server_result) = tokio::join!(cache_future, server_future);
|
|
|
|
// Prefer cache if it has meaningful content
|
|
if let Ok(data) = &cache_result {
|
|
if data.has_content() {
|
|
debug!("[HybridRepo] Using cache result (has content)");
|
|
return Ok(data.clone());
|
|
}
|
|
}
|
|
|
|
// Fall back to server result
|
|
match server_result {
|
|
Ok(data) => {
|
|
debug!("[HybridRepo] Using server result");
|
|
// TODO: Spawn background cache update
|
|
Ok(data)
|
|
}
|
|
Err(e) => {
|
|
// Server failed, try to return cache even if empty
|
|
cache_result.or(Err(e))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Simple timeout wrapper for cache queries (100ms timeout)
|
|
///
|
|
/// @req: DR-013 - Repository pattern (cache-first with timeout)
|
|
async fn cache_with_timeout<T>(
|
|
&self,
|
|
future: impl std::future::Future<Output = Result<T, RepoError>> + Send,
|
|
) -> Result<T, RepoError> {
|
|
timeout(Duration::from_millis(100), future)
|
|
.await
|
|
.unwrap_or_else(|_| Err(RepoError::Database {
|
|
message: "Cache query timeout".to_string(),
|
|
}))
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl MediaRepository for HybridRepository {
|
|
async fn get_libraries(&self) -> Result<Vec<Library>, RepoError> {
    // Libraries change infrequently: race the 100ms-capped cache read
    // against the server fetch and prefer whichever has content.
    self.parallel_race(
        self.cache_with_timeout(self.offline.get_libraries()),
        self.online.get_libraries(),
    )
    .await
}
|
|
|
|
async fn get_items(&self, parent_id: &str, options: Option<GetItemsOptions>) -> Result<SearchResult, RepoError> {
    // Owned copies for the three independent consumers: the cache read,
    // the server fetch, and the background cache-save task.
    let offline = Arc::clone(&self.offline);
    let offline_for_save = Arc::clone(&self.offline);
    let online = Arc::clone(&self.online);
    let parent_id = parent_id.to_string();
    let parent_id_clone = parent_id.clone();
    let parent_id_for_save = parent_id.clone();
    let opts_clone = options.clone();

    // Check cache first to see if we have data
    let cache_future = self.cache_with_timeout(async move {
        offline.get_items(&parent_id, opts_clone).await
    });

    let server_future = async move {
        online.get_items(&parent_id_clone, options).await
    };

    // Wait for both, prefer cache if available
    let (cache_result, server_result) = tokio::join!(cache_future, server_future);

    // Check if cache had meaningful content
    let cache_had_content = cache_result.as_ref()
        .map(|data| data.has_content())
        .unwrap_or(false);

    // Prefer cache if it has content
    let result = if cache_had_content {
        // Log only a short id prefix; `min` guards ids shorter than 8 chars.
        // NOTE(review): byte slicing assumes ASCII ids (e.g. GUIDs) — confirm.
        debug!("[HybridRepo] Using cached data for parent {}", &parent_id_for_save[..8.min(parent_id_for_save.len())]);
        cache_result?
    } else {
        // Use server result and save to cache for next time
        let server_data = server_result?;

        if !server_data.items.is_empty() {
            // Fire-and-forget cache write: failures are logged, never
            // surfaced to the caller, and reads never block on writes.
            let items_clone = server_data.items.clone();
            tokio::spawn(async move {
                if let Err(e) = offline_for_save.save_to_cache(&parent_id_for_save, &items_clone).await {
                    warn!("[HybridRepo] Failed to save {} items to cache: {:?}", items_clone.len(), e);
                } else {
                    debug!("[HybridRepo] Saved {} items to cache for parent {}", items_clone.len(), &parent_id_for_save[..8.min(parent_id_for_save.len())]);
                }
            });
        }

        server_data
    };

    Ok(result)
}
|
|
|
|
async fn get_item(&self, item_id: &str) -> Result<MediaItem, RepoError> {
|
|
let offline = Arc::clone(&self.offline);
|
|
let online = Arc::clone(&self.online);
|
|
let item_id = item_id.to_string();
|
|
let item_id_clone = item_id.clone();
|
|
|
|
let cache_future = self.cache_with_timeout(async move {
|
|
offline.get_item(&item_id).await
|
|
});
|
|
|
|
let server_future = async move {
|
|
online.get_item(&item_id_clone).await
|
|
};
|
|
|
|
self.parallel_race(cache_future, server_future).await
|
|
}
|
|
|
|
async fn get_latest_items(&self, parent_id: &str, limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
|
|
let offline = Arc::clone(&self.offline);
|
|
let online = Arc::clone(&self.online);
|
|
let parent_id = parent_id.to_string();
|
|
let parent_id_clone = parent_id.clone();
|
|
let limit_clone = limit;
|
|
|
|
let cache_future = self.cache_with_timeout(async move {
|
|
offline.get_latest_items(&parent_id, limit).await
|
|
});
|
|
|
|
let server_future = async move {
|
|
online.get_latest_items(&parent_id_clone, limit_clone).await
|
|
};
|
|
|
|
self.parallel_race(cache_future, server_future).await
|
|
}
|
|
|
|
async fn get_resume_items(&self, parent_id: Option<&str>, limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
|
|
let offline = Arc::clone(&self.offline);
|
|
let online = Arc::clone(&self.online);
|
|
let parent_id_str = parent_id.map(|s| s.to_string());
|
|
let parent_id_clone = parent_id_str.clone();
|
|
let limit_clone = limit;
|
|
|
|
let cache_future = self.cache_with_timeout(async move {
|
|
offline.get_resume_items(parent_id_str.as_deref(), limit).await
|
|
});
|
|
|
|
let server_future = async move {
|
|
online.get_resume_items(parent_id_clone.as_deref(), limit_clone).await
|
|
};
|
|
|
|
self.parallel_race(cache_future, server_future).await
|
|
}
|
|
|
|
async fn get_next_up_episodes(&self, series_id: Option<&str>, limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
    // Next up is dynamic, always fetch from server (no cache race).
    self.online.get_next_up_episodes(series_id, limit).await
}
|
|
|
|
async fn get_recently_played_audio(&self, limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
|
|
let offline = Arc::clone(&self.offline);
|
|
let online = Arc::clone(&self.online);
|
|
let limit_clone = limit;
|
|
|
|
let cache_future = self.cache_with_timeout(async move {
|
|
offline.get_recently_played_audio(limit).await
|
|
});
|
|
|
|
let server_future = async move {
|
|
online.get_recently_played_audio(limit_clone).await
|
|
};
|
|
|
|
self.parallel_race(cache_future, server_future).await
|
|
}
|
|
|
|
async fn get_resume_movies(&self, limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
|
|
let offline = Arc::clone(&self.offline);
|
|
let online = Arc::clone(&self.online);
|
|
let limit_clone = limit;
|
|
|
|
let cache_future = self.cache_with_timeout(async move {
|
|
offline.get_resume_movies(limit).await
|
|
});
|
|
|
|
let server_future = async move {
|
|
online.get_resume_movies(limit_clone).await
|
|
};
|
|
|
|
self.parallel_race(cache_future, server_future).await
|
|
}
|
|
|
|
async fn get_genres(&self, parent_id: Option<&str>) -> Result<Vec<Genre>, RepoError> {
|
|
let offline = Arc::clone(&self.offline);
|
|
let online = Arc::clone(&self.online);
|
|
let parent_id_str = parent_id.map(|s| s.to_string());
|
|
let parent_id_clone = parent_id_str.clone();
|
|
|
|
let cache_future = self.cache_with_timeout(async move {
|
|
offline.get_genres(parent_id_str.as_deref()).await
|
|
});
|
|
|
|
let server_future = async move {
|
|
online.get_genres(parent_id_clone.as_deref()).await
|
|
};
|
|
|
|
self.parallel_race(cache_future, server_future).await
|
|
}
|
|
|
|
async fn search(&self, query: &str, options: Option<SearchOptions>) -> Result<SearchResult, RepoError> {
|
|
let offline = Arc::clone(&self.offline);
|
|
let online = Arc::clone(&self.online);
|
|
let query = query.to_string();
|
|
let query_clone = query.clone();
|
|
let opts_clone = options.clone();
|
|
|
|
let cache_future = self.cache_with_timeout(async move {
|
|
offline.search(&query, opts_clone).await
|
|
});
|
|
|
|
let server_future = async move {
|
|
online.search(&query_clone, options).await
|
|
};
|
|
|
|
self.parallel_race(cache_future, server_future).await
|
|
}
|
|
|
|
async fn get_playback_info(&self, item_id: &str) -> Result<PlaybackInfo, RepoError> {
    // Playback info requires server communication for transcoding decisions
    self.online.get_playback_info(item_id).await
}

async fn get_audio_stream_url(&self, item_id: &str) -> Result<String, RepoError> {
    // Stream URLs require server communication - delegate to online repository
    self.online.get_audio_stream_url(item_id).await
}
|
|
|
|
// Playback progress reporting is write-only telemetry: all three variants
// go straight to the server and are never cached locally.
async fn report_playback_start(&self, item_id: &str, position_ticks: i64) -> Result<(), RepoError> {
    // Playback reporting goes directly to server
    self.online.report_playback_start(item_id, position_ticks).await
}

async fn report_playback_progress(&self, item_id: &str, position_ticks: i64) -> Result<(), RepoError> {
    // Playback reporting goes directly to server
    self.online.report_playback_progress(item_id, position_ticks).await
}

async fn report_playback_stopped(&self, item_id: &str, position_ticks: i64) -> Result<(), RepoError> {
    // Playback reporting goes directly to server
    self.online.report_playback_stopped(item_id, position_ticks).await
}
|
|
|
|
// URL construction is synchronous and always points at the server; a
// separate thumbnail/download cache handles offline availability.
fn get_image_url(&self, item_id: &str, image_type: ImageType, options: Option<ImageOptions>) -> String {
    // Always use online URL for images (thumbnail cache handles offline)
    self.online.get_image_url(item_id, image_type, options)
}

fn get_subtitle_url(
    &self,
    item_id: &str,
    media_source_id: &str,
    stream_index: i32,
    format: &str,
) -> String {
    // Always use online URL for subtitles
    self.online.get_subtitle_url(item_id, media_source_id, stream_index, format)
}

fn get_video_download_url(
    &self,
    item_id: &str,
    quality: &str,
    media_source_id: Option<&str>,
) -> String {
    // Always use online URL for downloads
    self.online.get_video_download_url(item_id, quality, media_source_id)
}
|
|
|
|
// Favorite toggles are writes: server is the source of truth, no local echo.
async fn mark_favorite(&self, item_id: &str) -> Result<(), RepoError> {
    // Write operations go directly to server
    self.online.mark_favorite(item_id).await
}

async fn unmark_favorite(&self, item_id: &str) -> Result<(), RepoError> {
    // Write operations go directly to server
    self.online.unmark_favorite(item_id).await
}
|
|
|
|
async fn get_person(&self, person_id: &str) -> Result<MediaItem, RepoError> {
|
|
let offline = Arc::clone(&self.offline);
|
|
let online = Arc::clone(&self.online);
|
|
let person_id = person_id.to_string();
|
|
let person_id_clone = person_id.clone();
|
|
|
|
let cache_future = self.cache_with_timeout(async move {
|
|
offline.get_person(&person_id).await
|
|
});
|
|
|
|
let server_future = async move {
|
|
online.get_person(&person_id_clone).await
|
|
};
|
|
|
|
self.parallel_race(cache_future, server_future).await
|
|
}
|
|
|
|
async fn get_items_by_person(&self, person_id: &str, options: Option<GetItemsOptions>) -> Result<SearchResult, RepoError> {
|
|
let offline = Arc::clone(&self.offline);
|
|
let online = Arc::clone(&self.online);
|
|
let person_id = person_id.to_string();
|
|
let person_id_clone = person_id.clone();
|
|
let opts_clone = options.clone();
|
|
|
|
let cache_future = self.cache_with_timeout(async move {
|
|
offline.get_items_by_person(&person_id, opts_clone).await
|
|
});
|
|
|
|
let server_future = async move {
|
|
online.get_items_by_person(&person_id_clone, options).await
|
|
};
|
|
|
|
self.parallel_race(cache_future, server_future).await
|
|
}
|
|
|
|
async fn get_similar_items(&self, item_id: &str, limit: Option<usize>) -> Result<SearchResult, RepoError> {
|
|
let offline = Arc::clone(&self.offline);
|
|
let online = Arc::clone(&self.online);
|
|
let item_id = item_id.to_string();
|
|
let item_id_clone = item_id.clone();
|
|
|
|
let cache_future = self.cache_with_timeout(async move {
|
|
offline.get_similar_items(&item_id, limit).await
|
|
});
|
|
|
|
let server_future = async move {
|
|
online.get_similar_items(&item_id_clone, limit).await
|
|
};
|
|
|
|
self.parallel_race(cache_future, server_future).await
|
|
}
|
|
|
|
// ===== Playlist Methods =====

// Playlist mutations are writes and bypass the cache entirely; the cached
// playlist contents are refreshed on the next `get_playlist_items` call.
async fn create_playlist(
    &self,
    name: &str,
    item_ids: &[String],
) -> Result<PlaylistCreatedResult, RepoError> {
    // Write operation - delegate directly to server
    self.online.create_playlist(name, item_ids).await
}

async fn delete_playlist(&self, playlist_id: &str) -> Result<(), RepoError> {
    // Write operation - delegate directly to server
    self.online.delete_playlist(playlist_id).await
}

async fn rename_playlist(&self, playlist_id: &str, name: &str) -> Result<(), RepoError> {
    // Write operation - delegate directly to server
    self.online.rename_playlist(playlist_id, name).await
}
|
|
|
|
async fn get_playlist_items(
    &self,
    playlist_id: &str,
) -> Result<Vec<PlaylistEntry>, RepoError> {
    // Owned copies for the three independent consumers: the cache read,
    // the server fetch, and the background cache write.
    let offline = Arc::clone(&self.offline);
    let offline_for_save = Arc::clone(&self.offline);
    let online = Arc::clone(&self.online);
    let playlist_id = playlist_id.to_string();
    let playlist_id_clone = playlist_id.clone();
    let playlist_id_for_save = playlist_id.clone();

    let cache_future = self.cache_with_timeout(async move {
        offline.get_playlist_items(&playlist_id).await
    });

    let server_future = async move {
        online.get_playlist_items(&playlist_id_clone).await
    };

    let (cache_result, server_result) = tokio::join!(cache_future, server_future);

    let cache_had_content = cache_result.as_ref()
        .map(|data| data.has_content())
        .unwrap_or(false);

    if cache_had_content {
        // If server also succeeded, update cache in background.
        // Unlike `get_items`, playlists refresh the cache even on a hit,
        // since playlist contents change frequently.
        if let Ok(server_entries) = server_result {
            tokio::spawn(async move {
                if let Err(e) = offline_for_save.save_playlist_items_to_cache(&playlist_id_for_save, &server_entries).await {
                    warn!("[HybridRepo] Failed to update playlist cache: {:?}", e);
                }
            });
        }
        return cache_result;
    }

    // Cache miss - use server result
    match server_result {
        Ok(entries) => {
            // Fire-and-forget cache write; the caller never blocks on it.
            let entries_clone = entries.clone();
            tokio::spawn(async move {
                if let Err(e) = offline_for_save.save_playlist_items_to_cache(&playlist_id_for_save, &entries_clone).await {
                    warn!("[HybridRepo] Failed to save playlist items to cache: {:?}", e);
                }
            });
            Ok(entries)
        }
        // Server failed: fall back to the (empty or errored) cache result,
        // propagating the server error only when the cache also failed.
        Err(e) => cache_result.or(Err(e)),
    }
}
|
|
|
|
// Playlist item mutations are writes: server-only, no cache interaction.
async fn add_to_playlist(
    &self,
    playlist_id: &str,
    item_ids: &[String],
) -> Result<(), RepoError> {
    // Write operation - delegate directly to server
    self.online.add_to_playlist(playlist_id, item_ids).await
}

async fn remove_from_playlist(
    &self,
    playlist_id: &str,
    entry_ids: &[String],
) -> Result<(), RepoError> {
    // Write operation - delegate directly to server
    self.online.remove_from_playlist(playlist_id, entry_ids).await
}

async fn move_playlist_item(
    &self,
    playlist_id: &str,
    item_id: &str,
    new_index: u32,
) -> Result<(), RepoError> {
    // Write operation - delegate directly to server
    self.online.move_playlist_item(playlist_id, item_id, new_index).await
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use std::sync::Mutex;
|
|
|
|
/// Mock offline repository that tracks queries and saves
///
/// Only `get_items` and `save_to_cache` are exercised by the tests;
/// every other trait method deliberately panics via `unimplemented!()`
/// so an unexpected call fails the test loudly.
struct MockOfflineRepo {
    // In-memory stand-in for the SQLite cache contents.
    items: Arc<Mutex<Vec<MediaItem>>>,
    // Number of times `get_items` was called.
    query_count: Arc<Mutex<usize>>,
    // Number of times `save_to_cache` was called.
    save_count: Arc<Mutex<usize>>,
}

impl MockOfflineRepo {
    fn new() -> Self {
        Self {
            items: Arc::new(Mutex::new(Vec::new())),
            query_count: Arc::new(Mutex::new(0)),
            save_count: Arc::new(Mutex::new(0)),
        }
    }

    fn get_query_count(&self) -> usize {
        *self.query_count.lock().unwrap()
    }

    fn get_save_count(&self) -> usize {
        *self.save_count.lock().unwrap()
    }

    // Mirrors OfflineRepository::save_to_cache: replaces the stored items
    // wholesale and bumps the save counter.
    async fn save_to_cache(&self, _parent_id: &str, items: &[MediaItem]) -> Result<usize, RepoError> {
        *self.save_count.lock().unwrap() += 1;
        *self.items.lock().unwrap() = items.to_vec();
        Ok(items.len())
    }
}

#[async_trait]
impl MediaRepository for MockOfflineRepo {
    async fn get_libraries(&self) -> Result<Vec<Library>, RepoError> {
        unimplemented!()
    }

    // The one "real" mock method: counts the query and serves whatever was
    // last stored via `save_to_cache` (an empty Vec models a cache miss).
    async fn get_items(&self, _parent_id: &str, _options: Option<GetItemsOptions>) -> Result<SearchResult, RepoError> {
        *self.query_count.lock().unwrap() += 1;
        let items = self.items.lock().unwrap().clone();
        let count = items.len();
        Ok(SearchResult {
            items,
            total_record_count: count,
        })
    }

    async fn get_item(&self, _item_id: &str) -> Result<MediaItem, RepoError> {
        unimplemented!()
    }

    async fn get_latest_items(&self, _parent_id: &str, _limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
        unimplemented!()
    }

    async fn get_resume_items(&self, _parent_id: Option<&str>, _limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
        unimplemented!()
    }

    async fn get_next_up_episodes(&self, _series_id: Option<&str>, _limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
        unimplemented!()
    }

    async fn get_recently_played_audio(&self, _limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
        unimplemented!()
    }

    async fn get_resume_movies(&self, _limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
        unimplemented!()
    }

    async fn get_genres(&self, _parent_id: Option<&str>) -> Result<Vec<Genre>, RepoError> {
        unimplemented!()
    }

    async fn search(&self, _query: &str, _options: Option<SearchOptions>) -> Result<SearchResult, RepoError> {
        unimplemented!()
    }

    async fn get_playback_info(&self, _item_id: &str) -> Result<PlaybackInfo, RepoError> {
        unimplemented!()
    }

    async fn get_audio_stream_url(&self, _item_id: &str) -> Result<String, RepoError> {
        unimplemented!()
    }

    async fn report_playback_start(&self, _item_id: &str, _position_ticks: i64) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn report_playback_progress(&self, _item_id: &str, _position_ticks: i64) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn report_playback_stopped(&self, _item_id: &str, _position_ticks: i64) -> Result<(), RepoError> {
        unimplemented!()
    }

    fn get_image_url(&self, _item_id: &str, _image_type: ImageType, _options: Option<ImageOptions>) -> String {
        unimplemented!()
    }

    fn get_subtitle_url(
        &self,
        _item_id: &str,
        _media_source_id: &str,
        _stream_index: i32,
        _format: &str,
    ) -> String {
        unimplemented!()
    }

    fn get_video_download_url(
        &self,
        _item_id: &str,
        _quality: &str,
        _media_source_id: Option<&str>,
    ) -> String {
        unimplemented!()
    }

    async fn mark_favorite(&self, _item_id: &str) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn unmark_favorite(&self, _item_id: &str) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn get_person(&self, _person_id: &str) -> Result<MediaItem, RepoError> {
        unimplemented!()
    }

    async fn get_items_by_person(&self, _person_id: &str, _options: Option<GetItemsOptions>) -> Result<SearchResult, RepoError> {
        unimplemented!()
    }

    async fn get_similar_items(&self, _item_id: &str, _limit: Option<usize>) -> Result<SearchResult, RepoError> {
        unimplemented!()
    }

    async fn create_playlist(&self, _name: &str, _item_ids: &[String]) -> Result<PlaylistCreatedResult, RepoError> {
        unimplemented!()
    }

    async fn delete_playlist(&self, _playlist_id: &str) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn rename_playlist(&self, _playlist_id: &str, _name: &str) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn get_playlist_items(&self, _playlist_id: &str) -> Result<Vec<PlaylistEntry>, RepoError> {
        unimplemented!()
    }

    async fn add_to_playlist(&self, _playlist_id: &str, _item_ids: &[String]) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn remove_from_playlist(&self, _playlist_id: &str, _entry_ids: &[String]) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn move_playlist_item(&self, _playlist_id: &str, _item_id: &str, _new_index: u32) -> Result<(), RepoError> {
        unimplemented!()
    }
}
|
|
|
|
/// Mock online repository that returns predefined items
///
/// Stands in for the HTTP server: `get_items` always succeeds with the
/// fixed item list supplied at construction. All other trait methods are
/// `unimplemented!()` stubs so an unexpected call fails the test loudly.
struct MockOnlineRepo {
    // Fixed "server-side" result set.
    items: Vec<MediaItem>,
    // Number of times `get_items` was called.
    query_count: Arc<Mutex<usize>>,
}

impl MockOnlineRepo {
    fn new(items: Vec<MediaItem>) -> Self {
        Self {
            items,
            query_count: Arc::new(Mutex::new(0)),
        }
    }

    fn get_query_count(&self) -> usize {
        *self.query_count.lock().unwrap()
    }
}

#[async_trait]
impl MediaRepository for MockOnlineRepo {
    async fn get_libraries(&self) -> Result<Vec<Library>, RepoError> {
        unimplemented!()
    }

    // Counts the query and returns the predefined item list unconditionally.
    async fn get_items(&self, _parent_id: &str, _options: Option<GetItemsOptions>) -> Result<SearchResult, RepoError> {
        *self.query_count.lock().unwrap() += 1;
        Ok(SearchResult {
            items: self.items.clone(),
            total_record_count: self.items.len(),
        })
    }

    async fn get_item(&self, _item_id: &str) -> Result<MediaItem, RepoError> {
        unimplemented!()
    }

    async fn get_latest_items(&self, _parent_id: &str, _limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
        unimplemented!()
    }

    async fn get_resume_items(&self, _parent_id: Option<&str>, _limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
        unimplemented!()
    }

    async fn get_next_up_episodes(&self, _series_id: Option<&str>, _limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
        unimplemented!()
    }

    async fn get_recently_played_audio(&self, _limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
        unimplemented!()
    }

    async fn get_resume_movies(&self, _limit: Option<usize>) -> Result<Vec<MediaItem>, RepoError> {
        unimplemented!()
    }

    async fn get_genres(&self, _parent_id: Option<&str>) -> Result<Vec<Genre>, RepoError> {
        unimplemented!()
    }

    async fn search(&self, _query: &str, _options: Option<SearchOptions>) -> Result<SearchResult, RepoError> {
        unimplemented!()
    }

    async fn get_playback_info(&self, _item_id: &str) -> Result<PlaybackInfo, RepoError> {
        unimplemented!()
    }

    async fn get_audio_stream_url(&self, _item_id: &str) -> Result<String, RepoError> {
        unimplemented!()
    }

    async fn report_playback_start(&self, _item_id: &str, _position_ticks: i64) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn report_playback_progress(&self, _item_id: &str, _position_ticks: i64) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn report_playback_stopped(&self, _item_id: &str, _position_ticks: i64) -> Result<(), RepoError> {
        unimplemented!()
    }

    fn get_image_url(&self, _item_id: &str, _image_type: ImageType, _options: Option<ImageOptions>) -> String {
        unimplemented!()
    }

    fn get_subtitle_url(
        &self,
        _item_id: &str,
        _media_source_id: &str,
        _stream_index: i32,
        _format: &str,
    ) -> String {
        unimplemented!()
    }

    fn get_video_download_url(
        &self,
        _item_id: &str,
        _quality: &str,
        _media_source_id: Option<&str>,
    ) -> String {
        unimplemented!()
    }

    async fn mark_favorite(&self, _item_id: &str) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn unmark_favorite(&self, _item_id: &str) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn get_person(&self, _person_id: &str) -> Result<MediaItem, RepoError> {
        unimplemented!()
    }

    async fn get_items_by_person(&self, _person_id: &str, _options: Option<GetItemsOptions>) -> Result<SearchResult, RepoError> {
        unimplemented!()
    }

    async fn get_similar_items(&self, _item_id: &str, _limit: Option<usize>) -> Result<SearchResult, RepoError> {
        unimplemented!()
    }

    async fn create_playlist(&self, _name: &str, _item_ids: &[String]) -> Result<PlaylistCreatedResult, RepoError> {
        unimplemented!()
    }

    async fn delete_playlist(&self, _playlist_id: &str) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn rename_playlist(&self, _playlist_id: &str, _name: &str) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn get_playlist_items(&self, _playlist_id: &str) -> Result<Vec<PlaylistEntry>, RepoError> {
        unimplemented!()
    }

    async fn add_to_playlist(&self, _playlist_id: &str, _item_ids: &[String]) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn remove_from_playlist(&self, _playlist_id: &str, _entry_ids: &[String]) -> Result<(), RepoError> {
        unimplemented!()
    }

    async fn move_playlist_item(&self, _playlist_id: &str, _item_id: &str, _new_index: u32) -> Result<(), RepoError> {
        unimplemented!()
    }
}
|
|
|
|
/// Build a fully-populated movie `MediaItem` fixture with the given id and
/// name; movie-specific fields are set, all audio/series fields are `None`.
fn create_test_item(id: &str, name: &str) -> MediaItem {
    MediaItem {
        id: id.to_string(),
        name: name.to_string(),
        item_type: "Movie".to_string(),
        server_id: "test-server".to_string(),
        parent_id: Some("parent-123".to_string()),
        library_id: Some("library-456".to_string()),
        overview: Some("Test overview".to_string()),
        genres: Some(vec!["Action".to_string(), "Adventure".to_string()]),
        // 7_200_000_000_0 ticks would be 2h; presumably ticks are 100ns units
        // like Jellyfin's — value here is arbitrary test data.
        runtime_ticks: Some(7200000000),
        production_year: Some(2024),
        community_rating: Some(8.5),
        official_rating: Some("PG-13".to_string()),
        primary_image_tag: Some("image-tag-123".to_string()),
        backdrop_image_tags: Some(vec!["backdrop-1".to_string()]),
        parent_backdrop_image_tags: None,
        album_id: None,
        album_name: None,
        album_artist: None,
        artists: None,
        artist_items: None,
        index_number: None,
        series_id: None,
        series_name: None,
        season_id: None,
        season_name: None,
        parent_index_number: None,
        user_data: None,
        media_streams: None,
        media_sources: None,
        people: None,
    }
}
|
|
|
|
/// Helper to test the caching logic
///
/// Re-implements `HybridRepository::get_items` over the mocks (minus the
/// 100ms timeout and the background `tokio::spawn`, which would make the
/// save racy to assert on).
struct TestHybridRepo {
    offline: Arc<MockOfflineRepo>,
    online: Arc<MockOnlineRepo>,
}

impl TestHybridRepo {
    fn new(server_items: Vec<MediaItem>) -> Self {
        let offline = Arc::new(MockOfflineRepo::new());
        let online = Arc::new(MockOnlineRepo::new(server_items));
        Self { offline, online }
    }

    /// Test version of get_items that implements the cache logic
    async fn get_items(&self, parent_id: &str) -> Result<SearchResult, RepoError> {
        let offline = Arc::clone(&self.offline);
        let offline_for_save = Arc::clone(&self.offline);
        let online = Arc::clone(&self.online);
        let parent_id = parent_id.to_string();
        let parent_id_clone = parent_id.clone();
        let parent_id_for_save = parent_id.clone();

        // Check cache first
        let cache_future = async move {
            offline.get_items(&parent_id, None).await
        };

        let server_future = async move {
            online.get_items(&parent_id_clone, None).await
        };

        // Wait for both, prefer cache if available
        let (cache_result, server_result) = tokio::join!(cache_future, server_future);

        // Check if cache had meaningful content
        let cache_had_content = cache_result.as_ref()
            .map(|data| data.has_content())
            .unwrap_or(false);

        // Prefer cache if it has content (mimics hybrid.rs get_items logic)
        let result = if cache_had_content {
            cache_result?
        } else {
            // Use server result and save to cache for next time
            let server_data = server_result?;

            if !server_data.items.is_empty() {
                // Saved inline (not spawned) so tests can assert the save
                // count deterministically right after the call returns.
                let items_clone = server_data.items.clone();
                offline_for_save.save_to_cache(&parent_id_for_save, &items_clone).await?;
            }

            server_data
        };

        Ok(result)
    }
}
|
|
|
|
/// Test cache miss saves server data to cache for next time
///
/// @req-test: UR-002 - Access media when online or offline
/// @req-test: DR-013 - Repository pattern for online/offline data access
/// @req-test: DR-012 - Local database for media metadata cache
#[tokio::test]
async fn test_cache_miss_saves_to_cache() {
    // Setup: Server has 3 items, cache is empty
    let server_items = vec![
        create_test_item("item-1", "Movie 1"),
        create_test_item("item-2", "Movie 2"),
        create_test_item("item-3", "Movie 3"),
    ];

    let repo = TestHybridRepo::new(server_items.clone());

    // First request - cache miss
    let result = repo.get_items("parent-123").await.unwrap();

    // Should return server items
    assert_eq!(result.items.len(), 3);
    assert_eq!(result.items[0].id, "item-1");

    // Should have queried both cache and server (parallel race always hits both)
    assert_eq!(repo.offline.get_query_count(), 1, "Cache should be queried once");
    assert_eq!(repo.online.get_query_count(), 1, "Server should be queried once");

    // Should have saved to cache
    assert_eq!(repo.offline.get_save_count(), 1, "Should save to cache on miss");
}

/// Test cache hit prevents duplicate save to cache
///
/// Verifies parallel racing strategy: both cache and server are queried,
/// but when cache has content, it's used and no duplicate save occurs.
///
/// @req-test: UR-002 - Access media when online or offline
/// @req-test: DR-013 - Repository pattern for online/offline data access
/// @req-test: DR-012 - Local database cache (avoid duplicate writes)
#[tokio::test]
async fn test_cache_hit_no_save() {
    // Setup: Server has 3 items, we'll pre-populate cache
    let server_items = vec![
        create_test_item("item-1", "Movie 1"),
        create_test_item("item-2", "Movie 2"),
        create_test_item("item-3", "Movie 3"),
    ];

    let repo = TestHybridRepo::new(server_items.clone());

    // Pre-populate cache (counts as save #1)
    repo.offline.save_to_cache("parent-123", &server_items).await.unwrap();
    assert_eq!(repo.offline.get_save_count(), 1);

    // Second request - cache hit
    let result = repo.get_items("parent-123").await.unwrap();

    // Should return cached items
    assert_eq!(result.items.len(), 3);
    assert_eq!(result.items[0].id, "item-1");

    // Should have queried cache and server (parallel race)
    assert_eq!(repo.offline.get_query_count(), 1, "Cache should be queried");
    assert_eq!(repo.online.get_query_count(), 1, "Server is queried in parallel");

    // Should NOT have saved again (no duplicate save)
    assert_eq!(repo.offline.get_save_count(), 1, "Should NOT save when using cache");
}

/// Test empty results are not saved to cache
///
/// @req-test: DR-013 - Repository pattern (edge case handling)
/// @req-test: DR-012 - Local database cache (avoid saving empty data)
#[tokio::test]
async fn test_empty_cache_returns_empty_result() {
    // Setup: Server has no items
    let repo = TestHybridRepo::new(vec![]);

    // Request with empty server
    let result = repo.get_items("parent-123").await.unwrap();

    // Should return empty result
    assert_eq!(result.items.len(), 0);

    // Should NOT save empty results
    assert_eq!(repo.offline.get_save_count(), 0, "Should not save empty results");
}

/// Test SearchResult::has_content helper method
///
/// @req-test: DR-013 - Repository pattern (content detection helper)
#[tokio::test]
async fn test_has_content_check() {
    // Test that SearchResult::has_content works correctly
    let empty_result = SearchResult {
        items: vec![],
        total_record_count: 0,
    };
    assert!(!empty_result.has_content(), "Empty result should not have content");

    let result_with_items = SearchResult {
        items: vec![create_test_item("item-1", "Movie 1")],
        total_record_count: 1,
    };
    assert!(result_with_items.has_content(), "Result with items should have content");
}
|
|
}
|