@@ -25,11 +25,13 @@ use itertools::{Either, Itertools};
2525use object_store:: { local:: LocalFileSystem , ObjectStore } ;
2626use once_cell:: sync:: OnceCell ;
2727use tokio:: { fs, sync:: Mutex } ;
28+ use human_size:: { SpecificSize , Gigibyte , Byte } ;
2829
2930use crate :: option:: CONFIG ;
3031
3132pub const STREAM_CACHE_FILENAME : & str = ".cache.json" ;
3233pub const CACHE_META_FILENAME : & str = ".cache_meta.json" ;
34+ pub const CURRENT_CACHE_VERSION : & str = "v1" ;
3335
3436#[ derive( Debug , serde:: Deserialize , serde:: Serialize ) ]
3537pub struct LocalCache {
@@ -42,7 +44,7 @@ pub struct LocalCache {
4244impl LocalCache {
4345 fn new ( ) -> Self {
4446 Self {
45- version : "v1" . to_string ( ) ,
47+ version : CURRENT_CACHE_VERSION . to_string ( ) ,
4648 current_size : 0 ,
4749 files : Cache :: new ( 100 ) ,
4850 }
@@ -58,7 +60,7 @@ pub struct CacheMeta {
5860impl CacheMeta {
5961 fn new ( ) -> Self {
6062 Self {
61- version : "v1" . to_string ( ) ,
63+ version : CURRENT_CACHE_VERSION . to_string ( ) ,
6264 size_capacity : 0 ,
6365 }
6466 }
@@ -97,7 +99,8 @@ impl LocalCacheManager {
9799
98100 pub async fn validate ( & self , config_capacity : u64 ) -> Result < ( ) , CacheError > {
99101 fs:: create_dir_all ( & self . cache_path ) . await ?;
100- let path = cache_meta_path ( & self . cache_path ) . unwrap ( ) ;
102+ let path = cache_meta_path ( & self . cache_path )
103+ . map_err ( |err| CacheError :: ObjectStoreError ( err. into ( ) ) ) ?;
101104 let resp = self
102105 . filesystem
103106 . get ( & path)
@@ -107,7 +110,15 @@ impl LocalCacheManager {
107110 let updated_cache = match resp {
108111 Ok ( bytes) => {
109112 let mut meta: CacheMeta = serde_json:: from_slice ( & bytes) ?;
110- if !meta. size_capacity == config_capacity {
113+ if meta. size_capacity != config_capacity {
114+ // log the change in cache size
115+ let configured_size_human: SpecificSize < Gigibyte > = SpecificSize :: new ( config_capacity as f64 , Byte ) . unwrap ( ) . into ( ) ;
116+ let current_size_human: SpecificSize < Gigibyte > = SpecificSize :: new ( meta. size_capacity as f64 , Byte ) . unwrap ( ) . into ( ) ;
117+ log:: warn!(
118+ "Cache size is updated from {} to {}" ,
119+ current_size_human,
120+ configured_size_human
121+ ) ;
111122 meta. size_capacity = config_capacity;
112123 Some ( meta)
113124 } else {
@@ -123,10 +134,6 @@ impl LocalCacheManager {
123134 } ;
124135
125136 if let Some ( updated_cache) = updated_cache {
126- log:: info!(
127- "Cache is updated to new size of {} Bytes" ,
128- & updated_cache. size_capacity
129- ) ;
130137 self . filesystem
131138 . put ( & path, serde_json:: to_vec ( & updated_cache) ?. into ( ) )
132139 . await ?
0 commit comments