Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
from routers import attach
from routers import inventory
from api import api_call
from tariff import Tariff
from config import API_KEY as APIKEY

app = FastAPI(title='SmartLinkAPI')

app.state.tariffs = {
tariff['billing_uuid']: unescape(tariff['name'])
tariff['billing_uuid']: Tariff(unescape(tariff['name']))
for tariff in api_call('tariff', 'get')['data'].values()
}
app.state.customer_groups = {
Expand All @@ -47,8 +48,7 @@
'host': olt['host'],
'online': bool(olt['is_online']),
'location': unescape(olt['location'])
} for olt in api_call('device', 'get_data', 'object_type=olt&is_hide_ifaces_data=1')['data']
.values()
} for olt in api_call('device', 'get_data', 'object_type=olt&is_hide_ifaces_data=1')['data'].values()
]
app.state.divisions = [
{
Expand Down
6 changes: 2 additions & 4 deletions routers/attach.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ def api_get_attachs(id: int, include_task: bool = False):
if include_task:
tasks = api_call('task', 'get_list', f'customer_id={id}')['list'].split(',')
for task in tasks:
task_attachs = normalize_items(api_call('attach', 'get',
f'object_id={task}&object_type=task'))
task_attachs = normalize_items(api_call('attach', 'get', f'object_id={task}&object_type=task'))
if isinstance(task_attachs, dict):
for attach in task_attachs: attach['source'] = 'task'
attachs.extend(task_attachs)
Expand All @@ -23,8 +22,7 @@ def api_get_attachs(id: int, include_task: bool = False):
# 'url': api_call('attach', 'get_file_temporary_link', f'uuid={attach["id"]}'),
'url': get_attach_url(attach['id']),
'name': attach['internal_filepath'],
'extension': attach['internal_filepath'].split('.')[1].lower()
if '.' in attach['internal_filepath'] else None,
'extension': attach['internal_filepath'].split('.')[1].lower() if '.' in attach['internal_filepath'] else None,
'created_at': attach['date_add'],
'source': attach.get('source', 'customer'),
'source_id': attach.get('object_id'),
Expand Down
50 changes: 29 additions & 21 deletions routers/box.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
from json import loads
from json.decoder import JSONDecodeError

from fastapi import APIRouter
from fastapi.responses import JSONResponse
from fastapi.requests import Request

from api import api_call
from utils import extract_sn, normalize_items, remove_sn, status_to_str, list_to_str, str_to_list, get_coordinates, get_box_map_link
from routers.customer import _process_customer
from utils import normalize_items, list_to_str, str_to_list, get_coordinates, get_box_map_link

router = APIRouter(prefix='/box')

@router.get('/{id}')
def api_get_box(
request: Request,
id: int,
get_onu_level: bool = False,
get_olt_data: bool = False,
get_tasks: bool = False,
limit: int | None = None,
exclude_customer_ids: list[int] = []
exclude_customer_ids: str = '[]'
):
def _get_onu_level(name) -> float | None:
if extract_sn(name) is None:
return
data = api_call('device', 'get_ont_data', f'id={extract_sn(name)}').get('data')
if not isinstance(data, dict):
return
return data.get('level_onu_rx')

def _get_tasks(entity: str, entity_id: int) -> list[int]:
res = api_call('task', 'get_list', f'{entity}_id={entity_id}&state_id=18,3,17,11,1,16,19')
return list(map(int, str_to_list(res.get('list', ''))))
Expand All @@ -30,23 +28,33 @@ def _build_customer(customer: dict) -> dict | None:
name = customer.get('full_name')
if name is None:
return None
return {
'id': customer['id'],
'name': remove_sn(name),
'last_activity': customer.get('date_activity'),
'status': status_to_str(customer['state_id']),
'sn': extract_sn(name),
'onu_level': _get_onu_level(name) if get_onu_level else None,
'tasks': _get_tasks('customer', customer['id']) if get_tasks else None
}
return _process_customer(request.app.state.tariffs, request.app.state.customer_groups, customer, get_olt_data)
# {
# 'id': customer['id'],
# 'name': remove_sn(name),
# 'last_activity': customer.get('date_activity'),
# 'status': status_to_str(customer['state_id']),
# 'sn': extract_sn(name),
# 'onu_level': _get_onu_level(name) if get_onu_level else None,
# 'tasks': _get_tasks('customer', customer['id']) if get_tasks else None
# }

exclude_ids = []
if exclude_customer_ids:
try:
exclude_ids: list[int] = loads(exclude_customer_ids)
if not (isinstance(exclude_ids, list) and all(isinstance(customer, int) for customer in exclude_ids)):
return JSONResponse({'status': 'fail', 'detail': 'incorrect type of exclude_customer_ids param'}, 422)
except JSONDecodeError:
return JSONResponse({'status': 'fail', 'detail': 'unable to parse exclude_customer_ids param'}, 422)

house_data = api_call('address', 'get_house', f'building_id={id}').get('data')
if not house_data:
return JSONResponse({'status': 'fail', 'detail': 'box not found'}, 404)

house = list(house_data.values())[0]
customer_ids: list = api_call('customer', 'get_customers_id', f'house_id={id}').get('data', [])
for customer in exclude_customer_ids:
for customer in exclude_ids:
if customer in customer_ids:
customer_ids.remove(customer)
customers_count = len(customer_ids)
Expand Down
Loading