@@ -4,10 +4,11 @@
 from pathlib import Path
 from typing import Optional, Union

-import fsspec
+from concurrent import futures
 import numpy as np
 import pandas as pd

+# nowcasting_dataset imports
 import nowcasting_dataset.time as nd_time
 import nowcasting_dataset.utils as nd_utils
 from nowcasting_dataset import config
@@ -102,26 +103,6 @@ def initialise_data_sources(
102103 " data_source_which_defines_geospatial_locations."
103104 )
104105
-    def make_directories_if_necessary(self) -> None:
-        """Make dirs: `<output_data.filepath> / <split_name> / <data_source_name>`.
-
-        Also make `local_temp_path` if necessary.
-
-        Works on any compute environment.
-        """
-        filesystem = fsspec.open(self.config.output_data.filepath).fs
-        for split_name in split.SplitName:
-            for data_source_name in self.data_sources.keys():
-                path = self.config.output_data.filepath / split_name.value / data_source_name
-                logger.info(f"Making {path} if necessary...")
-                filesystem.mkdirs(path, exist_ok=True)
-
-        if self.save_batches_locally_and_upload:
-            logger.info(f"Making {self.local_temp_path} if necessary...")
-            filesystem.mkdirs(self.local_temp_path, exist_ok=True)
-            logger.info(f"Deleting all files in {self.local_temp_path}...")
-            nd_fs_utils.delete_all_files_in_temp_path(path=self.local_temp_path)
-
     def create_files_specifying_spatial_and_temporal_locations_of_each_example_if_necessary(
         self,
     ) -> None:
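The bulk directory creation removed above is replaced by per-data-source `nd_fs_utils.makedirs(...)` calls inside `create_batches` (final hunk below). As a rough sketch, such a helper could wrap fsspec the same way the removed method did; only the name and `exist_ok` keyword are taken from the calls in this diff, the body is an assumption:

    import fsspec

    def makedirs(path, exist_ok: bool = True) -> None:
        # Resolve the right filesystem (local, gs://, s3://, ...) from the path
        # itself, then create the directory tree on it. Sketch only; the real
        # nd_fs_utils.makedirs may differ.
        filesystem = fsspec.open(str(path)).fs
        filesystem.mkdirs(str(path), exist_ok=exist_ok)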
@@ -316,6 +297,10 @@ def _find_splits_which_need_more_batches(
     def create_batches(self, overwrite_batches: bool) -> None:
         """Create batches (if necessary).

+        Make dirs: `<output_data.filepath> / <split_name> / <data_source_name>`.
+
+        Also make `local_temp_path` if necessary.
+
         Args:
             overwrite_batches: If True then start from batch 0, regardless of which batches have
                 previously been written to disk. If False then check which batches have previously been
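A hedged sketch of how this method might be driven end to end; the import path and the `load_yaml_configuration` step are assumptions, while the other method names appear in this diff:

    from nowcasting_dataset.manager import Manager  # assumed import path

    manager = Manager()
    manager.load_yaml_configuration(filename="config.yaml")  # assumed config step
    manager.initialise_data_sources()
    manager.create_files_specifying_spatial_and_temporal_locations_of_each_example_if_necessary()
    # overwrite_batches=False resumes from the first batch not yet written to
    # disk; True starts again from batch 0.
    manager.create_batches(overwrite_batches=False)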
@@ -335,7 +320,7 @@ def create_batches(self, overwrite_batches: bool) -> None:
             return

         # Load locations for each example off disk.
-        locations_for_each_example_for_each_split: dict[split.SplitName, pd.DataFrame] = {}
+        locations_for_each_example_of_each_split: dict[split.SplitName, pd.DataFrame] = {}
         for split_name in splits_which_need_more_batches:
             filename = self._filename_of_locations_csv_file(split_name.value)
             logger.info(f"Loading {filename}.")
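The `pd.to_datetime` call in the next hunk is needed because datetimes do not survive a CSV round trip; `read_csv` hands back plain strings. A self-contained illustration:

    import pandas as pd

    df = pd.DataFrame({"t0_datetime_UTC": pd.to_datetime(["2021-01-01 12:00"])})
    df.to_csv("locations.csv", index=False)

    loaded = pd.read_csv("locations.csv")
    print(loaded["t0_datetime_UTC"].dtype)  # object, i.e. strings
    loaded["t0_datetime_UTC"] = pd.to_datetime(loaded["t0_datetime_UTC"])
    print(loaded["t0_datetime_UTC"].dtype)  # datetime64[ns]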
@@ -345,7 +330,51 @@ def create_batches(self, overwrite_batches: bool) -> None:
             locations_for_each_example["t0_datetime_UTC"] = pd.to_datetime(
                 locations_for_each_example["t0_datetime_UTC"]
             )
-            locations_for_each_example_for_each_split[split_name] = locations_for_each_example
+            locations_for_each_example_of_each_split[split_name] = locations_for_each_example

-        # TODO: Fire up a separate process for each DataSource, and pass it a list of batches to
+        # Fire up a separate process for each DataSource, and pass it a list of batches to
         # create, and whether to utils.upload_and_delete_local_files().
+        # TODO: Split this up into separate functions!!!
+        n_data_sources = len(self.data_sources)
+        for split_name in splits_which_need_more_batches:
+            locations_for_split = locations_for_each_example_of_each_split[split_name]
+            with futures.ProcessPoolExecutor(max_workers=n_data_sources) as executor:
+                future_create_batches_jobs = []
+                for worker_id, (data_source_name, data_source) in enumerate(
+                    self.data_sources.items()
+                ):
+                    # Get indexes of first batch and example; and subset locations_for_split.
+                    idx_of_first_batch = first_batches_to_create[split_name][data_source_name]
+                    idx_of_first_example = idx_of_first_batch * self.config.process.batch_size
+                    locations = locations_for_split.loc[idx_of_first_example:]
+
+                    # Get paths.
+                    dst_path = (
+                        self.config.output_data.filepath / split_name.value / data_source_name
+                    )
+                    temp_path = (
+                        self.local_temp_path / split_name.value / data_source_name / f"worker_{worker_id}"
+                    )
+
+                    # Make folders.
+                    nd_fs_utils.makedirs(dst_path, exist_ok=True)
+                    if self.save_batches_locally_and_upload:
+                        nd_fs_utils.makedirs(temp_path, exist_ok=True)
+
+                    # Submit data_source.create_batches task to the worker process.
+                    future = executor.submit(
+                        data_source.create_batches,
+                        spatial_and_temporal_locations_of_each_example=locations,
+                        idx_of_first_batch=idx_of_first_batch,
+                        batch_size=self.config.process.batch_size,
+                        dst_path=dst_path,
+                        temp_path=temp_path,
+                        upload_every_n_batches=self.config.process.upload_every_n_batches,
+                    )
+                    future_create_batches_jobs.append(future)
+
+                # Wait for all futures to finish:
+                for future in future_create_batches_jobs:
+                    # Call result() to propagate any exceptions raised by the worker process
+                    # into the main process, and to wait for the worker to finish.
+                    future.result()
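The fan-out-and-wait pattern above, reduced to a runnable standalone sketch; the worker function and its arguments are illustrative stand-ins for `data_source.create_batches`:

    from concurrent import futures

    def create_batches_for_source(source_name: str, idx_of_first_batch: int) -> str:
        # Stand-in for data_source.create_batches; any exception raised here is
        # captured by the executor and re-raised by future.result() below.
        return f"{source_name}: creating batches from {idx_of_first_batch}"

    if __name__ == "__main__":
        data_sources = {"satellite": 0, "pv": 2}  # name -> first batch index
        with futures.ProcessPoolExecutor(max_workers=len(data_sources)) as executor:
            jobs = [
                executor.submit(create_batches_for_source, name, first_batch)
                for name, first_batch in data_sources.items()
            ]
            for job in jobs:
                # result() blocks until the worker finishes and re-raises, in
                # this process, any exception raised in the worker.
                print(job.result())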