From f1147e00d12bb7dcab8c3bab11e68742ac624114 Mon Sep 17 00:00:00 2001 From: deukyeon Date: Thu, 29 Aug 2024 22:11:39 +0000 Subject: [PATCH 1/2] Make async read and write not to wait all other IO completions --- src/platform_linux/laio.c | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/platform_linux/laio.c b/src/platform_linux/laio.c index 21072ffd..748d4647 100644 --- a/src/platform_linux/laio.c +++ b/src/platform_linux/laio.c @@ -441,7 +441,7 @@ laio_read_async(io_handle *ioh, -status, strerror(-status)); } - io_cleanup(ioh, 0); + laio_cleanup(ioh, 1); } while (status != 1); return STATUS_OK; @@ -484,7 +484,7 @@ laio_write_async(io_handle *ioh, -status, strerror(-status)); } - io_cleanup(ioh, 0); + laio_cleanup(ioh, 1); } while (status != 1); return STATUS_OK; @@ -498,21 +498,21 @@ laio_write_async(io_handle *ioh, static void laio_cleanup(io_handle *ioh, uint64 count) { - laio_handle *io = (laio_handle *)ioh; - struct io_event event = {0}; - uint64 i; - int status; - - threadid tid = platform_get_tid(); + laio_handle *io = (laio_handle *)ioh; + threadid tid = platform_get_tid(); platform_assert(tid < MAX_THREADS, "Invalid tid=%lu", tid); platform_assert( io->ctx_idx[tid] < MAX_THREADS, "Invalid ctx_idx=%lu", io->ctx_idx[tid]); io_process_context *pctx = &io->ctx[io->ctx_idx[tid]]; + struct io_event *events; + events = TYPED_ARRAY_ZALLOC(io->heap_id, events, pctx->io_count); + uint64 i, j; + int status; // Check for completion of up to 'count' events, one event at a time. // Or, check for all outstanding events (count == 0) for (i = 0; (count == 0 || i < count) && 0 < pctx->io_count; i++) { - status = io_getevents(pctx->ctx, 0, 1, &event, NULL); + status = io_getevents(pctx->ctx, 0, pctx->io_count, events, NULL); if (status < 0) { platform_error_log("%s(): OS-pid=%d, tid=%lu, io_getevents[%lu], " "count=%lu, io_count=%lu," @@ -525,17 +525,25 @@ laio_cleanup(io_handle *ioh, uint64 count) pctx->io_count, -status, strerror(-status)); - } - if (status <= 0) { i--; continue; + } else if (status == 0) { + if (count == 0) { + continue; + } else { + break; + } } - __sync_fetch_and_sub(&pctx->io_count, 1); + __sync_fetch_and_sub(&pctx->io_count, status); - // Invoke the callback for the one event that completed. - laio_callback(pctx->ctx, event.obj, event.res, 0); + // Invoke the callback for the events that completed. + for (j = 0; j < status; j++) { + laio_callback(pctx->ctx, events[j].obj, events[j].res, 0); + } } + + platform_free(0, events); } /* From 05e2e2fabdcd0bd32640326f651c804e138df280 Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Thu, 14 Nov 2024 22:40:55 -0800 Subject: [PATCH 2/2] fix races in laio_cleanup --- src/platform_linux/laio.c | 46 +++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/src/platform_linux/laio.c b/src/platform_linux/laio.c index 748d4647..29a049da 100644 --- a/src/platform_linux/laio.c +++ b/src/platform_linux/laio.c @@ -504,15 +504,36 @@ laio_cleanup(io_handle *ioh, uint64 count) platform_assert( io->ctx_idx[tid] < MAX_THREADS, "Invalid ctx_idx=%lu", io->ctx_idx[tid]); io_process_context *pctx = &io->ctx[io->ctx_idx[tid]]; - struct io_event *events; - events = TYPED_ARRAY_ZALLOC(io->heap_id, events, pctx->io_count); + + + // pctx->io_count may get changed by other threads, so remember it now and + // use it whenever we want to specify the size of the events array. + const uint64 io_count = pctx->io_count; + if (io_count == 0) { + return; + } + struct io_event *events; + events = TYPED_ARRAY_ZALLOC(io->heap_id, events, io_count); + if (events == NULL) { + platform_error_log("%s(): OS-pid=%d, tid=%lu, failed to allocate memory" + " for io_event array of size=%lu\n", + __func__, + platform_getpid(), + tid, + io_count); + return; + } + uint64 i, j; int status; - // Check for completion of up to 'count' events, one event at a time. + // Check for completion of up to 'count' events. // Or, check for all outstanding events (count == 0) + uint64 failure_count = 0; for (i = 0; (count == 0 || i < count) && 0 < pctx->io_count; i++) { - status = io_getevents(pctx->ctx, 0, pctx->io_count, events, NULL); + // Use io_count instead of pct->io_count, as pctx->io_count may have been + // changed by other threads. + status = io_getevents(pctx->ctx, 0, io_count, events, NULL); if (status < 0) { platform_error_log("%s(): OS-pid=%d, tid=%lu, io_getevents[%lu], " "count=%lu, io_count=%lu," @@ -525,15 +546,18 @@ laio_cleanup(io_handle *ioh, uint64 count) pctx->io_count, -status, strerror(-status)); - i--; - continue; - } else if (status == 0) { - if (count == 0) { - continue; - } else { + + failure_count++; + if (10 < failure_count) { + platform_error_log( + "Too many failures in a row, aborting laio_cleanup\n"); break; } + + i--; + continue; } + failure_count = 0; __sync_fetch_and_sub(&pctx->io_count, status); @@ -543,7 +567,7 @@ laio_cleanup(io_handle *ioh, uint64 count) } } - platform_free(0, events); + platform_free(io->heap_id, events); } /*