Skip to content

Commit 12e2de8

Browse files
committed
remove close_handles for now
1 parent a320240 commit 12e2de8

File tree

1 file changed

+7
-15
lines changed

1 file changed

+7
-15
lines changed

crates/iceberg/src/writer/file_writer/rolling_writer.rs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
// under the License.
1717

1818
use arrow_array::RecordBatch;
19-
use futures::future::try_join_all;
2019

21-
use crate::runtime::{JoinHandle, spawn};
2220
use crate::spec::DataFileBuilder;
2321
use crate::writer::CurrentFileStatus;
2422
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
@@ -58,7 +56,7 @@ impl<B: FileWriterBuilder> FileWriterBuilder for RollingFileWriterBuilder<B> {
5856
inner: None,
5957
inner_builder: self.inner_builder,
6058
target_file_size: self.target_file_size,
61-
close_handles: vec![],
59+
data_file_builders: vec![],
6260
})
6361
}
6462
}
@@ -73,7 +71,7 @@ pub struct RollingFileWriter<B: FileWriterBuilder> {
7371
inner: Option<B::R>,
7472
inner_builder: B,
7573
target_file_size: usize,
76-
close_handles: Vec<JoinHandle<Result<Vec<DataFileBuilder>>>>,
74+
data_file_builders: Vec<DataFileBuilder>,
7775
}
7876

7977
impl<B: FileWriterBuilder> RollingFileWriter<B> {
@@ -105,8 +103,7 @@ impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
105103
if self.should_roll(input_size) {
106104
if let Some(inner) = self.inner.take() {
107105
// close the current writer, roll to a new file
108-
let handle = spawn(async move { inner.close().await });
109-
self.close_handles.push(handle);
106+
self.data_file_builders.extend(inner.close().await?);
110107

111108
// start a new writer
112109
self.inner = Some(self.inner_builder.clone().build().await?);
@@ -125,19 +122,14 @@ impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
125122
Ok(())
126123
}
127124

128-
async fn close(self) -> Result<Vec<DataFileBuilder>> {
129-
let mut data_file_builders = try_join_all(self.close_handles)
130-
.await?
131-
.into_iter()
132-
.flatten()
133-
.collect::<Vec<DataFileBuilder>>();
134-
125+
async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
135126
// close the current writer and merge the output
136127
if let Some(current_writer) = self.inner {
137-
data_file_builders.extend(current_writer.close().await?);
128+
self.data_file_builders
129+
.extend(current_writer.close().await?);
138130
}
139131

140-
Ok(data_file_builders)
132+
Ok(self.data_file_builders)
141133
}
142134
}
143135

0 commit comments

Comments
 (0)