Program Listing for File subscriber-list.h

Return to documentation for file (common/message-flow/include/message-flow/subscriber-list.h)

#ifndef MESSAGE_FLOW_SUBSCRIBER_LIST_H_
#define MESSAGE_FLOW_SUBSCRIBER_LIST_H_

#include <mutex>
#include <thread>
#include <vector>

#include <glog/logging.h>

namespace message_flow {
// An abstract interface to an ordered subscriber list.
class SubscriberListBase {
 public:
  SubscriberListBase() {}
  virtual ~SubscriberListBase() {}
  virtual void clear() = 0;
};
typedef std::shared_ptr<SubscriberListBase> SubscriberListBasePtr;

// Implementation of a subscriber list with an ordering equal to the
// registration order.
template <typename MessageType>
class SubscriberList : public SubscriberListBase {
 public:
  typedef std::function<void(const MessageType&)> SubscriberCallback;

  SubscriberList() {}

  virtual ~SubscriberList() {
    // De-register all subscribers on shutdown. All publish requests are thus
    // rejected and the remaining messages are processed.
    clear();
  }

  virtual void clear() {
    std::lock_guard<std::mutex> lock(m_subscriber_list_);
    subscriber_list_.clear();
  }

  void addSubscriber(const SubscriberCallback& subscriber) {
    CHECK(subscriber);
    std::lock_guard<std::mutex> lock(m_subscriber_list_);
    subscriber_list_.emplace_back(subscriber);
  }

  void publishToAllSubscribersBlocking(const MessageType& message) const {
    std::lock_guard<std::mutex> lock(m_subscriber_list_);
    for (const SubscriberCallback& subscriber_callback : subscriber_list_) {
      CHECK(subscriber_callback);
      subscriber_callback(message);
    }
  }

 private:
  mutable std::mutex m_subscriber_list_;
  std::vector<SubscriberCallback> subscriber_list_;
};

template <typename MessageType>
using SubscriberListPtr = std::shared_ptr<SubscriberList<MessageType>>;
}  // namespace message_flow
#endif  // MESSAGE_FLOW_SUBSCRIBER_LIST_H_