From 51965cc57fab181c0e77c23ab9f66d4fdb9efcd8 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Sat, 7 Jan 2012 23:05:38 -0600 Subject: [PATCH] Fixing to remove once type functions and re-implement them --- include/serial/serial_listener.h | 160 ++++++++++-- serial.cmake | 1 + src/serial_listener.cc | 418 ++++++++++++++++++++----------- tests/serial_listener_tests.cc | 73 ++++-- 4 files changed, 453 insertions(+), 199 deletions(-) diff --git a/include/serial/serial_listener.h b/include/serial/serial_listener.h index 43a3fff..e6ee241 100644 --- a/include/serial/serial_listener.h +++ b/include/serial/serial_listener.h @@ -36,6 +36,9 @@ #ifndef SERIAL_LISTENER_H #define SERIAL_LISTENER_H +// STL +#include + // Serial #include @@ -123,26 +126,6 @@ TokenizerType; /*! This is a convenience alias for boost::uuids::uuid. */ typedef boost::uuids::uuid uuid_type; // uuid_t is already taken! =( -void -_delimeter_tokenizer (std::string &data, std::vector &tokens, - std::string delimeter); - -/*! - * This returns a tokenizer that splits on a given delimeter. - * - * The delimeter is passed into the function and a TokenizerType is returned - * that can be passed to SerialListener::setTokenizer. - * - * Example: - *
- *   my_listener.setTokenizer(delimeter_tokenizer("\r"));
- * <\pre>
- * 
- * \see SerialListener::setTokenizer, serial::TokenizerType
- */
-TokenizerType
-delimeter_tokenizer (std::string delimeter);
-
 /*!
  * This is a general exception generated by the SerialListener class.
  * 
@@ -163,6 +146,87 @@ public:
   }
 };
 
+// Based on: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
+template
+class ConcurrentQueue
+{
+private:
+  std::queue the_queue;
+  mutable boost::mutex the_mutex;
+  boost::condition_variable the_condition_variable;
+public:
+  void push(Data const& data) {
+    boost::mutex::scoped_lock lock(the_mutex);
+    the_queue.push(data);
+    lock.unlock();
+    the_condition_variable.notify_one();
+  }
+
+  bool empty() const {
+    boost::mutex::scoped_lock lock(the_mutex);
+    return the_queue.empty();
+  }
+
+  bool try_pop(Data& popped_value) {
+    boost::mutex::scoped_lock lock(the_mutex);
+    if(the_queue.empty()) {
+      return false;
+    }
+
+    popped_value=the_queue.front();
+    the_queue.pop();
+    return true;
+  }
+
+  bool timed_wait_and_pop(Data& popped_value, size_t timeout) {
+    using namespace boost::posix_time;
+    bool result;
+    boost::mutex::scoped_lock lock(the_mutex);
+    result = !the_queue.empty();
+    if (!result) {
+      result = the_condition_variable.timed_wait(lock, milliseconds(timeout));
+    }
+
+    if (result) {
+      popped_value=the_queue.front();
+      the_queue.pop();
+    }
+    return result;
+  }
+
+  void wait_and_pop(Data& popped_value) {
+    boost::mutex::scoped_lock lock(the_mutex);
+    while(the_queue.empty()) {
+      the_condition_variable.wait(lock);
+    }
+
+    popped_value=the_queue.front();
+    the_queue.pop();
+  }
+  
+  size_t size() const {
+    return the_queue.size();
+  }
+  
+  void cancel() {
+    the_condition_variable.notify_one();
+  }
+
+  void clear() {
+    boost::mutex::scoped_lock lock(the_mutex);
+    while (!the_queue.empty()) {
+      the_queue.pop();
+    }
+  }
+};
+
+
+namespace filter_type {
+typedef enum {
+  nonblocking, blocking
+} FilterType;
+}
+
 /*!
  * Listens to a serial port, facilitates asynchronous reading
  */
@@ -381,15 +445,52 @@ public:
     this->warn = warning_handler;
   }
 
+/***** Static Functions ******/
+
+  /*!
+   * This returns a tokenizer that splits on a given delimeter.
+   * 
+   * The delimeter is passed into the function and a TokenizerType is returned 
+   * that can be passed to SerialListener::setTokenizer.
+   * 
+   * Example:
+   * 
+   *   my_listener.setTokenizer(delimeter_tokenizer("\r"));
+   * <\pre>
+   * 
+   * \see SerialListener::setTokenizer, serial::TokenizerType
+   */
+  static TokenizerType
+  delimeter_tokenizer (std::string delimeter);
+
+  // tokenizer functions
+  static void
+  _delimeter_tokenizer (std::string &data, std::vector &tokens,
+                        std::string delimeter);
+
 private:
+  // Gets some data from the serial port
+  void readSomeData (std::string&, size_t);
+  // Takes newly tokenized data and processes them
+  void addNewTokens(std::vector &new_tokens,
+                    std::vector new_uuids,
+                    std::string &left_overs);
+  // Runs the new tokens through the filters
+  void filter (std::vector new_uuids);
   // Function that loops while listening is true
   void listen ();
-  // Called by listen iteratively
-  std::string listenOnce (std::string data);
+  // Target of callback thread
+  void callback ();
+  // Prune old tokens
+  void pruneTokens ();
+  // Erases one token
+  void eraseToken (uuid_type&);
+  // Erases several tokens
+  void eraseTokens (std::vector&);
   // Determines how much to read on each loop of listen
   size_t determineAmountToRead ();
   // Used in the look for string once function
-  bool listenForOnceComparator(std::string line);
+  bool listenForOnceComparator (std::string line);
 
   // Tokenizer
   TokenizerType tokenize;
@@ -409,10 +510,16 @@ private:
   bool listening;
   serial::Serial * serial_port;
   boost::thread listen_thread;
-  std::string buffer;
-  std::map lines;
+  std::string data_buffer;
+  boost::mutex token_mux;
+  std::map tokens;
   std::map ttls;
 
+  // Callback related variables
+  // uuid and true for default handler, false for normal callback
+  ConcurrentQueue > callback_queue;
+  boost::thread callback_thread;
+
   // For generating random uuids
   boost::uuids::random_generator random_generator;
 
@@ -420,7 +527,7 @@ private:
   boost::posix_time::time_duration ttl;
 
   // map
-  std::map filters;
+  std::map filters;
   // map
   std::map comparators;
   // map
@@ -429,6 +536,7 @@ private:
   std::map condition_vars;
   // Mutex for locking use of filters
   boost::mutex filter_mux;
+  boost::mutex callback_mux;
 
   // Used as temporary storage for listenForStringOnce
   std::string current_listen_for_one_target;
diff --git a/serial.cmake b/serial.cmake
index 6baec63..94a9fce 100644
--- a/serial.cmake
+++ b/serial.cmake
@@ -13,6 +13,7 @@ project(Serial)
 # Use clang if available
 IF(EXISTS /usr/bin/clang)
   set(CMAKE_CXX_COMPILER /usr/bin/clang++)
+  set(CMAKE_CXX_FLAGS -ferror-limit=5)
 ENDIF(EXISTS /usr/bin/clang)
 
 option(SERIAL_BUILD_TESTS "Build all of the Serial tests." OFF)
diff --git a/src/serial_listener.cc b/src/serial_listener.cc
index 095db07..c3becd0 100644
--- a/src/serial_listener.cc
+++ b/src/serial_listener.cc
@@ -22,19 +22,6 @@ inline void defaultExceptionCallback(const std::exception &error) {
 
 using namespace serial;
 
-void
-_delimeter_tokenizer (std::string &data, std::vector &tokens,
-                      std::string delimeter)
-{
-  boost::split(tokens, data, boost::is_any_of(delimeter));
-}
-
-TokenizerType
-delimeter_tokenizer (std::string delimeter) {
-  TokenizerType temp = boost::bind(_delimeter_tokenizer, _1, _2, delimeter);
-  return temp;
-}
-
 /***** Listener Class Functions *****/
 
 SerialListener::SerialListener() : listening(false) {
@@ -53,7 +40,47 @@ SerialListener::SerialListener() : listening(false) {
 }
 
 SerialListener::~SerialListener() {
-  
+  if (this->listening) {
+    this->stopListening();
+  }
+}
+
+void
+SerialListener::callback() {
+  try {
+    std::pair pair;
+    DataCallback _callback;
+    while (this->listening) {
+      if (this->callback_queue.timed_wait_and_pop(pair, 10)) {
+        if (this->listening) {
+          std::cout << "After pop (" << pair.second << "): ";
+          std::cout << this->tokens[pair.first] << std::endl;
+          try {
+            // If default handler
+            if (pair.second) {
+              if (this->default_handler)
+                this->default_handler(this->tokens[pair.first]);
+            // Else use provided callback
+            } else {
+              // Grab the callback as to not hold the mutex while executing
+              {
+                boost::mutex::scoped_lock l(callback_mux);
+                _callback = this->callbacks[pair.first];
+              }
+              // Execute callback
+              _callback(this->tokens[pair.first]);
+            }
+          } catch (std::exception &e) {
+            this->handle_exc(e);
+          }// try callback
+        } // if listening
+        // Erase the used and executed callback
+        this->eraseToken(pair.first);
+      } // if popped
+    } // while (this->listening)
+  } catch (std::exception &e) {
+    this->handle_exc(SerialListenerException(e.what()));
+  }
 }
 
 void
@@ -77,15 +104,23 @@ SerialListener::startListening(Serial * serial_port) {
   }
   
   listen_thread = boost::thread(boost::bind(&SerialListener::listen, this));
+  
+  // Start the callback thread
+  callback_thread =
+   boost::thread(boost::bind(&SerialListener::callback, this));
 }
 
 void
 SerialListener::stopListening() {
   // Stop listening and clear buffers
   listening = false;
+
   listen_thread.join();
-  this->buffer = "";
-  this->lines.clear();
+  callback_thread.join();
+
+  callback_queue.clear();
+  this->data_buffer = "";
+  this->tokens.clear();
   this->ttls.clear();
   this->serial_port = NULL;
 
@@ -96,136 +131,11 @@ SerialListener::stopListening() {
 void
 SerialListener::stopListeningForAll() {
   boost::mutex::scoped_lock l(filter_mux);
+  boost::mutex::scoped_lock l2(callback_mux);
   filters.clear();
   comparators.clear();
-  callbacks.clear();
   condition_vars.clear();
-}
-
-void
-SerialListener::listen() {
-  // Make sure there is a serial port
-  if (this->serial_port == NULL) {
-    this->handle_exc(SerialListenerException("Invalid serial port."));
-  }
-  // Make sure the serial port is open
-  if (!this->serial_port->isOpen()) {
-    this->handle_exc(SerialListenerException("Serial port not open."));
-  }
-  try {
-    while (this->listening) {
-      // Determine how much to read in
-      size_t amount_to_read = determineAmountToRead();
-      // Read some
-      std::string temp = this->serial_port->read(amount_to_read);
-      // If nothing was read and there is nothing in the lines, then we
-      //  don't need to interate through the filters
-      if (temp.length() == 0 && lines.size() == 0) {
-        continue;
-      }
-      // Add the new data to the buffer
-      this->buffer += temp;
-      // If there is no return carrage in the buffer, then a command hasn't 
-      //  been completed and if there is no data in the lines buffer, then 
-      //  continue.
-      if (this->buffer.find("\r") == std::string::npos && lines.size() == 0) {
-        continue;
-      }
-      // Listen once, this parses the buffer and filters the data in lines
-      buffer = this->listenOnce(buffer);
-      // Done parsing lines and buffer should now be set to the left overs
-    } // while (this->listening)
-  } catch (std::exception &e) {
-    this->handle_exc(SerialListenerException(e.what()));
-  }
-}
-
-// TODO: as it is, each line is passed to filters repeatedly until they are 
-//       too old...  Change it to only send each line to each filter once and 
-//       then send to new fitlers up until it is too old.
-std::string
-SerialListener::listenOnce(std::string data) {
-  std::string left_overs;
-  std::vector to_be_erased;
-  // Tokenize the new data
-  std::vector new_lines;
-  tokenize(data, new_lines);
-  // Iterate through new lines and add times to them
-  std::vector::iterator it_new;
-  for(it_new=new_lines.begin(); it_new != new_lines.end(); it_new++) {
-    // The last line needs to be put back in the buffer always:
-    //  In the case that the string ends with \r the last element will be 
-    //  empty ("").  In the case that it does not the last element will be 
-    //  what is left over from the next message that hasn't sent 
-    //  everything.  Ex.: "?$1E\r" -> ["?$1E", ""] and 
-    //  "?$1E\r$1E=Robo" -> ["?$1E","$1E=Robo"]
-    if (it_new == new_lines.end()-1) {
-      left_overs = (*it_new);
-      continue;
-    }
-    uuid_type uuid = random_generator();
-    lines.insert(std::pair(uuid,(*it_new)));
-    using namespace boost::posix_time;
-    ttls.insert(std::pair
-                  (uuid,ptime(microsec_clock::local_time())));
-  }
-  // Iterate through the lines checking for a match
-  std::map::iterator it_lines;
-  for(it_lines=lines.begin(); it_lines!=lines.end(); it_lines++) {
-    std::string line = (*it_lines).second;
-    uuid_type uuid = (*it_lines).first;
-    // If the line is empty, continue
-    if (line.length() == 0) {
-      continue;
-    }
-    bool matched = false;
-    bool erased = false;
-    // Get the filter lock
-    boost::mutex::scoped_lock l(filter_mux);
-    // Iterate through each filter
-    std::map::iterator it;
-    for(it=filters.begin(); it!=filters.end(); it++) {
-      if (comparators[(*it).first](line)) { // If comparator matches line
-        if ((*it).second == "non-blocking") {
-// TODO: Put this callback execution into a queue. And if I do, make sure to 
-//        keep the line instance around until the callback is done...
-          // If non-blocking run the callback
-          callbacks[(*it).first](line);
-          to_be_erased.push_back(uuid);
-          erased = true;
-        } else if ((*it).second == "blocking") {
-          // If blocking then notify the waiting call to continue
-          condition_vars[(*it).first]->notify_all();
-          to_be_erased.push_back(uuid);
-          erased = true;
-        }
-        matched = true;
-        break; // It matched, continue to next line
-      }
-    } // for(it=filters.begin(); it!=filters.end(); it++)
-    // If not already erased check how old it is, remove the too old
-    if (!erased) {
-      using namespace boost::posix_time;
-      if (ptime(microsec_clock::local_time())-ttls[uuid] > ttl) {
-        // If there is a default handler pass it on
-        if (this->default_handler) {
-// TODO: see above about callback execution queue
-          this->default_handler(line);
-        }
-        to_be_erased.push_back(uuid);
-      }
-    }
-  } // for(it_lines=lines.begin(); it_lines!=lines.end(); it_lines++)
-  // Remove any lines that need to be erased
-  //  (this must be done outside the iterator to prevent problems incrementing 
-  //   the iterator)
-  std::vector::iterator it;
-  for (it=to_be_erased.begin(); it != to_be_erased.end(); it++) {
-    lines.erase((*it));
-    ttls.erase((*it));
-  }
-  // Return the left_overs
-  return left_overs;
+  callbacks.clear();
 }
 
 size_t
@@ -236,9 +146,201 @@ SerialListener::determineAmountToRead() {
   return 5;
 }
 
+void
+SerialListener::readSomeData(std::string &temp, size_t this_many) {
+  // Make sure there is a serial port
+  if (this->serial_port == NULL) {
+    this->handle_exc(SerialListenerException("Invalid serial port."));
+  }
+  // Make sure the serial port is open
+  if (!this->serial_port->isOpen()) {
+    this->handle_exc(SerialListenerException("Serial port not open."));
+  }
+  temp = this->serial_port->read(this_many);
+}
+
+void
+SerialListener::addNewTokens(std::vector &new_tokens,
+                             std::vector new_uuids,
+                             std::string &left_overs)
+{
+  std::cout << "Inside SerialListener::addNewTokens:" << std::endl;
+  // Iterate through new tokens and add times to them
+  std::vector::iterator it_new;
+  for (it_new=new_tokens.begin(); it_new != new_tokens.end(); it_new++) {
+    std::cout << "  Token (" << (*it_new).length() << "): " << (*it_new) << std::endl;
+    // The last token needs to be put back in the buffer always:
+    //  (in the case that the delimeter is \r)...
+    //  In the case that the string ends with \r the last element will be 
+    //  empty ("").  In the case that it does not the last element will be 
+    //  what is left over from the next message that hasn't sent 
+    //  everything.  Ex.: "?$1E\r" -> ["?$1E", ""] and 
+    //  "?$1E\r$1E=Robo" -> ["?$1E","$1E=Robo"]
+    if (it_new == new_tokens.end()-1) {
+      left_overs = (*it_new);
+      continue;
+    }
+    // Create a new uuid
+    uuid_type uuid = random_generator();
+    // Put the new uuid in the list of new uuids
+    new_uuids.push_back(uuid);
+    // Create a uuid, token pair
+    std::pair token_pair(uuid,(*it_new));
+    // Create a uuid, ttl pair
+    using namespace boost::posix_time;
+    std::pair 
+     ttl_pair(uuid,ptime(microsec_clock::local_time()));
+    // Insert the new pairs
+    {
+      boost::mutex::scoped_lock l(token_mux);
+      this->tokens.insert(token_pair);
+      ttls.insert(ttl_pair);
+    }
+  } // for (it_new=new_tokens.begin(); it_new != new_tokens.end(); it_new++)
+}
+
+void
+SerialListener::eraseToken(uuid_type &uuid) {
+  boost::mutex::scoped_lock l(token_mux);
+  this->tokens.erase(uuid);
+  this->ttls.erase(uuid);
+}
+
+void
+SerialListener::eraseTokens(std::vector &uuids) {
+  std::vector::iterator it;
+  for (it=uuids.begin(); it != uuids.end(); it++) {
+    this->eraseToken((*it));
+  }
+}
+
+// TODO: Investigate possible race condition where the filter processing takes 
+//  longer than the ttl
+void
+SerialListener::filterNewTokens(std::vector new_uuids) {
+  // Iterate through the filters, checking each against new tokens
+  boost::mutex::scoped_lock l(filter_mux);
+  std::map::iterator it;
+  for (it=filters.begin(); it!=filters.end(); it++) {
+    this->filter((*it).first, new_uuids);
+  } // for (it=filters.begin(); it!=filters.end(); it++)
+  // Get the filter lock
+  boost::mutex::scoped_lock l(filter_mux);
+  std::vector to_be_erased;
+  // Iterate through the tokens checking for a match
+  std::vector::iterator it_uuids;
+  for (it_uuids=new_uuids.begin(); it_uuids!=new_uuids.end(); it_uuids++) {
+    bool matched = false;
+    uuid_type uuid = (*it_uuids);
+    // If the line is empty, continue
+    if (tokens[uuid].length() == 0) {
+      continue;
+    }
+    // Iterate through each filter
+    std::map::iterator it;
+    for (it=filters.begin(); it!=filters.end(); it++) {
+      // If comparator matches line
+      if (comparators[(*it).first](tokens[uuid])) {
+        // If non-blocking run the callback
+        if ((*it).second == filter_type::nonblocking) {
+          callback_queue.push(std::pair(uuid,false));
+        // If blocking then notify the waiting call to continue
+        } else if ((*it).second == filter_type::blocking) {
+          condition_vars[(*it).first]->notify_all();
+          to_be_erased.push_back(uuid);
+        }
+        matched = true;
+        break; // It matched, continue to next line
+      }
+    } // for(it=filters.begin(); it!=filters.end(); it++)
+  } // for(it_lines=lines.begin(); it_lines!=lines.end(); it_lines++)
+  // Remove any lines that need to be erased
+  //  (this must be done outside the iterator to prevent problems incrementing
+  //   the iterator)
+  this->eraseTokens(to_be_erased);
+}
+
+void
+filter(uuid_type filter_uuid, std::vector token_uuids) {
+  std::vector to_be_erased;
+  // Iterate through the token uuids and run each against the filter
+  std::vector::iterator it_uuids;
+  for (it_uuids=new_uuids.begin(); it_uuids!=new_uuids.end(); it_uuids++) {
+    
+  }
+  // Remove any lines that need to be erased
+  //  (this must be done outside the iterator to prevent problems incrementing
+  //   the iterator)
+  this->eraseTokens(to_be_erased);
+}
+
+void
+SerialListener::pruneTokens() {
+  // Iterate through the buffered tokens
+  std::vector to_be_erased;
+  std::map::iterator it;
+
+  {
+    boost::mutex::scoped_lock l(token_mux);
+    for (it = this->tokens.begin(); it != this->tokens.end(); it++) {
+      uuid_type uuid = (*it).first;
+      using namespace boost::posix_time;
+      // If the current time - the creation time is greater than the ttl, 
+      //  then prune it
+      if (ptime(microsec_clock::local_time())-this->ttls[uuid] > this->ttl) {
+        std::cout << "Pruning (" << this->tokens[uuid].length();
+        std::cout << "): " << this->tokens[uuid] << std::endl;
+        // If there is a default handler pass it on
+        if (this->default_handler) {
+          boost::mutex::scoped_lock l(callback_mux);
+          callback_queue.push(std::pair(uuid,true));
+        } else {
+          // Otherwise delete it
+          to_be_erased.push_back(uuid);
+        }
+      }
+    }
+  }
+  // Remove any lines that need to be erased
+  //  (this must be done outside the iterator to prevent problems incrementing
+  //   the iterator)
+  this->eraseTokens(to_be_erased);
+}
+
+void
+SerialListener::listen() {
+  try {
+    while (this->listening) {
+      // Read some data
+      std::string temp;
+      this->readSomeData(temp, determineAmountToRead());
+      // If nothing was read then we
+      //  don't need to iterate through the filters
+      if (temp.length() != 0) {
+        // Add the new data to the buffer
+        this->data_buffer += temp;
+        // Call the tokenizer on the updated buffer
+        std::vector new_tokens;
+        this->tokenize(this->data_buffer, new_tokens);
+        // Add the new tokens to the new token buffer, get a list of new uuids 
+        //  to filter once, and put left_overs in the data buffer.
+        std::vector new_uuids;
+        this->addNewTokens(new_tokens, new_uuids, this->data_buffer);
+        // Run the new tokens through existing filters
+        this->filterNewTokens(new_uuids);
+      }
+      // Look for old data and pass to the default handler or delete
+      this->pruneTokens();
+      // Done parsing lines and buffer should now be set to the left overs
+    } // while (this->listening)
+  } catch (std::exception &e) {
+    this->handle_exc(SerialListenerException(e.what()));
+  }
+}
+
 bool
-SerialListener::listenForOnceComparator(std::string line) {
-  if (line == current_listen_for_one_target)
+SerialListener::listenForOnceComparator(std::string token) {
+  if (token == current_listen_for_one_target)
     return true;
   return false;
 }
@@ -251,8 +353,8 @@ SerialListener::listenForStringOnce(std::string token, size_t milliseconds) {
 
   // Create blocking filter
   uuid_type uuid = random_generator();
-  std::pair
-   filter_pair(uuid, "blocking");
+  std::pair
+   filter_pair(uuid, filter_type::blocking);
   std::pair
    comparator_pair(uuid,
     boost::bind(&SerialListener::listenForOnceComparator, this, _1));
@@ -265,6 +367,8 @@ SerialListener::listenForStringOnce(std::string token, size_t milliseconds) {
     condition_vars.insert(condition_pair);
   }
 
+  this->processNewFilter(uuid);
+
   bool result = false;
 
   // Wait
@@ -288,8 +392,8 @@ SerialListener::listenFor(ComparatorType comparator, DataCallback callback)
 {
   // Create Filter
   uuid_type uuid = random_generator();
-  std::pair
-   filter_pair(uuid, "non-blocking");
+  std::pair
+   filter_pair(uuid, filter_type::nonblocking);
   std::pair
    comparator_pair(uuid, comparator);
   std::pair
@@ -299,6 +403,9 @@ SerialListener::listenFor(ComparatorType comparator, DataCallback callback)
     boost::mutex::scoped_lock l(filter_mux);
     filters.insert(filter_pair);
     comparators.insert(comparator_pair);
+  }
+  {
+    boost::mutex::scoped_lock l(callback_mux);
     callbacks.insert(callback_pair);
   }
 
@@ -314,4 +421,17 @@ SerialListener::stopListeningFor(uuid_type filter_uuid) {
   callbacks.erase(filter_uuid);
 }
 
+TokenizerType
+SerialListener::delimeter_tokenizer (std::string delimeter) {
+  return boost::bind(&SerialListener::_delimeter_tokenizer,
+                     _1, _2, delimeter);
+}
+
+void
+SerialListener::_delimeter_tokenizer (std::string &data,
+                                      std::vector &tokens,
+                                      std::string delimeter)
+{
+  boost::split(tokens, data, boost::is_any_of(delimeter));
+}
 
diff --git a/tests/serial_listener_tests.cc b/tests/serial_listener_tests.cc
index 4299a9b..f589b04 100644
--- a/tests/serial_listener_tests.cc
+++ b/tests/serial_listener_tests.cc
@@ -13,7 +13,7 @@ static size_t global_count, global_listen_count;
 
 void default_handler(std::string line) {
   global_count++;
-  // std::cout << "default_handler got: " << line << std::endl;
+  std::cout << "default_handler got: " << line << std::endl;
 }
 
 namespace {
@@ -21,33 +21,55 @@ namespace {
 class SerialListenerTests : public ::testing::Test {
 protected:
   virtual void SetUp() {
+    listener.listening = true;
+    listener.setTimeToLive(10);
     listener.default_handler = default_handler;
+    listener.callback_thread =
+     boost::thread(boost::bind(&SerialListener::callback, &listener));
+  }
+
+  virtual void TearDown() {
+    listener.listening = false;
+    listener.callback_thread.join();
+  }
+
+  void stopCallbackThread() {
+    while (true) {
+      boost::this_thread::sleep(boost::posix_time::milliseconds(1));
+      boost::mutex::scoped_lock lock(listener.callback_queue.the_mutex);
+      if (listener.callback_queue.the_queue.empty())
+        break;
+    }
+    listener.listening = false;
+    listener.callback_thread.join();
   }
 
   void execute_listenForStringOnce() {
     listener.listenForStringOnce("?$1E", 1000);
   }
 
+  void simulate_loop(std::string input_str) {
+    std::vector new_tokens;
+    listener.tokenize(input_str, new_tokens);
+    std::vector new_uuids;
+    listener.addNewTokens(new_tokens, new_uuids, listener.data_buffer);
+    listener.filterNewTokens(new_uuids);
+    boost::this_thread::sleep(boost::posix_time::milliseconds(11));
+    listener.pruneTokens();
+  }
+
   SerialListener listener;
 
 };
 
-TEST_F(SerialListenerTests, ignoresEmptyString) {
+TEST_F(SerialListenerTests, handlesPartialMessage) {
   global_count = 0;
+  std::string input_str = "?$1E\r$1E=Robo";
 
-  listener.listenOnce("");
-  boost::this_thread::sleep(boost::posix_time::milliseconds(11));
-  listener.listenOnce("");
+  simulate_loop(input_str);
 
-  ASSERT_TRUE(global_count == 0);
-}
-
-TEST_F(SerialListenerTests, ignoresPartialMessage) {
-  global_count = 0;
-
-  listener.listenOnce("?$1E\r$1E=Robo");
-  boost::this_thread::sleep(boost::posix_time::milliseconds(11));
-  listener.listenOnce("");
+  // give some time for the callback thread to finish
+  stopCallbackThread();
 
   ASSERT_EQ(global_count, 1);
 }
@@ -60,16 +82,17 @@ TEST_F(SerialListenerTests, listenForOnceWorks) {
 
   boost::this_thread::sleep(boost::posix_time::milliseconds(100));
 
-  listener.listenOnce("\r+\r?$1E\r$1E=Robo");
-  boost::this_thread::sleep(boost::posix_time::milliseconds(11));
-  listener.listenOnce("");
+  simulate_loop("\r+\r?$1E\r$1E=Robo");
 
   ASSERT_TRUE(t.timed_join(boost::posix_time::milliseconds(1500)));
 
   // Make sure the filters are getting deleted
   ASSERT_EQ(listener.filters.size(), 0);
 
-  ASSERT_EQ(global_count, 1);
+  // give some time for the callback thread to finish
+  stopCallbackThread();
+
+  ASSERT_EQ(global_count, 2);
 }
 
 // lookForOnce should not find it, but timeout after 1000ms, so it should 
@@ -82,12 +105,13 @@ TEST_F(SerialListenerTests, listenForOnceTimesout) {
 
   boost::this_thread::sleep(boost::posix_time::milliseconds(100));
 
-  listener.listenOnce("\r+\r?$1ENOTRIGHT\r$1E=Robo");
-  boost::this_thread::sleep(boost::posix_time::milliseconds(11));
-  listener.listenOnce("");
+  simulate_loop("\r+\r?$1ENOTRIGHT\r$1E=Robo");
 
   ASSERT_TRUE(t.timed_join(boost::posix_time::milliseconds(1500)));
 
+  // give some time for the callback thread to finish
+  stopCallbackThread();
+
   ASSERT_EQ(global_count, 2);
 }
 
@@ -109,9 +133,10 @@ TEST_F(SerialListenerTests, listenForWorks) {
   boost::uuids::uuid filt_uuid = 
     listener.listenFor(listenForComparator, listenForCallback);
 
-  listener.listenOnce("\r+\rV=05:06\r?$1E\rV=06:05\r$1E=Robo");
-  boost::this_thread::sleep(boost::posix_time::milliseconds(11));
-  listener.listenOnce("");
+  simulate_loop("\r+\rV=05:06\r?$1E\rV=06:05\r$1E=Robo");
+
+  // give some time for the callback thread to finish
+  stopCallbackThread();
 
   ASSERT_EQ(global_count, 2);
   ASSERT_EQ(global_listen_count, 2);