diff --git a/resources/functions/get_usage_button.py b/resources/functions/get_usage_button.py new file mode 100644 index 0000000..e29450c --- /dev/null +++ b/resources/functions/get_usage_button.py @@ -0,0 +1,167 @@ +""" +title: 计费信息按钮 +author: fl0w1nd +author_url: https://github.com/fl0w1nd +version: 0.1.1 +icon_url:  +required_open_webui_version: 0.4.0 +""" + +from pydantic import BaseModel, Field +from typing import Optional, Union, Generator, Iterator + +import os +import requests +import asyncio +import json + + +class Action: + class Valves(BaseModel): + show_cost: bool = Field( + default=True, + description="是否显示费用", + json_schema_extra={"ui:group": "显示设置"}, + ) + show_balance: bool = Field( + default=True, + description="是否显示余额", + json_schema_extra={"ui:group": "显示设置"}, + ) + show_tokens: bool = Field( + default=True, + description="是否显示token数", + json_schema_extra={"ui:group": "显示设置"}, + ) + show_tokens_per_sec: bool = Field( + default=True, + description="是否显示每秒输出token数", + json_schema_extra={"ui:group": "显示设置"}, + ) + + def __init__(self): + self.valves = self.Valves() + pass + + async def action( + self, + body: dict, + __user__=None, + __event_emitter__=None, + __event_call__=None, + ) -> Optional[dict]: + print(f"action:{__name__}") + + # 查找最新一条assistant消息的索引和内容 + messages = body.get("messages", []) + assistant_indexes = [ + i for i, msg in enumerate(messages) if msg.get("role") == "assistant" + ] + + if not assistant_indexes: + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "没有找到assistant消息", "done": True}, + } + ) + return None + + # 获取最后一条assistant消息的索引和内容 + last_assistant_index = assistant_indexes[-1] + last_assistant_message = messages[last_assistant_index] + + # 获取消息 ID + message_id = last_assistant_message.get("id") + + if not message_id: + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "无法获取消息ID", "done": True}, + } + ) + return None + + # 构建文件路径 + file_path = os.path.join("/app/backend/data/record", f"{message_id}.json") + + # 检查文件是否存在 + if not os.path.exists(file_path): + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"未查找到该消息的计费记录,请联系管理员", + "done": True, + }, + } + ) + return None + + # 读取统计信息 + try: + with open(file_path, "r") as f: + stats_data = json.load(f) + except Exception as e: + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"读取统计文件失败: {str(e)}", + "done": True, + }, + } + ) + return None + + # 构建状态栏显示的统计信息 + stats_array = [] + + if self.valves.show_cost and "total_cost" in stats_data: + stats_array.append(f"Cost: ${stats_data['total_cost']:.6f}") + if self.valves.show_balance and "new_balance" in stats_data: + stats_array.append(f"Balance: ${stats_data['new_balance']:.6f}") + if ( + self.valves.show_tokens + and "input_tokens" in stats_data + and "output_tokens" in stats_data + ): + stats_array.append( + f"Token: {stats_data['input_tokens']}+{stats_data['output_tokens']}" + ) + + # 计算耗时(如果有elapsed_time) + if "elapsed_time" in stats_data: + elapsed_time = stats_data["elapsed_time"] + stats_array.append(f"Time: {elapsed_time:.2f}s") + + # 计算每秒输出速度 + if ( + self.valves.show_tokens_per_sec + and "output_tokens" in stats_data + and elapsed_time > 0 + ): + stats_array.append( + f"{(stats_data['output_tokens']/elapsed_time):.2f} T/s" + ) + + stats = " | ".join(stat for stat in stats_array) + + # 发送状态更新 + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": stats, + "done": True, + }, + } + ) + + return None diff --git a/resources/functions/openwebui_monitor_invisible.py b/resources/functions/openwebui_monitor_invisible.py new file mode 100644 index 0000000..2177246 --- /dev/null +++ b/resources/functions/openwebui_monitor_invisible.py @@ -0,0 +1,221 @@ +from typing import Optional, Callable, Any, Awaitable +from pydantic import Field, BaseModel +import requests +import time +from open_webui.utils.misc import get_last_assistant_message +import json +import os + + +class Filter: + class Valves(BaseModel): + API_ENDPOINT: str = Field( + default="", description="The base URL for the API endpoint." + ) + API_KEY: str = Field(default="", description="API key for authentication.") + priority: int = Field( + default=5, description="Priority level for the filter operations." + ) + + def __init__(self): + self.type = "filter" + self.name = "OpenWebUI Monitor" + self.valves = self.Valves() + self.outage = False + self.start_time = None + + def _prepare_user_dict(self, __user__: dict) -> dict: + """将 __user__ 对象转换为可序列化的字典""" + user_dict = dict(__user__) # 创建副本以避免修改原始对象 + + # 如果存在 valves 且是 BaseModel 的实例,将其转换为字典 + if "valves" in user_dict and hasattr(user_dict["valves"], "model_dump"): + user_dict["valves"] = user_dict["valves"].model_dump() + + return user_dict + + def inlet( + self, body: dict, user: Optional[dict] = None, __user__: dict = {} + ) -> dict: + self.start_time = time.time() + + try: + post_url = f"{self.valves.API_ENDPOINT}/api/v1/inlet" + headers = {"Authorization": f"Bearer {self.valves.API_KEY}"} + + # 使用 _prepare_user_dict 处理 __user__ 对象 + user_dict = self._prepare_user_dict(__user__) + + response = requests.post( + post_url, headers=headers, json={"user": user_dict, "body": body} + ) + + if response.status_code == 401: + return body + + response.raise_for_status() + response_data = response.json() + + if not response_data.get("success"): + error_msg = response_data.get("error", "未知错误") + error_type = response_data.get("error_type", "UNKNOWN_ERROR") + raise Exception(f"请求失败: [{error_type}] {error_msg}") + + self.outage = response_data.get("balance", 0) <= 0 + if self.outage: + raise Exception(f"余额不足: 当前余额 `{response_data['balance']:.4f}`") + + return body + + except requests.exceptions.RequestException as e: + if ( + isinstance(e, requests.exceptions.HTTPError) + and e.response.status_code == 401 + ): + return body + raise Exception(f"网络请求失败: {str(e)}") + except Exception as e: + raise Exception(f"处理请求时发生错误: {str(e)}") + + async def outlet( + self, + body: dict, + user: Optional[dict] = None, + __user__: dict = {}, + __event_emitter__: Callable[[Any], Awaitable[None]] = None, + ) -> dict: + if self.outage: + return body + + try: + post_url = f"{self.valves.API_ENDPOINT}/api/v1/outlet" + headers = {"Authorization": f"Bearer {self.valves.API_KEY}"} + + # 使用 _prepare_user_dict 处理 __user__ 对象 + user_dict = self._prepare_user_dict(__user__) + + request_data = { + "user": user_dict, + "body": body, + } + + response = requests.post(post_url, headers=headers, json=request_data) + + if response.status_code == 401: + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "API密钥验证失败", + "done": True, + }, + } + ) + return body + + response.raise_for_status() + result = response.json() + + if not result.get("success"): + error_msg = result.get("error", "未知错误") + error_type = result.get("error_type", "UNKNOWN_ERROR") + raise Exception(f"请求失败: [{error_type}] {error_msg}") + + # 获取统计数据 + input_tokens = result["inputTokens"] + output_tokens = result["outputTokens"] + total_cost = result["totalCost"] + new_balance = result["newBalance"] + + print(f"user_dict: {json.dumps(user_dict, indent=4)}") + print(f"inlet body: {json.dumps(body, indent=4)}") + + # 从 body 中获取消息 ID + messages = body.get("messages", []) + message_id = messages[-1].get("id") if messages else None + + if message_id: # 需要 message_id + # 构建统计信息字典 + stats_data = { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "total_cost": total_cost, + "new_balance": new_balance, + } + + # 计算耗时(如果有start_time) + if self.start_time: + elapsed_time = time.time() - self.start_time + stats_data["elapsed_time"] = elapsed_time + + # 计算每秒输出速度,使用三元运算符避免除以零 + stats_data["tokens_per_sec"] = ( + output_tokens / elapsed_time if elapsed_time > 0 else 0 + ) + + # 指定目标目录路径 + directory_path = "/app/backend/data/record" + + # 确保目录存在 + os.makedirs(directory_path, exist_ok=True) + + # 构建文件路径 + file_path = os.path.join(directory_path, f"{message_id}.json") + + # 将统计信息写入 JSON 文件 + with open(file_path, "w") as f: + json.dump(stats_data, f, indent=4) + else: + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"无法获取消息ID", + "done": True, + }, + } + ) + + return body + + except requests.exceptions.RequestException as e: + if ( + isinstance(e, requests.exceptions.HTTPError) + and e.response.status_code == 401 + ): + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "API密钥验证失败", + "done": True, + }, + } + ) + return body + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"网络请求失败: {str(e)}", + "done": True, + }, + } + ) + raise Exception(f"网络请求失败: {str(e)}") + except Exception as e: + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"错误: {str(e)}", + "done": True, + }, + } + ) + raise Exception(f"处理请求时发生错误: {str(e)}") diff --git a/resources/tutorials/zh-cn/deployment_guide_zh.md b/resources/tutorials/zh-cn/deployment_guide_zh.md index 544e80e..1fa20bc 100644 --- a/resources/tutorials/zh-cn/deployment_guide_zh.md +++ b/resources/tutorials/zh-cn/deployment_guide_zh.md @@ -51,7 +51,9 @@ sudo docker compose up -d 至此部署已完成!请自行发布网站到公网。如果想修改端口,请修改 `docker-compose.yml` 文件中的 `ports` 中 `:` 前面的端口数字。 -## 二、安装 OpenWebUI 函数插件 +## 二、安装 OpenWebUI 函数插件(二选一) + +### 显式显示计费信息函数 1. 打开 OpenWebUI 管理员面板的 `函数` 页面,点击 `+` 创建新函数,将 [这个函数](https://github.com/VariantConst/OpenWebUI-Monitor/blob/main/resources/functions/openwebui_monitor.py) 的代码粘贴进去并保存。 @@ -63,3 +65,21 @@ sudo docker compose up -d 3. 启用函数,并点击 `…` 打开详细配置,全局启用函数 image + +4. 该函数会默认在每个回复消息顶部直接显示计费信息 + +### 隐式(手动触发)显示计费信息函数[可选] + +若你选择隐式显示计费,则取而代之用 [这个函数](https://github.com/VariantConst/OpenWebUI-Monitor/blob/main/resources/functions/openwebui_monitor_invisible.py) 的代码粘贴进去并保存,同样需要启用函数,并点击 `…` 打开详细配置,全局启用函数。但是要额外再安装一个 Action 函数插件 + +* Action 函数 + +同理,选择添加并复制[Action函数](https://github.com/VariantConst/OpenWebUI-Monitor/blob/main/resources/functions/get_usage_button.py)的代码粘贴进去保存,启用函数,并点击 `…` 打开详细配置,全局启用函数。 +该函数会接管原先计费插件的统计信息显示选项配置 + +* 使用 + +![CleanShot 2024-12-10 at 13 41 08](https://github.com/user-attachments/assets/e999d022-339e-41d3-9bf9-a6f8d9877fe8) + + +手动点击底部的“计费信息”按钮来显示消息,但要注意的是该方式只能显示对话最新(最底部)的消息计费信息