From dfd1837cfc7075c83403205264d004fd3f215d6d Mon Sep 17 00:00:00 2001 From: William Woodall Date: Wed, 11 Jan 2012 23:42:42 -0600 Subject: [PATCH] Serial Listener changes compile against the example reference, time to merge with John. --- examples/serial_listener_example.cc | 14 +- include/serial/serial_listener.h | 446 +++++++++++++++------------- src/serial_listener.cc | 63 ++-- 3 files changed, 276 insertions(+), 247 deletions(-) diff --git a/examples/serial_listener_example.cc b/examples/serial_listener_example.cc index 2cd5b20..5fd23e4 100644 --- a/examples/serial_listener_example.cc +++ b/examples/serial_listener_example.cc @@ -36,10 +36,10 @@ int main(void) { // Method #2: // comparator - blocking { - BlockingFilter f2 = + BlockingFilterPtr f2 = listener.createBlockingFilter(SerialListener::endsWith("post")); for (size_t i = 0; i < 3; i++) { - std::string token = f2.wait(100); // Wait for 100 ms or a matched token + std::string token = f2->wait(100); // Wait for 100 ms or a matched token if (token == "") std::cout << "Found something ending with 'post'" << std::endl; else @@ -54,18 +54,18 @@ int main(void) { // comparator, token buffer size - blocking { // Give it a comparator, then a buffer size of 10 - BufferedFilter f3 = + BufferedFilterPtr f3 = listener.createBufferedFilter(SerialListener::contains("substr"), 10); SerialListener::sleep(75); // Sleep 75ms, should have about 7 - std::cout << "Caught " << f3.count(); + std::cout << "Caught " << f3->count(); std::cout << " tokens containing 'substr'" << std::endl; for(size_t i = 0; i < 20; ++i) { - std::string token = f3.wait(5); // Pull message from the buffer + std::string token = f3->wait(5); // Pull message from the buffer if (token == "") // If an empty string is returned, a timeout occured break; } - f3.clear(); // Empties the buffer - if (f3.wait(0) == "") // Non-blocking wait + f3->clear(); // Empties the buffer + if (f3->wait(0) == "") // Non-blocking wait std::cout << "We won the race condition!" << std::endl; else std::cout << "We lost the race condition..." << std::endl; diff --git a/include/serial/serial_listener.h b/include/serial/serial_listener.h index 1c8e5de..737e146 100644 --- a/include/serial/serial_listener.h +++ b/include/serial/serial_listener.h @@ -178,160 +178,31 @@ public: */ typedef boost::shared_ptr FilterPtr; -/*! - * This is the a filter that provides a wait function for blocking until a - * match is found. - * - * This should probably not be created manually, but instead should be - * constructed using SerialListener::createBlockingFilter(ComparatorType) - * function which returns a BlockingFilter instance. - * - * \see serial::SerialListener::ComparatorType, - * serial::SerialListener::createBlockingFilter - */ -class BlockingFilter -{ -public: - BlockingFilter (ComparatorType comparator, - boost::shared_ptr listener) - : listener(listener) - { - DataCallback cb = boost::bind(&BlockingFilter::callback, this, _1); - this->filter_ptr = listener.createFilter(comparator, cb); - } - - virtual ~BlockingFilter () { - this->listener.removeFilter(filter_ptr); - this->result = ""; - this->cond.notify_all(); - } - - /*! - * Waits a given number of milliseconds or until a token is matched. If a - * token is matched it is returned, otherwise an empty string is returned. - * - * \param ms Time in milliseconds to wait on a new token. - * - * \return std::string token that was matched or "" if none were matched. - */ - std::string wait(size_t ms) { - this->result = ""; - boost::unique_lock lock(this->mutex); - this->cond.timed_wait(lock, boost::posix_time::milliseconds(ms)); - return this->result; - } - -private: - void callback(const std::string& token) { - this->cond.notify_all(); - this->result = token; - } - - FilterPtr filter_ptr; - boost::shared_ptr listener; - boost::condition_variable cond; - boost::mutex mutex; - std::string result; - -}; +class BlockingFilter; /*! - * This is the a filter that provides a wait function for blocking until a - * match is found. It will also buffer up to a given buffer size of tokens so - * that they can be counted or accessed after they are matched by the filter. + * Shared Pointer of BlockingFilter, returned by + * SerialListener::createBlockingFilter. * - * This should probably not be created manually, but instead should be - * constructed using SerialListener::createBufferedFilter(ComparatorType) - * function which returns a BufferedFilter instance. - * - * The internal buffer is a circular queue buffer, so when the buffer is full, - * the oldest token is dropped and the new one is added. Additionally, when - * wait is a called the oldest available token is returned. - * - * \see serial::SerialListener::ComparatorType, - * serial::SerialListener::createBufferedFilter + * \see serial::BlockingFilter, SerialListener::createBlockingFilter */ -class BufferedFilter -{ -public: - BufferedFilter (ComparatorType comparator, size_t buffer_size, - boost::shared_ptr listener) - : listener(listener), buffer_size(buffer_size) - { - DataCallback cb = boost::bind(&BlockingFilter::callback, this, _1); - this->filter_ptr = listener.createFilter(comparator, cb); - } +typedef boost::shared_ptr BlockingFilterPtr; - virtual ~BufferedFilter () { - this->listener.removeFilter(filter_ptr); - this->queue.clear(); - this->result = ""; - this->cond.notify_all(); - } +class BufferedFilter; - /*! - * Waits a given number of milliseconds or until a matched token is - * available in the buffer. If a token is matched it is returned, otherwise - * an empty string is returned. - * - * \param ms Time in milliseconds to wait on a new token. If ms is set to 0 - * then it will try to get a new token if one is available but will not - * block. - * - * \return std::string token that was matched or "" if none were matched. - */ - std::string wait(size_t ms) { - if (ms == 0) - if (!this->queue.try_pop(this->result)) - this->result = ""; - else - if (!this->queue.timed_wait_and_pop(this->result, ms)) - this->result = ""; - return result; - } - - /*! - * Clears the buffer of any tokens. - */ - void clear() { - queue.clear(); - } - - /*! - * Returns the number of tokens waiting in the buffer. - */ - size_t count() { - return queue.size(); - } - - /*! - * Returns the capacity of the buffer. - */ - size_t capacity() { - return buffer_size; - } - -private: - void callback(const std::string &token) { - std::string throw_away; - if (this->queue.size() == buffer_size) - this->queue.wait_and_pop(throw_away); - this->queue.push(token); - } - - FilterPtr filter_ptr; - size_t buffer_size; - boost::shared_ptr listener; - ConcurrentQueue queue; - std::string result; - -}; +/*! + * Shared Pointer of BufferedFilter, returned by + * SerialListener::createBufferedFilter. + * + * \see serial::BufferedFilter, SerialListener::createBufferedFilter + */ +typedef boost::shared_ptr BufferedFilterPtr; /*! * This is a general exception generated by the SerialListener class. * * Check the SerialListenerException::what function for the cause. - + * * \param e_what is a std::string that describes the cause of the error. */ class SerialListenerException : public std::exception { @@ -489,98 +360,115 @@ public: /***** Filter Functions ******/ /*! - * Setups up a filter that calls a callback when a comparator returns true. + * Creates a filter that calls a callback when the comparator returns true. * - * The user provides a comparator and a callback, and every time a line is - * received the comparator is called and the comparator has to evaluate the - * line and return true if it matches and false if it doesn't. If it does + * The user provides a comparator and a callback, and every time a line is + * received the comparator is called and the comparator has to evaluate the + * line and return true if it matches and false if it doesn't. If it does * match, the callback is called with the resulting line. * * \param comparator This is a comparator for detecting if a line matches. - * The comparartor receives a std::string reference and must return a true + * The comparartor receives a std::string reference and must return a true * if it matches and false if it doesn't. * - * \param callback This is the handler for when a match occurs. It is given + * \param callback This is the handler for when a match occurs. It is given * a std::string reference of the line that matched your comparator. * * \return boost::shared_ptr so you can remove it later. * - * \see SerialListener::stopListeningFor + * \see SerialListener::removeFilter */ FilterPtr - listenFor (ComparatorType comparator, DataCallback callback); + createFilter (ComparatorType comparator, DataCallback callback); /*! - * Blocks until the comparator returns true or until the timeout occurs. + * Creates a BlockingFilter which blocks until the comparator returns true. * - * \param comparator ComparatorType function that should return true if the - * given std::string matches otherwise false. + * The user provides a comparator, and every time a line is + * received the comparator is called and the comparator has to evaluate the + * line and return true if it matches and false if it doesn't. If it does + * match, any threads that have called BlockingFilter::wait will be + * notified. The BlockingFilter will remove itself when its destructor is + * called, i.e. when it leaves the scope, so in those cases an explicit call + * to SerialListener::removeFilter is not needed. * - * \param timeout in milliseconds before timing out and returning false. - * Defaults to 1000 milliseconds or 1 second. + * \param comparator This is a comparator for detecting if a line matches. + * The comparartor receives a std::string reference and must return a true + * if it matches and false if it doesn't. * - * \return std::string the token that was matched, returns an empty string - * if the timeout occurs first. - * i.e. if (listenForOnce(...) != "") // Got match + * \return BlockingFilterPtr So you can call BlockingFilter::wait on it. + * + * \see SerialListener::removeFilter, serial::BlockingFilter, + * serial::BlockingFilterPtr */ - std::string - listenForOnce (ComparatorType comparator, size_t timeout = 1000); + BlockingFilterPtr + createBlockingFilter (ComparatorType comparator); /*! - * Writes to the seiral port then blocks until the comparator returns true - * or until the timeout occurs. + * Creates a BlockingFilter blocks until the comparator returns true. * - * This function creates a filter, writes the data, then waits for it to - * match atleast once. + * The user provides a comparator, and every time a line is + * received the comparator is called and the comparator has to evaluate the + * line and return true if it matches and false if it doesn't. If it does + * match, any threads that have called BlockingFilter::wait will be + * notified. The BlockingFilter will remove itself when its destructor is + * called, i.e. when it leaves the scope, so in those cases an explicit call + * to SerialListener::removeFilter is not needed. * - * \param to_be_written const std::string reference of data to be written to - * the serial port. + * \param comparator This is a comparator for detecting if a line matches. + * The comparartor receives a std::string reference and must return a true + * if it matches and false if it doesn't. * - * \param comparator ComparatorType function that should return true if the - * given std::string matches otherwise false. + * \param buffer_size This is the number of tokens to be buffered by the + * BufferedFilter, defaults to 1024. * - * \param timeout in milliseconds before timing out and returning false. - * Defaults to 1000 milliseconds or 1 second. + * \return BlockingFilter So you can call BlockingFilter::wait on it. * - * \return std::string the token that was matched, returns an empty string - * if the timeout occurs first. - * i.e. if (listenForOnce(...) != "") // Got match + * \see SerialListener::removeFilter, serial::BufferedFilter, + * serial::BufferedFilterPtr */ - std::string - listenForOnce (ComparatorType comparator, size_t timeout = 1000); + BufferedFilterPtr + createBufferedFilter (ComparatorType comparator, size_t buffer_size = 1024); /*! - * Blocks until the given string is detected or until the timeout occurs. + * Removes a filter by a given FilterPtr. * - * \param token std::string that should be watched for, this string must - * match the message exactly. + * \param filter_ptr A shared pointer to the filter to be removed. * - * \param timeout in milliseconds before timing out and returning false. - * Defaults to 1000 milliseconds or 1 second. - * - * \return bool If true then the token was detected before the token, false - * if the token was not heard and the timeout occured. - */ - bool - listenForStringOnce (std::string token, size_t timeout = 1000); - - /*! - * Removes a filter by a given uuid. - * - * The uuid for a filter is returned by the listenFor function. - * - * \param filter_ptr A shared pointer to the filter. - * - * \see SerialListener::listenFor - */ - void - stopListeningFor (FilterPtr filter_ptr); - - /*! - * Stops listening for anything, but doesn't stop reading the serial port. + * \see SerialListener::createFilter */ void - stopListeningForAll (); + removeFilter (FilterPtr filter_ptr); + + /*! + * Removes a BlockingFilter. + * + * The BlockingFilter will remove itself if the destructor is called. + * + * \param blocking_filter A BlockingFilter to be removed. + * + * \see SerialListener::createBlockingFilter + */ + void + removeFilter (BlockingFilterPtr blocking_filter); + + /*! + * Removes a BufferedFilter. + * + * The BufferedFilter will remove itself if the destructor is called. + * + * \param buffered_filter A BufferedFilter to be removed. + * + * \see SerialListener::createBufferedFilter + */ + void + removeFilter (BufferedFilterPtr buffered_filter); + + /*! + * Removes all filters. + */ + void + removeAllFilters (); /***** Hooks and Handlers ******/ @@ -912,6 +800,156 @@ private: }; -} +/*! + * This is the a filter that provides a wait function for blocking until a + * match is found. + * + * This should probably not be created manually, but instead should be + * constructed using SerialListener::createBlockingFilter(ComparatorType) + * function which returns a BlockingFilter instance. + * + * \see serial::SerialListener::ComparatorType, + * serial::SerialListener::createBlockingFilter + */ +class BlockingFilter +{ +public: + BlockingFilter (ComparatorType comparator, + boost::shared_ptr listener) + : listener(listener) + { + DataCallback cb = boost::bind(&BlockingFilter::callback, this, _1); + this->filter_ptr = listener->createFilter(comparator, cb); + } + + virtual ~BlockingFilter () { + this->listener->removeFilter(filter_ptr); + this->result = ""; + this->cond.notify_all(); + } + + /*! + * Waits a given number of milliseconds or until a token is matched. If a + * token is matched it is returned, otherwise an empty string is returned. + * + * \param ms Time in milliseconds to wait on a new token. + * + * \return std::string token that was matched or "" if none were matched. + */ + std::string wait(size_t ms) { + this->result = ""; + boost::unique_lock lock(this->mutex); + this->cond.timed_wait(lock, boost::posix_time::milliseconds(ms)); + return this->result; + } + + FilterPtr filter_ptr; + + void callback(const std::string& token) { + this->cond.notify_all(); + this->result = token; + } + +private: + boost::shared_ptr listener; + boost::condition_variable cond; + boost::mutex mutex; + std::string result; + +}; + +/*! + * This is the a filter that provides a wait function for blocking until a + * match is found. It will also buffer up to a given buffer size of tokens so + * that they can be counted or accessed after they are matched by the filter. + * + * This should probably not be created manually, but instead should be + * constructed using SerialListener::createBufferedFilter(ComparatorType) + * function which returns a BufferedFilter instance. + * + * The internal buffer is a circular queue buffer, so when the buffer is full, + * the oldest token is dropped and the new one is added. Additionally, when + * wait is a called the oldest available token is returned. + * + * \see serial::SerialListener::ComparatorType, + * serial::SerialListener::createBufferedFilter + */ +class BufferedFilter +{ +public: + BufferedFilter (ComparatorType comparator, size_t buffer_size, + boost::shared_ptr listener) + : listener(listener), buffer_size(buffer_size) + { + DataCallback cb = boost::bind(&BufferedFilter::callback, this, _1); + this->filter_ptr = listener->createFilter(comparator, cb); + } + + virtual ~BufferedFilter () { + this->listener->removeFilter(filter_ptr); + this->queue.clear(); + this->result = ""; + } + + /*! + * Waits a given number of milliseconds or until a matched token is + * available in the buffer. If a token is matched it is returned, otherwise + * an empty string is returned. + * + * \param ms Time in milliseconds to wait on a new token. If ms is set to 0 + * then it will try to get a new token if one is available but will not + * block. + * + * \return std::string token that was matched or "" if none were matched. + */ + std::string wait(size_t ms) { + if (ms == 0) + if (!this->queue.try_pop(this->result)) + this->result = ""; + else + if (!this->queue.timed_wait_and_pop(this->result, ms)) + this->result = ""; + return result; + } + + /*! + * Clears the buffer of any tokens. + */ + void clear() { + queue.clear(); + } + + /*! + * Returns the number of tokens waiting in the buffer. + */ + size_t count() { + return queue.size(); + } + + /*! + * Returns the capacity of the buffer. + */ + size_t capacity() { + return buffer_size; + } + + FilterPtr filter_ptr; + + void callback(const std::string &token) { + std::string throw_away; + if (this->queue.size() == buffer_size) + this->queue.wait_and_pop(throw_away); + this->queue.push(token); + } + +private: + size_t buffer_size; + boost::shared_ptr listener; + ConcurrentQueue queue; + std::string result; + +}; + +} // namespace serial #endif // SERIAL_LISTENER_H \ No newline at end of file diff --git a/src/serial_listener.cc b/src/serial_listener.cc index 8306f0c..d3ed2a3 100644 --- a/src/serial_listener.cc +++ b/src/serial_listener.cc @@ -111,7 +111,7 @@ SerialListener::stopListening() { this->serial_port = NULL; // Delete all the filters - this->stopListeningForAll(); + this->removeAllFilters(); } size_t @@ -186,7 +186,8 @@ SerialListener::listen() { /***** Filter Functions *****/ FilterPtr -SerialListener::listenFor(ComparatorType comparator, DataCallback callback) { +SerialListener::createFilter(ComparatorType comparator, DataCallback callback) +{ FilterPtr filter_ptr(new Filter(comparator, callback)); boost::mutex::scoped_lock l(filter_mux); @@ -195,50 +196,40 @@ SerialListener::listenFor(ComparatorType comparator, DataCallback callback) { return filter_ptr; } -typedef boost::shared_ptr shared_cond_var_ptr_t; +BlockingFilterPtr +SerialListener::createBlockingFilter(ComparatorType comparator) { + return BlockingFilterPtr( + new BlockingFilter(comparator, boost::shared_ptr(this))); +} -inline void -listenForOnceCallback(const std::string &token, - shared_cond_var_ptr_t cond, - boost::shared_ptr result) +BufferedFilterPtr +SerialListener::createBufferedFilter(ComparatorType comparator, + size_t buffer_size) { - (*result) = token; - cond->notify_all(); -} - -std::string -SerialListener::listenForOnce(ComparatorType comparator, size_t ms) { - boost::shared_ptr result(new std::string("")); - - shared_cond_var_ptr_t cond(new boost::condition_variable()); - boost::mutex mutex; - - DataCallback callback = boost::bind(listenForOnceCallback,_1,cond,result); - FilterPtr filter_id = this->listenFor(comparator, callback); - - boost::unique_lock lock(mutex); - cond->timed_wait(lock, boost::posix_time::milliseconds(ms))); - - this->stopListeningFor(filter_id); - - // If the callback never got called then result will be "" because tokens - // can never be "" - return (*result); -} - -bool -SerialListener::listenForStringOnce(std::string token, size_t milliseconds) { - return this->listenForOnce(exactly(token), milliseconds) == token; + return BufferedFilterPtr( + new BufferedFilter(comparator, + buffer_size, + boost::shared_ptr(this))); } void -SerialListener::stopListeningFor(FilterPtr filter_ptr) { +SerialListener::removeFilter(FilterPtr filter_ptr) { boost::mutex::scoped_lock l(filter_mux); filters.erase(std::find(filters.begin(),filters.end(),filter_ptr)); } void -SerialListener::stopListeningForAll() { +SerialListener::removeFilter(BlockingFilterPtr blocking_filter) { + this->removeFilter(blocking_filter->filter_ptr); +} + +void +SerialListener::removeFilter(BufferedFilterPtr buffered_filter) { + this->removeFilter(buffered_filter->filter_ptr); +} + +void +SerialListener::removeAllFilters() { boost::mutex::scoped_lock l(filter_mux); filters.clear(); callback_queue.clear();