Skip to content

Commit 27dac99

Browse files
committed
feat: use new sync service
1 parent 3124743 commit 27dac99

File tree

9 files changed

+7575
-14245
lines changed

9 files changed

+7575
-14245
lines changed

package-lock.json

Lines changed: 7301 additions & 14089 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,16 @@
3939
"test:browser": "aegir test --target browser"
4040
},
4141
"dependencies": {
42-
"ioredis": "^4.19.4",
42+
"emittery": "^0.8.1",
4343
"ipaddr.js": "^2.0.0",
44+
"isomorphic-ws": "^4.0.1",
4445
"lodash.flatten": "^4.4.0",
45-
"winston": "^3.3.3"
46+
"winston": "^3.3.3",
47+
"ws": "^7.4.4"
4648
},
4749
"devDependencies": {
48-
"@types/ioredis": "^4.17.9",
49-
"aegir": "^30.0.1"
50+
"@types/ws": "^7.4.1",
51+
"aegir": "^33.0.0"
5052
},
5153
"engines": {
5254
"node": ">=12.0.0"

src/sync/index.js

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
'use strict'
22

3-
const { redisClient } = require('./redis')
3+
const { createSocket } = require('./socket')
44
const { createState } = require('./state')
55
const { createTopic } = require('./topic')
66
const { createSugar } = require('./sugar')
7+
const { createPubSub } = require('./pubsub')
78

89
/** @typedef {import('winston').Logger} Logger */
10+
/** @typedef {import('events').EventEmitter} EventEmitter */
911
/** @typedef {import('../runtime').RunEnv} RunEnv */
1012
/** @typedef {import('../runtime').RunParams} RunParams */
1113
/** @typedef {import('./types').SyncClient} SyncClient */
14+
/** @typedef {import('./types').Request} Request */
15+
/** @typedef {import('./types').Response} Response */
1216

1317
/**
1418
* Returns a new sync client that is bound to the provided runEnv. All the operations
@@ -28,19 +32,18 @@ function newBoundClient (runenv) {
2832
* @returns {Promise<SyncClient>}
2933
*/
3034
async function newClient (logger, extractor) {
31-
const redis = await redisClient(logger)
35+
const socket = await createSocket(logger)
36+
const pubsub = createPubSub(logger, socket)
3237

3338
const base = {
34-
...createState(logger, extractor, redis),
35-
...createTopic(logger, extractor, redis)
39+
...createState(logger, extractor, pubsub, socket),
40+
...createTopic(logger, extractor, pubsub, socket)
3641
}
3742

3843
return {
3944
...base,
4045
...createSugar(base),
41-
close: () => {
42-
redis.disconnect()
43-
}
46+
close: socket.close
4447
}
4548
}
4649

src/sync/pubsub.js

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
'use strict'
2+
3+
/** @typedef {import('../runtime').RunParams} RunParams */
4+
/** @typedef {import('./types').PubSub} PubSub */
5+
/** @typedef {import('./types').Response} Response */
6+
/** @typedef {import('./types').Request} Request */
7+
/** @typedef {import('./types').Socket} Socket */
8+
/** @typedef {import('events').EventEmitter} EventEmitter */
9+
10+
/**
11+
* @param {import('winston').Logger} logger
12+
* @param {Socket} socket
13+
* @returns {PubSub}
14+
*/
15+
function createPubSub (logger, socket) {
16+
return {
17+
publish: async (topic, payload) => {
18+
const res = await socket.requestOnce({
19+
publish: {
20+
topic: topic,
21+
payload: payload
22+
}
23+
})
24+
25+
if (res.error) {
26+
throw res.error
27+
}
28+
29+
return res.publish.seq
30+
},
31+
subscribe: async (key) => {
32+
return socket.request({
33+
subscribe: {
34+
topic: key
35+
}
36+
})
37+
}
38+
}
39+
}
40+
41+
module.exports = {
42+
createPubSub
43+
}

src/sync/redis.js

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/sync/socket.js

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
'use strict'
2+
3+
const Emittery = require('emittery')
4+
const WebSocket = require('isomorphic-ws')
5+
6+
const ENV_SYNC_SERVICE_HOST = 'SYNC_SERVICE_HOST'
7+
const ENV_SYNC_SERVICE_PORT = 'SYNC_SERVICE_PORT'
8+
9+
/** @typedef {import('winston').Logger} Logger */
10+
/** @typedef {import('events').EventEmitter} EventEmitter */
11+
/** @typedef {import('../runtime').RunEnv} RunEnv */
12+
/** @typedef {import('../runtime').RunParams} RunParams */
13+
/** @typedef {import('./types').SyncClient} SyncClient */
14+
/** @typedef {import('./types').Request} Request */
15+
/** @typedef {import('./types').Response} Response */
16+
/** @typedef {import('./types').ResponseIterator} ResponseIterator */
17+
/** @typedef {import('./types').Socket} Socket */
18+
19+
/**
20+
* @param {import('winston').Logger} logger
21+
* @returns {Promise<Socket>}
22+
*/
23+
function createSocket (logger) {
24+
const address = socketAddress()
25+
const ws = new WebSocket(address)
26+
const emitter = new Emittery()
27+
let next = 0
28+
29+
return new Promise((resolve, reject) => {
30+
ws.onopen = function open () {
31+
resolve({
32+
request,
33+
requestOnce,
34+
close: () => {
35+
ws.close()
36+
}
37+
})
38+
}
39+
40+
ws.onclose = function close () {
41+
logger.info('connection to sync server closed')
42+
}
43+
44+
ws.onmessage = function incoming (event) {
45+
const res = /** @type Response */(JSON.parse(event.data.toString()))
46+
emitter.emit(res.id, res)
47+
}
48+
49+
/**
50+
* @param {Request} req
51+
* @returns {Promise<Response>}
52+
*/
53+
const requestOnce = async function (req) {
54+
const id = (next++).toString()
55+
const promise = emitter.once(id)
56+
57+
req.id = id
58+
ws.send(JSON.stringify(req))
59+
60+
const data = await promise
61+
return data
62+
}
63+
64+
/**
65+
* @param {Request} req
66+
* @returns {ResponseIterator}
67+
*/
68+
const request = function (req) {
69+
const id = (next++).toString()
70+
const it = emitter.events(id)
71+
let run = true
72+
73+
req.id = id
74+
ws.send(JSON.stringify(req))
75+
76+
const cancel = () => {
77+
run = false
78+
emitter.clearListeners(id)
79+
}
80+
81+
const wait = (async function * () {
82+
try {
83+
for await (const data of it) {
84+
yield data
85+
}
86+
} catch (e) {
87+
if (run) throw e
88+
}
89+
})()
90+
91+
return {
92+
cancel,
93+
wait
94+
}
95+
}
96+
})
97+
}
98+
99+
function socketAddress () {
100+
let host = process.env[ENV_SYNC_SERVICE_HOST]
101+
let port = process.env[ENV_SYNC_SERVICE_PORT]
102+
103+
if (!port) {
104+
port = '5050'
105+
}
106+
107+
if (!host) {
108+
host = 'testground-sync-service'
109+
}
110+
111+
return `ws://${host}:${port}`
112+
}
113+
114+
module.exports = {
115+
createSocket
116+
}

0 commit comments

Comments
 (0)