Skip to content

Commit d74f5c5

Browse files
authored
explicit parameters (#17)
* explicit parameters * fix param * bump patch version
1 parent 310100a commit d74f5c5

File tree

4 files changed

+74
-51
lines changed

4 files changed

+74
-51
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "pgmq"
3-
version = "1.0.1"
3+
version = "1.0.2"
44
description = "Python client for the PGMQ Postgres extension."
55
readme = "README.md"
66
license = "Apache-2.0"

src/pgmq/async_queue.py

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async def _create_partitioned_queue_internal(
110110
):
111111
self.logger.debug(f"Creating partitioned queue '{queue}'")
112112
await conn.execute(
113-
"SELECT pgmq.create($1, $2::text, $3::text);",
113+
"SELECT pgmq.create_partitioned(queue_name=>$1, partition_interval=>$2::text, retention_interval=>$3::text);",
114114
queue,
115115
partition_interval,
116116
retention_interval,
@@ -131,15 +131,17 @@ async def create_queue(self, queue: str, unlogged: bool = False, conn=None) -> N
131131
async def _create_queue_internal(self, queue, unlogged, conn):
132132
self.logger.debug(f"Creating queue '{queue}' with unlogged={unlogged}")
133133
if unlogged:
134-
await conn.execute("SELECT pgmq.create_unlogged($1);", queue)
134+
await conn.execute("SELECT pgmq.create_unlogged(queue_name=>$1);", queue)
135135
else:
136-
await conn.execute("SELECT pgmq.create($1);", queue)
136+
await conn.execute("SELECT pgmq.create(queue_name=>$1);", queue)
137137

138138
async def validate_queue_name(self, queue_name: str) -> None:
139139
"""Validate the length of a queue name."""
140140
self.logger.debug(f"validate_queue_name called with queue_name='{queue_name}'")
141141
async with self.pool.acquire() as conn:
142-
await conn.execute("SELECT pgmq.validate_queue_name($1);", queue_name)
142+
await conn.execute(
143+
"SELECT pgmq.validate_queue_name(queue_name=>$1);", queue_name
144+
)
143145

144146
@transaction
145147
async def drop_queue(
@@ -157,7 +159,9 @@ async def drop_queue(
157159

158160
async def _drop_queue_internal(self, queue, partitioned, conn):
159161
result = await conn.fetchrow(
160-
"SELECT pgmq.drop_queue($1, $2);", queue, partitioned
162+
"SELECT pgmq.drop_queue(queue_name=>$1, partitioned=>$2);",
163+
queue,
164+
partitioned,
161165
)
162166
self.logger.debug(f"Queue '{queue}' dropped: {result[0]}")
163167
return result[0]
@@ -206,21 +210,21 @@ async def _send_internal(
206210
result = None
207211
if delay:
208212
result = await conn.fetchrow(
209-
"SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::integer);",
213+
"SELECT * FROM pgmq.send(queue_name=>$1::text, msg=>$2::jsonb, delay=>$3::integer);",
210214
queue,
211215
dumps(message).decode("utf-8"),
212216
delay,
213217
)
214218
elif tz:
215219
result = await conn.fetchrow(
216-
"SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::timestamptz);",
220+
"SELECT * FROM pgmq.send(queue_name=>$1::text, msg=>$2::jsonb, delay=>$3::timestamptz);",
217221
queue,
218222
dumps(message).decode("utf-8"),
219223
tz,
220224
)
221225
else:
222226
result = await conn.fetchrow(
223-
"SELECT * FROM pgmq.send($1::text, $2::jsonb);",
227+
"SELECT * FROM pgmq.send(queue_name=>$1::text, msg=>$2::jsonb);",
224228
queue,
225229
dumps(message).decode("utf-8"),
226230
)
@@ -261,21 +265,21 @@ async def _send_batch_internal(
261265
result = None
262266
if delay:
263267
result = await conn.fetch(
264-
"SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3::integer);",
268+
"SELECT * FROM pgmq.send_batch(queue_name=>$1, msgs=>$2::jsonb[], delay=>$3::integer);",
265269
queue,
266270
jsonb_array,
267271
delay,
268272
)
269273
elif tz:
270274
result = await conn.fetch(
271-
"SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3::integer);",
275+
"SELECT * FROM pgmq.send_batch(queue_name=>$1, msgs=>$2::jsonb[], delay=>$3::timestamptz);",
272276
queue,
273277
jsonb_array,
274278
tz,
275279
)
276280
else:
277281
result = await conn.fetch(
278-
"SELECT * FROM pgmq.send_batch($1, $2::jsonb[]);",
282+
"SELECT * FROM pgmq.send_batch(queue_name=>$1, msgs=>$2::jsonb[]);",
279283
queue,
280284
jsonb_array,
281285
)
@@ -299,7 +303,7 @@ async def read(
299303
async def _read_internal(self, queue, vt, batch_size, conn):
300304
self.logger.debug(f"Reading message from queue '{queue}' with vt={vt}")
301305
rows = await conn.fetch(
302-
"SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);",
306+
"SELECT * FROM pgmq.read(queue_name=>$1::text, vt=>$2::integer, qty=>$3::integer);",
303307
queue,
304308
vt or self.vt,
305309
batch_size,
@@ -336,7 +340,7 @@ async def _read_batch_internal(self, queue, vt, batch_size, conn):
336340
f"Reading batch of messages from queue '{queue}' with vt={vt}"
337341
)
338342
rows = await conn.fetch(
339-
"SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);",
343+
"SELECT * FROM pgmq.read(queue_name=>$1::text, vt=>$2::integer, qty=>$3::integer);",
340344
queue,
341345
vt or self.vt,
342346
batch_size,
@@ -384,7 +388,7 @@ async def _read_with_poll_internal(
384388
):
385389
self.logger.debug(f"Reading messages with polling from queue '{queue}'")
386390
rows = await conn.fetch(
387-
"SELECT * FROM pgmq.read_with_poll($1, $2, $3, $4, $5);",
391+
"SELECT * FROM pgmq.read_with_poll(queue_name=>$1, vt=>$2, qty=>$3, max_poll_seconds=>$4, poll_interval_ms=>$5);",
388392
queue,
389393
vt or self.vt,
390394
qty,
@@ -416,7 +420,7 @@ async def pop(self, queue: str, conn=None) -> Message:
416420

417421
async def _pop_internal(self, queue, conn):
418422
self.logger.debug(f"Popping message from queue '{queue}'")
419-
rows = await conn.fetch("SELECT * FROM pgmq.pop($1);", queue)
423+
rows = await conn.fetch("SELECT * FROM pgmq.pop(queue_name=>$1);", queue)
420424
messages = [
421425
Message(
422426
msg_id=row[0],
@@ -445,7 +449,7 @@ async def delete(self, queue: str, msg_id: int, conn=None) -> bool:
445449
async def _delete_internal(self, queue, msg_id, conn):
446450
self.logger.debug(f"Deleting message with msg_id={msg_id} from queue '{queue}'")
447451
row = await conn.fetchrow(
448-
"SELECT pgmq.delete($1::text, $2::int);", queue, msg_id
452+
"SELECT pgmq.delete(queue_name=>$1::text, msg_id=>$2::int);", queue, msg_id
449453
)
450454
self.logger.debug(f"Message deleted: {row[0]}")
451455
return row[0]
@@ -469,7 +473,9 @@ async def _delete_batch_internal(self, queue, msg_ids, conn):
469473
f"Deleting messages with msg_ids={msg_ids} from queue '{queue}'"
470474
)
471475
results = await conn.fetch(
472-
"SELECT * FROM pgmq.delete($1::text, $2::int[]);", queue, msg_ids
476+
"SELECT * FROM pgmq.delete(queue_name=>$1::text, msg_ids=>$2::int[]);",
477+
queue,
478+
msg_ids,
473479
)
474480
deleted_ids = [result[0] for result in results]
475481
self.logger.debug(f"Messages deleted: {deleted_ids}")
@@ -492,7 +498,7 @@ async def _archive_internal(self, queue, msg_id, conn):
492498
f"Archiving message with msg_id={msg_id} from queue '{queue}'"
493499
)
494500
row = await conn.fetchrow(
495-
"SELECT pgmq.archive($1::text, $2::int);", queue, msg_id
501+
"SELECT pgmq.archive(queue_name=>$1::text, msg_id=>$2::int);", queue, msg_id
496502
)
497503
self.logger.debug(f"Message archived: {row[0]}")
498504
return row[0]
@@ -516,7 +522,9 @@ async def _archive_batch_internal(self, queue, msg_ids, conn):
516522
f"Archiving messages with msg_ids={msg_ids} from queue '{queue}'"
517523
)
518524
results = await conn.fetch(
519-
"SELECT * FROM pgmq.archive($1::text, $2::int[]);", queue, msg_ids
525+
"SELECT * FROM pgmq.archive(queue_name=>$1::text, msg_ids=>$2::int[]);",
526+
queue,
527+
msg_ids,
520528
)
521529
archived_ids = [result[0] for result in results]
522530
self.logger.debug(f"Messages archived: {archived_ids}")
@@ -534,7 +542,7 @@ async def purge(self, queue: str, conn=None) -> int:
534542

535543
async def _purge_internal(self, queue, conn):
536544
self.logger.debug(f"Purging queue '{queue}'")
537-
row = await conn.fetchrow("SELECT pgmq.purge_queue($1);", queue)
545+
row = await conn.fetchrow("SELECT pgmq.purge_queue(queue_name=>$1);", queue)
538546
self.logger.debug(f"Messages purged: {row[0]}")
539547
return row[0]
540548

@@ -550,7 +558,9 @@ async def metrics(self, queue: str, conn=None) -> QueueMetrics:
550558

551559
async def _metrics_internal(self, queue, conn):
552560
self.logger.debug(f"Fetching metrics for queue '{queue}'")
553-
result = await conn.fetchrow("SELECT * FROM pgmq.metrics($1);", queue)
561+
result = await conn.fetchrow(
562+
"SELECT * FROM pgmq.metrics(queue_name=>$1);", queue
563+
)
554564
metrics = QueueMetrics(
555565
queue_name=result[0],
556566
queue_length=result[1],
@@ -601,12 +611,15 @@ async def set_vt(self, queue: str, msg_id: int, vt: int, conn=None) -> Message:
601611
else:
602612
return await self._set_vt_internal(queue, msg_id, vt, conn)
603613

604-
async def _set_vt_internal(self, queue, msg_id, vt, conn):
614+
async def _set_vt_internal(self, queue: str, msg_id, vt, conn):
605615
self.logger.debug(
606616
f"Setting VT for msg_id={msg_id} in queue '{queue}' to vt={vt}"
607617
)
608618
row = await conn.fetchrow(
609-
"SELECT * FROM pgmq.set_vt($1, $2, $3);", queue, msg_id, vt
619+
"SELECT * FROM pgmq.set_vt(queue_name=>$1, msg_id=>$2, vt=>$3);",
620+
queue,
621+
msg_id,
622+
vt,
610623
)
611624
message = Message(
612625
msg_id=row[0],
@@ -630,5 +643,5 @@ async def detach_archive(self, queue: str, conn=None) -> None:
630643

631644
async def _detach_archive_internal(self, queue, conn):
632645
self.logger.debug(f"Detaching archive from queue '{queue}'")
633-
await conn.execute("SELECT pgmq.detach_archive($1);", queue)
646+
await conn.execute("SELECT pgmq.detach_archive(queue_name=>$1);", queue)
634647
self.logger.debug(f"Archive detached from queue '{queue}'")

0 commit comments

Comments
 (0)