diff --git a/.idea/CIManagerProject.iml b/.idea/CIManagerProject.iml index d302d76..a247850 100644 --- a/.idea/CIManagerProject.iml +++ b/.idea/CIManagerProject.iml @@ -36,6 +36,9 @@ + + + diff --git a/.idea/libraries/Dart_Packages.xml b/.idea/libraries/Dart_Packages.xml deleted file mode 100644 index 452f14a..0000000 --- a/.idea/libraries/Dart_Packages.xml +++ /dev/null @@ -1,978 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/Documents/http_protocol_errors b/Documents/http_protocol_errors new file mode 100644 index 0000000..f5122d6 --- /dev/null +++ b/Documents/http_protocol_errors @@ -0,0 +1,2 @@ +Common errors +1. 503 serviceUnavailable \ No newline at end of file diff --git a/cim_server_2/bin/main.dart b/cim_server_2/bin/main.dart index f62cdcc..9f3f479 100644 --- a/cim_server_2/bin/main.dart +++ b/cim_server_2/bin/main.dart @@ -8,7 +8,7 @@ import 'package:cim_server_2/src/http/server.dart'; void main(List args) async { var config = ServerConfiguration; var server = Server(config.host, config.port); - await server.start(); + await server.start(timeout: Duration(seconds: 30)); /* var parser = ArgParser()..addOption('port', abbr: 'p'); var result = parser.parse(args); diff --git a/cim_server_2/lib/src/app_channel.dart b/cim_server_2/lib/src/app_channel.dart index 6ae6372..f3b0f21 100644 --- a/cim_server_2/lib/src/app_channel.dart +++ b/cim_server_2/lib/src/app_channel.dart @@ -1,3 +1,5 @@ +import 'dart:io'; + import 'package:cim_server_2/src/http/application_channel.dart'; import 'package:cim_server_2/src/http/controller.dart'; import 'package:cim_server_2/src/http/request.dart'; @@ -8,6 +10,7 @@ import 'package:cim_server_2/src/http/router.dart'; class TestController extends Controller{ @override RequestOrResponse handle(Request request) { + sleep(Duration(seconds: 30)); return Response.ok(); } diff --git a/cim_server_2/lib/src/http/http_processor.dart b/cim_server_2/lib/src/http/http_processor.dart index 355f7df..6217234 100644 --- a/cim_server_2/lib/src/http/http_processor.dart +++ b/cim_server_2/lib/src/http/http_processor.dart @@ -74,6 +74,6 @@ class HttpProcessor{ var response = router!.processRequest(request); var responseMessage = MessageHttpResponse(response, id); callerPort!.send(responseMessage); -// callerPort!.send(MessageHttpProcessorReady(id)); + callerPort!.send(MessageHttpProcessorReady(id)); } } \ No newline at end of file diff --git a/cim_server_2/lib/src/http/http_reader_writer.dart b/cim_server_2/lib/src/http/http_reader_writer.dart index 0bd017a..c60b7d9 100644 --- a/cim_server_2/lib/src/http/http_reader_writer.dart +++ b/cim_server_2/lib/src/http/http_reader_writer.dart @@ -1,6 +1,6 @@ +import 'dart:async'; import 'dart:collection'; -import 'package:alfred/alfred.dart'; import 'package:cim_server_2/src/http/http_processor.dart'; import 'package:cim_server_2/src/http/response.dart'; @@ -16,24 +16,32 @@ class SendPortId{ SendPortId(this.id, this.port); } class HttpReaderWriter{ + static Duration requestTimeout = Duration(seconds: 30);//Timeout for processing http request + static Duration processorTimeout = Duration(minutes: 20);//Timeout for completing processor isolate working cycle for processing 1 request static SendPort? _callerPort; static int _id = 0; static List processors = List.empty(growable: true); static Map processorPorts = {}; - static Queue requestQueue = Queue(); +// static Queue requestQueue = Queue(); + static Queue> requestQueue = Queue(); static Queue processorsQueue = Queue(); - static Map httpRequestsMap = {}; +// static Map httpRequestsMap = {}; + static Map> httpRequestsMap = {}; + static Map timeoutMap = {}; static HttpServer? server; + static Type? channelType; + static ReceivePort? receivePort; static void readIsolateEntryPoint(MessageInitServer messageInitServer) async { - var channelType = messageInitServer.applicationChannel; + channelType = messageInitServer.applicationChannel; _id = messageInitServer.id; - var receivePort = ReceivePort(); //Регистрация порта для приёмки сообщений + receivePort = ReceivePort(); //Регистрация порта для приёмки сообщений _callerPort = messageInitServer.callerPort; + requestTimeout = messageInitServer.timeout; + processorTimeout = requestTimeout * 10; /* _callerPort!.send(MessageSendPort(receivePort.sendPort, _id)); //Передача порта, через который будем получать сообщения*/ for(var i = 0; i < messageInitServer.threadsCount; i++){ - var processorMessage = MessageInitHttpProcessor(i, receivePort.sendPort, channelType); - var isolate = await Isolate.spawn(HttpProcessor.processorEntryPoint, processorMessage); + var isolate = await _spawn(i); processors.add(isolate); } try { @@ -51,17 +59,58 @@ class HttpReaderWriter{ } server!.listen(requestHandler); _callerPort!.send(MessageServerInited(_id)); - receivePort.listen(mainProcessListener); + receivePort!.listen(mainProcessListener); + } + static Future _spawn(int id) async{ + var processorMessage = MessageInitHttpProcessor(id, receivePort!.sendPort, channelType!); + var isolate = await Isolate.spawn(HttpProcessor.processorEntryPoint, processorMessage); + return isolate; } - static void httpProcessorListener(Message message)async{ } static Future requestHandler(HttpRequest httpRequest) async{ print('[HttpRequest received: ${httpRequest.uri}]'); - requestQueue.add(httpRequest); - unawaited (processRequest()); + var timer = Timer(requestTimeout, ()=>httpRequestTimeout(httpRequest)); + var entry = MapEntry(httpRequest, timer); +// requestQueue.add(httpRequest); + requestQueue.add(entry); + await processRequest(); + } + static void httpRequestTimeout(HttpRequest request){ + print('[HttpReaderWriter.httpRequestTimeout]'); + /*if(requestQueue.contains(requestEntry)){//If request in requestQueue, it has not been processed yet + + requestQueue.remove(requestEntry); + }*/ + var entries = requestQueue.toList(); + MapEntry? requiredEntry = null; + for(var entry in entries){ + if(entry.key == request) + { + requiredEntry = entry; + break; + } + } + if(requiredEntry != null){ + requestQueue.remove(requiredEntry); + } + else { + var id = -1; + for (var mapEntry in httpRequestsMap.entries) { + if (mapEntry.value.key == request) { + id = mapEntry.key; + requiredEntry = mapEntry.value; + break; + } + } + if(id < 0){ + return; + } + httpRequestsMap.remove(id); + } + sendResponse(requiredEntry!.key.response, Response.requestTimeout()); } static Future mainProcessListener(dynamic message) async{ @@ -84,18 +133,26 @@ class HttpReaderWriter{ } case MessageTypes.processorReady:{ message = message as MessageHttpProcessorReady; + var timer = timeoutMap[message.id]; + if(timer != null){ + timer.cancel(); + } var port = processorPorts[message.id]; if(port == null){ print('[HttpReaderWriter.mainProcessListener]SendPort is null!'); break; } processorsQueue.add(SendPortId(message.id,port)); - unawaited(processRequest()); + await processRequest(); break; } case MessageTypes.httpResponse:{ message = message as MessageHttpResponse; - unawaited(processResponse(message.id, message.response)); + var timer = timeoutMap[message.id]; + if(timer != null){ + timer.cancel(); + } + await processResponse(message.id, message.response); break; } default:{ @@ -111,18 +168,31 @@ class HttpReaderWriter{ return; } var sendPortId = processorsQueue.removeFirst(); - var httpRequest = requestQueue.removeFirst(); + var entry = requestQueue.removeFirst(); + var httpRequest = entry.key; var request = await Request.prepare(httpRequest); - httpRequestsMap[sendPortId.id] = httpRequest; + httpRequestsMap[sendPortId.id] = entry; var message = MessageHttpRequest(request, _id); + timeoutMap[sendPortId.id] = Timer(processorTimeout, ()=>processorTimeoutCallback(sendPortId.id)); sendPortId.port.send(message); } + static void processorTimeoutCallback(int id) async{ + print('[HttpReaderWriter.processorTimeoutCallback]'); + var isolate = processors[id]; + isolate.kill(priority: Isolate.immediate); + isolate = await _spawn(id); + processors[id] = isolate; + } static Future processResponse(int id, Response response) async{ - var httpRequest = httpRequestsMap.remove(id);; - if(httpRequest == null){ + var entry = httpRequestsMap.remove(id); + if(entry == null){ return; } - var httpResponse = httpRequest.response; + entry.value.cancel(); + var httpRequest = entry.key; + await sendResponse(httpRequest.response, response); + } + static Future sendResponse(HttpResponse httpResponse, Response response)async{ var headers = response.headers; var keys = headers.keys; for(var key in keys){ @@ -134,24 +204,22 @@ class HttpReaderWriter{ } httpResponse.statusCode = response.status; var body = response.body; - if(body.rawBody.isNotEmpty) { - switch(response.body.type){ - case BodyTypes.text: - httpResponse.headers.contentType = ContentType.text; - httpResponse.write(body.asString()); - break; - case BodyTypes.json: - httpResponse.headers.contentType = ContentType.json; - httpResponse.write(body.asJsonMap()); - break; - case BodyTypes.raw: - httpResponse.headers.contentType = ContentType.binary; - httpResponse.write(body.rawBody); - break; - case BodyTypes.empty: - break; - } - + switch(response.body.type){ + case BodyTypes.text: + httpResponse.headers.contentType = ContentType.text; + httpResponse.write(body.asString()); + break; + case BodyTypes.json: + httpResponse.headers.contentType = ContentType.json; + httpResponse.write(body.asJsonMap()); + break; + case BodyTypes.raw: + httpResponse.headers.contentType = ContentType.binary; + httpResponse.write(body.rawBody); + break; + case BodyTypes.empty: + httpResponse.headers.contentType = ContentType.text; + break; } await httpResponse.close(); } diff --git a/cim_server_2/lib/src/http/messages.dart b/cim_server_2/lib/src/http/messages.dart index 8a53589..86544e0 100644 --- a/cim_server_2/lib/src/http/messages.dart +++ b/cim_server_2/lib/src/http/messages.dart @@ -32,12 +32,13 @@ class MessageSendPort extends Message{ } class MessageInitServer extends Message{ + Duration timeout; int threadsCount; String host; int serverPort; SendPort callerPort; Type applicationChannel; - MessageInitServer( this.callerPort,this.threadsCount, this.host, this.serverPort, int id, this.applicationChannel):super(id); + MessageInitServer( this.callerPort,this.threadsCount, this.host, this.serverPort, int id, this.applicationChannel, this.timeout):super(id); @override MessageTypes getType() => MessageTypes.initServer; } diff --git a/cim_server_2/lib/src/http/response.dart b/cim_server_2/lib/src/http/response.dart index 78ddd01..697fda7 100644 --- a/cim_server_2/lib/src/http/response.dart +++ b/cim_server_2/lib/src/http/response.dart @@ -27,4 +27,8 @@ class Response implements RequestOrResponse{ body ??= Body.empty(); this.body = body; } + Response.requestTimeout({Body? body}):status = HttpStatus.serviceUnavailable{ + body ??= Body.empty(); + this.body = body; + } } \ No newline at end of file diff --git a/cim_server_2/lib/src/http/server.dart b/cim_server_2/lib/src/http/server.dart index 30ef4d7..7b0aabb 100644 --- a/cim_server_2/lib/src/http/server.dart +++ b/cim_server_2/lib/src/http/server.dart @@ -22,10 +22,11 @@ class Server{ late SendPort _readPort; late StreamSubscription _subscription; // static Future onInit(Server server) async {return server;} - Future start ([int count = 2]) async{ + Future start ({int count = 2, Duration? timeout}) async{ + timeout ??= Duration(seconds: 30); var type = reflectType(T).reflectedType; var callerPort = ReceivePort(); - var initMessage = MessageInitServer(callerPort.sendPort, count, host, port, servers.length, type); + var initMessage = MessageInitServer(callerPort.sendPort, count, host, port, servers.length, type, timeout); _readIsolate = await Isolate.spawn(HttpReaderWriter.readIsolateEntryPoint, initMessage); _subscription = callerPort.listen(callbackReadIsolateListener); servers.add(this); diff --git a/cim_server_2/pubspec.lock b/cim_server_2/pubspec.lock new file mode 100644 index 0000000..3c6cb99 --- /dev/null +++ b/cim_server_2/pubspec.lock @@ -0,0 +1,369 @@ +# Generated by pub +# See https://dart.dev/tools/pub/glossary#lockfile +packages: + _fe_analyzer_shared: + dependency: transitive + description: + name: _fe_analyzer_shared + url: "https://pub.dev" + source: hosted + version: "20.0.0" + alfred: + dependency: "direct main" + description: + name: alfred + url: "https://pub.dev" + source: hosted + version: "0.0.3+2" + analyzer: + dependency: transitive + description: + name: analyzer + url: "https://pub.dev" + source: hosted + version: "1.4.0" + args: + dependency: "direct main" + description: + name: args + url: "https://pub.dev" + source: hosted + version: "2.0.0" + async: + dependency: transitive + description: + name: async + url: "https://pub.dev" + source: hosted + version: "2.5.0" + boolean_selector: + dependency: transitive + description: + name: boolean_selector + url: "https://pub.dev" + source: hosted + version: "2.1.0" + charcode: + dependency: transitive + description: + name: charcode + url: "https://pub.dev" + source: hosted + version: "1.2.0" + cim_protocol: + dependency: "direct main" + description: + path: "../cim_protocol-0.1.0" + relative: true + source: path + version: "0.1.0" + cli_util: + dependency: transitive + description: + name: cli_util + url: "https://pub.dev" + source: hosted + version: "0.3.0" + collection: + dependency: transitive + description: + name: collection + url: "https://pub.dev" + source: hosted + version: "1.15.0" + convert: + dependency: transitive + description: + name: convert + url: "https://pub.dev" + source: hosted + version: "3.0.0" + coverage: + dependency: transitive + description: + name: coverage + url: "https://pub.dev" + source: hosted + version: "1.0.2" + crypto: + dependency: transitive + description: + name: crypto + url: "https://pub.dev" + source: hosted + version: "3.0.1" + enum_to_string: + dependency: transitive + description: + name: enum_to_string + url: "https://pub.dev" + source: hosted + version: "2.0.1" + file: + dependency: transitive + description: + name: file + url: "https://pub.dev" + source: hosted + version: "6.1.0" + glob: + dependency: transitive + description: + name: glob + url: "https://pub.dev" + source: hosted + version: "2.0.1" + http_multi_server: + dependency: transitive + description: + name: http_multi_server + url: "https://pub.dev" + source: hosted + version: "3.0.1" + http_parser: + dependency: transitive + description: + name: http_parser + url: "https://pub.dev" + source: hosted + version: "4.0.0" + http_server: + dependency: "direct main" + description: + name: http_server + url: "https://pub.dev" + source: hosted + version: "1.0.0" + io: + dependency: transitive + description: + name: io + url: "https://pub.dev" + source: hosted + version: "1.0.0" + js: + dependency: transitive + description: + name: js + url: "https://pub.dev" + source: hosted + version: "0.6.3" + logging: + dependency: transitive + description: + name: logging + url: "https://pub.dev" + source: hosted + version: "1.0.1" + matcher: + dependency: transitive + description: + name: matcher + url: "https://pub.dev" + source: hosted + version: "0.12.10" + meta: + dependency: transitive + description: + name: meta + url: "https://pub.dev" + source: hosted + version: "1.3.0" + mime: + dependency: transitive + description: + name: mime + url: "https://pub.dev" + source: hosted + version: "1.0.0" + mime_type: + dependency: transitive + description: + name: mime_type + url: "https://pub.dev" + source: hosted + version: "1.0.0" + node_preamble: + dependency: transitive + description: + name: node_preamble + url: "https://pub.dev" + source: hosted + version: "2.0.0" + package_config: + dependency: transitive + description: + name: package_config + url: "https://pub.dev" + source: hosted + version: "2.0.0" + path: + dependency: transitive + description: + name: path + url: "https://pub.dev" + source: hosted + version: "1.8.0" + pedantic: + dependency: "direct dev" + description: + name: pedantic + url: "https://pub.dev" + source: hosted + version: "1.11.0" + pool: + dependency: transitive + description: + name: pool + url: "https://pub.dev" + source: hosted + version: "1.5.0" + pub_semver: + dependency: transitive + description: + name: pub_semver + url: "https://pub.dev" + source: hosted + version: "2.0.0" + shelf: + dependency: "direct main" + description: + name: shelf + url: "https://pub.dev" + source: hosted + version: "1.1.0" + shelf_packages_handler: + dependency: transitive + description: + name: shelf_packages_handler + url: "https://pub.dev" + source: hosted + version: "3.0.0" + shelf_static: + dependency: transitive + description: + name: shelf_static + url: "https://pub.dev" + source: hosted + version: "1.0.0" + shelf_web_socket: + dependency: transitive + description: + name: shelf_web_socket + url: "https://pub.dev" + source: hosted + version: "1.0.1" + source_map_stack_trace: + dependency: transitive + description: + name: source_map_stack_trace + url: "https://pub.dev" + source: hosted + version: "2.1.0" + source_maps: + dependency: transitive + description: + name: source_maps + url: "https://pub.dev" + source: hosted + version: "0.10.10" + source_span: + dependency: transitive + description: + name: source_span + url: "https://pub.dev" + source: hosted + version: "1.8.1" + stack_trace: + dependency: transitive + description: + name: stack_trace + url: "https://pub.dev" + source: hosted + version: "1.10.0" + stream_channel: + dependency: transitive + description: + name: stream_channel + url: "https://pub.dev" + source: hosted + version: "2.1.0" + string_scanner: + dependency: transitive + description: + name: string_scanner + url: "https://pub.dev" + source: hosted + version: "1.1.0" + term_glyph: + dependency: transitive + description: + name: term_glyph + url: "https://pub.dev" + source: hosted + version: "1.2.0" + test: + dependency: "direct dev" + description: + name: test + url: "https://pub.dev" + source: hosted + version: "1.16.8" + test_api: + dependency: transitive + description: + name: test_api + url: "https://pub.dev" + source: hosted + version: "0.3.0" + test_core: + dependency: transitive + description: + name: test_core + url: "https://pub.dev" + source: hosted + version: "0.3.19" + typed_data: + dependency: transitive + description: + name: typed_data + url: "https://pub.dev" + source: hosted + version: "1.3.0" + vm_service: + dependency: transitive + description: + name: vm_service + url: "https://pub.dev" + source: hosted + version: "6.2.0" + watcher: + dependency: transitive + description: + name: watcher + url: "https://pub.dev" + source: hosted + version: "1.0.0" + web_socket_channel: + dependency: transitive + description: + name: web_socket_channel + url: "https://pub.dev" + source: hosted + version: "2.0.0" + webkit_inspection_protocol: + dependency: transitive + description: + name: webkit_inspection_protocol + url: "https://pub.dev" + source: hosted + version: "1.0.0" + yaml: + dependency: "direct main" + description: + name: yaml + url: "https://pub.dev" + source: hosted + version: "3.1.0" +sdks: + dart: ">=2.12.0 <3.0.0"