
Commit 0b39e3e

add trigger gc api
1 parent 24813fa commit 0b39e3e

6 files changed: +372, -28 lines changed

6 files changed

+372
-28
lines changed

conanfile.py

Lines changed: 1 addition & 1 deletion

@@ -10,7 +10,7 @@
 
 class HomeObjectConan(ConanFile):
     name = "homeobject"
-    version = "4.0.6"
+    version = "4.0.7"
 
     homepage = "https://github.com/eBay/HomeObject"
     description = "Blob Store built on HomeStore"

src/lib/homestore_backend/gc_manager.cpp

Lines changed: 13 additions & 5 deletions

@@ -123,7 +123,10 @@ void GCManager::start() {
         gc_actor->start();
         LOGINFOMOD(gcmgr, "start gc actor for pdev={}", pdev_id);
     }
+    start_gc_scan_timer();
+}
 
+void GCManager::start_gc_scan_timer() {
     const auto gc_scan_interval_sec = HS_BACKEND_DYNAMIC_CONFIG(gc_scan_interval_sec);
 
     // the initial idea here is that we want gc timer to run in a reactor that not shared with other fibers that
@@ -147,9 +150,7 @@ void GCManager::start() {
     LOGINFOMOD(gcmgr, "gc scheduler timer has started, interval is set to {} seconds", gc_scan_interval_sec);
 }
 
-bool GCManager::is_started() { return m_gc_timer_hdl != iomgr::null_timer_handle; }
-
-void GCManager::stop() {
+void GCManager::stop_gc_scan_timer() {
     if (m_gc_timer_hdl == iomgr::null_timer_handle) {
         LOGWARNMOD(gcmgr, "gc scheduler timer is not running, no need to stop it");
         return;
@@ -163,6 +164,10 @@ void GCManager::stop() {
         m_gc_timer_hdl = iomgr::null_timer_handle;
     });
     m_gc_timer_fiber = nullptr;
+}
+
+void GCManager::stop() {
+    stop_gc_scan_timer();
 
     for (const auto& [pdev_id, gc_actor] : m_pdev_gc_actors) {
         gc_actor->stop();
@@ -171,8 +176,6 @@ void GCManager::stop() {
 }
 
 folly::SemiFuture< bool > GCManager::submit_gc_task(task_priority priority, chunk_id_t chunk_id) {
-    if (!is_started()) return folly::makeFuture< bool >(false);
-
     auto pdev_id = m_chunk_selector->get_extend_vchunk(chunk_id)->get_pdev_id();
     auto it = m_pdev_gc_actors.find(pdev_id);
     if (it == m_pdev_gc_actors.end()) {
@@ -337,6 +340,11 @@ void GCManager::pdev_gc_actor::add_reserved_chunk(
 }
 
 folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority, chunk_id_t move_from_chunk) {
+    if (m_is_stopped.load()) {
+        LOGWARNMOD(gcmgr, "pdev gc actor for pdev_id={} is not started yet or already stopped, cannot add gc task!",
+                   m_pdev_id);
+        return folly::makeSemiFuture< bool >(false);
+    }
     auto EXvchunk = m_chunk_selector->get_extend_vchunk(move_from_chunk);
     // it does not belong to any pg, so we don't need to gc it.
    if (!EXvchunk->m_pg_id.has_value()) {
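
Splitting the timer control out of start()/stop() lets a caller pause the periodic scan while it drives GC by hand, which is what the new HTTP handler further below does. A minimal sketch (not part of this commit) of that pause/submit/resume pattern, using only names this diff introduces or keeps; setup and error handling are omitted:

    // Sketch only: assumes a started GCManager; per the note added in gc_manager.hpp,
    // start_gc_scan_timer()/stop_gc_scan_timer() must not be called concurrently.
    void run_manual_gc(GCManager& gc_mgr, task_priority priority, chunk_id_t chunk_id) {
        gc_mgr.stop_gc_scan_timer();                                // pause periodic scans
        bool ok = gc_mgr.submit_gc_task(priority, chunk_id).get();  // the pdev actor returns false if it is stopped
        gc_mgr.start_gc_scan_timer();                               // resume periodic scans
        LOGINFOMOD(gcmgr, "manual gc for chunk={} finished, result={}", chunk_id, ok);
    }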

src/lib/homestore_backend/gc_manager.hpp

Lines changed: 5 additions & 1 deletion

@@ -307,7 +307,11 @@ class GCManager {
 
     void start();
     void stop();
-    bool is_started();
+
+    // the following two functions should not be called concurrently. if we need to call them concurrently, we need to
+    // add lock to protect
+    void start_gc_scan_timer();
+    void stop_gc_scan_timer();
 
     void scan_chunks_for_gc();
    void drain_pg_pending_gc_task(const pg_id_t pg_id);
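
The new comment leaves serialization to the caller. If start_gc_scan_timer() and stop_gc_scan_timer() ever do need to be driven from multiple threads, a caller-side lock is one way to honor that note; a hedged sketch, with a hypothetical mutex name that is not part of this commit:

    #include <mutex>

    std::mutex g_gc_timer_ctrl_mtx;  // hypothetical caller-side lock

    void stop_scan_timer_serialized(GCManager& gc_mgr) {
        std::lock_guard< std::mutex > lg(g_gc_timer_ctrl_mtx);
        gc_mgr.stop_gc_scan_timer();
    }

    void start_scan_timer_serialized(GCManager& gc_mgr) {
        std::lock_guard< std::mutex > lg(g_gc_timer_ctrl_mtx);
        gc_mgr.start_gc_scan_timer();
    }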

src/lib/homestore_backend/hs_http_manager.cpp

Lines changed: 300 additions & 1 deletion

@@ -48,7 +48,11 @@ HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) {
         {Pistache::Http::Method::Get, "/api/v1/chunk/dump",
          Pistache::Rest::Routes::bind(&HttpManager::dump_chunk, this)},
         {Pistache::Http::Method::Get, "/api/v1/shard/dump",
-         Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)}};
+         Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)},
+        {Pistache::Http::Method::Post, "/api/v1/trigger_gc",
+         Pistache::Rest::Routes::bind(&HttpManager::trigger_gc, this)},
+        {Pistache::Http::Method::Get, "/api/v1/gc_job_status",
+         Pistache::Rest::Routes::bind(&HttpManager::get_gc_job_status, this)}};
 
     auto http_server = ioenvironment.get_http_server();
    if (!http_server) {
@@ -239,6 +243,301 @@ void HttpManager::dump_shard(const Pistache::Rest::Request& request, Pistache::H
     response.send(Pistache::Http::Code::Ok, j.dump());
 }
 
+void HttpManager::trigger_gc(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
+    const auto chunk_id_param = request.query().get("chunk_id");
+
+    auto gc_mgr = ho_.gc_manager();
+    if (!gc_mgr) {
+        response.send(Pistache::Http::Code::Internal_Server_Error, "GC manager not available");
+        return;
+    }
+
+    auto chunk_selector = ho_.chunk_selector();
+    if (!chunk_selector) {
+        response.send(Pistache::Http::Code::Internal_Server_Error, "Chunk selector not available");
+        return;
+    }
+
+    std::string job_id = generate_job_id();
+    nlohmann::json result;
+
+    if (!chunk_id_param || chunk_id_param.value().empty()) {
+        LOGINFO("Received trigger_gc request for all chunks, job_id={}", job_id);
+
+        auto job_info = std::make_shared< GCJobInfo >(job_id);
+        {
+            std::lock_guard< std::mutex > lock(gc_job_mutex_);
+            // Check if there is already an active GC job
+            if (current_gc_job_ &&
+                (current_gc_job_->status == GCJobStatus::PENDING || current_gc_job_->status == GCJobStatus::RUNNING)) {
+                result["error"] = "A GC job is already in progress";
+                result["active_job_id"] = current_gc_job_->job_id;
+                result["active_job_status"] = (current_gc_job_->status == GCJobStatus::PENDING) ? "pending" : "running";
+                response.send(Pistache::Http::Code::Conflict, result.dump());
+                return;
+            }
+
+            // Create new job and set as current (replacing any completed job)
+            current_gc_job_ = job_info;
+        }
+
+        iomanager.run_on_forget(
+            iomgr::reactor_regex::random_worker, [this, gc_mgr, chunk_selector, job_id, job_info]() {
+                job_info->status = GCJobStatus::RUNNING;
+                job_info->updated_at = std::chrono::system_clock::now();
+
+                uint32_t total_chunks = 0;
+                uint32_t success_count = 0;
+                uint32_t failed_count = 0;
+
+                // 1. Stop the GC scan timer to prevent automatic GC tasks
+                LOGINFO("GC job {} stopping GC scan timer", job_id);
+                gc_mgr->stop_gc_scan_timer();
+
+                // 2. Get all PG IDs first
+                std::vector< pg_id_t > pg_ids;
+                ho_.get_pg_ids(pg_ids);
+
+                LOGINFO("GC job {} will process {} PGs", job_id, pg_ids.size());
+
+                // 3. Process each PG separately
+                for (const auto& pg_id : pg_ids) {
+                    auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+                    RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
+
+                    // 3.1 Drain pending GC tasks for this PG first
+                    LOGINFO("GC job {} draining pending GC tasks for PG {}", job_id, pg_id);
+                    gc_mgr->drain_pg_pending_gc_task(pg_id);
+
+                    // 3.2 Get all chunks for this PG
+                    auto pg_sb = hs_pg->pg_sb_.get();
+                    std::vector< homestore::chunk_num_t > pg_chunks(pg_sb->get_chunk_ids(),
+                                                                    pg_sb->get_chunk_ids() + pg_sb->num_chunks);
+
+                    LOGINFO("GC job {} processing PG {} with {} chunks", job_id, pg_id, pg_chunks.size());
+
+                    // 3.3 Drain all pending requests and refuse new requests (prevents destroy_pg and other requests)
+                    hs_pg->repl_dev_->quiesce_reqs();
+
+                    // 3.4 Submit GC task for each eligible chunk
+                    std::vector< folly::SemiFuture< bool > > pg_futures;
+                    for (const auto& chunk_id : pg_chunks) {
+                        total_chunks++;
+                        // Determine priority based on chunk state (INUSE means has open shard)
+                        auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
+                        RELEASE_ASSERT(chunk, "Chunk {} not found during GC job {}", chunk_id, job_id);
+                        auto priority =
+                            chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;
+
+                        // Clear in-memory requests only for emergent priority chunks (chunks with open shards)
+                        if (priority == task_priority::emergent) { hs_pg->repl_dev_->clear_chunk_req(chunk_id); }
+
+                        // Submit GC task for this chunk
+                        auto future = gc_mgr->submit_gc_task(priority, chunk_id);
+                        pg_futures.push_back(std::move(future));
+                        LOGDEBUG("GC job {} submitted task for chunk_id={} in PG {} with priority={}", job_id, chunk_id,
+                                 pg_id, (priority == task_priority::emergent) ? "emergent" : "normal");
+                    }
+
+                    // 3.5 Wait for all GC tasks in this PG to complete
+                    folly::collectAllUnsafe(pg_futures)
+                        .thenValue([&success_count, &failed_count](auto&& results) {
+                            for (auto const& ok : results) {
+                                RELEASE_ASSERT(ok.hasValue(), "we never throw any exception when copying data");
+                                if (ok.value()) {
+                                    success_count++;
+                                } else {
+                                    failed_count++;
+                                }
+                            }
+                        })
+                        .get();
+
+                    LOGINFO("GC job {} completed PG {}: submitted={}, success={}, failed={}", job_id, pg_id,
+                            pg_futures.size(), success_count, failed_count);
+
+                    // 3.6 Resume accepting new requests
+                    hs_pg->repl_dev_->resume_accepting_reqs();
+                    LOGINFO("GC job {} resumed accepting requests for PG {}", job_id, pg_id);
+                }
+
+                job_info->total_chunks = total_chunks;
+                job_info->success_count = success_count;
+                job_info->failed_count = failed_count;
+
+                LOGINFO("GC job {} completed: total={}, success={}, failed={}", job_id, total_chunks, success_count,
+                        failed_count);
+
+                // Job is considered successful if all tasks succeeded or no tasks failed
+                job_info->result = (failed_count == 0);
+                job_info->status = job_info->result.value() ? GCJobStatus::COMPLETED : GCJobStatus::FAILED;
+
+                // Restart the GC scan timer
+                LOGINFO("GC job {} restarting GC scan timer", job_id);
+                gc_mgr->start_gc_scan_timer();
+
+                job_info->updated_at = std::chrono::system_clock::now();
+                job_info->promise.setValue(job_info->result.value_or(false));
+            });
+
+        result["job_id"] = job_id;
+        result["message"] = "GC triggered for all eligible chunks";
+        response.send(Pistache::Http::Code::Accepted, result.dump());
+    } else {
+        uint32_t chunk_id = std::stoul(chunk_id_param.value());
+        LOGINFO("Received trigger_gc request for chunk_id={}, job_id={}", chunk_id, job_id);
+
+        auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
+        if (!chunk) {
+            nlohmann::json error;
+            error["error"] = "Chunk not found";
+            response.send(Pistache::Http::Code::Not_Found, error.dump());
+            return;
+        }
+
+        if (!chunk->m_pg_id.has_value()) {
+            nlohmann::json error;
+            error["error"] = "Chunk belongs to no pg";
+            response.send(Pistache::Http::Code::Not_Found, error.dump());
+            return;
+        }
+
+        const auto pg_id = chunk->m_pg_id.value();
+
+        auto pdev_id = chunk->get_pdev_id();
+
+        // Check for active job and create new job atomically under the same lock
+        auto job_info = std::make_shared< GCJobInfo >(job_id, chunk_id, pdev_id);
+        {
+            std::lock_guard< std::mutex > lock(gc_job_mutex_);
+            // Check if there is already an active GC job
+            if (current_gc_job_ &&
+                (current_gc_job_->status == GCJobStatus::PENDING || current_gc_job_->status == GCJobStatus::RUNNING)) {
+                nlohmann::json error;
+                error["error"] = "A GC job is already in progress";
+                error["active_job_id"] = current_gc_job_->job_id;
+                error["active_job_status"] = (current_gc_job_->status == GCJobStatus::PENDING) ? "pending" : "running";
+                response.send(Pistache::Http::Code::Conflict, error.dump());
+                return;
+            }
+            // Create new job and set as current (replacing any completed job)
+            current_gc_job_ = job_info;
+        }
+
+        iomanager.run_on_forget(
+            iomgr::reactor_regex::random_worker, [this, gc_mgr, job_id, chunk_id, chunk_selector, job_info, pg_id]() {
+                job_info->status = GCJobStatus::RUNNING;
+                job_info->updated_at = std::chrono::system_clock::now();
+
+                // Determine priority based on chunk state (INUSE means has open shard)
+                auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
+                RELEASE_ASSERT(chunk, "Chunk {} not found during GC job {}", chunk_id, job_id);
+                auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;
+
+                // Clear in-memory requests only for emergent priority chunks (chunks with open shards)
+                auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+                RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
+                bool res;
+                if (priority == task_priority::emergent) {
+                    hs_pg->repl_dev_->quiesce_reqs();
+                    hs_pg->repl_dev_->clear_chunk_req(chunk_id);
+                    res = gc_mgr->submit_gc_task(priority, chunk_id).get();
+                    hs_pg->repl_dev_->resume_accepting_reqs();
+                } else {
+                    res = gc_mgr->submit_gc_task(priority, chunk_id).get();
+                }
+
+                job_info->result = res;
+                job_info->status = res ? GCJobStatus::COMPLETED : GCJobStatus::FAILED;
+                job_info->updated_at = std::chrono::system_clock::now();
+                job_info->promise.setValue(res);
+
+                LOGINFO("GC task completed for chunk_id={}, job_id={}, result={}", chunk_id, job_id, res);
+            });
+
+        result["job_id"] = job_id;
+        result["message"] = "GC triggered for chunk";
+        result["chunk_id"] = chunk_id;
+        result["pdev_id"] = pdev_id;
+        response.send(Pistache::Http::Code::Accepted, result.dump());
+    }
+}
+
+std::string HttpManager::generate_job_id() {
+    auto counter = job_counter_.fetch_add(1, std::memory_order_relaxed);
+    auto now = std::chrono::system_clock::now();
+    auto timestamp = std::chrono::duration_cast< std::chrono::milliseconds >(now.time_since_epoch()).count();
+    return fmt::format("gc-{}-{}", timestamp, counter);
+}
+
+void HttpManager::get_gc_job_status(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
+    auto job_id_param = request.query().get("job_id");
+    if (!job_id_param) {
+        response.send(Pistache::Http::Code::Bad_Request, "job_id is required");
+        return;
+    }
+
+    std::string job_id = job_id_param.value();
+
+    std::shared_ptr< GCJobInfo > job_info;
+    {
+        std::lock_guard< std::mutex > lock(gc_job_mutex_);
+        if (!current_gc_job_ || current_gc_job_->job_id != job_id) {
+            nlohmann::json error;
+            error["error"] = "Job not found";
+            error["job_id"] = job_id;
+            response.send(Pistache::Http::Code::Not_Found, error.dump());
+            return;
+        }
+        job_info = current_gc_job_;
+    }
+
+    // Access job_info outside the lock
+    nlohmann::json result;
+    result["job_id"] = job_info->job_id;
+
+    switch (job_info->status) {
+    case GCJobStatus::PENDING:
+        result["status"] = "pending";
+        break;
+    case GCJobStatus::RUNNING:
+        result["status"] = "running";
+        break;
+    case GCJobStatus::COMPLETED:
+        result["status"] = "completed";
+        break;
+    case GCJobStatus::FAILED:
+        result["status"] = "failed";
+        break;
+    }
+
+    auto created_ms =
+        std::chrono::duration_cast< std::chrono::milliseconds >(job_info->created_at.time_since_epoch()).count();
+    auto updated_ms =
+        std::chrono::duration_cast< std::chrono::milliseconds >(job_info->updated_at.time_since_epoch()).count();
+    result["created_at"] = created_ms;
+    result["updated_at"] = updated_ms;
+
+    // Single chunk GC info
+    if (job_info->chunk_id.has_value()) {
+        result["chunk_id"] = job_info->chunk_id.value();
+        if (job_info->pdev_id.has_value()) { result["pdev_id"] = job_info->pdev_id.value(); }
+    }
+
+    // Batch GC statistics (for all chunks)
+    if (job_info->total_chunks > 0) {
+        nlohmann::json stats;
+        stats["total_chunks"] = job_info->total_chunks;
+        stats["success_count"] = job_info->success_count;
+        stats["failed_count"] = job_info->failed_count;
+        result["statistics"] = stats;
+    }
+
+    if (job_info->result.has_value()) { result["result"] = job_info->result.value(); }
+
+    response.send(Pistache::Http::Code::Ok, result.dump());
+}
+
 #ifdef _PRERELEASE
 void HttpManager::crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
    std::string crash_type;
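
The handlers above reference GCJobStatus, GCJobInfo, gc_job_mutex_, current_gc_job_, and job_counter_, whose declarations live in a header that is among the six changed files but is not reproduced on this page. A hedged reconstruction of the minimal shape those handlers assume (names and types inferred from their usage above; the commit's actual header may differ):

    // Hypothetical reconstruction, not the commit's actual declarations.
    #include <atomic>
    #include <chrono>
    #include <cstdint>
    #include <mutex>
    #include <optional>
    #include <string>
    #include <folly/futures/Promise.h>

    enum class GCJobStatus { PENDING, RUNNING, COMPLETED, FAILED };

    struct GCJobInfo {
        explicit GCJobInfo(std::string id) : job_id(std::move(id)) {}
        GCJobInfo(std::string id, uint32_t chunk, uint32_t pdev) :
                job_id(std::move(id)), chunk_id(chunk), pdev_id(pdev) {}

        std::string job_id;
        std::optional< uint32_t > chunk_id;   // set only for single-chunk jobs
        std::optional< uint32_t > pdev_id;    // set only for single-chunk jobs
        GCJobStatus status{GCJobStatus::PENDING};
        std::optional< bool > result;
        uint32_t total_chunks{0};
        uint32_t success_count{0};
        uint32_t failed_count{0};
        std::chrono::system_clock::time_point created_at{std::chrono::system_clock::now()};
        std::chrono::system_clock::time_point updated_at{std::chrono::system_clock::now()};
        folly::Promise< bool > promise;       // fulfilled once the job finishes
    };

    // Members assumed on HttpManager:
    //   std::mutex gc_job_mutex_;
    //   std::shared_ptr< GCJobInfo > current_gc_job_;
    //   std::atomic< uint64_t > job_counter_{0};

With these in place, a client POSTs /api/v1/trigger_gc (optionally with ?chunk_id=<id> for a single chunk), receives 202 Accepted with a job_id, and polls GET /api/v1/gc_job_status?job_id=<job_id> until the status reaches completed or failed.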
