Commit 6dc9d57

feat(contrib): init tool upload-kktix-ticket-csv-to-bigquery.py

The tool uploads the ticket information exported from KKTIX to BigQuery and pre-processes the "raw" data so that the column names of the tables are more BigQuery-friendly. Uploading data by default is dangerous, so dry-run mode is the default. We would like the column names to be as consistent as possible across years, so we apply some heuristics. The heuristics may need to be maintained annually; luckily, and ideally, that annual maintenance will be a one-off task.
1 parent 9e72de2 commit 6dc9d57
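A minimal usage sketch (the BigQuery project, dataset, and table names below are hypothetical placeholders; the CSV path matches the one used in the tests). By default the tool only parses the file and reports the as-is and to-be column names:

    python3 upload-kktix-ticket-csv-to-bigquery.py data/corporate-attendees.csv

Passing --upload together with a BigQuery target actually loads the sanitized data:

    python3 upload-kktix-ticket-csv-to-bigquery.py data/corporate-attendees.csv -p my-project -d my_dataset -t ticket_2020 --upload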

3 files changed (+622, -50 lines)

upload-kktix-ticket-csv-to-bigquery.py: 326 additions & 0 deletions
@@ -0,0 +1,326 @@
#!/usr/bin/env python3
import argparse
import hashlib
import logging
import re
import unittest

import pandas as pd
from google.cloud import bigquery

CANONICAL_COLUMN_NAMES = [
    "ticket_type",
    "payment_status",
    "tags",
    "paid_date",
    "price",
    "invoice_policy",
    "invoiced_company_name",
    "unified_business_no",
    "dietary_habit",
    "years_of_using_python",
    "area_of_interest",
    "organization",
    "job_title",
    "country_or_region",
    "departure_from_region",
    "how_did_you_know_pycon_tw",
    "have_you_ever_attended_pycon_tw",
    "know_financial_aid",
    "gender",
    "pynight_attendee_numbers",
    "pynight_attending_or_not",
    "email_from_sponsor",
    "email_to_sponsor",
    "ive_already_read_and_i_accept_the_epidemic_prevention_of_pycon_tw",
    "ive_already_read_and_i_accept_the_privacy_policy_of_pycon_tw",
    "email",
]

HEURISTIC_COMPATIBLE_MAPPING_TABLE = {
    # from 2020 style-reformatted column names
    "years_of_using_python_python": "years_of_using_python",
    "company_for_students_or_teachers_fill_in_the_school_department_name": "organization",
    "invoiced_company_name_optional": "invoiced_company_name",
    "unified_business_no_optional": "unified_business_no",
    "job_title_if_you_are_a_student_fill_in_student": "job_title",
    "come_from": "country_or_region",
    "departure_from_regions": "departure_from_region",
    "how_did_you_find_out_pycon_tw_pycon_tw": "how_did_you_know_pycon_tw",
    "have_you_ever_attended_pycon_tw_pycon_tw": "have_you_ever_attended_pycon_tw",
    "privacy_policy_of_pycon_tw_2020_pycon_tw_2020_bitly3eipaut": "privacy_policy_of_pycon_tw",
    "ive_already_read_and_i_accept_the_privacy_policy_of_pycon_tw_2020_pycon_tw_2020": "ive_already_read_and_i_accept_the_privacy_policy_of_pycon_tw",
    "ive_already_read_and_i_accept_the_epidemic_prevention_of_pycon_tw_2020_pycon_tw_2020_covid19": "ive_already_read_and_i_accept_the_epidemic_prevention_of_pycon_tw",
    "do_you_know_we_have_financial_aid_this_year": "know_financial_aid",
    "contact_email": "email",
    # raw 2020 column names whose style-reformatted versions would collide
    "PyNight 參加意願僅供統計人數,實際是否舉辦需由官方另行公告": "pynight_attendee_numbers",
    "PyNight 參加意願": "pynight_attending_or_not",
    "是否願意收到贊助商轉發 Email 訊息": "email_from_sponsor",
    "是否願意提供 Email 給贊助商": "email_to_sponsor",
}

logging.basicConfig(level=logging.INFO)


def upload_dataframe_to_bigquery(
    df: pd.DataFrame, project_id: str, dataset_name: str, table_name: str
) -> None:
    client = bigquery.Client(project=project_id)

    dataset_ref = bigquery.dataset.DatasetReference(project_id, dataset_name)
    table_ref = bigquery.table.TableReference(dataset_ref, table_name)

    # load the DataFrame into BigQuery; with no explicit job_config,
    # the client infers the table schema from the DataFrame dtypes
    job = client.load_table_from_dataframe(df, table_ref)

    # wait for the load job to complete
    job.result()

    logging.info(
        "Loaded {} rows into {}:{}.".format(job.output_rows, dataset_name, table_name)
    )


def reserved_alphabet_space_underscore(string_as_is: str) -> str:
    # keep only ASCII letters, digits, spaces, and underscores; anything
    # else (e.g. CJK characters, slashes, parentheses) is dropped
    regex = re.compile("[^a-zA-Z 0-9_]")
    return regex.sub("", string_as_is)


def reserved_only_one_space_between_words(string_as_is: str) -> str:
    string_as_is = string_as_is.strip()
    # two or more spaces between two words
    # \w : word characters, a.k.a. alphanumeric and underscore
    match = re.search(r"\w[ ]{2,}\w", string_as_is)

    if not match:
        return string_as_is

    regex = re.compile(r"\s+")
    string_as_is = regex.sub(" ", string_as_is)

    return string_as_is
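
# For illustration (this raw KKTIX header is hypothetical): the two helpers
# above turn "Invoiced Company Name / 發票抬頭 (Optional)" first into
# "Invoiced Company Name   Optional" (slash, CJK, and parentheses dropped),
# then into "Invoiced Company Name Optional" (runs of spaces collapsed);
# the later steps lowercase it to "invoiced_company_name_optional", which
# HEURISTIC_COMPATIBLE_MAPPING_TABLE maps to "invoiced_company_name".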


def get_reformatted_style_columns(columns: dict) -> dict:
    # normalize each name to snake_case: strip disallowed characters,
    # collapse extra spaces, replace spaces with underscores, and lowercase,
    # e.g. "Payment Status" -> "payment_status"
    reformatted_columns = {}
    for key, column_name in columns.items():
        reformatted_column_name = reserved_alphabet_space_underscore(column_name)
        reformatted_column_name = reserved_only_one_space_between_words(
            reformatted_column_name
        )
        reformatted_column_name = reformatted_column_name.replace(" ", "_")
        reformatted_column_name = reformatted_column_name.lower()

        reformatted_columns[key] = reformatted_column_name

    return reformatted_columns


def find_reformat_none_unique(columns: dict) -> list:
    # reverse key-value of the original dict to be value-key of reverse_dict
    reverse_dict = {}

    for key, value in columns.items():
        reverse_dict.setdefault(value, set()).add(key)

    # a reformatted name produced by more than one original column is a duplicate
    result = [key for key, values in reverse_dict.items() if len(values) > 1]

    return result


def apply_compatible_mapping_name(columns: dict) -> dict:
    """Unify names with the heuristic mapping table"""
    updated_columns = apply_heuristic_name(columns)

    return updated_columns


def apply_heuristic_name(columns: dict) -> dict:
    updated_columns = dict(columns)

    # replace any column name that exactly matches a known legacy name
    # with its canonical counterpart
    for candidate in HEURISTIC_COMPATIBLE_MAPPING_TABLE.keys():
        for key, value in columns.items():
            if candidate == value:
                candidate_value = HEURISTIC_COMPATIBLE_MAPPING_TABLE[candidate]
                updated_columns[key] = candidate_value

    return updated_columns


def init_rename_column_dict(columns_array: pd.Index) -> dict:
    # start with an identity mapping from each column name to itself
    columns_dict = {}

    for item in columns_array:
        columns_dict[item] = item

    return columns_dict


def sanitize_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """
    Pre-process the column names of the raw data

    Pre-check the column names against the rules below and re-format them
    if necessary.

    The sanitizing pre-process should follow these rules:
        1. style of column names (which follows general SQL conventions)
            1-1. singular nouns
            1-2. lower case
            1-3. snake case (underscore-separated words)
            1-4. full words (if possible) except for common abbreviations
        2. a column name SHOULD be unique
        3. backward compatible with column names from past years
    """
    rename_column_dict = init_rename_column_dict(df.columns)

    # apply heuristic names if possible
    # this is mainly meant to resolve duplicate conflicts among
    # style-reformatted names
    applied_heuristic_columns = apply_heuristic_name(rename_column_dict)

    # pre-process the style of the column names
    style_reformatted_columns = get_reformatted_style_columns(applied_heuristic_columns)

    # pre-process for name uniqueness
    duplicate_column_names = find_reformat_none_unique(style_reformatted_columns)
    logging.info(
        f"Found the following duplicate column names: {duplicate_column_names}"
    )

    # pre-process for backward compatibility
    compatible_columns = apply_compatible_mapping_name(style_reformatted_columns)

    # every intermediate dict maps the original column names to their current
    # candidates, so a single rename at the end applies the whole pipeline
    return df.rename(columns=compatible_columns)
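
# End-to-end illustration of the pipeline above:
#   "Contact Email" (a hypothetical raw header) -> style reformatting ->
#   "contact_email" -> compatible mapping -> "email"
#   "PyNight 參加意願" -> heuristic mapping (applied before style
#   reformatting, since stripping the CJK characters first would make the
#   two PyNight columns collide) -> "pynight_attending_or_not"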


def hash_string(string_to_hash: str) -> str:
    # hash with SHA-256 and return the hex digest
    sha = hashlib.sha256()
    sha.update(string_to_hash.encode("utf-8"))
    string_hashed = sha.hexdigest()

    return string_hashed


def hash_privacy_info(df: pd.DataFrame) -> None:
    # replace the raw email addresses in-place with their hashes so that
    # no personally identifiable information is uploaded
    df["email"] = df["email"].apply(hash_string)


def main():
    """
    Command-line entrypoint
    """
    parser = argparse.ArgumentParser(
        description="Sanitize ticket CSV and upload to BigQuery"
    )

    parser.add_argument(
        "csv_file", type=str, help="Ticket CSV file",
    )

    parser.add_argument("-p", "--project-id", help="BigQuery project ID")

    parser.add_argument(
        "-d", "--dataset-name", help="BigQuery dataset name to create or append to"
    )

    parser.add_argument(
        "-t", "--table-name", help="BigQuery table name to create or append to"
    )

    parser.add_argument(
        "--upload",
        action="store_true",
        help="Actually upload the data; otherwise the file is only parsed (dry run)",
        default=False,
    )

    args = parser.parse_args()

    # load the csv and sanitize it before any upload
    df = pd.read_csv(args.csv_file)
    sanitized_df = sanitize_column_names(df)
    hash_privacy_info(sanitized_df)

    if args.upload:
        upload_dataframe_to_bigquery(
            sanitized_df, args.project_id, args.dataset_name, args.table_name
        )
    else:
        logging.info("Dry-run mode. Data will not be uploaded.")
        logging.info("Column names (as-is):")
        logging.info(df.columns)
        logging.info("")
        logging.info("Column names (to-be):")
        logging.info(sanitized_df.columns)

    return sanitized_df.columns


class Test2020Ticket(unittest.TestCase):
    """python -m unittest upload-kktix-ticket-csv-to-bigquery.py"""

    CANONICAL_COLUMN_NAMES_2020 = [
        "ticket_type",
        "payment_status",
        "tags",
        "paid_date",
        "price",
        "invoice_policy",
        "invoiced_company_name_optional",
        "unified_business_no_optional",
        "dietary_habit",
        "years_of_using_python",
        "area_of_interest",
        "organization",
        "job_role",
        "country_or_region",
        "departure_from_region",
        "how_did_you_know_pycon_tw",
        "have_you_ever_attended_pycon_tw",
        "do_you_know_we_have_financial_aid_this_year",
        "gender",
        "pynight_attendee_numbers",
        "pynight_attending_or_not",
        "email_from_sponsor",
        "email_to_sponsor",
        "privacy_policy_of_pycon_tw",
        "ive_already_read_and_i_accept_the_privacy_policy_of_pycon_tw",
    ]

    @classmethod
    def setUpClass(cls):
        cls.df = pd.read_csv("./data/corporate-attendees.csv")
        cls.sanitized_df = sanitize_column_names(cls.df)

    def test_column_number(self):
        assert len(self.sanitized_df.columns) == 26

    def test_column_title_content(self):
        for column in self.sanitized_df.columns:
            if column not in CANONICAL_COLUMN_NAMES:
                logging.info(f"{column} is not in the canonical table.")
                assert False

    def test_column_content(self):
        assert self.sanitized_df["ticket_type"][1] == "Regular 原價"

    def test_hash(self):
        string_hashed = hash_string("1234567890-=qwertyuiop[]")

        assert (
            string_hashed
            == "aefefa43927b374a9af62ab60e4512e86f974364919d1b09d0013254c667e512"
        )

    def test_hash_email(self):
        hash_privacy_info(self.sanitized_df)

        assert (
            self.sanitized_df["email"][1]
            == "caecbd114bfa0cc3fd43f2a68ce52a8a92141c6bca87e0418d4833af56e504f1"
        )


if __name__ == "__main__":
    main()
