Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ const rmf_task::agv::State Task::finish_state() const
return _finish_state;
}

//==============================================================================
void Task::task_profile(Task::TaskProfileMsg profile)
{
_profile = profile;
return;
}

//==============================================================================
const Task::TaskProfileMsg& Task::task_profile() const
{
return _profile;
}

//==============================================================================
Task::Task(
std::string id,
Expand Down
9 changes: 9 additions & 0 deletions rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class Task : public std::enable_shared_from_this<Task>
std::function<void(const StatusMsg&)>;

using PendingPhases = std::vector<std::unique_ptr<PendingPhase>>;
using TaskProfileMsg = rmf_task_msgs::msg::TaskProfile;

// Make a new task
static std::shared_ptr<Task> make(
Expand Down Expand Up @@ -133,6 +134,12 @@ class Task : public std::enable_shared_from_this<Task>
/// Get the finish state of this Task
const rmf_task::agv::State finish_state() const;

/// Set the TaskProfile of this task
void task_profile(TaskProfileMsg profile);

/// Get the TaskProfile of this task
const TaskProfileMsg& task_profile() const;

private:

Task(
Expand Down Expand Up @@ -163,6 +170,8 @@ class Task : public std::enable_shared_from_this<Task>
rmf_task::agv::State _finish_state;
rmf_task::ConstRequestPtr _request;

TaskProfileMsg _profile;

void _start_next_phase();

StatusMsg _process_summary(const StatusMsg& input_msg);
Expand Down
162 changes: 69 additions & 93 deletions rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,35 +86,6 @@ TaskManager::TaskManager(agv::RobotContextPtr context)
// Do nothing. The make() function does all further initialization.
}

//==============================================================================
void TaskManager::queue_task(std::shared_ptr<Task> task, Start expected_finish)
{
std::lock_guard<std::mutex> guard(_mutex);
_queue.push_back(std::move(task));
_expected_finish_location = std::move(expected_finish);

RCLCPP_INFO(
_context->node()->get_logger(),
"Queuing new task [%s] for [%s]. New queue size: %ld",
_queue.back()->id().c_str(),
_context->requester_id().c_str(),
_queue.size());

if (!_active_task)
{
_begin_next_task();
}
else
{
rmf_task_msgs::msg::TaskSummary msg;
msg.task_id = _queue.back()->id();
msg.task_profile.task_id = _queue.back()->id();
msg.state = msg.STATE_QUEUED;
msg.robot_name = _context->name();
_context->node()->task_summary()->publish(msg);
}
}

//==============================================================================
auto TaskManager::expected_finish_location() const -> StartSet
{
Expand Down Expand Up @@ -164,7 +135,8 @@ agv::ConstRobotContextPtr TaskManager::context() const

//==============================================================================
void TaskManager::set_queue(
const std::vector<TaskManager::Assignment>& assignments)
const std::vector<TaskManager::Assignment>& assignments,
const TaskManager::TaskProfiles& task_profiles)
{
// We indent this block as _mutex is also locked in the _begin_next_task()
// function that is called at the end of this function.
Expand All @@ -181,63 +153,78 @@ void TaskManager::set_queue(
if (i != 0)
start = assignments[i-1].state().location();
start.time(a.deployment_time());
rmf_task_msgs::msg::TaskType task_type_msg;
const auto request = a.request();

TaskProfileMsg task_profile;
const auto it = task_profiles.find(request->id());
if (it != task_profiles.end())
{
task_profile = it->second;
}

using namespace rmf_task::requests;

if (std::dynamic_pointer_cast<const Clean::Description>(
request->description()) != nullptr)
{
task_type_msg.type = task_type_msg.TYPE_CLEAN;
auto task = rmf_fleet_adapter::tasks::make_clean(
request,
_context,
start,
a.deployment_time(),
a.state());
task->task_profile(task_profile);

_queue.push_back(task);
}

else if (std::dynamic_pointer_cast<const ChargeBattery::Description>(
request->description()) != nullptr)
{
task_type_msg.type = task_type_msg.TYPE_CHARGE_BATTERY;
const auto task = tasks::make_charge_battery(
request,
_context,
start,
a.deployment_time(),
a.state());

// The TaskProfile for auto-generated tasks such as ChargeBattery will
// need to be manually constructed
task_profile.task_id = request->id();
task_profile.submission_time = _context->node()->now();
task_profile.description.start_time = rmf_traffic_ros2::convert(
request->earliest_start_time());
task_profile.description.task_type.type =
rmf_task_msgs::msg::TaskType::TYPE_CHARGE_BATTERY;
task->task_profile(task_profile);

_queue.push_back(task);
}

else if (std::dynamic_pointer_cast<const Delivery::Description>(
request->description()) != nullptr)
{
task_type_msg.type = task_type_msg.TYPE_DELIVERY;
const auto task = tasks::make_delivery(
request,
_context,
start,
a.deployment_time(),
a.state());
task->task_profile(task_profile);

_queue.push_back(task);
}

else if (std::dynamic_pointer_cast<const Loop::Description>(request->
description()) != nullptr)
{
task_type_msg.type = task_type_msg.TYPE_LOOP;
const auto task = tasks::make_loop(
request,
_context,
start,
a.deployment_time(),
a.state());
task->task_profile(task_profile);

_queue.push_back(task);
}
Expand All @@ -255,17 +242,8 @@ void TaskManager::set_queue(
}

// publish queued task
rmf_task_msgs::msg::TaskSummary msg;
msg.task_id = _queue.back()->id();
msg.task_profile.task_id = _queue.back()->id();
msg.state = msg.STATE_QUEUED;
msg.robot_name = _context->name();
msg.fleet_name = _context->description().owner();
msg.task_profile.description.task_type = task_type_msg;
msg.start_time = rmf_traffic_ros2::convert(
_queue.back()->deployment_time());
msg.end_time = rmf_traffic_ros2::convert(
_queue.back()->finish_state().finish_time());
TaskSummaryMsg msg;
_populate_task_summary(_queue.back(), msg.STATE_QUEUED, msg);
_context->node()->task_summary()->publish(msg);
}
}
Expand Down Expand Up @@ -356,30 +334,22 @@ void TaskManager::_begin_next_task()
_task_sub = _active_task->observe()
.observe_on(rxcpp::identity_same_worker(_context->worker()))
.subscribe(
[me = weak_from_this(), id = _active_task->id()](Task::StatusMsg msg)
[me = weak_from_this(), active_task = _active_task](Task::StatusMsg msg)
{
const auto self = me.lock();
if (!self)
return;

msg.task_id = id;
msg.task_profile.task_id = id;
msg.robot_name = self->_context->name();
msg.fleet_name = self->_context->description().owner();
msg.start_time = rmf_traffic_ros2::convert(
self->_active_task->deployment_time());
msg.end_time = rmf_traffic_ros2::convert(
self->_active_task->finish_state().finish_time());
self->_populate_task_summary(active_task, msg.state, msg);
self->_context->node()->task_summary()->publish(msg);
},
[me = weak_from_this(), id = _active_task->id()](std::exception_ptr e)
[me = weak_from_this(), active_task = _active_task](std::exception_ptr e)
{
const auto self = me.lock();
if (!self)
return;

rmf_task_msgs::msg::TaskSummary msg;
msg.state = msg.STATE_FAILED;
TaskSummaryMsg msg;

try
{
Expand All @@ -390,32 +360,18 @@ void TaskManager::_begin_next_task()
msg.status = e.what();
}

msg.task_id = id;
msg.task_profile.task_id = id;
msg.robot_name = self->_context->name();
msg.fleet_name = self->_context->description().owner();
msg.start_time = rmf_traffic_ros2::convert(
self->_active_task->deployment_time());
msg.end_time = rmf_traffic_ros2::convert(
self->_active_task->finish_state().finish_time());
self->_populate_task_summary(active_task, msg.STATE_FAILED, msg);
self->_context->node()->task_summary()->publish(msg);
},
[me = weak_from_this(), id = _active_task->id()]()
[me = weak_from_this(), active_task = _active_task]()
{
const auto self = me.lock();
if (!self)
return;

rmf_task_msgs::msg::TaskSummary msg;
msg.task_id = id;
msg.task_profile.task_id = id;
msg.state = msg.STATE_COMPLETED;
msg.robot_name = self->_context->name();
msg.fleet_name = self->_context->description().owner();
msg.start_time = rmf_traffic_ros2::convert(
self->_active_task->deployment_time());
msg.end_time = rmf_traffic_ros2::convert(
self->_active_task->finish_state().finish_time());
TaskSummaryMsg msg;
self->_populate_task_summary(
active_task, msg.STATE_COMPLETED, msg);
self->_context->node()->task_summary()->publish(msg);

self->_active_task = nullptr;
Expand Down Expand Up @@ -448,14 +404,7 @@ void TaskManager::_begin_waiting()
if (!self)
return;

msg.task_id = self->_context->requester_id() + ":waiting";
msg.robot_name = self->_context->name();
msg.fleet_name = self->_context->description().owner();
// TODO: Fill in end_time with the beginning time of the next task if
// the next task is known.
msg.start_time = self->_context->node()->now();
msg.end_time = msg.start_time;

self->_populate_task_summary(nullptr, msg.state, msg);
self->_context->node()->task_summary()->publish(msg);
},
[me = weak_from_this()](std::exception_ptr e)
Expand All @@ -465,7 +414,6 @@ void TaskManager::_begin_waiting()
return;

rmf_task_msgs::msg::TaskSummary msg;
msg.state = msg.STATE_FAILED;

try
{
Expand All @@ -476,13 +424,7 @@ void TaskManager::_begin_waiting()
msg.status = e.what();
}

msg.task_id = self->_context->requester_id() + ":waiting";
msg.robot_name = self->_context->name();
msg.fleet_name = self->_context->description().owner();
msg.start_time = rmf_traffic_ros2::convert(
self->_active_task->deployment_time());
msg.end_time = rmf_traffic_ros2::convert(
self->_active_task->finish_state().finish_time());
self->_populate_task_summary(nullptr, msg.STATE_FAILED, msg);
self->_context->node()->task_summary()->publish(msg);

RCLCPP_WARN(
Expand Down Expand Up @@ -638,5 +580,39 @@ void TaskManager::_register_executed_task(const std::string& id)
_executed_task_registry.push_back(id);
}

//==============================================================================
void TaskManager::_populate_task_summary(
std::shared_ptr<Task> task,
uint32_t task_summary_state,
TaskManager::TaskSummaryMsg& msg)
{
if (task == nullptr) // ResponsiveWait
{
msg.task_id = _context->requester_id() + ":waiting";

msg.start_time = _context->node()->now();
msg.end_time = _queue.empty() ? msg.start_time : rmf_traffic_ros2::convert(
_queue.front()->deployment_time());
// Make the task type explicit
msg.task_profile.description.task_type.type =
rmf_task_msgs::msg::TaskType::TYPE_STATION;
}

else
{
msg.task_id = task->id();
msg.start_time = rmf_traffic_ros2::convert(
task->deployment_time());
msg.end_time = rmf_traffic_ros2::convert(
task->finish_state().finish_time());
msg.task_profile = task->task_profile();
}

msg.fleet_name = _context->description().owner();
msg.robot_name = _context->name();

msg.state = task_summary_state;
}


} // namespace rmf_fleet_adapter
16 changes: 12 additions & 4 deletions rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <rmf_task/agv/TaskPlanner.hpp>

#include <rmf_fleet_msgs/msg/robot_mode.hpp>
#include <rmf_task_msgs/msg/task_summary.hpp>

#include <mutex>

Expand All @@ -48,9 +49,9 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>
using Assignment = rmf_task::agv::TaskPlanner::Assignment;
using State = rmf_task::agv::State;
using RobotModeMsg = rmf_fleet_msgs::msg::RobotMode;

/// Add a task to the queue of this manager.
void queue_task(std::shared_ptr<Task> task, Start expected_finish);
using TaskProfileMsg = rmf_task_msgs::msg::TaskProfile;
using TaskProfiles = std::unordered_map<std::string, TaskProfileMsg>;
using TaskSummaryMsg = rmf_task_msgs::msg::TaskSummary;

/// The location where we expect this robot to be at the end of its current
/// task queue.
Expand All @@ -64,7 +65,9 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>

/// Set the queue for this task manager with assignments generated from the
/// task planner
void set_queue(const std::vector<Assignment>& assignments);
void set_queue(
const std::vector<Assignment>& assignments,
const TaskProfiles& task_profiles = {});

/// Get the non-charging requests among pending tasks
const std::vector<rmf_task::ConstRequestPtr> requests() const;
Expand Down Expand Up @@ -119,6 +122,11 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>
/// The input task id will be inserted into the registry such that the max
/// size of the registry is 100.
void _register_executed_task(const std::string& id);

void _populate_task_summary(
std::shared_ptr<Task> task,
uint32_t task_summary_state,
TaskSummaryMsg& msg);
};

using TaskManagerPtr = std::shared_ptr<TaskManager>;
Expand Down
Loading