29 changes: 24 additions & 5 deletions mpas_analysis/ocean/plot_depth_integrated_time_series_subtask.py
@@ -11,15 +11,15 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals

import xarray as xr
import os
import xarray

from mpas_analysis.shared import AnalysisTask

from mpas_analysis.shared.plot.plotting import timeseries_analysis_plot

from mpas_analysis.shared.generalized_reader import open_multifile_dataset
from mpas_analysis.shared.io import open_mpas_dataset
from mpas_analysis.shared.io import open_mpas_dataset, write_netcdf

from mpas_analysis.shared.timekeeping.utility import date_to_days, \
days_to_datetime
@@ -216,6 +216,18 @@ def setup_and_check(self): # {{{
self.refFileName = '{}/{}'.format(baseDirectory,
self.inFileName)

preprocessedReferenceRunName = config.get(
'runs', 'preprocessedReferenceRunName')
if preprocessedReferenceRunName != 'None':

assert(not os.path.isabs(self.inFileName))

baseDirectory = build_config_full_path(
config, 'output', 'timeSeriesSubdirectory')

self.preprocessedFileName = '{}/preprocessed_{}'.format(
baseDirectory, self.inFileName)

if not os.path.isabs(self.inFileName):
baseDirectory = build_config_full_path(
config, 'output', 'timeSeriesSubdirectory')
@@ -354,6 +366,13 @@ def run_task(self): # {{{
'not be plotted.')
preprocessedReferenceRunName = 'None'

# rolling mean seems to have trouble with dask data sets so we
# write out the data set and read it back as a single-file data set
# (without dask)
dsPreprocessed = dsPreprocessed.drop('xtime')
write_netcdf(dsPreprocessed, self.preprocessedFileName)
dsPreprocessed = xarray.open_dataset(self.preprocessedFileName)
Collaborator (Author):
@pwolfram, if you have time to take a look at this, it's mainly a question of seeing if you're good with this solution for converting a multi-file data set to a single file data set or if you'd suggest some other way of handling the issue.

Contributor:
My understanding from S Hoyer is that this is the preferred way to handle these types of problems. The only issue I foresee is that, as data become larger, this doesn't scale too well. However, recent work has been developing parallel writing functionality in xarray, so I wouldn't worry about this in the short term.

I think this is a reasonable way to convert a multi-file dataset into a single file data set 👍
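As a minimal sketch of the pattern being discussed here, using plain xarray calls in place of the project's `write_netcdf` helper; the function name, cache path, and 12-point window below are illustrative assumptions, not part of this PR:

```python
import xarray


def to_single_file_dataset(ds, cachePath):
    # Write the (possibly multi-file, dask-backed) dataset out to a single
    # netCDF file, then re-open it as an eager, single-file dataset so that
    # operations like rolling means no longer go through dask.
    ds.to_netcdf(cachePath)
    return xarray.open_dataset(cachePath)


# Hypothetical usage mirroring the change above:
# dsPreprocessed = dsPreprocessed.drop('xtime')
# dsPreprocessed = to_single_file_dataset(dsPreprocessed, 'preprocessed_cache.nc')
# dsSmoothed = dsPreprocessed.rolling(Time=12, center=True).mean()
```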


if preprocessedReferenceRunName != 'None':
color = 'r'
title = '{} \n {} (red)'.format(title,
@@ -369,13 +388,13 @@
divisionDepths] + ['btm']

# these preprocessed data are OHC *anomalies*
dsPreprocessed = compute_moving_avg(dsPreprocessed,
movingAveragePoints)
for rangeIndex in range(len(suffixes)):
variableName = '{}_{}'.format(preprocessedFieldPrefix,
suffixes[rangeIndex])
if variableName in list(dsPreprocessed.data_vars.keys()):
field = dsPreprocessed[variableName]
field = compute_moving_avg(field, movingAveragePoints)
timeSeries.append(field)
timeSeries.append(dsPreprocessed[variableName])
else:
self.logger.warning('Warning: Preprocessed variable {} '
'not found. Skipping.'.format(