Skip to content

Commit 0bcc06b

Browse files
authored
Merge pull request #1698 from cmu-delphi/release/indicators_v0.3.24_utils_v0.3.6
Release covidcast-indicators 0.3.24
2 parents 1e993f7 + 984d97c commit 0bcc06b

File tree

11 files changed

+257
-2
lines changed

11 files changed

+257
-2
lines changed

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.3.23
2+
current_version = 0.3.24
33
commit = True
44
message = chore: bump covidcast-indicators to {new_version}
55
tag = False

ansible/templates/quidel_covidtest-params-prod.json.j2

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
"export_end_date": "",
1111
"pull_start_date": "2020-05-26",
1212
"pull_end_date":"",
13+
"backfill_dir": "/common/backfill/quidel_covidtest",
14+
"backfill_merge_day": 0,
1315
"export_day_range":40,
1416
"aws_credentials": {
1517
"aws_access_key_id": "{{ quidel_aws_access_key_id }}",
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# -*- coding: utf-8 -*-
2+
"""Store backfill data."""
3+
import os
4+
import glob
5+
from datetime import datetime
6+
7+
import pandas as pd
8+
9+
from delphi_utils import GeoMapper
10+
11+
12+
gmpr = GeoMapper()
13+
14+
def store_backfill_file(df, _end_date, backfill_dir):
    """
    Store county-level backfill data into backfill_dir.

    Parameters
    ----------
    df: pd.DataFrame
        Pre-processed file at ZipCode level
    _end_date: datetime
        The most recent date when the raw data is received
    backfill_dir: str
        Specified path to store backfill files.
    """
    backfilldata = df.copy()
    # Aggregate ZIP-level rows up to county (FIPS), then attach the state id.
    backfilldata = gmpr.replace_geocode(backfilldata, from_code="zip", new_code="fips",
                                        from_col="zip", new_col="fips",
                                        date_col="timestamp")
    backfilldata = gmpr.add_geocode(backfilldata, from_code="fips", new_code="state_id",
                                    from_col="fips", new_col="state_id")
    backfilldata.rename({"timestamp": "time_value",
                         "totalTest_total": "den_total",
                         "positiveTest_total": "num_total",
                         "positiveTest_age_0_4": "num_age_0_4",
                         "totalTest_age_0_4": "den_age_0_4",
                         "positiveTest_age_5_17": "num_age_5_17",
                         "totalTest_age_5_17": "den_age_5_17",
                         "positiveTest_age_18_49": "num_age_18_49",
                         "totalTest_age_18_49": "den_age_18_49",
                         "positiveTest_age_50_64": "num_age_50_64",
                         "totalTest_age_50_64": "den_age_50_64",
                         "positiveTest_age_65plus": "num_age_65plus",
                         "totalTest_age_65plus": "den_age_65plus",
                         "positiveTest_age_0_17": "num_age_0_17",
                         "totalTest_age_0_17": "den_age_0_17"},
                        axis=1, inplace=True)
    # Store one year's backfill data.
    try:
        _start_date = _end_date.replace(year=_end_date.year - 1)
    except ValueError:
        # _end_date is Feb 29 and the previous year has no Feb 29;
        # fall back to Feb 28 of the previous year.
        _start_date = _end_date.replace(year=_end_date.year - 1, day=28)
    selected_columns = ['time_value', 'fips', 'state_id',
                        'den_total', 'num_total',
                        'num_age_0_4', 'den_age_0_4',
                        'num_age_5_17', 'den_age_5_17',
                        'num_age_18_49', 'den_age_18_49',
                        'num_age_50_64', 'den_age_50_64',
                        'num_age_65plus', 'den_age_65plus',
                        'num_age_0_17', 'den_age_0_17']
    backfilldata = backfilldata.loc[backfilldata["time_value"] >= _start_date,
                                    selected_columns]
    path = backfill_dir + \
        "/quidel_covidtest_as_of_%s.parquet" % _end_date.strftime("%Y%m%d")
    # Store intermediate file into the backfill folder.
    backfilldata.to_parquet(path, index=False)
63+
64+
def merge_backfill_file(backfill_dir, backfill_merge_day, today,
                        test_mode=False, check_nd=25):
    """
    Merge ~4 weeks' backfill data into one file.

    Usually this function should merge 28 days' data into a new file so as to
    save the reading time when running the backfill pipelines. We set a softer
    threshold to allow flexibility in data delivery.

    Parameters
    ----------
    backfill_dir : str
        specified path to store backfill files.
    backfill_merge_day: int
        The day of a week that we used to merge the backfill files. e.g. 0
        is Monday.
    today : datetime
        The most recent date when the raw data is received
    test_mode: bool
        When True, keep the daily files after the merged file is written.
    check_nd: int
        The criteria of the number of unmerged files. Ideally, we want the
        number to be 28, but we use a looser criteria from practical
        considerations
    """
    new_files = glob.glob(backfill_dir + "/quidel_covidtest_as_of_*")
    if len(new_files) == 0:  # no daily file has been stored yet
        return

    def get_date(file_link):
        # Keep the filename parsing here consistent with the backfill path
        # built in `store_backfill_file`.
        fn = file_link.split("/")[-1].split(".parquet")[0].split("_")[-1]
        return datetime.strptime(fn, "%Y%m%d")

    date_list = list(map(get_date, new_files))
    earliest_date = min(date_list)
    latest_date = max(date_list)

    # Only merge on the designated weekday, and only once enough daily files
    # (more than check_nd days' worth) have accumulated.
    if today.weekday() != backfill_merge_day or (today - earliest_date).days <= check_nd:
        return

    # Start to merge files
    pdList = []
    for fn in new_files:
        df = pd.read_parquet(fn, engine='pyarrow')
        issue_date = get_date(fn)
        df["issue_date"] = issue_date
        # Reporting lag in days between the issue date and each time_value.
        df["lag"] = [(issue_date - x).days for x in df["time_value"]]
        pdList.append(df)
    merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
    path = backfill_dir + "/quidel_covidtest_from_%s_to_%s.parquet" % (
        earliest_date.strftime("%Y%m%d"),
        latest_date.strftime("%Y%m%d"))
    merged_file.to_parquet(path, index=False)

    # Delete daily files once we have the merged one.
    if not test_mode:
        for fn in new_files:
            os.remove(fn)
    return

quidel_covidtest/delphi_quidel_covidtest/pull.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
from .constants import AGE_GROUPS
1212

13+
14+
1315
def get_from_s3(start_date, end_date, bucket, logger):
1416
"""
1517
Get raw data from aws s3 bucket.
@@ -57,6 +59,8 @@ def get_from_s3(start_date, end_date, bucket, logger):
5759

5860
# Fetch data received on the same day
5961
for fn in s3_files[search_date]:
62+
if ".csv" not in fn:
63+
continue #Add to avoid that the folder name was readed as a fn.
6064
if fn in set(df["fname"].values):
6165
continue
6266
obj = bucket.Object(key=fn)

quidel_covidtest/delphi_quidel_covidtest/run.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
check_export_start_date,
2727
check_export_end_date,
2828
update_cache_file)
29-
29+
from .backfill import (store_backfill_file, merge_backfill_file)
3030

3131
def log_exit(start_time, stats, logger):
3232
"""Log at program exit."""
@@ -66,6 +66,7 @@ def run_module(params: Dict[str, Any]):
6666
- indicator":
6767
- "static_file_dir": str, directory name with population information
6868
- "input_cache_dir": str, directory in which to cache input data
69+
- "backfill_dir": str, directory in which to store the backfill files
6970
- "export_start_date": str, YYYY-MM-DD format of earliest date to create output
7071
- "export_end_date": str, YYYY-MM-DD format of latest date to create output or "" to create
7172
through the present
@@ -85,16 +86,24 @@ def run_module(params: Dict[str, Any]):
8586
stats = []
8687
atexit.register(log_exit, start_time, stats, logger)
8788
cache_dir = params["indicator"]["input_cache_dir"]
89+
backfill_dir = params["indicator"]["backfill_dir"]
90+
backfill_merge_day = params["indicator"]["backfill_merge_day"]
8891
export_dir = params["common"]["export_dir"]
8992
export_start_date = params["indicator"]["export_start_date"]
9093
export_end_date = params["indicator"]["export_end_date"]
9194
export_day_range = params["indicator"]["export_day_range"]
9295

9396
# Pull data and update export date
9497
df, _end_date = pull_quidel_covidtest(params["indicator"], logger)
98+
# Merge 4 weeks' data into one file to save runtime
99+
# Notice that here we don't check the _end_date(receive date)
100+
# since we always want such merging happens on a certain day of a week
101+
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
95102
if _end_date is None:
96103
logger.info("The data is up-to-date. Currently, no new data to be ingested.")
97104
return
105+
# Store the backfill intermediate file
106+
store_backfill_file(df, _end_date, backfill_dir)
98107
export_end_date = check_export_end_date(
99108
export_end_date, _end_date, END_FROM_TODAY_MINUS)
100109
export_start_date = check_export_start_date(

quidel_covidtest/params.json.template

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
"indicator": {
88
"static_file_dir": "./static",
99
"input_cache_dir": "./cache",
10+
"backfill_dir": "./backfill",
11+
"backfill_merge_day": 0,
1012
"export_start_date": "2020-05-26",
1113
"export_end_date": "",
1214
"pull_start_date": "2020-05-26",

quidel_covidtest/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
required = [
55
"numpy",
66
"pandas",
7+
"pyarrow",
78
"pydocstyle",
89
"pytest",
910
"pytest-cov",
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.csv
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import logging
2+
import os
3+
import glob
4+
from datetime import datetime
5+
6+
import pandas as pd
7+
8+
from delphi_quidel_covidtest.pull import pull_quidel_covidtest
9+
10+
from delphi_quidel_covidtest.backfill import (store_backfill_file,
11+
merge_backfill_file)
12+
13+
END_FROM_TODAY_MINUS = 5
14+
EXPORT_DAY_RANGE = 40
15+
16+
TEST_LOGGER = logging.getLogger()
17+
backfill_dir="./backfill"
18+
19+
class TestBackfill:
    """Checks for the quidel backfill storage and merge helpers."""

    # Pulled once at class-definition time and reused by every test below.
    df, _end_date = pull_quidel_covidtest({
        "static_file_dir": "../static",
        "input_cache_dir": "./cache",
        "export_start_date": "2020-06-30",
        "export_end_date": "",
        "pull_start_date": "2020-07-09",
        "pull_end_date": "",
        "aws_credentials": {
            "aws_access_key_id": "",
            "aws_secret_access_key": ""
        },
        "bucket_name": "",
        "wip_signal": "",
        "test_mode": True
    }, TEST_LOGGER)

    def test_store_backfill_file(self):
        """A daily snapshot is written with the expected name and columns."""
        store_backfill_file(self.df, datetime(2020, 1, 1), backfill_dir)
        stored_fn = "quidel_covidtest_as_of_20200101.parquet"
        assert stored_fn in os.listdir(backfill_dir)

        stored_df = pd.read_parquet(backfill_dir + "/" + stored_fn, engine='pyarrow')
        expected_columns = {'time_value', 'fips', 'state_id',
                            'den_total', 'num_total',
                            'num_age_0_4', 'den_age_0_4',
                            'num_age_5_17', 'den_age_5_17',
                            'num_age_18_49', 'den_age_18_49',
                            'num_age_50_64', 'den_age_50_64',
                            'num_age_65plus', 'den_age_65plus',
                            'num_age_0_17', 'den_age_0_17'}
        assert expected_columns == set(stored_df.columns)

        os.remove(backfill_dir + "/" + stored_fn)
        assert stored_fn not in os.listdir(backfill_dir)

    def test_merge_backfill_file(self):
        """Daily snapshots merge into one file only when the criteria are met."""
        merged_fn = "quidel_covidtest_from_20200817_to_20200820.parquet"
        assert merged_fn not in os.listdir(backfill_dir)

        today = datetime(2020, 8, 20)

        # No daily file stored yet: merging is a no-op.
        merge_backfill_file(backfill_dir, today.weekday(), today,
                            test_mode=True, check_nd=8)
        assert merged_fn not in os.listdir(backfill_dir)

        for day in range(17, 21):
            store_backfill_file(self.df, datetime(2020, 8, day), backfill_dir)

        # Too few unmerged days (check_nd=8): merged file still not generated.
        merge_backfill_file(backfill_dir, today.weekday(), today,
                            test_mode=True, check_nd=8)
        assert merged_fn not in os.listdir(backfill_dir)

        # Loosened criteria: the merged file appears; test_mode keeps dailies.
        merge_backfill_file(backfill_dir, today.weekday(), today,
                            test_mode=True, check_nd=2)
        assert merged_fn in os.listdir(backfill_dir)

        # Rebuild the expected merged frame from the daily files, deleting
        # each daily file as it is consumed.
        frames = []
        for daily_file in glob.glob(backfill_dir + "/quidel_covidtest*.parquet"):
            if "from" in daily_file:
                continue
            daily_df = pd.read_parquet(daily_file, engine='pyarrow')
            issue_date = datetime.strptime(daily_file[-16:-8], "%Y%m%d")
            daily_df["issue_date"] = issue_date
            daily_df["lag"] = [(issue_date - x).days for x in daily_df["time_value"]]
            frames.append(daily_df)
            os.remove(daily_file)
        remaining = glob.glob(backfill_dir + "/quidel_covidtest*.parquet")
        assert len(remaining) == 1

        expected = pd.concat(frames).sort_values(["time_value", "fips"])

        # The merged file must match the reconstruction in schema and size.
        merged = pd.read_parquet(backfill_dir + "/" + merged_fn, engine='pyarrow')
        assert set(expected.columns) == set(merged.columns)
        assert expected.shape[0] == merged.shape[0]
        assert expected.shape[1] == merged.shape[1]

        os.remove(backfill_dir + "/" + merged_fn)
        assert merged_fn not in os.listdir(backfill_dir)

quidel_covidtest/tests/test_pull.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,4 @@ def test_check_export_start_date(self):
103103
expected = [datetime(2020, 5, 26), datetime(2020, 6, 20), datetime(2020, 5, 26)]
104104

105105
assert tested == expected
106+

quidel_covidtest/tests/test_run.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ class TestRun:
2121
"indicator": {
2222
"static_file_dir": "../static",
2323
"input_cache_dir": "./cache",
24+
"backfill_dir": "./backfill",
25+
"backfill_merge_day": 0,
2426
"export_start_date": "2020-06-30",
2527
"export_end_date": "",
2628
"pull_start_date": "2020-07-09",

0 commit comments

Comments
 (0)