11use crate :: serialization:: { Addr , SerializationSink } ;
22use std:: error:: Error ;
33use std:: fs;
4- use std:: io:: { BufWriter , Write } ;
4+ use std:: io:: { Write } ;
55use std:: path:: Path ;
6- use std :: sync :: Mutex ;
6+ use parking_lot :: Mutex ;
77
88pub struct FileSerializationSink {
9- data : Mutex < ( BufWriter < fs:: File > , u32 ) > ,
9+ data : Mutex < Inner > ,
10+ }
11+
12+ struct Inner {
13+ file : fs:: File ,
14+ buffer : Vec < u8 > ,
15+ buf_pos : usize ,
16+ addr : u32 ,
1017}
1118
1219impl SerializationSink for FileSerializationSink {
@@ -16,7 +23,12 @@ impl SerializationSink for FileSerializationSink {
1623 let file = fs:: File :: create ( path) ?;
1724
1825 Ok ( FileSerializationSink {
19- data : Mutex :: new ( ( BufWriter :: new ( file) , 0 ) ) ,
26+ data : Mutex :: new ( Inner {
27+ file,
28+ buffer : vec ! [ 0 ; 1024 * 512 ] ,
29+ buf_pos : 0 ,
30+ addr : 0
31+ } ) ,
2032 } )
2133 }
2234
@@ -25,17 +37,45 @@ impl SerializationSink for FileSerializationSink {
2537 where
2638 W : FnOnce ( & mut [ u8 ] ) ,
2739 {
28- let mut buffer = vec ! [ 0 ; num_bytes] ;
29- write ( buffer. as_mut_slice ( ) ) ;
40+ let mut data = self . data . lock ( ) ;
41+ let Inner {
42+ ref mut file,
43+ ref mut buffer,
44+ ref mut buf_pos,
45+ ref mut addr
46+ } = * data;
3047
31- let mut data = self . data . lock ( ) . expect ( "couldn't acquire lock" ) ;
32- let curr_addr = data . 1 ;
33- let file = & mut data . 0 ;
48+ assert ! ( num_bytes <= buffer . len ( ) ) ;
49+ let mut buf_start = * buf_pos ;
50+ let mut buf_end = buf_start + num_bytes ;
3451
35- file. write_all ( & buffer) . expect ( "failed to write buffer" ) ;
52+ if buf_end > buffer. len ( ) {
53+ file. write_all ( & buffer[ ..buf_start] ) . expect ( "failed to write buffer" ) ;
54+ buf_start = 0 ;
55+ buf_end = num_bytes;
56+ }
3657
37- data. 1 += num_bytes as u32 ;
58+ write ( & mut buffer[ buf_start .. buf_end] ) ;
59+ * buf_pos = buf_end;
3860
61+ let curr_addr = * addr;
62+ * addr += num_bytes as u32 ;
3963 Addr ( curr_addr)
4064 }
4165}
66+
67+ impl Drop for FileSerializationSink {
68+ fn drop ( & mut self ) {
69+ let mut data = self . data . lock ( ) ;
70+ let Inner {
71+ ref mut file,
72+ ref mut buffer,
73+ ref mut buf_pos,
74+ addr : _,
75+ } = * data;
76+
77+ if * buf_pos > 0 {
78+ file. write_all ( & buffer[ ..* buf_pos] ) . expect ( "failed to write buffer" ) ;
79+ }
80+ }
81+ }
0 commit comments