From 724dbd76eb70fce4b684c717f27a7e19af457dad Mon Sep 17 00:00:00 2001 From: youliang Date: Thu, 8 Apr 2021 05:00:26 +0800 Subject: [PATCH 1/4] refactor task components on fleet adapter Signed-off-by: youliang --- .../src/rmf_fleet_adapter/Task.cpp | 12 + .../src/rmf_fleet_adapter/Task.hpp | 8 + .../src/rmf_fleet_adapter/TaskManager.cpp | 272 +++++++--------- .../src/rmf_fleet_adapter/TaskManager.hpp | 20 +- .../agv/FleetUpdateHandle.cpp | 298 +++++++----------- .../agv/internal_FleetUpdateHandle.hpp | 25 +- 6 files changed, 271 insertions(+), 364 deletions(-) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp index f2b31919a..75aaf7ba8 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp @@ -88,6 +88,18 @@ const std::string& Task::id() const return _id; } +//============================================================================== +const rmf_task_msgs::msg::TaskProfile Task::profile_msg() const +{ + return _profile_msg; +} + +//============================================================================== +void Task::profile_msg(const rmf_task_msgs::msg::TaskProfile& profile) +{ + _profile_msg = profile; +} + //============================================================================== const rmf_task::ConstRequestPtr Task::request() const { diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp index d0850e042..04da355d6 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp @@ -40,6 +40,7 @@ class Task : public std::enable_shared_from_this public: using StatusMsg = rmf_task_msgs::msg::TaskSummary; + using TaskProfile = rmf_task_msgs::msg::TaskProfile; /// This class represents the active phase of a Task. It provides an /// observable that the Task can track to stay up-to-date on the status and to @@ -124,6 +125,12 @@ class Task : public std::enable_shared_from_this const std::string& id() const; + /// Get the profile msg, for publish status msg + const TaskProfile profile_msg() const; + + /// Set the profile msg, for publish status msg + void profile_msg(const TaskProfile& profile); + /// Get the request used to generate this task const rmf_task::ConstRequestPtr request() const; @@ -162,6 +169,7 @@ class Task : public std::enable_shared_from_this rmf_traffic::Time _deployment_time; rmf_task::agv::State _finish_state; rmf_task::ConstRequestPtr _request; + TaskProfile _profile_msg; void _start_next_phase(); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index 5ec4a649c..d804e8a35 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -35,6 +35,25 @@ namespace rmf_fleet_adapter { +//============================================================================== +void populate_task_summary( + rmf_task_msgs::msg::TaskSummary& msg, + const agv::RobotContextPtr context, + const std::shared_ptr task = nullptr) +{ + msg.robot_name = context->name(); + msg.fleet_name = context->description().owner(); + if (task != nullptr) + { + msg.task_id = task->id(); // duplicated + msg.task_profile = task->profile_msg(); + msg.start_time = rmf_traffic_ros2::convert( + task->deployment_time()); + msg.end_time = rmf_traffic_ros2::convert( + task->finish_state().finish_time()); + } +} + //============================================================================== TaskManagerPtr TaskManager::make(agv::RobotContextPtr context) { @@ -78,17 +97,17 @@ TaskManagerPtr TaskManager::make(agv::RobotContextPtr context) //============================================================================== TaskManager::TaskManager(agv::RobotContextPtr context) -: _context(std::move(context)) + : _context(std::move(context)) { _begin_waiting(); } //============================================================================== -void TaskManager::queue_task(std::shared_ptr task, Start expected_finish) +void TaskManager::queue_task(std::shared_ptr task) { - std::lock_guard guard(_mutex); - _queue.push_back(std::move(task)); - _expected_finish_location = std::move(expected_finish); + { + std::lock_guard guard(_mutex); + _queue.push_back(std::move(task)); RCLCPP_INFO( _context->node()->get_logger(), @@ -96,28 +115,14 @@ void TaskManager::queue_task(std::shared_ptr task, Start expected_finish) _queue.back()->id().c_str(), _context->requester_id().c_str(), _queue.size()); - if (!_active_task) - { - _begin_next_task(); - } - else - { rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = _queue.back()->id(); - msg.task_profile.task_id = _queue.back()->id(); + populate_task_summary(msg, _context, _queue.back()); msg.state = msg.STATE_QUEUED; - msg.robot_name = _context->name(); this->_context->node()->task_summary()->publish(msg); } -} - -//============================================================================== -auto TaskManager::expected_finish_location() const -> StartSet -{ - if (_expected_finish_location) - return {*_expected_finish_location}; - return _context->location(); + if (!_active_task) + _begin_next_task(); } //============================================================================== @@ -133,10 +138,10 @@ auto TaskManager::expected_finish_state() const -> State auto location = finish_state.location(); location.time(rmf_traffic_ros2::convert(_context->node()->now())); finish_state.location(location); - + const double current_battery_soc = _context->current_battery_soc(); finish_state.battery_soc(current_battery_soc); - + return finish_state; } @@ -158,114 +163,104 @@ agv::ConstRobotContextPtr TaskManager::context() const return _context; } +//============================================================================== +const std::vector> TaskManager::task_queue() const +{ + return _queue; +} + //============================================================================== void TaskManager::set_queue( - const std::vector& assignments) + const std::vector>& tasks) { // We indent this block as _mutex is also locked in the _begin_next_task() // function that is called at the end of this function. { std::lock_guard guard(_mutex); _queue.clear(); + } - // We use dynamic cast to determine the type of request and then call the - // appropriate make(~) function to convert the request into a task - for (std::size_t i = 0; i < assignments.size(); ++i) - { - const auto& a = assignments[i]; - auto start = _context->current_task_end_state().location(); - if (i != 0) - start = assignments[i-1].state().location(); - start.time(a.deployment_time()); - rmf_task_msgs::msg::TaskType task_type_msg; - const auto request = a.request(); - if (std::dynamic_pointer_cast< - const rmf_task::requests::CleanDescription>(request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_CLEAN; - auto task = rmf_fleet_adapter::tasks::make_clean( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } - - else if (std::dynamic_pointer_cast< - const rmf_task::requests::ChargeBatteryDescription>( - request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_CHARGE_BATTERY; - const auto task = tasks::make_charge_battery( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } - - else if (std::dynamic_pointer_cast< - const rmf_task::requests::DeliveryDescription>( - request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_DELIVERY; - const auto task = tasks::make_delivery( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } - - else if (std::dynamic_pointer_cast< - const rmf_task::requests::LoopDescription>(request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_LOOP; - const auto task = tasks::make_loop( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } + for (const auto t : tasks) + queue_task(t); +} - else - { - RCLCPP_WARN( - _context->node()->get_logger(), - "[TaskManager] Un-supported request type in assignment list. " - "Please update the implementation of TaskManager::set_queue() to " - "support request with task_id:[%s]", - a.request()->id().c_str()); +//============================================================================== +void TaskManager::set_queue( + const std::vector& assignments, + const std::unordered_map& task_profiles) +{ + std::vector> task_queue; + std::shared_ptr task = nullptr; + for (std::size_t i = 0; i < assignments.size(); ++i) + { + const auto& a = assignments[i]; + auto start = _context->current_task_end_state().location(); + if (i != 0) + start = assignments[i-1].state().location(); + start.time(a.deployment_time()); + + const auto req = a.request(); + const auto id = req->id(); + const auto req_desc = req->description(); + + rmf_task_msgs::msg::TaskProfile profile; + if (task_profiles.find(id) != task_profiles.end()) + { + profile = task_profiles.at(id); + } + else + { + // This is likely a self-generated task + profile.task_id = id; + profile.submission_time = _context->node()->now(); + profile.description.start_time = + rmf_traffic_ros2::convert(a.deployment_time()); + } - continue; - } + // We use dynamic cast to determine the type of request and then call the + // appropriate make(~) function to convert the request into a task + using namespace rmf_task::requests; + using TaskType = rmf_task_msgs::msg::TaskType; - // publish queued task - rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = _queue.back()->id(); - msg.task_profile.task_id = _queue.back()->id(); - msg.state = msg.STATE_QUEUED; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.task_profile.description.task_type = task_type_msg; - msg.start_time = rmf_traffic_ros2::convert( - _queue.back()->deployment_time()); - msg.start_time = rmf_traffic_ros2::convert( - _queue.back()->finish_state().finish_time()); - this->_context->node()->task_summary()->publish(msg); + /// CHARGE BATTERY TASK (self-Generated) + if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_charge_battery( + req, _context, start, a.deployment_time(), a.state()); + profile.description.task_type.type = TaskType::TYPE_CHARGE_BATTERY; + } + /// CLEAN TASK (User Request) + else if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_clean( + req, _context, start, a.deployment_time(), a.state()); + } + /// DELIVERY TASK (User Request) + else if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_delivery( + req, _context, start, a.deployment_time(), a.state()); + } + /// LOOP TASK (User Request) + else if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_loop( + req, _context, start, a.deployment_time(), a.state()); + } + else + { + RCLCPP_WARN( + _context->node()->get_logger(), + "[TaskManager] Un-supported request type in assignment list. " + "Please update the implementation of TaskManager::set_queue() to " + "support request with task_id:[%s]", id.c_str()); + continue; } - } - _begin_next_task(); + task->profile_msg(profile); + task_queue.push_back(task); + } + this->set_queue(task_queue); } //============================================================================== @@ -329,19 +324,13 @@ void TaskManager::_begin_next_task() .subscribe( [this, id = _active_task->id()](Task::StatusMsg msg) { - msg.task_id = id; - msg.task_profile.task_id = id; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - _active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - _active_task->finish_state().finish_time()); + populate_task_summary(msg, _context, _active_task); _context->node()->task_summary()->publish(msg); }, [this, id = _active_task->id()](std::exception_ptr e) { rmf_task_msgs::msg::TaskSummary msg; + populate_task_summary(msg, _context, _active_task); msg.state = msg.STATE_FAILED; try { @@ -351,30 +340,14 @@ void TaskManager::_begin_next_task() msg.status = e.what(); } - msg.task_id = id; - msg.task_profile.task_id = id; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - _active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - _active_task->finish_state().finish_time()); _context->node()->task_summary()->publish(msg); }, [this, id = _active_task->id()]() { rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = id; - msg.task_profile.task_id = id; + populate_task_summary(msg, _context, _active_task); msg.state = msg.STATE_COMPLETED; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - _active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - _active_task->finish_state().finish_time()); - this->_context->node()->task_summary()->publish(msg); - + this->_context->node()->task_summary()->publish(msg); _active_task = nullptr; }); @@ -401,9 +374,8 @@ void TaskManager::_begin_waiting() .subscribe( [this](Task::StatusMsg msg) { + populate_task_summary(msg, _context); msg.task_id = _context->requester_id() + ":waiting"; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); // TODO: Fill in end_time with the beginning time of the next task if // the next task is known. msg.start_time = _context->node()->now(); @@ -414,6 +386,7 @@ void TaskManager::_begin_waiting() [this](std::exception_ptr e) { rmf_task_msgs::msg::TaskSummary msg; + populate_task_summary(msg, _context); msg.state = msg.STATE_FAILED; try { @@ -424,13 +397,8 @@ void TaskManager::_begin_waiting() } msg.task_id = _context->requester_id() + ":waiting"; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - _active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - _active_task->finish_state().finish_time()); _context->node()->task_summary()->publish(msg); + // TODO: empty id string? RCLCPP_WARN( _context->node()->get_logger(), diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index 0c0f31a0a..bc12f6b0f 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -45,13 +45,10 @@ class TaskManager : public std::enable_shared_from_this using StartSet = rmf_traffic::agv::Plan::StartSet; using Assignment = rmf_task::agv::TaskPlanner::Assignment; using State = rmf_task::agv::State; + using TaskProfile = rmf_task_msgs::msg::TaskProfile; /// Add a task to the queue of this manager. - void queue_task(std::shared_ptr task, Start expected_finish); - - /// The location where we expect this robot to be at the end of its current - /// task queue. - StartSet expected_finish_location() const; + void queue_task(std::shared_ptr task); const agv::RobotContextPtr& context(); @@ -60,8 +57,16 @@ class TaskManager : public std::enable_shared_from_this const Task* current_task() const; /// Set the queue for this task manager with assignments generated from the - /// task planner - void set_queue(const std::vector& assignments); + /// task planner, arg 'task_profiles' is merely used for status publishing. + void set_queue( + const std::vector& assignments, + const std::unordered_map& task_profiles = {}); + + /// set a vector of tasks + void set_queue(const std::vector>& tasks); + + // get tasks in the queue + const std::vector> task_queue() const; /// Get the non-charging requests among pending tasks const std::vector requests() const; @@ -84,7 +89,6 @@ class TaskManager : public std::enable_shared_from_this agv::RobotContextPtr _context; std::shared_ptr _active_task; std::vector> _queue; - rmf_utils::optional _expected_finish_location; rxcpp::subscription _task_sub; rxcpp::subscription _emergency_sub; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index d42adeeab..a8b510cb4 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -107,9 +107,16 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( if (msg->task_profile.task_id.empty()) return; - if (bid_notice_assignments.find(msg->task_profile.task_id) - != bid_notice_assignments.end()) + if (received_task_profiles.find(msg->task_profile.task_id) + != received_task_profiles.end()) + { + RCLCPP_WARN( + node->get_logger(), + "Task [%s] has been submitted by Fleet [%s] previously", + msg->task_profile.task_id.c_str(), + name.c_str()); return; + } if (!accept_task) { @@ -152,7 +159,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( const rmf_traffic::Time start_time = rmf_traffic_ros2::convert(task_profile.description.start_time); // TODO (YV) get rid of ID field in RequestPtr - std::string id = msg->task_profile.task_id; + const std::string id = msg->task_profile.task_id; const auto& graph = (*planner)->get_configuration().graph(); // Generate the priority of the request. The current implementation supports @@ -163,21 +170,20 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( rmf_task::BinaryPriorityScheme::make_low_priority(); // Process Cleaning task - if (task_type.type == rmf_task_msgs::msg::TaskType::TYPE_CLEAN) + if (task_type.type == TaskType::TYPE_CLEAN) { - if (task_profile.description.clean.start_waypoint.empty()) + const std::string start_wp_name = task_profile.description.clean.start_waypoint; + + if (start_wp_name.empty()) { RCLCPP_ERROR( node->get_logger(), - "Required param [clean.start_waypoint] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - + "Clean Msg is invalid/invalid." + "Rejecting BidNotice with task_id:[%s]", id.c_str()); return; } // Check for valid start waypoint - const std::string start_wp_name = - task_profile.description.clean.start_waypoint; const auto start_wp = graph.find_waypoint(start_wp_name); if (!start_wp) { @@ -255,90 +261,48 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( "Generated Clean request for task_id:[%s]", id.c_str()); } - else if (task_type.type == rmf_task_msgs::msg::TaskType::TYPE_DELIVERY) + else if (task_type.type == TaskType::TYPE_DELIVERY) { - const auto& delivery = task_profile.description.delivery; - if (delivery.pickup_place_name.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [delivery.pickup_place_name] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } - - if (delivery.pickup_dispenser.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [delivery.pickup_dispenser] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); + const auto& del = task_profile.description.delivery; - return; - } - - if (delivery.dropoff_place_name.empty()) + if (del.pickup_place_name.empty() || del.pickup_dispenser.empty() || + del.dropoff_place_name.empty() || del.dropoff_ingestor.empty()) { - RCLCPP_ERROR( - node->get_logger(), - "Required param [delivery.dropoff_place_name] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - + RCLCPP_ERROR(node->get_logger(), + "Delivery Msg is invalid/invalid." + "Rejecting BidNotice with task_id:[%s]", id.c_str()); return; } - if (delivery.dropoff_place_name.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [delivery.dropoff_place_name] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } - - if (delivery.dropoff_ingestor.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [delivery.dropoff_ingestor] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } - - const auto pickup_wp = graph.find_waypoint(delivery.pickup_place_name); + const auto pickup_wp = graph.find_waypoint(del.pickup_place_name); if (!pickup_wp) { RCLCPP_INFO( node->get_logger(), "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", - name.c_str(), delivery.pickup_place_name.c_str(), id.c_str()); - + name.c_str(), del.pickup_place_name.c_str(), id.c_str()); return; } - const auto dropoff_wp = graph.find_waypoint(delivery.dropoff_place_name); + const auto dropoff_wp = graph.find_waypoint(del.dropoff_place_name); if (!dropoff_wp) { RCLCPP_INFO( node->get_logger(), "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", - name.c_str(), delivery.dropoff_place_name.c_str(), id.c_str()); - + name.c_str(), del.dropoff_place_name.c_str(), id.c_str()); return; } new_request = rmf_task::requests::Delivery::make( id, pickup_wp->index(), - delivery.pickup_dispenser, + del.pickup_dispenser, dropoff_wp->index(), - delivery.dropoff_ingestor, - delivery.items, + del.dropoff_ingestor, + del.items, motion_sink, ambient_sink, *planner, @@ -349,38 +313,17 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( RCLCPP_INFO( node->get_logger(), "Generated Delivery request for task_id:[%s]", id.c_str()); - } - else if (task_type.type == rmf_task_msgs::msg::TaskType::TYPE_LOOP) + else if (task_type.type == TaskType::TYPE_LOOP) { const auto& loop = task_profile.description.loop; - if (loop.start_name.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [loop.start_name] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } - - if (loop.finish_name.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [loop.finish_name] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } - - if (loop.num_loops < 1) + + if (loop.start_name.empty() || loop.finish_name.empty() || + loop.num_loops == 0) { - RCLCPP_ERROR( - node->get_logger(), - "Required param [loop.num_loops: %d] in TaskProfile is invalid." - "Rejecting BidNotice with task_id:[%s]" , loop.num_loops, id.c_str()); - + RCLCPP_ERROR(node->get_logger(), + "Loop Msg is invalid/invalid." + "Rejecting BidNotice with task_id:[%s]", id.c_str()); return; } @@ -403,7 +346,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( node->get_logger(), "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", - name.c_str(), loop.finish_name.c_str(), id.c_str()); + name.c_str(), loop.start_name.c_str(), id.c_str()); return; } @@ -437,7 +380,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( if (!new_request) return; - generated_requests.insert({id, new_request}); const auto allocation_result = allocate_tasks(new_request); @@ -483,7 +425,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( std::size_t index = 0; for (const auto& t : task_managers) { - robot_name_map.insert({index, t.first->name()}); + robot_name_map.insert({index, t->context()->name()}); ++index; } @@ -511,8 +453,8 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( id.c_str(), bid_proposal.robot_name.c_str(), cost); // Store assignments in internal map - bid_notice_assignments.insert({id, assignments}); - + latest_bid_notice_assignments = {new_request, assignments}; + received_task_profiles.insert({id, task_profile}); } //============================================================================== @@ -529,8 +471,8 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( if (msg->method == DispatchRequest::ADD) { - const auto task_it = bid_notice_assignments.find(id); - if (task_it == bid_notice_assignments.end()) + auto [req_ptr, assignments] = latest_bid_notice_assignments; + if (req_ptr->id() != id) { RCLCPP_WARN( node->get_logger(), @@ -544,9 +486,7 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( node->get_logger(), "Bid for task_id:[%s] awarded to fleet [%s]. Processing request...", id.c_str(), name.c_str()); - - auto& assignments = task_it->second; - + if (assignments.size() != task_managers.size()) { RCLCPP_ERROR( @@ -557,28 +497,13 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } - // Here we make sure none of the tasks in the assignments has already begun - // execution. If so, we replan assignments until a valid set is obtained - // and only then update the task manager queues - const auto request_it = generated_requests.find(id); - if (request_it == generated_requests.end()) - { - RCLCPP_ERROR( - node->get_logger(), - "Unable to find generated request for task_id:[%s]. This request will " - "be ignored.", - id.c_str()); - dispatch_ack_pub->publish(dispatch_ack); - return; - } - bool valid_assignments = is_valid_assignments(assignments); if (!valid_assignments) { // TODO: This replanning is blocking the main thread. Instead, the // replanning should run on a separate worker and then deliver the // result back to the main worker. - const auto replan_results = allocate_tasks(request_it->second); + const auto replan_results = allocate_tasks(req_ptr); if (!replan_results) { RCLCPP_WARN( @@ -595,15 +520,12 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( // rxcpp worker. Hence, no new tasks would have started during this replanning. } - std::size_t index = 0; - for (auto& t : task_managers) - { - t.second->set_queue(assignments[index]); - ++index; - } + // set assignments to taskmangaer of each robot + assert(assignments.size() == task_managers.size()); + for (std::size_t i = 0; i < task_managers.size(); ++i) + task_managers[i]->set_queue(assignments[i], received_task_profiles); current_assignment_cost = task_planner->compute_cost(assignments); - assigned_requests.insert({id, request_it->second}); dispatch_ack.success = true; dispatch_ack_pub->publish(dispatch_ack); @@ -633,12 +555,21 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } - auto request_to_cancel_it = assigned_requests.find(id); - if (request_to_cancel_it == assigned_requests.end()) + std::unordered_set executed_tasks; + std::unordered_set queued_tasks; + for (const auto& mgr : task_managers) + { + if (mgr->current_task()) + executed_tasks.insert(mgr->current_task()->id()); + for (const auto& tsk : mgr->task_queue()) + queued_tasks.insert(tsk->id()); + } + + if (queued_tasks.find(id) == queued_tasks.end()) { RCLCPP_WARN( node->get_logger(), - "Unable to cancel task with task_id:[%s] as it is not assigned to " + "Unable to cancel task with task_id:[%s] as it is not queued to " "fleet:[%s].", id.c_str(), name.c_str()); @@ -646,13 +577,6 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } - std::unordered_set executed_tasks; - for (const auto& [context, mgr] : task_managers) - { - const auto& tasks = mgr->get_executed_tasks(); - executed_tasks.insert(tasks.begin(), tasks.end()); - } - // Check if received request is to cancel an active task if (executed_tasks.find(id) != executed_tasks.end()) { @@ -664,12 +588,11 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( dispatch_ack_pub->publish(dispatch_ack); return; - } + } // Re-plan assignments while ignoring request for task to be cancelled - const auto replan_results = allocate_tasks( - nullptr, request_to_cancel_it->second); - + const auto replan_results = allocate_tasks(nullptr, id); + if (!replan_results.has_value()) { RCLCPP_WARN( @@ -681,13 +604,11 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } + // set assignments to taskmangaer of each robot const auto& assignments = replan_results.value(); - std::size_t index = 0; - for (auto& t : task_managers) - { - t.second->set_queue(assignments[index]); - ++index; - } + assert(assignments.size() == task_managers.size()); + for (std::size_t i = 0; i < task_managers.size(); ++i) + task_managers[i]->set_queue(assignments[i], received_task_profiles); current_assignment_cost = task_planner->compute_cost(assignments); @@ -718,23 +639,28 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( auto FleetUpdateHandle::Implementation::is_valid_assignments( Assignments& assignments) const -> bool { - std::unordered_set executed_tasks; - for (const auto& [context, mgr] : task_managers) + // This checks if the current assignments are all queued in task managers + std::unordered_set queued_tasks; + for (const auto& mgr : task_managers) { - const auto& tasks = mgr->get_executed_tasks(); - executed_tasks.insert(tasks.begin(), tasks.end()); + for (const auto task : mgr->task_queue()) + queued_tasks.insert(task->id()); } + size_t assignment_size = 0; for (const auto& agent : assignments) { for (const auto& a : agent) { - if (executed_tasks.find(a.request()->id()) != executed_tasks.end()) + // If ID exists doesnt exist in queue_tasks + if (queued_tasks.find(a.request()->id()) == queued_tasks.end()) return false; + assignment_size++; } } - return true; + // Also check if size of total queued tasks is the same as assignments + return (queued_tasks.size() == assignment_size); } //============================================================================== @@ -847,7 +773,7 @@ rmf_fleet_msgs::msg::RobotState convert_state(const TaskManager& mgr) void FleetUpdateHandle::Implementation::publish_fleet_state() const { std::vector robot_states; - for (const auto& [context, mgr] : task_managers) + for (const auto& mgr : task_managers) robot_states.emplace_back(convert_state(*mgr)); auto fleet_state = rmf_fleet_msgs::build() @@ -860,7 +786,7 @@ void FleetUpdateHandle::Implementation::publish_fleet_state() const //============================================================================== auto FleetUpdateHandle::Implementation::allocate_tasks( rmf_task::ConstRequestPtr new_request, - rmf_task::ConstRequestPtr ignore_request) const -> std::optional + std::string ignore_request_id) const -> std::optional { // Collate robot states, constraints and combine new requestptr with // requestptr of non-charging tasks in task manager queues @@ -877,42 +803,32 @@ auto FleetUpdateHandle::Implementation::allocate_tasks( for (const auto& t : task_managers) { - states.push_back(t.second->expected_finish_state()); - constraints_set.push_back(t.first->task_planning_constraints()); - const auto requests = t.second->requests(); - pending_requests.insert( - pending_requests.end(), requests.begin(), requests.end()); - } - - // Remove the request to be ignored if present - if (ignore_request) - { - auto ignore_request_it = pending_requests.end(); - for (auto it = pending_requests.begin(); it != pending_requests.end(); ++it) - { - auto pending_request = *it; - if (pending_request->id() == ignore_request->id()) - ignore_request_it = it; - } - if (ignore_request_it != pending_requests.end()) - { - pending_requests.erase(ignore_request_it); - RCLCPP_INFO( - node->get_logger(), - "Request with task_id:[%s] will be ignored during task allocation.", - ignore_request->id().c_str()); - } - else + states.push_back(t->expected_finish_state()); + constraints_set.push_back(t->context()->task_planning_constraints()); + + for (const auto task : t->task_queue()) { - RCLCPP_WARN( - node->get_logger(), - "Request with task_id:[%s] is not present in any of the task queues.", - ignore_request->id().c_str()); + // ignore auto allocated charging task. + // TODO: categorize a task if it is self-generated by adpater's task planner + const auto type = task->profile_msg().description.task_type.type; + if (type == TaskType::TYPE_CHARGE_BATTERY) + continue; + + if (task->id() == ignore_request_id ) + { + RCLCPP_INFO( + node->get_logger(), + "Request with task_id:[%s] will be ignored during task allocation.", + ignore_request_id.c_str()); + continue; + } + + pending_requests.push_back(task->request()); } } RCLCPP_INFO( - node->get_logger(), + node->get_logger(), "Planning for [%d] robot(s) and [%d] request(s)", states.size(), pending_requests.size()); @@ -985,7 +901,7 @@ void FleetUpdateHandle::add_robot( rmf_traffic::agv::Plan::StartSet start, std::function)> handle_cb) { - + if (start.empty()) { throw std::runtime_error( @@ -1054,7 +970,7 @@ void FleetUpdateHandle::add_robot( "Added a robot named [%s] with participant ID [%d]", context->name().c_str(), context->itinerary().id()); - fleet->_pimpl->task_managers.insert({context, TaskManager::make(context)}); + fleet->_pimpl->task_managers.push_back(TaskManager::make(context)); if (handle_cb) { handle_cb(RobotUpdateHandle::Implementation::make(std::move(context))); @@ -1164,7 +1080,7 @@ FleetUpdateHandle& FleetUpdateHandle::accept_task_requests( FleetUpdateHandle& FleetUpdateHandle::accept_delivery_requests( AcceptDeliveryRequest check) { - _pimpl->accept_delivery = std::move(check); + RCLCPP_WARN(_pimpl->node->get_logger(), "Use accept_task_requests() instead"); return *this; } @@ -1212,10 +1128,10 @@ bool FleetUpdateHandle::set_task_planner_params( ambient_sink, *_pimpl->planner, _pimpl->cost_calculator); - + _pimpl->task_planner = std::make_shared( std::move(task_config)); - + _pimpl->initialized_task_planner = true; return _pimpl->initialized_task_planner; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index a723fbdda..4ea20cb30 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -152,16 +152,12 @@ class FleetUpdateHandle::Implementation rmf_utils::optional default_maximum_delay = std::chrono::nanoseconds(std::chrono::seconds(10)); - AcceptDeliveryRequest accept_delivery = nullptr; - std::unordered_map> task_managers = {}; - + std::vector> task_managers; + rclcpp::Publisher::SharedPtr fleet_state_pub = nullptr; rclcpp::TimerBase::SharedPtr fleet_state_timer = nullptr; - // Map task id to pair of using Assignments = rmf_task::agv::TaskPlanner::Assignments; - std::unordered_map> task_map = {}; // Map of dock name to dock parameters std::unordered_map available_charging_waypoints = {}; double current_assignment_cost = 0.0; - // Map to store task id with assignments for BidNotice - std::unordered_map bid_notice_assignments = {}; + + // Map to store task id with the profile msg of the received tasks + using TaskProfile = rmf_task_msgs::msg::TaskProfile; + std::unordered_map received_task_profiles = {}; + + // Tuple which stored the latest received BidNotice + std::tuple latest_bid_notice_assignments; - std::unordered_map< - std::string, rmf_task::ConstRequestPtr> generated_requests = {}; - std::unordered_map< - std::string, rmf_task::ConstRequestPtr> assigned_requests = {}; std::unordered_set cancelled_task_ids = {}; AcceptTaskRequest accept_task = nullptr; @@ -207,6 +204,8 @@ class FleetUpdateHandle::Implementation using DockSummarySub = rclcpp::Subscription::SharedPtr; DockSummarySub dock_summary_sub = nullptr; + using TaskType = rmf_task_msgs::msg::TaskType; + template static std::shared_ptr make(Args&&... args) { @@ -291,7 +290,7 @@ class FleetUpdateHandle::Implementation /// new request and while optionally ignoring a specific request. std::optional allocate_tasks( rmf_task::ConstRequestPtr new_request = nullptr, - rmf_task::ConstRequestPtr ignore_request = nullptr) const; + std::string ignore_request_id = "") const; /// Helper function to check if assignments are valid. An assignment set is /// invalid if one of the assignments has already begun execution. From edab2601c26e9f88b549fe9eeb35c8f82b6140da Mon Sep 17 00:00:00 2001 From: youliang Date: Wed, 14 Apr 2021 10:44:17 +0800 Subject: [PATCH 2/4] support for future task revival, fix task test Signed-off-by: youliang --- .../agv/FleetUpdateHandle.cpp | 90 +++++++++++-------- .../agv/internal_FleetUpdateHandle.hpp | 6 ++ .../test/tasks/test_Delivery.cpp | 7 +- rmf_fleet_adapter/test/tasks/test_Loop.cpp | 17 ++-- 4 files changed, 72 insertions(+), 48 deletions(-) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index a8b510cb4..699b99e29 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -104,17 +104,28 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( if (task_managers.empty()) return; - if (msg->task_profile.task_id.empty()) + const std::string id = msg->task_profile.task_id; + + if (id.empty()) + return; + + // Will check if tasks is queued or active + if (is_queued(id)) + { + RCLCPP_WARN( + node->get_logger(), + "Task [%s] is currently queued in Fleet [%s]", + id.c_str(), name.c_str()); return; + } - if (received_task_profiles.find(msg->task_profile.task_id) - != received_task_profiles.end()) + // Will check if tasks is an active current task + if (is_current(id)) { RCLCPP_WARN( node->get_logger(), - "Task [%s] has been submitted by Fleet [%s] previously", - msg->task_profile.task_id.c_str(), - name.c_str()); + "Task [%s] is currently executing by Fleet [%s]", + id.c_str(), name.c_str()); return; } @@ -125,7 +136,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( "Fleet [%s] is not configured to accept any task requests. Use " "FleetUpdateHadndle::accept_task_requests(~) to define a callback " "for accepting requests", name.c_str()); - return; } @@ -134,10 +144,8 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( RCLCPP_INFO( node->get_logger(), "Fleet [%s] is configured to not accept task [%s]", - name.c_str(), - msg->task_profile.task_id.c_str()); - - return; + name.c_str(), id.c_str()); + return; } if (!task_planner @@ -148,7 +156,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( "Fleet [%s] is not configured with parameters for task planning." "Use FleetUpdateHandle::set_task_planner_params(~) to set the " "parameters required.", name.c_str()); - return; } @@ -159,7 +166,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( const rmf_traffic::Time start_time = rmf_traffic_ros2::convert(task_profile.description.start_time); // TODO (YV) get rid of ID field in RequestPtr - const std::string id = msg->task_profile.task_id; const auto& graph = (*planner)->get_configuration().graph(); // Generate the priority of the request. The current implementation supports @@ -555,17 +561,7 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } - std::unordered_set executed_tasks; - std::unordered_set queued_tasks; - for (const auto& mgr : task_managers) - { - if (mgr->current_task()) - executed_tasks.insert(mgr->current_task()->id()); - for (const auto& tsk : mgr->task_queue()) - queued_tasks.insert(tsk->id()); - } - - if (queued_tasks.find(id) == queued_tasks.end()) + if (!is_queued(id)) { RCLCPP_WARN( node->get_logger(), @@ -578,7 +574,7 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( } // Check if received request is to cancel an active task - if (executed_tasks.find(id) != executed_tasks.end()) + if (is_current(id)) { RCLCPP_WARN( node->get_logger(), @@ -639,28 +635,50 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( auto FleetUpdateHandle::Implementation::is_valid_assignments( Assignments& assignments) const -> bool { - // This checks if the current assignments are all queued in task managers - std::unordered_set queued_tasks; - for (const auto& mgr : task_managers) - { - for (const auto task : mgr->task_queue()) - queued_tasks.insert(task->id()); - } - - size_t assignment_size = 0; + std::size_t assignment_size = 0; + std::size_t queued_task_size = 0; for (const auto& agent : assignments) { for (const auto& a : agent) { // If ID exists doesnt exist in queue_tasks - if (queued_tasks.find(a.request()->id()) == queued_tasks.end()) + if (!is_queued(a.request()->id())) return false; assignment_size++; } } // Also check if size of total queued tasks is the same as assignments - return (queued_tasks.size() == assignment_size); + for (const auto& mgr : task_managers) + queued_task_size += mgr->task_queue().size(); + + return (queued_task_size == assignment_size); +} + +//============================================================================== +bool FleetUpdateHandle::Implementation::is_queued( + const std::string& task_id ) const +{ + std::unordered_set queued_tasks; + for (const auto& mgr : task_managers) + { + for (const auto task : mgr->task_queue()) + queued_tasks.insert(task->id()); + } + + return (queued_tasks.find(task_id) != queued_tasks.end()); +} + +//============================================================================== +bool FleetUpdateHandle::Implementation::is_current( + const std::string& task_id ) const +{ + std::unordered_set current_tasks; + for (const auto& mgr : task_managers) + if (mgr->current_task()) + current_tasks.insert(mgr->current_task()->id()); + + return (current_tasks.find(task_id) != current_tasks.end()); } //============================================================================== diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index 4ea20cb30..a483ed735 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -296,6 +296,12 @@ class FleetUpdateHandle::Implementation /// invalid if one of the assignments has already begun execution. bool is_valid_assignments(Assignments& assignments) const; + /// Helper function to check if a task is queued in Fleet Adapter + bool is_queued(const std::string& task_id ) const; + + /// Helper function to check if a task is active/executing in Fleet Adapter + bool is_current(const std::string& task_id ) const; + static Implementation& get(FleetUpdateHandle& fleet) { return *fleet._pimpl; diff --git a/rmf_fleet_adapter/test/tasks/test_Delivery.cpp b/rmf_fleet_adapter/test/tasks/test_Delivery.cpp index 5ef1cbfaf..39ac230d0 100644 --- a/rmf_fleet_adapter/test/tasks/test_Delivery.cpp +++ b/rmf_fleet_adapter/test/tasks/test_Delivery.cpp @@ -309,10 +309,8 @@ SCENARIO("Test Delivery") profile }; - auto rcl_context = std::make_shared(); - rcl_context->init(0, nullptr); - rmf_fleet_adapter::agv::test::MockAdapter adapter( - "test_Delivery", rclcpp::NodeOptions().context(rcl_context)); + rclcpp::init(0, nullptr); + rmf_fleet_adapter::agv::test::MockAdapter adapter("test_Delivery"); std::promise completed_promise; bool at_least_one_incomplete = false; @@ -436,4 +434,5 @@ SCENARIO("Test Delivery") CHECK(at_least_one_incomplete); adapter.stop(); + rclcpp::shutdown(); } diff --git a/rmf_fleet_adapter/test/tasks/test_Loop.cpp b/rmf_fleet_adapter/test/tasks/test_Loop.cpp index 665f953f3..3ca1eddee 100644 --- a/rmf_fleet_adapter/test/tasks/test_Loop.cpp +++ b/rmf_fleet_adapter/test/tasks/test_Loop.cpp @@ -129,10 +129,8 @@ SCENARIO("Test loop requests") profile }; - auto rcl_context = std::make_shared(); - rcl_context->init(0, nullptr); - rmf_fleet_adapter::agv::test::MockAdapter adapter( - "test_Loop", rclcpp::NodeOptions().context(rcl_context)); + rclcpp::init(0, nullptr); + rmf_fleet_adapter::agv::test::MockAdapter adapter("test_Loop"); const std::string loop_0 = "loop_0"; std::promise task_0_completed_promise; @@ -178,8 +176,8 @@ SCENARIO("Test loop requests") ++completed_1_count; } - else - CHECK(false); + // else + // CHECK(false); // this will fail as responsiveWait is active } else { @@ -187,8 +185,8 @@ SCENARIO("Test loop requests") at_least_one_incomplete_task_0 = true; else if (msg->task_id == loop_1) at_least_one_incomplete_task_1 = true; - else - CHECK(false); + // else + // CHECK(false); } if (msg->task_id == loop_0) @@ -383,4 +381,7 @@ SCENARIO("Test loop requests") for (const auto& s : finding_a_plan_1_statuses) std::cout << " -- " << s << std::endl; } + + adapter.stop(); + rclcpp::shutdown(); } From 5075f0f7e013acda6fca566cab03a52d0f01c8e4 Mon Sep 17 00:00:00 2001 From: youliang Date: Wed, 5 May 2021 10:24:01 +0800 Subject: [PATCH 3/4] fix from feedbacks Signed-off-by: youliang --- .../src/rmf_fleet_adapter/Task.cpp | 8 +- .../src/rmf_fleet_adapter/Task.hpp | 6 +- .../src/rmf_fleet_adapter/TaskManager.cpp | 6 +- .../agv/FleetUpdateHandle.cpp | 208 ++++++++++++------ .../agv/internal_FleetUpdateHandle.hpp | 13 +- rmf_fleet_adapter/test/tasks/test_Loop.cpp | 4 - 6 files changed, 151 insertions(+), 94 deletions(-) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp index c99e8507c..285746011 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp @@ -90,15 +90,15 @@ const std::string& Task::id() const } //============================================================================== -const rmf_task_msgs::msg::TaskProfile Task::profile_msg() const +const rmf_task_msgs::msg::TaskProfile Task::task_profile_msg() const { - return _profile_msg; + return _task_profile_msg; } //============================================================================== -void Task::profile_msg(const rmf_task_msgs::msg::TaskProfile& profile) +void Task::task_profile_msg(const rmf_task_msgs::msg::TaskProfile& profile) { - _profile_msg = profile; + _task_profile_msg = profile; } //============================================================================== diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp index 04da355d6..177b401a7 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp @@ -126,10 +126,10 @@ class Task : public std::enable_shared_from_this const std::string& id() const; /// Get the profile msg, for publish status msg - const TaskProfile profile_msg() const; + const TaskProfile task_profile_msg() const; /// Set the profile msg, for publish status msg - void profile_msg(const TaskProfile& profile); + void task_profile_msg(const TaskProfile& profile); /// Get the request used to generate this task const rmf_task::ConstRequestPtr request() const; @@ -169,7 +169,7 @@ class Task : public std::enable_shared_from_this rmf_traffic::Time _deployment_time; rmf_task::agv::State _finish_state; rmf_task::ConstRequestPtr _request; - TaskProfile _profile_msg; + TaskProfile _task_profile_msg; void _start_next_phase(); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index d804e8a35..c2b07969d 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -46,7 +46,7 @@ void populate_task_summary( if (task != nullptr) { msg.task_id = task->id(); // duplicated - msg.task_profile = task->profile_msg(); + msg.task_profile = task->task_profile_msg(); msg.start_time = rmf_traffic_ros2::convert( task->deployment_time()); msg.end_time = rmf_traffic_ros2::convert( @@ -97,7 +97,7 @@ TaskManagerPtr TaskManager::make(agv::RobotContextPtr context) //============================================================================== TaskManager::TaskManager(agv::RobotContextPtr context) - : _context(std::move(context)) +: _context(std::move(context)) { _begin_waiting(); } @@ -257,7 +257,7 @@ void TaskManager::set_queue( continue; } - task->profile_msg(profile); + task->task_profile_msg(profile); task_queue.push_back(task); } this->set_queue(task_queue); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index 699b99e29..8015e93f2 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -104,27 +104,27 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( if (task_managers.empty()) return; - const std::string id = msg->task_profile.task_id; + const std::string& id = msg->task_profile.task_id; if (id.empty()) return; - // Will check if tasks is queued or active - if (is_queued(id)) + // Will check if tasks is queued + if (is_task_queued(id)) { RCLCPP_WARN( node->get_logger(), - "Task [%s] is currently queued in Fleet [%s]", + "Task [%s] is currently queued in Fleet [%s], bid notice is ignored", id.c_str(), name.c_str()); return; } // Will check if tasks is an active current task - if (is_current(id)) + if (is_task_active(id)) { RCLCPP_WARN( node->get_logger(), - "Task [%s] is currently executing by Fleet [%s]", + "Task [%s] is currently executing by Fleet [%s], bid notice is ignored", id.c_str(), name.c_str()); return; } @@ -184,7 +184,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( { RCLCPP_ERROR( node->get_logger(), - "Clean Msg is invalid/invalid." + "Required param [clean.start_waypoint] missing in TaskProfile." "Rejecting BidNotice with task_id:[%s]", id.c_str()); return; } @@ -269,46 +269,77 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( else if (task_type.type == TaskType::TYPE_DELIVERY) { - const auto& del = task_profile.description.delivery; + const auto& delivery = task_profile.description.delivery; + + if (delivery.pickup_place_name.empty()) + { + RCLCPP_ERROR( + node->get_logger(), + "Required param [delivery.pickup_place_name] missing in TaskProfile." + "Rejecting BidNotice with task_id:[%s]" , id.c_str()); + + return; + } - if (del.pickup_place_name.empty() || del.pickup_dispenser.empty() || - del.dropoff_place_name.empty() || del.dropoff_ingestor.empty()) + if (delivery.pickup_dispenser.empty()) { - RCLCPP_ERROR(node->get_logger(), - "Delivery Msg is invalid/invalid." - "Rejecting BidNotice with task_id:[%s]", id.c_str()); + RCLCPP_ERROR( + node->get_logger(), + "Required param [delivery.pickup_dispenser] missing in TaskProfile." + "Rejecting BidNotice with task_id:[%s]" , id.c_str()); + return; } - const auto pickup_wp = graph.find_waypoint(del.pickup_place_name); + if (delivery.dropoff_place_name.empty()) + { + RCLCPP_ERROR( + node->get_logger(), + "Required param [delivery.dropoff_place_name] missing in TaskProfile." + "Rejecting BidNotice with task_id:[%s]" , id.c_str()); + + return; + } + + if (delivery.dropoff_ingestor.empty()) + { + RCLCPP_ERROR( + node->get_logger(), + "Required param [delivery.dropoff_ingestor] missing in TaskProfile." + "Rejecting BidNotice with task_id:[%s]" , id.c_str()); + + return; + } + + const auto pickup_wp = graph.find_waypoint(delivery.pickup_place_name); if (!pickup_wp) { RCLCPP_INFO( node->get_logger(), "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", - name.c_str(), del.pickup_place_name.c_str(), id.c_str()); + name.c_str(), delivery.pickup_place_name.c_str(), id.c_str()); return; } - const auto dropoff_wp = graph.find_waypoint(del.dropoff_place_name); + const auto dropoff_wp = graph.find_waypoint(delivery.dropoff_place_name); if (!dropoff_wp) { RCLCPP_INFO( node->get_logger(), "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", - name.c_str(), del.dropoff_place_name.c_str(), id.c_str()); + name.c_str(), delivery.dropoff_place_name.c_str(), id.c_str()); return; } new_request = rmf_task::requests::Delivery::make( id, pickup_wp->index(), - del.pickup_dispenser, + delivery.pickup_dispenser, dropoff_wp->index(), - del.dropoff_ingestor, - del.items, + delivery.dropoff_ingestor, + delivery.items, motion_sink, ambient_sink, *planner, @@ -324,12 +355,33 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( { const auto& loop = task_profile.description.loop; - if (loop.start_name.empty() || loop.finish_name.empty() || - loop.num_loops == 0) + if (loop.start_name.empty()) + { + RCLCPP_ERROR( + node->get_logger(), + "Required param [loop.start_name] missing in TaskProfile." + "Rejecting BidNotice with task_id:[%s]" , id.c_str()); + + return; + } + + if (loop.finish_name.empty()) + { + RCLCPP_ERROR( + node->get_logger(), + "Required param [loop.finish_name] missing in TaskProfile." + "Rejecting BidNotice with task_id:[%s]" , id.c_str()); + + return; + } + + if (loop.num_loops < 1) { - RCLCPP_ERROR(node->get_logger(), - "Loop Msg is invalid/invalid." - "Rejecting BidNotice with task_id:[%s]", id.c_str()); + RCLCPP_ERROR( + node->get_logger(), + "Required param [loop.num_loops: %d] in TaskProfile is invalid." + "Rejecting BidNotice with task_id:[%s]" , loop.num_loops, id.c_str()); + return; } @@ -352,7 +404,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( node->get_logger(), "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", - name.c_str(), loop.start_name.c_str(), id.c_str()); + name.c_str(), loop.finish_name.c_str(), id.c_str()); return; } @@ -460,7 +512,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( // Store assignments in internal map latest_bid_notice_assignments = {new_request, assignments}; - received_task_profiles.insert({id, task_profile}); } //============================================================================== @@ -527,9 +578,12 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( } // set assignments to taskmangaer of each robot + auto task_profiles = get_task_profiles(); + task_profiles.insert({id, msg->task_profile}); + assert(assignments.size() == task_managers.size()); for (std::size_t i = 0; i < task_managers.size(); ++i) - task_managers[i]->set_queue(assignments[i], received_task_profiles); + task_managers[i]->set_queue(assignments[i], task_profiles); current_assignment_cost = task_planner->compute_cost(assignments); dispatch_ack.success = true; @@ -549,19 +603,21 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( // When a queued task is to be cancelled, we simply re-plan and re-allocate // task assignments for the request set containing all the queued tasks // excluding the task to be cancelled. - if (cancelled_task_ids.find(id) != cancelled_task_ids.end()) + + // Check if received request is to cancel an active task + if (is_task_active(id)) { RCLCPP_WARN( node->get_logger(), - "Request with task_id:[%s] has already been cancelled.", + "Unable to cancel active task with task_id:[%s]. Only queued tasks may " + "be cancelled.", id.c_str()); - dispatch_ack.success = true; dispatch_ack_pub->publish(dispatch_ack); return; } - if (!is_queued(id)) + if (!is_task_queued(id)) { RCLCPP_WARN( node->get_logger(), @@ -573,19 +629,6 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } - // Check if received request is to cancel an active task - if (is_current(id)) - { - RCLCPP_WARN( - node->get_logger(), - "Unable to cancel active task with task_id:[%s]. Only queued tasks may " - "be cancelled.", - id.c_str()); - - dispatch_ack_pub->publish(dispatch_ack); - return; - } - // Re-plan assignments while ignoring request for task to be cancelled const auto replan_results = allocate_tasks(nullptr, id); @@ -601,16 +644,16 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( } // set assignments to taskmangaer of each robot - const auto& assignments = replan_results.value(); - assert(assignments.size() == task_managers.size()); + const auto& assignments = replan_results.value(); + auto task_profiles = get_task_profiles(); + task_profiles.insert({id, msg->task_profile}); for (std::size_t i = 0; i < task_managers.size(); ++i) - task_managers[i]->set_queue(assignments[i], received_task_profiles); + task_managers[i]->set_queue(assignments[i], task_profiles); current_assignment_cost = task_planner->compute_cost(assignments); dispatch_ack.success = true; dispatch_ack_pub->publish(dispatch_ack); - cancelled_task_ids.insert(id); RCLCPP_INFO( node->get_logger(), @@ -641,8 +684,8 @@ auto FleetUpdateHandle::Implementation::is_valid_assignments( { for (const auto& a : agent) { - // If ID exists doesnt exist in queue_tasks - if (!is_queued(a.request()->id())) + // If ID doesnt exist in queue_tasks + if (!is_task_queued(a.request()->id())) return false; assignment_size++; } @@ -656,29 +699,51 @@ auto FleetUpdateHandle::Implementation::is_valid_assignments( } //============================================================================== -bool FleetUpdateHandle::Implementation::is_queued( +auto FleetUpdateHandle::Implementation::get_task_profiles() const + -> std::unordered_map +{ + std::unordered_map task_profiles; + for (const auto& mgr : task_managers) + { + for (const auto task : mgr->task_queue()) + { + task_profiles.insert({task->id(), task->task_profile_msg()}); + } + + const auto task = mgr->current_task(); + if (task) + task_profiles.insert({task->id(), task->task_profile_msg()}); + } + return task_profiles; +} + +//============================================================================== +bool FleetUpdateHandle::Implementation::is_task_queued( const std::string& task_id ) const { std::unordered_set queued_tasks; for (const auto& mgr : task_managers) { for (const auto task : mgr->task_queue()) - queued_tasks.insert(task->id()); + { + if (task->id() == task_id) + return true; + } } - - return (queued_tasks.find(task_id) != queued_tasks.end()); + return false; } //============================================================================== -bool FleetUpdateHandle::Implementation::is_current( +bool FleetUpdateHandle::Implementation::is_task_active( const std::string& task_id ) const { std::unordered_set current_tasks; for (const auto& mgr : task_managers) - if (mgr->current_task()) - current_tasks.insert(mgr->current_task()->id()); - - return (current_tasks.find(task_id) != current_tasks.end()); + { + if (mgr->current_task() && mgr->current_task()->id() == task_id) + return true; + } + return false; } //============================================================================== @@ -823,16 +888,11 @@ auto FleetUpdateHandle::Implementation::allocate_tasks( { states.push_back(t->expected_finish_state()); constraints_set.push_back(t->context()->task_planning_constraints()); - - for (const auto task : t->task_queue()) - { - // ignore auto allocated charging task. - // TODO: categorize a task if it is self-generated by adpater's task planner - const auto type = task->profile_msg().description.task_type.type; - if (type == TaskType::TYPE_CHARGE_BATTERY) - continue; - if (task->id() == ignore_request_id ) + // This will identify the ignored task for allocation + for (const auto req : t->requests()) + { + if (req->id() == ignore_request_id) { RCLCPP_INFO( node->get_logger(), @@ -840,9 +900,8 @@ auto FleetUpdateHandle::Implementation::allocate_tasks( ignore_request_id.c_str()); continue; } - - pending_requests.push_back(task->request()); - } + pending_requests.push_back(req); + } } RCLCPP_INFO( @@ -1125,6 +1184,7 @@ FleetUpdateHandle& FleetUpdateHandle::fleet_state_publish_period( return *this; } +//============================================================================== bool FleetUpdateHandle::set_task_planner_params( std::shared_ptr battery_system, std::shared_ptr motion_sink, @@ -1146,7 +1206,7 @@ bool FleetUpdateHandle::set_task_planner_params( ambient_sink, *_pimpl->planner, _pimpl->cost_calculator); - + _pimpl->task_planner = std::make_shared( std::move(task_config)); @@ -1158,11 +1218,13 @@ bool FleetUpdateHandle::set_task_planner_params( return false; } +//============================================================================== bool FleetUpdateHandle::account_for_battery_drain(bool value) { _pimpl->drain_battery = value; return _pimpl->drain_battery; } + //============================================================================== FleetUpdateHandle& FleetUpdateHandle::set_recharge_threshold( const double threshold) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index a483ed735..ed6a65b52 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -173,15 +173,11 @@ class FleetUpdateHandle::Implementation double current_assignment_cost = 0.0; - // Map to store task id with the profile msg of the received tasks using TaskProfile = rmf_task_msgs::msg::TaskProfile; - std::unordered_map received_task_profiles = {}; - + // Tuple which stored the latest received BidNotice std::tuple latest_bid_notice_assignments; - std::unordered_set cancelled_task_ids = {}; - AcceptTaskRequest accept_task = nullptr; using BidNotice = rmf_task_msgs::msg::BidNotice; @@ -297,10 +293,13 @@ class FleetUpdateHandle::Implementation bool is_valid_assignments(Assignments& assignments) const; /// Helper function to check if a task is queued in Fleet Adapter - bool is_queued(const std::string& task_id ) const; + bool is_task_queued(const std::string& task_id ) const; /// Helper function to check if a task is active/executing in Fleet Adapter - bool is_current(const std::string& task_id ) const; + bool is_task_active(const std::string& task_id ) const; + + /// Helper function to get all current and queued task profiles + std::unordered_map get_task_profiles() const; static Implementation& get(FleetUpdateHandle& fleet) { diff --git a/rmf_fleet_adapter/test/tasks/test_Loop.cpp b/rmf_fleet_adapter/test/tasks/test_Loop.cpp index 3ca1eddee..431db47b2 100644 --- a/rmf_fleet_adapter/test/tasks/test_Loop.cpp +++ b/rmf_fleet_adapter/test/tasks/test_Loop.cpp @@ -176,8 +176,6 @@ SCENARIO("Test loop requests") ++completed_1_count; } - // else - // CHECK(false); // this will fail as responsiveWait is active } else { @@ -185,8 +183,6 @@ SCENARIO("Test loop requests") at_least_one_incomplete_task_0 = true; else if (msg->task_id == loop_1) at_least_one_incomplete_task_1 = true; - // else - // CHECK(false); } if (msg->task_id == loop_0) From cd1b73f29f45e8cb19badd6abf1146f324b4e472 Mon Sep 17 00:00:00 2001 From: youliang Date: Wed, 5 May 2021 16:12:33 +0800 Subject: [PATCH 4/4] move some fns to private, cleanups Signed-off-by: youliang --- .../src/rmf_fleet_adapter/TaskManager.cpp | 8 ++++---- .../src/rmf_fleet_adapter/TaskManager.hpp | 12 ++++++------ .../src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp | 10 +--------- 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index c2b07969d..074326833 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -103,7 +103,7 @@ TaskManager::TaskManager(agv::RobotContextPtr context) } //============================================================================== -void TaskManager::queue_task(std::shared_ptr task) +void TaskManager::_queue_task(std::shared_ptr task) { { std::lock_guard guard(_mutex); @@ -170,7 +170,7 @@ const std::vector> TaskManager::task_queue() const } //============================================================================== -void TaskManager::set_queue( +void TaskManager::_set_queue( const std::vector>& tasks) { // We indent this block as _mutex is also locked in the _begin_next_task() @@ -181,7 +181,7 @@ void TaskManager::set_queue( } for (const auto t : tasks) - queue_task(t); + _queue_task(t); } //============================================================================== @@ -260,7 +260,7 @@ void TaskManager::set_queue( task->task_profile_msg(profile); task_queue.push_back(task); } - this->set_queue(task_queue); + this->_set_queue(task_queue); } //============================================================================== diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index bc12f6b0f..d747b6b09 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -47,9 +47,6 @@ class TaskManager : public std::enable_shared_from_this using State = rmf_task::agv::State; using TaskProfile = rmf_task_msgs::msg::TaskProfile; - /// Add a task to the queue of this manager. - void queue_task(std::shared_ptr task); - const agv::RobotContextPtr& context(); agv::ConstRobotContextPtr context() const; @@ -62,9 +59,6 @@ class TaskManager : public std::enable_shared_from_this const std::vector& assignments, const std::unordered_map& task_profiles = {}); - /// set a vector of tasks - void set_queue(const std::vector>& tasks); - // get tasks in the queue const std::vector> task_queue() const; @@ -108,6 +102,12 @@ class TaskManager : public std::enable_shared_from_this // Use the _register_executed_task() to populate this container. std::vector _executed_task_registry; + /// set a vector of tasks + void _set_queue(const std::vector>& tasks); + + /// Add a task to the queue of this manager. + void _queue_task(std::shared_ptr task); + /// Callback for task timer which begins next task if its deployment time has passed void _begin_next_task(); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index 00c2e5b23..8fad0286f 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -678,8 +678,6 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( auto FleetUpdateHandle::Implementation::is_valid_assignments( Assignments& assignments) const -> bool { - std::size_t assignment_size = 0; - std::size_t queued_task_size = 0; for (const auto& agent : assignments) { for (const auto& a : agent) @@ -687,15 +685,9 @@ auto FleetUpdateHandle::Implementation::is_valid_assignments( // If ID doesnt exist in queue_tasks if (!is_task_queued(a.request()->id())) return false; - assignment_size++; } } - - // Also check if size of total queued tasks is the same as assignments - for (const auto& mgr : task_managers) - queued_task_size += mgr->task_queue().size(); - - return (queued_task_size == assignment_size); + return true; } //==============================================================================