From 63617f102e065e0e2040391cf5e016d4d455d88d Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Mon, 30 Jun 2025 18:53:12 +0330
Subject: [PATCH 1/2] fix: use langchain tool calling instead of a CrewAI crew

---
 requirements.txt                     |   4 +-
 tasks/agent.py                       |  10 +--
 tasks/hivemind/agent.py              | 110 +++++++++++++++++----------
 tasks/hivemind/query_data_sources.py |  72 +++++++-----------
 4 files changed, 103 insertions(+), 93 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 9a56e7b..c820d88 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,5 +5,7 @@ crewai==0.105.0
 tc-temporal-backend==1.1.4
 transformers[torch]==4.49.0
 nest-asyncio==1.6.0
-openai==1.66.3
+openai==1.93.0
 tc-hivemind-backend==1.4.3
+langchain==0.3.26
+langchain-openai==0.3.27

diff --git a/tasks/agent.py b/tasks/agent.py
index 4257008..61b87f0 100644
--- a/tasks/agent.py
+++ b/tasks/agent.py
@@ -134,10 +134,10 @@ async def run_hivemind_agent_activity(
             }
         )
 
+    error_fallback_answer = "Looks like things didn't go through. Please give it another go."
     if isinstance(final_answer, str) and "encountered an error" in final_answer.lower():
         logging.error(f"final_answer: {final_answer}")
-        fallback_answer = "Looks like things didn't go through. Please give it another go."
-
+
         # Update step: Error handling
         mongo_persistence.update_workflow_step(
             workflow_id=workflow_id,
@@ -145,10 +145,10 @@ async def run_hivemind_agent_activity(
             step_data={
                 "errorType": "crewai_error",
                 "originalAnswer": final_answer,
-                "fallbackAnswer": fallback_answer,
+                "fallbackAnswer": error_fallback_answer,
             }
         )
-        final_answer = fallback_answer
+        final_answer = error_fallback_answer
 
     if memory and final_answer != "NONE":
         chat = f"User: {payload.query}\nAgent: {final_answer}"
@@ -178,7 +178,7 @@ async def run_hivemind_agent_activity(
             status="completed_no_answer"
         )
 
-    if final_answer == "NONE":
+    if final_answer == "NONE" or final_answer == error_fallback_answer:
         return None
     else:
         return final_answer

diff --git a/tasks/hivemind/agent.py b/tasks/hivemind/agent.py
index 8cbbe79..38a6ab3 100644
--- a/tasks/hivemind/agent.py
+++ b/tasks/hivemind/agent.py
@@ -4,13 +4,15 @@
 from crewai.flow.flow import Flow, listen, start, router
 from crewai.llm import LLM
 from tasks.hivemind.classify_question import ClassifyQuestion
-from tasks.hivemind.query_data_sources import RAGPipelineTool
-from crewai.process import Process
+from tasks.hivemind.query_data_sources import make_rag_tool
 from pydantic import BaseModel
 from crewai.tools import tool
 from openai import OpenAI
 from typing import Optional
 from tasks.mongo_persistence import MongoPersistence
+from langchain_openai import ChatOpenAI
+from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
+from langchain.agents import AgentExecutor, create_openai_functions_agent
 
 
 class AgenticFlowState(BaseModel):
@@ -154,50 +156,76 @@ def detect_question_type(self) -> str:
 
     @router("rag")
     def do_rag_query(self) -> str:
-        query_data_source_tool = RAGPipelineTool.setup_tools(
-            community_id=self.community_id,
-            enable_answer_skipping=self.enable_answer_skipping,
-            workflow_id=self.workflow_id,
-        )
+        # query_data_source_tool = RAGPipelineTool.setup_tools(
+        #     community_id=self.community_id,
+        #     enable_answer_skipping=self.enable_answer_skipping,
+        #     workflow_id=self.workflow_id,
+        # )
+
+        # q_a_bot_agent = Agent(
+        #     role="Q&A Bot",
+        #     goal=(
+        #         "You decide when to rely on your internal knowledge and when to retrieve real-time data. "
" + # "For queries that are not specific to community data, answer using your own LLM knowledge. " + # "Your final response must not exceed 250 words." + # ), + # backstory=( + # "You are an intelligent agent capable of giving concise answers to questions." + # ), + # allow_delegation=True, + # llm=LLM(model="gpt-4o-mini-2024-07-18"), + # ) + # rag_task = Task( + # description=( + # "Answer the following query using a maximum of 250 words. " + # "If the query is specific to community data, use the tool to retrieve updated information; " + # f"otherwise, answer using your internal knowledge.\n\nQuery: {self.state.user_query}" + # ), + # expected_output="A clear, well-structured answer under 250 words that directly addresses the query using appropriate information sources", + # agent=q_a_bot_agent, + # tools=[ + # query_data_source_tool(result_as_answer=True), + # ], + # ) + + # crew = Crew( + # agents=[q_a_bot_agent], + # tasks=[rag_task], + # process=Process.hierarchical, + # manager_llm=LLM(model="gpt-4o-mini-2024-07-18"), + # verbose=True, + # ) + + # crew_output = crew.kickoff() - q_a_bot_agent = Agent( - role="Q&A Bot", - goal=( - "You decide when to rely on your internal knowledge and when to retrieve real-time data. " - "For queries that are not specific to community data, answer using your own LLM knowledge. " - "Your final response must not exceed 250 words." - ), - backstory=( - "You are an intelligent agent capable of giving concise answers to questions." - ), - allow_delegation=True, - llm=LLM(model="gpt-4o-mini-2024-07-18"), - ) - rag_task = Task( - description=( - "Answer the following query using a maximum of 250 words. " - "If the query is specific to community data, use the tool to retrieve updated information; " - f"otherwise, answer using your internal knowledge.\n\nQuery: {self.state.user_query}" - ), - expected_output="A clear, well-structured answer under 250 words that directly addresses the query using appropriate information sources", - agent=q_a_bot_agent, - tools=[ - query_data_source_tool(result_as_answer=True), - ], - ) + # Store the latest crew output and increment retry count + # self.state.last_answer = crew_output + + llm = ChatOpenAI(model="gpt-4o-mini-2024-07-18") + rag_tool = make_rag_tool(self.enable_answer_skipping, self.community_id, self.workflow_id) + tools = [rag_tool] - crew = Crew( - agents=[q_a_bot_agent], - tasks=[rag_task], - process=Process.hierarchical, - manager_llm=LLM(model="gpt-4o-mini-2024-07-18"), - verbose=True, + SYSTEM_INSTRUCTIONS = f"""\ + You are a helpful assistant. 
+ """ + + prompt = ChatPromptTemplate.from_messages( + [ + ("system", SYSTEM_INSTRUCTIONS), + MessagesPlaceholder("chat_history", optional=True), + ("human", "{input}"), + MessagesPlaceholder("agent_scratchpad"), + ] ) + agent = create_openai_functions_agent(llm, tools, prompt) - crew_output = crew.kickoff() + # Run the agent + agent_executor = AgentExecutor( + agent=agent, tools=tools, verbose=True, return_intermediate_steps=False + ) - # Store the latest crew output and increment retry count - self.state.last_answer = crew_output + result = agent_executor.invoke({"input": self.state.user_query}) + self.state.last_answer = result["output"] self.state.retry_count += 1 return "stop" diff --git a/tasks/hivemind/query_data_sources.py b/tasks/hivemind/query_data_sources.py index bf210f7..fe46313 100644 --- a/tasks/hivemind/query_data_sources.py +++ b/tasks/hivemind/query_data_sources.py @@ -1,17 +1,15 @@ import asyncio import os -from uuid import uuid1 import nest_asyncio from dotenv import load_dotenv -from typing import Type, Optional +from typing import Optional, Callable from tc_temporal_backend.client import TemporalClient from tc_temporal_backend.schema.hivemind import HivemindQueryPayload -from pydantic import BaseModel, Field nest_asyncio.apply() -from crewai.tools import BaseTool +from langchain.tools import tool class QueryDataSources: @@ -64,55 +62,34 @@ def load_hivemind_queue(self) -> str: return hivemind_queue -class RAGPipelineToolSchema(BaseModel): - """Input schema for RAGPipelineTool.""" +def make_rag_tool(enable_answer_skipping: bool, community_id: str, workflow_id: Optional[str] = None) -> Callable: + """ + Make the RAG pipeline tool. + Passing the arguments to the tool instead of relying on the LLM to pass them (making the work for LLM easier) - query: str = Field( - ..., - description=( - "The input query string provided by the user. The name is case sensitive." - "Please provide a value of type string. This parameter is required." - ), - ) + Args: + enable_answer_skipping (bool): The flag to enable answer skipping. + community_id (str): The community ID. + workflow_id (Optional[str]): The workflow ID. - -class RAGPipelineTool(BaseTool): - name: str = "RAG pipeline tool" - description: str = ( - "This tool implements a Retrieval-Augmented Generation (RAG) pipeline which " - "queries available data sources to provide accurate answers to user queries. " - ) - args_schema: Type[BaseModel] = RAGPipelineToolSchema - - @classmethod - def setup_tools(cls, community_id: str, enable_answer_skipping: bool, workflow_id: Optional[str] = None): - """ - Setup the tool with the necessary community identifier, the flag to enable answer skipping, - and the workflow ID for tracking. + Returns: + Callable: The RAG pipeline tool. + """ + @tool(return_direct=True) + def get_rag_answer(query: str) -> str: """ - cls.community_id = community_id - cls.enable_answer_skipping = enable_answer_skipping - cls.workflow_id = workflow_id - return cls + Get the answer from the RAG pipeline - def _run(self, query: str) -> str: - """ - Execute the RAG pipeline by querying the available data sources. - - Parameters - ------------ - query : str - The input query string provided by the user. + Args: + query (str): The input query string provided by the user. - Returns - ---------- - response : str - The response obtained after querying the data sources. + Returns: + str: The answer to the query. 
""" query_data_sources = QueryDataSources( - community_id=self.community_id, - enable_answer_skipping=self.enable_answer_skipping, - workflow_id=self.workflow_id, + community_id=community_id, + enable_answer_skipping=enable_answer_skipping, + workflow_id=workflow_id, ) response = asyncio.run(query_data_sources.query(query)) @@ -121,3 +98,6 @@ def _run(self, query: str) -> str: return "NONE" else: return response + + # returing the tool function + return get_rag_answer \ No newline at end of file From 1fa6d64888ec63c2f35cc637b2c1301341ab03db Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Mon, 30 Jun 2025 18:59:12 +0330 Subject: [PATCH 2/2] refactor: clean up string formatting in agent.py and reorder import in query_data_sources.py --- tasks/hivemind/agent.py | 2 +- tasks/hivemind/query_data_sources.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tasks/hivemind/agent.py b/tasks/hivemind/agent.py index 38a6ab3..0735218 100644 --- a/tasks/hivemind/agent.py +++ b/tasks/hivemind/agent.py @@ -205,7 +205,7 @@ def do_rag_query(self) -> str: rag_tool = make_rag_tool(self.enable_answer_skipping, self.community_id, self.workflow_id) tools = [rag_tool] - SYSTEM_INSTRUCTIONS = f"""\ + SYSTEM_INSTRUCTIONS = """\ You are a helpful assistant. """ diff --git a/tasks/hivemind/query_data_sources.py b/tasks/hivemind/query_data_sources.py index fe46313..5014395 100644 --- a/tasks/hivemind/query_data_sources.py +++ b/tasks/hivemind/query_data_sources.py @@ -4,12 +4,12 @@ import nest_asyncio from dotenv import load_dotenv from typing import Optional, Callable +from langchain.tools import tool from tc_temporal_backend.client import TemporalClient from tc_temporal_backend.schema.hivemind import HivemindQueryPayload nest_asyncio.apply() -from langchain.tools import tool class QueryDataSources: