diff --git a/.gitignore b/.gitignore index c727bb7..eb3a80a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ node_modules .idea .DS_Store +eventHandlers.js diff --git a/README.md b/README.md index e6ff449..cb264fb 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,17 @@ While the server(s) and the service are designed to run continuously, the pool p ## Run Run `node index.js --config=[CONFIG_FILE]`. See `[server|service|payout].sample.conf` for sample configuration files and clarifications. +## Custom Behavior +The pool server supports custom behaviors through the optional `eventHandlers.js` file. Code written here +will be run every time a specific event occurs. The currently supported events are: + +* **onRegisterMessage** - Called on a *server* configuration when a register message is received, before + setting `PoolAgent` has processed it. +* **onRegistrationCompleted** - Called on a *server* configuration after a `PoolAgent` has successfully been registered. + +See `eventHandlers.sample.js` for the template. To use, copy to a new file: `eventHandlers.js`, and add your +own logic inside the defined callbacks. The pool server will automatically include this file if it exists. + ## License Copyright 2018 The Nimiq Foundation diff --git a/eventHandlers.sample.js b/eventHandlers.sample.js new file mode 100644 index 0000000..dca7fad --- /dev/null +++ b/eventHandlers.sample.js @@ -0,0 +1,27 @@ +/** + * NOTE: Don't modify this file! Copy this file to `evenHandlers.js` and it will + * automatically be included in the pool server! + */ + +/** + * Fired when a REGISTER message is received, before being processed by the + * PoolAgent. Good time to perform validation or mutation of message data. + * @param {PoolAgent} agent - The Agent for the newly registered device. + * @param {Object} msg - The full register message. This is not a copy; any + * mutation will affect the data used to create the PoolAgent. + * @param {mysql.PoolConnection} connectionPool - A MySQL connection pool, + * logged in as 'pool_server'. + * @throws {Error} Should throw an Error with a message to send to the device + * if registration should not continue. + * @returns {void} + */ +module.exports.onRegisterMessage = async function onRegisterMessage(agent, msg, connectionPool) { } + +/** + * Fired when a new PoolAgent is registered to the PoolServer. + * @param {PoolAgent} agent - The Agent for the newly registered device. + * @param {mysql.PoolConnection} connectionPool - A MySQL connection pool, + * logged in as 'pool_server'. + * @returns {void} + */ +module.exports.onRegistrationCompleted = async function onRegistrationCompleted(agent, connectionPool) { } diff --git a/index.js b/index.js index 0abf673..a1356ab 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,7 @@ const Nimiq = require('@nimiq/core'); const argv = require('minimist')(process.argv.slice(2)); const config = require('./src/Config.js')(argv.config); +const fs = require('fs'); const PoolServer = require('./src/PoolServer.js'); const PoolService = require('./src/PoolService.js'); @@ -24,6 +25,9 @@ if (config.poolPayout.enabled && (config.poolServer.enabled || config.poolServic process.exit(1); } +const eventHandlers = fs.existsSync('./eventHandlers.js') + ? require('./eventHandlers.js') : undefined; + Nimiq.Log.instance.level = config.log.level; for (const tag in config.log.tags) { Nimiq.Log.instance.setLoggable(tag, config.log.tags[tag]); @@ -77,7 +81,7 @@ for (const seedPeer of config.seedPeers) { } if (config.poolServer.enabled) { - const poolServer = new PoolServer($.consensus, config.pool, config.poolServer.port, config.poolServer.mySqlPsw, config.poolServer.mySqlHost, config.poolServer.sslKeyPath, config.poolServer.sslCertPath); + const poolServer = new PoolServer($.consensus, config.pool, config.poolServer.port, config.poolServer.mySqlPsw, config.poolServer.mySqlHost, config.poolServer.sslKeyPath, config.poolServer.sslCertPath, eventHandlers); if (config.poolMetricsServer.enabled) { $.metricsServer = new MetricsServer(config.poolServer.sslKeyPath, config.poolServer.sslCertPath, config.poolMetricsServer.port, config.poolMetricsServer.password); diff --git a/spec/PoolAgent.spec.js b/spec/PoolAgent.spec.js index 5fce73b..3f320d1 100644 --- a/spec/PoolAgent.spec.js +++ b/spec/PoolAgent.spec.js @@ -224,4 +224,44 @@ describe('PoolAgent', () => { done(); })().catch(done.fail); }); + + it('fires eventHandler callbacks', (done) => { + (async () => { + const keyPair = Nimiq.KeyPair.generate(); + const clientAddress = keyPair.publicKey.toAddress(); + const consensus = await Nimiq.Consensus.volatileFull(); + + let beforeCalled = false; + const poolServer = new PoolServer(consensus, POOL_CONFIG, 9999, '', 'localhost', '', '', { + onRegisterMessage: async (agent, msg, connectionPool) => { + expect(msg.deviceId).toEqual(111111111); + msg.deviceId = 123; + beforeCalled = true; + }, + onRegistrationCompleted: async (agent, connectionPool) => { + expect(agent.deviceId).toEqual(123); + expect(agent.deviceData).toEqual({ deviceGroup: 'foobar' }); + expect(beforeCalled).toEqual(true); + done(); + } + }); + + await poolServer.start(); + const poolAgent = new PoolAgent(poolServer, { close: () => {}, send: () => {} }, Nimiq.NetAddress.fromIP('1.2.3.4')); + spyOn(poolAgent, '_regenerateNonce').and.callFake(() => { poolAgent._nonce = 42 }); + + const registerMsg = { + message: 'register', + address: clientAddress.toUserFriendlyAddress(), + deviceId: 111111111, + deviceData: { + deviceGroup: 'foobar' + }, + mode: 'smart', + genesisHash: Nimiq.BufferUtils.toBase64(Nimiq.GenesisConfig.GENESIS_HASH.serialize()) + }; + + await poolAgent._onMessage(registerMsg); + })().catch(done.fail); + }); }); diff --git a/src/PoolAgent.js b/src/PoolAgent.js index fe171e7..9585ebf 100644 --- a/src/PoolAgent.js +++ b/src/PoolAgent.js @@ -17,7 +17,7 @@ class PoolAgent extends Nimiq.Observable { this._netAddress = netAddress; /** @type {PoolAgent.Mode} */ - this.mode = PoolAgent.Mode.UNREGISTERED; + this._mode = PoolAgent.Mode.UNREGISTERED; /** @type {number} */ this._difficulty = this._pool.config.startDifficulty; @@ -46,7 +46,7 @@ class PoolAgent extends Nimiq.Observable { * @param {Nimiq.Hash} accountsHash */ async updateBlock(prevBlock, transactions, prunedAccounts, accountsHash) { - if (this.mode !== PoolAgent.Mode.NANO) return; + if (this._mode !== PoolAgent.Mode.NANO) return; if (!prevBlock || !transactions || !prunedAccounts || !accountsHash) return; this._currentBody = new Nimiq.BlockBody(this._pool.poolAddress, transactions, this._extraData, prunedAccounts); @@ -104,9 +104,9 @@ class PoolAgent extends Nimiq.Observable { switch (msg.message) { case PoolAgent.MESSAGE_SHARE: { - if (this.mode === PoolAgent.Mode.NANO) { + if (this._mode === PoolAgent.Mode.NANO) { await this._onNanoShareMessage(msg); - } else if (this.mode === PoolAgent.Mode.SMART) { + } else if (this._mode === PoolAgent.Mode.SMART) { await this._onSmartShareMessage(msg); } this._sharesSinceReset++; @@ -133,14 +133,22 @@ class PoolAgent extends Nimiq.Observable { return; } + try { + await this._pool.eventHandlers.onRegisterMessage(this, msg, this._pool.connectionPool); + } catch (e) { + this._sendError(e.message); + return; + } + this._address = Nimiq.Address.fromUserFriendlyAddress(msg.address); this._deviceId = msg.deviceId; + this._deviceData = msg.deviceData; switch (msg.mode) { case PoolAgent.MODE_SMART: - this.mode = PoolAgent.Mode.SMART; + this._mode = PoolAgent.Mode.SMART; break; case PoolAgent.MODE_NANO: - this.mode = PoolAgent.Mode.NANO; + this._mode = PoolAgent.Mode.NANO; break; default: throw new Error('Client did not specify mode'); @@ -165,13 +173,14 @@ class PoolAgent extends Nimiq.Observable { }); this._sendSettings(); - if (this.mode === PoolAgent.Mode.NANO) { + if (this._mode === PoolAgent.Mode.NANO) { this._pool.requestCurrentHead(this); } await this.sendBalance(); this._timers.resetInterval('send-balance', () => this.sendBalance(), 1000 * 60 * 5); this._timers.resetInterval('send-keep-alive-ping', () => this._ws.ping(), 1000 * 10); + await this._pool.eventHandlers.onRegistrationCompleted(this, this._pool.connectionPool); Nimiq.Log.i(PoolAgent, `REGISTER ${this._address.toUserFriendlyAddress()}, current balance: ${await this._pool.getUserBalance(this._userId)}`); } @@ -449,14 +458,37 @@ class PoolAgent extends Nimiq.Observable { _onClose() { this._offAll(); + this._registered = false; this._timers.clearAll(); this._pool.removeAgent(this); } _onError() { + this._registered = false; this._pool.removeAgent(this); this._ws.close(); } + + + /** @type {object} */ + get deviceData() { + return this._deviceData; + } + + /** @type {number} */ + get deviceId() { + return this._deviceId; + } + + /** @type {PoolAgent.Mode} */ + get mode() { + return this._mode; + } + + /** @type {boolean} */ + get isRegistered() { + return this._registered; + } } PoolAgent.MESSAGE_REGISTER = 'register'; PoolAgent.MESSAGE_REGISTERED = 'registered'; diff --git a/src/PoolServer.js b/src/PoolServer.js index 8aa3e6a..b5286d8 100644 --- a/src/PoolServer.js +++ b/src/PoolServer.js @@ -7,6 +7,12 @@ const fs = require('fs'); const PoolAgent = require('./PoolAgent.js'); const Helper = require('./Helper.js'); +/** + * @typedef EventHandlers + * @property {function} onRegisterMessage + * @property {function} onRegistrationCompleted + */ + class PoolServer extends Nimiq.Observable { /** * @param {Nimiq.FullConsensus} consensus @@ -16,8 +22,9 @@ class PoolServer extends Nimiq.Observable { * @param {string} mySqlHost * @param {string} sslKeyPath * @param {string} sslCertPath + * @param {EventHandlers} [eventHandlers] */ - constructor(consensus, config, port, mySqlPsw, mySqlHost, sslKeyPath, sslCertPath) { + constructor(consensus, config, port, mySqlPsw, mySqlHost, sslKeyPath, sslCertPath, eventHandlers) { super(); /** @type {Nimiq.FullConsensus} */ @@ -29,6 +36,12 @@ class PoolServer extends Nimiq.Observable { /** @type {Nimiq.Address} */ this.poolAddress = Nimiq.Address.fromUserFriendlyAddress(config.address); + /** @type {EventHandlers} */ + this.eventHandlers = Object.assign({ + onRegisterMessage: async () => { }, + onRegistrationCompleted: async () => { } + }, eventHandlers); + /** @type {PoolConfig} */ this._config = config;