diff --git a/docs/examples.rst b/docs/examples.rst index c57b7d01..a33ac847 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -35,8 +35,8 @@ Explicit Buffers For more advanced use cases where the same messages need to be sent to multiple questdb instances or you want to decouple serialization and sending (as may be -in a multi-threaded application) construct :class:``questdb.ingress.Buffer`` -objects explicitly, then pass them to the :func:``questdb.ingress.Sender.flush`` +in a multi-threaded application) construct :class:`questdb.ingress.Buffer` +objects explicitly, then pass them to the :func:`questdb.ingress.Sender.flush` method. Note that this bypasses ``auto-flush`` logic diff --git a/proj.py b/proj.py index 51e981f1..8b958cc1 100755 --- a/proj.py +++ b/proj.py @@ -136,6 +136,8 @@ def clean(): _rm(PROJ_ROOT / 'src', '**/*.dylib') _rm(PROJ_ROOT / 'src', '**/*.c') _rm(PROJ_ROOT / 'src', '**/*.html') + _rm(PROJ_ROOT, 'rustup-init.exe') + _rm(PROJ_ROOT, 'rustup-init.sh') @command diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index 13ad5f05..6f66212f 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -462,7 +462,7 @@ cdef class Buffer: """ line_sender_buffer_clear(self._impl) - def __len__(self): + def __len__(self) -> int: """ The current number of bytes currently in the buffer. @@ -470,7 +470,7 @@ cdef class Buffer: """ return line_sender_buffer_size(self._impl) - def __str__(self): + def __str__(self) -> str: """Return the constructed buffer as a string. Use for debugging.""" return self._to_str() @@ -595,7 +595,6 @@ cdef class Buffer: cdef line_sender_error* err = NULL if not line_sender_buffer_at(self._impl, ts.value, &err): raise c_err_to_py(err) - self._may_trigger_row_complete() return 0 cdef inline int _at_dt(self, datetime dt) except -1: @@ -603,14 +602,12 @@ cdef class Buffer: cdef line_sender_error* err = NULL if not line_sender_buffer_at(self._impl, value, &err): raise c_err_to_py(err) - self._may_trigger_row_complete() return 0 cdef inline int _at_now(self) except -1: cdef line_sender_error* err = NULL if not line_sender_buffer_at_now(self._impl, &err): raise c_err_to_py(err) - self._may_trigger_row_complete() return 0 cdef inline int _at(self, object ts) except -1: @@ -634,6 +631,7 @@ cdef class Buffer: """ Add a row to the buffer. """ + cdef bint wrote_fields = False self._set_marker() try: self._table(table_name) @@ -644,14 +642,22 @@ cdef class Buffer: if symbols is not None: for name, value in symbols.items(): self._symbol(name, value) + wrote_fields = True if columns is not None: for name, value in columns.items(): - self._column(name, value) - self._at(at) - self._clear_marker() + if value is not None: + self._column(name, value) + wrote_fields = True + if wrote_fields: + self._at(at) + self._clear_marker() + else: + self._rewind_to_marker() except: self._rewind_to_marker() raise + if wrote_fields: + self._may_trigger_row_complete() def row( self, @@ -660,7 +666,8 @@ cdef class Buffer: symbols: Optional[Dict[str, str]]=None, columns: Optional[Dict[ str, - Union[bool, int, float, str, TimestampMicros, datetime]]]=None, + Union[None, bool, int, float, str, TimestampMicros, datetime]] + ]=None, at: Union[None, TimestampNanos, datetime]=None): """ Add a single row (line) to the buffer. @@ -679,7 +686,8 @@ cdef class Buffer: 'col3': 3.14, 'col4': 'xyz', 'col5': TimestampMicros(123456789), - 'col6': datetime(2019, 1, 1, 12, 0, 0)}, + 'col6': datetime(2019, 1, 1, 12, 0, 0), + 'col7': None}, at=TimestampNanos(123456789)) # Only symbols specified. Designated timestamp assigned by the db. @@ -707,10 +715,38 @@ cdef class Buffer: understand the difference between the ``SYMBOL`` and ``STRING`` types (TL;DR: symbols are interned strings). + Column values can be specified with Python types directly and map as so: + + .. list-table:: + :header-rows: 1 + + * - Python type + - Serialized as ILP type + * - ``bool`` + - `BOOLEAN `_ + * - ``int`` + - `INTEGER `_ + * - ``float`` + - `FLOAT `_ + * - ``str`` + - `STRING `_ + * - ``datetime.datetime`` and ``TimestampMicros`` + - `TIMESTAMP `_ + * - ``None`` + - *Column is skipped and not serialized.* + + If the destination table was already created, then the columns types + will be cast to the types of the existing columns whenever possible + (Refer to the QuestDB documentation pages linked above). + :param table_name: The name of the table to which the row belongs. :param symbols: A dictionary of symbol column names to ``str`` values. :param columns: A dictionary of column names to ``bool``, ``int``, ``float``, ``str``, ``TimestampMicros`` or ``datetime`` values. + As a convenience, you can also pass a ``None`` value, however - due + to ILP protocol limitations - this will skip the column rather + necessarily writing a ``NULL`` value, so if the column did not exist + yet it will not be created. :param at: The timestamp of the row. If ``None``, timestamp is assigned by the server. If ``datetime``, the timestamp is converted to nanoseconds. A nanosecond unix epoch timestamp can be passed @@ -764,9 +800,9 @@ cdef class Buffer: # buffer.tabular( # 'table_name', - # [[True, 123, 3.14, 'xyz'], - # [False, 456, 6.28, 'abc'], - # [True, 789, 9.87, 'def']], + # [[True, None, 3.14, 'xyz'], + # [False, 123, 6.28, 'abc'], + # [True, 456, 9.87, 'def']], # header=['col1', 'col2', 'col3', 'col4'], # at=datetime.datetime.utcnow()) @@ -848,7 +884,7 @@ cdef class Buffer: # buffer.tabular( # 'table_name', # [['abc', 123, 3.14, 'xyz'], - # ['def', 456, 6.28, 'abc'], + # ['def', 456, None, 'abc'], # ['ghi', 789, 9.87, 'def']], # header=['col1', 'col2', 'col3', 'col4'], # symbols=True) # `col1` and `col4` are SYMBOL columns. @@ -941,7 +977,7 @@ cdef class Sender: sender.flush() - **Auto-flushing (on by default)** + **Auto-flushing (on by default, watermark at 63KiB)** To avoid accumulating very large buffers, the sender will flush the buffer automatically once its buffer reaches a certain byte-size watermark. @@ -987,14 +1023,14 @@ cdef class Sender: * A special ``'insecure_skip_verify'`` string: Dangerously disable all TLS certificate verification (do *NOT* use in production environments). - **Positional constructor arguments for the ``Sender(..)``** + **Positional constructor arguments for the Sender(..)** * ``host``: Hostname or IP address of the QuestDB server. * ``port``: Port number of the QuestDB server. - **Keyword-only constructor arguments for the ``Sender(..)``** + **Keyword-only constructor arguments for the Sender(..)** * ``interface`` (``str``): Network interface to bind to. Set this if you have an accelerated network interface (e.g. Solarflare) @@ -1011,13 +1047,15 @@ cdef class Sender: This field is expressed in milliseconds. The default is 15 seconds. * ``init_capacity`` (``int``): Initial buffer capacity of the internal buffer. - *See :class:`Buffer`'s constructor for more details.* + *Default: 65536 (64KiB).* + *See Buffer's constructor for more details.* * ``max_name_length`` (``int``): Maximum length of a table or column name. - *See :class:`Buffer`'s constructor for more details.* + *See Buffer's constructor for more details.* * ``auto_flush`` (``bool`` or ``int``): Whether to automatically flush the buffer when it reaches a certain byte-size watermark. + *Default: 64512 (63KiB).* *See above for details.* """ @@ -1195,7 +1233,7 @@ cdef class Sender: if self._buffer is not None: self._buffer._row_complete_sender = PyWeakref_NewRef(self, None) - def __enter__(self): + def __enter__(self) -> Sender: """Call :func:`Sender.connect` at the start of a ``with`` block.""" self.connect() return self @@ -1210,7 +1248,7 @@ cdef class Sender: """ return str(self._buffer) - def __len__(self): + def __len__(self) -> int: """ Number of bytes of unsent data in the internal buffer. diff --git a/test/test.py b/test/test.py index 738ec115..cf04161a 100755 --- a/test/test.py +++ b/test/test.py @@ -68,12 +68,25 @@ def test_column(self): 'col4': 0.5, 'col5': 'val', 'col6': qi.TimestampMicros(12345), - 'col7': two_h_after_epoch}) + 'col7': two_h_after_epoch, + 'col8': None}) exp = ( 'tbl1 col1=t,col2=f,col3=-1i,col4=0.5,' 'col5="val",col6=12345t,col7=7200000000t\n') self.assertEqual(str(buf), exp) + def test_none_column(self): + buf = qi.Buffer() + buf.row('tbl1', columns={'col1': 1}) + exp = 'tbl1 col1=1i\n' + self.assertEqual(str(buf), exp) + self.assertEqual(len(buf), len(exp)) + + # No fields to write, no fields written, therefore a no-op. + buf.row('tbl1', columns={'col1': None, 'col2': None}) + self.assertEqual(str(buf), exp) + self.assertEqual(len(buf), len(exp)) + def test_unicode(self): buf = qi.Buffer() buf.row('tbl1', symbols={'questdb1': '❤️'}, columns={'questdb2': '❤️'}) @@ -262,6 +275,17 @@ def test_immediate_auto_flush(self): msgs = server.recv() self.assertEqual(msgs, [b'tbl1,sym1=val1']) + def test_auto_flush_on_closed_socket(self): + with Server() as server: + with qi.Sender('localhost', server.port, auto_flush=True) as sender: + server.accept() + server.close() + exp_err = 'Could not flush buffer' + with self.assertRaisesRegexp(qi.IngressError, exp_err): + for _ in range(1000): + time.sleep(0.01) + sender.row('tbl1', symbols={'a': 'b'}) + def test_dont_auto_flush(self): with Server() as server: with qi.Sender('localhost', server.port, auto_flush=0) as sender: