|
1 | | -import tiktoken |
2 | | -import uuid |
3 | | -import requests |
4 | | -import base64 |
5 | | -import time |
6 | | -import openai |
7 | | -import requests |
8 | | -import pyotp |
9 | | -from datetime import datetime |
10 | | -from pydub import AudioSegment |
11 | | -from pydantic import BaseModel |
12 | 1 | from typing import ( |
13 | 2 | Dict, |
14 | 3 | List, |
15 | 4 | Any, |
16 | 5 | Optional, |
17 | 6 | Callable, |
18 | 7 | Type, |
| 8 | + Union, |
19 | 9 | get_args, |
20 | 10 | get_origin, |
21 | | - Union, |
| 11 | + get_type_hints, |
22 | 12 | ) |
| 13 | +from pydantic import BaseModel |
| 14 | +from pydub import AudioSegment |
| 15 | +from datetime import datetime |
23 | 16 | from enum import Enum |
| 17 | +import tiktoken |
| 18 | +import requests |
24 | 19 | import inspect |
25 | | -from typing import get_type_hints |
| 20 | +import base64 |
| 21 | +import openai |
| 22 | +import pyotp |
| 23 | +import uuid |
| 24 | +import time |
26 | 25 | import json |
| 26 | +import os |
27 | 27 |
|
28 | 28 |
|
29 | 29 | class ChatCompletions(BaseModel): |
@@ -1715,16 +1715,13 @@ def _generate_detailed_schema(self, model: Type[BaseModel], depth: int = 0) -> s |
1715 | 1715 | indent = " " * depth |
1716 | 1716 | for field, field_type in fields.items(): |
1717 | 1717 | description = f"{indent}{field}: " |
1718 | | - print(f"Processing field: {field}, type: {field_type}") # Debug print |
1719 | 1718 | origin_type = get_origin(field_type) |
1720 | 1719 | if origin_type is None: |
1721 | 1720 | origin_type = field_type |
1722 | | - print(f"Origin type: {origin_type}") # Debug print |
1723 | 1721 | if inspect.isclass(origin_type) and issubclass(origin_type, BaseModel): |
1724 | 1722 | description += f"Nested Model:\n{self._generate_detailed_schema(origin_type, depth + 1)}" |
1725 | 1723 | elif origin_type == list: |
1726 | 1724 | list_type = get_args(field_type)[0] |
1727 | | - print(f"List type: {list_type}") # Debug print |
1728 | 1725 | if inspect.isclass(list_type) and issubclass(list_type, BaseModel): |
1729 | 1726 | description += f"List of Nested Model:\n{self._generate_detailed_schema(list_type, depth + 1)}" |
1730 | 1727 | elif get_origin(list_type) == Union: |
@@ -1861,6 +1858,45 @@ def convert_list_of_dicts( |
1861 | 1858 | mapped_list.append(new_data) |
1862 | 1859 | return mapped_list |
1863 | 1860 |
|
| 1861 | + def create_extension( |
| 1862 | + self, |
| 1863 | + agent_name: str, |
| 1864 | + extension_name: str, |
| 1865 | + openapi_json_url: str, |
| 1866 | + ): |
| 1867 | + """ |
| 1868 | + Create an AGiXT extension for an OpenAPI specification from a JSON URL. |
| 1869 | +
|
| 1870 | + Parameters: |
| 1871 | + - extension_name (str): The name of the extension to create. |
| 1872 | + - openapi_json_url (str): The URL of the OpenAPI specification in JSON format. |
| 1873 | + """ |
| 1874 | + print( |
| 1875 | + f"Creating AGiXT extension for {extension_name}, this will take some time!" |
| 1876 | + ) |
| 1877 | + chain_name = self.execute_command( |
| 1878 | + agent_name=agent_name, |
| 1879 | + command_name="Generate Extension from OpenAPI", |
| 1880 | + command_args={ |
| 1881 | + "openapi_json_url": openapi_json_url, |
| 1882 | + "extension_name": extension_name, |
| 1883 | + }, |
| 1884 | + conversation_name=f"{extension_name} Extension Generation", |
| 1885 | + ) |
| 1886 | + extension_download = self.run_chain( |
| 1887 | + chain_name=chain_name, |
| 1888 | + agent_name=agent_name, |
| 1889 | + user_input=f"Create an AGiXT extension for {extension_name}.", |
| 1890 | + ) |
| 1891 | + file_name = extension_download.split("/")[-1] |
| 1892 | + extension_file = requests.get(extension_download) |
| 1893 | + extension_dir = os.path.join(os.getcwd(), "extensions") |
| 1894 | + extension_file_path = os.path.join(extension_dir, file_name) |
| 1895 | + os.makedirs(extension_dir, exist_ok=True) |
| 1896 | + with open(extension_file_path, "wb") as f: |
| 1897 | + f.write(extension_file.content) |
| 1898 | + return f"{extension_name} extension created and downloaded to {extension_file_path}" |
| 1899 | + |
1864 | 1900 | def get_dpo_response( |
1865 | 1901 | self, |
1866 | 1902 | agent_name: str, |
@@ -2041,3 +2077,30 @@ def get_unique_external_sources( |
2041 | 2077 | return response.json()["external_sources"] |
2042 | 2078 | except Exception as e: |
2043 | 2079 | return self.handle_error(e) |
| 2080 | + |
| 2081 | + |
| 2082 | +def snake_case(old_str: str = ""): |
| 2083 | + if not old_str: |
| 2084 | + return "" |
| 2085 | + if " " in old_str: |
| 2086 | + old_str = old_str.replace(" ", "") |
| 2087 | + if "@" in old_str: |
| 2088 | + old_str = old_str.replace("@", "_") |
| 2089 | + if "." in old_str: |
| 2090 | + old_str = old_str.replace(".", "_") |
| 2091 | + if "-" in old_str: |
| 2092 | + old_str = old_str.replace("-", "_") |
| 2093 | + if "&" in old_str: |
| 2094 | + old_str = old_str.replace("&", "and") |
| 2095 | + if ":" in old_str: |
| 2096 | + old_str = old_str.replace(":", "_") |
| 2097 | + snake_str = "" |
| 2098 | + for i, char in enumerate(old_str): |
| 2099 | + if char.isupper(): |
| 2100 | + if i != 0 and old_str[i - 1].islower(): |
| 2101 | + snake_str += "_" |
| 2102 | + if i != len(old_str) - 1 and old_str[i + 1].islower(): |
| 2103 | + snake_str += "_" |
| 2104 | + snake_str += char.lower() |
| 2105 | + snake_str = snake_str.strip("_") |
| 2106 | + return snake_str |
0 commit comments