diff --git a/main.py b/main.py index 06b3e96..cb79235 100644 --- a/main.py +++ b/main.py @@ -15,12 +15,13 @@ from routers import attach from routers import inventory from api import api_call +from tariff import Tariff from config import API_KEY as APIKEY app = FastAPI(title='SmartLinkAPI') app.state.tariffs = { - tariff['billing_uuid']: unescape(tariff['name']) + tariff['billing_uuid']: Tariff(unescape(tariff['name'])) for tariff in api_call('tariff', 'get')['data'].values() } app.state.customer_groups = { @@ -47,8 +48,7 @@ 'host': olt['host'], 'online': bool(olt['is_online']), 'location': unescape(olt['location']) - } for olt in api_call('device', 'get_data', 'object_type=olt&is_hide_ifaces_data=1')['data'] - .values() + } for olt in api_call('device', 'get_data', 'object_type=olt&is_hide_ifaces_data=1')['data'].values() ] app.state.divisions = [ { diff --git a/routers/attach.py b/routers/attach.py index b0b8961..b586d31 100644 --- a/routers/attach.py +++ b/routers/attach.py @@ -11,8 +11,7 @@ def api_get_attachs(id: int, include_task: bool = False): if include_task: tasks = api_call('task', 'get_list', f'customer_id={id}')['list'].split(',') for task in tasks: - task_attachs = normalize_items(api_call('attach', 'get', - f'object_id={task}&object_type=task')) + task_attachs = normalize_items(api_call('attach', 'get', f'object_id={task}&object_type=task')) if isinstance(task_attachs, dict): for attach in task_attachs: attach['source'] = 'task' attachs.extend(task_attachs) @@ -23,8 +22,7 @@ def api_get_attachs(id: int, include_task: bool = False): # 'url': api_call('attach', 'get_file_temporary_link', f'uuid={attach["id"]}'), 'url': get_attach_url(attach['id']), 'name': attach['internal_filepath'], - 'extension': attach['internal_filepath'].split('.')[1].lower() - if '.' in attach['internal_filepath'] else None, + 'extension': attach['internal_filepath'].split('.')[1].lower() if '.' in attach['internal_filepath'] else None, 'created_at': attach['date_add'], 'source': attach.get('source', 'customer'), 'source_id': attach.get('object_id'), diff --git a/routers/box.py b/routers/box.py index 2abe1d3..15f8ae3 100644 --- a/routers/box.py +++ b/routers/box.py @@ -1,27 +1,25 @@ +from json import loads +from json.decoder import JSONDecodeError + from fastapi import APIRouter from fastapi.responses import JSONResponse +from fastapi.requests import Request from api import api_call -from utils import extract_sn, normalize_items, remove_sn, status_to_str, list_to_str, str_to_list, get_coordinates, get_box_map_link +from routers.customer import _process_customer +from utils import normalize_items, list_to_str, str_to_list, get_coordinates, get_box_map_link router = APIRouter(prefix='/box') @router.get('/{id}') def api_get_box( + request: Request, id: int, - get_onu_level: bool = False, + get_olt_data: bool = False, get_tasks: bool = False, limit: int | None = None, - exclude_customer_ids: list[int] = [] + exclude_customer_ids: str = '[]' ): - def _get_onu_level(name) -> float | None: - if extract_sn(name) is None: - return - data = api_call('device', 'get_ont_data', f'id={extract_sn(name)}').get('data') - if not isinstance(data, dict): - return - return data.get('level_onu_rx') - def _get_tasks(entity: str, entity_id: int) -> list[int]: res = api_call('task', 'get_list', f'{entity}_id={entity_id}&state_id=18,3,17,11,1,16,19') return list(map(int, str_to_list(res.get('list', '')))) @@ -30,15 +28,25 @@ def _build_customer(customer: dict) -> dict | None: name = customer.get('full_name') if name is None: return None - return { - 'id': customer['id'], - 'name': remove_sn(name), - 'last_activity': customer.get('date_activity'), - 'status': status_to_str(customer['state_id']), - 'sn': extract_sn(name), - 'onu_level': _get_onu_level(name) if get_onu_level else None, - 'tasks': _get_tasks('customer', customer['id']) if get_tasks else None - } + return _process_customer(request.app.state.tariffs, request.app.state.customer_groups, customer, get_olt_data) + # { + # 'id': customer['id'], + # 'name': remove_sn(name), + # 'last_activity': customer.get('date_activity'), + # 'status': status_to_str(customer['state_id']), + # 'sn': extract_sn(name), + # 'onu_level': _get_onu_level(name) if get_onu_level else None, + # 'tasks': _get_tasks('customer', customer['id']) if get_tasks else None + # } + + exclude_ids = [] + if exclude_customer_ids: + try: + exclude_ids: list[int] = loads(exclude_customer_ids) + if not (isinstance(exclude_ids, list) and all(isinstance(customer, int) for customer in exclude_ids)): + return JSONResponse({'status': 'fail', 'detail': 'incorrect type of exclude_customer_ids param'}, 422) + except JSONDecodeError: + return JSONResponse({'status': 'fail', 'detail': 'unable to parse exclude_customer_ids param'}, 422) house_data = api_call('address', 'get_house', f'building_id={id}').get('data') if not house_data: @@ -46,7 +54,7 @@ def _build_customer(customer: dict) -> dict | None: house = list(house_data.values())[0] customer_ids: list = api_call('customer', 'get_customers_id', f'house_id={id}').get('data', []) - for customer in exclude_customer_ids: + for customer in exclude_ids: if customer in customer_ids: customer_ids.remove(customer) customers_count = len(customer_ids) diff --git a/routers/customer.py b/routers/customer.py index 17e2ffa..cbb6181 100644 --- a/routers/customer.py +++ b/routers/customer.py @@ -1,13 +1,16 @@ from html import unescape from ipaddress import IPv4Address +from json import loads +from json.decoder import JSONDecodeError +from datetime import datetime from fastapi import APIRouter from fastapi.requests import Request from fastapi.responses import JSONResponse from api import api_call -from utils import list_to_str, to_2gis_link, to_neo_link, normalize_items, extract_sn, remove_sn,\ - parse_agreement, status_to_str, format_mac +from utils import list_to_str, to_2gis_link, to_neo_link, normalize_items, extract_sn, remove_sn, parse_agreement, status_to_str, format_mac +from tariff import calc_disconnect, Tariff router = APIRouter(prefix='/customer') PHONE_LENGTH = 9 @@ -54,15 +57,9 @@ def api_get_customer_search(query: str): }, 404) -# TODO: divide api calls -@router.get('/{id}') -def api_get_customer(request: Request, id: int): - customer = api_call('customer', 'get_data', f'id={id}').get('data') - if customer is None: - return JSONResponse({'status': 'fail', 'detail': 'customer not found'}, 404) - +def _process_customer(request_tariffs: dict[int, Tariff], request_groups: dict, customer: dict, get_olt_data: bool = True): tariffs = [ - {'id': int(tariff['id']), 'name': request.app.state.tariffs[tariff['id']]} + {'id': int(tariff['id']), 'name': request_tariffs[tariff['id']].content, 'prices': request_tariffs[tariff['id']].to_dict()} for tariff in customer['tariff']['current'] if tariff['id'] ] @@ -81,138 +78,91 @@ def api_get_customer(request: Request, id: int): geodata['2gis_link'] = to_2gis_link(geodata['coord'][0], geodata['coord'][1]) - olt = api_call('commutation', 'get_data', - f'object_type=customer&object_id={id}&is_finish_data=1')['data'] - - if 'finish' not in olt or olt['finish'].get('object_type') != 'switch' and extract_sn(customer['full_name']) is not None: + olt_id = None + onu_level = None + if get_olt_data and extract_sn(customer['full_name']) is not None: ont = api_call('device', 'get_ont_data', f'id={extract_sn(customer["full_name"])}')['data'] if isinstance(ont, dict): olt_id = ont.get('device_id') + onu_level = ont.get('level_onu_rx') else: - olt_id = None - elif extract_sn(customer['full_name']) is None: - olt_id = None - else: - olt_id = olt['finish']['object_id'] - - - # INVENTORY - # items = api_call('inventory', 'get_inventory_amount', f'location=customer&object_id={id}')\ - # .get('data', {}) - # if isinstance(items, dict): - # items = items.values() - - # item_names = [ - # { - # 'id': str(item['id']), - # 'name': unescape(item['name']), - # 'catalog': item['inventory_section_catalog_id'] - # } - # for item in api_call('inventory', 'get_inventory_catalog', - # f'id={list_to_str([str(i["inventory_type_id"]) for i in items])}')['data'].values() - # ] - # inventory = [] - # for item in items: - # item_name = [i for i in item_names if i['id'] == str(item['inventory_type_id'])][0] - # inventory.append({ - # 'id': item['id'], - # 'catalog_id': item['inventory_type_id'], - # 'name': item_name['name'], - # 'amount': item['amount'], - # 'category_id': item_name['catalog'], - # 'sn': item['serial_number'] - # }) - - - # TASK - # tasks_id = str_to_list(api_call('task', 'get_list', f'customer_id={id}')['list']) - # if tasks_id: - # tasks_data = normalize_items(api_call('task', 'show', f'id={list_to_str(tasks_id)}')) - # tasks = [] - # for task in tasks_data: - # dates = {} - # if 'create' in task['date']: - # dates['create'] = task['date']['create'] - # if 'update' in task['date']: - # dates['update'] = task['date']['update'] - # if 'complete' in task['date']: - # dates['complete'] = task['date']['complete'] - # if task['type']['name'] != 'Обращение абонента' and \ - # task['type']['name'] != 'Регистрация звонка': - # tasks.append({ - # 'id': task['id'], - # 'customer_id': task['customer'][0], - # 'employee_id': list(task['staff']['employee'].values())[0] - # if 'staff' in task and 'employee' in task['staff'] else None, - # 'name': task['type']['name'], - # 'status': { - # 'id': task['state']['id'], - # 'name': task['state']['name'], - # 'system_id': task['state']['system_role'] - # }, - # 'address': task['address']['text'], - # 'dates': dates - # }) - # else: - # tasks = [] + olt = api_call('commutation', 'get_data', f'object_type=customer&object_id={customer["id"]}&is_finish_data=1')['data'] + if isinstance(olt, dict) and olt.get('finish', {}).get('object_type') != 'switch': + olt_id = olt['finish']['object_id'] + + will_disconnect = calc_disconnect( + [request_tariffs[tariff['id']] for tariff in customer['tariff']['current'] if tariff['id']], + customer['balance'], datetime.strptime(customer['date_connect'], '%Y-%m-%d') + ) if customer.get('date_connect') else None + if will_disconnect: + will_disconnect = will_disconnect.strftime('%Y-%m-%d') + + return { + # main data + 'id': customer['id'], + 'name': remove_sn(customer['full_name']), + 'agreement': parse_agreement(customer['agreement'][0]['number']), + 'status': status_to_str(customer['state_id']), + 'group': { + 'id': list(customer['group'].values())[0]['id'], + 'name': request_groups[list(customer['group'].values())[0]['id']] + } if 'group' in customer else None, + 'phones': [phone['number'] for phone in customer['phone'] if phone['number']], + 'tariffs': tariffs, + 'manager_id': customer.get('manager_id'), + + 'is_corporate': bool(customer.get('flag_corporate', False)), + 'is_disabled': bool(customer.get('is_disable', False)), + 'is_potential': bool(customer.get('is_potential', False)), + + # ONT + 'olt_id': olt_id, + 'sn': extract_sn(customer['full_name']), + 'ip': str(IPv4Address(int(list(customer['ip_mac'].values())[0]['ip']))) if list(customer.get('ip_mac', {'': {}}).values())[0].get('ip') else None, + 'mac': format_mac(list(customer.get('ip_mac', {'': {}}).values())[0].get('mac')), + 'onu_level': onu_level, + + # billing + 'has_billing': bool(customer.get('is_in_billing', False)), + 'billing': { + 'id': int(customer['billing_id']) if 'billing_id' in customer and customer['billing_id'] else None, + 'crc': customer.get('crc_billing') + }, + 'balance': customer['balance'], + + # geodata + 'address': { + 'house_id': customer['address'][0].get('house_id') if customer.get('address', [{}])[0].get('house_id') else None, + 'entrance': customer['address'][0].get('entrance') if customer.get('address', [{}])[0].get('entrance') else None, + 'floor': int(customer['address'][0]['floor']) if customer.get('address', [{}])[0].get('floor') else None, + 'apartment': unescape(customer['address'][0]['apartment']['number']) + if customer.get('address', [{}])[0].get('apartment', {}).get('number') else None + }, + 'box_id': customer['address'][0]['house_id'] if customer['address'][0]['house_id'] != 0 else None, + 'geodata': geodata, + + # timestamps + 'created_at': customer.get('date_create'), + 'connected_at': customer.get('date_connect'), + 'positive_balance_at': customer.get('date_positive_balance'), + 'last_active_at': customer.get('date_activity'), + 'last_inet_active_at': customer.get('date_activity_inet'), + 'will_disconnect_at': will_disconnect + } + +@router.get('/{id}') +def api_get_customer( + request: Request, + id: int, + get_olt_data: bool = False +): + customer = api_call('customer', 'get_data', f'id={id}').get('data') + if customer is None: + return JSONResponse({'status': 'fail', 'detail': 'customer not found'}, 404) + return { 'status': 'success', - 'data': { - # main data - 'id': customer['id'], - 'name': remove_sn(customer['full_name']), - 'agreement': parse_agreement(customer['agreement'][0]['number']), - 'status': status_to_str(customer['state_id']), - 'group': { - 'id': list(customer['group'].values())[0]['id'], - 'name': request.app.state.customer_groups[list(customer['group'].values())[0]['id']] - } if 'group' in customer else None, - 'phones': [phone['number'] for phone in customer['phone'] if phone['number']], - 'tariffs': tariffs, - 'manager_id': customer.get('manager_id'), - - 'is_corporate': bool(customer.get('flag_corporate', False)), - 'is_disabled': bool(customer.get('is_disable', False)), - 'is_potential': bool(customer.get('is_potential', False)), - - # 'inventory': inventory, - # 'tasks': tasks, - - # ONT - 'olt_id': olt_id, - 'sn': extract_sn(customer['full_name']), - 'ip': str(IPv4Address(int(list(customer['ip_mac'].values())[0]['ip']))) if list(customer.get('ip_mac', {'': {}}).values())[0].get('ip') else None, - 'mac': format_mac(list(customer.get('ip_mac', {'': {}}).values())[0].get('mac')), - # 'onu_level': get_ont_data(extract_sn(customer['full_name'])), - - # billing - 'has_billing': bool(customer.get('is_in_billing', False)), - 'billing': { - 'id': int(customer['billing_id']) if 'billing_id' in customer and customer['billing_id'] else None, - 'crc': customer.get('crc_billing') - }, - 'balance': customer['balance'], - - # geodata - 'address': { - 'house_id': customer['address'][0].get('house_id') if customer.get('address', [{}])[0].get('house_id') else None, - 'entrance': customer['address'][0].get('entrance') if customer.get('address', [{}])[0].get('entrance') else None, - 'floor': int(customer['address'][0]['floor']) if customer.get('address', [{}])[0].get('floor') else None, - 'apartment': unescape(customer['address'][0]['apartment']['number']) - if customer.get('address', [{}])[0].get('apartment', {}).get('number') else None - }, - 'box_id': customer['address'][0]['house_id'] if customer['address'][0]['house_id'] != 0 else None, - 'geodata': geodata, - - # timestamps - 'timestamps': { - 'created_at': customer.get('date_create'), - 'connected_at': customer.get('date_connect'), - 'positive_balance_at': customer.get('date_positive_balance'), - 'last_active_at': customer.get('date_activity'), - 'last_inet_active_at': customer.get('date_activity_inet') - } - } + 'data': _process_customer(request.app.state.tariffs, request.app.state.customer_groups, customer, get_olt_data) } @@ -228,3 +178,45 @@ def api_get_customer_name(id: int): 'id': id, 'name': remove_sn(customer['full_name']) } + + +@router.get('') +def api_get_customers( + request: Request, + ids: str | None = None, + get_data: bool = True, + get_olt_data: bool = False, + limit: int | None = None, + skip: int | None = None +): + customers = [] + if ids is not None: + try: + customers: list[int] = loads(ids) + if not (isinstance(customers, list) and all(isinstance(customer, int) for customer in customers)): + return JSONResponse({'status': 'fail', 'detail': 'incorrect type of ids param'}, 422) + except JSONDecodeError: + return JSONResponse({'status': 'fail', 'detail': 'unable to parse ids param'}, 422) + else: + return JSONResponse({'status': 'fail', 'detail': 'no filters provided'}, 422) + + count = len(customers) + if skip: + customers = customers[skip:] + if limit: + customers = customers[:limit] + + customers_data = [] + if get_data: + for customer_id in customers: + customer = api_call('customer', 'get_data', f'id={customer_id}').get('data') + if customer is None: + return JSONResponse({'status': 'fail', 'detail': f'customer {customer_id} not found'}, 404) + + customers_data.append(_process_customer(request.app.state.tariffs, request.app.state.customer_groups, customer, get_olt_data)) + + return { + 'status': 'success', + 'data': customers_data or customers, + 'count': count # total count without limit/skip + } \ No newline at end of file diff --git a/routers/employee.py b/routers/employee.py index 76b5246..86a2be3 100644 --- a/routers/employee.py +++ b/routers/employee.py @@ -12,8 +12,7 @@ def api_get_employee_login(login: str, password: str): return { 'status': 'success', 'correct': result, - 'id': api_call('employee', 'get_employee_id', f'data_typer=login&data_value={login}').get('id') - if result else None + 'id': api_call('employee', 'get_employee_id', f'data_typer=login&data_value={login}').get('id') if result else None } @router.get('/name/{id}') diff --git a/routers/neomobile.py b/routers/neomobile.py index bec6fea..54a4872 100644 --- a/routers/neomobile.py +++ b/routers/neomobile.py @@ -78,9 +78,10 @@ def neomobile_api_get_customer(request: Request, id: int): @router.post('/task') def neomobile_api_post_task(customer_id: int, phone: str, reason: str, comment: str): - id = api_call('task', 'add', f'work_typer=37&work_datedo={get_current_time()}&customer_id=\ -{customer_id}&author_employee_id=184&opis={comment}&deadline_hour=72&employee_id=184&\ -division_id=81')['Id'] + id = api_call('task', 'add', + f'work_typer=37&work_datedo={get_current_time()}&customer_id={customer_id}&author_employee_id=184&' + f'opis={comment}&deadline_hour=72&employee_id=184&division_id=81' + )['Id'] set_additional_data(17, 28, id, 'Приложение') #TODO: make own appeal type set_additional_data(17, 29, id, phone) set_additional_data(17, 30, id, reason) @@ -119,10 +120,8 @@ def neomobile_api_get_task(id: int): 'status': {'id': data['state']['id'], 'name': data['state']['name']}, 'address': {'id': data['address']['addressId'], 'text': data['address']['text']}, 'customer': data['customer'][0], - 'reason': data['additional_data']['30']['value'] if '30' in data['additional_data'] - else None, - 'phone': data['additional_data']['29']['value'] if '29' in data['additional_data'] - else None, + 'reason': data['additional_data']['30']['value'] if '30' in data['additional_data'] else None, + 'phone': data['additional_data']['29']['value'] if '29' in data['additional_data'] else None, 'comments': [{ 'id': comment['comment_id'], 'content': comment['text'], @@ -153,13 +152,9 @@ def neomobile_api_get_inventory(request: Request, id: int): 'data': [ { 'id': inventory['id'], - 'name': unescape([ - name for name in names if name['id'] == inventory['catalog_id'] - ][0]['name']), + 'name': unescape([name for name in names if name['id'] == inventory['catalog_id']][0]['name']), 'type': { - 'id': [ - name for name in names if name['id'] == inventory['catalog_id'] - ][0]['inventory_section_catalog_id'], + 'id': [name for name in names if name['id'] == inventory['catalog_id']][0]['inventory_section_catalog_id'], 'name': [ category['name'] for category in request.app.state.tmc_categories if category['id'] == [ @@ -177,13 +172,11 @@ def neomobile_api_get_inventory(request: Request, id: int): @router.get('/documents') def neomobile_api_get_documents(id: int): - attachs = list(api_call('attach', 'get', f'object_id={id}&object_type=customer') - ['data'].values()) + attachs = list(api_call('attach', 'get', f'object_id={id}&object_type=customer')['data'].values()) tasks = api_call('task', 'get_list', f'customer_id={id}')['list'].split(',') for task in tasks: try: - attachs.extend(api_call('attach', 'get', f'object_id={task}&object_type=task') - ['data'].values()) + attachs.extend(api_call('attach', 'get', f'object_id={task}&object_type=task')['data'].values()) except AttributeError: continue return { @@ -193,12 +186,9 @@ def neomobile_api_get_documents(id: int): 'attachs': [ { 'id': attach['id'], - 'url': api_call('attach', 'get_file_temporary_link', - f'uuid={attach["id"]}')['data'], - 'name': attach['internal_filepath'] if '.' in attach['internal_filepath'] else - attach['internal_filepath'] + '.png', - 'extension': attach['internal_filepath'].split('.')[1].lower() - if '.' in attach['internal_filepath'] else 'png', + 'url': api_call('attach', 'get_file_temporary_link', f'uuid={attach["id"]}')['data'], + 'name': attach['internal_filepath'] if '.' in attach['internal_filepath'] else attach['internal_filepath'] + '.png', + 'extension': attach['internal_filepath'].split('.')[1].lower() if '.' in attach['internal_filepath'] else 'png', 'created_at': attach['date_add'] } for attach in attachs ] diff --git a/routers/task.py b/routers/task.py index de68570..2c95543 100644 --- a/routers/task.py +++ b/routers/task.py @@ -193,7 +193,9 @@ def api_get_tasks( ): tasks = [] if customer_id is not None: - tasks = list(map(int, str_to_list(api_call('task', 'get_list', f'customer_id={customer_id}&order_by=date_add&{f"&limit={limit}" if limit else ""}{f"&offset={skip}" if skip else ""}')['list']))) + tasks = list(map(int, str_to_list( + api_call('task', 'get_list', f'customer_id={customer_id}&order_by=date_add&{f"&limit={limit}" if limit else ""}{f"&offset={skip}" if skip else ""}')['list'] + ))) if (limit or skip) and get_count: tasks_count = api_call('task', 'get_list', f'customer_id={customer_id}')['count'] else: @@ -213,20 +215,17 @@ def api_get_tasks( 'created_at': comment['dateAdd'], 'author': { 'id': comment['employee_id'], - 'name': (api_call('employee', 'get_data', f'id={comment["employee_id"]}') - .get('data', {}).get(str(comment['employee_id']), {}).get('name') - if get_employee_names else None) + 'name': api_call('employee', 'get_data', f'id={comment["employee_id"]}').get('data', {}).get(str(comment['employee_id']), {}).get('name') + if get_employee_names else None } if comment.get('employee_id') else None, 'content': unescape(comment['comment']) } for comment in task.get('comments', {}).values() ], - 'timestamps': { - 'created_at': task['date'].get('create'), - 'planned_at': task['date'].get('todo'), - 'updated_at': task['date'].get('update'), - 'completed_at': task['date'].get('complete'), - 'deadline': task['date'].get('runtime_individual_hour') - }, + 'created_at': task['date'].get('create'), + 'planned_at': task['date'].get('todo'), + 'updated_at': task['date'].get('update'), + 'completed_at': task['date'].get('complete'), + 'deadline': task['date'].get('runtime_individual_hour'), 'addata': { 'reason': task['additional_data'].get('30', {}).get('value'), 'solve': task['additional_data'].get('36', {}).get('value'), @@ -244,8 +243,7 @@ def api_get_tasks( 'type': task['additional_data'].get('28', {}).get('value') }, } if task['type']['id'] == 38 else { - 'coord': list(map(float, task['additional_data']['7']['value'].split(','))) - if '7' in task['additional_data'] else None, + 'coord': list(map(float, task['additional_data']['7']['value'].split(','))) if '7' in task['additional_data'] else None, 'tariff': task['additional_data'].get('25', {}).get('value'), 'connect_type': task['additional_data'].get('27', {}).get('value') } if task['type']['id'] == 28 else None, @@ -255,9 +253,8 @@ def api_get_tasks( }, 'author': { 'id': task['author_employee_id'], - 'name': (api_call('employee', 'get_data', f'id={task["author_employee_id"]}') - .get('data', {}).get(str(task['author_employee_id']), {}).get('name') - if get_employee_names else None) + 'name': api_call('employee', 'get_data', f'id={task["author_employee_id"]}').get('data', {}).get(str(task['author_employee_id']), {}).get('name') + if get_employee_names else None }, 'status': { 'id': task['state']['id'], @@ -267,8 +264,7 @@ def api_get_tasks( 'address': { 'id': task['address'].get('addressId'), 'name': task['address'].get('text'), - 'apartment': unescape(task['address']['apartment']) - if task['address'].get('apartment') else None + 'apartment': unescape(task['address']['apartment']) if task['address'].get('apartment') else None }, 'customer': { 'id': customer['id'], diff --git a/tariff.py b/tariff.py new file mode 100644 index 0000000..2449bdc --- /dev/null +++ b/tariff.py @@ -0,0 +1,106 @@ +from re import search, escape +from enum import Enum +from datetime import datetime, timedelta +from functools import reduce + +class TariffType(Enum): + BASE = 'base' # priced + PROMO = 'promo' # free + SALE = 'sale' # % sale + NONE = 'none' # no price/sale + +class Tariff: + def __init__(self, content: str): + # self.id = id + self.content = content + self.price = 0 + self.free_days = 0 + self.sale = 0 + self.sale_days = 0 + + content = content.lower() + + if search(r'\(\d+\s*сом\)', content): + self.type = TariffType.BASE + match = search(r'\((\d+)\s*сом\)', content) + if match: + self.price = int(match.group(1)) + + elif 'бесплатно' in content: + self.type = TariffType.PROMO + match = search(r'(\S+)\s+бесплатно', content) + if match: + word = match.group(1) + + multiplier_match = search(rf'(\d+)\s+{escape(match.group(1))}', content) + multiplier = multiplier_match.group(1) if multiplier_match else 1 + + if 'месяц' in word: + self.free_days = 30 * int(multiplier) + + elif 'день' in word or 'дн' in word: + self.free_days = int(multiplier) + + elif '%' in content: + self.type = TariffType.SALE + match = search(r'(\d+)%', content) + if match: + sale = match.group(1) + self.sale = int(sale) + + word_match = search(fr'{sale}%\s+на\s+(\w+)', content) + if word_match: + word = word_match.group(1) + + if 'год' in word: + self.sale_days = 365 + else: + self.type = TariffType.NONE + + def to_dict(self) -> dict: + return { + 'price': self.price, + 'free_days': self.free_days, + 'sale': self.sale, + 'sale_days': self.sale_days, + 'type': self.type.value + } + + +def calc_disconnect(tariffs: list[Tariff], balance: float, connected_at: datetime) -> datetime | None: + base_sum = sum([t.price for t in tariffs]) / 30 # sum per day + if base_sum == 0: + return None + + sale_tariff = next((t for t in tariffs if t.type == TariffType.SALE), None) # first sale tariff + now = datetime.now() + days_since_connect = (now - connected_at).days + free_days = sum(t.free_days for t in tariffs) + free = max(0, free_days - days_since_connect) + + if sale_tariff: + sale_multiplier = 1 - sale_tariff.sale / 100 + sale_remaining = max(0, sale_tariff.sale_days - days_since_connect) if sale_tariff.sale_days > 0 else 0 + else: + sale_multiplier = 1 + sale_remaining = 0 + + saled_sum = base_sum * sale_multiplier + + # spend balance + if sale_remaining > 0: + # sale period cost + sale_period_cost = sale_remaining * saled_sum + + if balance >= sale_period_cost: + # balance enough for whole sale period + balance -= sale_period_cost + days = free + sale_remaining + balance / base_sum + else: + # disconnect while sale period + days = free + balance / saled_sum + else: + # sale expired or no sale + days = free + balance / (saled_sum if sale_tariff and sale_tariff.sale_days == 0 else base_sum) + + return now + timedelta(days=int(days)) diff --git a/utils.py b/utils.py index 82a2883..dc4062d 100644 --- a/utils.py +++ b/utils.py @@ -1,6 +1,5 @@ """Simple utils like parse agreement or build 2 gis link""" from datetime import datetime as dt -from functools import reduce from urllib.parse import urljoin @@ -8,14 +7,13 @@ def parse_agreement(agreement: str | None) -> int | None: - """ - Parse agreement string into an integer if it contains only digits. + """Parse agreement string into an integer if it contains only digits. Args: agreement (str | None): Agreement value as a string. Returns: - int|None: Parsed integer if valid, otherwise None. + int | None: Parsed integer if valid, otherwise None. """ if agreement: if agreement.isdigit(): @@ -23,8 +21,7 @@ def parse_agreement(agreement: str | None) -> int | None: return None def remove_sn(data: str) -> str: - """ - Extract the name part from a string formatted like 'name (sn)'. + """Extract the name part from a string formatted like 'name (sn)'. If parentheses are present, the substring before them is returned. Otherwise, the original string is returned. @@ -40,8 +37,7 @@ def remove_sn(data: str) -> str: return data def extract_sn(data: str) -> None | str: - """ - Extract serial number from a string in the format 'name(sn)'. + """Extract serial number from a string in the format 'name(sn)'. If the string ends with '()', it is treated as empty and None is returned. Otherwise, the substring inside parentheses is returned. @@ -58,8 +54,7 @@ def extract_sn(data: str) -> None | str: return data.rsplit('(', maxsplit=1)[-1].rstrip().rstrip(')') def status_to_str(status: int) -> str: - """ - Convert numeric status code to human-readable text. + """Convert numeric status code to human-readable text. Args: status (int): Status code (0 = off, 1 = pause, 2 = active). @@ -78,8 +73,7 @@ def status_to_str(status: int) -> str: return 'Неизвестен' def list_to_str(data: list) -> str: - """ - Join a list of strings into a single comma-separated string. + """Join a list of strings into a single comma-separated string. Args: data (list): List of string elements. @@ -90,8 +84,7 @@ def list_to_str(data: list) -> str: return ','.join(map(str, data)) def str_to_list(data: str) -> list: - """ - Convert a comma-separated string into a list of trimmed strings. + """Convert a comma-separated string into a list of trimmed strings. Args: data (str): Input string with items separated by commas. @@ -102,8 +95,7 @@ def str_to_list(data: str) -> list: return [item.strip() for item in data.split(",") if item.strip()] def to_neo_link(lat: float, lon: float) -> str: - """ - Build a NeoTelecom map link from latitude and longitude. + """Build a NeoTelecom map link from latitude and longitude. Args: lat (float): Latitude coordinate. @@ -116,8 +108,7 @@ def to_neo_link(lat: float, lon: float) -> str: @{lat},{lon},18z' def to_2gis_link(lat: float, lon: float) -> str: - """ - Build a 2GIS map link from latitude and longitude. + """Build a 2GIS map link from latitude and longitude. Args: lat (float): Latitude coordinate. @@ -150,8 +141,7 @@ def normalize_items(raw: dict) -> list: return [data] def get_attach_url(path: str) -> str: - """ - Build full attachment URL by joining base and relative path. + """Build full attachment URL by joining base and relative path. Args: path (str): Relative path to the attachment. @@ -162,8 +152,7 @@ def get_attach_url(path: str) -> str: return urljoin(ATTACH_URL, path) def get_current_time() -> str: - """ - Get the current local time formatted as 'YYYY.MM.DD HH:MM:SS'. + """Get the current local time formatted as 'YYYY.MM.DD HH:MM:SS'. Returns: str: Current time string. @@ -171,8 +160,7 @@ def get_current_time() -> str: return dt.now().strftime("%Y.%m.%d %H:%M:%S") def format_mac(mac: str | None) -> str | None: - """ - Format MAC address (insert ":" between every 2 symbols) + """Format MAC address (insert ":" between every 2 symbols) Args: mac (str | None): MAC address without ":" @@ -185,7 +173,15 @@ def format_mac(mac: str | None) -> str | None: return ':'.join(mac.replace('-', '')[i:i + 2] for i in range(0, len(mac.replace('-', '')), 2)) def get_coordinates(polygon: list[list[float]] | None) -> list[float] | None: - if polygon is None: + """Get coorinates from polyfon + + Args: + polygon (list[list[float]] | None): Polygon + + Returns: + list[float] | None: Coordinates as [lat, lon] + """ + if not polygon: return None points = polygon[:-1] lats = [p[0] for p in points] @@ -193,6 +189,15 @@ def get_coordinates(polygon: list[list[float]] | None) -> list[float] | None: return [sum(lats) / len(lats), sum(lons) / len(lons)] def get_box_map_link(coords: list[float] | None, box_id: int) -> str | None: + """Get link to box in map + + Args: + coords (list[float] | None): Coordinates as [lat, lon] + box_id (int): Box id + + Returns: + str | None: Link + """ if coords is None: return None return f'https://us.neotelecom.kg/map/show?opt_wh=1&by_building={box_id}&is_show_center_marker=1@{coords[0]},{coords[1]},18z' \ No newline at end of file