Imagine a file in the following fixed format:
<unique record identifier><white_space><numeric value>
e.g.
1426828011 9
1426828028 350
1426828037 25
1426828056 231
1426828058 109
1426828066 111
.
.
.
Write a program that reads from 'stdin' the contents of a file, and optionally accepts the absolute path of a file from the command line. The file/stdin stream is expected to be in the above format. You may assume that the numeric value fits within a 32 bit integer. The output should be a list of the unique ids associated with the X-largest values in the rightmost column, where X is specified by an input parameter. For example, given the input data above and X=3, the following would be valid output:
1426828028
1426828066
1426828056
Note that the output does not need to be in any particular order. Multiple instances of the same numeric value count as distinct records of the total X. So if we have 4 records with values: 200, 200, 115, 110 and X=2 then the result must consist of the two IDs that point to 200 and 200 and no more. Please think about the solution you submit as a production library (API / documentation), or something you would submit for code review to a large project.
Your solution should take into account extremely large files.
To solve the TRI-AD Challenge I built this Python project with the following features:
- python 3.8.2
- a very extensive collection of unit tests with pytest
- structured logging with structlog
- project packaged with Python wheel
- a command line interface using argparse
- docker machinery plus a makefile to allow running this app containerized
Requirements:
- Python 3.8.2
- pip
- virtualenv
- virtualenvwrapper
- docker
- make
This project can be used in three forms:
- as a standalone application, using its command line interface (CLI)
- as a dockerized standalone application, using the CLI
- as a library, importing the package into a third-party project
In your terminal, execute the following commands:
# 1. Create a virtualenv
mkvirtualenv -p python3.8 triad
# 2. Activate virtualenv
workon triad
# 3. Install requirements
pip install -r requirements.txt
# 4. Install triad package
python setup.py install
Note: from here onwards we will assume the triad virtualenv is always active.
Execute triad; the following usage help should be displayed:
usage: triad [-h] -x X_LARGEST [-f FILE] [-a {naive,partitions}] [-c CHUNKSIZE] [-d] [-v]
triad: error: the following arguments are required: -x/--x-largest
This section shows the different options the app's CLI provides and how to execute them.
Notes:
- Besides showing the expected results, the app by default has its logging facility enabled to stdout with log level INFO. You can disable these stdout logs by removing stdout from the root handler in the triad/logging.yml configuration file.
- Some repetitive output is folded with three dots (...).
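For reference, a minimal sketch of what the relevant part of triad/logging.yml could look like (illustrative excerpt only; the actual file in the repository may differ):

```yaml
# triad/logging.yml (illustrative excerpt)
root:
  level: INFO
  handlers: [stdout]   # remove "stdout" here to silence console logging
```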
Execute:
triad --help
This should be displayed:
usage: triad [-h] -x X_LARGEST [-f FILE] [-a {naive,partitions}] [-c CHUNKSIZE] [-d] [-v]
optional arguments:
-h, --help show this help message and exit
-x X_LARGEST, --x-largest X_LARGEST
X largest values to find in the input data.
-f FILE, --file FILE Path file of the input data (if is not provided, the input data will be read from stdin).
-a {naive,partitions}, --algorithm {naive,partitions}
Set the algorithm used for processing the input data.
-c CHUNKSIZE, --chunksize CHUNKSIZE
Set the chunk size for reading the input data (default to 200000 lines).
-d, --disable-validations
No validations will be performed to the input data.
-v, --verbose         Set log level to DEBUG.
If you don't have them yet, create the input_data and input_data_malformed files:
tee input_data <<HEREDOC
1426828011 9
1426828028 350
1426828037 25
1426828056 231
1426828058 109
1426828066 111
HEREDOC
tee input_data_malformed <<HEREDOC
1426828011 9
a1426828028 350
1426828037b 25
1426828056 231
1426828058 109d
1426828066 111
HEREDOC
triad -f input_data -x 1
The output should be:
[triad.algorithms.naive] 2020-07-02 19:32:17,211: INFO uuid='e26bf8ac-bcb3-11ea-b4c9-60e3acafd2f7' event='Using X-largest parameter' X=1
[triad.algorithms.naive] 2020-07-02 19:32:17,223: INFO uuid='e26bf8ac-bcb3-11ea-b4c9-60e3acafd2f7' event='Input was processed !' total_lines=6
1426828028
[triad] 2020-07-02 19:32:17,224: INFO uuid='e26bf8ac-bcb3-11ea-b4c9-60e3acafd2f7' event='Triad finalized' output={'status': 'TERMINATED', 'reason': 'Program gracefully terminated'}
cat input_data | triad -x 1
The output should be:
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Using X-largest parameter' X=1
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Input was processed !' total_lines=6
1426828028
[triad] 2020-07-02 19:32:17,224: INFO uuid='...' event='Triad finalized' output={'status': 'TERMINATED', 'reason': 'Program gracefully terminated'}
Note: reading from stdin in interactive mode is not supported (it doesn't make sense in the context of this Challenge), so executing triad -x 1 will show:
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Using X-largest parameter' X=1
[triad] 2020-...: INFO uuid='...' event='Triad finalized' output={'status': 'FAILED', 'reason': 'Stdin interactive mode is not supported'}
This project provides two algorithms for processing the input:
- naive
- partitions
Use partitions:
triad -f input_data -x 3 -a partitions
Output:
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Using X-largest parameter' X=3
[triad.algorithms.partitions] 2020-...: INFO uuid='...' event='Input was processed !' total_lines=6
1426828066
1426828056
1426828028
[triad] 2020-...: INFO uuid='...' event='Triad finalized' output={'status': 'TERMINATED', 'reason': 'Program gracefully terminated'}
Use naive (the default):
triad -f input_data --x-largest 3 --algorithm naive
Output:
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Using X-largest parameter' X=3
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Input was processed !' total_lines=6
1426828028
1426828056
1426828066
[triad] 2020-...: INFO uuid='...' event='Triad finalized' output={'status': 'TERMINATED', 'reason': 'Program gracefully terminated'}
You can use the -v/--verbose parameter:
triad -f input_data --x-largest 3 --algorithm naive --verbose
Output (note the elapsed processing time shown: 0.0100s):
[triad] 2020-...: DEBUG uuid='...' event='Triad cli invoked' cli_args=Namespace(algorithm='naive', chunksize=None, disable_validations=False, file='input_data', verbose=True, x_largest=3)
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Using X-largest parameter' X=3
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input will be read in chunks of n lines' n=200000
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input validation is enabled'
[triad] 2020-...: DEBUG uuid='...' event='Input will be read from file' file='input_data'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Starting to process input ...'
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Input was processed !' total_lines=6
[triad.tools] 2020-...: DEBUG uuid='...' event='elapsed time' time='0.0100s' name='process_input' arguments="<triad.algorithms.naive.TopNaive object at 0x7f28b3bcfc10>, filepath_or_buffer='input_data'"
1426828028 350
1426828056 231
1426828066 111
[triad] 2020-...: INFO uuid='...' event='Triad finalized' output={'status': 'TERMINATED', 'reason': 'Program gracefully terminated'}
This app assumes two scenarios:
- the input data contains NO errors: in this case you can safely use the -d/--disable-validations parameter. The difference is that disabling validations performs much faster!
To show the difference in performance: processing a dataset of 20 million records (~380 MiB) takes ~5.62 seconds:
triad -f ../jupyter_data/input_dataset_20m -x 3 --disable-validations --verbose
[triad] 2020-...: DEBUG uuid='...' event='Triad cli invoked' cli_args=Namespace(algorithm='naive', chunksize=None, disable_validations=True, file='../jupyter_data/input_dataset_20m', verbose=True, x_largest=3)
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Using X-largest parameter' X=3
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input will be read in chunks of n lines' n=200000
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input validation is disabled'
[triad] 2020-...: DEBUG uuid='...' event='Input will be read from file' file='../jupyter_data/input_dataset_20m'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Starting to process input ...'
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Input was processed !' total_lines=20000000
[triad.tools] 2020-...: DEBUG uuid='...' event='elapsed time' time='5.6269s' name='process_input' arguments="<triad.algorithms.naive.TopNaive object at 0x7fe9493edbe0>, filepath_or_buffer='../jupyter_data/input_dataset_20m'"
3788094 99999992
14238111 99999988
4733711 99999986
[triad] 2020-...: INFO uuid='...' event='Triad finalized' output={'status': 'TERMINATED', 'reason': 'Program gracefully terminated'}
Running the same dataset without disabling validations takes ~36 seconds:
triad -f ../jupyter_data/input_dataset_20m -x 3 --verbose
[triad] 2020-...: DEBUG uuid='...' event='Triad cli invoked' cli_args=Namespace(algorithm='naive', chunksize=None, disable_validations=False, file='../jupyter_data/input_dataset_20m', verbose=True, x_largest=3)
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Using X-largest parameter' X=3
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input will be read in chunks of n lines' n=200000
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input validation is enabled'
[triad] 2020-...: DEBUG uuid='...' event='Input will be read from file' file='../jupyter_data/input_dataset_20m'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Starting to process input ...'
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Input was processed !' total_lines=20000000
[triad.tools] 2020-...: DEBUG uuid='...' event='elapsed time' time='36.4375s' name='process_input' arguments="<triad.algorithms.naive.TopNaive object at 0x7f03cedddbb0>, filepath_or_buffer='../jupyter_data/input_dataset_20m'"
3788094 99999992
14238111 99999988
4733711 99999986
[triad] 2020-...: INFO uuid='...' event='Triad finalized' output={'status': 'TERMINATED', 'reason': 'Program gracefully terminated'}
- the input data is malformed: you can't disable the validations, otherwise processing will stop at the first error and no results will be obtained:
triad -f input_data_malformed -x 3 --verbose --disable-validations
[triad] 2020-...: DEBUG uuid='...' event='Triad cli invoked' cli_args=Namespace(algorithm='naive', chunksize=None, disable_validations=True, file='input_data_malformed', verbose=True, x_largest=3)
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Using X-largest parameter' X=3
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input will be read in chunks of n lines' n=200000
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input validation is disabled'
[triad] 2020-...: DEBUG uuid='...' event='Input will be read from file' file='input_data_malformed'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Starting to process input ...'
[triad] 2020-...: INFO uuid='...' event='Triad finalized' output={'status': 'FAILED', 'reason': "The input is malformed: invalid literal for int() with base 10: 'a1426828011'"}
The following example, using --verbose and input_data_malformed, shows how the input data is validated and how invalid values are discarded.
triad -f input_data_malformed -x 3 --verbose
[triad] 2020-...: DEBUG uuid='...' event='Triad cli invoked' cli_args=Namespace(algorithm='naive', chunksize=None, disable_validations=False, file='input_data_malformed', verbose=True, x_largest=3)
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Using X-largest parameter' X=3
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input will be read in chunks of n lines' n=200000
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input validation is enabled'
[triad] 2020-...: DEBUG uuid='...' event='Input will be read from file' file='input_data_malformed'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Starting to process input ...'
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Discarding invalid value' invalid_value='a1426828028'
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Discarding invalid value' invalid_value='1426828037b'
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Discarding invalid value' invalid_value='109d'
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Input was processed !' total_lines=6
[triad.tools] 2020-...: DEBUG uuid='...' event='elapsed time' time='0.0096s' name='process_input' arguments="<triad.algorithms.naive.TopNaive object at 0x7fa60d794af0>, filepath_or_buffer='input_data_malformed'"
1426828056 231
1426828066 111
1426828011 9
[triad] 2020-...: INFO uuid='...' event='Triad finalized' output={'status': 'TERMINATED', 'reason': 'Program gracefully terminated'}
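As an aside, here is a minimal sketch of how per-chunk validation like this could be implemented with pandas (illustrative only; the column names are assumptions and the project's actual validation code may differ):

```python
import pandas as pd

def keep_valid_rows(chunk: pd.DataFrame) -> pd.DataFrame:
    """Drop rows whose uid or value is not a valid integer (illustrative sketch)."""
    for col in ("uid", "value"):
        # errors="coerce" turns unparseable entries into NaN instead of raising
        chunk[col] = pd.to_numeric(chunk[col], errors="coerce")
    return chunk.dropna().astype({"uid": "int64", "value": "int64"})
```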
You can change the chunk size used to read the input (needed for extremely large files):
triad -f input_data_malformed -x 3 --chunksize 1 --verbose
[triad] 2020-...: DEBUG uuid='...' event='Triad cli invoked' cli_args=Namespace(algorithm='naive', chunksize=1, disable_validations=False, file='input_data_malformed', verbose=True, x_largest=3)
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Using X-largest parameter' X=3
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input will be read in chunks of n lines' n=1
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Input validation is enabled'
[triad] 2020-...: DEBUG uuid='...' event='Input will be read from file' file='input_data_malformed'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Starting to process input ...'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Reading a chunk of data ...'
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Discarding invalid value' invalid_value='a1426828028'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Reading a chunk of data ...'
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Discarding invalid value' invalid_value='1426828037b'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Reading a chunk of data ...'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Reading a chunk of data ...'
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Discarding invalid value' invalid_value='109d'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Reading a chunk of data ...'
[triad.algorithms.naive] 2020-...: DEBUG uuid='...' event='Reading a chunk of data ...'
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Input was processed !' total_lines=6
[triad.tools] 2020-...: DEBUG uuid='...' event='elapsed time' time='0.0412s' name='process_input' arguments="<triad.algorithms.naive.TopNaive object at 0x7f86cb5b2dc0>, filepath_or_buffer='input_data_malformed'"
1426828056 231
1426828066 111
1426828011 9
[triad] 2020-...: INFO uuid='...' event='Triad finalized' output={'status': 'TERMINATED', 'reason': 'Program gracefully terminated'}
For this usage flavor, make is used to provide some shortcuts around the docker interface (in a production scenario, docker-compose would be used instead).
make docker-build-image
docker run triad
Output:
usage: triad [-h] -x X_LARGEST [-f FILE] [-a {naive,partitions}]
[-c CHUNKSIZE] [-d] [-v]
optional arguments:
-h, --help show this help message and exit
...
docker run -v ${PWD}:/opt/triad triad -f input_data -x 3
Output:
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Using X-largest parameter' X=3
[triad.algorithms.naive] 2020-...: INFO uuid='...' event='Input was processed !' total_lines=6
1426828028
1426828056
1426828066
[triad] 2020-...: INFO uuid='...' event='Triad finalized' output={'status': 'TERMINATED', 'reason': 'Program gracefully terminated'}
Notes:
- the $PWD envvar holds the current working directory
- the -v docker parameter mounts the source code into the container, so the container can access the input_data file
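Since docker-compose is mentioned as the production choice, a hypothetical docker-compose.yml equivalent to the docker run example above could look like this (the service definition is an assumption, not part of the repository):

```yaml
# hypothetical docker-compose.yml mirroring the docker run example
version: "3.8"
services:
  triad:
    image: triad               # built via `make docker-build-image`
    volumes:
      - .:/opt/triad           # same bind mount as the -v example above
    command: ["-f", "input_data", "-x", "3"]
```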
This scenario, as noted before, assumes you are working within the previously
created triad virtualenv.
from triad import initialize_logging
from triad.algorithms.api import *
from triad.exceptions import TriadBaseException
initialize_logging(disable=True)
top = TopNaive(x_largest=3)
try:
top.process_input(filepath_or_buffer="input_data")
except TriadBaseException as err: # production shouldn't use broad exceptions
print(f"Some error occurred: {err}")
top.dump_results(verbose=False) # verbose controls "count" dumping
# how to access results:
for uri, count in top.results:
print(f"The unique record identifier '{uri}' has a count of '{count}'.")
Output:
1426828028
1426828056
1426828066
The unique record identifier '1426828028' has a count of '350'.
The unique record identifier '1426828056' has a count of '231'.
The unique record identifier '1426828066' has a count of '111'.
The easy way to run the tests:
# with docker
make docker-run-tests
# or inside triad's virtualenv
pip install -r requirements-test.txt
pytest
Output:
.
.
.
Using /usr/local/lib/python3.8/site-packages
Finished processing dependencies for triad==0.1.0
============================= test session starts ==============================
platform linux -- Python 3.8.2, pytest-5.4.3, py-1.9.0, pluggy-0.13.1
rootdir: /tmp/src, inifile: setup.cfg, testpaths: tests
plugins: cov-2.10.0, mock-3.1.1
collected 203 items
tests/algorithms/test_basic.py ................ [ 7%]
tests/algorithms/test_process_malformed_input.py ....................... [ 19%]
[ 19%]
tests/algorithms/test_process_well_formed_input.py ..................... [ 29%]
........................................................................ [ 65%]
......................................................... [ 93%]
tests/algorithms/test_process_well_formed_input_partitions.py .......... [ 98%]
.... [100%]
----------- coverage: platform linux, python 3.8.2-final-0 -----------
Name Stmts Miss Cover
----------------------------------------------------
triad/__init__.py 17 8 53%
triad/__main__.py 39 39 0%
triad/algorithms/__init__.py 0 0 100%
triad/algorithms/api.py 4 0 100%
triad/algorithms/base.py 17 5 71%
triad/algorithms/naive.py 63 2 97%
triad/algorithms/partitions.py 36 2 94%
triad/cli.py 12 12 0%
triad/definitions.py 10 0 100%
triad/exceptions.py 6 0 100%
triad/tools.py 21 2 90%
----------------------------------------------------
TOTAL 225 70 69%
============================= 203 passed in 6.36s ==============================
This project was architected to allow easily plugging in more algorithm implementations.
triad.algorithms.base.TopAlgorithm defines the interface to be implemented: subclass it and implement these two abstract methods:
process_input(self, filepath_or_buffer: Union[str, io.BufferedReader]) -> None
results(self) -> Union[List[Tuple[int, int]], None]
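For illustration, a minimal sketch of such a plug-in using a min-heap. The constructor signature and results being a property are assumptions based on the library usage example above, and note that the project itself found heapq slower than the pandas/numpy approaches; this exists purely to show the interface:

```python
import heapq
import io
from typing import List, Optional, Tuple, Union

from triad.algorithms.base import TopAlgorithm


class TopHeap(TopAlgorithm):
    """Hypothetical plug-in keeping the X largest records in a min-heap."""

    def __init__(self, x_largest: int) -> None:
        # assumes the base class needs no further initialization
        self._x = x_largest
        self._heap: List[Tuple[int, int]] = []  # (value, uid) pairs

    def process_input(self, filepath_or_buffer: Union[str, io.BufferedReader]) -> None:
        owns = isinstance(filepath_or_buffer, str)
        fh = open(filepath_or_buffer) if owns else filepath_or_buffer
        try:
            for line in fh:
                uid, value = line.split()
                item = (int(value), int(uid))
                if len(self._heap) < self._x:
                    heapq.heappush(self._heap, item)
                elif item > self._heap[0]:
                    # new item beats the smallest of the current X largest
                    heapq.heapreplace(self._heap, item)
        finally:
            if owns:
                fh.close()

    @property
    def results(self) -> Optional[List[Tuple[int, int]]]:
        # (uid, value) pairs, matching the interface described above
        return [(uid, value) for value, uid in self._heap] or None
```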
When I started this Challenge I needed to make some assumptions, but my overall process was guided by these two important Challenge Rules:
- please think about the solution you submit as a production library (API / documentation), or something you would submit for code review to a large project
- your solution should take into account extremely large files
My process was to:
- research the most efficient algorithms for obtaining the n largest elements
- look at the solutions available in the Python ecosystem (a very basic principle is to not reinvent the wheel)
- stay with the most promising solutions, research and analyze them, and if they fit the problem, use them
I looked at the Python ecosystem and found:
- heapq, a heap queue implementation in the Python Standard Library
- pandas, a library with an nlargest implementation
- numpy, which offers an indirect partitioning implementation
I made an initial performance comparison between those three libraries/tools, and immediately discarded the heapq approach.
Pandas under the hood uses numpy, and both libraries at their core use a well-optimized C implementation. Also, the biggest time-consuming step is the I/O on the input data; I found that the pandas reader and the default pandas parser engine (which are implemented in C) are extremely fast. I tried to write my own Python reader and parser, but in the end I couldn't match the performance. So, guided by Rule 1, I stuck with the pandas reader and parser implementations.
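To make this concrete, here is a simplified sketch of the pandas-based approach (illustrative only, not the project's actual code; the column names are assumptions):

```python
import pandas as pd

def top_x_ids(path: str, x: int, chunksize: int = 200_000) -> list:
    """Stream the file in chunks and keep a running table of the X largest rows."""
    best = None
    reader = pd.read_csv(
        path,
        delim_whitespace=True,   # columns are whitespace-separated
        names=["uid", "value"],  # assumed column names; the file has no header
        header=None,
        chunksize=chunksize,     # read the file in fixed-size chunks
        engine="c",              # the fast default C parser engine
    )
    for chunk in reader:
        merged = chunk if best is None else pd.concat([best, chunk])
        best = merged.nlargest(x, "value")  # keep only the current X largest
    return [] if best is None else best["uid"].tolist()
```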
Benchmark #1:
- compare pandas c engine vs python engine
- data input: 20 million lines ~ 380 MiB
| Using default engine="c" | engine="python" | diff |
|---|---|---|
| 6.43 seconds | 85.93 seconds | + 1336 % |
Benchmark #2:
- compare pandas c engine vs python engine
- data input: 200 million lines ~ 3.7 GiB
| Using default engine="c" | engine="python" |
|---|---|
| 83 seconds | stopped after 10 minutes! |
Profile #1:
- profile an execution in order to measure functions durations
- data input: 20 million lines ~ 380 MiB
| cumtime | function |
|---|---|
| 5.36 s | method read of pandas._libs.parsers.TextReader ... |
| 1.9 s | rest of the program |
You can see that the entire program execution took 7.26 seconds, of which 5.36 s were spent reading the input data (73% of the total).
To summarize this section: I wanted to make clear why I stuck with the pandas reader and the pandas C engine.
In this section I will analyze the TopPartition implementation:
1. The input data is read in N chunks, one per iteration.
2. Each chunk of n elements is partitioned, which means all elements smaller than the k-th element are moved before it and all remaining (equal or greater) elements are moved after it. The ordering of the elements within the two partitions is undefined. The k-th and greater elements are stored in an auxiliary list. numpy.partition under the hood uses the introselect algorithm, which has:
   - worst-case performance: O(n)
   - best-case performance: O(n)
3. After all chunks have been processed, step 2 is performed one last time on the auxiliary list, obtaining the final result.
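A simplified sketch of this chunked partition scheme (illustrative only; the project's TopPartition may differ in details):

```python
import numpy as np

def top_x_ids(chunks, x: int):
    """Chunked top-X via numpy's indirect partition (illustrative sketch).

    `chunks` is an iterable of (uids, values) integer numpy arrays.
    """
    aux_uids = np.empty(0, dtype=np.int64)
    aux_vals = np.empty(0, dtype=np.int64)
    for uids, values in chunks:
        # merge the chunk with the current auxiliary leaders
        uids = np.concatenate([aux_uids, uids])
        values = np.concatenate([aux_vals, values])
        if values.size <= x:  # not enough elements to partition yet
            aux_uids, aux_vals = uids, values
            continue
        # argpartition places the indices of the x largest values at the tail
        idx = np.argpartition(values, -x)[-x:]
        aux_uids, aux_vals = uids[idx], values[idx]
    return aux_uids.tolist()
```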
To complete this problem of total size N * n, where each chunk takes at most O(n):
T(N * n) = N * O(n) = O(N * n), i.e. time linear in the size of the input.
We need to assume that X <<< len(input_data); in other words, finding the 1000 largest values in an input of 1000 elements doesn't make sense. If that were the case, the problem would really be about sorting the input. Note that this app does not limit the --x-largest parameter.
In this scenario, the algorithm only needs space for one of the N chunks of size n at a time, concluding:
O(1) => constant space complexity (the constant depends on the --chunksize/-c value used)
You can build triad's wheel package executing:
# with docker
make docker-build-wheel
# or using the venv
python setup.py bdist_wheel
The package can be found in dist/triad-0.1.0-py3-none-any.whl and is
installable like any other Python library as:
pip install triad-0.1.0-py3-none-any.whl
Processing ./dist/triad-0.1.0-py3-none-any.whl
Requirement already satisfied: PyYAML==5.3.1 ...
.
.
.
Installing collected packages: triad
Successfully installed triad-0.1.0
The triad library is capable of logging to syslog; for this you will need to install and set up a syslog daemon (for example rsyslog), and then enable it through the triad/logging.yml config file:
- add the syslog handler: handlers: [] -> handlers: [syslog]
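For example, an illustrative excerpt of what that change could look like in triad/logging.yml, assuming a standard Python logging dictConfig layout with a SysLogHandler (the actual file may differ):

```yaml
# triad/logging.yml (illustrative excerpt)
handlers:
  syslog:
    class: logging.handlers.SysLogHandler
    address: /dev/log        # rsyslog's default local socket on Linux
root:
  level: INFO
  handlers: [syslog]         # was: handlers: []
```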