Skip to content

Commit 1e993f7

Browse files
authored
Merge pull request #1693 from cmu-delphi/release/indicators_v0.3.23_utils_v0.3.6
Release covidcast-indicators 0.3.23
2 parents cfde689 + bc2d27e commit 1e993f7

9 files changed

+33
-9
lines changed

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.3.22
2+
current_version = 0.3.23
33
commit = True
44
message = chore: bump covidcast-indicators to {new_version}
55
tag = False

changehc/delphi_changehc/backfill.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
from datetime import datetime
1010
# third party
1111
import pandas as pd
12+
from delphi_utils import GeoMapper
1213

14+
gmpr = GeoMapper()
1315

1416
def store_backfill_file(df, _end_date, backfill_dir, numtype, geo, weekday):
1517
"""
@@ -35,17 +37,19 @@ def store_backfill_file(df, _end_date, backfill_dir, numtype, geo, weekday):
3537
return
3638

3739
backfilldata = df.reset_index().copy()
40+
backfilldata = gmpr.add_geocode(backfilldata, from_code="fips", new_code="state_id",
41+
from_col="fips", new_col="state_id")
3842
backfilldata.rename({"timestamp": "time_value"}, axis=1, inplace=True)
3943
#Store one year's backfill data
4044
_start_date = _end_date.replace(year=_end_date.year-1)
41-
selected_columns = ['time_value', 'fips',
45+
selected_columns = ['time_value', 'fips', 'state_id',
4246
'num', 'den']
4347
backfilldata = backfilldata.loc[backfilldata["time_value"] >= _start_date,
4448
selected_columns]
4549
path = backfill_dir + \
4650
"/changehc_%s_as_of_%s.parquet"%(numtype, datetime.strftime(_end_date, "%Y%m%d"))
4751
# Store intermediate file into the backfill folder
48-
backfilldata.to_parquet(path)
52+
backfilldata.to_parquet(path, index=False)
4953

5054
def merge_backfill_file(backfill_dir, numtype, geo, weekday, backfill_merge_day,
5155
today, test_mode=False, check_nd=25):
@@ -83,6 +87,9 @@ def merge_backfill_file(backfill_dir, numtype, geo, weekday, backfill_merge_day,
8387

8488
new_files = glob.glob(backfill_dir + "/changehc_%s_as_of_*"%numtype)
8589

90+
if len(new_files) == 0: # if no daily files are stored
91+
return
92+
8693
def get_date(file_link):
8794
# Keep the function here consistent with the backfill path in
8895
# function `store_backfill_file`
@@ -111,7 +118,7 @@ def get_date(file_link):
111118
numtype,
112119
datetime.strftime(earliest_date, "%Y%m%d"),
113120
datetime.strftime(latest_date, "%Y%m%d"))
114-
merged_file.to_parquet(path)
121+
merged_file.to_parquet(path, index=False)
115122

116123
# Delete daily files once we have the merged one.
117124
if not test_mode:

changehc/tests/backfill/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.csv
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

changehc/tests/test_backfill.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def test_store_backfill_file(self):
6868
fn = "changehc_covid_as_of_20200101.parquet"
6969
backfill_df = pd.read_parquet(backfill_dir + "/"+ fn, engine='pyarrow')
7070

71-
selected_columns = ['time_value', 'fips',
71+
selected_columns = ['time_value', 'fips', 'state_id',
7272
'num', 'den']
7373
assert set(selected_columns) == set(backfill_df.columns)
7474

@@ -77,35 +77,51 @@ def test_store_backfill_file(self):
7777

7878
def test_merge_backfill_file(self):
7979

80-
today = datetime.today()
8180
geo = "county"
8281
weekday = False
8382
numtype = "covid"
8483

85-
new_files = glob.glob(backfill_dir + "/changehc_%s*.parquet"%numtype)
84+
today = datetime(2020, 6, 4)
8685
fn = "changehc_covid_from_20200601_to_20200604.parquet"
8786
assert fn not in os.listdir(backfill_dir)
8887

88+
merge_backfill_file(backfill_dir, numtype, geo, weekday, today.weekday(),
89+
today, test_mode=True, check_nd=2)
90+
assert fn not in os.listdir(backfill_dir)
91+
92+
# Generate backfill daily files
93+
for d in range(1, 5):
94+
dropdate = datetime(2020, 6, d)
95+
store_backfill_file(combined_data, dropdate, backfill_dir, \
96+
numtype, geo, weekday)
97+
98+
8999
# Check the when the merged file is not generated
90100
today = datetime(2020, 6, 4)
91101
merge_backfill_file(backfill_dir, numtype, geo, weekday, today.weekday(),
92102
today, test_mode=True, check_nd=8)
93103
assert fn not in os.listdir(backfill_dir)
94104

95-
# Generate the merged file, but not delete it
105+
# Generate the merged file, but not delete it
96106
merge_backfill_file(backfill_dir, numtype, geo, weekday, today.weekday(),
97107
today, test_mode=True, check_nd=2)
98108
assert fn in os.listdir(backfill_dir)
99109

100110
# Read daily file
111+
new_files = glob.glob(backfill_dir + "/changehc_%s*.parquet"%numtype)
101112
pdList = []
102113
for file in new_files:
114+
if "from" in file:
115+
continue
103116
df = pd.read_parquet(file, engine='pyarrow')
104117
issue_date = datetime.strptime(file[-16:-8], "%Y%m%d")
105118
df["issue_date"] = issue_date
106119
df["lag"] = [(issue_date - x).days for x in df["time_value"]]
107120
pdList.append(df)
108-
121+
os.remove(file)
122+
new_files = glob.glob(backfill_dir + "/changehc_%s*.parquet"%numtype)
123+
assert len(new_files) == 1
124+
109125
expected = pd.concat(pdList).sort_values(["time_value", "fips"])
110126

111127
# Read the merged file

0 commit comments

Comments
 (0)