From f13160439d8d81c63a2d4ab3181db8afb3bbbc09 Mon Sep 17 00:00:00 2001 From: Ryan Rowland Date: Wed, 16 May 2018 01:11:46 -0700 Subject: [PATCH 1/3] Add DeviceData and eventHandlers.js --- .gitignore | 1 + README.md | 11 +++++++++ eventHandlers.sample.js | 26 ++++++++++++++++++++++ index.js | 6 ++++- spec/PoolAgent.spec.js | 40 +++++++++++++++++++++++++++++++++ src/PoolAgent.js | 49 +++++++++++++++++++++++++++++++++++------ src/PoolServer.js | 15 ++++++++++++- 7 files changed, 139 insertions(+), 9 deletions(-) create mode 100644 eventHandlers.sample.js diff --git a/.gitignore b/.gitignore index c727bb7..eb3a80a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ node_modules .idea .DS_Store +eventHandlers.js diff --git a/README.md b/README.md index e6ff449..6bbea6e 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,17 @@ While the server(s) and the service are designed to run continuously, the pool p ## Run Run `node index.js --config=[CONFIG_FILE]`. See `[server|service|payout].sample.conf` for sample configuration files and clarifications. +## Custom Behavior +The pool server supports custom behaviors through the optional `eventHandlers.js` file. Code written here +will be run every time a specific event occurs. The currently supported events are: + +* **beforeRegister** - Called on a *server* configuration when a register message is received, before + setting up the `PoolAgent`. +* **onRegister** - Called on a *server* configuration after a `PoolAgent` has successfully been registered. + +See `eventHandlers.sample.js` for the template. To use, copy to a new file: `eventHandlers.js`, and add your +own logic inside the defined callbacks. The pool server will automatically include this file if it exists. + ## License Copyright 2018 The Nimiq Foundation diff --git a/eventHandlers.sample.js b/eventHandlers.sample.js new file mode 100644 index 0000000..11b2b06 --- /dev/null +++ b/eventHandlers.sample.js @@ -0,0 +1,26 @@ +/** + * NOTE: Don't modify this file! Copy this file to `evenHandlers.js` and it will + * automatically be included in the pool server! + */ + +/** + * Fired when a REGISTER message is received, before creating a corresponding + * PoolAgent. Good time to perform validation or mutation of message data. + * @param {Object} msg - The full register message. This is not a copy; any + * mutation will affect the data used to create the PoolAgent. + * @param {mysql.PoolConnection} connectionPool - A MySQL connection pool, + * logged in as 'pool_server'. + * @throws {Error} Should throw an Error with a message to send to the device + * if registration should not continue. + * @returns {void} + */ +module.exports.beforeRegister = function beforeRegister(msg, connectionPool) { } + +/** + * Fired when a new PoolAgent is registered to the PoolServer. + * @param {PoolAgent} agent - The Agent for the newly registered device. + * @param {mysql.PoolConnection} connectionPool - A MySQL connection pool, + * logged in as 'pool_server'. + * @returns {void} + */ +module.exports.onRegister = async function onRegister(agent, connectionPool) { } diff --git a/index.js b/index.js index 0abf673..a1356ab 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,7 @@ const Nimiq = require('@nimiq/core'); const argv = require('minimist')(process.argv.slice(2)); const config = require('./src/Config.js')(argv.config); +const fs = require('fs'); const PoolServer = require('./src/PoolServer.js'); const PoolService = require('./src/PoolService.js'); @@ -24,6 +25,9 @@ if (config.poolPayout.enabled && (config.poolServer.enabled || config.poolServic process.exit(1); } +const eventHandlers = fs.existsSync('./eventHandlers.js') + ? require('./eventHandlers.js') : undefined; + Nimiq.Log.instance.level = config.log.level; for (const tag in config.log.tags) { Nimiq.Log.instance.setLoggable(tag, config.log.tags[tag]); @@ -77,7 +81,7 @@ for (const seedPeer of config.seedPeers) { } if (config.poolServer.enabled) { - const poolServer = new PoolServer($.consensus, config.pool, config.poolServer.port, config.poolServer.mySqlPsw, config.poolServer.mySqlHost, config.poolServer.sslKeyPath, config.poolServer.sslCertPath); + const poolServer = new PoolServer($.consensus, config.pool, config.poolServer.port, config.poolServer.mySqlPsw, config.poolServer.mySqlHost, config.poolServer.sslKeyPath, config.poolServer.sslCertPath, eventHandlers); if (config.poolMetricsServer.enabled) { $.metricsServer = new MetricsServer(config.poolServer.sslKeyPath, config.poolServer.sslCertPath, config.poolMetricsServer.port, config.poolMetricsServer.password); diff --git a/spec/PoolAgent.spec.js b/spec/PoolAgent.spec.js index 5fce73b..639a3e4 100644 --- a/spec/PoolAgent.spec.js +++ b/spec/PoolAgent.spec.js @@ -224,4 +224,44 @@ describe('PoolAgent', () => { done(); })().catch(done.fail); }); + + it('fires eventHandler callbacks', (done) => { + (async () => { + const keyPair = Nimiq.KeyPair.generate(); + const clientAddress = keyPair.publicKey.toAddress(); + const consensus = await Nimiq.Consensus.volatileFull(); + + let beforeCalled = false; + const poolServer = new PoolServer(consensus, POOL_CONFIG, 9999, '', 'localhost', '', '', { + beforeRegister: (msg, connectionPool) => { + expect(msg.deviceId).toEqual(111111111); + msg.deviceId = 123; + beforeCalled = true; + }, + onRegister: async (agent, connectionPool) => { + expect(agent.deviceId).toEqual(123); + expect(agent.deviceData).toEqual({ deviceGroup: 'foobar' }); + expect(beforeCalled).toEqual(true); + done(); + } + }); + + await poolServer.start(); + const poolAgent = new PoolAgent(poolServer, { close: () => {}, send: () => {} }, Nimiq.NetAddress.fromIP('1.2.3.4')); + spyOn(poolAgent, '_regenerateNonce').and.callFake(() => { poolAgent._nonce = 42 }); + + const registerMsg = { + message: 'register', + address: clientAddress.toUserFriendlyAddress(), + deviceId: 111111111, + deviceData: { + deviceGroup: 'foobar' + }, + mode: 'smart', + genesisHash: Nimiq.BufferUtils.toBase64(Nimiq.GenesisConfig.GENESIS_HASH.serialize()) + }; + + await poolAgent._onMessage(registerMsg); + })().catch(done.fail); + }); }); diff --git a/src/PoolAgent.js b/src/PoolAgent.js index fe171e7..f7c6b5c 100644 --- a/src/PoolAgent.js +++ b/src/PoolAgent.js @@ -17,7 +17,7 @@ class PoolAgent extends Nimiq.Observable { this._netAddress = netAddress; /** @type {PoolAgent.Mode} */ - this.mode = PoolAgent.Mode.UNREGISTERED; + this._mode = PoolAgent.Mode.UNREGISTERED; /** @type {number} */ this._difficulty = this._pool.config.startDifficulty; @@ -37,6 +37,30 @@ class PoolAgent extends Nimiq.Observable { /** @type {Nimiq.Timers} */ this._timers = new Nimiq.Timers(); this._timers.resetTimeout('connection-timeout', () => this._onError(), this._pool.config.connectionTimeout); + + // Public interface + Object.defineProperties(this, { + /** @type {object} */ + deviceData: { + enumerable: true, + get: () => this._deviceData + }, + /** @type {number} */ + deviceId: { + enumerable: true, + get: () => this._deviceId + }, + /** @type {PoolAgent.Mode} */ + mode: { + enumerable: true, + get: () => this._mode + }, + /** @type {boolean} */ + isRegistered: { + enumerable: true, + get: () => this._registered + } + }); } /** @@ -46,7 +70,7 @@ class PoolAgent extends Nimiq.Observable { * @param {Nimiq.Hash} accountsHash */ async updateBlock(prevBlock, transactions, prunedAccounts, accountsHash) { - if (this.mode !== PoolAgent.Mode.NANO) return; + if (this._mode !== PoolAgent.Mode.NANO) return; if (!prevBlock || !transactions || !prunedAccounts || !accountsHash) return; this._currentBody = new Nimiq.BlockBody(this._pool.poolAddress, transactions, this._extraData, prunedAccounts); @@ -104,9 +128,9 @@ class PoolAgent extends Nimiq.Observable { switch (msg.message) { case PoolAgent.MESSAGE_SHARE: { - if (this.mode === PoolAgent.Mode.NANO) { + if (this._mode === PoolAgent.Mode.NANO) { await this._onNanoShareMessage(msg); - } else if (this.mode === PoolAgent.Mode.SMART) { + } else if (this._mode === PoolAgent.Mode.SMART) { await this._onSmartShareMessage(msg); } this._sharesSinceReset++; @@ -133,14 +157,22 @@ class PoolAgent extends Nimiq.Observable { return; } + try { + this._pool.eventHandlers.beforeRegister(msg, this._pool.connectionPool); + } catch (e) { + this._sendError(e.message); + return; + } + this._address = Nimiq.Address.fromUserFriendlyAddress(msg.address); this._deviceId = msg.deviceId; + this._deviceData = msg.deviceData; switch (msg.mode) { case PoolAgent.MODE_SMART: - this.mode = PoolAgent.Mode.SMART; + this._mode = PoolAgent.Mode.SMART; break; case PoolAgent.MODE_NANO: - this.mode = PoolAgent.Mode.NANO; + this._mode = PoolAgent.Mode.NANO; break; default: throw new Error('Client did not specify mode'); @@ -165,13 +197,14 @@ class PoolAgent extends Nimiq.Observable { }); this._sendSettings(); - if (this.mode === PoolAgent.Mode.NANO) { + if (this._mode === PoolAgent.Mode.NANO) { this._pool.requestCurrentHead(this); } await this.sendBalance(); this._timers.resetInterval('send-balance', () => this.sendBalance(), 1000 * 60 * 5); this._timers.resetInterval('send-keep-alive-ping', () => this._ws.ping(), 1000 * 10); + this._pool.eventHandlers.onRegister(this, this._pool.connectionPool); Nimiq.Log.i(PoolAgent, `REGISTER ${this._address.toUserFriendlyAddress()}, current balance: ${await this._pool.getUserBalance(this._userId)}`); } @@ -449,11 +482,13 @@ class PoolAgent extends Nimiq.Observable { _onClose() { this._offAll(); + this._registered = false; this._timers.clearAll(); this._pool.removeAgent(this); } _onError() { + this._registered = false; this._pool.removeAgent(this); this._ws.close(); } diff --git a/src/PoolServer.js b/src/PoolServer.js index 8aa3e6a..c492bd1 100644 --- a/src/PoolServer.js +++ b/src/PoolServer.js @@ -7,6 +7,12 @@ const fs = require('fs'); const PoolAgent = require('./PoolAgent.js'); const Helper = require('./Helper.js'); +/** + * @typedef EventHandlers + * @property {function} beforeRegister + * @property {function} onRegister + */ + class PoolServer extends Nimiq.Observable { /** * @param {Nimiq.FullConsensus} consensus @@ -16,8 +22,9 @@ class PoolServer extends Nimiq.Observable { * @param {string} mySqlHost * @param {string} sslKeyPath * @param {string} sslCertPath + * @param {EventHandlers} [eventHandlers] */ - constructor(consensus, config, port, mySqlPsw, mySqlHost, sslKeyPath, sslCertPath) { + constructor(consensus, config, port, mySqlPsw, mySqlHost, sslKeyPath, sslCertPath, eventHandlers) { super(); /** @type {Nimiq.FullConsensus} */ @@ -29,6 +36,12 @@ class PoolServer extends Nimiq.Observable { /** @type {Nimiq.Address} */ this.poolAddress = Nimiq.Address.fromUserFriendlyAddress(config.address); + /** @type {EventHandlers} */ + this.eventHandlers = Object.assign({ + beforeRegister: () => { }, + onRegister: async () => { } + }, eventHandlers); + /** @type {PoolConfig} */ this._config = config; From 2a9ede11756dc9c338875cd1f5477e74dc3bec69 Mon Sep 17 00:00:00 2001 From: Ryan Rowland Date: Mon, 21 May 2018 12:06:05 -0700 Subject: [PATCH 2/3] Address PR feedback --- README.md | 6 ++--- eventHandlers.sample.js | 7 +++--- spec/PoolAgent.spec.js | 4 ++-- src/PoolAgent.js | 49 +++++++++++++++++++---------------------- src/PoolServer.js | 8 +++---- 5 files changed, 36 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 6bbea6e..cb264fb 100644 --- a/README.md +++ b/README.md @@ -21,9 +21,9 @@ Run `node index.js --config=[CONFIG_FILE]`. See `[server|service|payout].sample. The pool server supports custom behaviors through the optional `eventHandlers.js` file. Code written here will be run every time a specific event occurs. The currently supported events are: -* **beforeRegister** - Called on a *server* configuration when a register message is received, before - setting up the `PoolAgent`. -* **onRegister** - Called on a *server* configuration after a `PoolAgent` has successfully been registered. +* **onRegisterMessage** - Called on a *server* configuration when a register message is received, before + setting `PoolAgent` has processed it. +* **onRegistrationCompleted** - Called on a *server* configuration after a `PoolAgent` has successfully been registered. See `eventHandlers.sample.js` for the template. To use, copy to a new file: `eventHandlers.js`, and add your own logic inside the defined callbacks. The pool server will automatically include this file if it exists. diff --git a/eventHandlers.sample.js b/eventHandlers.sample.js index 11b2b06..7674871 100644 --- a/eventHandlers.sample.js +++ b/eventHandlers.sample.js @@ -4,8 +4,9 @@ */ /** - * Fired when a REGISTER message is received, before creating a corresponding + * Fired when a REGISTER message is received, before being processed by the * PoolAgent. Good time to perform validation or mutation of message data. + * @param {PoolAgent} agent - The Agent for the newly registered device. * @param {Object} msg - The full register message. This is not a copy; any * mutation will affect the data used to create the PoolAgent. * @param {mysql.PoolConnection} connectionPool - A MySQL connection pool, @@ -14,7 +15,7 @@ * if registration should not continue. * @returns {void} */ -module.exports.beforeRegister = function beforeRegister(msg, connectionPool) { } +module.exports.onRegisterMessage = function onRegisterMessage(agent, msg, connectionPool) { } /** * Fired when a new PoolAgent is registered to the PoolServer. @@ -23,4 +24,4 @@ module.exports.beforeRegister = function beforeRegister(msg, connectionPool) { } * logged in as 'pool_server'. * @returns {void} */ -module.exports.onRegister = async function onRegister(agent, connectionPool) { } +module.exports.onRegistrationCompleted = async function onRegistrationCompleted(agent, connectionPool) { } diff --git a/spec/PoolAgent.spec.js b/spec/PoolAgent.spec.js index 639a3e4..2f23a84 100644 --- a/spec/PoolAgent.spec.js +++ b/spec/PoolAgent.spec.js @@ -233,12 +233,12 @@ describe('PoolAgent', () => { let beforeCalled = false; const poolServer = new PoolServer(consensus, POOL_CONFIG, 9999, '', 'localhost', '', '', { - beforeRegister: (msg, connectionPool) => { + onRegisterMessage: (agent, msg, connectionPool) => { expect(msg.deviceId).toEqual(111111111); msg.deviceId = 123; beforeCalled = true; }, - onRegister: async (agent, connectionPool) => { + onRegistrationCompleted: async (agent, connectionPool) => { expect(agent.deviceId).toEqual(123); expect(agent.deviceData).toEqual({ deviceGroup: 'foobar' }); expect(beforeCalled).toEqual(true); diff --git a/src/PoolAgent.js b/src/PoolAgent.js index f7c6b5c..03b37e3 100644 --- a/src/PoolAgent.js +++ b/src/PoolAgent.js @@ -37,30 +37,6 @@ class PoolAgent extends Nimiq.Observable { /** @type {Nimiq.Timers} */ this._timers = new Nimiq.Timers(); this._timers.resetTimeout('connection-timeout', () => this._onError(), this._pool.config.connectionTimeout); - - // Public interface - Object.defineProperties(this, { - /** @type {object} */ - deviceData: { - enumerable: true, - get: () => this._deviceData - }, - /** @type {number} */ - deviceId: { - enumerable: true, - get: () => this._deviceId - }, - /** @type {PoolAgent.Mode} */ - mode: { - enumerable: true, - get: () => this._mode - }, - /** @type {boolean} */ - isRegistered: { - enumerable: true, - get: () => this._registered - } - }); } /** @@ -158,7 +134,7 @@ class PoolAgent extends Nimiq.Observable { } try { - this._pool.eventHandlers.beforeRegister(msg, this._pool.connectionPool); + this._pool.eventHandlers.onRegisterMessage(this, msg, this._pool.connectionPool); } catch (e) { this._sendError(e.message); return; @@ -204,7 +180,7 @@ class PoolAgent extends Nimiq.Observable { this._timers.resetInterval('send-balance', () => this.sendBalance(), 1000 * 60 * 5); this._timers.resetInterval('send-keep-alive-ping', () => this._ws.ping(), 1000 * 10); - this._pool.eventHandlers.onRegister(this, this._pool.connectionPool); + this._pool.eventHandlers.onRegistrationCompleted(this, this._pool.connectionPool); Nimiq.Log.i(PoolAgent, `REGISTER ${this._address.toUserFriendlyAddress()}, current balance: ${await this._pool.getUserBalance(this._userId)}`); } @@ -492,6 +468,27 @@ class PoolAgent extends Nimiq.Observable { this._pool.removeAgent(this); this._ws.close(); } + + + /** @type {object} */ + get deviceData() { + return this._deviceData; + } + + /** @type {number} */ + get deviceId() { + return this._deviceId; + } + + /** @type {PoolAgent.Mode} */ + get mode() { + return this._mode; + } + + /** @type {boolean} */ + get isRegistered() { + return this._registered; + } } PoolAgent.MESSAGE_REGISTER = 'register'; PoolAgent.MESSAGE_REGISTERED = 'registered'; diff --git a/src/PoolServer.js b/src/PoolServer.js index c492bd1..22363b7 100644 --- a/src/PoolServer.js +++ b/src/PoolServer.js @@ -9,8 +9,8 @@ const Helper = require('./Helper.js'); /** * @typedef EventHandlers - * @property {function} beforeRegister - * @property {function} onRegister + * @property {function} onRegisterMessage + * @property {function} onRegistrationCompleted */ class PoolServer extends Nimiq.Observable { @@ -38,8 +38,8 @@ class PoolServer extends Nimiq.Observable { /** @type {EventHandlers} */ this.eventHandlers = Object.assign({ - beforeRegister: () => { }, - onRegister: async () => { } + onRegisterMessage: () => { }, + onRegistrationCompleted: async () => { } }, eventHandlers); /** @type {PoolConfig} */ From 28cc3b858c32ff641b7020c1f55e50e101f921d0 Mon Sep 17 00:00:00 2001 From: Ryan Rowland Date: Mon, 21 May 2018 12:59:03 -0700 Subject: [PATCH 3/3] Update eventHandlers async --- eventHandlers.sample.js | 2 +- spec/PoolAgent.spec.js | 2 +- src/PoolAgent.js | 4 ++-- src/PoolServer.js | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/eventHandlers.sample.js b/eventHandlers.sample.js index 7674871..dca7fad 100644 --- a/eventHandlers.sample.js +++ b/eventHandlers.sample.js @@ -15,7 +15,7 @@ * if registration should not continue. * @returns {void} */ -module.exports.onRegisterMessage = function onRegisterMessage(agent, msg, connectionPool) { } +module.exports.onRegisterMessage = async function onRegisterMessage(agent, msg, connectionPool) { } /** * Fired when a new PoolAgent is registered to the PoolServer. diff --git a/spec/PoolAgent.spec.js b/spec/PoolAgent.spec.js index 2f23a84..3f320d1 100644 --- a/spec/PoolAgent.spec.js +++ b/spec/PoolAgent.spec.js @@ -233,7 +233,7 @@ describe('PoolAgent', () => { let beforeCalled = false; const poolServer = new PoolServer(consensus, POOL_CONFIG, 9999, '', 'localhost', '', '', { - onRegisterMessage: (agent, msg, connectionPool) => { + onRegisterMessage: async (agent, msg, connectionPool) => { expect(msg.deviceId).toEqual(111111111); msg.deviceId = 123; beforeCalled = true; diff --git a/src/PoolAgent.js b/src/PoolAgent.js index 03b37e3..9585ebf 100644 --- a/src/PoolAgent.js +++ b/src/PoolAgent.js @@ -134,7 +134,7 @@ class PoolAgent extends Nimiq.Observable { } try { - this._pool.eventHandlers.onRegisterMessage(this, msg, this._pool.connectionPool); + await this._pool.eventHandlers.onRegisterMessage(this, msg, this._pool.connectionPool); } catch (e) { this._sendError(e.message); return; @@ -180,7 +180,7 @@ class PoolAgent extends Nimiq.Observable { this._timers.resetInterval('send-balance', () => this.sendBalance(), 1000 * 60 * 5); this._timers.resetInterval('send-keep-alive-ping', () => this._ws.ping(), 1000 * 10); - this._pool.eventHandlers.onRegistrationCompleted(this, this._pool.connectionPool); + await this._pool.eventHandlers.onRegistrationCompleted(this, this._pool.connectionPool); Nimiq.Log.i(PoolAgent, `REGISTER ${this._address.toUserFriendlyAddress()}, current balance: ${await this._pool.getUserBalance(this._userId)}`); } diff --git a/src/PoolServer.js b/src/PoolServer.js index 22363b7..b5286d8 100644 --- a/src/PoolServer.js +++ b/src/PoolServer.js @@ -38,7 +38,7 @@ class PoolServer extends Nimiq.Observable { /** @type {EventHandlers} */ this.eventHandlers = Object.assign({ - onRegisterMessage: () => { }, + onRegisterMessage: async () => { }, onRegistrationCompleted: async () => { } }, eventHandlers);