@@ -24,9 +24,9 @@ use uuid::Uuid;
2424use crate :: error:: Result ;
2525use crate :: io:: OutputFile ;
2626use crate :: spec:: {
27- DataFile , DataFileFormat , FormatVersion , ManifestEntry , ManifestFile , ManifestListWriter ,
28- ManifestWriterBuilder , Operation , Snapshot , SnapshotReference , SnapshotRetention , Struct ,
29- StructType , Summary , MAIN_BRANCH ,
27+ DataContentType , DataFile , DataFileFormat , FormatVersion , ManifestContentType , ManifestEntry ,
28+ ManifestFile , ManifestListWriter , ManifestWriterBuilder , Operation , Snapshot ,
29+ SnapshotReference , SnapshotRetention , Struct , StructType , Summary , MAIN_BRANCH ,
3030} ;
3131use crate :: transaction:: Transaction ;
3232use crate :: { Error , ErrorKind , TableRequirement , TableUpdate } ;
@@ -65,6 +65,7 @@ pub(crate) struct SnapshotProduceAction<'a> {
6565 commit_uuid : Uuid ,
6666 snapshot_properties : HashMap < String , String > ,
6767 pub added_data_files : Vec < DataFile > ,
68+ added_delete_files : Vec < DataFile > ,
6869 // A counter used to generate unique manifest file names.
6970 // It starts from 0 and increments for each new manifest file.
7071 // Note: This counter is limited to the range of (0..u64::MAX).
@@ -85,6 +86,7 @@ impl<'a> SnapshotProduceAction<'a> {
8586 commit_uuid,
8687 snapshot_properties,
8788 added_data_files : vec ! [ ] ,
89+ added_delete_files : vec ! [ ] ,
8890 manifest_counter : ( 0 ..) ,
8991 key_metadata,
9092 } )
@@ -129,13 +131,7 @@ impl<'a> SnapshotProduceAction<'a> {
129131 data_files : impl IntoIterator < Item = DataFile > ,
130132 ) -> Result < & mut Self > {
131133 let data_files: Vec < DataFile > = data_files. into_iter ( ) . collect ( ) ;
132- for data_file in & data_files {
133- if data_file. content_type ( ) != crate :: spec:: DataContentType :: Data {
134- return Err ( Error :: new (
135- ErrorKind :: DataInvalid ,
136- "Only data content type is allowed for fast append" ,
137- ) ) ;
138- }
134+ for data_file in data_files {
139135 // Check if the data file partition spec id matches the table default partition spec id.
140136 if self . tx . table . metadata ( ) . default_partition_spec_id ( ) != data_file. partition_spec_id {
141137 return Err ( Error :: new (
@@ -147,8 +143,12 @@ impl<'a> SnapshotProduceAction<'a> {
147143 data_file. partition ( ) ,
148144 self . tx . table . metadata ( ) . default_partition_type ( ) ,
149145 ) ?;
146+ if data_file. content_type ( ) == DataContentType :: Data {
147+ self . added_data_files . push ( data_file) ;
148+ } else {
149+ self . added_delete_files . push ( data_file) ;
150+ }
150151 }
151- self . added_data_files . extend ( data_files) ;
152152 Ok ( self )
153153 }
154154
@@ -165,9 +165,32 @@ impl<'a> SnapshotProduceAction<'a> {
165165 }
166166
167167 // Write manifest file for added data files and return the ManifestFile for ManifestList.
168- async fn write_added_manifest ( & mut self ) -> Result < ManifestFile > {
169- let added_data_files = std:: mem:: take ( & mut self . added_data_files ) ;
168+ async fn write_added_manifest (
169+ & mut self ,
170+ added_data_files : Vec < DataFile > ,
171+ ) -> Result < ManifestFile > {
170172 let snapshot_id = self . snapshot_id ;
173+ let content_type = {
174+ let mut data_num = 0 ;
175+ let mut delete_num = 0 ;
176+ for f in & added_data_files {
177+ match f. content_type ( ) {
178+ DataContentType :: Data => data_num += 1 ,
179+ DataContentType :: PositionDeletes => delete_num += 1 ,
180+ DataContentType :: EqualityDeletes => delete_num += 1 ,
181+ }
182+ }
183+ if data_num == added_data_files. len ( ) {
184+ ManifestContentType :: Data
185+ } else if delete_num == added_data_files. len ( ) {
186+ ManifestContentType :: Deletes
187+ } else {
188+ return Err ( Error :: new (
189+ ErrorKind :: DataInvalid ,
190+ "added DataFile for a ManifestFile should be same type (Data or Delete)" ,
191+ ) ) ;
192+ }
193+ } ;
171194 let manifest_entries = added_data_files. into_iter ( ) . map ( |data_file| {
172195 let builder = ManifestEntry :: builder ( )
173196 . status ( crate :: spec:: ManifestStatus :: Added )
@@ -196,7 +219,10 @@ impl<'a> SnapshotProduceAction<'a> {
196219 if self . tx . table . metadata ( ) . format_version ( ) == FormatVersion :: V1 {
197220 builder. build_v1 ( )
198221 } else {
199- builder. build_v2_data ( )
222+ match content_type {
223+ ManifestContentType :: Data => builder. build_v2_data ( ) ,
224+ ManifestContentType :: Deletes => builder. build_v2_deletes ( ) ,
225+ }
200226 }
201227 } ;
202228 for entry in manifest_entries {
@@ -210,12 +236,19 @@ impl<'a> SnapshotProduceAction<'a> {
210236 snapshot_produce_operation : & OP ,
211237 manifest_process : & MP ,
212238 ) -> Result < Vec < ManifestFile > > {
213- let added_manifest = self . write_added_manifest ( ) . await ?;
239+ let mut manifest_files = vec ! [ ] ;
240+ let data_files = std:: mem:: take ( & mut self . added_data_files ) ;
241+ let delete_files = std:: mem:: take ( & mut self . added_delete_files ) ;
242+ if !data_files. is_empty ( ) {
243+ let added_manifest = self . write_added_manifest ( data_files) . await ?;
244+ manifest_files. push ( added_manifest) ;
245+ }
246+ if !delete_files. is_empty ( ) {
247+ let added_delete_manifest = self . write_added_manifest ( delete_files) . await ?;
248+ manifest_files. push ( added_delete_manifest) ;
249+ }
214250 let existing_manifests = snapshot_produce_operation. existing_manifest ( self ) . await ?;
215- // # TODO
216- // Support process delete entries.
217251
218- let mut manifest_files = vec ! [ added_manifest] ;
219252 manifest_files. extend ( existing_manifests) ;
220253 let manifest_files = manifest_process. process_manifeset ( manifest_files) ;
221254 Ok ( manifest_files)
0 commit comments