Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 0 additions & 56 deletions README.md

This file was deleted.

74 changes: 56 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Promise.series = (iterable, action) => {
);
};

const rp = require('request-promise');
const EventEmitter = require('events');
const util = require('util');
const uuid = require('uuid');
Expand Down Expand Up @@ -203,7 +204,7 @@ class Hydra extends EventEmitter {
}

if (partialConfig && process.env.HYDRA_REDIS_URL) {
this._connectToRedis({redis: {url: process.env.HYDRA_REDIS_URL}})
this._connectToRedis({ redis: { url: process.env.HYDRA_REDIS_URL } })
.then(() => {
if (!this.redisdb) {
reject(new Error('No Redis connection'));
Expand Down Expand Up @@ -301,7 +302,7 @@ class Hydra extends EventEmitter {
let interfaceMask = segments[1];
Object.keys(interfaces).
forEach((itf) => {
interfaces[itf].forEach((interfaceRecord)=>{
interfaces[itf].forEach((interfaceRecord) => {
if (itf === interfaceName && interfaceRecord.netmask === interfaceMask && interfaceRecord.family === 'IPv4') {
this.config.serviceIP = interfaceRecord.address;
}
Expand All @@ -315,7 +316,7 @@ class Hydra extends EventEmitter {
let firstSelected = false;
Object.keys(interfaces).
forEach((itf) => {
interfaces[itf].forEach((interfaceRecord)=>{
interfaces[itf].forEach((interfaceRecord) => {
if (!firstSelected && interfaceRecord.family === 'IPv4' && interfaceRecord.address !== '127.0.0.1') {
this.config.serviceIP = interfaceRecord.address;
firstSelected = true;
Expand Down Expand Up @@ -359,21 +360,27 @@ class Hydra extends EventEmitter {

const promises = [];
if (!this.testMode) {
this._logMessage('error', 'Service is shutting down.');
this.redisdb.batch()

this._logMessage('error', 'Service is shutting down. kokoko is shuttingdown');
this.redisdb[Utils.getBatchOrPipeline(this.redisdb)]()
.expire(`${redisPreKey}:${this.serviceName}:${this.instanceID}:health`, KEY_EXPIRATION_TTL)
.expire(`${redisPreKey}:${this.serviceName}:${this.instanceID}:health:log`, ONE_WEEK_IN_SECONDS)
.exec();

if (this.mcMessageChannelClient) {
promises.push(this.mcMessageChannelClient.quitAsync());
promises.push(this.mcMessageChannelClient.quit());
// promises.push(Utils.quitOrQuit(this.mcMessageChannelClient));
}

if (this.mcDirectMessageChannelClient) {
promises.push(this.mcDirectMessageChannelClient.quitAsync());
// promises.push(this.mcMessageChannelClient.quit());
// promises.push(Utils.quitOrQuit(this.mcMessageChannelClient));
}

}
Object.keys(this.messageChannelPool).forEach((keyname) => {
promises.push(this.messageChannelPool[keyname].quitAsync());
this.messageChannelPool[keyname].quit();
// promises.push(Utils.quitOrQuit(this.messageChannelPool[keyname]));
});
if (this.redisdb) {
this.redisdb.del(`${redisPreKey}:${this.serviceName}:${this.instanceID}:presence`, () => {
Expand Down Expand Up @@ -752,7 +759,7 @@ class Hydra extends EventEmitter {
hostName: this.hostName
});
if (entry && !this.redisdb.closing) {
let cmd = (this.testMode) ? 'multi' : 'batch';
let cmd = (this.testMode) ? 'multi' : Utils.getBatchOrPipeline(this.redisdb);
this.redisdb[cmd]()
.setex(`${redisPreKey}:${this.serviceName}:${this.instanceID}:presence`, KEY_EXPIRATION_TTL, this.instanceID)
.hset(`${redisPreKey}:nodes`, this.instanceID, entry)
Expand All @@ -770,7 +777,7 @@ class Hydra extends EventEmitter {
let entry = Object.assign({
updatedOn: this._getTimeStamp()
}, this._getHealth());
let cmd = (this.testMode) ? 'multi' : 'batch';
let cmd = (this.testMode) ? 'multi' : Utils.getBatchOrPipeline(this.redisdb);
this.redisdb[cmd]()
.setex(`${redisPreKey}:${this.serviceName}:${this.instanceID}:health`, KEY_EXPIRATION_TTL, Utils.safeJSONStringify(entry))
.expire(`${redisPreKey}:${this.serviceName}:${this.instanceID}:health:log`, ONE_WEEK_IN_SECONDS)
Expand Down Expand Up @@ -880,7 +887,7 @@ class Hydra extends EventEmitter {
if (err) {
reject(err);
} else {
let serviceList = result.map((service) => {
let serviceList = Utils.ioToRedisMultiAdapter(result).map((service) => {
return Utils.safeJSONParse(service);
});
resolve(serviceList);
Expand Down Expand Up @@ -990,7 +997,7 @@ class Hydra extends EventEmitter {
reject(err);
} else {
let instanceList = [];
result.forEach((instance) => {
Utils.ioToRedisMultiAdapter(result).forEach((instance) => {
if (instance) {
let instanceObj = Utils.safeJSONParse(instance);
if (instanceObj) {
Expand Down Expand Up @@ -1075,7 +1082,7 @@ class Hydra extends EventEmitter {
if (err) {
reject(err);
} else {
let instanceList = result.map((instance) => {
let instanceList = Utils.ioToRedisMultiAdapter(result).map((instance) => {
return Utils.safeJSONParse(instance);
});
this.internalCache.put(cacheKey, instanceList, KEY_EXPIRATION_TTL);
Expand Down Expand Up @@ -1137,6 +1144,7 @@ class Hydra extends EventEmitter {
} else {
let response = [];
if (result && result.length > 0) {
result = Utils.ioToRedisMultiAdapter(result);
result = result[0];
result.forEach((entry) => {
response.push(Utils.safeJSONParse(entry));
Expand Down Expand Up @@ -1318,8 +1326,8 @@ class Hydra extends EventEmitter {
* @return {promise} promise - response from API in resolved promise or
* error in rejected promise.
*/
_makeAPIRequest(message, sendOpts = { }) {
return new Promise((resolve, reject) => {
_makeAPIRequest(message, sendOpts = {}) {
return new Promise(async (resolve, reject) => {
let umfmsg = UMFMessage.createMessage(message);
if (!umfmsg.validate()) {
resolve(this._createServerResponseWithReason(ServerResponse.HTTP_BAD_REQUEST, UMF_INVALID_MESSAGE));
Expand All @@ -1342,6 +1350,36 @@ class Hydra extends EventEmitter {
return;
}

// case this was an external route then dont check the Service presence and just make the request
if (sendOpts.isExternal) {
try {
let response = await rp(`${parsedRoute.serviceName}${parsedRoute.apiRoute}`, {
timeout: sendOpts.timeout,
method: parsedRoute.httpMethod,
headers: umfmsg.message.headers,
body: parsedRoute.httpMethod === 'get' ? null : umfmsg.message.body,
resolveWithFullResponse: true
});

let body;
// handling parse error if not json just leave it as it is
try {
body = JSON.parse(response.body);
} catch (err) { }

let serverResponse = new ServerResponse();
response = serverResponse.createResponseObject(response.statusCode, {
result: body,
headers: response.headers
});

resolve(response);
return 0;
} catch (err) {
resolve(this._createServerResponseWithReason(ServerResponse.HTTP_SERVER_ERROR, err.message));
}
}

this._getServicePresence(parsedRoute.serviceName)
.then((instances) => {
if (instances.length === 0) {
Expand Down Expand Up @@ -1847,7 +1885,7 @@ class Hydra extends EventEmitter {
portsTried.push(port);

const server = require('net').createServer();
server.listen({port, host}, () => {
server.listen({ port, host }, () => {
server.once('close', () => {
callback(port);
});
Expand Down Expand Up @@ -2094,7 +2132,7 @@ class IHydra extends Hydra {
* @return {promise} promise - response from API in resolved promise or
* error in rejected promise.
*/
makeAPIRequest(message, sendOpts = { }) {
makeAPIRequest(message, sendOpts = {}) {
return super._makeAPIRequest(message, sendOpts);
}

Expand Down Expand Up @@ -2293,4 +2331,4 @@ class IHydra extends Hydra {
}
}

module.exports = new IHydra;
module.exports = new IHydra;
15 changes: 10 additions & 5 deletions lib/redis-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ class RedisConnection {
if (testMode) {
redis = require('redis-mock');
} else {
redis = require('redis');
Promise.promisifyAll(redis.RedisClient.prototype);
Promise.promisifyAll(redis.Multi.prototype);
redis = require('ioredis');
redis.Promise = Promise;
}

let url = {};
Expand Down Expand Up @@ -85,7 +84,13 @@ class RedisConnection {
*/
_connect() {
let self = new Promise((resolve, reject) => {
let db = redis.createClient(this.redisConfig);
let db = redis.createClient({
maxRetriesPerRequest: null,
enableReadyCheck: true,
retryStrategy: (times) => 1000,
...this.redisConfig
});

db.once('ready', () => resolve(db));
db.on('error', (err) => {
if (self.isPending()) {
Expand Down Expand Up @@ -122,4 +127,4 @@ class RedisConnection {
}
}

module.exports = RedisConnection;
module.exports = RedisConnection;
14 changes: 13 additions & 1 deletion lib/umfmessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class UMFMessage {
*/
function createMessageInstance(message) {
let proxy = new Proxy(new UMFMessage(), {
get: function(target, name, _receiver) {
get: function (target, name, _receiver) {
return name in target ?
target[name] : target.message[name];
},
Expand Down Expand Up @@ -214,6 +214,7 @@ function parseRoute(toValue) {
error = 'route field has invalid number of routable segments';
} else {
let atPos = segments[0].indexOf('@');

if (atPos > -1) {
let x = segments.shift();
instance = x.substring(0, atPos);
Expand All @@ -224,11 +225,21 @@ function parseRoute(toValue) {
subID = segs[1];
}
}

if (segments[0].indexOf('http') === 0) {
let url = `${segments[0]}:${segments[1]}`;

// case there was a port
// if the next segment was a number then this is a port
if (/^[0-9][0-9]*$/.test(segments[2])) {
url = `${url}:${segments[2]}`;
segments.shift();
}

segments.shift();
segments[0] = url;
}

serviceName = segments.shift();
apiRoute = segments.join(':');
let s1 = apiRoute.indexOf('[');
Expand All @@ -247,6 +258,7 @@ function parseRoute(toValue) {
}
}
}

return {
instance,
subID,
Expand Down
59 changes: 58 additions & 1 deletion lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,63 @@ class Utils {
[a[i - 1], a[j]] = [a[j], a[i - 1]];
}
}

/**
* @name ioToRedisTrxAdapter
* @summary this function works as an adapter for the ioredis to node redis multi operations result
* so we don't have to change the code
* @param {array} this is the output of the multi operation from IORedis
* @return {array} this is the changed result that the redis package would output
*/
static ioToRedisMultiAdapter(multiResult) {
let result = [];
for (let trx of multiResult) {

// this is the case where this output is already an output from node_redis package
if (!Array.isArray(trx))
return multiResult;

// if the trx index 0 is null then there is no error
if (!trx[0]) {
result.push(trx[1]);
continue;
}

// if there is a value in the first element then this is an error
if (trx[0]) {
result.push(trx[0].ReplyError);
}
}

return result;
}

/**
* @name getBatchOrPipeline
* @summary just a function to see wethere this is instance of node_redis or ioredis
* and return the right function name for making batch commands.
* @param {object} take the redisDB instance
* @return {string} return the function name for making batch commands in each package
*/
static getBatchOrPipeline(redisDB) {
if (redisDB.batch)
return 'batch';

return 'pipeline';
}

/**
* @name quit
* @summary this is a function to call the quit function either in node_redis or ioredis
* @param {object} take the redisDB instance
* @return {string} return the function name for making batch commands in each package
*/
static quitOrQuit(redisDB){
if (redisDB.quitAsync)
return redisDB.quitAsync();

return redisDB.quit();
}
}

module.exports = Utils;
module.exports = Utils;
Loading