Skip to content

Commit d395a2a

Browse files
committed
feat(contrib): init tool upload-kktix-ticket-csv-to-bigquery.py
The tool is used to upload the ticket information exported from kktix to bigquery, and pre-process the "raw" data to be more bigquery friendly in column naming of tables. It's dangerous to upload data by default. So, we use dry-run mode by default. We would like to make the column names as consistent as possible across years, so we use some heuristics. We may need to maintain the heuristics annually. Luckily and ideally, the annual maintenance will be one-off.
1 parent 9e72de2 commit d395a2a

File tree

3 files changed

+624
-50
lines changed

3 files changed

+624
-50
lines changed
Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
#!/usr/bin/env python3
2+
import argparse
3+
import re
4+
import hashlib
5+
import logging
6+
7+
import unittest
8+
9+
import pandas as pd
10+
from google.cloud import bigquery
11+
12+
13+
# Target column names after sanitization: snake_case, lower-case, and meant to
# stay stable across conference years so BigQuery tables remain comparable.
CANONICAL_COLUMN_NAMES = [
    "ticket_type",
    "payment_status",
    "tags",
    "paid_date",
    "price",
    "invoice_policy",
    "invoiced_company_name",
    "unified_business_no",
    "dietary_habit",
    "years_of_using_python",
    "area_of_interest",
    "organization",
    "job_title",
    "country_or_region",
    "departure_from_region",
    "how_did_you_know_pycon_tw",
    "have_you_ever_attended_pycon_tw",
    "know_financial_aid",
    "gender",
    "pynight_attendee_numbers",
    "pynight_attending_or_not",
    "email_from_sponsor",
    "email_to_sponsor",
    "ive_already_read_and_i_accept_the_epidemic_prevention_of_pycon_tw",
    "ive_already_read_and_i_accept_the_privacy_policy_of_pycon_tw",
    "email",
]
41+
42+
# Heuristic mapping from legacy/year-specific column names to canonical ones.
# Keys are matched against column names (some pre-reformatting, some raw);
# expected to need annual maintenance when KKTIX export wording changes.
HEURISTIC_COMPATIBLE_MAPPING_TABLE = {
    # from 2020 reformatted column names
    "years_of_using_python_python": "years_of_using_python",
    "company_for_students_or_teachers_fill_in_the_school_department_name": "organization",
    "invoiced_company_name_optional": "invoiced_company_name",
    "unified_business_no_optional": "unified_business_no",
    "job_title_if_you_are_a_student_fill_in_student": "job_title",
    "come_from": "country_or_region",
    "departure_from_regions": "departure_from_region",
    "how_did_you_find_out_pycon_tw_pycon_tw": "how_did_you_know_pycon_tw",
    "have_you_ever_attended_pycon_tw_pycon_tw": "have_you_ever_attended_pycon_tw",
    "privacy_policy_of_pycon_tw_2020_pycon_tw_2020_bitly3eipaut": "privacy_policy_of_pycon_tw",
    "ive_already_read_and_i_accept_the_privacy_policy_of_pycon_tw_2020_pycon_tw_2020": "ive_already_read_and_i_accept_the_privacy_policy_of_pycon_tw",
    "ive_already_read_and_i_accept_the_epidemic_prevention_of_pycon_tw_2020_pycon_tw_2020_covid19": "ive_already_read_and_i_accept_the_epidemic_prevention_of_pycon_tw",
    "do_you_know_we_have_financial_aid_this_year": "know_financial_aid",
    "contact_email": "email",
    # from 2020 reformatted column names which made it duplicate
    # NOTE: these keys are raw (untranslated) CSV headers and must stay
    # byte-identical to the KKTIX export; do not normalize them.
    "PyNight 參加意願僅供統計人數,實際是否舉辦需由官方另行公告": "pynight_attendee_numbers",
    "PyNight 參加意願": "pynight_attending_or_not",
    "是否願意收到贊助商轉發 Email 訊息": "email_from_sponsor",
    "是否願意提供 Email 給贊助商": "email_to_sponsor",
}
64+
65+
66+
# INFO level so the dry-run column listings below are actually visible.
logging.basicConfig(level=logging.INFO)
67+
68+
69+
def upload_dataframe_to_bigquery(
    df: pd.DataFrame, project_id: str, dataset_name: str, table_name: str
) -> None:
    """Load *df* into the BigQuery table ``dataset_name.table_name``.

    Creates a load job from the dataframe, blocks until it finishes, then
    logs the number of rows written.
    """
    bq_client = bigquery.Client(project=project_id)

    destination = bigquery.table.TableReference(
        bigquery.dataset.DatasetReference(project_id, dataset_name), table_name
    )

    # dump the csv into bigquery
    load_job = bq_client.load_table_from_dataframe(df, destination)
    load_job.result()

    logging.info(
        "Loaded {} rows into {}:{}.".format(
            load_job.output_rows, dataset_name, table_name
        )
    )
85+
86+
87+
def reserved_alphabet_space_underscore(string_as_is: str) -> str:
    """Keep only ASCII letters, digits, spaces and underscores in the string."""
    return re.sub("[^a-zA-Z 0-9_]", "", string_as_is)
90+
91+
92+
def reserved_only_one_space_between_words(string_as_is: str) -> str:
    """Strip the string and collapse multi-space gaps between words.

    Collapsing only happens when two word characters are separated by two or
    more spaces; otherwise the (stripped) string is returned unchanged.

    BUG fix: the regex patterns were plain strings containing ``\\w`` / ``\\s``
    escapes, which are invalid escape sequences (DeprecationWarning, and a
    SyntaxWarning on modern Python). They are now raw strings.
    """
    string_as_is = string_as_is.strip()

    # two or more space between two words
    # \w : word characters, a.k.a. alphanumeric and underscore
    if not re.search(r"\w[ ]{2,}\w", string_as_is):
        return string_as_is

    # collapse every whitespace run into a single space
    return re.sub(r"\s+", " ", string_as_is)
105+
106+
107+
def get_reformatted_style_columns(columns: dict) -> dict:
    """Return a copy of *columns* whose values are reformatted to snake_case.

    Each value is stripped of disallowed characters, has its inner spacing
    normalized, then spaces become underscores and the result is lower-cased.
    """
    reformatted = {}
    for key, original_name in columns.items():
        cleaned = reserved_only_one_space_between_words(
            reserved_alphabet_space_underscore(original_name)
        )
        reformatted[key] = cleaned.replace(" ", "_").lower()
    return reformatted
120+
121+
122+
def find_reformat_none_unique(columns: dict) -> list:
    """Return values of *columns* that more than one key maps to.

    BUG fix: the return annotation claimed ``dict`` while the function has
    always returned a list of duplicated values; the annotation now matches.
    """
    # reverse key-value of original dict: value -> set of keys mapping to it
    reverse_dict: dict = {}

    for key, value in columns.items():
        reverse_dict.setdefault(value, set()).add(key)

    # a value reached from 2+ keys is a (post-reformatting) duplicate
    return [value for value, keys in reverse_dict.items() if len(keys) > 1]
132+
133+
134+
def apply_compatible_mapping_name(columns: dict) -> dict:
    """Unify names with a heuristic hash table"""
    # Thin wrapper kept for a readable pipeline in sanitize_column_names().
    return apply_heuristic_name(columns)
139+
140+
141+
def apply_heuristic_name(columns: dict) -> dict:
    """Map known legacy column names onto their canonical equivalents.

    Values found in HEURISTIC_COMPATIBLE_MAPPING_TABLE are replaced by their
    canonical names; all other entries are copied through unchanged.

    Improvement: the original scanned every table entry for every column
    (O(n*m) nested loops); a dict is built for O(1) lookup, so a single pass
    with ``dict.get`` is equivalent and simpler.
    """
    return {
        key: HEURISTIC_COMPATIBLE_MAPPING_TABLE.get(value, value)
        for key, value in columns.items()
    }
151+
152+
153+
def init_rename_column_dict(columns_array: pd.core.indexes.base.Index) -> dict:
    """Build an identity mapping ``{column_name: column_name}`` from an index.

    Serves as the starting point for the rename pipeline: each value is then
    rewritten in place while the keys keep the original column names.
    """
    return {column_name: column_name for column_name in columns_array}
160+
161+
162+
def sanitize_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """
    Pre-process the column names of raw data

    Pre-checking rules of column name black list and re-formatting if necessary.

    The sanitized pre-process of data should follow the following rules:
    1. style of column name (which follows general SQL conventions)
        1-1. singular noun
        1-2. lower case
        1-3. snake-style (underscore-separated words)
        1-4. full word (if possible) except common abbreviations
    2. a column name SHOULD be unique
    3. backward compatible with column names in the past years

    Returns a new DataFrame with renamed columns; *df* itself is not mutated.
    """
    rename_column_dict = init_rename_column_dict(df.columns)

    # apply possible heuristic name if possible
    # this is mainly meant to resolve style-reformatted names duplicate conflicts
    applied_heuristic_columns = apply_heuristic_name(rename_column_dict)

    # pre-process of style of column name
    style_reformatted_columns = get_reformatted_style_columns(applied_heuristic_columns)
    # BUG fix: the original called ``df.rename(columns=...)`` here and discarded
    # the result (DataFrame.rename returns a copy); the no-op call is removed.

    # pre-process of name uniqueness (duplicates are only reported, not resolved)
    duplicate_column_names = find_reformat_none_unique(style_reformatted_columns)
    logging.info(
        f"Found the following duplicate column names: {duplicate_column_names}"
    )

    # pre-process of backward compatibility
    compatible_columns = apply_compatible_mapping_name(style_reformatted_columns)

    return df.rename(columns=compatible_columns)
197+
198+
199+
def hash_string(string_to_hash: str) -> str:
    """Return the SHA-256 hex digest of *string_to_hash* (UTF-8 encoded)."""
    return hashlib.sha256(string_to_hash.encode("utf-8")).hexdigest()
205+
206+
207+
def hash_privacy_info(df: pd.DataFrame) -> None:
    """Replace the ``email`` column with SHA-256 digests, mutating *df* in place."""
    df["email"] = df["email"].map(hash_string)
209+
210+
211+
def main():
    """
    Commandline entrypoint

    Reads the ticket CSV, sanitizes column names, hashes the email column,
    then either uploads to BigQuery (``--upload``) or prints a dry-run report.
    Returns the sanitized column index (handy for interactive inspection).
    """
    parser = argparse.ArgumentParser(
        description="Sanitize ticket CSV and upload to BigQuery"
    )

    parser.add_argument(
        "csv_file", type=str, help="Ticket CSV file",
    )

    parser.add_argument("-p", "--project-id", help="BigQuery project ID")

    parser.add_argument(
        "-d", "--dataset-name", help="BigQuery dataset name to create or append"
    )

    parser.add_argument(
        "-t", "--table-name", help="BigQuery table name to create or append"
    )

    parser.add_argument(
        "--upload",
        action="store_true",
        # BUG fix: the help text previously said "Parsing the file but not
        # upload it", which describes dry-run mode, not this flag.
        help="Actually upload the sanitized data to BigQuery (default: dry-run)",
        default=False,
    )

    args = parser.parse_args()

    # load the csv into bigquery
    df = pd.read_csv(args.csv_file)
    sanitized_df = sanitize_column_names(df)
    hash_privacy_info(sanitized_df)

    if args.upload:
        upload_dataframe_to_bigquery(
            sanitized_df, args.project_id, args.dataset_name, args.table_name
        )
    else:
        logging.info("Dry-run mode. Data will not be uploaded.")
        logging.info("Column names (as-is):")
        logging.info(df.columns)
        logging.info("")
        logging.info("Column names (to-be):")
        logging.info(sanitized_df.columns)

    return sanitized_df.columns
260+
261+
262+
class Test2020Ticket(unittest.TestCase):
    """python -m unittest upload-kktix-ticket-csv-to-bigquery.py

    Regression tests against the 2020 corporate-attendee export. Requires
    ./data/corporate-attendees.csv to exist relative to the working directory.

    Improvement: bare ``assert`` statements are replaced with unittest's
    assertion methods — bare asserts are stripped under ``python -O`` and
    give no diagnostic message on failure.
    """

    # Column names as they appear in the 2020 export after reformatting,
    # before the heuristic compatibility mapping is applied.
    CANONICAL_COLUMN_NAMES_2020 = [
        "ticket_type",
        "payment_status",
        "tags",
        "paid_date",
        "price",
        "invoice_policy",
        "invoiced_company_name_optional",
        "unified_business_no_optional",
        "dietary_habit",
        "years_of_using_python",
        "area_of_interest",
        "organization",
        "job_role",
        "country_or_region",
        "departure_from_region",
        "how_did_you_know_pycon_tw",
        "have_you_ever_attended_pycon_tw",
        "do_you_know_we_have_financial_aid_this_year",
        "gender",
        "pynight_attendee_numbers",
        "pynight_attending_or_not",
        "email_from_sponsor",
        "email_to_sponsor",
        "privacy_policy_of_pycon_tw",
        "ive_already_read_and_i_accept_the_privacy_policy_of_pycon_tw",
    ]

    @classmethod
    def setUpClass(cls):
        # Sanitize once for the whole test class; tests only read the result
        # (except test_hash_email, which hashes in place).
        cls.df = pd.read_csv("./data/corporate-attendees.csv")
        cls.sanitized_df = sanitize_column_names(cls.df)

    def test_column_number(self):
        self.assertEqual(len(self.sanitized_df.columns), 26)

    def test_column_title_content(self):
        for column in self.sanitized_df.columns:
            self.assertIn(
                column,
                CANONICAL_COLUMN_NAMES,
                f"{column} is not in the canonical table.",
            )

    def test_column_content(self):
        self.assertEqual(self.sanitized_df["ticket_type"][1], "Regular 原價")

    def test_hash(self):
        string_hashed = hash_string("1234567890-=qwertyuiop[]")

        self.assertEqual(
            string_hashed,
            "aefefa43927b374a9af62ab60e4512e86f974364919d1b09d0013254c667e512",
        )

    def test_hash_email(self):
        hash_privacy_info(self.sanitized_df)

        self.assertEqual(
            self.sanitized_df["email"][1],
            "caecbd114bfa0cc3fd43f2a68ce52a8a92141c6bca87e0418d4833af56e504f1",
        )
325+
326+
327+
# Script entrypoint: parse CLI args, sanitize the CSV, and optionally upload.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)