Class distributing_kernel

Inheritance Relationships

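Base Type

  • public ral::cache::kernel

Derived Types

  • public ral::batch::DistributeAggregateKernel
  • public ral::batch::JoinPartitionKernel
  • public ral::batch::LimitKernel
  • public ral::batch::OverlapAccumulatorKernel
  • public ral::batch::PartitionKernel
  • public ral::batch::SortAndSampleKernel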

Class Documentation

class ral::cache::distributing_kernel : public ral::cache::kernel

The distributing_kernel interface allows kernels to call distribution primitives: sending messages, scattering partitions, broadcasting tables and exchanging partition counts.

Subclassed by ral::batch::DistributeAggregateKernel, ral::batch::JoinPartitionKernel, ral::batch::LimitKernel, ral::batch::OverlapAccumulatorKernel, ral::batch::PartitionKernel, ral::batch::SortAndSampleKernel

Public Functions

distributing_kernel(std::size_t kernel_id, std::string expr, std::shared_ptr<Context> context, kernel_type kernel_type_id)

Constructor for the distributing_kernel

Parameters
  • kernel_id: Current kernel identifier.

  • expr: Original logical expression that the kernel will execute.

  • context: Shared context associated with the running query.

  • kernel_type_id: Identifier representing the kernel type.

inline virtual std::string kernel_name()

void set_number_of_message_trackers(std::size_t num_message_trackers)

Resizes the vector of message trackers. Keeping more than one message tracker is useful, for example, for joins, where messages for the left and right partitions must be tracked separately.

Parameters
  • num_message_trackers: The new number of message trackers.
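As an illustration of why several trackers help, the following minimal sketch models each message tracker as a map from node identifier to a message count. The `MessageTrackers` struct and the `LEFT`/`RIGHT` indices are hypothetical and are not the BlazingSQL implementation; they only show that resizing to two trackers keeps the counts for the left and right sides of a join apart.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a kernel's message trackers: tracker i maps each
// node identifier to the number of messages counted for that node.
struct MessageTrackers {
    // Assumption: a single tracker exists until the kernel asks for more.
    std::vector<std::map<std::string, int>> node_counts =
        std::vector<std::map<std::string, int>>(1);

    // Mirrors the idea of set_number_of_message_trackers(): existing
    // trackers are kept, new ones start empty.
    void set_number_of_message_trackers(std::size_t num_message_trackers) {
        node_counts.resize(num_message_trackers);
    }

    // at() rejects a tracker index that was never allocated.
    void increment_node_count(const std::string& node_id,
                              std::size_t message_tracker_idx = 0) {
        node_counts.at(message_tracker_idx)[node_id]++;
    }
};

// Illustrative join-like usage: tracker 0 counts left-side partitions,
// tracker 1 counts right-side partitions, so the two never mix.
constexpr std::size_t LEFT = 0;
constexpr std::size_t RIGHT = 1;
```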

void send_message(std::unique_ptr<ral::frame::BlazingTable> table, bool specific_cache, std::string cache_id, std::vector<std::string> target_ids, std::string message_id_prefix = "", bool always_add = false, bool wait_for = false, std::size_t message_tracker_idx = 0, ral::cache::MetadataDictionary extra_metadata = {})

Sends a table together with its corresponding metadata.

Parameters
  • table: The table to be sent. If table is nullptr, an empty table is sent instead.

  • specific_cache: Indicates if a message should be routed to a specific cache or to the global input cache.

  • cache_id: Indicates what cache a message should be routed to.

  • target_ids: Vector of identifiers of the workers that will receive this message.

  • message_id_prefix: The prefix of the identifier of this message.

  • always_add: Forces the table to always be added to the output cache.

  • wait_for: Indicates whether this message must be registered so that it can be waited for.

  • message_tracker_idx: The message tracker index.

  • extra_metadata: Additional metadata to attach to the message.
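The sketch below shows, under stated assumptions, the kind of work send_message() has to do before a message leaves the node: replacing a null table with an empty one and attaching routing metadata. `Table`, `Message`, `build_message()` and the metadata key names are hypothetical stand-ins, not the real ral::frame::BlazingTable or ral::cache::MetadataDictionary API, and the way the message identifier is composed is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal table: only a row count, for illustration.
struct Table {
    std::size_t num_rows = 0;
};

// Hypothetical message: a table travelling with string metadata.
struct Message {
    std::unique_ptr<Table> table;
    std::map<std::string, std::string> metadata;
};

// Sketch of the message assembly behind send_message().
// All metadata key names are illustrative assumptions.
Message build_message(std::unique_ptr<Table> table,
                      bool specific_cache,
                      const std::string& cache_id,
                      const std::vector<std::string>& target_ids,
                      const std::string& message_id_prefix,
                      const std::map<std::string, std::string>& extra_metadata = {}) {
    if (!table) {
        table = std::make_unique<Table>();  // nullptr becomes an empty table
    }
    Message msg;
    msg.metadata = extra_metadata;  // routing keys below take precedence
    msg.metadata["add_to_specific_cache"] = specific_cache ? "true" : "false";
    msg.metadata["cache_id"] = cache_id;

    // Join the target worker ids into one comma-separated value.
    std::string targets;
    for (const auto& id : target_ids) {
        if (!targets.empty()) targets += ",";
        targets += id;
    }
    msg.metadata["worker_ids"] = targets;
    msg.metadata["message_id"] = message_id_prefix + "_" + cache_id;
    msg.table = std::move(table);
    return msg;
}
```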

void scatter(std::vector<ral::frame::BlazingTableView> partitions, ral::cache::CacheMachine *output, std::string message_id_prefix, std::string cache_id, std::size_t message_tracker_idx = 0)

Sends each partition to its corresponding node. The size of the partitions vector is assumed to be equal to the number of nodes.

Parameters
  • partitions: The table partitions to be sent.

  • output: The output cache.

  • message_id_prefix: The prefix of the identifier of this message.

  • cache_id: Indicates what cache a message should be routed to.

  • message_tracker_idx: The message tracker index.
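A minimal sketch of the scatter pattern, assuming that partitions[i] is destined for the i-th node and that the partition belonging to the current node is added to the local output cache instead of being sent. `Partition`, `ScatterResult` and this free-function `scatter()` are hypothetical; the containers stand in for the transport, the output cache and the node counters.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical partition: a list of row values, for illustration.
using Partition = std::vector<int>;

// Records where each partition ended up.
struct ScatterResult {
    std::vector<Partition> local_cache;                   // kept on this node
    std::map<std::string, std::vector<Partition>> sent;   // node id -> partitions sent
    std::map<std::string, int> node_counts;               // messages per destination node
};

// Sketch of scatter(): partitions[i] goes to nodes[i].
ScatterResult scatter(const std::vector<Partition>& partitions,
                      const std::vector<std::string>& nodes,
                      const std::string& self_id) {
    // Precondition from the documentation: one partition per node.
    assert(partitions.size() == nodes.size());
    ScatterResult result;
    for (std::size_t i = 0; i < partitions.size(); ++i) {
        if (nodes[i] == self_id) {
            result.local_cache.push_back(partitions[i]);
        } else {
            result.sent[nodes[i]].push_back(partitions[i]);
        }
        // Every partition placed in a cache, local or remote, is counted.
        result.node_counts[nodes[i]]++;
    }
    return result;
}
```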

void scatterParts(std::vector<ral::distribution::NodeColumnView> partitions, std::string message_id_prefix, std::vector<int32_t> part_ids)

Sends each partition to its corresponding node, tagged with its corresponding part_id. More than one partition can belong to the same node.

Parameters
  • partitions: The table partitions to be sent represented as node column views.

  • message_id_prefix: The prefix of the identifier of this message.

  • part_ids: A vector of the same size as partitions, indicating which part_id each partition corresponds to.
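Unlike scatter(), the destination here comes from each node column view rather than from the partition's position, so one node may receive several parts. The sketch below assumes a hypothetical `NodeColumnView` modeled as a (node id, partition) pair and a hypothetical `PartMessage` carrying the part_id; the message identifier format is invented for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

using Partition = std::vector<int>;

// Hypothetical stand-in for ral::distribution::NodeColumnView:
// a destination node paired with the partition data.
using NodeColumnView = std::pair<std::string, Partition>;

// Hypothetical outgoing message tagged with its part_id.
struct PartMessage {
    std::string message_id;
    int32_t part_id;
    Partition data;
};

// Sketch of scatterParts(): part_ids[i] tells the receiver which part
// partitions[i] is; several entries may name the same node.
std::map<std::string, std::vector<PartMessage>> scatter_parts(
        const std::vector<NodeColumnView>& partitions,
        const std::string& message_id_prefix,
        const std::vector<int32_t>& part_ids) {
    assert(partitions.size() == part_ids.size());
    std::map<std::string, std::vector<PartMessage>> outbox;  // node id -> messages
    for (std::size_t i = 0; i < partitions.size(); ++i) {
        const std::string& node = partitions[i].first;
        outbox[node].push_back(PartMessage{
            message_id_prefix + "_part_" + std::to_string(part_ids[i]),
            part_ids[i],
            partitions[i].second});
    }
    return outbox;
}
```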

void broadcast(std::unique_ptr<ral::frame::BlazingTable> table, ral::cache::CacheMachine *output, std::string message_id_prefix, std::string cache_id, std::size_t message_tracker_idx = 0, bool always_add = false)

Sends the same table to all other nodes.

Parameters
  • table: The table to be broadcast.

  • output: The output cache.

  • message_id_prefix: The prefix of the identifier of this message.

  • cache_id: Indicates what cache a message should be routed to.

  • message_tracker_idx: The message tracker index.

  • always_add: Forces the message to always be sent.
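A minimal sketch of broadcast(), assuming (as the `output` parameter suggests) that the current node keeps its own copy in the output cache while every other node receives one copy. `Table`, the `outbox` map and this `broadcast()` signature are hypothetical simplifications, not the BlazingSQL API.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

using Table = std::vector<int>;  // hypothetical table stand-in

// Sketch of broadcast(): `outbox` stands in for the transport and
// `local_cache` for this node's output cache.
void broadcast(const Table& table,
               const std::vector<std::string>& nodes,
               const std::string& self_id,
               std::map<std::string, std::vector<Table>>& outbox,
               std::vector<Table>& local_cache) {
    for (const auto& node : nodes) {
        if (node == self_id) {
            local_cache.push_back(table);   // assumption: local copy is cached
        } else {
            outbox[node].push_back(table);  // one copy per remote node
        }
    }
}
```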

void send_total_partition_counts(std::string message_id_prefix, std::string cache_id, std::size_t message_tracker_idx = 0)

Sends the partition counters to all other nodes. Each counter tells the receiving node how many messages it should wait for on its end.

Parameters
  • message_id_prefix: The prefix of the identifier of this message.

  • cache_id: Indicates what cache a message should be routed to.

  • message_tracker_idx: The message tracker index.
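The exchanged counters let each receiver compute how many messages to expect. The sketch below assumes a hypothetical `CounterMatrix` where sent[sender][receiver] holds the sender's node counter for that receiver within one message tracker; the hypothetical `expected_totals()` adds up, for each receiver, the counts reported by every node, including the receiver's own count for data it added locally.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// sent[sender][receiver] = messages the sender produced for the receiver
// (hypothetical layout of one tracker's node counters across the cluster).
using CounterMatrix = std::map<std::string, std::map<std::string, int>>;

// Sketch of the arithmetic behind the partition-count exchange: each
// receiver's expected total is the sum of what every node counted for it.
std::map<std::string, int> expected_totals(const CounterMatrix& sent,
                                           const std::vector<std::string>& nodes) {
    std::map<std::string, int> totals;
    for (const auto& receiver : nodes) {
        int total = 0;
        for (const auto& sender : nodes) {
            auto s = sent.find(sender);
            if (s == sent.end()) continue;   // sender reported nothing
            auto r = s->second.find(receiver);
            if (r != s->second.end()) total += r->second;
        }
        totals[receiver] = total;
    }
    return totals;
}
```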

int get_total_partition_counts(std::size_t message_tracker_idx = 0)

Gets the total partition count associated with a message tracker. The value returned by this function is usually the input to WaitingQueue::wait_for_count(), which waits for the arrival of a given number of messages.

Parameters
  • message_tracker_idx: The message tracker index.
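The waiting side can be sketched with a mutex and a condition variable. `CountingQueue` below is a hypothetical illustration of the "wait until N messages have arrived" idea and is not BlazingSQL's WaitingQueue; the hypothetical `receive_all()` shows a consumer waiting for the total it was told to expect while a producer thread delivers the messages.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal waiting queue.
class CountingQueue {
public:
    void put(std::string message) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            messages_.push_back(std::move(message));
        }
        cv_.notify_all();  // wake any thread waiting for a count
    }

    // Blocks until at least `count` messages have arrived.
    void wait_for_count(std::size_t count) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] { return messages_.size() >= count; });
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return messages_.size();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::vector<std::string> messages_;
};

// Usage sketch: a producer delivers `total` messages while the consumer
// waits for exactly the total it expects.
std::size_t receive_all(std::size_t total) {
    CountingQueue queue;
    std::thread producer([&] {
        for (std::size_t i = 0; i < total; ++i) {
            queue.put("partition_" + std::to_string(i));
        }
    });
    queue.wait_for_count(total);
    producer.join();
    return queue.size();
}
```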

void increment_node_count(std::string node_id, std::size_t message_tracker_idx = 0)

Increments by one the node counter associated with the given node identifier and message tracker index. The node counter must be incremented by one every time data is added to a cache.

Parameters
  • node_id: The node identifier.

  • message_tracker_idx: The message tracker index.

virtual ~distributing_kernel() = default

Destructor