Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
package-lock.json
node_modules
.env
.serverless
.serverless
/src/test/*
/src/test/openai_stream.ts
data.json
data1.json
embeddings.csv
4 changes: 4 additions & 0 deletions .npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
registry=https://registry.npmjs.org
@workduck-io:registry=https://npm.pkg.github.com
//npm.pkg.github.com/:_authToken=${YARN_TOKEN}
14 changes: 14 additions & 0 deletions .prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"printWidth": 80,
"tabWidth": 2,
"useTabs": false,
"semi": true,
"singleQuote": true,
"quoteProps": "as-needed",
"jsxSingleQuote": false,
"trailingComma": "es5",
"bracketSpacing": true,
"jsxBracketSameLine": true,
"arrowParens": "avoid",
"endOfLine": "auto"
}
Empty file added output.txt
Empty file.
29 changes: 26 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "mex-websockets",
"name": "mex-streaming",
"version": "1.0.0",
"description": "A service for all mex' websockets related needs",
"description": "A service for all mex' streaming related needs",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
Expand All @@ -10,10 +10,33 @@
"license": "MIT",
"devDependencies": {
"@serverless/typescript": "^3.30.1",
"@slack/web-api": "^6.8.1",
"@types/aws-lambda": "^8.10.115",
"@types/csv-write-stream": "^2.0.0",
"@types/jquery": "^3.5.16",
"csv-write-stream": "^2.0.0",
"serverless-esbuild": "^1.44.0",
"ts-node": "^10.9.1",
"tsconfig-paths": "^4.2.0"
"tsconfig-paths": "^4.2.0",
"typescript": "^5.1.3",
"undici": "^5.23.0"
},
"dependencies": {
"@aws-sdk/client-lambda": "^3.405.0",
"@pinecone-database/pinecone": "^0.1.6",
"@workduck-io/invoke-lambda": "^0.0.3",
"axios": "^1.4.0",
"cheerio": "^1.0.0-rc.12",
"compute-cosine-similarity": "^1.0.0",
"cosine-similarity": "^1.0.1",
"csv-parser": "^3.0.0",
"d3-dsv": "^2.0.0",
"esbuild": "^0.17.19",
"jwt-decode": "^3.1.2",
"langchain": "^0.0.81",
"nanoid": "^3.3.4",
"peggy": "^3.0.2",
"react": "^18.2.0",
"socket.io-client": "^4.7.1"
}
}
1 change: 1 addition & 0 deletions resources/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const environment = {
// AWS_ACCESS_KEY_ID: '${env:AWS_ACCESS_KEY_ID}',
// AWS_SECRET_ACCESS_KEY: '${env:AWS_SECRET_ACCESS_KEY}',
OPENAI_API_KEY: '${env:OPENAI_API_KEY}',
PINECONE_API_KEY: '${env:PINECONE_API_KEY}',
STAGE: '${self:custom.myStage}',
AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1',
NODE_OPTIONS: '--enable-source-maps --stack-trace-limit=1000',
Expand Down
37 changes: 13 additions & 24 deletions resources/functions.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,20 @@
import type { AWS } from '@serverless/typescript';

const functions: AWS['functions'] = {
websockets: {
handler: 'src/lambdas/websockets/handler.handleRequest',
events: [
{
websocket: {
route: '$connect',
},
},
{
websocket: {
route: '$disconnect',
},
},
{
websocket: {
route: '$default',
},
},
{
websocket: {
route: 'sendMessage',
},
},
],
streamingFunction: {
handler: 'src/lambdas/http/handler.handleRequest',
url: {
cors: true,
},
timeout: 20
},
moderator: {
handler: 'src/lambdas/moderator/handler.handleRequest',
url: {
cors: true,
},
timeout: 120
}
}

export default functions;
20 changes: 18 additions & 2 deletions serverless.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import role from "./resources/role"
import environment from "./resources/environment"

const serverlessConfiguration: AWS = {
service: 'mex-websockets',
service: 'mex-streaming',
frameworkVersion: '3',
plugins: ['serverless-esbuild', 'serverless-offline'],
provider: {
name: 'aws',
runtime: 'nodejs14.x',
runtime: 'nodejs18.x',
stage: 'local',
region: 'us-east-1',
iam: {
Expand All @@ -20,7 +20,23 @@ const serverlessConfiguration: AWS = {
functions,
custom: {
myStage: '${opt:stage, self:provider.stage}',
esbuild: {
packager: 'yarn',
minify: true,
bundle: true,
sourcemap: true,
}
},
resources: {
Resources: {
StreamingFunctionLambdaFunctionUrl: { // find "Lambda::Url" resource : https://github.com/serverless/serverless/issues/11906#issuecomment-1565686984
Type: 'AWS::Lambda::Url',
Properties: {
InvokeMode: "RESPONSE_STREAM"
}
}
}
}
};

module.exports = serverlessConfiguration;
20 changes: 20 additions & 0 deletions src/global.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { APIGatewayProxyEventV2, Context, Handler } from 'aws-lambda';
import { Writable } from 'stream';

declare global {
  // Ambient declarations for the `awslambda` global injected by the AWS
  // Lambda Node.js runtime for response-streaming functions. Type-only:
  // this file emits no runtime code.
  namespace awslambda {
    export namespace HttpResponseStream {
      // Attaches HTTP status/headers metadata to a writable response
      // stream and returns the stream the body should be written to.
      // NOTE(review): metadata is committed on first write — confirm that
      // re-wrapping after a write has no effect.
      function from(writable: Writable, metadata: any): Writable;
    }

    // Shape of a streaming handler: receives the HTTP event, a writable
    // for the response body, and the Lambda context.
    export type StreamifyHandler = (
      event: APIGatewayProxyEventV2,
      responseStream: Writable,
      context: Context
    ) => Promise<any>;

    // Marks a handler as response-streaming for the Lambda runtime.
    export function streamifyResponse(
      handler: StreamifyHandler
    ): Handler<any, any>;
  }
}
107 changes: 107 additions & 0 deletions src/lambdas/http/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { ChatOpenAI } from 'langchain/chat_models/openai';
import { Writable } from 'stream';
import { isAuthorized } from '../../utils/authorizeUtils';
import { invokeAgent } from '../../utils/flowiseUtils';
import { getDetails, updateTokenUsage } from '../../utils/lambdaUtils';
import { getMessagesInLangchainFormat } from '../../utils/messageFormatterUtils';
import { extractToken } from '../../utils/tokenUtils';

const HEADER_WORKSPACE_ID = 'mex-workspace-id';

// Stream a ChatOpenAI completion token-by-token into `responseStream`,
// then terminate the stream with a trailing newline.
// `apiKey` is the OpenAI key to use; `messages` is the chat history in the
// project's message format (converted to LangChain format before the call).
async function initiateChat(apiKey, messages, responseStream: Writable) {
  // Forward each generated token to the client as soon as it arrives.
  const tokenForwarder = {
    handleLLMNewToken(token) {
      responseStream.write(token);
    },
  };

  const model = new ChatOpenAI({
    openAIApiKey: apiKey,
    streaming: true,
    callbacks: [tokenForwarder],
  });

  await model.call(getMessagesInLangchainFormat(messages));
  responseStream.write('\n');
  responseStream.end();
}

// Streaming HTTP entry point. Authorizes the caller, then either dispatches
// to the DevFlows agent or streams a ChatOpenAI completion back over the
// Lambda response stream.
export const handleRequest = awslambda.streamifyResponse(async function (
  event,
  responseStream,
  context
) {
  // Shared CORS/content headers for both success and failure responses.
  const baseHeaders = {
    'Content-Type': 'application/json',
    //"Access-Control-Allow-Origin": "http://localhost:3333", // Required for CORS support to work
    'Access-Control-Allow-Credentials': true, // Required for cookies, authorization headers with HTTPS
    'Access-Control-Allow-Methods': 'PUT, GET, HEAD, POST, DELETE, OPTIONS',
  };

  try {
    const workspaceId = event.headers[HEADER_WORKSPACE_ID];
    const bearerToken = extractToken(event.headers.authorization);

    // Authorize BEFORE attaching response metadata: previously the stream
    // was wrapped with a 200 status first, so the later 401 wrap could not
    // take effect once the metadata had been committed.
    try {
      await isAuthorized(bearerToken, workspaceId);
    } catch (error) {
      responseStream = awslambda.HttpResponseStream.from(responseStream, {
        status: 401,
        headers: baseHeaders,
      });
      responseStream.write(JSON.stringify({ message: 'Unauthorized' }));
      responseStream.end();
      return;
    }

    responseStream = awslambda.HttpResponseStream.from(responseStream, {
      status: 200,
      headers: baseHeaders,
    });

    const requestType = JSON.parse(event.body).type;
    if (requestType === 'agent') {
      // DevFlows integration
      // TODO : check for existence of question field in the body.
      // TODO : add support for multiple intents.
      // Await so the handler does not resolve (and the runtime close the
      // stream) before the agent has finished writing its response.
      await invokeAgent(event.body, responseStream);
    } else {
      /* call lambda to get apiKey to use & messages in desired format */
      const { apiKey, isSystemToken, messages } = await getDetails(
        event.body,
        workspaceId,
        bearerToken
      );

      await initiateChat(apiKey, messages, responseStream);

      if (isSystemToken) {
        await updateTokenUsage(workspaceId, bearerToken);
      }
    }
  } catch (error) {
    console.error(error);

    // Terminate the stream so the client is not left hanging; the error
    // detail is intentionally not echoed back to the caller.
    responseStream.end();
  }
});
47 changes: 47 additions & 0 deletions src/lambdas/moderator/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { isAuthorized } from '../../utils/authorizeUtils';
import {
handleCallAnalysis,
handleOutreachEmail,
} from '../../utils/generateUtils';
import { buildResponse } from '../../utils/responseUtils';
import { extractToken } from '../../utils/tokenUtils';

const HEADER_WORKSPACE_ID = 'mex-workspace-id';

// HTTP entry point for generation requests (outreach emails, call
// analysis). Authorizes the caller, dispatches on the request `type`,
// and wraps the result in the standard API response envelope.
export const handleRequest = async event => {
  try {
    const workspaceId = event.headers[HEADER_WORKSPACE_ID];
    const bearerToken = extractToken(event.headers.authorization);

    await isAuthorized(bearerToken, workspaceId);

    // Only `type` is read here; handlers receive the raw body and parse
    // the rest themselves.
    const { type } = JSON.parse(event.body);

    let response;
    if (type === 'outreach_email') {
      response = await handleOutreachEmail(
        event.body,
        workspaceId,
        bearerToken
      );
    } else if (type === 'call_analysis') {
      response = await handleCallAnalysis(
        event.body,
        workspaceId,
        bearerToken
      );
    } else {
      throw new Error('Invalid request type');
    }

    return buildResponse(200, response);
  } catch (error) {
    return error.message === 'Unauthorized'
      ? buildResponse(401, { message: 'Unauthorized' })
      : buildResponse(500, { message: error.message });
  }
};
15 changes: 0 additions & 15 deletions src/lambdas/websockets/handler.ts

This file was deleted.

23 changes: 0 additions & 23 deletions src/lambdas/websockets/handlerFactory.ts

This file was deleted.

Loading