4 changes: 3 additions & 1 deletion requirements.txt
@@ -5,5 +5,7 @@ crewai==0.105.0
 tc-temporal-backend==1.1.4
 transformers[torch]==4.49.0
 nest-asyncio==1.6.0
-openai==1.66.3
+openai==1.93.0
 tc-hivemind-backend==1.4.3
+langchain==0.3.26
+langchain-openai==0.3.27
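
A quick way to confirm the new pins took effect: a minimal sketch, assuming the packages were installed from this requirements.txt.

# Sketch: print the resolved versions after `pip install -r requirements.txt`.
from importlib.metadata import version

for package in ("openai", "langchain", "langchain-openai"):
    print(f"{package}=={version(package)}")  # expect 1.93.0, 0.3.26, 0.3.27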
10 changes: 5 additions & 5 deletions tasks/agent.py
@@ -134,21 +134,21 @@ async def run_hivemind_agent_activity(
         }
     )

+    error_fallback_answer = "Looks like things didn't go through. Please give it another go."
     if isinstance(final_answer, str) and "encountered an error" in final_answer.lower():
         logging.error(f"final_answer: {final_answer}")
-        fallback_answer = "Looks like things didn't go through. Please give it another go."
-

         # Update step: Error handling
         mongo_persistence.update_workflow_step(
             workflow_id=workflow_id,
             step_name="error_handling",
             step_data={
                 "errorType": "crewai_error",
                 "originalAnswer": final_answer,
-                "fallbackAnswer": fallback_answer,
+                "fallbackAnswer": error_fallback_answer,
             }
         )
-        final_answer = fallback_answer
+        final_answer = error_fallback_answer

     if memory and final_answer != "NONE":
         chat = f"User: {payload.query}\nAgent: {final_answer}"
@@ -178,7 +178,7 @@ async def run_hivemind_agent_activity(
             status="completed_no_answer"
         )

-    if final_answer == "NONE":
+    if final_answer == "NONE" or final_answer == error_fallback_answer:
         return None
     else:
         return final_answer
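
Defining error_fallback_answer once, before the error branch, lets the final check treat a CrewAI failure the same way as an explicit "NONE". A condensed sketch of that control flow (the helper name is hypothetical; the persistence and memory steps are elided):

from typing import Optional

ERROR_FALLBACK_ANSWER = "Looks like things didn't go through. Please give it another go."

def finalize_answer(final_answer: str) -> Optional[str]:
    # The fallback sentinel is treated exactly like "NONE": the activity
    # returns None, so no canned apology is sent back to the user.
    if final_answer == "NONE" or final_answer == ERROR_FALLBACK_ANSWER:
        return None
    return final_answer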
110 changes: 69 additions & 41 deletions tasks/hivemind/agent.py
@@ -4,13 +4,15 @@
 from crewai.flow.flow import Flow, listen, start, router
 from crewai.llm import LLM
 from tasks.hivemind.classify_question import ClassifyQuestion
-from tasks.hivemind.query_data_sources import RAGPipelineTool
-from crewai.process import Process
+from tasks.hivemind.query_data_sources import make_rag_tool
 from pydantic import BaseModel
 from crewai.tools import tool
 from openai import OpenAI
 from typing import Optional
 from tasks.mongo_persistence import MongoPersistence
+from langchain_openai import ChatOpenAI
+from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
+from langchain.agents import AgentExecutor, create_openai_functions_agent


 class AgenticFlowState(BaseModel):
@@ -154,50 +156,76 @@ def detect_question_type(self) -> str:

     @router("rag")
     def do_rag_query(self) -> str:
-        query_data_source_tool = RAGPipelineTool.setup_tools(
-            community_id=self.community_id,
-            enable_answer_skipping=self.enable_answer_skipping,
-            workflow_id=self.workflow_id,
-        )
+        # query_data_source_tool = RAGPipelineTool.setup_tools(
+        #     community_id=self.community_id,
+        #     enable_answer_skipping=self.enable_answer_skipping,
+        #     workflow_id=self.workflow_id,
+        # )
+
+        # q_a_bot_agent = Agent(
+        #     role="Q&A Bot",
+        #     goal=(
+        #         "You decide when to rely on your internal knowledge and when to retrieve real-time data. "
+        #         "For queries that are not specific to community data, answer using your own LLM knowledge. "
+        #         "Your final response must not exceed 250 words."
+        #     ),
+        #     backstory=(
+        #         "You are an intelligent agent capable of giving concise answers to questions."
+        #     ),
+        #     allow_delegation=True,
+        #     llm=LLM(model="gpt-4o-mini-2024-07-18"),
+        # )
+        # rag_task = Task(
+        #     description=(
+        #         "Answer the following query using a maximum of 250 words. "
+        #         "If the query is specific to community data, use the tool to retrieve updated information; "
+        #         f"otherwise, answer using your internal knowledge.\n\nQuery: {self.state.user_query}"
+        #     ),
+        #     expected_output="A clear, well-structured answer under 250 words that directly addresses the query using appropriate information sources",
+        #     agent=q_a_bot_agent,
+        #     tools=[
+        #         query_data_source_tool(result_as_answer=True),
+        #     ],
+        # )
+
+        # crew = Crew(
+        #     agents=[q_a_bot_agent],
+        #     tasks=[rag_task],
+        #     process=Process.hierarchical,
+        #     manager_llm=LLM(model="gpt-4o-mini-2024-07-18"),
+        #     verbose=True,
+        # )
+
+        # crew_output = crew.kickoff()

-        q_a_bot_agent = Agent(
-            role="Q&A Bot",
-            goal=(
-                "You decide when to rely on your internal knowledge and when to retrieve real-time data. "
-                "For queries that are not specific to community data, answer using your own LLM knowledge. "
-                "Your final response must not exceed 250 words."
-            ),
-            backstory=(
-                "You are an intelligent agent capable of giving concise answers to questions."
-            ),
-            allow_delegation=True,
-            llm=LLM(model="gpt-4o-mini-2024-07-18"),
-        )
-        rag_task = Task(
-            description=(
-                "Answer the following query using a maximum of 250 words. "
-                "If the query is specific to community data, use the tool to retrieve updated information; "
-                f"otherwise, answer using your internal knowledge.\n\nQuery: {self.state.user_query}"
-            ),
-            expected_output="A clear, well-structured answer under 250 words that directly addresses the query using appropriate information sources",
-            agent=q_a_bot_agent,
-            tools=[
-                query_data_source_tool(result_as_answer=True),
-            ],
-        )
+        # Store the latest crew output and increment retry count
+        # self.state.last_answer = crew_output

+        llm = ChatOpenAI(model="gpt-4o-mini-2024-07-18")
+        rag_tool = make_rag_tool(self.enable_answer_skipping, self.community_id, self.workflow_id)
+        tools = [rag_tool]

-        crew = Crew(
-            agents=[q_a_bot_agent],
-            tasks=[rag_task],
-            process=Process.hierarchical,
-            manager_llm=LLM(model="gpt-4o-mini-2024-07-18"),
-            verbose=True,
-        )
+        SYSTEM_INSTRUCTIONS = """\
+You are a helpful assistant.
+"""

+        prompt = ChatPromptTemplate.from_messages(
+            [
+                ("system", SYSTEM_INSTRUCTIONS),
+                MessagesPlaceholder("chat_history", optional=True),
+                ("human", "{input}"),
+                MessagesPlaceholder("agent_scratchpad"),
+            ]
+        )
+        agent = create_openai_functions_agent(llm, tools, prompt)

-        crew_output = crew.kickoff()
+        # Run the agent
+        agent_executor = AgentExecutor(
+            agent=agent, tools=tools, verbose=True, return_intermediate_steps=False
+        )

-        # Store the latest crew output and increment retry count
-        self.state.last_answer = crew_output
+        result = agent_executor.invoke({"input": self.state.user_query})
+        self.state.last_answer = result["output"]
         self.state.retry_count += 1

         return "stop"
72 changes: 26 additions & 46 deletions tasks/hivemind/query_data_sources.py
@@ -1,17 +1,15 @@
 import asyncio
 import os
 from uuid import uuid1

 import nest_asyncio
 from dotenv import load_dotenv
-from typing import Type, Optional
+from typing import Optional, Callable
+from langchain.tools import tool
 from tc_temporal_backend.client import TemporalClient
 from tc_temporal_backend.schema.hivemind import HivemindQueryPayload
-from pydantic import BaseModel, Field

 nest_asyncio.apply()

-from crewai.tools import BaseTool


 class QueryDataSources:
Expand Down Expand Up @@ -64,55 +62,34 @@ def load_hivemind_queue(self) -> str:
return hivemind_queue


class RAGPipelineToolSchema(BaseModel):
"""Input schema for RAGPipelineTool."""
def make_rag_tool(enable_answer_skipping: bool, community_id: str, workflow_id: Optional[str] = None) -> Callable:
"""
Make the RAG pipeline tool.
Passing the arguments to the tool instead of relying on the LLM to pass them (making the work for LLM easier)

query: str = Field(
...,
description=(
"The input query string provided by the user. The name is case sensitive."
"Please provide a value of type string. This parameter is required."
),
)
Args:
enable_answer_skipping (bool): The flag to enable answer skipping.
community_id (str): The community ID.
workflow_id (Optional[str]): The workflow ID.


class RAGPipelineTool(BaseTool):
name: str = "RAG pipeline tool"
description: str = (
"This tool implements a Retrieval-Augmented Generation (RAG) pipeline which "
"queries available data sources to provide accurate answers to user queries. "
)
args_schema: Type[BaseModel] = RAGPipelineToolSchema

@classmethod
def setup_tools(cls, community_id: str, enable_answer_skipping: bool, workflow_id: Optional[str] = None):
"""
Setup the tool with the necessary community identifier, the flag to enable answer skipping,
and the workflow ID for tracking.
Returns:
Callable: The RAG pipeline tool.
"""
@tool(return_direct=True)
def get_rag_answer(query: str) -> str:
"""
cls.community_id = community_id
cls.enable_answer_skipping = enable_answer_skipping
cls.workflow_id = workflow_id
return cls
Get the answer from the RAG pipeline

def _run(self, query: str) -> str:
"""
Execute the RAG pipeline by querying the available data sources.

Parameters
------------
query : str
The input query string provided by the user.
Args:
query (str): The input query string provided by the user.

Returns
----------
response : str
The response obtained after querying the data sources.
Returns:
str: The answer to the query.
"""
query_data_sources = QueryDataSources(
community_id=self.community_id,
enable_answer_skipping=self.enable_answer_skipping,
workflow_id=self.workflow_id,
community_id=community_id,
enable_answer_skipping=enable_answer_skipping,
workflow_id=workflow_id,
)
response = asyncio.run(query_data_sources.query(query))

@@ -121,3 +98,6 @@ def _run(self, query: str) -> str:
             return "NONE"
         else:
            return response
+
+    # returning the tool function
+    return get_rag_answer
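
Because make_rag_tool closes over community_id, enable_answer_skipping, and workflow_id, the LLM-facing schema exposes only query, unlike the old setup_tools classmethod, which mutated attributes on the class itself. A usage sketch (the IDs below are placeholders; with langchain's @tool decorator the returned object is a BaseTool):

rag_tool = make_rag_tool(
    enable_answer_skipping=False,
    community_id="example-community-id",  # placeholder, not a real ID
    workflow_id=None,
)
print(rag_tool.name)  # "get_rag_answer"
print(rag_tool.args)  # only `query` is exposed to the LLM
answer = rag_tool.invoke({"query": "What happened in the community this week?"})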