From e5dd7d2b3a767d0fd71bc2f632d9fe96b27c70f9 Mon Sep 17 00:00:00 2001
From: Jack Kelly
Date: Thu, 18 Nov 2021 13:52:28 +0000
Subject: [PATCH 1/4] refactor a little

---
 nowcasting_dataset/manager.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/nowcasting_dataset/manager.py b/nowcasting_dataset/manager.py
index ac809a16..3730150a 100644
--- a/nowcasting_dataset/manager.py
+++ b/nowcasting_dataset/manager.py
@@ -419,24 +419,21 @@ def create_batches(self, overwrite_batches: bool) -> None:
             locations_for_each_example["t0_datetime_UTC"] = pd.to_datetime(
                 locations_for_each_example["t0_datetime_UTC"]
             )
-            locations_for_each_example_of_each_split[split_name] = locations_for_each_example
+            if len(locations_for_each_example) > 0:
+                locations_for_each_example_of_each_split[split_name] = locations_for_each_example
 
         # Fire up a separate process for each DataSource, and pass it a list of batches to
         # create, and whether to utils.upload_and_delete_local_files().
         # TODO: Issue 321: Split this up into separate functions!!!
         n_data_sources = len(self.data_sources)
         nd_utils.set_fsspec_for_multiprocess()
-        for split_name in splits_which_need_more_batches:
-            locations_for_split = locations_for_each_example_of_each_split[split_name]
+        for split_name, locations_for_split in locations_for_each_example_of_each_split.items():
             with futures.ProcessPoolExecutor(max_workers=n_data_sources) as executor:
                 future_create_batches_jobs = []
                 for worker_id, (data_source_name, data_source) in enumerate(
                     self.data_sources.items()
                 ):
-                    if len(locations_for_split) == 0:
-                        break
-
                     # Get indexes of first batch and example. And subset locations_for_split.
                     idx_of_first_batch = first_batches_to_create[split_name][data_source_name]
                     idx_of_first_example = idx_of_first_batch * self.config.process.batch_size

From 46f37fc8610bbe29971b19e098dbb807541782d2 Mon Sep 17 00:00:00 2001
From: Jack Kelly
Date: Thu, 18 Nov 2021 14:10:21 +0000
Subject: [PATCH 2/4] Replace ProcessPoolExecutor with multiprocessing.Pool

---
 nowcasting_dataset/manager.py | 43 ++++++++++++++++++++++++-------------------
 1 file changed, 24 insertions(+), 19 deletions(-)

diff --git a/nowcasting_dataset/manager.py b/nowcasting_dataset/manager.py
index 3730150a..5538ea4c 100644
--- a/nowcasting_dataset/manager.py
+++ b/nowcasting_dataset/manager.py
@@ -1,7 +1,7 @@
 """Manager class."""
 
 import logging
-from concurrent import futures
+import multiprocessing
 from pathlib import Path
 from typing import Optional, Union
 
@@ -428,8 +428,8 @@ def create_batches(self, overwrite_batches: bool) -> None:
         n_data_sources = len(self.data_sources)
         nd_utils.set_fsspec_for_multiprocess()
         for split_name, locations_for_split in locations_for_each_example_of_each_split.items():
-            with futures.ProcessPoolExecutor(max_workers=n_data_sources) as executor:
-                future_create_batches_jobs = []
+            with multiprocessing.Pool(processes=n_data_sources) as pool:
+                async_results_from_create_batches = []
                 for worker_id, (data_source_name, data_source) in enumerate(
                     self.data_sources.items()
                 ):
@@ -443,6 +443,8 @@ def create_batches(self, overwrite_batches: bool) -> None:
                     dst_path = (
                         self.config.output_data.filepath / split_name.value / data_source_name
                     )
+
+                    # TODO: Guarantee that local temp path is unique and empty.
                     local_temp_path = (
                         self.local_temp_path
                         / split_name.value
@@ -455,9 +457,8 @@ def create_batches(self, overwrite_batches: bool) -> None:
                     if self.save_batches_locally_and_upload:
                         nd_fs_utils.makedirs(local_temp_path, exist_ok=True)
 
-                    # Submit data_source.create_batches task to the worker process.
-                    future = executor.submit(
-                        data_source.create_batches,
+                    # Keyword arguments to be passed into data_source.create_batches():
+                    kwargs_for_create_batches = dict(
                         spatial_and_temporal_locations_of_each_example=locations,
                         idx_of_first_batch=idx_of_first_batch,
                         batch_size=self.config.process.batch_size,
@@ -465,17 +466,21 @@ def create_batches(self, overwrite_batches: bool) -> None:
                         local_temp_path=local_temp_path,
                         upload_every_n_batches=self.config.process.upload_every_n_batches,
                     )
-                    future_create_batches_jobs.append(future)
 
-            # Wait for all futures to finish:
-            for future, data_source_name in zip(
-                future_create_batches_jobs, self.data_sources.keys()
-            ):
-                # Call exception() to propagate any exceptions raised by the worker process into
-                # the main process, and to wait for the worker to finish.
-                exception = future.exception()
-                if exception is not None:
-                    logger.exception(
-                        f"Worker process {data_source_name} raised exception!\n{exception}"
-                    )
-                    raise exception
+                    # Submit data_source.create_batches task to the worker process.
+                    async_result = pool.apply_async(
+                        data_source.create_batches,
+                        **kwargs_for_create_batches,
+                        callback=lambda result: logger.info(
+                            f"{data_source_name} finished creating batches for {split_name}!"
+                        ),
+                        error_callback=lambda exception: logger.error(
+                            f"Exception raised by {data_source_name} whilst creating batches for"
+                            f" {split_name}: \n{exception}"
+                        ),
+                    )
+                    async_results_from_create_batches.append(async_result)
+
+            # Wait for all async_results to finish:
+            for async_result in async_results_from_create_batches:
+                async_result.wait()

From 96421699c8d664f0c657775cf03ec0e476fe8ad7 Mon Sep 17 00:00:00 2001
From: Jack Kelly
Date: Thu, 18 Nov 2021 14:38:55 +0000
Subject: [PATCH 3/4] separate out logging messages

---
 nowcasting_dataset/manager.py | 23 +++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/nowcasting_dataset/manager.py b/nowcasting_dataset/manager.py
index 5538ea4c..607da7c8 100644
--- a/nowcasting_dataset/manager.py
+++ b/nowcasting_dataset/manager.py
@@ -467,16 +467,25 @@ def create_batches(self, overwrite_batches: bool) -> None:
                         upload_every_n_batches=self.config.process.upload_every_n_batches,
                     )
 
+                    # Logger messages for callbacks:
+                    callback_msg = (
+                        f"{data_source_name} has finished creating batches for {split_name}!"
+                    )
+                    error_callback_msg = (
+                        f"Exception raised by {data_source_name} whilst creating batches for"
+                        f" {split_name}:\n"
+                    )
+
                     # Submit data_source.create_batches task to the worker process.
+                    logger.debug(
+                        f"About to submit create_batches task for {data_source_name}, {split_name}"
+                    )
                     async_result = pool.apply_async(
                         data_source.create_batches,
-                        **kwargs_for_create_batches,
-                        callback=lambda result: logger.info(
-                            f"{data_source_name} finished creating batches for {split_name}!"
-                        ),
+                        kwds=kwargs_for_create_batches,
+                        callback=lambda result: logger.info(callback_msg),
                         error_callback=lambda exception: logger.error(
-                            f"Exception raised by {data_source_name} whilst creating batches for"
-                            f" {split_name}: \n{exception}"
+                            error_callback_msg + str(exception)
                         ),
                     )
                     async_results_from_create_batches.append(async_result)
@@ -484,3 +493,5 @@ def create_batches(self, overwrite_batches: bool) -> None:
             # Wait for all async_results to finish:
             for async_result in async_results_from_create_batches:
                 async_result.wait()
+
+            logger.info(f"Finished creating batches for {split_name}!")

From ff0f11d3db6526755b0d899574bcdb79435fd130 Mon Sep 17 00:00:00 2001
From: Jack Kelly
Date: Thu, 18 Nov 2021 14:40:34 +0000
Subject: [PATCH 4/4] add TODO issue number

---
 nowcasting_dataset/manager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nowcasting_dataset/manager.py b/nowcasting_dataset/manager.py
index 607da7c8..202e9dc3 100644
--- a/nowcasting_dataset/manager.py
+++ b/nowcasting_dataset/manager.py
@@ -444,7 +444,7 @@ def create_batches(self, overwrite_batches: bool) -> None:
                         self.config.output_data.filepath / split_name.value / data_source_name
                     )
 
-                    # TODO: Guarantee that local temp path is unique and empty.
+                    # TODO: Issue 455: Guarantee that local temp path is unique and empty.
                     local_temp_path = (
                         self.local_temp_path
                         / split_name.value
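
The pattern patches 2 and 3 converge on is worth seeing in isolation: keyword arguments go to multiprocessing.Pool.apply_async via its kwds= parameter (unpacking them with ** would pass them to apply_async itself, which is what patch 3 corrects), with callback/error_callback hooks that run back in the parent process. The sketch below is illustrative only, not code from nowcasting_dataset: the create_batches function and the data-source names are hypothetical stand-ins. It also binds the loop variable into each lambda as a default argument; a bare closure over data_source_name would late-bind and report the final loop value in every callback.

"""Minimal sketch of the Pool.apply_async pattern used in patches 2 and 3."""
import logging
import multiprocessing

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def create_batches(idx_of_first_batch: int, batch_size: int) -> int:
    """Hypothetical stand-in for data_source.create_batches()."""
    return idx_of_first_batch * batch_size


if __name__ == "__main__":
    data_source_names = ["pv", "satellite"]  # Illustrative names only.
    with multiprocessing.Pool(processes=len(data_source_names)) as pool:
        async_results = []
        for data_source_name in data_source_names:
            kwargs = dict(idx_of_first_batch=0, batch_size=32)
            async_results.append(
                pool.apply_async(
                    create_batches,
                    # kwds= hands the dict to create_batches(); writing **kwargs
                    # here would (incorrectly) pass the items to apply_async itself.
                    kwds=kwargs,
                    # Bind the loop variable as a default argument: a bare closure
                    # over data_source_name would late-bind and log the final loop
                    # value in every callback.
                    callback=lambda result, name=data_source_name: logger.info(
                        f"{name} finished creating batches! result={result}"
                    ),
                    error_callback=lambda exc, name=data_source_name: logger.error(
                        f"Exception raised by {name}: {exc}"
                    ),
                )
            )
        # Wait inside the `with` block: Pool.__exit__ calls terminate(), which
        # would kill any still-running workers.
        for async_result in async_results:
            async_result.wait()

One behavioural difference from the ProcessPoolExecutor code that patch 2 removes: AsyncResult.wait() only blocks, it does not re-raise a worker's exception in the parent the way Future.exception() surfaced it; with this scheme failures are reported through error_callback (calling AsyncResult.get() instead would re-raise them).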