diff --git a/perplexity-ask/dist/index.js b/perplexity-ask/dist/index.js new file mode 100755 index 0000000..2a946fb --- /dev/null +++ b/perplexity-ask/dist/index.js @@ -0,0 +1,307 @@ +#!/usr/bin/env node +var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { + function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } + return new (P || (P = Promise))(function (resolve, reject) { + function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } + function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } + function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } + step((generator = generator.apply(thisArg, _arguments || [])).next()); + }); +}; +import { Server } from "@modelcontextprotocol/sdk/server/index.js"; +import { CallToolRequestSchema, ListToolsRequestSchema, } from "@modelcontextprotocol/sdk/types.js"; +import { createSSEServer } from "./sse-server.js"; +/** + * Definition of the Perplexity Ask Tool. + * This tool accepts an array of messages and returns a chat completion response + * from the Perplexity API, with citations appended to the message if provided. + */ +const PERPLEXITY_ASK_TOOL = { + name: "perplexity_ask", + description: "Engages in a conversation using the Sonar API. " + + "Accepts an array of messages (each with a role and content) " + + "and returns a ask completion response from the Perplexity model.", + inputSchema: { + type: "object", + properties: { + messages: { + type: "array", + items: { + type: "object", + properties: { + role: { + type: "string", + description: "Role of the message (e.g., system, user, assistant)", + }, + content: { + type: "string", + description: "The content of the message", + }, + }, + required: ["role", "content"], + }, + description: "Array of conversation messages", + }, + }, + required: ["messages"], + }, +}; +/** + * Definition of the Perplexity Research Tool. + * This tool performs deep research queries using the Perplexity API. + */ +const PERPLEXITY_RESEARCH_TOOL = { + name: "perplexity_research", + description: "Performs deep research using the Perplexity API. " + + "Accepts an array of messages (each with a role and content) " + + "and returns a comprehensive research response with citations.", + inputSchema: { + type: "object", + properties: { + messages: { + type: "array", + items: { + type: "object", + properties: { + role: { + type: "string", + description: "Role of the message (e.g., system, user, assistant)", + }, + content: { + type: "string", + description: "The content of the message", + }, + }, + required: ["role", "content"], + }, + description: "Array of conversation messages", + }, + }, + required: ["messages"], + }, +}; +/** + * Definition of the Perplexity Reason Tool. + * This tool performs reasoning queries using the Perplexity API. + */ +const PERPLEXITY_REASON_TOOL = { + name: "perplexity_reason", + description: "Performs reasoning tasks using the Perplexity API. " + + "Accepts an array of messages (each with a role and content) " + + "and returns a well-reasoned response using the sonar-reasoning-pro model.", + inputSchema: { + type: "object", + properties: { + messages: { + type: "array", + items: { + type: "object", + properties: { + role: { + type: "string", + description: "Role of the message (e.g., system, user, assistant)", + }, + content: { + type: "string", + description: "The content of the message", + }, + }, + required: ["role", "content"], + }, + description: "Array of conversation messages", + }, + }, + required: ["messages"], + }, +}; +// Retrieve the Perplexity API key from environment variables +const PERPLEXITY_API_KEY = process.env.PERPLEXITY_API_KEY; +if (!PERPLEXITY_API_KEY) { + console.error("Error: PERPLEXITY_API_KEY environment variable is required"); + process.exit(1); +} +/** + * Performs a chat completion by sending a request to the Perplexity API. + * Appends citations to the returned message content if they exist. + * + * @param {Array<{ role: string; content: string }>} messages - An array of message objects. + * @param {string} model - The model to use for the completion. + * @returns {Promise} The chat completion result with appended citations. + * @throws Will throw an error if the API request fails. + */ +function performChatCompletion(messages_1) { + return __awaiter(this, arguments, void 0, function* (messages, model = "sonar-pro") { + // Construct the API endpoint URL and request body + const url = new URL("https://api.perplexity.ai/chat/completions"); + const body = { + model: model, // Model identifier passed as parameter + messages: messages, + // Additional parameters can be added here if required (e.g., max_tokens, temperature, etc.) + // See the Sonar API documentation for more details: + // https://docs.perplexity.ai/api-reference/chat-completions + }; + let response; + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), 90000); // 90秒超时 + try { + response = yield fetch(url.toString(), { + method: "POST", + headers: { + "Content-Type": "application/json", + "Authorization": `Bearer ${PERPLEXITY_API_KEY}`, + }, + body: JSON.stringify(body), + signal: controller.signal, + }); + clearTimeout(timeoutId); + } + catch (error) { + clearTimeout(timeoutId); + if (error instanceof Error && error.name === 'AbortError') { + throw new Error('Perplexity API request timed out after 90 seconds'); + } + throw new Error(`Network error while calling Perplexity API: ${error}`); + } + // Check for non-successful HTTP status + if (!response.ok) { + let errorText; + try { + errorText = yield response.text(); + } + catch (parseError) { + errorText = "Unable to parse error response"; + } + throw new Error(`Perplexity API error: ${response.status} ${response.statusText}\n${errorText}`); + } + // Attempt to parse the JSON response from the API + let data; + try { + data = yield response.json(); + } + catch (jsonError) { + throw new Error(`Failed to parse JSON response from Perplexity API: ${jsonError}`); + } + // Directly retrieve the main message content from the response + let messageContent = data.choices[0].message.content; + // If citations are provided, append them to the message content + if (data.citations && Array.isArray(data.citations) && data.citations.length > 0) { + messageContent += "\n\nCitations:\n"; + data.citations.forEach((citation, index) => { + messageContent += `[${index + 1}] ${citation}\n`; + }); + } + return messageContent; + }); +} +// Initialize the server with tool metadata and capabilities +const server = new Server({ + name: "example-servers/perplexity-ask", + version: "0.1.0", +}, { + capabilities: { + tools: {}, + }, +}); +/** + * Registers a handler for listing available tools. + * When the client requests a list of tools, this handler returns all available Perplexity tools. + */ +server.setRequestHandler(ListToolsRequestSchema, () => __awaiter(void 0, void 0, void 0, function* () { + return ({ + tools: [PERPLEXITY_ASK_TOOL, PERPLEXITY_RESEARCH_TOOL, PERPLEXITY_REASON_TOOL], + }); +})); +/** + * Registers a handler for calling a specific tool. + * Processes requests by validating input and invoking the appropriate tool. + * + * @param {object} request - The incoming tool call request. + * @returns {Promise} The response containing the tool's result or an error. + */ +server.setRequestHandler(CallToolRequestSchema, (request) => __awaiter(void 0, void 0, void 0, function* () { + try { + const { name, arguments: args } = request.params; + if (!args) { + throw new Error("No arguments provided"); + } + switch (name) { + case "perplexity_ask": { + if (!Array.isArray(args.messages)) { + throw new Error("Invalid arguments for perplexity_ask: 'messages' must be an array"); + } + // Invoke the chat completion function with the provided messages + const messages = args.messages; + const result = yield performChatCompletion(messages, "sonar-pro"); + return { + content: [{ type: "text", text: result }], + isError: false, + }; + } + case "perplexity_research": { + if (!Array.isArray(args.messages)) { + throw new Error("Invalid arguments for perplexity_research: 'messages' must be an array"); + } + // Invoke the chat completion function with the provided messages using the deep research model + const messages = args.messages; + const result = yield performChatCompletion(messages, "sonar-deep-research"); + return { + content: [{ type: "text", text: result }], + isError: false, + }; + } + case "perplexity_reason": { + if (!Array.isArray(args.messages)) { + throw new Error("Invalid arguments for perplexity_reason: 'messages' must be an array"); + } + // Invoke the chat completion function with the provided messages using the reasoning model + const messages = args.messages; + const result = yield performChatCompletion(messages, "sonar-reasoning-pro"); + return { + content: [{ type: "text", text: result }], + isError: false, + }; + } + default: + // Respond with an error if an unknown tool is requested + return { + content: [{ type: "text", text: `Unknown tool: ${name}` }], + isError: true, + }; + } + } + catch (error) { + // Return error details in the response + return { + content: [ + { + type: "text", + text: `Error: ${error instanceof Error ? error.message : String(error)}`, + }, + ], + isError: true, + }; + } +})); +/** + * Initializes and runs the server using standard I/O for communication. + * Logs an error and exits if the server fails to start. + */ +function runServer() { + return __awaiter(this, void 0, void 0, function* () { + try { + const PORT = process.env.PORT || 3001; + const sseServer = createSSEServer(server); + sseServer.listen(PORT); + console.error(`Perplexity MCP Server running on SSE and listening on port ${PORT} with Ask, Research, and Reason tools`); + } + catch (error) { + console.error("Fatal error running server:", error); + process.exit(1); + } + }); +} +// Start the server and catch any startup errors +runServer().catch((error) => { + console.error("Fatal error running server:", error); + process.exit(1); +}); diff --git a/perplexity-ask/dist/sse-server.js b/perplexity-ask/dist/sse-server.js new file mode 100755 index 0000000..8f099f3 --- /dev/null +++ b/perplexity-ask/dist/sse-server.js @@ -0,0 +1,105 @@ +var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { + function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } + return new (P || (P = Promise))(function (resolve, reject) { + function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } + function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } + function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } + step((generator = generator.apply(thisArg, _arguments || [])).next()); + }); +}; +import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; +import express from "express"; +export function createSSEServer(mcpServer) { + const app = express(); + // Add CORS middleware + app.use((req, res, next) => { + res.header('Access-Control-Allow-Origin', '*'); + res.header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS'); + res.header('Access-Control-Allow-Headers', 'Origin, X-Requested-With, Content-Type, Accept, Authorization, Cache-Control'); + if (req.method === 'OPTIONS') { + res.sendStatus(200); + } + else { + next(); + } + }); + // Add middleware to parse JSON bodies + app.use(express.json()); + const transportMap = new Map(); + app.get("/sse", (req, res) => __awaiter(this, void 0, void 0, function* () { + const transport = new SSEServerTransport("/messages", res); + console.log(`[SSE] New SSE connection established with sessionId: ${transport.sessionId}`); + transportMap.set(transport.sessionId, transport); + console.log(`[SSE] Transport added to map. Total transports: ${transportMap.size}`); + // 完整的连接清理函数 + const cleanup = () => { + console.log(`[SSE] Connection cleanup for sessionId: ${transport.sessionId}`); + transportMap.delete(transport.sessionId); + console.log(`[SSE] Transport removed from map. Total transports: ${transportMap.size}`); + if (connectionTimeout) { + clearTimeout(connectionTimeout); + } + if (heartbeatInterval) { + clearInterval(heartbeatInterval); + } + }; + // 监听所有可能的断连事件 + res.on('close', cleanup); + res.on('error', (error) => { + console.error(`[SSE] Connection error for sessionId: ${transport.sessionId}`, error); + cleanup(); + }); + res.on('finish', cleanup); + // 添加心跳机制 (每30秒发送一次心跳) + const heartbeatInterval = setInterval(() => { + try { + res.write(': heartbeat\n\n'); + } + catch (error) { + console.error(`[SSE] Heartbeat failed for sessionId: ${transport.sessionId}`, error); + cleanup(); + } + }, 30000); + // 添加连接超时保护 (10分钟) + const connectionTimeout = setTimeout(() => { + console.log(`[SSE] Connection timeout for sessionId: ${transport.sessionId}`); + res.end(); + }, 600000); + try { + yield mcpServer.connect(transport); + } + catch (error) { + console.error(`[SSE] Error connecting to MCP server for sessionId: ${transport.sessionId}`, error); + cleanup(); + } + })); + app.post("/messages", (req, res) => __awaiter(this, void 0, void 0, function* () { + const sessionId = req.query.sessionId; + console.log(`[SSE] POST /messages received with sessionId: ${sessionId}`); + console.log(`[SSE] Available sessionIds: ${Array.from(transportMap.keys()).join(', ')}`); + if (!sessionId) { + console.error('[SSE] Message received without sessionId'); + res.status(400).json({ error: 'sessionId is required' }); + return; + } + const transport = transportMap.get(sessionId); + if (transport) { + console.log(`[SSE] Transport found for sessionId: ${sessionId}, handling message`); + try { + yield transport.handlePostMessage(req, res); + console.log(`[SSE] Message handled successfully for sessionId: ${sessionId}`); + } + catch (error) { + console.error(`[SSE] Error handling message for sessionId: ${sessionId}`, error); + if (!res.headersSent) { + res.status(500).json({ error: 'Internal server error' }); + } + } + } + else { + console.error(`[SSE] No transport found for sessionId: ${sessionId}`); + res.status(404).json({ error: 'Session not found' }); + } + })); + return app; +} diff --git a/perplexity-ask/index.ts b/perplexity-ask/index.ts index 3ec13c9..9200dcd 100644 --- a/perplexity-ask/index.ts +++ b/perplexity-ask/index.ts @@ -148,6 +148,9 @@ async function performChatCompletion( }; let response; + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), 90000); // 90秒超时 + try { response = await fetch(url.toString(), { method: "POST", @@ -156,8 +159,14 @@ async function performChatCompletion( "Authorization": `Bearer ${PERPLEXITY_API_KEY}`, }, body: JSON.stringify(body), + signal: controller.signal, }); + clearTimeout(timeoutId); } catch (error) { + clearTimeout(timeoutId); + if (error instanceof Error && error.name === 'AbortError') { + throw new Error('Perplexity API request timed out after 90 seconds'); + } throw new Error(`Network error while calling Perplexity API: ${error}`); } diff --git a/perplexity-ask/sse-server.ts b/perplexity-ask/sse-server.ts index a8ab8bc..4771342 100644 --- a/perplexity-ask/sse-server.ts +++ b/perplexity-ask/sse-server.ts @@ -4,19 +4,83 @@ import express from "express"; export function createSSEServer(mcpServer: Server) { const app = express(); + + // Add CORS middleware + app.use((req, res, next) => { + res.header('Access-Control-Allow-Origin', '*'); + res.header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS'); + res.header('Access-Control-Allow-Headers', 'Origin, X-Requested-With, Content-Type, Accept, Authorization, Cache-Control'); + + if (req.method === 'OPTIONS') { + res.sendStatus(200); + } else { + next(); + } + }); + + // Add middleware to parse JSON bodies + app.use(express.json()); const transportMap = new Map(); app.get("/sse", async (req, res) => { const transport = new SSEServerTransport("/messages", res); + console.log(`[SSE] New SSE connection established with sessionId: ${transport.sessionId}`); transportMap.set(transport.sessionId, transport); - await mcpServer.connect(transport); + console.log(`[SSE] Transport added to map. Total transports: ${transportMap.size}`); + + // 完整的连接清理函数 + const cleanup = () => { + console.log(`[SSE] Connection cleanup for sessionId: ${transport.sessionId}`); + transportMap.delete(transport.sessionId); + console.log(`[SSE] Transport removed from map. Total transports: ${transportMap.size}`); + if (connectionTimeout) { + clearTimeout(connectionTimeout); + } + if (heartbeatInterval) { + clearInterval(heartbeatInterval); + } + }; + + // 监听所有可能的断连事件 + res.on('close', cleanup); + res.on('error', (error) => { + console.error(`[SSE] Connection error for sessionId: ${transport.sessionId}`, error); + cleanup(); + }); + res.on('finish', cleanup); + + // 添加心跳机制 (每30秒发送一次心跳) + const heartbeatInterval = setInterval(() => { + try { + res.write(': heartbeat\n\n'); + } catch (error) { + console.error(`[SSE] Heartbeat failed for sessionId: ${transport.sessionId}`, error); + cleanup(); + } + }, 30000); + + // 添加连接超时保护 (10分钟) + const connectionTimeout = setTimeout(() => { + console.log(`[SSE] Connection timeout for sessionId: ${transport.sessionId}`); + res.end(); + }, 600000); + + try { + await mcpServer.connect(transport); + } catch (error) { + console.error(`[SSE] Error connecting to MCP server for sessionId: ${transport.sessionId}`, error); + cleanup(); + } }); - app.post("/messages", (req, res) => { + app.post("/messages", async (req, res) => { const sessionId = req.query.sessionId as string; + console.log(`[SSE] POST /messages received with sessionId: ${sessionId}`); + console.log(`[SSE] Available sessionIds: ${Array.from(transportMap.keys()).join(', ')}`); + if (!sessionId) { - console.error('Message received without sessionId'); + console.error('[SSE] Message received without sessionId'); res.status(400).json({ error: 'sessionId is required' }); return; } @@ -24,7 +88,19 @@ export function createSSEServer(mcpServer: Server) { const transport = transportMap.get(sessionId); if (transport) { - transport.handlePostMessage(req, res); + console.log(`[SSE] Transport found for sessionId: ${sessionId}, handling message`); + try { + await transport.handlePostMessage(req, res); + console.log(`[SSE] Message handled successfully for sessionId: ${sessionId}`); + } catch (error) { + console.error(`[SSE] Error handling message for sessionId: ${sessionId}`, error); + if (!res.headersSent) { + res.status(500).json({ error: 'Internal server error' }); + } + } + } else { + console.error(`[SSE] No transport found for sessionId: ${sessionId}`); + res.status(404).json({ error: 'Session not found' }); } });