@@ -80,11 +80,11 @@ def store_search_entries_txn(
8080 if not self .hs .config .server .enable_search :
8181 return
8282 if isinstance (self .database_engine , PostgresEngine ):
83- sql = """
84- INSERT INTO event_search
85- (event_id, room_id, key, vector, stream_ordering, origin_server_ts)
86- VALUES (?,?,?,to_tsvector('english', ?),?,?)
87- """
83+ sql = (
84+ " INSERT INTO event_search"
85+ " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
86+ " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
87+ )
8888
8989 args1 = (
9090 (
@@ -101,20 +101,20 @@ def store_search_entries_txn(
101101 txn .execute_batch (sql , args1 )
102102
103103 elif isinstance (self .database_engine , Sqlite3Engine ):
104- self .db_pool .simple_insert_many_txn (
105- txn ,
106- table = "event_search" ,
107- keys = ("event_id" , "room_id" , "key" , "value" ),
108- values = (
109- (
110- entry .event_id ,
111- entry .room_id ,
112- entry .key ,
113- _clean_value_for_search (entry .value ),
114- )
115- for entry in entries
116- ),
104+ sql = (
105+ "INSERT INTO event_search (event_id, room_id, key, value)"
106+ " VALUES (?,?,?,?)"
107+ )
108+ args2 = (
109+ (
110+ entry .event_id ,
111+ entry .room_id ,
112+ entry .key ,
113+ _clean_value_for_search (entry .value ),
114+ )
115+ for entry in entries
117116 )
117+ txn .execute_batch (sql , args2 )
118118
119119 else :
120120 # This should be unreachable.
@@ -162,17 +162,15 @@ async def _background_reindex_search(
162162 TYPES = ["m.room.name" , "m.room.message" , "m.room.topic" ]
163163
164164 def reindex_search_txn (txn : LoggingTransaction ) -> int :
165- sql = """
166- SELECT stream_ordering, event_id, room_id, type, json, origin_server_ts
167- FROM events
168- JOIN event_json USING (room_id, event_id)
169- WHERE ? <= stream_ordering AND stream_ordering < ?
170- AND (%s)
171- ORDER BY stream_ordering DESC
172- LIMIT ?
173- """ % (
174- " OR " .join ("type = '%s'" % (t ,) for t in TYPES ),
175- )
165+ sql = (
166+ "SELECT stream_ordering, event_id, room_id, type, json, "
167+ " origin_server_ts FROM events"
168+ " JOIN event_json USING (room_id, event_id)"
169+ " WHERE ? <= stream_ordering AND stream_ordering < ?"
170+ " AND (%s)"
171+ " ORDER BY stream_ordering DESC"
172+ " LIMIT ?"
173+ ) % (" OR " .join ("type = '%s'" % (t ,) for t in TYPES ),)
176174
177175 txn .execute (sql , (target_min_stream_id , max_stream_id , batch_size ))
178176
@@ -286,10 +284,8 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
286284
287285 try :
288286 c .execute (
289- """
290- CREATE INDEX CONCURRENTLY event_search_fts_idx
291- ON event_search USING GIN (vector)
292- """
287+ "CREATE INDEX CONCURRENTLY event_search_fts_idx"
288+ " ON event_search USING GIN (vector)"
293289 )
294290 except psycopg2 .ProgrammingError as e :
295291 logger .warning (
@@ -327,16 +323,12 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
327323 # We create with NULLS FIRST so that when we search *backwards*
328324 # we get the ones with non null origin_server_ts *first*
329325 c .execute (
330- """
331- CREATE INDEX CONCURRENTLY event_search_room_order
332- ON event_search(room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
333- """
326+ "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search("
327+ "room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
334328 )
335329 c .execute (
336- """
337- CREATE INDEX CONCURRENTLY event_search_order
338- ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
339- """
330+ "CREATE INDEX CONCURRENTLY event_search_order ON event_search("
331+ "origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
340332 )
341333 conn .set_session (autocommit = False )
342334
@@ -353,14 +345,14 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
353345 )
354346
355347 def reindex_search_txn (txn : LoggingTransaction ) -> Tuple [int , bool ]:
356- sql = """
357- UPDATE event_search AS es
358- SET stream_ordering = e.stream_ordering, origin_server_ts = e.origin_server_ts
359- FROM events AS e
360- WHERE e.event_id = es.event_id
361- AND ? <= e.stream_ordering AND e.stream_ordering < ?
362- RETURNING es.stream_ordering
363- """
348+ sql = (
349+ " UPDATE event_search AS es SET stream_ordering = e.stream_ordering,"
350+ " origin_server_ts = e.origin_server_ts"
351+ " FROM events AS e"
352+ " WHERE e.event_id = es.event_id"
353+ " AND ? <= e.stream_ordering AND e.stream_ordering < ?"
354+ " RETURNING es.stream_ordering"
355+ )
364356
365357 min_stream_id = max_stream_id - batch_size
366358 txn .execute (sql , (min_stream_id , max_stream_id ))
@@ -464,33 +456,33 @@ async def search_msgs(
464456 if isinstance (self .database_engine , PostgresEngine ):
465457 search_query = search_term
466458 tsquery_func = self .database_engine .tsquery_func
467- sql = f"""
468- SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) AS rank,
469- room_id, event_id
470- FROM event_search
471- WHERE vector @@ { tsquery_func } ('english', ?)
472- """
459+ sql = (
460+ f" SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) AS rank,"
461+ " room_id, event_id"
462+ " FROM event_search"
463+ f" WHERE vector @@ { tsquery_func } ('english', ?)"
464+ )
473465 args = [search_query , search_query ] + args
474466
475- count_sql = f"""
476- SELECT room_id, count(*) as count FROM event_search
477- WHERE vector @@ { tsquery_func } ('english', ?)
478- """
467+ count_sql = (
468+ " SELECT room_id, count(*) as count FROM event_search"
469+ f" WHERE vector @@ { tsquery_func } ('english', ?)"
470+ )
479471 count_args = [search_query ] + count_args
480472 elif isinstance (self .database_engine , Sqlite3Engine ):
481473 search_query = _parse_query_for_sqlite (search_term )
482474
483- sql = """
484- SELECT rank(matchinfo(event_search)) as rank, room_id, event_id
485- FROM event_search
486- WHERE value MATCH ?
487- """
475+ sql = (
476+ " SELECT rank(matchinfo(event_search)) as rank, room_id, event_id"
477+ " FROM event_search"
478+ " WHERE value MATCH ?"
479+ )
488480 args = [search_query ] + args
489481
490- count_sql = """
491- SELECT room_id, count(*) as count FROM event_search
492- WHERE value MATCH ?
493- """
482+ count_sql = (
483+ " SELECT room_id, count(*) as count FROM event_search"
484+ " WHERE value MATCH ?"
485+ )
494486 count_args = [search_query ] + count_args
495487 else :
496488 # This should be unreachable.
@@ -596,27 +588,26 @@ async def search_rooms(
596588 raise SynapseError (400 , "Invalid pagination token" )
597589
598590 clauses .append (
599- """
600- (origin_server_ts < ? OR (origin_server_ts = ? AND stream_ordering < ?))
601- """
591+ "(origin_server_ts < ?"
592+ " OR (origin_server_ts = ? AND stream_ordering < ?))"
602593 )
603594 args .extend ([origin_server_ts , origin_server_ts , stream ])
604595
605596 if isinstance (self .database_engine , PostgresEngine ):
606597 search_query = search_term
607598 tsquery_func = self .database_engine .tsquery_func
608- sql = f"""
609- SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) as rank,
610- origin_server_ts, stream_ordering, room_id, event_id
611- FROM event_search
612- WHERE vector @@ { tsquery_func } ('english', ?) AND
613- """
599+ sql = (
600+ f" SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) as rank,"
601+ " origin_server_ts, stream_ordering, room_id, event_id"
602+ " FROM event_search"
603+ f" WHERE vector @@ { tsquery_func } ('english', ?) AND "
604+ )
614605 args = [search_query , search_query ] + args
615606
616- count_sql = f"""
617- SELECT room_id, count(*) as count FROM event_search
618- WHERE vector @@ { tsquery_func } ('english', ?) AND
619- """
607+ count_sql = (
608+ " SELECT room_id, count(*) as count FROM event_search"
609+ f" WHERE vector @@ { tsquery_func } ('english', ?) AND "
610+ )
620611 count_args = [search_query ] + count_args
621612 elif isinstance (self .database_engine , Sqlite3Engine ):
622613
@@ -628,24 +619,23 @@ async def search_rooms(
628619 # in the events table to get the topological ordering. We need
629620 # to use the indexes in this order because sqlite refuses to
630621 # MATCH unless it uses the full text search index
631- sql = """
632- SELECT
633- rank(matchinfo) as rank, room_id, event_id, origin_server_ts, stream_ordering
634- FROM (
635- SELECT key, event_id, matchinfo(event_search) as matchinfo
636- FROM event_search
637- WHERE value MATCH ?
622+ sql = (
623+ "SELECT rank(matchinfo) as rank, room_id, event_id,"
624+ " origin_server_ts, stream_ordering"
625+ " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo"
626+ " FROM event_search"
627+ " WHERE value MATCH ?"
628+ " )"
629+ " CROSS JOIN events USING (event_id)"
630+ " WHERE "
638631 )
639- CROSS JOIN events USING (event_id)
640- WHERE
641- """
642632 search_query = _parse_query_for_sqlite (search_term )
643633 args = [search_query ] + args
644634
645- count_sql = """
646- SELECT room_id, count(*) as count FROM event_search
647- WHERE value MATCH ? AND
648- """
635+ count_sql = (
636+ " SELECT room_id, count(*) as count FROM event_search"
637+ " WHERE value MATCH ? AND "
638+ )
649639 count_args = [search_query ] + count_args
650640 else :
651641 # This should be unreachable.
@@ -657,10 +647,10 @@ async def search_rooms(
657647 # We add an arbitrary limit here to ensure we don't try to pull the
658648 # entire table from the database.
659649 if isinstance (self .database_engine , PostgresEngine ):
660- sql += """
661- ORDER BY origin_server_ts DESC NULLS LAST, stream_ordering DESC NULLS LAST
662- LIMIT ?
663- """
650+ sql += (
651+ " ORDER BY origin_server_ts DESC NULLS LAST,"
652+ " stream_ordering DESC NULLS LAST LIMIT ?"
653+ )
664654 elif isinstance (self .database_engine , Sqlite3Engine ):
665655 sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
666656 else :
0 commit comments