Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions resources/functions/get_usage_button.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""
title: 计费信息按钮
author: fl0w1nd
author_url: https://github.com/fl0w1nd
version: 0.1.1
icon_url: 
required_open_webui_version: 0.4.0
"""

from pydantic import BaseModel, Field
from typing import Optional, Union, Generator, Iterator

import os
import requests
import asyncio
import json


class Action:
class Valves(BaseModel):
show_cost: bool = Field(
default=True,
description="是否显示费用",
json_schema_extra={"ui:group": "显示设置"},
)
show_balance: bool = Field(
default=True,
description="是否显示余额",
json_schema_extra={"ui:group": "显示设置"},
)
show_tokens: bool = Field(
default=True,
description="是否显示token数",
json_schema_extra={"ui:group": "显示设置"},
)
show_tokens_per_sec: bool = Field(
default=True,
description="是否显示每秒输出token数",
json_schema_extra={"ui:group": "显示设置"},
)

def __init__(self):
self.valves = self.Valves()
pass

async def action(
self,
body: dict,
__user__=None,
__event_emitter__=None,
__event_call__=None,
) -> Optional[dict]:
print(f"action:{__name__}")

# 查找最新一条assistant消息的索引和内容
messages = body.get("messages", [])
assistant_indexes = [
i for i, msg in enumerate(messages) if msg.get("role") == "assistant"
]

if not assistant_indexes:
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "没有找到assistant消息", "done": True},
}
)
return None

# 获取最后一条assistant消息的索引和内容
last_assistant_index = assistant_indexes[-1]
last_assistant_message = messages[last_assistant_index]

# 获取消息 ID
message_id = last_assistant_message.get("id")

if not message_id:
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "无法获取消息ID", "done": True},
}
)
return None

# 构建文件路径
file_path = os.path.join("/app/backend/data/record", f"{message_id}.json")

# 检查文件是否存在
if not os.path.exists(file_path):
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"未查找到该消息的计费记录,请联系管理员",
"done": True,
},
}
)
return None

# 读取统计信息
try:
with open(file_path, "r") as f:
stats_data = json.load(f)
except Exception as e:
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"读取统计文件失败: {str(e)}",
"done": True,
},
}
)
return None

# 构建状态栏显示的统计信息
stats_array = []

if self.valves.show_cost and "total_cost" in stats_data:
stats_array.append(f"Cost: ${stats_data['total_cost']:.6f}")
if self.valves.show_balance and "new_balance" in stats_data:
stats_array.append(f"Balance: ${stats_data['new_balance']:.6f}")
if (
self.valves.show_tokens
and "input_tokens" in stats_data
and "output_tokens" in stats_data
):
stats_array.append(
f"Token: {stats_data['input_tokens']}+{stats_data['output_tokens']}"
)

# 计算耗时(如果有elapsed_time)
if "elapsed_time" in stats_data:
elapsed_time = stats_data["elapsed_time"]
stats_array.append(f"Time: {elapsed_time:.2f}s")

# 计算每秒输出速度
if (
self.valves.show_tokens_per_sec
and "output_tokens" in stats_data
and elapsed_time > 0
):
stats_array.append(
f"{(stats_data['output_tokens']/elapsed_time):.2f} T/s"
)

stats = " | ".join(stat for stat in stats_array)

# 发送状态更新
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": stats,
"done": True,
},
}
)

return None
221 changes: 221 additions & 0 deletions resources/functions/openwebui_monitor_invisible.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
from typing import Optional, Callable, Any, Awaitable
from pydantic import Field, BaseModel
import requests
import time
from open_webui.utils.misc import get_last_assistant_message
import json
import os


class Filter:
class Valves(BaseModel):
API_ENDPOINT: str = Field(
default="", description="The base URL for the API endpoint."
)
API_KEY: str = Field(default="", description="API key for authentication.")
priority: int = Field(
default=5, description="Priority level for the filter operations."
)

def __init__(self):
self.type = "filter"
self.name = "OpenWebUI Monitor"
self.valves = self.Valves()
self.outage = False
self.start_time = None

def _prepare_user_dict(self, __user__: dict) -> dict:
"""将 __user__ 对象转换为可序列化的字典"""
user_dict = dict(__user__) # 创建副本以避免修改原始对象

# 如果存在 valves 且是 BaseModel 的实例,将其转换为字典
if "valves" in user_dict and hasattr(user_dict["valves"], "model_dump"):
user_dict["valves"] = user_dict["valves"].model_dump()

return user_dict

def inlet(
self, body: dict, user: Optional[dict] = None, __user__: dict = {}
) -> dict:
self.start_time = time.time()

try:
post_url = f"{self.valves.API_ENDPOINT}/api/v1/inlet"
headers = {"Authorization": f"Bearer {self.valves.API_KEY}"}

# 使用 _prepare_user_dict 处理 __user__ 对象
user_dict = self._prepare_user_dict(__user__)

response = requests.post(
post_url, headers=headers, json={"user": user_dict, "body": body}
)

if response.status_code == 401:
return body

response.raise_for_status()
response_data = response.json()

if not response_data.get("success"):
error_msg = response_data.get("error", "未知错误")
error_type = response_data.get("error_type", "UNKNOWN_ERROR")
raise Exception(f"请求失败: [{error_type}] {error_msg}")

self.outage = response_data.get("balance", 0) <= 0
if self.outage:
raise Exception(f"余额不足: 当前余额 `{response_data['balance']:.4f}`")

return body

except requests.exceptions.RequestException as e:
if (
isinstance(e, requests.exceptions.HTTPError)
and e.response.status_code == 401
):
return body
raise Exception(f"网络请求失败: {str(e)}")
except Exception as e:
raise Exception(f"处理请求时发生错误: {str(e)}")

async def outlet(
self,
body: dict,
user: Optional[dict] = None,
__user__: dict = {},
__event_emitter__: Callable[[Any], Awaitable[None]] = None,
) -> dict:
if self.outage:
return body

try:
post_url = f"{self.valves.API_ENDPOINT}/api/v1/outlet"
headers = {"Authorization": f"Bearer {self.valves.API_KEY}"}

# 使用 _prepare_user_dict 处理 __user__ 对象
user_dict = self._prepare_user_dict(__user__)

request_data = {
"user": user_dict,
"body": body,
}

response = requests.post(post_url, headers=headers, json=request_data)

if response.status_code == 401:
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "API密钥验证失败",
"done": True,
},
}
)
return body

response.raise_for_status()
result = response.json()

if not result.get("success"):
error_msg = result.get("error", "未知错误")
error_type = result.get("error_type", "UNKNOWN_ERROR")
raise Exception(f"请求失败: [{error_type}] {error_msg}")

# 获取统计数据
input_tokens = result["inputTokens"]
output_tokens = result["outputTokens"]
total_cost = result["totalCost"]
new_balance = result["newBalance"]

print(f"user_dict: {json.dumps(user_dict, indent=4)}")
print(f"inlet body: {json.dumps(body, indent=4)}")

# 从 body 中获取消息 ID
messages = body.get("messages", [])
message_id = messages[-1].get("id") if messages else None

if message_id: # 需要 message_id
# 构建统计信息字典
stats_data = {
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_cost": total_cost,
"new_balance": new_balance,
}

# 计算耗时(如果有start_time)
if self.start_time:
elapsed_time = time.time() - self.start_time
stats_data["elapsed_time"] = elapsed_time

# 计算每秒输出速度,使用三元运算符避免除以零
stats_data["tokens_per_sec"] = (
output_tokens / elapsed_time if elapsed_time > 0 else 0
)

# 指定目标目录路径
directory_path = "/app/backend/data/record"

# 确保目录存在
os.makedirs(directory_path, exist_ok=True)

# 构建文件路径
file_path = os.path.join(directory_path, f"{message_id}.json")

# 将统计信息写入 JSON 文件
with open(file_path, "w") as f:
json.dump(stats_data, f, indent=4)
else:
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"无法获取消息ID",
"done": True,
},
}
)

return body

except requests.exceptions.RequestException as e:
if (
isinstance(e, requests.exceptions.HTTPError)
and e.response.status_code == 401
):
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "API密钥验证失败",
"done": True,
},
}
)
return body
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"网络请求失败: {str(e)}",
"done": True,
},
}
)
raise Exception(f"网络请求失败: {str(e)}")
except Exception as e:
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"错误: {str(e)}",
"done": True,
},
}
)
raise Exception(f"处理请求时发生错误: {str(e)}")
Loading
Loading