diff --git a/internal/b/blueprints.go b/internal/b/blueprints.go index 6599cf04..365d419c 100644 --- a/internal/b/blueprints.go +++ b/internal/b/blueprints.go @@ -110,6 +110,9 @@ type Event struct { // The prev events of the event if we want to override or falsify them. // If it is left at nil, MustCreateEvent will populate it automatically based on the forward extremities. PrevEvents interface{} + + // If this is a redaction, the event that it redacts + Redacts string } func MustValidate(bp Blueprint) Blueprint { diff --git a/internal/client/client.go b/internal/client/client.go index 0ccd0388..9ad48486 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -248,7 +248,8 @@ func (c *CSAPI) MustSync(t *testing.T, syncReq SyncReq) (gjson.Result, string) { // In the unlikely event that you need ordering on your checks, call MustSyncUntil multiple times // with a single checker, and reuse the returned since token, as in the "Incremental sync" example. // -// Will time out after CSAPI.SyncUntilTimeout. Returns the latest since token used. +// Will time out after CSAPI.SyncUntilTimeout. Returns the `next_batch` token from the final +// response. func (c *CSAPI) MustSyncUntil(t *testing.T, syncReq SyncReq, checks ...SyncCheckOpt) string { t.Helper() start := time.Now() diff --git a/internal/federation/server.go b/internal/federation/server.go index b9759fe1..02fb7de1 100644 --- a/internal/federation/server.go +++ b/internal/federation/server.go @@ -263,6 +263,7 @@ func (s *Server) MustCreateEvent(t *testing.T, room *ServerRoom, ev b.Event) *go PrevEvents: prevEvents, Unsigned: unsigned, AuthEvents: ev.AuthEvents, + Redacts: ev.Redacts, } if eb.AuthEvents == nil { var stateNeeded gomatrixserverlib.StateNeeded diff --git a/tests/csapi/sync_test.go b/tests/csapi/sync_test.go index faca368f..4f49bf74 100644 --- a/tests/csapi/sync_test.go +++ b/tests/csapi/sync_test.go @@ -10,6 +10,7 @@ import ( "github.com/matrix-org/complement/internal/b" "github.com/matrix-org/complement/internal/client" + "github.com/matrix-org/complement/internal/federation" "github.com/matrix-org/complement/runtime" ) @@ -251,6 +252,120 @@ func TestSync(t *testing.T) { res, _ := alice.MustSync(t, client.SyncReq{Since: nextBatch}) usersInPresenceEvents(t, res.Get("presence"), []string{}) }) + + t.Run("sync should succeed even if the sync token points to a redaction of an unknown event", func(t *testing.T) { + // this is a regression test for https://github.com/matrix-org/synapse/issues/12864 + // + // The idea here is that we need a sync token which points to a redaction + // for an event which doesn't exist. Such a redaction may not be served to + // the client. This can lead to server bugs when the server tries to fetch + // the event corresponding to the sync token. + // + // The C-S API does not permit us to generate such a redaction event, so + // we have to poke it in from a federated server. + // + // The situation is complicated further by the very fact that we + // cannot see the faulty redaction, and therefore cannot tell whether + // our sync token includes it or not. The normal trick here would be + // to send another (regular) event as a sentinel, and then if that sentinel + // is returned by /sync, we can be sure the faulty event has also been + // processed. However, that doesn't work here, because doing so will mean + // that the sync token points to the sentinel rather than the redaction, + // negating the whole point of the test. + // + // Instead, as a rough proxy, we send a sentinel in a *different* room. + // There is no guarantee that the target server will process the events + // in the order we send them, but in practice it seems to get close + // enough. + + t.Parallel() + + // alice creates two rooms, which charlie (on our test server) joins + srv := federation.NewServer(t, deployment, + federation.HandleKeyRequests(), + federation.HandleTransactionRequests(nil, nil), + ) + cancel := srv.Listen() + defer cancel() + + charlie := srv.UserID("charlie") + + redactionRoomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat"}) + redactionRoom := srv.MustJoinRoom(t, deployment, "hs1", redactionRoomID, charlie) + + sentinelRoomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat"}) + sentinelRoom := srv.MustJoinRoom(t, deployment, "hs1", sentinelRoomID, charlie) + + // charlie creates a bogus redaction, which he sends out, followed by + // a good event - in another room - to act as a sentinel. It's not + // guaranteed, but hopefully if the sentinel is received, so was the + // redaction. + redactionEvent := srv.MustCreateEvent(t, redactionRoom, b.Event{ + Type: "m.room.redaction", + Sender: charlie, + Content: map[string]interface{}{}, + Redacts: "$12345", + }) + redactionRoom.AddEvent(redactionEvent) + t.Logf("Created redaction event %s", redactionEvent.EventID()) + srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{redactionEvent.JSON()}, nil) + + sentinelEvent := srv.MustCreateEvent(t, sentinelRoom, b.Event{ + Type: "m.room.test", + Sender: charlie, + Content: map[string]interface{}{"body": "1234"}, + }) + sentinelRoom.AddEvent(sentinelEvent) + srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{redactionEvent.JSON(), sentinelEvent.JSON()}, nil) + + // wait for the sentinel to arrive + nextBatch := alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(sentinelRoomID, sentinelEvent.EventID())) + + // charlie sends another batch of events to force a gappy sync. + // We have to send 11 events to force a gap, since we use a filter with a timeline limit of 10 events. + pdus := make([]json.RawMessage, 11) + var lastSentEventId string + for i := range pdus { + ev := srv.MustCreateEvent(t, redactionRoom, b.Event{ + Type: "m.room.message", + Sender: charlie, + Content: map[string]interface{}{}, + }) + redactionRoom.AddEvent(ev) + pdus[i] = ev.JSON() + lastSentEventId = ev.EventID() + } + srv.MustSendTransaction(t, deployment, "hs1", pdus, nil) + t.Logf("Sent filler events, with final event %s", lastSentEventId) + + // sync, starting from the same ?since each time, until the final message turns up. + // This is basically an inlining of MustSyncUntil, with the key difference that we + // keep the same ?since each time, instead of incrementally syncing on each pass. + numResponsesReturned := 0 + start := time.Now() + for { + if time.Since(start) > alice.SyncUntilTimeout { + t.Fatalf("%s: timed out after %v. Seen %d /sync responses", alice.UserID, time.Since(start), numResponsesReturned) + } + // sync, using a filter with a limit smaller than the number of PDUs we sent. + syncResponse, _ := alice.MustSync(t, client.SyncReq{Filter: filterID, Since: nextBatch}) + numResponsesReturned += 1 + timeline := syncResponse.Get("rooms.join." + client.GjsonEscape(redactionRoomID) + ".timeline") + timelineEvents := timeline.Get("events").Array() + lastEventIdInSync := timelineEvents[len(timelineEvents)-1].Get("event_id").String() + + t.Logf("Iteration %d: /sync returned %d events, with final event %s", numResponsesReturned, len(timelineEvents), lastEventIdInSync) + if lastEventIdInSync == lastSentEventId { + // check we actually got a gappy sync - else this test isn't testing the right thing + if !timeline.Get("limited").Bool() { + t.Fatalf("Not a gappy sync after redaction") + } + break + } + } + + // that's it - we successfully did a gappy sync. + }) }) }