Skip to content

feat: Accept None as column values in Buffer.row() API. #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ Explicit Buffers

For more advanced use cases where the same messages need to be sent to multiple
questdb instances or you want to decouple serialization and sending (as may be
in a multi-threaded application) construct :class:``questdb.ingress.Buffer``
objects explicitly, then pass them to the :func:``questdb.ingress.Sender.flush``
in a multi-threaded application) construct :class:`questdb.ingress.Buffer`
objects explicitly, then pass them to the :func:`questdb.ingress.Sender.flush`
method.

Note that this bypasses ``auto-flush`` logic
Expand Down
2 changes: 2 additions & 0 deletions proj.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ def clean():
_rm(PROJ_ROOT / 'src', '**/*.dylib')
_rm(PROJ_ROOT / 'src', '**/*.c')
_rm(PROJ_ROOT / 'src', '**/*.html')
_rm(PROJ_ROOT, 'rustup-init.exe')
_rm(PROJ_ROOT, 'rustup-init.sh')


@command
Expand Down
80 changes: 59 additions & 21 deletions src/questdb/ingress.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -462,15 +462,15 @@ cdef class Buffer:
"""
line_sender_buffer_clear(self._impl)

def __len__(self):
def __len__(self) -> int:
"""
The current number of bytes currently in the buffer.

Equivalent (but cheaper) to ``len(str(sender))``.
"""
return line_sender_buffer_size(self._impl)

def __str__(self):
def __str__(self) -> str:
"""Return the constructed buffer as a string. Use for debugging."""
return self._to_str()

Expand Down Expand Up @@ -595,22 +595,19 @@ cdef class Buffer:
cdef line_sender_error* err = NULL
if not line_sender_buffer_at(self._impl, ts.value, &err):
raise c_err_to_py(err)
self._may_trigger_row_complete()
return 0

cdef inline int _at_dt(self, datetime dt) except -1:
cdef int64_t value = datetime_to_nanos(dt)
cdef line_sender_error* err = NULL
if not line_sender_buffer_at(self._impl, value, &err):
raise c_err_to_py(err)
self._may_trigger_row_complete()
return 0

cdef inline int _at_now(self) except -1:
cdef line_sender_error* err = NULL
if not line_sender_buffer_at_now(self._impl, &err):
raise c_err_to_py(err)
self._may_trigger_row_complete()
return 0

cdef inline int _at(self, object ts) except -1:
Expand All @@ -634,6 +631,7 @@ cdef class Buffer:
"""
Add a row to the buffer.
"""
cdef bint wrote_fields = False
self._set_marker()
try:
self._table(table_name)
Expand All @@ -644,14 +642,22 @@ cdef class Buffer:
if symbols is not None:
for name, value in symbols.items():
self._symbol(name, value)
wrote_fields = True
if columns is not None:
for name, value in columns.items():
self._column(name, value)
self._at(at)
self._clear_marker()
if value is not None:
self._column(name, value)
wrote_fields = True
if wrote_fields:
self._at(at)
self._clear_marker()
else:
self._rewind_to_marker()
except:
self._rewind_to_marker()
raise
if wrote_fields:
self._may_trigger_row_complete()

def row(
self,
Expand All @@ -660,7 +666,8 @@ cdef class Buffer:
symbols: Optional[Dict[str, str]]=None,
columns: Optional[Dict[
str,
Union[bool, int, float, str, TimestampMicros, datetime]]]=None,
Union[None, bool, int, float, str, TimestampMicros, datetime]]
]=None,
at: Union[None, TimestampNanos, datetime]=None):
"""
Add a single row (line) to the buffer.
Expand All @@ -679,7 +686,8 @@ cdef class Buffer:
'col3': 3.14,
'col4': 'xyz',
'col5': TimestampMicros(123456789),
'col6': datetime(2019, 1, 1, 12, 0, 0)},
'col6': datetime(2019, 1, 1, 12, 0, 0),
'col7': None},
at=TimestampNanos(123456789))

# Only symbols specified. Designated timestamp assigned by the db.
Expand Down Expand Up @@ -707,10 +715,38 @@ cdef class Buffer:
understand the difference between the ``SYMBOL`` and ``STRING`` types
(TL;DR: symbols are interned strings).

Column values can be specified with Python types directly and map as so:

.. list-table::
:header-rows: 1

* - Python type
- Serialized as ILP type
* - ``bool``
- `BOOLEAN <https://questdb.io/docs/reference/api/ilp/columnset-types#boolean>`_
* - ``int``
- `INTEGER <https://questdb.io/docs/reference/api/ilp/columnset-types#integer>`_
* - ``float``
- `FLOAT <https://questdb.io/docs/reference/api/ilp/columnset-types#float>`_
* - ``str``
- `STRING <https://questdb.io/docs/reference/api/ilp/columnset-types#string>`_
* - ``datetime.datetime`` and ``TimestampMicros``
- `TIMESTAMP <https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp>`_
* - ``None``
- *Column is skipped and not serialized.*

If the destination table was already created, then the columns types
will be cast to the types of the existing columns whenever possible
(Refer to the QuestDB documentation pages linked above).

:param table_name: The name of the table to which the row belongs.
:param symbols: A dictionary of symbol column names to ``str`` values.
:param columns: A dictionary of column names to ``bool``, ``int``,
``float``, ``str``, ``TimestampMicros`` or ``datetime`` values.
As a convenience, you can also pass a ``None`` value, however - due
to ILP protocol limitations - this will skip the column rather
necessarily writing a ``NULL`` value, so if the column did not exist
yet it will not be created.
:param at: The timestamp of the row. If ``None``, timestamp is assigned
by the server. If ``datetime``, the timestamp is converted to
nanoseconds. A nanosecond unix epoch timestamp can be passed
Expand Down Expand Up @@ -764,9 +800,9 @@ cdef class Buffer:

# buffer.tabular(
# 'table_name',
# [[True, 123, 3.14, 'xyz'],
# [False, 456, 6.28, 'abc'],
# [True, 789, 9.87, 'def']],
# [[True, None, 3.14, 'xyz'],
# [False, 123, 6.28, 'abc'],
# [True, 456, 9.87, 'def']],
# header=['col1', 'col2', 'col3', 'col4'],
# at=datetime.datetime.utcnow())

Expand Down Expand Up @@ -848,7 +884,7 @@ cdef class Buffer:
# buffer.tabular(
# 'table_name',
# [['abc', 123, 3.14, 'xyz'],
# ['def', 456, 6.28, 'abc'],
# ['def', 456, None, 'abc'],
# ['ghi', 789, 9.87, 'def']],
# header=['col1', 'col2', 'col3', 'col4'],
# symbols=True) # `col1` and `col4` are SYMBOL columns.
Expand Down Expand Up @@ -941,7 +977,7 @@ cdef class Sender:
sender.flush()


**Auto-flushing (on by default)**
**Auto-flushing (on by default, watermark at 63KiB)**

To avoid accumulating very large buffers, the sender will flush the buffer
automatically once its buffer reaches a certain byte-size watermark.
Expand Down Expand Up @@ -987,14 +1023,14 @@ cdef class Sender:
* A special ``'insecure_skip_verify'`` string: Dangerously disable all
TLS certificate verification (do *NOT* use in production environments).

**Positional constructor arguments for the ``Sender(..)``**
**Positional constructor arguments for the Sender(..)**

* ``host``: Hostname or IP address of the QuestDB server.

* ``port``: Port number of the QuestDB server.


**Keyword-only constructor arguments for the ``Sender(..)``**
**Keyword-only constructor arguments for the Sender(..)**

* ``interface`` (``str``): Network interface to bind to.
Set this if you have an accelerated network interface (e.g. Solarflare)
Expand All @@ -1011,13 +1047,15 @@ cdef class Sender:
This field is expressed in milliseconds. The default is 15 seconds.

* ``init_capacity`` (``int``): Initial buffer capacity of the internal buffer.
*See :class:`Buffer`'s constructor for more details.*
*Default: 65536 (64KiB).*
*See Buffer's constructor for more details.*

* ``max_name_length`` (``int``): Maximum length of a table or column name.
*See :class:`Buffer`'s constructor for more details.*
*See Buffer's constructor for more details.*

* ``auto_flush`` (``bool`` or ``int``): Whether to automatically flush the
buffer when it reaches a certain byte-size watermark.
*Default: 64512 (63KiB).*
*See above for details.*
"""

Expand Down Expand Up @@ -1195,7 +1233,7 @@ cdef class Sender:
if self._buffer is not None:
self._buffer._row_complete_sender = PyWeakref_NewRef(self, None)

def __enter__(self):
def __enter__(self) -> Sender:
"""Call :func:`Sender.connect` at the start of a ``with`` block."""
self.connect()
return self
Expand All @@ -1210,7 +1248,7 @@ cdef class Sender:
"""
return str(self._buffer)

def __len__(self):
def __len__(self) -> int:
"""
Number of bytes of unsent data in the internal buffer.

Expand Down
26 changes: 25 additions & 1 deletion test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,25 @@ def test_column(self):
'col4': 0.5,
'col5': 'val',
'col6': qi.TimestampMicros(12345),
'col7': two_h_after_epoch})
'col7': two_h_after_epoch,
'col8': None})
exp = (
'tbl1 col1=t,col2=f,col3=-1i,col4=0.5,'
'col5="val",col6=12345t,col7=7200000000t\n')
self.assertEqual(str(buf), exp)

def test_none_column(self):
buf = qi.Buffer()
buf.row('tbl1', columns={'col1': 1})
exp = 'tbl1 col1=1i\n'
self.assertEqual(str(buf), exp)
self.assertEqual(len(buf), len(exp))

# No fields to write, no fields written, therefore a no-op.
buf.row('tbl1', columns={'col1': None, 'col2': None})
self.assertEqual(str(buf), exp)
self.assertEqual(len(buf), len(exp))

def test_unicode(self):
buf = qi.Buffer()
buf.row('tbl1', symbols={'questdb1': '❤️'}, columns={'questdb2': '❤️'})
Expand Down Expand Up @@ -262,6 +275,17 @@ def test_immediate_auto_flush(self):
msgs = server.recv()
self.assertEqual(msgs, [b'tbl1,sym1=val1'])

def test_auto_flush_on_closed_socket(self):
with Server() as server:
with qi.Sender('localhost', server.port, auto_flush=True) as sender:
server.accept()
server.close()
exp_err = 'Could not flush buffer'
with self.assertRaisesRegexp(qi.IngressError, exp_err):
for _ in range(1000):
time.sleep(0.01)
sender.row('tbl1', symbols={'a': 'b'})

def test_dont_auto_flush(self):
with Server() as server:
with qi.Sender('localhost', server.port, auto_flush=0) as sender:
Expand Down