Skip to content

Commit a620c80

Browse files
committed
chore: logical replication tests using v3 api
1 parent 03816c3 commit a620c80

File tree

1 file changed

+301
-0
lines changed

1 file changed

+301
-0
lines changed

test/v3_logical_replication_test.dart

Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
import 'dart:async';
2+
3+
import 'package:async/async.dart';
4+
import 'package:postgres/messages.dart';
5+
import 'package:postgres/postgres.dart';
6+
import 'package:postgres/postgres_v3_experimental.dart';
7+
import 'package:stream_channel/stream_channel.dart';
8+
import 'package:test/expect.dart';
9+
import 'package:test/scaffolding.dart';
10+
11+
import 'docker.dart';
12+
13+
/// An Interceptor that listens to server events
14+
class _ServerMessagesInterceptor {
15+
final controller = StreamController<ServerMessage>.broadcast();
16+
17+
Stream<ServerMessage> get messages => controller.stream;
18+
19+
// For the current set of tests, we are only listening to server events and
20+
// we are not sending anything to the server so the second handler is left
21+
// empty
22+
late final transformer = StreamChannelTransformer<BaseMessage, BaseMessage>(
23+
StreamTransformer.fromHandlers(
24+
handleData: (data, sink) {
25+
if (!controller.isClosed) {
26+
controller.add(data as ServerMessage);
27+
}
28+
sink.add(data);
29+
},
30+
),
31+
StreamSinkTransformer.fromHandlers(),
32+
);
33+
}
34+
35+
void main() {
36+
usePostgresDocker();
37+
38+
// NOTES:
39+
// - Two PostgreSQL connections are needed for testing replication.
40+
// - One for listening to streaming replications (this connection will be locked).
41+
// - The other one to modify the database (e.g. insert, delete, update, truncate)
42+
group('test logical replication with pgoutput for decoding', () {
43+
// use this for listening to messages
44+
late final PgConnection replicationConn;
45+
46+
// use this for sending queries
47+
late final PgConnection changesConn;
48+
49+
// used to intercept server messages in the replication connection
50+
// the interceptor is used by tests to listen to replication stream
51+
final serverMessagesInterceptor = _ServerMessagesInterceptor();
52+
53+
// this table is for insert, update, and delete tests.
54+
final changesTable = 'test.temp_changes_table';
55+
56+
// this will be used for testing truncation
57+
// must be created before hand to add in publication
58+
final truncateTable = 'test.temp_truncate_table';
59+
60+
setUpAll(() async {
61+
// connection setup
62+
63+
// replication connection setup
64+
// used for creating replication slot and listening to changes in the db
65+
replicationConn = await PgConnection.open(
66+
PgEndpoint(
67+
host: 'localhost',
68+
database: 'dart_test',
69+
username: 'replication',
70+
password: 'replication',
71+
),
72+
sessionSettings: PgSessionSettings(
73+
replicationMode: ReplicationMode.logical,
74+
onBadSslCertificate: (cert) => true,
75+
transformer: serverMessagesInterceptor.transformer,
76+
),
77+
);
78+
79+
// changes connection setup
80+
// used to create changes in the db that are reflected in the replication
81+
// stream
82+
changesConn = await PgConnection.open(
83+
PgEndpoint(
84+
host: 'localhost',
85+
database: 'dart_test',
86+
username: 'dart',
87+
password: 'dart',
88+
),
89+
sessionSettings: PgSessionSettings(
90+
onBadSslCertificate: (cert) => true,
91+
),
92+
);
93+
94+
// create testing tables
95+
// note: primary keys are necessary for replication to work and they are
96+
// used as an identity replica (to allow update & delete) on tables
97+
// that are part of a publication.
98+
await changesConn.execute('create schema test');
99+
await changesConn.execute('create table $changesTable '
100+
'(id int GENERATED ALWAYS AS IDENTITY, value text, '
101+
'PRIMARY KEY (id));');
102+
await changesConn.execute('create table $truncateTable '
103+
'(id int GENERATED ALWAYS AS IDENTITY, value text, '
104+
'PRIMARY KEY (id));');
105+
106+
// create publication
107+
final publicationName = 'test_publication';
108+
await changesConn.execute('DROP PUBLICATION IF EXISTS $publicationName;');
109+
await changesConn.execute(
110+
'CREATE PUBLICATION $publicationName FOR TABLE $changesTable, $truncateTable;',
111+
);
112+
113+
final sysInfoRes = await replicationConn.execute(
114+
'IDENTIFY_SYSTEM;',
115+
useSimpleQueryProtocol: true,
116+
);
117+
118+
final xlogpos = sysInfoRes[0][2] as String;
119+
120+
// create replication slot
121+
final slotName = 'a_test_slot';
122+
123+
// the logical decoding used for testing
124+
final logicalDecodingPlugin = 'pgoutput';
125+
126+
// `TEMPORARY` will remove the slot after the connection is closed/dropped
127+
await replicationConn.execute(
128+
'CREATE_REPLICATION_SLOT $slotName TEMPORARY LOGICAL '
129+
'$logicalDecodingPlugin NOEXPORT_SNAPSHOT',
130+
useSimpleQueryProtocol: true);
131+
132+
// start replication process
133+
final statement = 'START_REPLICATION SLOT $slotName LOGICAL $xlogpos '
134+
"(proto_version '1', publication_names '$publicationName')";
135+
136+
// TODO(@osaxma): find a better way to handle this
137+
// Since `START_REPLICATION` is a connection blocking command so this
138+
// future will not complete until the replication process stops somehow
139+
// or by closing the connection, an error or timing out.
140+
// ignore: unawaited_futures
141+
replicationConn
142+
.execute(statement, useSimpleQueryProtocol: true)
143+
.timeout(Duration(seconds: 120))
144+
.catchError((e) {
145+
// this query will be cancelled once the connection is closed.
146+
// no need to handle the error
147+
throw e;
148+
});
149+
150+
await Future.delayed(Duration(seconds: 1));
151+
});
152+
153+
tearDownAll(() async {
154+
// TODO(@osaxma): close the connection
155+
// currently it's not possible -- see:
156+
// https://github.com/isoos/postgresql-dart/issues/119
157+
// await replicationConn.close();
158+
await changesConn.close();
159+
await serverMessagesInterceptor.controller.close();
160+
});
161+
162+
// BeginMessage -> InsertMessage -> CommitMessage
163+
test('- Receive InsertMessage after insert statement', () async {
164+
final stream = serverMessagesInterceptor.messages
165+
.where((event) => event is XLogDataMessage)
166+
.map((event) => (event as XLogDataMessage).data)
167+
// RelationMessage isn't always present (appears conditionally) so
168+
// it's skipped when present
169+
.where((event) => event is! RelationMessage)
170+
.take(3);
171+
172+
late final StreamController controller;
173+
controller = StreamController(
174+
onListen: () async {
175+
// don't await here otherwise what's after won't be executed.
176+
final future = controller.addStream(stream);
177+
await changesConn
178+
.execute("insert into $changesTable (value) values ('test');");
179+
await future;
180+
await controller.close();
181+
},
182+
);
183+
184+
final matchers = [
185+
isA<BeginMessage>(),
186+
isA<InsertMessage>(),
187+
isA<CommitMessage>(),
188+
];
189+
190+
expect(controller.stream, emitsInAnyOrder(matchers));
191+
});
192+
193+
// BeginMessage -> UpdateMessage -> CommitMessage
194+
test('- Receive UpdateMessage after update statement', () async {
195+
// insert data to be updated
196+
await changesConn
197+
.execute("insert into $changesTable (value) values ('update_test');");
198+
// wait to avoid capturing INSERT
199+
await Future.delayed(Duration(seconds: 3));
200+
final stream = serverMessagesInterceptor.messages
201+
.where((event) => event is XLogDataMessage)
202+
.map((event) => (event as XLogDataMessage).data)
203+
// RelationMessage isn't always present (appears conditionally) so
204+
// it's skipped when present
205+
.where((event) => event is! RelationMessage)
206+
.take(3);
207+
208+
late final StreamController controller;
209+
controller = StreamController(
210+
onListen: () async {
211+
// don't await here otherwise what's after won't be executed.
212+
final future = controller.addStream(stream);
213+
await changesConn.execute(
214+
"update $changesTable set value = 'updated_test_value'"
215+
"where value = 'update_test';",
216+
);
217+
await future;
218+
await controller.close();
219+
},
220+
);
221+
222+
final matchers = [
223+
isA<BeginMessage>(),
224+
isA<UpdateMessage>(),
225+
isA<CommitMessage>(),
226+
];
227+
228+
expect(controller.stream, emitsInAnyOrder(matchers));
229+
});
230+
// BeginMessage -> DeleteMessage -> CommitMessage
231+
test('- Receive DeleteMessage after delete statement', () async {
232+
// insert data to be delete
233+
await changesConn
234+
.execute("insert into $changesTable (value) values ('update_test');");
235+
// wait to avoid capturing INSERT
236+
await Future.delayed(Duration(seconds: 3));
237+
final stream = serverMessagesInterceptor.messages
238+
.where((event) => event is XLogDataMessage)
239+
.map((event) => (event as XLogDataMessage).data)
240+
// RelationMessage isn't always present (appears conditionally) so
241+
// it's skipped when present
242+
.where((event) => event is! RelationMessage)
243+
.take(3);
244+
245+
late final StreamController controller;
246+
controller = StreamController(
247+
onListen: () async {
248+
// don't await here otherwise what's after won't be executed.
249+
final future = controller.addStream(stream);
250+
await changesConn.execute(
251+
"delete from $changesTable where value = 'update_test';",
252+
);
253+
await future;
254+
await controller.close();
255+
},
256+
);
257+
258+
final matchers = [
259+
isA<BeginMessage>(),
260+
isA<DeleteMessage>(),
261+
isA<CommitMessage>(),
262+
];
263+
264+
expect(controller.stream, emitsInAnyOrder(matchers));
265+
});
266+
267+
// BeginMessage -> TruncateMessage -> CommitMessage
268+
test('- Receive TruncateMessage after delete statement', () async {
269+
// wait to for a second
270+
await Future.delayed(Duration(seconds: 1));
271+
final stream = serverMessagesInterceptor.messages
272+
.where((event) => event is XLogDataMessage)
273+
.map((event) => (event as XLogDataMessage).data)
274+
// RelationMessage isn't always present (appears conditionally) so
275+
// it's skipped when present
276+
.where((event) => event is! RelationMessage)
277+
.take(3);
278+
279+
late final StreamController controller;
280+
controller = StreamController(
281+
onListen: () async {
282+
// don't await here otherwise what's after won't be executed.
283+
final future = controller.addStream(stream);
284+
await changesConn.execute(
285+
'truncate table $truncateTable;',
286+
);
287+
await future;
288+
await controller.close();
289+
},
290+
);
291+
292+
final matchers = [
293+
isA<BeginMessage>(),
294+
isA<TruncateMessage>(),
295+
isA<CommitMessage>(),
296+
];
297+
298+
expect(controller.stream, emitsInOrder(matchers));
299+
});
300+
});
301+
}

0 commit comments

Comments
 (0)