@@ -80,11 +80,11 @@ def store_search_entries_txn(
         if not self.hs.config.server.enable_search:
             return
         if isinstance(self.database_engine, PostgresEngine):
-            sql = (
-                "INSERT INTO event_search"
-                " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
-                " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
-            )
+            sql = """
+            INSERT INTO event_search
+            (event_id, room_id, key, vector, stream_ordering, origin_server_ts)
+            VALUES (?,?,?,to_tsvector('english', ?),?,?)
+            """
 
             args1 = (
                 (
@@ -101,20 +101,20 @@ def store_search_entries_txn(
             txn.execute_batch(sql, args1)
 
         elif isinstance(self.database_engine, Sqlite3Engine):
-            sql = (
-                "INSERT INTO event_search (event_id, room_id, key, value)"
-                " VALUES (?,?,?,?)"
-            )
-            args2 = (
-                (
-                    entry.event_id,
-                    entry.room_id,
-                    entry.key,
-                    _clean_value_for_search(entry.value),
-                )
-                for entry in entries
+            self.db_pool.simple_insert_many_txn(
+                txn,
+                table="event_search",
+                keys=("event_id", "room_id", "key", "value"),
+                values=(
+                    (
+                        entry.event_id,
+                        entry.room_id,
+                        entry.key,
+                        _clean_value_for_search(entry.value),
+                    )
+                    for entry in entries
+                ),
             )
-            txn.execute_batch(sql, args2)
 
         else:
             # This should be unreachable.
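
Note on the SQLite branch above: the hand-rolled INSERT plus `txn.execute_batch` is replaced by the `simple_insert_many_txn` helper, which builds an equivalent parameterised statement from the table name and keys. A minimal sketch of roughly what that call amounts to (illustrative only, not Synapse's actual helper implementation):

```python
# Rough equivalent of the simple_insert_many_txn call above (sketch, assuming
# the helper simply batches a plain parameterised INSERT).
keys = ("event_id", "room_id", "key", "value")
sql = "INSERT INTO event_search (%s) VALUES (%s)" % (
    ", ".join(keys),
    ", ".join("?" for _ in keys),
)
txn.execute_batch(sql, values)  # `values` is the generator of per-row tuples
```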
@@ -162,15 +162,17 @@ async def _background_reindex_search(
         TYPES = ["m.room.name", "m.room.message", "m.room.topic"]
 
         def reindex_search_txn(txn: LoggingTransaction) -> int:
-            sql = (
-                "SELECT stream_ordering, event_id, room_id, type, json, "
-                " origin_server_ts FROM events"
-                " JOIN event_json USING (room_id, event_id)"
-                " WHERE ? <= stream_ordering AND stream_ordering < ?"
-                " AND (%s)"
-                " ORDER BY stream_ordering DESC"
-                " LIMIT ?"
-            ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),)
+            sql = """
+            SELECT stream_ordering, event_id, room_id, type, json, origin_server_ts
+            FROM events
+            JOIN event_json USING (room_id, event_id)
+            WHERE ? <= stream_ordering AND stream_ordering < ?
+            AND (%s)
+            ORDER BY stream_ordering DESC
+            LIMIT ?
+            """ % (
+                " OR ".join("type = '%s'" % (t,) for t in TYPES),
+            )
 
             txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
 
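
In the reindex query above, the `%s` placeholder is filled with a clause built from the trusted `TYPES` constants, while the `?` placeholders remain bound parameters, so the string interpolation is safe here. Expanded, the interpolated clause is simply:

```python
# What the %-interpolation in reindex_search_txn expands to:
" OR ".join("type = '%s'" % (t,) for t in TYPES)
# -> "type = 'm.room.name' OR type = 'm.room.message' OR type = 'm.room.topic'"
```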
@@ -284,8 +286,10 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
 
             try:
                 c.execute(
-                    "CREATE INDEX CONCURRENTLY event_search_fts_idx"
-                    " ON event_search USING GIN (vector)"
+                    """
+                    CREATE INDEX CONCURRENTLY event_search_fts_idx
+                    ON event_search USING GIN (vector)
+                    """
                 )
             except psycopg2.ProgrammingError as e:
                 logger.warning(
@@ -323,12 +327,16 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
             # We create with NULLS FIRST so that when we search *backwards*
             # we get the ones with non null origin_server_ts *first*
             c.execute(
-                "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search("
-                "room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
+                """
+                CREATE INDEX CONCURRENTLY event_search_room_order
+                ON event_search(room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
+                """
             )
             c.execute(
-                "CREATE INDEX CONCURRENTLY event_search_order ON event_search("
-                "origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
+                """
+                CREATE INDEX CONCURRENTLY event_search_order
+                ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
+                """
             )
             conn.set_session(autocommit=False)
 
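
Why `NULLS FIRST`: both indexes are ascending, and Postgres can scan an ascending `NULLS FIRST` index in reverse to produce a `DESC NULLS LAST` ordering, which is exactly the `ORDER BY origin_server_ts DESC NULLS LAST, stream_ordering DESC NULLS LAST` that the Postgres branch of `search_rooms` appends in the final hunk. That is what lets backwards searches return rows with a non-null `origin_server_ts` first.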
@@ -345,14 +353,14 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
         )
 
         def reindex_search_txn(txn: LoggingTransaction) -> Tuple[int, bool]:
-            sql = (
-                "UPDATE event_search AS es SET stream_ordering = e.stream_ordering,"
-                " origin_server_ts = e.origin_server_ts"
-                " FROM events AS e"
-                " WHERE e.event_id = es.event_id"
-                " AND ? <= e.stream_ordering AND e.stream_ordering < ?"
-                " RETURNING es.stream_ordering"
-            )
+            sql = """
+            UPDATE event_search AS es
+            SET stream_ordering = e.stream_ordering, origin_server_ts = e.origin_server_ts
+            FROM events AS e
+            WHERE e.event_id = es.event_id
+            AND ? <= e.stream_ordering AND e.stream_ordering < ?
+            RETURNING es.stream_ordering
+            """
 
             min_stream_id = max_stream_id - batch_size
             txn.execute(sql, (min_stream_id, max_stream_id))
@@ -456,33 +464,33 @@ async def search_msgs(
         if isinstance(self.database_engine, PostgresEngine):
             search_query = search_term
             tsquery_func = self.database_engine.tsquery_func
-            sql = (
-                f"SELECT ts_rank_cd(vector, {tsquery_func}('english', ?)) AS rank,"
-                " room_id, event_id"
-                " FROM event_search"
-                f" WHERE vector @@ {tsquery_func}('english', ?)"
-            )
+            sql = f"""
+            SELECT ts_rank_cd(vector, {tsquery_func}('english', ?)) AS rank,
+            room_id, event_id
+            FROM event_search
+            WHERE vector @@ {tsquery_func}('english', ?)
+            """
             args = [search_query, search_query] + args
 
-            count_sql = (
-                "SELECT room_id, count(*) as count FROM event_search"
-                f" WHERE vector @@ {tsquery_func}('english', ?)"
-            )
+            count_sql = f"""
+            SELECT room_id, count(*) as count FROM event_search
+            WHERE vector @@ {tsquery_func}('english', ?)
+            """
             count_args = [search_query] + count_args
         elif isinstance(self.database_engine, Sqlite3Engine):
             search_query = _parse_query_for_sqlite(search_term)
 
-            sql = (
-                "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id"
-                " FROM event_search"
-                " WHERE value MATCH ?"
-            )
+            sql = """
+            SELECT rank(matchinfo(event_search)) as rank, room_id, event_id
+            FROM event_search
+            WHERE value MATCH ?
+            """
             args = [search_query] + args
 
-            count_sql = (
-                "SELECT room_id, count(*) as count FROM event_search"
-                " WHERE value MATCH ?"
-            )
+            count_sql = """
+            SELECT room_id, count(*) as count FROM event_search
+            WHERE value MATCH ?
+            """
             count_args = [search_query] + count_args
         else:
             # This should be unreachable.
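
Both engine branches in `search_msgs` build a ranked result query plus a separate count query over the same predicate. On Postgres, `tsquery_func` is whatever tsquery-building function the database engine has selected (not shown in this diff; presumably `websearch_to_tsquery` on recent servers, with `plainto_tsquery` as the fallback), so the rendered SQL would look roughly like this sketch:

```python
# Sketch of the rendered Postgres query, assuming tsquery_func happens to be
# "websearch_to_tsquery"; the exact function is chosen by the engine, not here.
sql = """
SELECT ts_rank_cd(vector, websearch_to_tsquery('english', ?)) AS rank,
room_id, event_id
FROM event_search
WHERE vector @@ websearch_to_tsquery('english', ?)
"""
```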
@@ -588,26 +596,27 @@ async def search_rooms(
                 raise SynapseError(400, "Invalid pagination token")
 
             clauses.append(
-                "(origin_server_ts < ?"
-                " OR (origin_server_ts = ? AND stream_ordering < ?))"
+                """
+                (origin_server_ts < ? OR (origin_server_ts = ? AND stream_ordering < ?))
+                """
             )
             args.extend([origin_server_ts, origin_server_ts, stream])
 
         if isinstance(self.database_engine, PostgresEngine):
             search_query = search_term
             tsquery_func = self.database_engine.tsquery_func
-            sql = (
-                f"SELECT ts_rank_cd(vector, {tsquery_func}('english', ?)) as rank,"
-                " origin_server_ts, stream_ordering, room_id, event_id"
-                " FROM event_search"
-                f" WHERE vector @@ {tsquery_func}('english', ?) AND "
-            )
+            sql = f"""
+            SELECT ts_rank_cd(vector, {tsquery_func}('english', ?)) as rank,
+            origin_server_ts, stream_ordering, room_id, event_id
+            FROM event_search
+            WHERE vector @@ {tsquery_func}('english', ?) AND
+            """
             args = [search_query, search_query] + args
 
-            count_sql = (
-                "SELECT room_id, count(*) as count FROM event_search"
-                f" WHERE vector @@ {tsquery_func}('english', ?) AND "
-            )
+            count_sql = f"""
+            SELECT room_id, count(*) as count FROM event_search
+            WHERE vector @@ {tsquery_func}('english', ?) AND
+            """
             count_args = [search_query] + count_args
         elif isinstance(self.database_engine, Sqlite3Engine):
 
@@ -619,23 +628,24 @@ async def search_rooms(
             # in the events table to get the topological ordering. We need
             # to use the indexes in this order because sqlite refuses to
             # MATCH unless it uses the full text search index
-            sql = (
-                "SELECT rank(matchinfo) as rank, room_id, event_id,"
-                " origin_server_ts, stream_ordering"
-                " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo"
-                " FROM event_search"
-                " WHERE value MATCH ?"
-                " )"
-                " CROSS JOIN events USING (event_id)"
-                " WHERE "
+            sql = """
+            SELECT
+                rank(matchinfo) as rank, room_id, event_id, origin_server_ts, stream_ordering
+            FROM (
+                SELECT key, event_id, matchinfo(event_search) as matchinfo
+                FROM event_search
+                WHERE value MATCH ?
             )
+            CROSS JOIN events USING (event_id)
+            WHERE
+            """
             search_query = _parse_query_for_sqlite(search_term)
             args = [search_query] + args
 
-            count_sql = (
-                "SELECT room_id, count(*) as count FROM event_search"
-                " WHERE value MATCH ? AND "
-            )
+            count_sql = """
+            SELECT room_id, count(*) as count FROM event_search
+            WHERE value MATCH ? AND
+            """
             count_args = [search_query] + count_args
         else:
             # This should be unreachable.
@@ -647,10 +657,10 @@ async def search_rooms(
         # We add an arbitrary limit here to ensure we don't try to pull the
         # entire table from the database.
         if isinstance(self.database_engine, PostgresEngine):
-            sql += (
-                " ORDER BY origin_server_ts DESC NULLS LAST,"
-                " stream_ordering DESC NULLS LAST LIMIT ?"
-            )
+            sql += """
+            ORDER BY origin_server_ts DESC NULLS LAST, stream_ordering DESC NULLS LAST
+            LIMIT ?
+            """
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
         else:
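
The two ORDER BY branches differ because the engines default differently: Postgres places NULLs first on a `DESC` sort unless told otherwise, hence the explicit `NULLS LAST`, while SQLite treats NULLs as smaller than every other value, so `DESC` already puts them last (and older SQLite releases do not even parse an explicit `NULLS FIRST`/`NULLS LAST` qualifier).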