diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp index 9420f650c..285746011 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp @@ -89,6 +89,18 @@ const std::string& Task::id() const return _id; } +//============================================================================== +const rmf_task_msgs::msg::TaskProfile Task::task_profile_msg() const +{ + return _task_profile_msg; +} + +//============================================================================== +void Task::task_profile_msg(const rmf_task_msgs::msg::TaskProfile& profile) +{ + _task_profile_msg = profile; +} + //============================================================================== const rmf_task::ConstRequestPtr Task::request() const { diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp index d0850e042..177b401a7 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp @@ -40,6 +40,7 @@ class Task : public std::enable_shared_from_this public: using StatusMsg = rmf_task_msgs::msg::TaskSummary; + using TaskProfile = rmf_task_msgs::msg::TaskProfile; /// This class represents the active phase of a Task. It provides an /// observable that the Task can track to stay up-to-date on the status and to @@ -124,6 +125,12 @@ class Task : public std::enable_shared_from_this const std::string& id() const; + /// Get the profile msg, for publish status msg + const TaskProfile task_profile_msg() const; + + /// Set the profile msg, for publish status msg + void task_profile_msg(const TaskProfile& profile); + /// Get the request used to generate this task const rmf_task::ConstRequestPtr request() const; @@ -162,6 +169,7 @@ class Task : public std::enable_shared_from_this rmf_traffic::Time _deployment_time; rmf_task::agv::State _finish_state; rmf_task::ConstRequestPtr _request; + TaskProfile _task_profile_msg; void _start_next_phase(); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index 8744b750d..7a86674b3 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -35,6 +35,25 @@ namespace rmf_fleet_adapter { +//============================================================================== +void populate_task_summary( + rmf_task_msgs::msg::TaskSummary& msg, + const agv::RobotContextPtr context, + const std::shared_ptr task = nullptr) +{ + msg.robot_name = context->name(); + msg.fleet_name = context->description().owner(); + if (task != nullptr) + { + msg.task_id = task->id(); // duplicated + msg.task_profile = task->task_profile_msg(); + msg.start_time = rmf_traffic_ros2::convert( + task->deployment_time()); + msg.end_time = rmf_traffic_ros2::convert( + task->finish_state().finish_time()); + } +} + //============================================================================== TaskManagerPtr TaskManager::make(agv::RobotContextPtr context) { @@ -84,11 +103,11 @@ TaskManager::TaskManager(agv::RobotContextPtr context) } //============================================================================== -void TaskManager::queue_task(std::shared_ptr task, Start expected_finish) +void TaskManager::_queue_task(std::shared_ptr task) { - std::lock_guard guard(_mutex); - _queue.push_back(std::move(task)); - _expected_finish_location = std::move(expected_finish); + { + std::lock_guard guard(_mutex); + _queue.push_back(std::move(task)); RCLCPP_INFO( _context->node()->get_logger(), @@ -96,28 +115,14 @@ void TaskManager::queue_task(std::shared_ptr task, Start expected_finish) _queue.back()->id().c_str(), _context->requester_id().c_str(), _queue.size()); - if (!_active_task) - { - _begin_next_task(); - } - else - { rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = _queue.back()->id(); - msg.task_profile.task_id = _queue.back()->id(); + populate_task_summary(msg, _context, _queue.back()); msg.state = msg.STATE_QUEUED; - msg.robot_name = _context->name(); this->_context->node()->task_summary()->publish(msg); } -} - -//============================================================================== -auto TaskManager::expected_finish_location() const -> StartSet -{ - if (_expected_finish_location) - return {*_expected_finish_location}; - return _context->location(); + if (!_active_task) + _begin_next_task(); } //============================================================================== @@ -133,10 +138,10 @@ auto TaskManager::expected_finish_state() const -> State auto location = finish_state.location(); location.time(rmf_traffic_ros2::convert(_context->node()->now())); finish_state.location(location); - + const double current_battery_soc = _context->current_battery_soc(); finish_state.battery_soc(current_battery_soc); - + return finish_state; } @@ -159,113 +164,103 @@ agv::ConstRobotContextPtr TaskManager::context() const } //============================================================================== -void TaskManager::set_queue( - const std::vector& assignments) +const std::vector> TaskManager::task_queue() const +{ + return _queue; +} + +//============================================================================== +void TaskManager::_set_queue( + const std::vector>& tasks) { // We indent this block as _mutex is also locked in the _begin_next_task() // function that is called at the end of this function. { std::lock_guard guard(_mutex); _queue.clear(); + } - // We use dynamic cast to determine the type of request and then call the - // appropriate make(~) function to convert the request into a task - for (std::size_t i = 0; i < assignments.size(); ++i) - { - const auto& a = assignments[i]; - auto start = _context->current_task_end_state().location(); - if (i != 0) - start = assignments[i-1].state().location(); - start.time(a.deployment_time()); - rmf_task_msgs::msg::TaskType task_type_msg; - const auto request = a.request(); - if (std::dynamic_pointer_cast< - const rmf_task::requests::Clean::Description>(request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_CLEAN; - auto task = rmf_fleet_adapter::tasks::make_clean( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } - - else if (std::dynamic_pointer_cast< - const rmf_task::requests::ChargeBattery::Description>( - request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_CHARGE_BATTERY; - const auto task = tasks::make_charge_battery( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } - - else if (std::dynamic_pointer_cast< - const rmf_task::requests::Delivery::Description>( - request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_DELIVERY; - const auto task = tasks::make_delivery( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } - - else if (std::dynamic_pointer_cast< - const rmf_task::requests::Loop::Description>(request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_LOOP; - const auto task = tasks::make_loop( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } + for (const auto t : tasks) + _queue_task(t); +} - else - { - RCLCPP_WARN( - _context->node()->get_logger(), - "[TaskManager] Un-supported request type in assignment list. " - "Please update the implementation of TaskManager::set_queue() to " - "support request with task_id:[%s]", - a.request()->id().c_str()); +//============================================================================== +void TaskManager::set_queue( + const std::vector& assignments, + const std::unordered_map& task_profiles) +{ + std::vector> task_queue; + std::shared_ptr task = nullptr; + for (std::size_t i = 0; i < assignments.size(); ++i) + { + const auto& a = assignments[i]; + auto start = _context->current_task_end_state().location(); + if (i != 0) + start = assignments[i-1].state().location(); + start.time(a.deployment_time()); + + const auto req = a.request(); + const auto id = req->id(); + const auto req_desc = req->description(); + + rmf_task_msgs::msg::TaskProfile profile; + if (task_profiles.find(id) != task_profiles.end()) + { + profile = task_profiles.at(id); + } + else + { + // This is likely a self-generated task + profile.task_id = id; + profile.submission_time = _context->node()->now(); + profile.description.start_time = + rmf_traffic_ros2::convert(a.deployment_time()); + } - continue; - } + // We use dynamic cast to determine the type of request and then call the + // appropriate make(~) function to convert the request into a task + using namespace rmf_task::requests; + using TaskType = rmf_task_msgs::msg::TaskType; - // publish queued task - rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = _queue.back()->id(); - msg.task_profile.task_id = _queue.back()->id(); - msg.state = msg.STATE_QUEUED; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.task_profile.description.task_type = task_type_msg; - msg.start_time = rmf_traffic_ros2::convert( - _queue.back()->deployment_time()); - msg.start_time = rmf_traffic_ros2::convert( - _queue.back()->finish_state().finish_time()); - this->_context->node()->task_summary()->publish(msg); + /// CHARGE BATTERY TASK (self-Generated) + if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_charge_battery( + req, _context, start, a.deployment_time(), a.state()); + profile.description.task_type.type = TaskType::TYPE_CHARGE_BATTERY; + } + /// CLEAN TASK (User Request) + else if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_clean( + req, _context, start, a.deployment_time(), a.state()); + } + /// DELIVERY TASK (User Request) + else if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_delivery( + req, _context, start, a.deployment_time(), a.state()); + } + /// LOOP TASK (User Request) + else if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_loop( + req, _context, start, a.deployment_time(), a.state()); + } + else + { + RCLCPP_WARN( + _context->node()->get_logger(), + "[TaskManager] Un-supported request type in assignment list. " + "Please update the implementation of TaskManager::set_queue() to " + "support request with task_id:[%s]", id.c_str()); + continue; } - } - _begin_next_task(); + task->task_profile_msg(profile); + task_queue.push_back(task); + } + this->_set_queue(task_queue); } //============================================================================== @@ -337,19 +332,13 @@ void TaskManager::_begin_next_task() .subscribe( [this, id = _active_task->id()](Task::StatusMsg msg) { - msg.task_id = id; - msg.task_profile.task_id = id; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - _active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - _active_task->finish_state().finish_time()); + populate_task_summary(msg, _context, _active_task); _context->node()->task_summary()->publish(msg); }, [this, id = _active_task->id()](std::exception_ptr e) { rmf_task_msgs::msg::TaskSummary msg; + populate_task_summary(msg, _context, _active_task); msg.state = msg.STATE_FAILED; try { @@ -359,30 +348,14 @@ void TaskManager::_begin_next_task() msg.status = e.what(); } - msg.task_id = id; - msg.task_profile.task_id = id; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - _active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - _active_task->finish_state().finish_time()); _context->node()->task_summary()->publish(msg); }, [this, id = _active_task->id()]() { rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = id; - msg.task_profile.task_id = id; + populate_task_summary(msg, _context, _active_task); msg.state = msg.STATE_COMPLETED; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - _active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - _active_task->finish_state().finish_time()); - this->_context->node()->task_summary()->publish(msg); - + this->_context->node()->task_summary()->publish(msg); _active_task = nullptr; }); @@ -409,9 +382,8 @@ void TaskManager::_begin_waiting() .subscribe( [this](Task::StatusMsg msg) { + populate_task_summary(msg, _context); msg.task_id = _context->requester_id() + ":waiting"; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); // TODO: Fill in end_time with the beginning time of the next task if // the next task is known. msg.start_time = _context->node()->now(); @@ -422,6 +394,7 @@ void TaskManager::_begin_waiting() [this](std::exception_ptr e) { rmf_task_msgs::msg::TaskSummary msg; + populate_task_summary(msg, _context); msg.state = msg.STATE_FAILED; try { @@ -432,13 +405,8 @@ void TaskManager::_begin_waiting() } msg.task_id = _context->requester_id() + ":waiting"; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - _active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - _active_task->finish_state().finish_time()); _context->node()->task_summary()->publish(msg); + // TODO: empty id string? RCLCPP_WARN( _context->node()->get_logger(), diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index 0c0f31a0a..d747b6b09 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -45,13 +45,7 @@ class TaskManager : public std::enable_shared_from_this using StartSet = rmf_traffic::agv::Plan::StartSet; using Assignment = rmf_task::agv::TaskPlanner::Assignment; using State = rmf_task::agv::State; - - /// Add a task to the queue of this manager. - void queue_task(std::shared_ptr task, Start expected_finish); - - /// The location where we expect this robot to be at the end of its current - /// task queue. - StartSet expected_finish_location() const; + using TaskProfile = rmf_task_msgs::msg::TaskProfile; const agv::RobotContextPtr& context(); @@ -60,8 +54,13 @@ class TaskManager : public std::enable_shared_from_this const Task* current_task() const; /// Set the queue for this task manager with assignments generated from the - /// task planner - void set_queue(const std::vector& assignments); + /// task planner, arg 'task_profiles' is merely used for status publishing. + void set_queue( + const std::vector& assignments, + const std::unordered_map& task_profiles = {}); + + // get tasks in the queue + const std::vector> task_queue() const; /// Get the non-charging requests among pending tasks const std::vector requests() const; @@ -84,7 +83,6 @@ class TaskManager : public std::enable_shared_from_this agv::RobotContextPtr _context; std::shared_ptr _active_task; std::vector> _queue; - rmf_utils::optional _expected_finish_location; rxcpp::subscription _task_sub; rxcpp::subscription _emergency_sub; @@ -104,6 +102,12 @@ class TaskManager : public std::enable_shared_from_this // Use the _register_executed_task() to populate this container. std::vector _executed_task_registry; + /// set a vector of tasks + void _set_queue(const std::vector>& tasks); + + /// Add a task to the queue of this manager. + void _queue_task(std::shared_ptr task); + /// Callback for task timer which begins next task if its deployment time has passed void _begin_next_task(); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index 1115fa18d..d7ecbe207 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -113,7 +113,9 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( return; } - if (msg->task_profile.task_id.empty()) + const std::string& id = msg->task_profile.task_id; + + if (id.empty()) { RCLCPP_WARN( node->get_logger(), @@ -122,10 +124,25 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( return; } - // TODO remove this block when we support task revival - if (bid_notice_assignments.find(msg->task_profile.task_id) - != bid_notice_assignments.end()) + // Will check if tasks is queued + if (is_task_queued(id)) + { + RCLCPP_WARN( + node->get_logger(), + "Task [%s] is currently queued in Fleet [%s], bid notice is ignored", + id.c_str(), name.c_str()); return; + } + + // Will check if tasks is an active current task + if (is_task_active(id)) + { + RCLCPP_WARN( + node->get_logger(), + "Task [%s] is currently executing by Fleet [%s], bid notice is ignored", + id.c_str(), name.c_str()); + return; + } if (!accept_task) { @@ -134,7 +151,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( "Fleet [%s] is not configured to accept any task requests. Use " "FleetUpdateHadndle::accept_task_requests(~) to define a callback " "for accepting requests", name.c_str()); - return; } @@ -143,10 +159,8 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( RCLCPP_INFO( node->get_logger(), "Fleet [%s] is configured to not accept task [%s]", - name.c_str(), - msg->task_profile.task_id.c_str()); - - return; + name.c_str(), id.c_str()); + return; } if (!task_planner) @@ -156,7 +170,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( "Fleet [%s] is not configured with parameters for task planning." "Use FleetUpdateHandle::set_task_planner_params(~) to set the " "parameters required.", name.c_str()); - return; } @@ -167,7 +180,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( const rmf_traffic::Time start_time = rmf_traffic_ros2::convert(task_profile.description.start_time); // TODO (YV) get rid of ID field in RequestPtr - std::string id = msg->task_profile.task_id; const auto& graph = (*planner)->get_configuration().graph(); // Generate the priority of the request. The current implementation supports @@ -178,21 +190,20 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( rmf_task::BinaryPriorityScheme::make_low_priority(); // Process Cleaning task - if (task_type.type == rmf_task_msgs::msg::TaskType::TYPE_CLEAN) + if (task_type.type == TaskType::TYPE_CLEAN) { - if (task_profile.description.clean.start_waypoint.empty()) + const std::string start_wp_name = task_profile.description.clean.start_waypoint; + + if (start_wp_name.empty()) { RCLCPP_ERROR( node->get_logger(), "Required param [clean.start_waypoint] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - + "Rejecting BidNotice with task_id:[%s]", id.c_str()); return; } // Check for valid start waypoint - const std::string start_wp_name = - task_profile.description.clean.start_waypoint; const auto start_wp = graph.find_waypoint(start_wp_name); if (!start_wp) { @@ -265,9 +276,10 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( "Generated Clean request for task_id:[%s]", id.c_str()); } - else if (task_type.type == rmf_task_msgs::msg::TaskType::TYPE_DELIVERY) + else if (task_type.type == TaskType::TYPE_DELIVERY) { const auto& delivery = task_profile.description.delivery; + if (delivery.pickup_place_name.empty()) { RCLCPP_ERROR( @@ -298,16 +310,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( return; } - if (delivery.dropoff_place_name.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [delivery.dropoff_place_name] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } - if (delivery.dropoff_ingestor.empty()) { RCLCPP_ERROR( @@ -326,7 +328,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", name.c_str(), delivery.pickup_place_name.c_str(), id.c_str()); - return; } @@ -338,7 +339,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", name.c_str(), delivery.dropoff_place_name.c_str(), id.c_str()); - return; } @@ -355,11 +355,11 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( RCLCPP_INFO( node->get_logger(), "Generated Delivery request for task_id:[%s]", id.c_str()); - } - else if (task_type.type == rmf_task_msgs::msg::TaskType::TYPE_LOOP) + else if (task_type.type == TaskType::TYPE_LOOP) { const auto& loop = task_profile.description.loop; + if (loop.start_name.empty()) { RCLCPP_ERROR( @@ -439,7 +439,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( if (!new_request) return; - generated_requests.insert({id, new_request}); const auto allocation_result = allocate_tasks(new_request); @@ -485,7 +484,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( std::size_t index = 0; for (const auto& t : task_managers) { - robot_name_map.insert({index, t.first->name()}); + robot_name_map.insert({index, t->context()->name()}); ++index; } @@ -513,8 +512,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( id.c_str(), bid_proposal.robot_name.c_str(), cost); // Store assignments in internal map - bid_notice_assignments.insert({id, assignments}); - + latest_bid_notice_assignments = {new_request, assignments}; } //============================================================================== @@ -531,8 +529,8 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( if (msg->method == DispatchRequest::ADD) { - const auto task_it = bid_notice_assignments.find(id); - if (task_it == bid_notice_assignments.end()) + auto [req_ptr, assignments] = latest_bid_notice_assignments; + if (req_ptr->id() != id) { RCLCPP_WARN( node->get_logger(), @@ -546,9 +544,7 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( node->get_logger(), "Bid for task_id:[%s] awarded to fleet [%s]. Processing request...", id.c_str(), name.c_str()); - - auto& assignments = task_it->second; - + if (assignments.size() != task_managers.size()) { RCLCPP_ERROR( @@ -559,28 +555,13 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } - // Here we make sure none of the tasks in the assignments has already begun - // execution. If so, we replan assignments until a valid set is obtained - // and only then update the task manager queues - const auto request_it = generated_requests.find(id); - if (request_it == generated_requests.end()) - { - RCLCPP_ERROR( - node->get_logger(), - "Unable to find generated request for task_id:[%s]. This request will " - "be ignored.", - id.c_str()); - dispatch_ack_pub->publish(dispatch_ack); - return; - } - bool valid_assignments = is_valid_assignments(assignments); if (!valid_assignments) { // TODO: This replanning is blocking the main thread. Instead, the // replanning should run on a separate worker and then deliver the // result back to the main worker. - const auto replan_results = allocate_tasks(request_it->second); + const auto replan_results = allocate_tasks(req_ptr); if (!replan_results) { RCLCPP_WARN( @@ -597,15 +578,15 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( // rxcpp worker. Hence, no new tasks would have started during this replanning. } - std::size_t index = 0; - for (auto& t : task_managers) - { - t.second->set_queue(assignments[index]); - ++index; - } + // set assignments to taskmangaer of each robot + auto task_profiles = get_task_profiles(); + task_profiles.insert({id, msg->task_profile}); + + assert(assignments.size() == task_managers.size()); + for (std::size_t i = 0; i < task_managers.size(); ++i) + task_managers[i]->set_queue(assignments[i], task_profiles); current_assignment_cost = task_planner->compute_cost(assignments); - assigned_requests.insert({id, request_it->second}); dispatch_ack.success = true; dispatch_ack_pub->publish(dispatch_ack); @@ -623,24 +604,25 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( // When a queued task is to be cancelled, we simply re-plan and re-allocate // task assignments for the request set containing all the queued tasks // excluding the task to be cancelled. - if (cancelled_task_ids.find(id) != cancelled_task_ids.end()) + + // Check if received request is to cancel an active task + if (is_task_active(id)) { RCLCPP_WARN( node->get_logger(), - "Request with task_id:[%s] has already been cancelled.", + "Unable to cancel active task with task_id:[%s]. Only queued tasks may " + "be cancelled.", id.c_str()); - dispatch_ack.success = true; dispatch_ack_pub->publish(dispatch_ack); return; } - auto request_to_cancel_it = assigned_requests.find(id); - if (request_to_cancel_it == assigned_requests.end()) + if (!is_task_queued(id)) { RCLCPP_WARN( node->get_logger(), - "Unable to cancel task with task_id:[%s] as it is not assigned to " + "Unable to cancel task with task_id:[%s] as it is not queued to " "fleet:[%s].", id.c_str(), name.c_str()); @@ -648,30 +630,9 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } - std::unordered_set executed_tasks; - for (const auto& [context, mgr] : task_managers) - { - const auto& tasks = mgr->get_executed_tasks(); - executed_tasks.insert(tasks.begin(), tasks.end()); - } - - // Check if received request is to cancel an active task - if (executed_tasks.find(id) != executed_tasks.end()) - { - RCLCPP_WARN( - node->get_logger(), - "Unable to cancel active task with task_id:[%s]. Only queued tasks may " - "be cancelled.", - id.c_str()); - - dispatch_ack_pub->publish(dispatch_ack); - return; - } - // Re-plan assignments while ignoring request for task to be cancelled - const auto replan_results = allocate_tasks( - nullptr, request_to_cancel_it->second); - + const auto replan_results = allocate_tasks(nullptr, id); + if (!replan_results.has_value()) { RCLCPP_WARN( @@ -683,19 +644,17 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } - const auto& assignments = replan_results.value(); - std::size_t index = 0; - for (auto& t : task_managers) - { - t.second->set_queue(assignments[index]); - ++index; - } + // set assignments to taskmangaer of each robot + const auto& assignments = replan_results.value(); + auto task_profiles = get_task_profiles(); + task_profiles.insert({id, msg->task_profile}); + for (std::size_t i = 0; i < task_managers.size(); ++i) + task_managers[i]->set_queue(assignments[i], task_profiles); current_assignment_cost = task_planner->compute_cost(assignments); dispatch_ack.success = true; dispatch_ack_pub->publish(dispatch_ack); - cancelled_task_ids.insert(id); RCLCPP_INFO( node->get_logger(), @@ -720,25 +679,66 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( auto FleetUpdateHandle::Implementation::is_valid_assignments( Assignments& assignments) const -> bool { - std::unordered_set executed_tasks; - for (const auto& [context, mgr] : task_managers) - { - const auto& tasks = mgr->get_executed_tasks(); - executed_tasks.insert(tasks.begin(), tasks.end()); - } - for (const auto& agent : assignments) { for (const auto& a : agent) { - if (executed_tasks.find(a.request()->id()) != executed_tasks.end()) + // If ID doesnt exist in queue_tasks + if (!is_task_queued(a.request()->id())) return false; } } - return true; } +//============================================================================== +auto FleetUpdateHandle::Implementation::get_task_profiles() const + -> std::unordered_map +{ + std::unordered_map task_profiles; + for (const auto& mgr : task_managers) + { + for (const auto task : mgr->task_queue()) + { + task_profiles.insert({task->id(), task->task_profile_msg()}); + } + + const auto task = mgr->current_task(); + if (task) + task_profiles.insert({task->id(), task->task_profile_msg()}); + } + return task_profiles; +} + +//============================================================================== +bool FleetUpdateHandle::Implementation::is_task_queued( + const std::string& task_id ) const +{ + std::unordered_set queued_tasks; + for (const auto& mgr : task_managers) + { + for (const auto task : mgr->task_queue()) + { + if (task->id() == task_id) + return true; + } + } + return false; +} + +//============================================================================== +bool FleetUpdateHandle::Implementation::is_task_active( + const std::string& task_id ) const +{ + std::unordered_set current_tasks; + for (const auto& mgr : task_managers) + { + if (mgr->current_task() && mgr->current_task()->id() == task_id) + return true; + } + return false; +} + //============================================================================== std::optional FleetUpdateHandle::Implementation::get_nearest_charger( const rmf_traffic::agv::Planner::Start& start) @@ -843,7 +843,7 @@ rmf_fleet_msgs::msg::RobotState convert_state(const TaskManager& mgr) void FleetUpdateHandle::Implementation::publish_fleet_state() const { std::vector robot_states; - for (const auto& [context, mgr] : task_managers) + for (const auto& mgr : task_managers) robot_states.emplace_back(convert_state(*mgr)); auto fleet_state = rmf_fleet_msgs::build() @@ -856,7 +856,7 @@ void FleetUpdateHandle::Implementation::publish_fleet_state() const //============================================================================== auto FleetUpdateHandle::Implementation::allocate_tasks( rmf_task::ConstRequestPtr new_request, - rmf_task::ConstRequestPtr ignore_request) const -> std::optional + std::string ignore_request_id) const -> std::optional { // Collate robot states, constraints and combine new requestptr with // requestptr of non-charging tasks in task manager queues @@ -872,41 +872,25 @@ auto FleetUpdateHandle::Implementation::allocate_tasks( for (const auto& t : task_managers) { - states.push_back(t.second->expected_finish_state()); - const auto requests = t.second->requests(); - pending_requests.insert( - pending_requests.end(), requests.begin(), requests.end()); - } + states.push_back(t->expected_finish_state()); - // Remove the request to be ignored if present - if (ignore_request) - { - auto ignore_request_it = pending_requests.end(); - for (auto it = pending_requests.begin(); it != pending_requests.end(); ++it) - { - auto pending_request = *it; - if (pending_request->id() == ignore_request->id()) - ignore_request_it = it; - } - if (ignore_request_it != pending_requests.end()) + // This will identify the ignored task for allocation + for (const auto req : t->requests()) { - pending_requests.erase(ignore_request_it); - RCLCPP_INFO( - node->get_logger(), - "Request with task_id:[%s] will be ignored during task allocation.", - ignore_request->id().c_str()); - } - else - { - RCLCPP_WARN( - node->get_logger(), - "Request with task_id:[%s] is not present in any of the task queues.", - ignore_request->id().c_str()); - } + if (req->id() == ignore_request_id) + { + RCLCPP_INFO( + node->get_logger(), + "Request with task_id:[%s] will be ignored during task allocation.", + ignore_request_id.c_str()); + continue; + } + pending_requests.push_back(req); + } } RCLCPP_INFO( - node->get_logger(), + node->get_logger(), "Planning for [%d] robot(s) and [%d] request(s)", states.size(), pending_requests.size()); @@ -978,7 +962,7 @@ void FleetUpdateHandle::add_robot( rmf_traffic::agv::Plan::StartSet start, std::function)> handle_cb) { - + if (start.empty()) { throw std::runtime_error( @@ -1052,7 +1036,7 @@ void FleetUpdateHandle::add_robot( "Added a robot named [%s] with participant ID [%d]", context->name().c_str(), context->itinerary().id()); - fleet->_pimpl->task_managers.insert({context, TaskManager::make(context)}); + fleet->_pimpl->task_managers.push_back(TaskManager::make(context)); if (handle_cb) { handle_cb(RobotUpdateHandle::Implementation::make(std::move(context))); @@ -1162,7 +1146,7 @@ FleetUpdateHandle& FleetUpdateHandle::accept_task_requests( FleetUpdateHandle& FleetUpdateHandle::accept_delivery_requests( AcceptDeliveryRequest check) { - _pimpl->accept_delivery = std::move(check); + RCLCPP_WARN(_pimpl->node->get_logger(), "Use accept_task_requests() instead"); return *this; } @@ -1228,7 +1212,7 @@ bool FleetUpdateHandle::set_task_planner_params( // automatic retreat. Hence, we also update them whenever the // task planner here is updated. for (const auto& t : _pimpl->task_managers) - t.first->task_planner(_pimpl->task_planner); + t->context()->task_planner(_pimpl->task_planner); return true; } diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index 0b1607056..17ef6abf1 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -146,16 +146,12 @@ class FleetUpdateHandle::Implementation rmf_utils::optional default_maximum_delay = std::chrono::nanoseconds(std::chrono::seconds(10)); - AcceptDeliveryRequest accept_delivery = nullptr; - std::unordered_map> task_managers = {}; - + std::vector> task_managers; + rclcpp::Publisher::SharedPtr fleet_state_pub = nullptr; rclcpp::TimerBase::SharedPtr fleet_state_timer = nullptr; - // Map task id to pair of using Assignments = rmf_task::agv::TaskPlanner::Assignments; - std::unordered_map> task_map = {}; // Map of dock name to dock parameters std::unordered_map charging_waypoints = {}; double current_assignment_cost = 0.0; - // Map to store task id with assignments for BidNotice - std::unordered_map bid_notice_assignments = {}; + + using TaskProfile = rmf_task_msgs::msg::TaskProfile; - std::unordered_map< - std::string, rmf_task::ConstRequestPtr> generated_requests = {}; - std::unordered_map< - std::string, rmf_task::ConstRequestPtr> assigned_requests = {}; - std::unordered_set cancelled_task_ids = {}; + // Tuple which stored the latest received BidNotice + std::tuple latest_bid_notice_assignments; AcceptTaskRequest accept_task = nullptr; @@ -196,6 +189,8 @@ class FleetUpdateHandle::Implementation using DockSummarySub = rclcpp::Subscription::SharedPtr; DockSummarySub dock_summary_sub = nullptr; + using TaskType = rmf_task_msgs::msg::TaskType; + template static std::shared_ptr make(Args&&... args) { @@ -279,12 +274,21 @@ class FleetUpdateHandle::Implementation /// new request and while optionally ignoring a specific request. std::optional allocate_tasks( rmf_task::ConstRequestPtr new_request = nullptr, - rmf_task::ConstRequestPtr ignore_request = nullptr) const; + std::string ignore_request_id = "") const; /// Helper function to check if assignments are valid. An assignment set is /// invalid if one of the assignments has already begun execution. bool is_valid_assignments(Assignments& assignments) const; + /// Helper function to check if a task is queued in Fleet Adapter + bool is_task_queued(const std::string& task_id ) const; + + /// Helper function to check if a task is active/executing in Fleet Adapter + bool is_task_active(const std::string& task_id ) const; + + /// Helper function to get all current and queued task profiles + std::unordered_map get_task_profiles() const; + static Implementation& get(FleetUpdateHandle& fleet) { return *fleet._pimpl; diff --git a/rmf_fleet_adapter/test/tasks/test_Delivery.cpp b/rmf_fleet_adapter/test/tasks/test_Delivery.cpp index 5f7701551..ed1489e72 100644 --- a/rmf_fleet_adapter/test/tasks/test_Delivery.cpp +++ b/rmf_fleet_adapter/test/tasks/test_Delivery.cpp @@ -309,10 +309,8 @@ SCENARIO("Test Delivery") profile }; - auto rcl_context = std::make_shared(); - rcl_context->init(0, nullptr); - rmf_fleet_adapter::agv::test::MockAdapter adapter( - "test_Delivery", rclcpp::NodeOptions().context(rcl_context)); + rclcpp::init(0, nullptr); + rmf_fleet_adapter::agv::test::MockAdapter adapter("test_Delivery"); std::promise completed_promise; bool at_least_one_incomplete = false; @@ -434,4 +432,5 @@ SCENARIO("Test Delivery") CHECK(at_least_one_incomplete); adapter.stop(); + rclcpp::shutdown(); } diff --git a/rmf_fleet_adapter/test/tasks/test_Loop.cpp b/rmf_fleet_adapter/test/tasks/test_Loop.cpp index 0915b1884..1accd7138 100644 --- a/rmf_fleet_adapter/test/tasks/test_Loop.cpp +++ b/rmf_fleet_adapter/test/tasks/test_Loop.cpp @@ -129,10 +129,8 @@ SCENARIO("Test loop requests") profile }; - auto rcl_context = std::make_shared(); - rcl_context->init(0, nullptr); - rmf_fleet_adapter::agv::test::MockAdapter adapter( - "test_Loop", rclcpp::NodeOptions().context(rcl_context)); + rclcpp::init(0, nullptr); + rmf_fleet_adapter::agv::test::MockAdapter adapter("test_Loop"); const std::string loop_0 = "loop_0"; std::promise task_0_completed_promise; @@ -178,8 +176,6 @@ SCENARIO("Test loop requests") ++completed_1_count; } - else - CHECK(false); } else { @@ -187,8 +183,6 @@ SCENARIO("Test loop requests") at_least_one_incomplete_task_0 = true; else if (msg->task_id == loop_1) at_least_one_incomplete_task_1 = true; - else - CHECK(false); } if (msg->task_id == loop_0) @@ -381,4 +375,7 @@ SCENARIO("Test loop requests") for (const auto& s : finding_a_plan_1_statuses) std::cout << " -- " << s << std::endl; } + + adapter.stop(); + rclcpp::shutdown(); }