diff --git a/src/shared/services/WorkPool.js b/src/shared/services/WorkPool.js new file mode 100644 index 00000000000..7ec2235f017 --- /dev/null +++ b/src/shared/services/WorkPool.js @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2002-2018 "Neo4j, Inc" + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +import { v4 as uuid } from 'uuid' + +class WorkPool { + static workerStates = { + BUSY: 'busy', + FREE: 'free' + } + constructor (createWorker, maxPoolSize = 15) { + this.createWorker = createWorker + this.maxPoolSize = maxPoolSize + this.register = [] + this.q = [] + } + // Public methods + getPoolSize (state) { + if (!state) { + return this.register.length + } + return this.register.filter(w => w.state === state).length + } + getQueueSize () { + return this.q.length + } + getWorkById (id) { + for (let i = 0; i < this.q.length; i++) { + if (this.q[i].id === id) { + return this.q[i] + } + } + return null + } + doWork ({ id, payload, onmessage }) { + const work = this._buildWorkObj({ id: id || uuid(), payload, onmessage }) + this._addToQ(work) + this._next() + return work + } + + // Implemtation details + _getFreeWorker (id) { + const len = this.register.length + for (let i = len - 1; i >= 0; i--) { + if (this.register[i].state === WorkPool.workerStates.FREE) { + return this.register[i] + } + } + const poolSize = this.getPoolSize() + if (poolSize < this.maxPoolSize) { + const workerObj = this._buildWorkerObj( + this.createWorker(), + WorkPool.workerStates.BUSY + ) + this.register.push(workerObj) + return workerObj + } + return null + } + _next () { + if (!this.getQueueSize()) { + return + } + const workerObj = this._getFreeWorker() + if (!workerObj) { + return + } + const work = this.q.shift() + workerObj.id = work.id + work._assignWorker(workerObj) + work._executeInitial() + } + _addToQ (work) { + this.q.push(work) + } + _removeFromQ (id) { + this.q.splice(this.q.findIndex(el => el.id === id), 1) + } + _unregisterWorker = id => { + const worker = this._getWorkerById(id) + if (worker === null) { + return + } + worker.state = WorkPool.workerStates.FREE + this._next() + } + _buildWorkObj ({ id, payload, onmessage }) { + const obj = { + id, + _worker: undefined, + _executed: false, + _finishFn: undefined + } + obj.execute = payload => { + if (payload && obj._workerObj) { + obj._workerObj.worker.postMessage(payload) + } + } + obj.onFinish = fn => (obj._finishFn = fn) + obj.finish = () => { + if (!obj._executed) { + this._removeFromQ(id) + } + obj._workerObj && obj._workerObj.finish(id) + obj._finishFn && obj._finishFn({ executed: obj._executed, id }) + } + obj._executeInitial = () => { + if (onmessage) { + obj._workerObj.worker.onmessage = onmessage + } + if (payload) { + obj._workerObj.worker.postMessage(payload) + } + obj._executed = true + } + obj._assignWorker = workerObj => { + workerObj.state = WorkPool.workerStates.BUSY + obj._workerObj = workerObj + } + + return obj + } + _buildWorkerObj (worker, state) { + const obj = { + worker, + state + } + obj.finish = id => { + this._unregisterWorker(id) + } + return obj + } + _getWorkerById (id) { + for (let i = 0; i < this.register.length; i++) { + if (this.register[i].id === id) { + return this.register[i] + } + } + return null + } +} + +export default WorkPool diff --git a/src/shared/services/WorkPool.test.js b/src/shared/services/WorkPool.test.js new file mode 100644 index 00000000000..e5def647986 --- /dev/null +++ b/src/shared/services/WorkPool.test.js @@ -0,0 +1,239 @@ +/* + * Copyright (c) 2002-2018 "Neo4j, Inc" + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +/* global jest, beforeEach */ + +import { v4 as uuid } from 'uuid' +import WorkPool from './WorkPool' + +describe('creating work', () => { + let createWorker + let register + let id + let postMessage + beforeEach(() => { + postMessage = jest.fn() + createWorker = jest.fn(() => { + return { + postMessage + } + }) + register = new WorkPool(createWorker) + id = uuid() + }) + test('can do and finish work', () => { + // Given + const work = { + id + } + // When + const workObj = register.doWork(work) + + // Then + expect(register.getQueueSize()).toEqual(0) + expect(register.getPoolSize()).toEqual(1) + expect(register.getPoolSize(WorkPool.workerStates.BUSY)).toEqual(1) + expect(register.getPoolSize(WorkPool.workerStates.FREE)).toEqual(0) + expect(createWorker).toHaveBeenCalledTimes(1) + expect(workObj.id).toEqual(id) + + // When + workObj.finish() + + // Then + expect(register.getQueueSize()).toEqual(0) + expect(register.getPoolSize()).toEqual(1) + expect(register.getPoolSize(WorkPool.workerStates.BUSY)).toEqual(0) + expect(register.getPoolSize(WorkPool.workerStates.FREE)).toEqual(1) + expect(createWorker).toHaveBeenCalledTimes(1) + }) + test('can have onFinish ques', () => { + // Given + const id1 = { id: uuid() } + const id2 = { id: uuid() } + const id3 = { id: uuid() } + const onFinishFn1 = jest.fn() + const onFinishFn2 = jest.fn() + const onFinishFn3 = jest.fn() + + // When + const workObj1 = register.doWork(id1) + const workObj2 = register.doWork(id2) + workObj1.onFinish(onFinishFn1) + workObj2.onFinish(onFinishFn2) + + // Then + expect(createWorker).toHaveBeenCalledTimes(2) + expect(onFinishFn1).toHaveBeenCalledTimes(0) + expect(onFinishFn2).toHaveBeenCalledTimes(0) + + // When + workObj1.finish() + + // Then + expect(createWorker).toHaveBeenCalledTimes(2) + expect(onFinishFn1).toHaveBeenCalledTimes(1) + expect(onFinishFn2).toHaveBeenCalledTimes(0) + + // When + const workObj3 = register.doWork(id3) + workObj3.onFinish(onFinishFn3) + workObj2.finish() + + // Then + expect(onFinishFn1).toHaveBeenCalledTimes(1) + expect(onFinishFn2).toHaveBeenCalledTimes(1) + expect(onFinishFn3).toHaveBeenCalledTimes(0) + + // When + workObj3.finish() + + // Then + expect(onFinishFn1).toHaveBeenCalledTimes(1) + expect(onFinishFn2).toHaveBeenCalledTimes(1) + expect(onFinishFn3).toHaveBeenCalledTimes(1) + }) + + test('creates new workers if all are busy (not reaching pool size limit)', () => { + // Given + const id1 = { id: uuid() } + const id2 = { id: uuid() } + + // When + const workObj1 = register.doWork(id1) + const workObj2 = register.doWork(id2) + + // Then + expect(register.getPoolSize()).toEqual(2) + expect(createWorker).toHaveBeenCalledTimes(2) + expect(workObj1.id).toEqual(id1.id) + expect(workObj2.id).toEqual(id2.id) + expect(register.getPoolSize(WorkPool.workerStates.BUSY)).toEqual(2) + expect(register.getPoolSize(WorkPool.workerStates.FREE)).toEqual(0) + }) + test('re-uses workers if there are available ones', () => { + // Given + const id1 = { id: uuid() } + const id2 = { id: uuid() } + const id3 = { id: uuid() } + const id4 = { id: uuid() } + + // When + const workObj1 = register.doWork(id1) + const workObj2 = register.doWork(id2) + register.doWork(id3) + workObj1.finish() + workObj2.finish() + + // When + register.doWork(id4) + + // Then + expect(createWorker).toHaveBeenCalledTimes(3) + expect(register.getPoolSize()).toEqual(3) + expect(register.getPoolSize(WorkPool.workerStates.FREE)).toEqual(1) + expect(register.getPoolSize(WorkPool.workerStates.BUSY)).toEqual(2) + }) + it('exposes getWorkById', () => { + // Given + const localRegister = new WorkPool(createWorker, 1) + + // When + // Just do some work so queue pool limit is reached + localRegister.doWork({ id: 'nope' }) + + const work = { id } + const workObj = localRegister.doWork(work) + + // Then + expect(createWorker).toHaveBeenCalledTimes(1) + expect(workObj.id).toEqual(id) + + // When + const workObj2 = localRegister.getWorkById(id) + + // Then + expect(workObj).toBe(workObj2) // same obj in memory + + // When + // Try something that's not there + const notWorkObj = localRegister.getWorkById('not-present-id') + + // Then + expect(notWorkObj).toEqual(null) + }) + it('respcts the maxPoolSize and put jobs in queue', () => { + // Given + const poolSize = 2 + const localRegister = new WorkPool(createWorker, poolSize) + const id1 = { id: uuid() } + const id2 = { id: uuid() } + const id3 = { id: uuid() } + const id4 = { id: uuid() } + + // When + const workObj1 = localRegister.doWork(id1) + const workObj2 = localRegister.doWork(id2) + const workObj3 = localRegister.doWork(id3) + + // Then + expect(localRegister.getPoolSize()).toEqual(poolSize) + expect(localRegister.getPoolSize(WorkPool.workerStates.BUSY)).toEqual( + poolSize + ) + expect(localRegister.getPoolSize(WorkPool.workerStates.FREE)).toEqual(0) + expect(localRegister.getQueueSize()).toEqual(1) + + // When + workObj1.finish() + + // Then + expect(localRegister.getPoolSize()).toEqual(poolSize) + expect(localRegister.getPoolSize(WorkPool.workerStates.BUSY)).toEqual( + poolSize + ) + expect(localRegister.getPoolSize(WorkPool.workerStates.FREE)).toEqual(0) + expect(localRegister.getQueueSize()).toEqual(0) + + // When + const workObj4 = localRegister.doWork(id4) + + // Then + expect(localRegister.getPoolSize()).toEqual(poolSize) + expect(localRegister.getPoolSize(WorkPool.workerStates.BUSY)).toEqual( + poolSize + ) + expect(localRegister.getPoolSize(WorkPool.workerStates.FREE)).toEqual(0) + expect(localRegister.getQueueSize()).toEqual(1) + + // When + workObj2.finish() + workObj3.finish() + workObj4.finish() + + // Then + expect(createWorker).toHaveBeenCalledTimes(poolSize) + expect(localRegister.getPoolSize()).toEqual(poolSize) + expect(localRegister.getPoolSize(WorkPool.workerStates.FREE)).toEqual( + poolSize + ) + expect(localRegister.getPoolSize(WorkPool.workerStates.BUSY)).toEqual(0) + }) +}) diff --git a/src/shared/services/bolt/bolt.js b/src/shared/services/bolt/bolt.js index cd00ffed54b..cfb351a28d4 100644 --- a/src/shared/services/bolt/bolt.js +++ b/src/shared/services/bolt/bolt.js @@ -20,6 +20,7 @@ import { v4 } from 'uuid' import { v1 as neo4j } from 'neo4j-driver-alias' +import WorkPool from '../WorkPool' import * as mappings from './boltMappings' import * as boltConnection from './boltConnection' import { @@ -35,8 +36,7 @@ import BoltWorkerModule from 'worker-loader?inline!./boltWorker.js' /* eslint-enable import/no-webpack-loader-syntax */ let connectionProperties = null -let boltWorkerRegister = {} -let cancellationRegister = {} +let boltWorkPool = new WorkPool(() => new BoltWorkerModule(), 10) function openConnection (props, opts = {}, onLostConnection) { return new Promise((resolve, reject) => { @@ -59,9 +59,10 @@ function openConnection (props, opts = {}, onLostConnection) { } function cancelTransaction (id, cb) { - if (boltWorkerRegister[id]) { - cancellationRegister[id] = cb - boltWorkerRegister[id].postMessage(cancelTransactionMessage(id)) + const work = boltWorkPool.getWorkById(id) + if (work) { + work.onFinish(cb) + work.execute(cancelTransactionMessage(id)) } else { boltConnection.cancelTransaction(id, cb) } @@ -180,50 +181,31 @@ const addTypesAsField = result => { } function setupBoltWorker (id, workFn, onLostConnection = () => {}) { - const boltWorker = new BoltWorkerModule() - const onFinished = registerBoltWorker(id, boltWorker) const workerPromise = new Promise((resolve, reject) => { - boltWorker.postMessage(workFn) - boltWorker.onmessage = msg => { - if (msg.data.type === BOLT_CONNECTION_ERROR_MESSAGE) { - onFinished(boltWorker) - onLostConnection(msg.data.error) - return reject(msg.data.error) - } - if (msg.data.type === CYPHER_ERROR_MESSAGE) { - onFinished(boltWorker) - reject(msg.data.error) - } else if (msg.data.type === CYPHER_RESPONSE_MESSAGE) { - onFinished(boltWorker) - resolve(addTypesAsField(msg.data.result)) - } else if (msg.data.type === POST_CANCEL_TRANSACTION_MESSAGE) { - onFinished(boltWorker) + const work = boltWorkPool.doWork({ + id, + payload: workFn, + onmessage: msg => { + if (msg.data.type === BOLT_CONNECTION_ERROR_MESSAGE) { + work.finish() + onLostConnection(msg.data.error) + return reject(msg.data.error) + } + if (msg.data.type === CYPHER_ERROR_MESSAGE) { + work.finish() + reject(msg.data.error) + } else if (msg.data.type === CYPHER_RESPONSE_MESSAGE) { + work.finish() + resolve(addTypesAsField(msg.data.result)) + } else if (msg.data.type === POST_CANCEL_TRANSACTION_MESSAGE) { + work.finish() + } } - } + }) }) return workerPromise } -function registerBoltWorker (id, boltWorker) { - boltWorkerRegister[id] = boltWorker - return getWorkerFinalizer(boltWorkerRegister, cancellationRegister, id) -} - -function getWorkerFinalizer (workerRegister, cancellationRegister, workerId) { - return worker => { - if (cancellationRegister[workerId]) { - cancellationRegister[workerId]() - delete cancellationRegister[workerId] - } - if (workerRegister[workerId]) { - delete workerRegister[workerId] - } - if (worker) { - worker.terminate() - } - } -} - export default { directConnect: boltConnection.directConnect, openConnection,