 //! On-disk storage
 
-use std::sync::Arc;
+use std::collections::HashMap;
+use std::path::PathBuf;
+use std::sync::{Arc, RwLock};
 
-use crate::array::DataChunk;
-use crate::catalog::TableRefId;
+use anyhow::anyhow;
+use bytes::{Buf, BufMut};
+
+use crate::array::{Array, ArrayBuilder, ArrayImpl, DataChunk, I32Array, I32ArrayBuilder};
+use crate::catalog::{ColumnDesc, TableRefId};
 
 /// The error type of storage operations.
 #[derive(thiserror::Error, Debug)]
@@ -17,13 +22,33 @@ pub type StorageRef = Arc<DiskStorage>;
 pub type StorageTableRef = Arc<DiskTable>;
 
 /// On-disk storage.
-#[derive(Clone)]
-pub struct DiskStorage;
+pub struct DiskStorage {
+    /// All tables in the current storage engine.
+    tables: RwLock<HashMap<TableRefId, StorageTableRef>>,
+
+    /// The storage options.
+    options: Arc<StorageOptions>,
+}
+
+pub struct StorageOptions {
+    /// The directory of the storage.
+    base_path: PathBuf,
+}
+
+pub fn err(error: impl Into<anyhow::Error>) -> StorageError {
+    StorageError(error.into())
+}
 
 /// An on-disk table.
 pub struct DiskTable {
-    #[allow(dead_code)]
+    /// Id of the table.
     id: TableRefId,
+
+    /// Columns of the current table.
+    column_descs: Arc<[ColumnDesc]>,
+
+    /// The storage options.
+    options: Arc<StorageOptions>,
 }
 
 impl Default for DiskStorage {
@@ -35,28 +60,94 @@ impl Default for DiskStorage {
 impl DiskStorage {
     /// Create a new on-disk storage.
     pub fn new() -> Self {
-        DiskStorage
+        DiskStorage {
+            tables: RwLock::new(HashMap::new()),
+            options: Arc::new(StorageOptions {
+                base_path: "risinglight.db".into(),
+            }),
+        }
     }
 
     /// Add a table.
-    pub fn add_table(&self, _id: TableRefId) -> StorageResult<()> {
-        todo!()
+    pub fn add_table(&self, id: TableRefId, column_descs: &[ColumnDesc]) -> StorageResult<()> {
+        let mut tables = self.tables.write().unwrap();
+        let table = DiskTable {
+            id,
+            options: self.options.clone(),
+            column_descs: column_descs.into(),
+        };
+        let res = tables.insert(id, table.into());
+        if res.is_some() {
+            return Err(anyhow!("table already exists: {:?}", id).into());
+        }
+        Ok(())
     }
 
     /// Get a table.
-    pub fn get_table(&self, _id: TableRefId) -> StorageResult<StorageTableRef> {
-        todo!()
+    pub fn get_table(&self, id: TableRefId) -> StorageResult<StorageTableRef> {
+        let tables = self.tables.read().unwrap();
+        tables
+            .get(&id)
+            .ok_or_else(|| anyhow!("table not found: {:?}", id).into())
+            .cloned()
+    }
 }
 
+/// Encode an `I32Array` into a `Vec<u8>`.
+fn encode_int32_column(a: &I32Array) -> StorageResult<Vec<u8>> {
+    let mut buffer = Vec::with_capacity(a.len() * 4);
+    for item in a.iter() {
+        if let Some(item) = item {
+            buffer.put_i32_le(*item);
+        } else {
+            return Err(anyhow!("nullable encoding not supported!").into());
+        }
+    }
+    Ok(buffer)
+}
+
+fn decode_int32_column(mut data: &[u8]) -> StorageResult<I32Array> {
+    let mut builder = I32ArrayBuilder::with_capacity(data.len() / 4);
+    while data.has_remaining() {
+        builder.push(Some(&data.get_i32_le()));
+    }
+    Ok(builder.finish())
+}
+
 impl DiskTable {
+    fn table_path(&self) -> PathBuf {
+        self.options.base_path.join(self.id.table_id.to_string())
+    }
+
+    fn column_path(&self, column_id: usize) -> PathBuf {
+        self.table_path().join(format!("{}.col", column_id))
+    }
+
     /// Append a chunk to the table.
-    pub async fn append(&self, _chunk: DataChunk) -> StorageResult<()> {
-        todo!()
+    pub async fn append(&self, chunk: DataChunk) -> StorageResult<()> {
+        for (idx, column) in chunk.arrays().iter().enumerate() {
+            if let ArrayImpl::Int32(column) = column {
+                let column_path = self.column_path(idx);
+                let data = encode_int32_column(column)?;
+                tokio::fs::create_dir_all(column_path.parent().unwrap())
+                    .await
+                    .map_err(err)?;
+                tokio::fs::write(column_path, data).await.map_err(err)?;
+            } else {
+                return Err(anyhow!("unsupported column type").into());
+            }
+        }
+        Ok(())
     }
 
     /// Get all chunks of the table.
     pub async fn all_chunks(&self) -> StorageResult<Vec<DataChunk>> {
-        todo!()
+        let mut columns = vec![];
+        for (idx, _) in self.column_descs.iter().enumerate() {
+            let column_path = self.column_path(idx);
+            let data = tokio::fs::read(column_path).await.map_err(err)?;
+            columns.push(decode_int32_column(&data)?);
+        }
+        Ok(vec![columns.into_iter().map(ArrayImpl::Int32).collect()])
     }
 }
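
A rough usage sketch, not part of the commit above: it assumes `I32Array` can be collected from an iterator of `Option<i32>` and that `DataChunk` can be collected from `ArrayImpl`s (the `all_chunks` body already does the latter), and it leaves the `TableRefId` and matching `ColumnDesc`s to be supplied by the caller from the catalog.

// Hypothetical helper, not in the commit: round-trips one Int32 column
// through the new on-disk table API.
async fn round_trip(
    storage: &DiskStorage,
    id: TableRefId,
    column_descs: &[ColumnDesc],
) -> StorageResult<()> {
    // Register the table; this only records metadata, nothing is written yet.
    storage.add_table(id, column_descs)?;
    let table = storage.get_table(id)?;

    // Build a single-column chunk; `append` writes it to `<base_path>/<table_id>/0.col`.
    let col: I32Array = [1, 2, 3].into_iter().map(Some).collect();
    let chunk: DataChunk = [ArrayImpl::Int32(col)].into_iter().collect();
    table.append(chunk).await?;

    // Everything persisted so far comes back as a single chunk.
    let chunks = table.all_chunks().await?;
    assert_eq!(chunks.len(), 1);
    Ok(())
}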