mirror of
https://github.com/wjwwood/serial.git
synced 2026-01-22 03:34:53 +08:00
Fixing to remove once type functions and re-implement them
This commit is contained in:
parent
9824eb1d4c
commit
51965cc57f
@ -36,6 +36,9 @@
|
||||
#ifndef SERIAL_LISTENER_H
|
||||
#define SERIAL_LISTENER_H
|
||||
|
||||
// STL
|
||||
#include <queue>
|
||||
|
||||
// Serial
|
||||
#include <serial/serial.h>
|
||||
|
||||
@ -123,26 +126,6 @@ TokenizerType;
|
||||
/*! This is a convenience alias for boost::uuids::uuid. */
|
||||
typedef boost::uuids::uuid uuid_type; // uuid_t is already taken! =(
|
||||
|
||||
void
|
||||
_delimeter_tokenizer (std::string &data, std::vector<std::string> &tokens,
|
||||
std::string delimeter);
|
||||
|
||||
/*!
|
||||
* This returns a tokenizer that splits on a given delimeter.
|
||||
*
|
||||
* The delimeter is passed into the function and a TokenizerType is returned
|
||||
* that can be passed to SerialListener::setTokenizer.
|
||||
*
|
||||
* Example:
|
||||
* <pre>
|
||||
* my_listener.setTokenizer(delimeter_tokenizer("\r"));
|
||||
* <\pre>
|
||||
*
|
||||
* \see SerialListener::setTokenizer, serial::TokenizerType
|
||||
*/
|
||||
TokenizerType
|
||||
delimeter_tokenizer (std::string delimeter);
|
||||
|
||||
/*!
|
||||
* This is a general exception generated by the SerialListener class.
|
||||
*
|
||||
@ -163,6 +146,87 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// Based on: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
|
||||
template<typename Data>
|
||||
class ConcurrentQueue
|
||||
{
|
||||
private:
|
||||
std::queue<Data> the_queue;
|
||||
mutable boost::mutex the_mutex;
|
||||
boost::condition_variable the_condition_variable;
|
||||
public:
|
||||
void push(Data const& data) {
|
||||
boost::mutex::scoped_lock lock(the_mutex);
|
||||
the_queue.push(data);
|
||||
lock.unlock();
|
||||
the_condition_variable.notify_one();
|
||||
}
|
||||
|
||||
bool empty() const {
|
||||
boost::mutex::scoped_lock lock(the_mutex);
|
||||
return the_queue.empty();
|
||||
}
|
||||
|
||||
bool try_pop(Data& popped_value) {
|
||||
boost::mutex::scoped_lock lock(the_mutex);
|
||||
if(the_queue.empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
popped_value=the_queue.front();
|
||||
the_queue.pop();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool timed_wait_and_pop(Data& popped_value, size_t timeout) {
|
||||
using namespace boost::posix_time;
|
||||
bool result;
|
||||
boost::mutex::scoped_lock lock(the_mutex);
|
||||
result = !the_queue.empty();
|
||||
if (!result) {
|
||||
result = the_condition_variable.timed_wait(lock, milliseconds(timeout));
|
||||
}
|
||||
|
||||
if (result) {
|
||||
popped_value=the_queue.front();
|
||||
the_queue.pop();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void wait_and_pop(Data& popped_value) {
|
||||
boost::mutex::scoped_lock lock(the_mutex);
|
||||
while(the_queue.empty()) {
|
||||
the_condition_variable.wait(lock);
|
||||
}
|
||||
|
||||
popped_value=the_queue.front();
|
||||
the_queue.pop();
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
return the_queue.size();
|
||||
}
|
||||
|
||||
void cancel() {
|
||||
the_condition_variable.notify_one();
|
||||
}
|
||||
|
||||
void clear() {
|
||||
boost::mutex::scoped_lock lock(the_mutex);
|
||||
while (!the_queue.empty()) {
|
||||
the_queue.pop();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
namespace filter_type {
|
||||
typedef enum {
|
||||
nonblocking, blocking
|
||||
} FilterType;
|
||||
}
|
||||
|
||||
/*!
|
||||
* Listens to a serial port, facilitates asynchronous reading
|
||||
*/
|
||||
@ -381,15 +445,52 @@ public:
|
||||
this->warn = warning_handler;
|
||||
}
|
||||
|
||||
/***** Static Functions ******/
|
||||
|
||||
/*!
|
||||
* This returns a tokenizer that splits on a given delimeter.
|
||||
*
|
||||
* The delimeter is passed into the function and a TokenizerType is returned
|
||||
* that can be passed to SerialListener::setTokenizer.
|
||||
*
|
||||
* Example:
|
||||
* <pre>
|
||||
* my_listener.setTokenizer(delimeter_tokenizer("\r"));
|
||||
* <\pre>
|
||||
*
|
||||
* \see SerialListener::setTokenizer, serial::TokenizerType
|
||||
*/
|
||||
static TokenizerType
|
||||
delimeter_tokenizer (std::string delimeter);
|
||||
|
||||
// tokenizer functions
|
||||
static void
|
||||
_delimeter_tokenizer (std::string &data, std::vector<std::string> &tokens,
|
||||
std::string delimeter);
|
||||
|
||||
private:
|
||||
// Gets some data from the serial port
|
||||
void readSomeData (std::string&, size_t);
|
||||
// Takes newly tokenized data and processes them
|
||||
void addNewTokens(std::vector<std::string> &new_tokens,
|
||||
std::vector<uuid_type> new_uuids,
|
||||
std::string &left_overs);
|
||||
// Runs the new tokens through the filters
|
||||
void filter (std::vector<uuid_type> new_uuids);
|
||||
// Function that loops while listening is true
|
||||
void listen ();
|
||||
// Called by listen iteratively
|
||||
std::string listenOnce (std::string data);
|
||||
// Target of callback thread
|
||||
void callback ();
|
||||
// Prune old tokens
|
||||
void pruneTokens ();
|
||||
// Erases one token
|
||||
void eraseToken (uuid_type&);
|
||||
// Erases several tokens
|
||||
void eraseTokens (std::vector<uuid_type>&);
|
||||
// Determines how much to read on each loop of listen
|
||||
size_t determineAmountToRead ();
|
||||
// Used in the look for string once function
|
||||
bool listenForOnceComparator(std::string line);
|
||||
bool listenForOnceComparator (std::string line);
|
||||
|
||||
// Tokenizer
|
||||
TokenizerType tokenize;
|
||||
@ -409,10 +510,16 @@ private:
|
||||
bool listening;
|
||||
serial::Serial * serial_port;
|
||||
boost::thread listen_thread;
|
||||
std::string buffer;
|
||||
std::map<const uuid_type,std::string> lines;
|
||||
std::string data_buffer;
|
||||
boost::mutex token_mux;
|
||||
std::map<const uuid_type,std::string> tokens;
|
||||
std::map<const uuid_type,boost::posix_time::ptime> ttls;
|
||||
|
||||
// Callback related variables
|
||||
// uuid and true for default handler, false for normal callback
|
||||
ConcurrentQueue<std::pair<uuid_type,bool> > callback_queue;
|
||||
boost::thread callback_thread;
|
||||
|
||||
// For generating random uuids
|
||||
boost::uuids::random_generator random_generator;
|
||||
|
||||
@ -420,7 +527,7 @@ private:
|
||||
boost::posix_time::time_duration ttl;
|
||||
|
||||
// map<uuid, filter type (blocking/non-blocking)>
|
||||
std::map<const uuid_type,std::string> filters;
|
||||
std::map<const uuid_type,filter_type::FilterType> filters;
|
||||
// map<uuid, comparator>
|
||||
std::map<const uuid_type,ComparatorType> comparators;
|
||||
// map<uuid, callback>
|
||||
@ -429,6 +536,7 @@ private:
|
||||
std::map<const uuid_type,boost::condition_variable*> condition_vars;
|
||||
// Mutex for locking use of filters
|
||||
boost::mutex filter_mux;
|
||||
boost::mutex callback_mux;
|
||||
|
||||
// Used as temporary storage for listenForStringOnce
|
||||
std::string current_listen_for_one_target;
|
||||
|
||||
@ -13,6 +13,7 @@ project(Serial)
|
||||
# Use clang if available
|
||||
IF(EXISTS /usr/bin/clang)
|
||||
set(CMAKE_CXX_COMPILER /usr/bin/clang++)
|
||||
set(CMAKE_CXX_FLAGS -ferror-limit=5)
|
||||
ENDIF(EXISTS /usr/bin/clang)
|
||||
|
||||
option(SERIAL_BUILD_TESTS "Build all of the Serial tests." OFF)
|
||||
|
||||
@ -22,19 +22,6 @@ inline void defaultExceptionCallback(const std::exception &error) {
|
||||
|
||||
using namespace serial;
|
||||
|
||||
void
|
||||
_delimeter_tokenizer (std::string &data, std::vector<std::string> &tokens,
|
||||
std::string delimeter)
|
||||
{
|
||||
boost::split(tokens, data, boost::is_any_of(delimeter));
|
||||
}
|
||||
|
||||
TokenizerType
|
||||
delimeter_tokenizer (std::string delimeter) {
|
||||
TokenizerType temp = boost::bind(_delimeter_tokenizer, _1, _2, delimeter);
|
||||
return temp;
|
||||
}
|
||||
|
||||
/***** Listener Class Functions *****/
|
||||
|
||||
SerialListener::SerialListener() : listening(false) {
|
||||
@ -53,7 +40,47 @@ SerialListener::SerialListener() : listening(false) {
|
||||
}
|
||||
|
||||
SerialListener::~SerialListener() {
|
||||
|
||||
if (this->listening) {
|
||||
this->stopListening();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
SerialListener::callback() {
|
||||
try {
|
||||
std::pair<uuid_type,bool> pair;
|
||||
DataCallback _callback;
|
||||
while (this->listening) {
|
||||
if (this->callback_queue.timed_wait_and_pop(pair, 10)) {
|
||||
if (this->listening) {
|
||||
std::cout << "After pop (" << pair.second << "): ";
|
||||
std::cout << this->tokens[pair.first] << std::endl;
|
||||
try {
|
||||
// If default handler
|
||||
if (pair.second) {
|
||||
if (this->default_handler)
|
||||
this->default_handler(this->tokens[pair.first]);
|
||||
// Else use provided callback
|
||||
} else {
|
||||
// Grab the callback as to not hold the mutex while executing
|
||||
{
|
||||
boost::mutex::scoped_lock l(callback_mux);
|
||||
_callback = this->callbacks[pair.first];
|
||||
}
|
||||
// Execute callback
|
||||
_callback(this->tokens[pair.first]);
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
this->handle_exc(e);
|
||||
}// try callback
|
||||
} // if listening
|
||||
// Erase the used and executed callback
|
||||
this->eraseToken(pair.first);
|
||||
} // if popped
|
||||
} // while (this->listening)
|
||||
} catch (std::exception &e) {
|
||||
this->handle_exc(SerialListenerException(e.what()));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
@ -77,15 +104,23 @@ SerialListener::startListening(Serial * serial_port) {
|
||||
}
|
||||
|
||||
listen_thread = boost::thread(boost::bind(&SerialListener::listen, this));
|
||||
|
||||
// Start the callback thread
|
||||
callback_thread =
|
||||
boost::thread(boost::bind(&SerialListener::callback, this));
|
||||
}
|
||||
|
||||
void
|
||||
SerialListener::stopListening() {
|
||||
// Stop listening and clear buffers
|
||||
listening = false;
|
||||
|
||||
listen_thread.join();
|
||||
this->buffer = "";
|
||||
this->lines.clear();
|
||||
callback_thread.join();
|
||||
|
||||
callback_queue.clear();
|
||||
this->data_buffer = "";
|
||||
this->tokens.clear();
|
||||
this->ttls.clear();
|
||||
this->serial_port = NULL;
|
||||
|
||||
@ -96,136 +131,11 @@ SerialListener::stopListening() {
|
||||
void
|
||||
SerialListener::stopListeningForAll() {
|
||||
boost::mutex::scoped_lock l(filter_mux);
|
||||
boost::mutex::scoped_lock l2(callback_mux);
|
||||
filters.clear();
|
||||
comparators.clear();
|
||||
callbacks.clear();
|
||||
condition_vars.clear();
|
||||
}
|
||||
|
||||
void
|
||||
SerialListener::listen() {
|
||||
// Make sure there is a serial port
|
||||
if (this->serial_port == NULL) {
|
||||
this->handle_exc(SerialListenerException("Invalid serial port."));
|
||||
}
|
||||
// Make sure the serial port is open
|
||||
if (!this->serial_port->isOpen()) {
|
||||
this->handle_exc(SerialListenerException("Serial port not open."));
|
||||
}
|
||||
try {
|
||||
while (this->listening) {
|
||||
// Determine how much to read in
|
||||
size_t amount_to_read = determineAmountToRead();
|
||||
// Read some
|
||||
std::string temp = this->serial_port->read(amount_to_read);
|
||||
// If nothing was read and there is nothing in the lines, then we
|
||||
// don't need to interate through the filters
|
||||
if (temp.length() == 0 && lines.size() == 0) {
|
||||
continue;
|
||||
}
|
||||
// Add the new data to the buffer
|
||||
this->buffer += temp;
|
||||
// If there is no return carrage in the buffer, then a command hasn't
|
||||
// been completed and if there is no data in the lines buffer, then
|
||||
// continue.
|
||||
if (this->buffer.find("\r") == std::string::npos && lines.size() == 0) {
|
||||
continue;
|
||||
}
|
||||
// Listen once, this parses the buffer and filters the data in lines
|
||||
buffer = this->listenOnce(buffer);
|
||||
// Done parsing lines and buffer should now be set to the left overs
|
||||
} // while (this->listening)
|
||||
} catch (std::exception &e) {
|
||||
this->handle_exc(SerialListenerException(e.what()));
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: as it is, each line is passed to filters repeatedly until they are
|
||||
// too old... Change it to only send each line to each filter once and
|
||||
// then send to new fitlers up until it is too old.
|
||||
std::string
|
||||
SerialListener::listenOnce(std::string data) {
|
||||
std::string left_overs;
|
||||
std::vector<uuid_type> to_be_erased;
|
||||
// Tokenize the new data
|
||||
std::vector<std::string> new_lines;
|
||||
tokenize(data, new_lines);
|
||||
// Iterate through new lines and add times to them
|
||||
std::vector<std::string>::iterator it_new;
|
||||
for(it_new=new_lines.begin(); it_new != new_lines.end(); it_new++) {
|
||||
// The last line needs to be put back in the buffer always:
|
||||
// In the case that the string ends with \r the last element will be
|
||||
// empty (""). In the case that it does not the last element will be
|
||||
// what is left over from the next message that hasn't sent
|
||||
// everything. Ex.: "?$1E\r" -> ["?$1E", ""] and
|
||||
// "?$1E\r$1E=Robo" -> ["?$1E","$1E=Robo"]
|
||||
if (it_new == new_lines.end()-1) {
|
||||
left_overs = (*it_new);
|
||||
continue;
|
||||
}
|
||||
uuid_type uuid = random_generator();
|
||||
lines.insert(std::pair<const uuid_type,std::string>(uuid,(*it_new)));
|
||||
using namespace boost::posix_time;
|
||||
ttls.insert(std::pair<const uuid_type,ptime>
|
||||
(uuid,ptime(microsec_clock::local_time())));
|
||||
}
|
||||
// Iterate through the lines checking for a match
|
||||
std::map<const uuid_type,std::string>::iterator it_lines;
|
||||
for(it_lines=lines.begin(); it_lines!=lines.end(); it_lines++) {
|
||||
std::string line = (*it_lines).second;
|
||||
uuid_type uuid = (*it_lines).first;
|
||||
// If the line is empty, continue
|
||||
if (line.length() == 0) {
|
||||
continue;
|
||||
}
|
||||
bool matched = false;
|
||||
bool erased = false;
|
||||
// Get the filter lock
|
||||
boost::mutex::scoped_lock l(filter_mux);
|
||||
// Iterate through each filter
|
||||
std::map<const uuid_type,std::string>::iterator it;
|
||||
for(it=filters.begin(); it!=filters.end(); it++) {
|
||||
if (comparators[(*it).first](line)) { // If comparator matches line
|
||||
if ((*it).second == "non-blocking") {
|
||||
// TODO: Put this callback execution into a queue. And if I do, make sure to
|
||||
// keep the line instance around until the callback is done...
|
||||
// If non-blocking run the callback
|
||||
callbacks[(*it).first](line);
|
||||
to_be_erased.push_back(uuid);
|
||||
erased = true;
|
||||
} else if ((*it).second == "blocking") {
|
||||
// If blocking then notify the waiting call to continue
|
||||
condition_vars[(*it).first]->notify_all();
|
||||
to_be_erased.push_back(uuid);
|
||||
erased = true;
|
||||
}
|
||||
matched = true;
|
||||
break; // It matched, continue to next line
|
||||
}
|
||||
} // for(it=filters.begin(); it!=filters.end(); it++)
|
||||
// If not already erased check how old it is, remove the too old
|
||||
if (!erased) {
|
||||
using namespace boost::posix_time;
|
||||
if (ptime(microsec_clock::local_time())-ttls[uuid] > ttl) {
|
||||
// If there is a default handler pass it on
|
||||
if (this->default_handler) {
|
||||
// TODO: see above about callback execution queue
|
||||
this->default_handler(line);
|
||||
}
|
||||
to_be_erased.push_back(uuid);
|
||||
}
|
||||
}
|
||||
} // for(it_lines=lines.begin(); it_lines!=lines.end(); it_lines++)
|
||||
// Remove any lines that need to be erased
|
||||
// (this must be done outside the iterator to prevent problems incrementing
|
||||
// the iterator)
|
||||
std::vector<uuid_type>::iterator it;
|
||||
for (it=to_be_erased.begin(); it != to_be_erased.end(); it++) {
|
||||
lines.erase((*it));
|
||||
ttls.erase((*it));
|
||||
}
|
||||
// Return the left_overs
|
||||
return left_overs;
|
||||
callbacks.clear();
|
||||
}
|
||||
|
||||
size_t
|
||||
@ -236,9 +146,201 @@ SerialListener::determineAmountToRead() {
|
||||
return 5;
|
||||
}
|
||||
|
||||
void
|
||||
SerialListener::readSomeData(std::string &temp, size_t this_many) {
|
||||
// Make sure there is a serial port
|
||||
if (this->serial_port == NULL) {
|
||||
this->handle_exc(SerialListenerException("Invalid serial port."));
|
||||
}
|
||||
// Make sure the serial port is open
|
||||
if (!this->serial_port->isOpen()) {
|
||||
this->handle_exc(SerialListenerException("Serial port not open."));
|
||||
}
|
||||
temp = this->serial_port->read(this_many);
|
||||
}
|
||||
|
||||
void
|
||||
SerialListener::addNewTokens(std::vector<std::string> &new_tokens,
|
||||
std::vector<uuid_type> new_uuids,
|
||||
std::string &left_overs)
|
||||
{
|
||||
std::cout << "Inside SerialListener::addNewTokens:" << std::endl;
|
||||
// Iterate through new tokens and add times to them
|
||||
std::vector<std::string>::iterator it_new;
|
||||
for (it_new=new_tokens.begin(); it_new != new_tokens.end(); it_new++) {
|
||||
std::cout << " Token (" << (*it_new).length() << "): " << (*it_new) << std::endl;
|
||||
// The last token needs to be put back in the buffer always:
|
||||
// (in the case that the delimeter is \r)...
|
||||
// In the case that the string ends with \r the last element will be
|
||||
// empty (""). In the case that it does not the last element will be
|
||||
// what is left over from the next message that hasn't sent
|
||||
// everything. Ex.: "?$1E\r" -> ["?$1E", ""] and
|
||||
// "?$1E\r$1E=Robo" -> ["?$1E","$1E=Robo"]
|
||||
if (it_new == new_tokens.end()-1) {
|
||||
left_overs = (*it_new);
|
||||
continue;
|
||||
}
|
||||
// Create a new uuid
|
||||
uuid_type uuid = random_generator();
|
||||
// Put the new uuid in the list of new uuids
|
||||
new_uuids.push_back(uuid);
|
||||
// Create a uuid, token pair
|
||||
std::pair<const uuid_type,std::string> token_pair(uuid,(*it_new));
|
||||
// Create a uuid, ttl pair
|
||||
using namespace boost::posix_time;
|
||||
std::pair<const uuid_type,ptime>
|
||||
ttl_pair(uuid,ptime(microsec_clock::local_time()));
|
||||
// Insert the new pairs
|
||||
{
|
||||
boost::mutex::scoped_lock l(token_mux);
|
||||
this->tokens.insert(token_pair);
|
||||
ttls.insert(ttl_pair);
|
||||
}
|
||||
} // for (it_new=new_tokens.begin(); it_new != new_tokens.end(); it_new++)
|
||||
}
|
||||
|
||||
void
|
||||
SerialListener::eraseToken(uuid_type &uuid) {
|
||||
boost::mutex::scoped_lock l(token_mux);
|
||||
this->tokens.erase(uuid);
|
||||
this->ttls.erase(uuid);
|
||||
}
|
||||
|
||||
void
|
||||
SerialListener::eraseTokens(std::vector<uuid_type> &uuids) {
|
||||
std::vector<uuid_type>::iterator it;
|
||||
for (it=uuids.begin(); it != uuids.end(); it++) {
|
||||
this->eraseToken((*it));
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Investigate possible race condition where the filter processing takes
|
||||
// longer than the ttl
|
||||
void
|
||||
SerialListener::filterNewTokens(std::vector<uuid_type> new_uuids) {
|
||||
// Iterate through the filters, checking each against new tokens
|
||||
boost::mutex::scoped_lock l(filter_mux);
|
||||
std::map<const uuid_type,filter_type::FilterType>::iterator it;
|
||||
for (it=filters.begin(); it!=filters.end(); it++) {
|
||||
this->filter((*it).first, new_uuids);
|
||||
} // for (it=filters.begin(); it!=filters.end(); it++)
|
||||
// Get the filter lock
|
||||
boost::mutex::scoped_lock l(filter_mux);
|
||||
std::vector<uuid_type> to_be_erased;
|
||||
// Iterate through the tokens checking for a match
|
||||
std::vector<uuid_type>::iterator it_uuids;
|
||||
for (it_uuids=new_uuids.begin(); it_uuids!=new_uuids.end(); it_uuids++) {
|
||||
bool matched = false;
|
||||
uuid_type uuid = (*it_uuids);
|
||||
// If the line is empty, continue
|
||||
if (tokens[uuid].length() == 0) {
|
||||
continue;
|
||||
}
|
||||
// Iterate through each filter
|
||||
std::map<const uuid_type,filter_type::FilterType>::iterator it;
|
||||
for (it=filters.begin(); it!=filters.end(); it++) {
|
||||
// If comparator matches line
|
||||
if (comparators[(*it).first](tokens[uuid])) {
|
||||
// If non-blocking run the callback
|
||||
if ((*it).second == filter_type::nonblocking) {
|
||||
callback_queue.push(std::pair<uuid_type,bool>(uuid,false));
|
||||
// If blocking then notify the waiting call to continue
|
||||
} else if ((*it).second == filter_type::blocking) {
|
||||
condition_vars[(*it).first]->notify_all();
|
||||
to_be_erased.push_back(uuid);
|
||||
}
|
||||
matched = true;
|
||||
break; // It matched, continue to next line
|
||||
}
|
||||
} // for(it=filters.begin(); it!=filters.end(); it++)
|
||||
} // for(it_lines=lines.begin(); it_lines!=lines.end(); it_lines++)
|
||||
// Remove any lines that need to be erased
|
||||
// (this must be done outside the iterator to prevent problems incrementing
|
||||
// the iterator)
|
||||
this->eraseTokens(to_be_erased);
|
||||
}
|
||||
|
||||
void
|
||||
filter(uuid_type filter_uuid, std::vector<uuid_type> token_uuids) {
|
||||
std::vector<uuid_type> to_be_erased;
|
||||
// Iterate through the token uuids and run each against the filter
|
||||
std::vector<uuid_type>::iterator it_uuids;
|
||||
for (it_uuids=new_uuids.begin(); it_uuids!=new_uuids.end(); it_uuids++) {
|
||||
|
||||
}
|
||||
// Remove any lines that need to be erased
|
||||
// (this must be done outside the iterator to prevent problems incrementing
|
||||
// the iterator)
|
||||
this->eraseTokens(to_be_erased);
|
||||
}
|
||||
|
||||
void
|
||||
SerialListener::pruneTokens() {
|
||||
// Iterate through the buffered tokens
|
||||
std::vector<uuid_type> to_be_erased;
|
||||
std::map<const uuid_type,std::string>::iterator it;
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock l(token_mux);
|
||||
for (it = this->tokens.begin(); it != this->tokens.end(); it++) {
|
||||
uuid_type uuid = (*it).first;
|
||||
using namespace boost::posix_time;
|
||||
// If the current time - the creation time is greater than the ttl,
|
||||
// then prune it
|
||||
if (ptime(microsec_clock::local_time())-this->ttls[uuid] > this->ttl) {
|
||||
std::cout << "Pruning (" << this->tokens[uuid].length();
|
||||
std::cout << "): " << this->tokens[uuid] << std::endl;
|
||||
// If there is a default handler pass it on
|
||||
if (this->default_handler) {
|
||||
boost::mutex::scoped_lock l(callback_mux);
|
||||
callback_queue.push(std::pair<uuid_type,bool>(uuid,true));
|
||||
} else {
|
||||
// Otherwise delete it
|
||||
to_be_erased.push_back(uuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Remove any lines that need to be erased
|
||||
// (this must be done outside the iterator to prevent problems incrementing
|
||||
// the iterator)
|
||||
this->eraseTokens(to_be_erased);
|
||||
}
|
||||
|
||||
void
|
||||
SerialListener::listen() {
|
||||
try {
|
||||
while (this->listening) {
|
||||
// Read some data
|
||||
std::string temp;
|
||||
this->readSomeData(temp, determineAmountToRead());
|
||||
// If nothing was read then we
|
||||
// don't need to iterate through the filters
|
||||
if (temp.length() != 0) {
|
||||
// Add the new data to the buffer
|
||||
this->data_buffer += temp;
|
||||
// Call the tokenizer on the updated buffer
|
||||
std::vector<std::string> new_tokens;
|
||||
this->tokenize(this->data_buffer, new_tokens);
|
||||
// Add the new tokens to the new token buffer, get a list of new uuids
|
||||
// to filter once, and put left_overs in the data buffer.
|
||||
std::vector<uuid_type> new_uuids;
|
||||
this->addNewTokens(new_tokens, new_uuids, this->data_buffer);
|
||||
// Run the new tokens through existing filters
|
||||
this->filterNewTokens(new_uuids);
|
||||
}
|
||||
// Look for old data and pass to the default handler or delete
|
||||
this->pruneTokens();
|
||||
// Done parsing lines and buffer should now be set to the left overs
|
||||
} // while (this->listening)
|
||||
} catch (std::exception &e) {
|
||||
this->handle_exc(SerialListenerException(e.what()));
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
SerialListener::listenForOnceComparator(std::string line) {
|
||||
if (line == current_listen_for_one_target)
|
||||
SerialListener::listenForOnceComparator(std::string token) {
|
||||
if (token == current_listen_for_one_target)
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
@ -251,8 +353,8 @@ SerialListener::listenForStringOnce(std::string token, size_t milliseconds) {
|
||||
|
||||
// Create blocking filter
|
||||
uuid_type uuid = random_generator();
|
||||
std::pair<const uuid_type,std::string>
|
||||
filter_pair(uuid, "blocking");
|
||||
std::pair<const uuid_type,filter_type::FilterType>
|
||||
filter_pair(uuid, filter_type::blocking);
|
||||
std::pair<const uuid_type,ComparatorType>
|
||||
comparator_pair(uuid,
|
||||
boost::bind(&SerialListener::listenForOnceComparator, this, _1));
|
||||
@ -265,6 +367,8 @@ SerialListener::listenForStringOnce(std::string token, size_t milliseconds) {
|
||||
condition_vars.insert(condition_pair);
|
||||
}
|
||||
|
||||
this->processNewFilter(uuid);
|
||||
|
||||
bool result = false;
|
||||
|
||||
// Wait
|
||||
@ -288,8 +392,8 @@ SerialListener::listenFor(ComparatorType comparator, DataCallback callback)
|
||||
{
|
||||
// Create Filter
|
||||
uuid_type uuid = random_generator();
|
||||
std::pair<const uuid_type,std::string>
|
||||
filter_pair(uuid, "non-blocking");
|
||||
std::pair<const uuid_type,filter_type::FilterType>
|
||||
filter_pair(uuid, filter_type::nonblocking);
|
||||
std::pair<const uuid_type,ComparatorType>
|
||||
comparator_pair(uuid, comparator);
|
||||
std::pair<const uuid_type,DataCallback>
|
||||
@ -299,6 +403,9 @@ SerialListener::listenFor(ComparatorType comparator, DataCallback callback)
|
||||
boost::mutex::scoped_lock l(filter_mux);
|
||||
filters.insert(filter_pair);
|
||||
comparators.insert(comparator_pair);
|
||||
}
|
||||
{
|
||||
boost::mutex::scoped_lock l(callback_mux);
|
||||
callbacks.insert(callback_pair);
|
||||
}
|
||||
|
||||
@ -314,4 +421,17 @@ SerialListener::stopListeningFor(uuid_type filter_uuid) {
|
||||
callbacks.erase(filter_uuid);
|
||||
}
|
||||
|
||||
TokenizerType
|
||||
SerialListener::delimeter_tokenizer (std::string delimeter) {
|
||||
return boost::bind(&SerialListener::_delimeter_tokenizer,
|
||||
_1, _2, delimeter);
|
||||
}
|
||||
|
||||
void
|
||||
SerialListener::_delimeter_tokenizer (std::string &data,
|
||||
std::vector<std::string> &tokens,
|
||||
std::string delimeter)
|
||||
{
|
||||
boost::split(tokens, data, boost::is_any_of(delimeter));
|
||||
}
|
||||
|
||||
|
||||
@ -13,7 +13,7 @@ static size_t global_count, global_listen_count;
|
||||
|
||||
void default_handler(std::string line) {
|
||||
global_count++;
|
||||
// std::cout << "default_handler got: " << line << std::endl;
|
||||
std::cout << "default_handler got: " << line << std::endl;
|
||||
}
|
||||
|
||||
namespace {
|
||||
@ -21,33 +21,55 @@ namespace {
|
||||
class SerialListenerTests : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() {
|
||||
listener.listening = true;
|
||||
listener.setTimeToLive(10);
|
||||
listener.default_handler = default_handler;
|
||||
listener.callback_thread =
|
||||
boost::thread(boost::bind(&SerialListener::callback, &listener));
|
||||
}
|
||||
|
||||
virtual void TearDown() {
|
||||
listener.listening = false;
|
||||
listener.callback_thread.join();
|
||||
}
|
||||
|
||||
void stopCallbackThread() {
|
||||
while (true) {
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
|
||||
boost::mutex::scoped_lock lock(listener.callback_queue.the_mutex);
|
||||
if (listener.callback_queue.the_queue.empty())
|
||||
break;
|
||||
}
|
||||
listener.listening = false;
|
||||
listener.callback_thread.join();
|
||||
}
|
||||
|
||||
void execute_listenForStringOnce() {
|
||||
listener.listenForStringOnce("?$1E", 1000);
|
||||
}
|
||||
|
||||
void simulate_loop(std::string input_str) {
|
||||
std::vector<std::string> new_tokens;
|
||||
listener.tokenize(input_str, new_tokens);
|
||||
std::vector<uuid_type> new_uuids;
|
||||
listener.addNewTokens(new_tokens, new_uuids, listener.data_buffer);
|
||||
listener.filterNewTokens(new_uuids);
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(11));
|
||||
listener.pruneTokens();
|
||||
}
|
||||
|
||||
SerialListener listener;
|
||||
|
||||
};
|
||||
|
||||
TEST_F(SerialListenerTests, ignoresEmptyString) {
|
||||
TEST_F(SerialListenerTests, handlesPartialMessage) {
|
||||
global_count = 0;
|
||||
std::string input_str = "?$1E\r$1E=Robo";
|
||||
|
||||
listener.listenOnce("");
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(11));
|
||||
listener.listenOnce("");
|
||||
simulate_loop(input_str);
|
||||
|
||||
ASSERT_TRUE(global_count == 0);
|
||||
}
|
||||
|
||||
TEST_F(SerialListenerTests, ignoresPartialMessage) {
|
||||
global_count = 0;
|
||||
|
||||
listener.listenOnce("?$1E\r$1E=Robo");
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(11));
|
||||
listener.listenOnce("");
|
||||
// give some time for the callback thread to finish
|
||||
stopCallbackThread();
|
||||
|
||||
ASSERT_EQ(global_count, 1);
|
||||
}
|
||||
@ -60,16 +82,17 @@ TEST_F(SerialListenerTests, listenForOnceWorks) {
|
||||
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(100));
|
||||
|
||||
listener.listenOnce("\r+\r?$1E\r$1E=Robo");
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(11));
|
||||
listener.listenOnce("");
|
||||
simulate_loop("\r+\r?$1E\r$1E=Robo");
|
||||
|
||||
ASSERT_TRUE(t.timed_join(boost::posix_time::milliseconds(1500)));
|
||||
|
||||
// Make sure the filters are getting deleted
|
||||
ASSERT_EQ(listener.filters.size(), 0);
|
||||
|
||||
ASSERT_EQ(global_count, 1);
|
||||
// give some time for the callback thread to finish
|
||||
stopCallbackThread();
|
||||
|
||||
ASSERT_EQ(global_count, 2);
|
||||
}
|
||||
|
||||
// lookForOnce should not find it, but timeout after 1000ms, so it should
|
||||
@ -82,12 +105,13 @@ TEST_F(SerialListenerTests, listenForOnceTimesout) {
|
||||
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(100));
|
||||
|
||||
listener.listenOnce("\r+\r?$1ENOTRIGHT\r$1E=Robo");
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(11));
|
||||
listener.listenOnce("");
|
||||
simulate_loop("\r+\r?$1ENOTRIGHT\r$1E=Robo");
|
||||
|
||||
ASSERT_TRUE(t.timed_join(boost::posix_time::milliseconds(1500)));
|
||||
|
||||
// give some time for the callback thread to finish
|
||||
stopCallbackThread();
|
||||
|
||||
ASSERT_EQ(global_count, 2);
|
||||
}
|
||||
|
||||
@ -109,9 +133,10 @@ TEST_F(SerialListenerTests, listenForWorks) {
|
||||
boost::uuids::uuid filt_uuid =
|
||||
listener.listenFor(listenForComparator, listenForCallback);
|
||||
|
||||
listener.listenOnce("\r+\rV=05:06\r?$1E\rV=06:05\r$1E=Robo");
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(11));
|
||||
listener.listenOnce("");
|
||||
simulate_loop("\r+\rV=05:06\r?$1E\rV=06:05\r$1E=Robo");
|
||||
|
||||
// give some time for the callback thread to finish
|
||||
stopCallbackThread();
|
||||
|
||||
ASSERT_EQ(global_count, 2);
|
||||
ASSERT_EQ(global_listen_count, 2);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user