diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp index bb342b20e..9eec4e68c 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp @@ -107,6 +107,19 @@ const rmf_task::agv::State Task::finish_state() const return _finish_state; } +//============================================================================== +void Task::task_profile(Task::TaskProfileMsg profile) +{ + _profile = profile; + return; +} + +//============================================================================== +const Task::TaskProfileMsg& Task::task_profile() const +{ + return _profile; +} + //============================================================================== Task::Task( std::string id, diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp index 8d6e402c0..118d57271 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp @@ -95,6 +95,7 @@ class Task : public std::enable_shared_from_this std::function; using PendingPhases = std::vector>; + using TaskProfileMsg = rmf_task_msgs::msg::TaskProfile; // Make a new task static std::shared_ptr make( @@ -133,6 +134,12 @@ class Task : public std::enable_shared_from_this /// Get the finish state of this Task const rmf_task::agv::State finish_state() const; + /// Set the TaskProfile of this task + void task_profile(TaskProfileMsg profile); + + /// Get the TaskProfile of this task + const TaskProfileMsg& task_profile() const; + private: Task( @@ -163,6 +170,8 @@ class Task : public std::enable_shared_from_this rmf_task::agv::State _finish_state; rmf_task::ConstRequestPtr _request; + TaskProfileMsg _profile; + void _start_next_phase(); StatusMsg _process_summary(const StatusMsg& input_msg); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index 22e43b4fd..830b24dbf 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -86,35 +86,6 @@ TaskManager::TaskManager(agv::RobotContextPtr context) // Do nothing. The make() function does all further initialization. } -//============================================================================== -void TaskManager::queue_task(std::shared_ptr task, Start expected_finish) -{ - std::lock_guard guard(_mutex); - _queue.push_back(std::move(task)); - _expected_finish_location = std::move(expected_finish); - - RCLCPP_INFO( - _context->node()->get_logger(), - "Queuing new task [%s] for [%s]. New queue size: %ld", - _queue.back()->id().c_str(), - _context->requester_id().c_str(), - _queue.size()); - - if (!_active_task) - { - _begin_next_task(); - } - else - { - rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = _queue.back()->id(); - msg.task_profile.task_id = _queue.back()->id(); - msg.state = msg.STATE_QUEUED; - msg.robot_name = _context->name(); - _context->node()->task_summary()->publish(msg); - } -} - //============================================================================== auto TaskManager::expected_finish_location() const -> StartSet { @@ -164,7 +135,8 @@ agv::ConstRobotContextPtr TaskManager::context() const //============================================================================== void TaskManager::set_queue( - const std::vector& assignments) + const std::vector& assignments, + const TaskManager::TaskProfiles& task_profiles) { // We indent this block as _mutex is also locked in the _begin_next_task() // function that is called at the end of this function. @@ -181,21 +153,27 @@ void TaskManager::set_queue( if (i != 0) start = assignments[i-1].state().location(); start.time(a.deployment_time()); - rmf_task_msgs::msg::TaskType task_type_msg; const auto request = a.request(); + TaskProfileMsg task_profile; + const auto it = task_profiles.find(request->id()); + if (it != task_profiles.end()) + { + task_profile = it->second; + } + using namespace rmf_task::requests; if (std::dynamic_pointer_cast( request->description()) != nullptr) { - task_type_msg.type = task_type_msg.TYPE_CLEAN; auto task = rmf_fleet_adapter::tasks::make_clean( request, _context, start, a.deployment_time(), a.state()); + task->task_profile(task_profile); _queue.push_back(task); } @@ -203,7 +181,6 @@ void TaskManager::set_queue( else if (std::dynamic_pointer_cast( request->description()) != nullptr) { - task_type_msg.type = task_type_msg.TYPE_CHARGE_BATTERY; const auto task = tasks::make_charge_battery( request, _context, @@ -211,19 +188,29 @@ void TaskManager::set_queue( a.deployment_time(), a.state()); + // The TaskProfile for auto-generated tasks such as ChargeBattery will + // need to be manually constructed + task_profile.task_id = request->id(); + task_profile.submission_time = _context->node()->now(); + task_profile.description.start_time = rmf_traffic_ros2::convert( + request->earliest_start_time()); + task_profile.description.task_type.type = + rmf_task_msgs::msg::TaskType::TYPE_CHARGE_BATTERY; + task->task_profile(task_profile); + _queue.push_back(task); } else if (std::dynamic_pointer_cast( request->description()) != nullptr) { - task_type_msg.type = task_type_msg.TYPE_DELIVERY; const auto task = tasks::make_delivery( request, _context, start, a.deployment_time(), a.state()); + task->task_profile(task_profile); _queue.push_back(task); } @@ -231,13 +218,13 @@ void TaskManager::set_queue( else if (std::dynamic_pointer_cast(request-> description()) != nullptr) { - task_type_msg.type = task_type_msg.TYPE_LOOP; const auto task = tasks::make_loop( request, _context, start, a.deployment_time(), a.state()); + task->task_profile(task_profile); _queue.push_back(task); } @@ -255,17 +242,8 @@ void TaskManager::set_queue( } // publish queued task - rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = _queue.back()->id(); - msg.task_profile.task_id = _queue.back()->id(); - msg.state = msg.STATE_QUEUED; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.task_profile.description.task_type = task_type_msg; - msg.start_time = rmf_traffic_ros2::convert( - _queue.back()->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - _queue.back()->finish_state().finish_time()); + TaskSummaryMsg msg; + _populate_task_summary(_queue.back(), msg.STATE_QUEUED, msg); _context->node()->task_summary()->publish(msg); } } @@ -356,30 +334,22 @@ void TaskManager::_begin_next_task() _task_sub = _active_task->observe() .observe_on(rxcpp::identity_same_worker(_context->worker())) .subscribe( - [me = weak_from_this(), id = _active_task->id()](Task::StatusMsg msg) + [me = weak_from_this(), active_task = _active_task](Task::StatusMsg msg) { const auto self = me.lock(); if (!self) return; - msg.task_id = id; - msg.task_profile.task_id = id; - msg.robot_name = self->_context->name(); - msg.fleet_name = self->_context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - self->_active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - self->_active_task->finish_state().finish_time()); + self->_populate_task_summary(active_task, msg.state, msg); self->_context->node()->task_summary()->publish(msg); }, - [me = weak_from_this(), id = _active_task->id()](std::exception_ptr e) + [me = weak_from_this(), active_task = _active_task](std::exception_ptr e) { const auto self = me.lock(); if (!self) return; - rmf_task_msgs::msg::TaskSummary msg; - msg.state = msg.STATE_FAILED; + TaskSummaryMsg msg; try { @@ -390,32 +360,18 @@ void TaskManager::_begin_next_task() msg.status = e.what(); } - msg.task_id = id; - msg.task_profile.task_id = id; - msg.robot_name = self->_context->name(); - msg.fleet_name = self->_context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - self->_active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - self->_active_task->finish_state().finish_time()); + self->_populate_task_summary(active_task, msg.STATE_FAILED, msg); self->_context->node()->task_summary()->publish(msg); }, - [me = weak_from_this(), id = _active_task->id()]() + [me = weak_from_this(), active_task = _active_task]() { const auto self = me.lock(); if (!self) return; - rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = id; - msg.task_profile.task_id = id; - msg.state = msg.STATE_COMPLETED; - msg.robot_name = self->_context->name(); - msg.fleet_name = self->_context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - self->_active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - self->_active_task->finish_state().finish_time()); + TaskSummaryMsg msg; + self->_populate_task_summary( + active_task, msg.STATE_COMPLETED, msg); self->_context->node()->task_summary()->publish(msg); self->_active_task = nullptr; @@ -448,14 +404,7 @@ void TaskManager::_begin_waiting() if (!self) return; - msg.task_id = self->_context->requester_id() + ":waiting"; - msg.robot_name = self->_context->name(); - msg.fleet_name = self->_context->description().owner(); - // TODO: Fill in end_time with the beginning time of the next task if - // the next task is known. - msg.start_time = self->_context->node()->now(); - msg.end_time = msg.start_time; - + self->_populate_task_summary(nullptr, msg.state, msg); self->_context->node()->task_summary()->publish(msg); }, [me = weak_from_this()](std::exception_ptr e) @@ -465,7 +414,6 @@ void TaskManager::_begin_waiting() return; rmf_task_msgs::msg::TaskSummary msg; - msg.state = msg.STATE_FAILED; try { @@ -476,13 +424,7 @@ void TaskManager::_begin_waiting() msg.status = e.what(); } - msg.task_id = self->_context->requester_id() + ":waiting"; - msg.robot_name = self->_context->name(); - msg.fleet_name = self->_context->description().owner(); - msg.start_time = rmf_traffic_ros2::convert( - self->_active_task->deployment_time()); - msg.end_time = rmf_traffic_ros2::convert( - self->_active_task->finish_state().finish_time()); + self->_populate_task_summary(nullptr, msg.STATE_FAILED, msg); self->_context->node()->task_summary()->publish(msg); RCLCPP_WARN( @@ -638,5 +580,39 @@ void TaskManager::_register_executed_task(const std::string& id) _executed_task_registry.push_back(id); } +//============================================================================== +void TaskManager::_populate_task_summary( + std::shared_ptr task, + uint32_t task_summary_state, + TaskManager::TaskSummaryMsg& msg) +{ + if (task == nullptr) // ResponsiveWait + { + msg.task_id = _context->requester_id() + ":waiting"; + + msg.start_time = _context->node()->now(); + msg.end_time = _queue.empty() ? msg.start_time : rmf_traffic_ros2::convert( + _queue.front()->deployment_time()); + // Make the task type explicit + msg.task_profile.description.task_type.type = + rmf_task_msgs::msg::TaskType::TYPE_STATION; + } + + else + { + msg.task_id = task->id(); + msg.start_time = rmf_traffic_ros2::convert( + task->deployment_time()); + msg.end_time = rmf_traffic_ros2::convert( + task->finish_state().finish_time()); + msg.task_profile = task->task_profile(); + } + + msg.fleet_name = _context->description().owner(); + msg.robot_name = _context->name(); + + msg.state = task_summary_state; +} + } // namespace rmf_fleet_adapter diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index e2301f359..f6c764dc5 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -26,6 +26,7 @@ #include #include +#include #include @@ -48,9 +49,9 @@ class TaskManager : public std::enable_shared_from_this using Assignment = rmf_task::agv::TaskPlanner::Assignment; using State = rmf_task::agv::State; using RobotModeMsg = rmf_fleet_msgs::msg::RobotMode; - - /// Add a task to the queue of this manager. - void queue_task(std::shared_ptr task, Start expected_finish); + using TaskProfileMsg = rmf_task_msgs::msg::TaskProfile; + using TaskProfiles = std::unordered_map; + using TaskSummaryMsg = rmf_task_msgs::msg::TaskSummary; /// The location where we expect this robot to be at the end of its current /// task queue. @@ -64,7 +65,9 @@ class TaskManager : public std::enable_shared_from_this /// Set the queue for this task manager with assignments generated from the /// task planner - void set_queue(const std::vector& assignments); + void set_queue( + const std::vector& assignments, + const TaskProfiles& task_profiles = {}); /// Get the non-charging requests among pending tasks const std::vector requests() const; @@ -119,6 +122,11 @@ class TaskManager : public std::enable_shared_from_this /// The input task id will be inserted into the registry such that the max /// size of the registry is 100. void _register_executed_task(const std::string& id); + + void _populate_task_summary( + std::shared_ptr task, + uint32_t task_summary_state, + TaskSummaryMsg& msg); }; using TaskManagerPtr = std::shared_ptr; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index 99f09d99f..ab44b806b 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -440,6 +440,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( if (!new_request) return; generated_requests.insert({id, new_request}); + task_profile_map.insert({id, task_profile}); const auto allocation_result = allocate_tasks(new_request); @@ -605,7 +606,7 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( std::size_t index = 0; for (auto& t : task_managers) { - t.second->set_queue(assignments[index]); + t.second->set_queue(assignments[index], task_profile_map); ++index; } @@ -692,7 +693,7 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( std::size_t index = 0; for (auto& t : task_managers) { - t.second->set_queue(assignments[index]); + t.second->set_queue(assignments[index], task_profile_map); ++index; } diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index 0bd40b368..3c9eb87ba 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -156,8 +156,6 @@ class FleetUpdateHandle::Implementation // Map task id to pair of using Assignments = rmf_task::agv::TaskPlanner::Assignments; - std::unordered_map> task_map = {}; // Map of dock name to dock parameters std::unordered_map assigned_requests = {}; std::unordered_set cancelled_task_ids = {}; + using TaskProfileMsg = rmf_task_msgs::msg::TaskProfile; + std::unordered_map task_profile_map = {}; AcceptTaskRequest accept_task = nullptr;