Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion include/dtp/dtp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ extern "C"
#include "dtp/dtp.h"
#include "dtp/platform.h"

extern const uint32_t DTP_VERSION;

/* CSP connection type forward declaration*/
typedef struct csp_packet_s csp_packet_t;

Expand All @@ -25,6 +27,7 @@ extern "C"
/** Transfer request */
typedef struct
{
uint32_t version; /** Protocol version from client */
uint32_t throughput; /** max server throughput in bytes/second */
uint8_t nof_intervals; /** Number of segments to transfer, see the intervals below */
uint8_t payload_id; /** Payload ID, conceptual identifier for the payload to retrieve, semantic is entirely server-specific */
Expand All @@ -48,6 +51,7 @@ extern "C"
*/
typedef struct
{
uint32_t version; /** Protocol version from server */
uint32_t size_in_bytes; /** Total size of payload data to be sent during this transfer */
uint32_t total_payload_size; /** Total size of payload, for info (will be >= size_in_bytes) */
} dtp_meta_resp_t;
Expand All @@ -61,7 +65,7 @@ extern "C"
uint32_t last_packet; /** The highest packet number occurring in this transmission */
uint32_t total_duration_ms; /** The expected total duration of this session [ms] */
} dtp_metrics_t;

extern dtp_server_transfer_ctx_t server_transfer_ctx;

extern csp_packet_t *setup_server_transfer(dtp_server_transfer_ctx_t *ctx, uint16_t dst, csp_packet_t *request);
Expand Down
2 changes: 1 addition & 1 deletion src/dtp_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ dtp_t *dtp_prepare_session(uint32_t server, uint32_t session_id, uint32_t max_th
} else {
dbg_log("Session created: %p", session);
}

session->request_meta.version = htobe32(DTP_VERSION);
dtp_session_set_user_ctx(session, ctx);

if (resume) {
Expand Down
15 changes: 13 additions & 2 deletions src/dtp_protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "dtp/dtp_protocol.h"
#include "dtp/dtp_session.h"

const uint32_t DTP_VERSION = 1;

dtp_server_transfer_ctx_t server_transfer_ctx;

dtp_result send_remote_meta_req(dtp_t *session)
Expand Down Expand Up @@ -36,8 +38,17 @@ dtp_result read_remote_meta_resp(dtp_t *session)
{
res = DTP_OK;
dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)packet->data;
session->payload_size = meta_resp->total_payload_size;
dbg_log("Setting session total bytes to %u", meta_resp->size_in_bytes);
uint32_t req_version = be32toh(meta_resp->version);
if (req_version != DTP_VERSION) {
dbg_warn("Incompatible DTP version received: %"PRIu32", expected: %"PRIu32"", req_version, DTP_VERSION);
session->dtp_errno = DTP_EINVAL;
res = DTP_ERR;
csp_buffer_free(packet);
return res;
}

session->payload_size = be32toh(meta_resp->total_payload_size);
dbg_log("Setting session total bytes to %u", be32toh(meta_resp->size_in_bytes));
csp_buffer_free(packet);
} else {
session->dtp_errno = DTP_CONNECTION_FAILED;
Expand Down
37 changes: 26 additions & 11 deletions src/dtp_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ static void dtp_server_run(bool *exit_server)
}
dbg_log("Got meta data request");
packet = setup_server_transfer(&server_transfer_ctx, csp_conn_src(conn), packet);
dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)packet->data;
if(meta_resp->size_in_bytes == 0) {
/* Something went wrong, just send the response back */
csp_send(conn, packet);
csp_close(conn);
dbg_log("Transfer done");
continue;
}
server_transfer_ctx.keep_running = true;
if(packet) {
csp_send(conn, packet);
Expand Down Expand Up @@ -256,28 +264,35 @@ extern dtp_result start_sending_data(dtp_server_transfer_ctx_t *ctx)
}

csp_packet_t *setup_server_transfer(dtp_server_transfer_ctx_t *ctx, uint16_t dst, csp_packet_t *request) {
csp_packet_t *result = 0;

csp_packet_t *result = request;
memcpy(&(ctx->request), (dtp_meta_req_t *)request->data, sizeof(dtp_meta_req_t));
dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)result->data;

uint32_t req_version = be32toh(ctx->request.version);
if (req_version != DTP_VERSION) {
dbg_warn("Incompatible DTP version received: %"PRIu32", expected: %"PRIu32"", req_version, DTP_VERSION);
result->length = sizeof(dtp_meta_req_t);
meta_resp->version = htobe32(DTP_VERSION);
meta_resp->size_in_bytes = 0;
meta_resp->total_payload_size = 0;
return result;
}
ctx->request.keep_alive_interval = be32toh(ctx->request.keep_alive_interval);
ctx->destination = dst;

/* Get the payload information */
if (false == get_payload_meta(&ctx->payload_meta, ctx->request.payload_id)) {
meta_resp->size_in_bytes = 0;
meta_resp->total_payload_size = 0;
return result;
}

ctx->size_in_bytes = compute_transfer_size(ctx);

csp_buffer_free(request);
result = csp_buffer_get(0);
if (result) {
/* prepare the response */
result->length = sizeof(dtp_meta_req_t);
dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)result->data;
meta_resp->total_payload_size = ctx->payload_meta.size;
meta_resp->size_in_bytes = ctx->size_in_bytes;
}
/* prepare the response */
result->length = sizeof(dtp_meta_req_t);
meta_resp->total_payload_size = htobe32(ctx->payload_meta.size);
meta_resp->size_in_bytes = htobe32(ctx->size_in_bytes);
return result;
}

Expand Down
12 changes: 12 additions & 0 deletions src/dtp_vmem_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ int vmem_request_dtp_start_download(dtp_t *session, int node, uint32_t session_i
request->size = htobe32(size);

/* DTP specifics */
request->meta.version = htobe32(session->request_meta.version); /* DTP version */
request->meta.throughput = htobe32(session->request_meta.throughput); /* Throughput in bytes/second */
request->meta.mtu = htobe16(session->request_meta.mtu); /* MTU size (size of the *useful* payload DTP will use to split the payload) in BYTES */
request->meta.session_id = htobe32(session_id); /* The session ID, representing this particular transfer */
Expand All @@ -57,9 +58,20 @@ int vmem_request_dtp_start_download(dtp_t *session, int node, uint32_t session_i
/* Send the request */
csp_send(conn, packet);

packet = csp_read(conn, timeout);
/* Close connection */
csp_close(conn);

if (packet) {
/* Check the response */
dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)packet->data;
if(meta_resp->version != htobe32(DTP_VERSION)) {
printf("Incompatible DTP version received: %"PRIu32", expected: %"PRIu32"\n", be32toh(meta_resp->version), DTP_VERSION);
csp_buffer_free(packet);
return -1;
}
}

return 0;

}
Expand Down
19 changes: 19 additions & 0 deletions src/dtp_vmem_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,28 @@ static int vmem_dtp_request_handler(csp_conn_t *conn, csp_packet_t *packet, void
dtp_start_req_t *request = (dtp_start_req_t *)&vmem_request->body[0];
uint32_t chunk_size = be16toh(request->meta.mtu) - (2 * sizeof(uint32_t));

uint32_t request_dtp_version = be32toh(request->meta.version);
printf("Received DTP VMEM start transfer request.\n");
printf("\tDTP version: %"PRIu32"\n", request_dtp_version);
printf("\tSession ID: %"PRIu32"\n", be32toh(request->meta.session_id));
printf("\tMTU: %"PRIu16"\n", be16toh(request->meta.mtu));
if (request_dtp_version != DTP_VERSION) {
printf("Incompatible DTP version requested: %"PRIu32", expected: %"PRIu32"\n", request_dtp_version, DTP_VERSION);
csp_packet_t *result = csp_buffer_get(0);
if (result == NULL) {
return -1;
}
dtp_meta_resp_t *meta_resp = (dtp_meta_resp_t *)result->data;
result->length = sizeof(dtp_meta_req_t);
meta_resp->version = htobe32(DTP_VERSION);
meta_resp->size_in_bytes = 0;
meta_resp->total_payload_size = 0;
csp_send(conn, result);
break;
}
printf("\tThroughput: %"PRIu32" bytes/second\n", be32toh(request->meta.throughput));
msg.meta.version = request_dtp_version;

msg.meta.mtu = be16toh(request->meta.mtu);
msg.meta.throughput = be32toh(request->meta.throughput);
msg.meta.session_id = be32toh(request->meta.session_id);
Expand Down