Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 47 additions & 15 deletions src/platform_linux/laio.c
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ laio_read_async(io_handle *ioh,
-status,
strerror(-status));
}
io_cleanup(ioh, 0);
laio_cleanup(ioh, 1);
} while (status != 1);

return STATUS_OK;
Expand Down Expand Up @@ -484,7 +484,7 @@ laio_write_async(io_handle *ioh,
-status,
strerror(-status));
}
io_cleanup(ioh, 0);
laio_cleanup(ioh, 1);
} while (status != 1);

return STATUS_OK;
Expand All @@ -498,21 +498,42 @@ laio_write_async(io_handle *ioh,
static void
laio_cleanup(io_handle *ioh, uint64 count)
{
laio_handle *io = (laio_handle *)ioh;
struct io_event event = {0};
uint64 i;
int status;

threadid tid = platform_get_tid();
laio_handle *io = (laio_handle *)ioh;
threadid tid = platform_get_tid();
platform_assert(tid < MAX_THREADS, "Invalid tid=%lu", tid);
platform_assert(
io->ctx_idx[tid] < MAX_THREADS, "Invalid ctx_idx=%lu", io->ctx_idx[tid]);
io_process_context *pctx = &io->ctx[io->ctx_idx[tid]];

// Check for completion of up to 'count' events, one event at a time.

// pctx->io_count may get changed by other threads, so remember it now and
// use it whenever we want to specify the size of the events array.
const uint64 io_count = pctx->io_count;
if (io_count == 0) {
return;
}
struct io_event *events;
events = TYPED_ARRAY_ZALLOC(io->heap_id, events, io_count);
if (events == NULL) {
platform_error_log("%s(): OS-pid=%d, tid=%lu, failed to allocate memory"
" for io_event array of size=%lu\n",
__func__,
platform_getpid(),
tid,
io_count);
return;
}

uint64 i, j;
int status;

// Check for completion of up to 'count' events.
// Or, check for all outstanding events (count == 0)
uint64 failure_count = 0;
for (i = 0; (count == 0 || i < count) && 0 < pctx->io_count; i++) {
status = io_getevents(pctx->ctx, 0, 1, &event, NULL);
// Use io_count instead of pct->io_count, as pctx->io_count may have been
// changed by other threads.
status = io_getevents(pctx->ctx, 0, io_count, events, NULL);
if (status < 0) {
platform_error_log("%s(): OS-pid=%d, tid=%lu, io_getevents[%lu], "
"count=%lu, io_count=%lu,"
Expand All @@ -525,17 +546,28 @@ laio_cleanup(io_handle *ioh, uint64 count)
pctx->io_count,
-status,
strerror(-status));
}
if (status <= 0) {

failure_count++;
if (10 < failure_count) {
platform_error_log(
"Too many failures in a row, aborting laio_cleanup\n");
break;
}

i--;
continue;
}
failure_count = 0;

__sync_fetch_and_sub(&pctx->io_count, 1);
__sync_fetch_and_sub(&pctx->io_count, status);

// Invoke the callback for the one event that completed.
laio_callback(pctx->ctx, event.obj, event.res, 0);
// Invoke the callback for the events that completed.
for (j = 0; j < status; j++) {
laio_callback(pctx->ctx, events[j].obj, events[j].res, 0);
}
}

platform_free(io->heap_id, events);
}

/*
Expand Down