diff --git a/src/bindings/python/sprokit/pipeline/datum.cxx b/src/bindings/python/sprokit/pipeline/datum.cxx index db989ed4..bb149c5e 100644 --- a/src/bindings/python/sprokit/pipeline/datum.cxx +++ b/src/bindings/python/sprokit/pipeline/datum.cxx @@ -1,5 +1,5 @@ /*ckwg +29 - * Copyright 2011-2012 by Kitware, Inc. + * Copyright 2011-2013 by Kitware, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -59,6 +59,7 @@ static sprokit::datum_t new_datum(object const& obj); static sprokit::datum::type_t datum_type(sprokit::datum_t const& self); static sprokit::datum::error_t datum_get_error(sprokit::datum_t const& self); static object datum_get_datum(sprokit::datum_t const& self); +static bool datum_eq(sprokit::datum_t const& self, sprokit::datum_t const& other); BOOST_PYTHON_MODULE(datum) { @@ -97,6 +98,7 @@ BOOST_PYTHON_MODULE(datum) , "The error contained within the datum packet.") .def("get_datum", &datum_get_datum , "Get the data contained within the packet.") + .def("__eq__", &datum_eq) ; sprokit::python::register_type(0); @@ -139,11 +141,17 @@ datum_get_error(sprokit::datum_t const& self) object datum_get_datum(sprokit::datum_t const& self) { + boost::any const any = self->get_datum(); + sprokit::python::python_gil const gil; (void)gil; - boost::any const any = self->get_datum(); - return object(any); } + +bool +datum_eq(sprokit::datum_t const& self, sprokit::datum_t const& other) +{ + return (*self == *other); +} diff --git a/src/bindings/python/sprokit/pipeline/edge.cxx b/src/bindings/python/sprokit/pipeline/edge.cxx index 9841e3fb..b2e1833e 100644 --- a/src/bindings/python/sprokit/pipeline/edge.cxx +++ b/src/bindings/python/sprokit/pipeline/edge.cxx @@ -1,5 +1,5 @@ /*ckwg +29 - * Copyright 2011-2012 by Kitware, Inc. + * Copyright 2011-2013 by Kitware, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -32,8 +32,10 @@ #include #include +#include #include +#include #include #include #include @@ -46,12 +48,17 @@ using namespace boost::python; +static bool edge_datum_eq(sprokit::edge_datum_t const& self, sprokit::edge_datum_t const& rhs); +static bool edge_try_push_datum(sprokit::edge_t const& self, sprokit::edge_datum_t const& datum, double duration); +static boost::optional edge_try_get_datum(sprokit::edge_t const& self, double duration); + BOOST_PYTHON_MODULE(edge) { class_("EdgeDatum" , no_init) .def(init<>()) .def(init()) + .def("__eq__", &edge_datum_eq) .def_readwrite("datum", &sprokit::edge_datum_t::datum) .def_readwrite("stamp", &sprokit::edge_datum_t::stamp) ; @@ -64,6 +71,8 @@ BOOST_PYTHON_MODULE(edge) .def(vector_indexing_suite()) ; + sprokit::python::register_optional_converter("EdgeDatumOpt", "An optional edge datum."); + class_("Edge" , "A communication channel between processes." , no_init) @@ -87,6 +96,12 @@ BOOST_PYTHON_MODULE(edge) , "Returns the next datum packet from the edge.") .def("pop_datum", &sprokit::edge::pop_datum , "Remove the next datum packet from the edge.") + .def("try_push_datum", &edge_try_push_datum + , (arg("datum"), arg("duration")) + , "Pushes a datum packet into the edge and returns True or returns False if unable to meet the duration.") + .def("try_get_datum", &edge_try_get_datum + , (arg("duration")) + , "Returns the next datum packet from the edge, removing it in the process or None if unable to meet the duration.") .def("set_upstream_process", &sprokit::edge::set_upstream_process , (arg("process")) , "Set the process which is feeding data into the edge.") @@ -101,3 +116,34 @@ BOOST_PYTHON_MODULE(edge) .def_readonly("config_capacity", &sprokit::edge::config_capacity) ; } + +bool +edge_datum_eq(sprokit::edge_datum_t const& self, sprokit::edge_datum_t const& rhs) +{ + return (self == rhs); +} + +namespace +{ + +typedef boost::chrono::duration py_duration_t; + +} + +bool +edge_try_push_datum(sprokit::edge_t const& self, sprokit::edge_datum_t const& datum, double duration) +{ + py_duration_t const duration_sec = py_duration_t(duration); + sprokit::edge::duration_t const duration_edge = boost::chrono::duration_cast(duration_sec); + + return self->try_push_datum(datum, duration_edge); +} + +boost::optional +edge_try_get_datum(sprokit::edge_t const& self, double duration) +{ + py_duration_t const duration_sec = py_duration_t(duration); + sprokit::edge::duration_t const duration_edge = boost::chrono::duration_cast(duration_sec); + + return self->try_get_datum(duration_edge); +} diff --git a/src/sprokit/pipeline/CMakeLists.txt b/src/sprokit/pipeline/CMakeLists.txt index 1a2762f9..7be7981a 100644 --- a/src/sprokit/pipeline/CMakeLists.txt +++ b/src/sprokit/pipeline/CMakeLists.txt @@ -153,11 +153,12 @@ sprokit_add_library(sprokit_pipeline SHARED ${pipeline_headers} ${pipeline_private_headers}) target_link_libraries(sprokit_pipeline - LINK_PRIVATE + LINK_PUBLIC ${Boost_CHRONO_LIBRARY} + ${Boost_SYSTEM_LIBRARY} + LINK_PRIVATE ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} - ${Boost_SYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY} ${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT}) diff --git a/src/sprokit/pipeline/datum.cxx b/src/sprokit/pipeline/datum.cxx index 30c65d4c..62d9de77 100644 --- a/src/sprokit/pipeline/datum.cxx +++ b/src/sprokit/pipeline/datum.cxx @@ -89,6 +89,46 @@ ::get_error() const return m_error; } +static bool any_equal(boost::any const& a, boost::any const& b); + +bool +datum +::operator == (datum const& dat) const +{ + if (this == &dat) + { + return true; + } + + if (m_type != dat.m_type) + { + return false; + } + + bool ret = false; + + switch (m_type) + { + case data: + ret = any_equal(m_datum, dat.m_datum); + break; + case empty: + case flush: + case complete: + ret = true; + break; + case error: + ret = (m_error == dat.m_error); + break; + case invalid: + default: + ret = false; + break; + } + + return ret; +} + datum ::datum(type_t ty) : m_type(ty) @@ -167,6 +207,23 @@ bad_datum_cast_exception { } +bool +any_equal(boost::any const& a, boost::any const& b) +{ + if (a.empty() && b.empty()) + { + return true; + } + + if (a.type() != b.type()) + { + return false; + } + + // Be safe. + return false; +} + char const* string_for_type(datum::type_t type) { diff --git a/src/sprokit/pipeline/datum.h b/src/sprokit/pipeline/datum.h index ec57319d..1e4f7ae0 100644 --- a/src/sprokit/pipeline/datum.h +++ b/src/sprokit/pipeline/datum.h @@ -36,6 +36,7 @@ #include "types.h" #include +#include #include @@ -56,6 +57,7 @@ namespace sprokit * \ingroup base_classes */ class SPROKIT_PIPELINE_EXPORT datum + : boost::equality_comparable { public: /// Information about an error that occurred within a process. @@ -148,6 +150,19 @@ class SPROKIT_PIPELINE_EXPORT datum */ template T get_datum() const; + + /** + * \brief Compare two data for equality. + * + * \note This returns false for two data packets which point to the same + * internal data since \c boost::any does not give access to it without + * knowing the type. + * + * \param dat The datum to compare to. + * + * \returns True if \p dat and \c *this definitely have the same value, false otherwise. + */ + bool operator == (datum const& dat) const; private: SPROKIT_PIPELINE_NO_EXPORT datum(type_t ty); SPROKIT_PIPELINE_NO_EXPORT datum(error_t const& err); diff --git a/src/sprokit/pipeline/edge.cxx b/src/sprokit/pipeline/edge.cxx index 20507372..539a9b21 100644 --- a/src/sprokit/pipeline/edge.cxx +++ b/src/sprokit/pipeline/edge.cxx @@ -37,6 +37,8 @@ #include #include #include +#include +#include #include #include @@ -69,12 +71,15 @@ edge_datum_t { } +template +static bool pointers_equal(T const& a, T const& b); + bool edge_datum_t ::operator == (edge_datum_t const& rhs) const { - return (( datum == rhs.datum) && - (*stamp == *rhs.stamp)); + return (pointers_equal(datum, rhs.datum) && + pointers_equal(stamp, rhs.stamp)); } config::key_t const edge::config_dependency = config::key_t("_dependency"); @@ -88,10 +93,12 @@ class edge::priv typedef boost::weak_ptr process_ref_t; - bool has_data() const; bool full_of_data() const; void complete_check() const; + bool push(edge_datum_t const& datum, boost::optional const& duration = boost::none); + boost::optional pop(boost::optional const& duration = boost::none); + bool const depends; size_t const capacity; bool downstream_complete; @@ -151,7 +158,7 @@ ::has_data() const (void)lock; - return d->has_data(); + return !d->q.empty(); } bool @@ -180,67 +187,14 @@ void edge ::push_datum(edge_datum_t const& datum) { - { - priv::shared_lock_t const lock(d->complete_mutex); - - (void)lock; - - if (d->downstream_complete) - { - return; - } - } - - { - priv::upgrade_lock_t lock(d->mutex); - - while (d->full_of_data()) - { - d->cond_have_space.wait(lock); - } - - { - priv::upgrade_to_unique_lock_t const write_lock(lock); - - (void)write_lock; - - d->q.push_back(datum); - } - } - - d->cond_have_data.notify_one(); + d->push(datum); } edge_datum_t edge ::get_datum() { - d->complete_check(); - - edge_datum_t dat; - - { - priv::upgrade_lock_t lock(d->mutex); - - while (!d->has_data()) - { - d->cond_have_data.wait(lock); - } - - dat = d->q.front(); - - { - priv::upgrade_to_unique_lock_t const write_lock(lock); - - (void)write_lock; - - d->q.pop_front(); - } - } - - d->cond_have_space.notify_one(); - - return dat; + return *d->pop(); } edge_datum_t @@ -251,10 +205,8 @@ ::peek_datum(size_t idx) const priv::shared_lock_t lock(d->mutex); - while (d->q.size() <= idx) - { - d->cond_have_data.wait(lock); - } + d->cond_have_data.wait(lock, + boost::bind(&priv::edge_queue_t::size, &d->q) > idx); return d->q.at(idx); } @@ -268,10 +220,8 @@ ::pop_datum() { priv::upgrade_lock_t lock(d->mutex); - while (!d->has_data()) - { - d->cond_have_data.wait(lock); - } + d->cond_have_data.wait(lock, + !boost::bind(&priv::edge_queue_t::empty, &d->q)); { priv::upgrade_to_unique_lock_t const write_lock(lock); @@ -285,6 +235,20 @@ ::pop_datum() d->cond_have_space.notify_one(); } +bool +edge +::try_push_datum(edge_datum_t const& datum, duration_t const& duration) +{ + return d->push(datum, duration); +} + +boost::optional +edge +::try_get_datum(duration_t const& duration) +{ + return d->pop(duration); +} + void edge ::mark_downstream_as_complete() @@ -374,13 +338,6 @@ edge::priv { } -bool -edge::priv -::has_data() const -{ - return !q.empty(); -} - bool edge::priv ::full_of_data() const @@ -390,7 +347,7 @@ ::full_of_data() const return false; } - return (q.size() == capacity); + return (capacity <= q.size()); } void @@ -407,4 +364,106 @@ ::complete_check() const } } +bool +edge::priv +::push(edge_datum_t const& datum, boost::optional const& duration) +{ + { + shared_lock_t const lock(complete_mutex); + + (void)lock; + + if (downstream_complete) + { + return true; + } + } + + { + upgrade_lock_t lock(mutex); + boost::function const predicate = !boost::bind(&sprokit::edge::priv::full_of_data, this); + + if (duration) + { + if (!cond_have_space.wait_for(lock, *duration, predicate)) + { + return false; + } + } + else + { + cond_have_space.wait(lock, predicate); + } + + { + upgrade_to_unique_lock_t const write_lock(lock); + + (void)write_lock; + + q.push_back(datum); + } + } + + cond_have_data.notify_one(); + + return true; +} + +boost::optional +edge::priv +::pop(boost::optional const& duration) +{ + complete_check(); + + edge_datum_t dat; + + { + upgrade_lock_t lock(mutex); + boost::function const predicate = !boost::bind(&edge_queue_t::empty, &q); + + if (duration) + { + if (!cond_have_data.wait_for(lock, *duration, predicate)) + { + return boost::none; + } + } + else + { + cond_have_data.wait(lock, predicate); + } + + dat = q.front(); + + { + upgrade_to_unique_lock_t const write_lock(lock); + + (void)write_lock; + + q.pop_front(); + } + } + + cond_have_space.notify_one(); + + return dat; +} + +template +bool +pointers_equal(T const& a, T const& b) +{ + if (a == b) + { + return true; + } + + if (!a || !b) + { + return false; + } + + return (*a == *b); +} + } diff --git a/src/sprokit/pipeline/edge.h b/src/sprokit/pipeline/edge.h index 55b2a48c..bb7fc8c7 100644 --- a/src/sprokit/pipeline/edge.h +++ b/src/sprokit/pipeline/edge.h @@ -36,8 +36,10 @@ #include "config.h" #include "types.h" +#include #include #include +#include #include #include @@ -234,6 +236,29 @@ class SPROKIT_PIPELINE_EXPORT edge */ void pop_datum(); + typedef boost::chrono::high_resolution_clock clock_t; + typedef clock_t::duration duration_t; + + /** + * \brief Push a datum into the edge. + * + * \see push_datum + * + * \param datum The datum to put into the edge. + * \param duration The maximum amount of time to wait. + */ + bool try_push_datum(edge_datum_t const& datum, duration_t const& duration); + /** + * \brief Extract a datum from the edge or fail if a timeout is reached. + * + * \see get_datum + * + * \param duration The maximum amount of time to wait. + * + * \returns The next datum available from the edge, or \c boost::none if the timeout was reached. + */ + boost::optional try_get_datum(duration_t const& duration); + /** * \brief Trigger the edge to flush all data and not accept any more data. * diff --git a/tests/bindings/python/sprokit/pipeline/test-datum.py b/tests/bindings/python/sprokit/pipeline/test-datum.py index 26033e8b..c59528b8 100755 --- a/tests/bindings/python/sprokit/pipeline/test-datum.py +++ b/tests/bindings/python/sprokit/pipeline/test-datum.py @@ -123,6 +123,16 @@ def test_error_(): test_error("An error datum does not have None as its data") +def test_compare(): + from sprokit.pipeline import datum + + d1 = datum.complete() + d2 = datum.complete() + + if not d1 == d2: + test_error("A complete datum is not equal to a complete datum") + + if __name__ == '__main__': import os import sys diff --git a/tests/bindings/python/sprokit/pipeline/test-edge.py b/tests/bindings/python/sprokit/pipeline/test-edge.py index 2ce6d46c..810919da 100755 --- a/tests/bindings/python/sprokit/pipeline/test-edge.py +++ b/tests/bindings/python/sprokit/pipeline/test-edge.py @@ -60,6 +60,7 @@ def test_datum_create(): edge.EdgeData() +# TEST_PROPERTY(TIMEOUT, 5) def test_api_calls(): from sprokit.pipeline import config from sprokit.pipeline import datum @@ -86,8 +87,24 @@ def test_api_calls(): e.push_datum(ed) e.peek_datum() + e.push_datum(ed) + e.peek_datum(1) + e.pop_datum() e.pop_datum() + ed == ed + + wait = 1 + fail = e.try_get_datum(wait) + e.try_push_datum(ed, wait) + succeed = e.try_get_datum(wait) + + if fail is not None: + test_error("Received a datum when that should have timed out") + + if succeed is None: + test_error("Did not receive a datum from a get that should have succeeded") + modules.load_known_modules() reg = process_registry.ProcessRegistry.self() diff --git a/tests/sprokit/pipeline/CMakeLists.txt b/tests/sprokit/pipeline/CMakeLists.txt index 5662b5e6..cbf5fbf9 100644 --- a/tests/sprokit/pipeline/CMakeLists.txt +++ b/tests/sprokit/pipeline/CMakeLists.txt @@ -149,9 +149,9 @@ function (sprokit_add_tooled_run_test group instance) endif () endif () - sprokit_add_tooled_test(${group} ${instance}-${scheduler}) + sprokit_add_tooled_test("${group}" "${instance}-${scheduler}") - set_tests_properties(test-${group}-${instance}-${scheduler} + set_tests_properties("test-${group}-${instance}-${scheduler}" PROPERTIES TIMEOUT 5) diff --git a/tests/sprokit/pipeline/test_datum.cxx b/tests/sprokit/pipeline/test_datum.cxx index c9d3d6f9..e56e4439 100644 --- a/tests/sprokit/pipeline/test_datum.cxx +++ b/tests/sprokit/pipeline/test_datum.cxx @@ -149,3 +149,107 @@ IMPLEMENT_TEST(new) dat->get_datum(), "retrieving an int as a string"); } + +IMPLEMENT_TEST(equality) +{ + sprokit::datum_t const empty1 = sprokit::datum::empty_datum(); + sprokit::datum_t const empty2 = sprokit::datum::empty_datum(); + sprokit::datum_t const flush1 = sprokit::datum::flush_datum(); + sprokit::datum_t const flush2 = sprokit::datum::flush_datum(); + sprokit::datum_t const complete1 = sprokit::datum::complete_datum(); + sprokit::datum_t const complete2 = sprokit::datum::complete_datum(); + + sprokit::datum::error_t const errora = sprokit::datum::error_t("An error"); + sprokit::datum::error_t const errorb = sprokit::datum::error_t("Another error"); + + sprokit::datum_t const error1a = sprokit::datum::error_datum(errora); + sprokit::datum_t const error2a = sprokit::datum::error_datum(errora); + sprokit::datum_t const error1b = sprokit::datum::error_datum(errorb); + sprokit::datum_t const error2b = sprokit::datum::error_datum(errorb); + + boost::any const dummy1 = boost::any(); + boost::any const dummy2 = boost::any(); + boost::any const in_value1 = boost::any(1); + boost::any const in_value2 = boost::any(2); + sprokit::datum_t const value_dummy1 = sprokit::datum::new_datum(dummy1); + sprokit::datum_t const value_dummy2 = sprokit::datum::new_datum(dummy2); + sprokit::datum_t const value1 = sprokit::datum::new_datum(in_value1); + sprokit::datum_t const value2a = sprokit::datum::new_datum(in_value2); + sprokit::datum_t const value2b = sprokit::datum::new_datum(in_value2); + +#define test_equality(a, b, type, desc) \ + do \ + { \ + if (*a != *b) \ + { \ + TEST_ERROR("Expected a datum with " \ + "type " type " to be " \ + "equal: " desc); \ + } \ + } while (false) + +#define test_self_equality(a, type) \ + test_equality(a, a, type, "self comparison") + + test_self_equality(empty1, "empty"); + test_equality(empty1, empty2, "empty", "all empty data are equivalent"); + + test_self_equality(flush1, "flush"); + test_equality(flush1, flush2, "flush", "all flush data are equivalent"); + + test_self_equality(complete1, "complete"); + test_equality(complete1, complete2, "complete", "all complete data are equivalent"); + + test_self_equality(error1a, "error"); + test_equality(error1a, error2a, "error", "all error data with the same error string are equivalent"); + + test_self_equality(error1b, "error"); + test_equality(error1b, error2b, "error", "all error data with the same error string are equivalent"); + + test_self_equality(value_dummy1, "data"); + test_equality(value_dummy1, value_dummy2, "data", "empty internal data"); + + test_self_equality(value1, "data"); + /// \todo Is this possible? + //test_equality(value2a, value2b, "data", "same internal data value"); + +#undef test_self_equality +#undef test_equality + +#define test_inequality(a, b, atype, btype, desc) \ + do \ + { \ + if (*a == *b) \ + { \ + TEST_ERROR("Expected a datum with type " \ + atype " to be unequal to a " \ + "with type " btype ": " desc); \ + } \ + } while (false) + + test_inequality(empty1, flush1, "empty", "flush", "different types"); + test_inequality(empty1, complete1, "empty", "complete", "different types"); + test_inequality(empty1, error1a, "empty", "error", "different types"); + test_inequality(empty1, error1b, "empty", "error", "different types"); + test_inequality(empty1, value_dummy1, "empty", "data", "different types"); + test_inequality(empty1, value1, "empty", "data", "different types"); + + test_inequality(flush1, complete1, "flush", "complete", "different types"); + test_inequality(flush1, error1a, "flush", "error", "different types"); + test_inequality(flush1, error1b, "flush", "error", "different types"); + test_inequality(flush1, value_dummy1, "flush", "data", "different types"); + test_inequality(flush1, value1, "flush", "data", "different types"); + + test_inequality(complete1, error1a, "complete", "error", "different types"); + test_inequality(complete1, error1b, "complete", "error", "different types"); + test_inequality(complete1, value_dummy1, "complete", "data", "different types"); + test_inequality(complete1, value1, "complete", "data", "different types"); + + test_inequality(error1a, error1b, "error", "error", "different error strings"); + test_inequality(error1a, value_dummy1, "error", "data", "different types"); + test_inequality(error1a, value1, "error", "data", "different types"); + + test_inequality(value_dummy1, value1, "data", "data", "different internal data"); + +#undef test_inequality +} diff --git a/tests/sprokit/pipeline/test_edge.cxx b/tests/sprokit/pipeline/test_edge.cxx index a6af9e99..870b5ad1 100644 --- a/tests/sprokit/pipeline/test_edge.cxx +++ b/tests/sprokit/pipeline/test_edge.cxx @@ -63,6 +63,72 @@ main(int argc, char* argv[]) RUN_TEST(testname); } +IMPLEMENT_TEST(edge_datum_equal) +{ + sprokit::edge_datum_t edat1 = sprokit::edge_datum_t(); + sprokit::edge_datum_t edat2 = sprokit::edge_datum_t(); + + if (edat1 != edat2) + { + TEST_ERROR("Empty edge data are not equivalent"); + } + + edat1.stamp = sprokit::stamp::new_stamp(1); + edat2.stamp = sprokit::stamp::new_stamp(1); + + if (edat1 != edat2) + { + TEST_ERROR("Edge data with just a new stamp are not equivalent"); + } + + edat1.stamp = sprokit::stamp_t(); + edat2.stamp = sprokit::stamp_t(); + + sprokit::datum_t const dat = sprokit::datum::complete_datum(); + + edat1.datum = dat; + edat2.datum = dat; + + if (edat1 != edat2) + { + TEST_ERROR("Edge data with just the same datum are not equivalent"); + } + + edat1.stamp = sprokit::stamp_t(); + edat2.stamp = sprokit::stamp_t(); + + if (edat1 != edat2) + { + TEST_ERROR("Edge data with just the same datum and new stamps are not equivalent"); + } + + edat1.stamp = sprokit::stamp::new_stamp(1); + edat1.stamp = sprokit::stamp::incremented_stamp(edat1.stamp); + + if (edat1 == edat2) + { + TEST_ERROR("Edge data with the same datum, but different stamps are equivalent"); + } + + edat1.stamp = sprokit::stamp::new_stamp(1); + + sprokit::datum_t const dat2 = sprokit::datum::complete_datum(); + + edat1.datum = dat2; + + if (edat1 == edat2) + { + TEST_ERROR("Edge data with the same stamp, but different data (of the same type) are equivalent"); + } + + edat1.stamp = sprokit::stamp::incremented_stamp(edat1.stamp); + + if (edat1 == edat2) + { + TEST_ERROR("Edge data with different stamps and data are equivalent"); + } +} + IMPLEMENT_TEST(null_config) { sprokit::config_t const config; @@ -395,11 +461,23 @@ IMPLEMENT_TEST(get_data_from_complete) "popping data from a complete edge"); } +namespace +{ + +// This clock is used because it is both steady (which rules out system_clock) +// and uses the wall time (which rules out thread_clock). +typedef boost::chrono::process_real_cpu_clock time_clock_t; +typedef time_clock_t::time_point time_point_t; +typedef time_clock_t::duration duration_t; + +} + #define SECONDS_TO_WAIT 1 #define WAIT_DURATION boost::chrono::seconds(SECONDS_TO_WAIT) static void push_datum(sprokit::edge_t edge, sprokit::edge_datum_t edat); +TEST_PROPERTY(TIMEOUT, 5) IMPLEMENT_TEST(capacity) { sprokit::config_t const config = sprokit::config::empty_config(); @@ -452,38 +530,110 @@ IMPLEMENT_TEST(capacity) } } -void -push_datum(sprokit::edge_t edge, sprokit::edge_datum_t edat) +static void check_time(duration_t const& actual, duration_t const& expected, char const* const message); + +TEST_PROPERTY(TIMEOUT, 5) +IMPLEMENT_TEST(try_push_datum) { - // This clock is used because it is both steady (which rules out system_clock) - // and uses the wall time (which rules out thread_clock). - typedef boost::chrono::process_real_cpu_clock time_clock_t; - typedef time_clock_t::time_point time_point_t; - typedef time_clock_t::duration duration_t; + sprokit::config_t const config = sprokit::config::empty_config(); + + sprokit::config::value_t const value_capacity = boost::lexical_cast(1); + + config->set_value(sprokit::edge::config_capacity, value_capacity); + + sprokit::edge_t const edge = boost::make_shared(config); + + sprokit::stamp::increment_t const inc = sprokit::stamp::increment_t(1); + + sprokit::datum_t const dat1 = sprokit::datum::empty_datum(); + sprokit::datum_t const dat2 = sprokit::datum::complete_datum(); + sprokit::stamp_t const stamp1 = sprokit::stamp::new_stamp(inc); + sprokit::stamp_t const stamp2 = sprokit::stamp::incremented_stamp(stamp1); + + sprokit::edge_datum_t const edat1 = sprokit::edge_datum_t(dat1, stamp1); + sprokit::edge_datum_t const edat2 = sprokit::edge_datum_t(dat2, stamp2); + + // Fill the edge. + edge->push_datum(edat1); time_point_t const start = time_clock_t::now(); // This should be blocking. - edge->push_datum(edat); + bool const pushed = edge->try_push_datum(edat2, WAIT_DURATION); time_point_t const end = time_clock_t::now(); + if (pushed) + { + TEST_ERROR("Returned true when a push should have timed out"); + } + duration_t const duration = end - start; - static double const tolerance = 0.75; + check_time(duration, WAIT_DURATION, "trying to get a datum from an edge"); - if (duration < (tolerance * WAIT_DURATION)) + // Make sure the edge still is at capacity. + if (edge->datum_count() != 1) { - TEST_ERROR("It seems as though blocking did not " - "occur when pushing into a full edge: " - "expected to wait between " - << tolerance * WAIT_DURATION << " and " - << WAIT_DURATION << ", but waited for " - << duration << " instead"); + TEST_ERROR("A datum was pushed into a full edge"); } +} + +TEST_PROPERTY(TIMEOUT, 5) +IMPLEMENT_TEST(try_get_datum) +{ + sprokit::edge_t const edge = boost::make_shared(); + + time_point_t const start = time_clock_t::now(); + + // This should be blocking. + boost::optional const opt_datum = edge->try_get_datum(WAIT_DURATION); + + time_point_t const end = time_clock_t::now(); + + if (opt_datum) + { + TEST_ERROR("Returned a datum from an empty edge"); + } + + duration_t const duration = end - start; + + check_time(duration, WAIT_DURATION, "trying to get a datum from an edge"); +} + +void +push_datum(sprokit::edge_t edge, sprokit::edge_datum_t edat) +{ + time_point_t const start = time_clock_t::now(); + + // This should be blocking. + edge->push_datum(edat); + + time_point_t const end = time_clock_t::now(); + + duration_t const duration = end - start; + + check_time(duration, WAIT_DURATION, "pushing into a full edge"); if (edge->datum_count() != 1) { TEST_ERROR("A datum was pushed into a full edge"); } } + +void +check_time(duration_t const& actual, duration_t const& expected, char const* const message) +{ + static double const tolerance = 0.75; + boost::chrono::duration const allowed = tolerance * WAIT_DURATION; + + if (actual < allowed) + { + TEST_ERROR("It seems as though blocking did not " + "occur when " << message << ": " + "expected to wait between " + << allowed << " and " + << expected << ", but waited for " + << actual << " instead"); + } +}