@@ -80,11 +80,11 @@ def store_search_entries_txn(
80
80
if not self .hs .config .server .enable_search :
81
81
return
82
82
if isinstance (self .database_engine , PostgresEngine ):
83
- sql = """
84
- INSERT INTO event_search
85
- (event_id, room_id, key, vector, stream_ordering, origin_server_ts)
86
- VALUES (?,?,?,to_tsvector('english', ?),?,?)
87
- """
83
+ sql = (
84
+ " INSERT INTO event_search"
85
+ " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
86
+ " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
87
+ )
88
88
89
89
args1 = (
90
90
(
@@ -101,20 +101,20 @@ def store_search_entries_txn(
101
101
txn .execute_batch (sql , args1 )
102
102
103
103
elif isinstance (self .database_engine , Sqlite3Engine ):
104
- self .db_pool .simple_insert_many_txn (
105
- txn ,
106
- table = "event_search" ,
107
- keys = ("event_id" , "room_id" , "key" , "value" ),
108
- values = (
109
- (
110
- entry .event_id ,
111
- entry .room_id ,
112
- entry .key ,
113
- _clean_value_for_search (entry .value ),
114
- )
115
- for entry in entries
116
- ),
104
+ sql = (
105
+ "INSERT INTO event_search (event_id, room_id, key, value)"
106
+ " VALUES (?,?,?,?)"
107
+ )
108
+ args2 = (
109
+ (
110
+ entry .event_id ,
111
+ entry .room_id ,
112
+ entry .key ,
113
+ _clean_value_for_search (entry .value ),
114
+ )
115
+ for entry in entries
117
116
)
117
+ txn .execute_batch (sql , args2 )
118
118
119
119
else :
120
120
# This should be unreachable.
@@ -162,17 +162,15 @@ async def _background_reindex_search(
162
162
TYPES = ["m.room.name" , "m.room.message" , "m.room.topic" ]
163
163
164
164
def reindex_search_txn (txn : LoggingTransaction ) -> int :
165
- sql = """
166
- SELECT stream_ordering, event_id, room_id, type, json, origin_server_ts
167
- FROM events
168
- JOIN event_json USING (room_id, event_id)
169
- WHERE ? <= stream_ordering AND stream_ordering < ?
170
- AND (%s)
171
- ORDER BY stream_ordering DESC
172
- LIMIT ?
173
- """ % (
174
- " OR " .join ("type = '%s'" % (t ,) for t in TYPES ),
175
- )
165
+ sql = (
166
+ "SELECT stream_ordering, event_id, room_id, type, json, "
167
+ " origin_server_ts FROM events"
168
+ " JOIN event_json USING (room_id, event_id)"
169
+ " WHERE ? <= stream_ordering AND stream_ordering < ?"
170
+ " AND (%s)"
171
+ " ORDER BY stream_ordering DESC"
172
+ " LIMIT ?"
173
+ ) % (" OR " .join ("type = '%s'" % (t ,) for t in TYPES ),)
176
174
177
175
txn .execute (sql , (target_min_stream_id , max_stream_id , batch_size ))
178
176
@@ -286,10 +284,8 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
286
284
287
285
try :
288
286
c .execute (
289
- """
290
- CREATE INDEX CONCURRENTLY event_search_fts_idx
291
- ON event_search USING GIN (vector)
292
- """
287
+ "CREATE INDEX CONCURRENTLY event_search_fts_idx"
288
+ " ON event_search USING GIN (vector)"
293
289
)
294
290
except psycopg2 .ProgrammingError as e :
295
291
logger .warning (
@@ -327,16 +323,12 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
327
323
# We create with NULLS FIRST so that when we search *backwards*
328
324
# we get the ones with non null origin_server_ts *first*
329
325
c .execute (
330
- """
331
- CREATE INDEX CONCURRENTLY event_search_room_order
332
- ON event_search(room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
333
- """
326
+ "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search("
327
+ "room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
334
328
)
335
329
c .execute (
336
- """
337
- CREATE INDEX CONCURRENTLY event_search_order
338
- ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
339
- """
330
+ "CREATE INDEX CONCURRENTLY event_search_order ON event_search("
331
+ "origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
340
332
)
341
333
conn .set_session (autocommit = False )
342
334
@@ -353,14 +345,14 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
353
345
)
354
346
355
347
def reindex_search_txn (txn : LoggingTransaction ) -> Tuple [int , bool ]:
356
- sql = """
357
- UPDATE event_search AS es
358
- SET stream_ordering = e.stream_ordering, origin_server_ts = e.origin_server_ts
359
- FROM events AS e
360
- WHERE e.event_id = es.event_id
361
- AND ? <= e.stream_ordering AND e.stream_ordering < ?
362
- RETURNING es.stream_ordering
363
- """
348
+ sql = (
349
+ " UPDATE event_search AS es SET stream_ordering = e.stream_ordering,"
350
+ " origin_server_ts = e.origin_server_ts"
351
+ " FROM events AS e"
352
+ " WHERE e.event_id = es.event_id"
353
+ " AND ? <= e.stream_ordering AND e.stream_ordering < ?"
354
+ " RETURNING es.stream_ordering"
355
+ )
364
356
365
357
min_stream_id = max_stream_id - batch_size
366
358
txn .execute (sql , (min_stream_id , max_stream_id ))
@@ -464,33 +456,33 @@ async def search_msgs(
464
456
if isinstance (self .database_engine , PostgresEngine ):
465
457
search_query = search_term
466
458
tsquery_func = self .database_engine .tsquery_func
467
- sql = f"""
468
- SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) AS rank,
469
- room_id, event_id
470
- FROM event_search
471
- WHERE vector @@ { tsquery_func } ('english', ?)
472
- """
459
+ sql = (
460
+ f" SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) AS rank,"
461
+ " room_id, event_id"
462
+ " FROM event_search"
463
+ f" WHERE vector @@ { tsquery_func } ('english', ?)"
464
+ )
473
465
args = [search_query , search_query ] + args
474
466
475
- count_sql = f"""
476
- SELECT room_id, count(*) as count FROM event_search
477
- WHERE vector @@ { tsquery_func } ('english', ?)
478
- """
467
+ count_sql = (
468
+ " SELECT room_id, count(*) as count FROM event_search"
469
+ f" WHERE vector @@ { tsquery_func } ('english', ?)"
470
+ )
479
471
count_args = [search_query ] + count_args
480
472
elif isinstance (self .database_engine , Sqlite3Engine ):
481
473
search_query = _parse_query_for_sqlite (search_term )
482
474
483
- sql = """
484
- SELECT rank(matchinfo(event_search)) as rank, room_id, event_id
485
- FROM event_search
486
- WHERE value MATCH ?
487
- """
475
+ sql = (
476
+ " SELECT rank(matchinfo(event_search)) as rank, room_id, event_id"
477
+ " FROM event_search"
478
+ " WHERE value MATCH ?"
479
+ )
488
480
args = [search_query ] + args
489
481
490
- count_sql = """
491
- SELECT room_id, count(*) as count FROM event_search
492
- WHERE value MATCH ?
493
- """
482
+ count_sql = (
483
+ " SELECT room_id, count(*) as count FROM event_search"
484
+ " WHERE value MATCH ?"
485
+ )
494
486
count_args = [search_query ] + count_args
495
487
else :
496
488
# This should be unreachable.
@@ -596,27 +588,26 @@ async def search_rooms(
596
588
raise SynapseError (400 , "Invalid pagination token" )
597
589
598
590
clauses .append (
599
- """
600
- (origin_server_ts < ? OR (origin_server_ts = ? AND stream_ordering < ?))
601
- """
591
+ "(origin_server_ts < ?"
592
+ " OR (origin_server_ts = ? AND stream_ordering < ?))"
602
593
)
603
594
args .extend ([origin_server_ts , origin_server_ts , stream ])
604
595
605
596
if isinstance (self .database_engine , PostgresEngine ):
606
597
search_query = search_term
607
598
tsquery_func = self .database_engine .tsquery_func
608
- sql = f"""
609
- SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) as rank,
610
- origin_server_ts, stream_ordering, room_id, event_id
611
- FROM event_search
612
- WHERE vector @@ { tsquery_func } ('english', ?) AND
613
- """
599
+ sql = (
600
+ f" SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) as rank,"
601
+ " origin_server_ts, stream_ordering, room_id, event_id"
602
+ " FROM event_search"
603
+ f" WHERE vector @@ { tsquery_func } ('english', ?) AND "
604
+ )
614
605
args = [search_query , search_query ] + args
615
606
616
- count_sql = f"""
617
- SELECT room_id, count(*) as count FROM event_search
618
- WHERE vector @@ { tsquery_func } ('english', ?) AND
619
- """
607
+ count_sql = (
608
+ " SELECT room_id, count(*) as count FROM event_search"
609
+ f" WHERE vector @@ { tsquery_func } ('english', ?) AND "
610
+ )
620
611
count_args = [search_query ] + count_args
621
612
elif isinstance (self .database_engine , Sqlite3Engine ):
622
613
@@ -628,24 +619,23 @@ async def search_rooms(
628
619
# in the events table to get the topological ordering. We need
629
620
# to use the indexes in this order because sqlite refuses to
630
621
# MATCH unless it uses the full text search index
631
- sql = """
632
- SELECT
633
- rank(matchinfo) as rank, room_id, event_id, origin_server_ts, stream_ordering
634
- FROM (
635
- SELECT key, event_id, matchinfo(event_search) as matchinfo
636
- FROM event_search
637
- WHERE value MATCH ?
622
+ sql = (
623
+ "SELECT rank(matchinfo) as rank, room_id, event_id,"
624
+ " origin_server_ts, stream_ordering"
625
+ " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo"
626
+ " FROM event_search"
627
+ " WHERE value MATCH ?"
628
+ " )"
629
+ " CROSS JOIN events USING (event_id)"
630
+ " WHERE "
638
631
)
639
- CROSS JOIN events USING (event_id)
640
- WHERE
641
- """
642
632
search_query = _parse_query_for_sqlite (search_term )
643
633
args = [search_query ] + args
644
634
645
- count_sql = """
646
- SELECT room_id, count(*) as count FROM event_search
647
- WHERE value MATCH ? AND
648
- """
635
+ count_sql = (
636
+ " SELECT room_id, count(*) as count FROM event_search"
637
+ " WHERE value MATCH ? AND "
638
+ )
649
639
count_args = [search_query ] + count_args
650
640
else :
651
641
# This should be unreachable.
@@ -657,10 +647,10 @@ async def search_rooms(
657
647
# We add an arbitrary limit here to ensure we don't try to pull the
658
648
# entire table from the database.
659
649
if isinstance (self .database_engine , PostgresEngine ):
660
- sql += """
661
- ORDER BY origin_server_ts DESC NULLS LAST, stream_ordering DESC NULLS LAST
662
- LIMIT ?
663
- """
650
+ sql += (
651
+ " ORDER BY origin_server_ts DESC NULLS LAST,"
652
+ " stream_ordering DESC NULLS LAST LIMIT ?"
653
+ )
664
654
elif isinstance (self .database_engine , Sqlite3Engine ):
665
655
sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
666
656
else :
0 commit comments