@@ -45,21 +45,34 @@ impl SerializationSink for FileSerializationSink {
4545 ref mut addr
4646 } = * data;
4747
48- assert ! ( num_bytes <= buffer. len( ) ) ;
49- let mut buf_start = * buf_pos;
50- let mut buf_end = buf_start + num_bytes;
48+ let curr_addr = * addr;
49+ * addr += num_bytes as u32 ;
5150
52- if buf_end > buffer. len ( ) {
53- file. write_all ( & buffer[ ..buf_start] ) . expect ( "failed to write buffer" ) ;
54- buf_start = 0 ;
55- buf_end = num_bytes;
56- }
51+ let buf_start = * buf_pos;
52+ let buf_end = buf_start + num_bytes;
5753
58- write ( & mut buffer[ buf_start .. buf_end] ) ;
59- * buf_pos = buf_end;
54+ if buf_end <= buffer. len ( ) {
55+ // We have enough space in the buffer, just write the data to it.
56+ write ( & mut buffer[ buf_start .. buf_end] ) ;
57+ * buf_pos = buf_end;
58+ } else {
59+ // We don't have enough space in the buffer, so flush to disk
60+ file. write_all ( & buffer[ ..buf_start] ) . unwrap ( ) ;
61+
62+ if num_bytes <= buffer. len ( ) {
63+ // There's enough space in the buffer, after flushing
64+ write ( & mut buffer[ 0 .. num_bytes] ) ;
65+ * buf_pos = num_bytes;
66+ } else {
67+ // Even after flushing the buffer there isn't enough space, so
68+ // fall back to dynamic allocation
69+ let mut temp_buffer = vec ! [ 0 ; num_bytes] ;
70+ write ( & mut temp_buffer[ ..] ) ;
71+ file. write_all ( & temp_buffer[ ..] ) . unwrap ( ) ;
72+ * buf_pos = 0 ;
73+ }
74+ }
6075
61- let curr_addr = * addr;
62- * addr += num_bytes as u32 ;
6376 Addr ( curr_addr)
6477 }
6578}
@@ -75,7 +88,7 @@ impl Drop for FileSerializationSink {
7588 } = * data;
7689
7790 if * buf_pos > 0 {
78- file. write_all ( & buffer[ ..* buf_pos] ) . expect ( "failed to write buffer" ) ;
91+ file. write_all ( & buffer[ ..* buf_pos] ) . unwrap ( ) ;
7992 }
8093 }
8194}
0 commit comments