diff --git a/include/dtp/dtp_protocol.h b/include/dtp/dtp_protocol.h index fe824bf..c2a68fd 100644 --- a/include/dtp/dtp_protocol.h +++ b/include/dtp/dtp_protocol.h @@ -13,6 +13,8 @@ extern "C" #include "dtp/dtp.h" #include "dtp/platform.h" + extern const uint32_t DTP_VERSION; + /* CSP connection type forward declaration*/ typedef struct csp_packet_s csp_packet_t; @@ -25,6 +27,7 @@ extern "C" /** Transfer request */ typedef struct { + uint32_t version; /** Protocol version from client */ uint32_t throughput; /** max server throughput in bytes/second */ uint8_t nof_intervals; /** Number of segments to transfer, see the intervals below */ uint8_t payload_id; /** Payload ID, conceptual identifier for the payload to retrieve, semantic is entirely server-specific */ @@ -48,6 +51,7 @@ extern "C" */ typedef struct { + uint32_t version; /** Protocol version from server */ uint32_t size_in_bytes; /** Total size of payload data to be sent during this transfer */ uint32_t total_payload_size; /** Total size of payload, for info (will be >= size_in_bytes) */ } dtp_meta_resp_t; @@ -61,7 +65,7 @@ extern "C" uint32_t last_packet; /** The highest packet number occurring in this transmission */ uint32_t total_duration_ms; /** The expected total duration of this session [ms] */ } dtp_metrics_t; - + extern dtp_server_transfer_ctx_t server_transfer_ctx; extern csp_packet_t *setup_server_transfer(dtp_server_transfer_ctx_t *ctx, uint16_t dst, csp_packet_t *request); diff --git a/src/dtp_client.c b/src/dtp_client.c index e35acc3..17e3680 100644 --- a/src/dtp_client.c +++ b/src/dtp_client.c @@ -24,7 +24,7 @@ dtp_t *dtp_prepare_session(uint32_t server, uint32_t session_id, uint32_t max_th } else { dbg_log("Session created: %p", session); } - + session->request_meta.version = htobe32(DTP_VERSION); dtp_session_set_user_ctx(session, ctx); if (resume) { diff --git a/src/dtp_protocol.c b/src/dtp_protocol.c index c6fcaf9..0a37451 100644 --- a/src/dtp_protocol.c +++ b/src/dtp_protocol.c @@ -9,6 +9,8 @@ #include "dtp/dtp_protocol.h" #include "dtp/dtp_session.h" +const uint32_t DTP_VERSION = 1; + dtp_server_transfer_ctx_t server_transfer_ctx; dtp_result send_remote_meta_req(dtp_t *session) @@ -36,8 +38,17 @@ dtp_result read_remote_meta_resp(dtp_t *session) { res = DTP_OK; dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)packet->data; - session->payload_size = meta_resp->total_payload_size; - dbg_log("Setting session total bytes to %u", meta_resp->size_in_bytes); + uint32_t req_version = be32toh(meta_resp->version); + if (req_version != DTP_VERSION) { + dbg_warn("Incompatible DTP version received: %"PRIu32", expected: %"PRIu32"", req_version, DTP_VERSION); + session->dtp_errno = DTP_EINVAL; + res = DTP_ERR; + csp_buffer_free(packet); + return res; + } + + session->payload_size = be32toh(meta_resp->total_payload_size); + dbg_log("Setting session total bytes to %u", be32toh(meta_resp->size_in_bytes)); csp_buffer_free(packet); } else { session->dtp_errno = DTP_CONNECTION_FAILED; diff --git a/src/dtp_server.c b/src/dtp_server.c index 2361149..76ed2aa 100644 --- a/src/dtp_server.c +++ b/src/dtp_server.c @@ -36,6 +36,14 @@ static void dtp_server_run(bool *exit_server) } dbg_log("Got meta data request"); packet = setup_server_transfer(&server_transfer_ctx, csp_conn_src(conn), packet); + dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)packet->data; + if(meta_resp->size_in_bytes == 0) { + /* Something went wrong, just send the response back */ + csp_send(conn, packet); + csp_close(conn); + dbg_log("Transfer done"); + continue; + } server_transfer_ctx.keep_running = true; if(packet) { csp_send(conn, packet); @@ -256,28 +264,35 @@ extern dtp_result start_sending_data(dtp_server_transfer_ctx_t *ctx) } csp_packet_t *setup_server_transfer(dtp_server_transfer_ctx_t *ctx, uint16_t dst, csp_packet_t *request) { - csp_packet_t *result = 0; - + csp_packet_t *result = request; memcpy(&(ctx->request), (dtp_meta_req_t *)request->data, sizeof(dtp_meta_req_t)); + dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)result->data; + + uint32_t req_version = be32toh(ctx->request.version); + if (req_version != DTP_VERSION) { + dbg_warn("Incompatible DTP version received: %"PRIu32", expected: %"PRIu32"", req_version, DTP_VERSION); + result->length = sizeof(dtp_meta_req_t); + meta_resp->version = htobe32(DTP_VERSION); + meta_resp->size_in_bytes = 0; + meta_resp->total_payload_size = 0; + return result; + } ctx->request.keep_alive_interval = be32toh(ctx->request.keep_alive_interval); ctx->destination = dst; /* Get the payload information */ if (false == get_payload_meta(&ctx->payload_meta, ctx->request.payload_id)) { + meta_resp->size_in_bytes = 0; + meta_resp->total_payload_size = 0; return result; } ctx->size_in_bytes = compute_transfer_size(ctx); - csp_buffer_free(request); - result = csp_buffer_get(0); - if (result) { - /* prepare the response */ - result->length = sizeof(dtp_meta_req_t); - dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)result->data; - meta_resp->total_payload_size = ctx->payload_meta.size; - meta_resp->size_in_bytes = ctx->size_in_bytes; - } + /* prepare the response */ + result->length = sizeof(dtp_meta_req_t); + meta_resp->total_payload_size = htobe32(ctx->payload_meta.size); + meta_resp->size_in_bytes = htobe32(ctx->size_in_bytes); return result; } diff --git a/src/dtp_vmem_client.c b/src/dtp_vmem_client.c index 004acef..c74233d 100644 --- a/src/dtp_vmem_client.c +++ b/src/dtp_vmem_client.c @@ -39,6 +39,7 @@ int vmem_request_dtp_start_download(dtp_t *session, int node, uint32_t session_i request->size = htobe32(size); /* DTP specifics */ + request->meta.version = htobe32(session->request_meta.version); /* DTP version */ request->meta.throughput = htobe32(session->request_meta.throughput); /* Throughput in bytes/second */ request->meta.mtu = htobe16(session->request_meta.mtu); /* MTU size (size of the *useful* payload DTP will use to split the payload) in BYTES */ request->meta.session_id = htobe32(session_id); /* The session ID, representing this particular transfer */ @@ -57,9 +58,20 @@ int vmem_request_dtp_start_download(dtp_t *session, int node, uint32_t session_i /* Send the request */ csp_send(conn, packet); + packet = csp_read(conn, timeout); /* Close connection */ csp_close(conn); + if (packet) { + /* Check the response */ + dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)packet->data; + if(meta_resp->version != htobe32(DTP_VERSION)) { + printf("Incompatible DTP version received: %"PRIu32", expected: %"PRIu32"\n", be32toh(meta_resp->version), DTP_VERSION); + csp_buffer_free(packet); + return -1; + } + } + return 0; } diff --git a/src/dtp_vmem_server.c b/src/dtp_vmem_server.c index 754aba4..ff1adbe 100644 --- a/src/dtp_vmem_server.c +++ b/src/dtp_vmem_server.c @@ -37,9 +37,28 @@ static int vmem_dtp_request_handler(csp_conn_t *conn, csp_packet_t *packet, void dtp_start_req_t *request = (dtp_start_req_t *)&vmem_request->body[0]; uint32_t chunk_size = be16toh(request->meta.mtu) - (2 * sizeof(uint32_t)); + uint32_t request_dtp_version = be32toh(request->meta.version); printf("Received DTP VMEM start transfer request.\n"); + printf("\tDTP version: %"PRIu32"\n", request_dtp_version); printf("\tSession ID: %"PRIu32"\n", be32toh(request->meta.session_id)); printf("\tMTU: %"PRIu16"\n", be16toh(request->meta.mtu)); + if (request_dtp_version != DTP_VERSION) { + printf("Incompatible DTP version requested: %"PRIu32", expected: %"PRIu32"\n", request_dtp_version, DTP_VERSION); + csp_packet_t *result = csp_buffer_get(0); + if (result == NULL) { + return -1; + } + dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)result->data; + result->length = sizeof(dtp_meta_req_t); + meta_resp->version = htobe32(DTP_VERSION); + meta_resp->size_in_bytes = 0; + meta_resp->total_payload_size = 0; + csp_send(conn, result); + break; + } + printf("\tThroughput: %"PRIu32" bytes/second\n", be32toh(request->meta.throughput)); + msg.meta.version = request_dtp_version; + msg.meta.mtu = be16toh(request->meta.mtu); msg.meta.throughput = be32toh(request->meta.throughput); msg.meta.session_id = be32toh(request->meta.session_id);