Template Class WaitingQueue

Class Documentation

template<typename message_ptr>
class ral::cache::WaitingQueue

A Queue that has built in methods for waiting operations. The purpose of WaitingQueue is to provide apis that get things out of the queue when they exist and wait for things when they don’t without consuming many compute resources.This is accomplished through the use of a condition_variable and mutex locks.

Public Functions

inline WaitingQueue(std::string queue_name, int timeout = 60000, bool log_timeout = true)

Constructor

~WaitingQueue() = default

Destructor

WaitingQueue(WaitingQueue&&) = delete
WaitingQueue(const WaitingQueue&) = delete
WaitingQueue &operator=(WaitingQueue&&) = delete
WaitingQueue &operator=(const WaitingQueue&) = delete
inline void put(message_ptr item)

Put a message onto the WaitingQueue using unique_lock. This message aquires a unique_lock and then pushes a message onto the WaitingQueue. It then increments the processed count and notifies the WaitingQueue’s condition variable.

Parameters

inline int processed_parts()

Get number of partitions processed.

Return

number of partitions that have been inserted into this WaitingQueue.

inline void finish()

Finish lets us know that know more messages will come in. This exists so that if anyone is trying to pull from a queue that is already completed that operation will return nullptr so it will not block indefinitely.

inline bool is_finished()

Lets us know if a WaitingQueue has finished running messages.

Return

A bool indicating whether or not this WaitingQueue is finished. receiving messages.

inline void wait_for_count(int count)

Blocks executing thread until a certain number messages are reached. We often want to block a thread from proceeding until a certain number ouf messages exist in the WaitingQueue. It also alerts us if we ever receive more messages than we expected.

inline message_ptr pop_or_wait()

Get a message_ptr if it exists in the WaitingQueue else wait. This function allows kernels to pull from the cache before a cache has CacheData in it. If finish is called on the WaitingQueue and no messages are left this returns nullptr.

Return

A message_ptr that was pushed into the WaitingQueue nullptr if the WaitingQueue is empty and finished.

inline message_ptr pop_back()

Get a message_ptr from the back of the queue if it exists in the WaitingQueue else return nullptr.

Return

message_ptr from the back of the queue if it exists in the WaitingQueue else return nullptr.

inline bool wait_for_next()

Wait for the next message to be ready.

Return

Waits for the next CacheData to be available. Returns true when this is the case. Returns false if the WaitingQueue is both finished and empty.

inline bool has_next_now()

Indicates if the WaitingQueue has messages at this point in time.

Return

A bool which is true if the WaitingQueue is not empty.

inline void wait_until_finished()

Pauses a threads execution until this WaitingQueue has finished processing. Sometimes, like in the case of Joins, we might be waiting for a WaitingQueue to have finished before the next kernel can use the data it contains.

inline void wait_until_num_bytes(size_t num_bytes, int num_bytes_timeout = -1)

Waits until a certain number of bytes exist in the WaitingQueue During some execution kernels it is better to wait for a certain amount of the total anticipated data to be available before processing the next batch.

Parameters
  • num_bytes: The number of bytes that we will wait to exist in the WaitingQueue unless the WaitingQueue has already had finished() called.

  • num_bytes_timeout: A timeout in ms where if there is data and the timeout expires, then it will return, even if not the requested num_bytes is available. If its set to -1, then it disables this timeout

inline size_t get_next_size_in_bytes()

Let’s us know the size of the next CacheData to be pulled. Sometimes it is useful to know how much data we will be pulling in each CacheData that we have accumulated for making estimates on how much more is going to be coming or seeing how far along we are.

Return

The number of bytes consumed by the next message.

inline message_ptr get_or_wait(std::string message_id)

Get a specific message from the WaitingQueue. Messages are always accompanied by a message_id though in some cases that id is empty string. This allows us to get a specific message from the WaitingQueue and wait around for it to exist or for this WaitingQueue to be finished. If WaitingQueue is finished and there are no messages we get a nullptr.

Return

The message that has this id or nullptr if that message will never be able to arrive because the WaitingQueue is finished.

Parameters
  • message_id: The id of the message that we want to get or wait for.

inline message_ptr get_or_wait_any(const std::vector<std::string> &messages)

Get one of any of a specific set of messages from the WaitingQueue. Messages are always accompanied by a message_id though in some cases that id is empty string. This allows us to get a specific message from the WaitingQueue and wait around for it to exist or for this WaitingQueue to be finished. If WaitingQueue is finished and there are no messages we get a nullptr.

Return

The message that has this id or nullptr if that message will never be able to arrive because the WaitingQueue is finished.

Parameters
  • messages: A vector of possible messages ids that we want to get or wait for.

inline message_ptr pop_unsafe()

Pop the front element WITHOUT thread safety. Allos us to pop from the front in situations where we have already acquired a unique_lock on this WaitingQueue’s mutex.

Return

The first message in the WaitingQueue.

inline std::vector<message_ptr> get_all()

gets all the messages

inline std::vector<std::string> get_all_message_ids()

gets all the message ids

inline std::vector<message_ptr> get_all_or_wait()

Waits until all messages are ready then returns all of them. You should never call this function more than once on a WaitingQueue else race conditions can occur.

Return

A vector of all the messages that were inserted into the WaitingQueue.

inline std::unique_lock<std::mutex> lock()

A function that returns a unique_lock using the WaitingQueue’s mutex.

Return

A unique_lock

inline std::vector<message_ptr> get_all_unsafe()

Get all messagse in the WaitingQueue without locking.

Return

A vector with all the messages in the WaitingQueue.

inline void put_all_unsafe(std::vector<message_ptr> messages)

Put a vector of messages onto the WaitingQueue without locking.

Parameters
  • messages: A vector of messages that will be pushed into the WaitingQueue.

inline void put_all(std::vector<message_ptr> messages)