Class distributing_kernel
Defined in File distributing_kernel.h
Inheritance Relationships
Base Type
public ral::cache::kernel
(Class kernel)
Derived Types
public ral::batch::DistributeAggregateKernel
(Class DistributeAggregateKernel)
public ral::batch::JoinPartitionKernel
(Class JoinPartitionKernel)
public ral::batch::LimitKernel
(Class LimitKernel)
public ral::batch::OverlapAccumulatorKernel
(Class OverlapAccumulatorKernel)
public ral::batch::PartitionKernel
(Class PartitionKernel)
public ral::batch::SortAndSampleKernel
(Class SortAndSampleKernel)
Class Documentation
class ral::cache::distributing_kernel : public ral::cache::kernel
The distributing_kernel interface allows kernels to call distribution primitives.
Subclassed by ral::batch::DistributeAggregateKernel, ral::batch::JoinPartitionKernel, ral::batch::LimitKernel, ral::batch::OverlapAccumulatorKernel, ral::batch::PartitionKernel, ral::batch::SortAndSampleKernel
Public Functions
Constructor for the distributing_kernel.
- Parameters
kernel_id
: Current kernel identifier.
expr
: Original logical expression that the kernel will execute.
context
: Shared context associated with the running query.
kernel_type_id
: Identifier representing the kernel type.
inline virtual std::string kernel_name()
void set_number_of_message_trackers(std::size_t num_message_trackers)
Resizes the vector of message trackers. Keeping more than one message tracker is useful, for example, for joins, where messages must be tracked separately for the left and right partitions.
- Parameters
num_message_trackers
: The new number of message trackers.
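The idea of one tracker per input side can be illustrated with a minimal sketch. `MessageTracker` and `TrackerSet` are hypothetical stand-ins, not the library's types; the sketch assumes a tracker is just a per-node message count and that one tracker exists by default (inferred from the `message_tracker_idx = 0` defaults in the signatures below).

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for one message tracker: how many messages were
// sent to each node. This is not the library's actual type.
struct MessageTracker {
    std::map<std::string, int> node_counts;  // node_id -> messages sent
};

// Hypothetical holder mimicking set_number_of_message_trackers(): the tracker
// vector is resized, and each tracker is addressed by its index.
class TrackerSet {
public:
    void set_number_of_message_trackers(std::size_t num_message_trackers) {
        trackers_.resize(num_message_trackers);
    }

    MessageTracker& tracker(std::size_t message_tracker_idx) {
        assert(message_tracker_idx < trackers_.size());
        return trackers_[message_tracker_idx];
    }

    std::size_t size() const { return trackers_.size(); }

private:
    // Assumption: one tracker exists by default, matching the
    // message_tracker_idx = 0 defaults in the documented signatures.
    std::vector<MessageTracker> trackers_ = std::vector<MessageTracker>(1);
};
```

A join might call `set_number_of_message_trackers(2)` and then pass index 0 for messages from the left partitions and index 1 for the right ones, so the two counts never mix.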
void send_message(std::unique_ptr<ral::frame::BlazingTable> table, bool specific_cache, std::string cache_id, std::vector<std::string> target_ids, std::string message_id_prefix = "", bool always_add = false, bool wait_for = false, std::size_t message_tracker_idx = 0, ral::cache::MetadataDictionary extra_metadata = {})
Sends a table together with its corresponding metadata.
- Parameters
table
: The table to be sent. If table is a nullptr, an empty table is sent anyway.
specific_cache
: Indicates whether the message should be routed to a specific cache or to the global input cache.
cache_id
: Indicates which cache the message should be routed to.
target_ids
: Vector of workers that will receive this message.
message_id_prefix
: The prefix of the identifier of this message.
always_add
: Forces the table to always be added to the output cache.
wait_for
: Indicates whether this message must be registered so that it is waited for.
message_tracker_idx
: The message tracker index.
extra_metadata
: Additional metadata to send along with the table.
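The following minimal sketch illustrates only two behaviours documented above: replacing a null table with an empty one, and choosing the destination through `specific_cache`. `TableSketch`, `OutgoingMessage`, `make_message` and the `"global_input_cache"` name are hypothetical; the real function also handles `always_add`, `wait_for`, the message tracker and `extra_metadata`, which are left out here.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for ral::frame::BlazingTable.
struct TableSketch {
    std::vector<int> rows;
};

// Hypothetical outgoing message; not the library's message type.
struct OutgoingMessage {
    std::unique_ptr<TableSketch> table;
    std::string destination_cache;     // cache the receiver stores it in
    std::vector<std::string> targets;  // workers that receive the message
    std::string message_id_prefix;
};

// Hypothetical name used for the global input cache in this sketch.
const std::string kGlobalInputCache = "global_input_cache";

// Sketch of two documented behaviours of send_message(): a null table is
// replaced by an empty one, and specific_cache chooses between cache_id and
// the global input cache.
OutgoingMessage make_message(std::unique_ptr<TableSketch> table,
                             bool specific_cache,
                             const std::string& cache_id,
                             std::vector<std::string> target_ids,
                             const std::string& message_id_prefix) {
    if (!table) {
        table = std::make_unique<TableSketch>();  // an empty table is sent anyway
    }
    OutgoingMessage msg;
    msg.table = std::move(table);
    msg.destination_cache = specific_cache ? cache_id : kGlobalInputCache;
    msg.targets = std::move(target_ids);
    msg.message_id_prefix = message_id_prefix;
    return msg;
}
```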
void scatter(std::vector<ral::frame::BlazingTableView> partitions, ral::cache::CacheMachine *output, std::string message_id_prefix, std::string cache_id, std::size_t message_tracker_idx = 0)
Sends each partition to its corresponding node. The size of the partitions vector is assumed to be the same as the number of nodes.
- Parameters
partitions
: The table partitions to be sent.
output
: The output cache.
message_id_prefix
: The prefix of the identifier of this message.
cache_id
: Indicates which cache the message should be routed to.
message_tracker_idx
: The message tracker index.
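A minimal sketch of the positional contract of `scatter()`, assuming a partition is a plain vector of rows and that delivery can be modelled as appending to a per-node inbox. `scatter_sketch`, `Partition` and `ScatterInboxes` are hypothetical, and the `output` cache, message prefix, cache id and tracker index of the real function are omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical: a partition is just a vector of row values, and "sending"
// appends it to an inbox keyed by node id.
using Partition = std::vector<int>;
using ScatterInboxes = std::map<std::string, std::vector<Partition>>;

// Sketch of the documented contract of scatter(): partitions[i] goes to
// nodes[i], so both vectors must have the same size.
void scatter_sketch(const std::vector<Partition>& partitions,
                    const std::vector<std::string>& nodes,
                    ScatterInboxes& inboxes) {
    assert(partitions.size() == nodes.size());
    for (std::size_t i = 0; i < partitions.size(); ++i) {
        inboxes[nodes[i]].push_back(partitions[i]);
    }
}
```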
void scatterParts(std::vector<ral::distribution::NodeColumnView> partitions, std::string message_id_prefix, std::vector<int32_t> part_ids)
Sends each partition to its corresponding node and part_id. More than one partition can belong to the same node.
- Parameters
partitions
: The table partitions to be sent, represented as node column views.
message_id_prefix
: The prefix of the identifier of this message.
part_ids
: A vector the same size as partitions that gives the part_id of each partition.
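The difference from `scatter()` is that each partition carries its own destination, so several partitions may land on the same node, each labelled with its `part_id`. The sketch below models this with a hypothetical `NodeColumnViewSketch` (a node id plus rows) in place of `ral::distribution::NodeColumnView`; `scatter_parts_sketch` and `PartInboxes` are likewise illustrative names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for ral::distribution::NodeColumnView: a destination
// node paired with the rows of one partition.
struct NodeColumnViewSketch {
    std::string node_id;
    std::vector<int> rows;
};

// node_id -> list of (part_id, rows) received by that node.
using PartInboxes =
    std::map<std::string, std::vector<std::pair<std::int32_t, std::vector<int>>>>;

// Sketch of scatterParts(): each partition names its own node, and
// part_ids[i] labels partitions[i], so one node can receive several parts.
void scatter_parts_sketch(const std::vector<NodeColumnViewSketch>& partitions,
                          const std::vector<std::int32_t>& part_ids,
                          PartInboxes& inboxes) {
    assert(partitions.size() == part_ids.size());
    for (std::size_t i = 0; i < partitions.size(); ++i) {
        inboxes[partitions[i].node_id].emplace_back(part_ids[i], partitions[i].rows);
    }
}
```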
void broadcast(std::unique_ptr<ral::frame::BlazingTable> table, ral::cache::CacheMachine *output, std::string message_id_prefix, std::string cache_id, std::size_t message_tracker_idx = 0, bool always_add = false)
Sends the same table to all other nodes.
- Parameters
table
: The table to be broadcast.
output
: The output cache.
message_id_prefix
: The prefix of the identifier of this message.
cache_id
: Indicates which cache the message should be routed to.
message_tracker_idx
: The message tracker index.
always_add
: Forces the message to always be sent.
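A minimal sketch of the broadcast pattern, assuming the sender knows the ids of all nodes and skips itself. `BroadcastTable`, `BroadcastInboxes` and `broadcast_sketch` are hypothetical. The sketch shares one immutable copy through `std::shared_ptr<const BroadcastTable>`; that is a choice made here for illustration, whereas the documented function takes the table as a `std::unique_ptr`.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for ral::frame::BlazingTable.
struct BroadcastTable {
    std::vector<int> rows;
};

// node_id -> tables received by that node.
using BroadcastInboxes =
    std::map<std::string, std::vector<std::shared_ptr<const BroadcastTable>>>;

// Sketch of broadcast(): every node except the sender gets the same table.
void broadcast_sketch(const std::shared_ptr<const BroadcastTable>& table,
                      const std::string& self_id,
                      const std::vector<std::string>& all_nodes,
                      BroadcastInboxes& inboxes) {
    for (const auto& node : all_nodes) {
        if (node == self_id) continue;  // only "all other nodes" receive it
        inboxes[node].push_back(table);
    }
}
```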
void send_total_partition_counts(std::string message_id_prefix, std::string cache_id, std::size_t message_tracker_idx = 0)
Sends the partition counters to all other nodes. Each counter tells the receiving node how many messages it should wait for on its end.
- Parameters
message_id_prefix
: The prefix of the identifier of this message.
cache_id
: Indicates which cache the message should be routed to.
message_tracker_idx
: The message tracker index.
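The counting exchange can be modelled as follows, assuming each sender records how many messages it put into every other node's cache (the role `increment_node_count()` plays) and each receiver adds up the counts announced to it. `SentCounts` and `exchange_partition_counts` are hypothetical, and messages a node keeps for itself are ignored in this sketch.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical: sent[sender][receiver] = number of messages the sender
// delivered to the receiver.
using SentCounts = std::map<std::string, std::map<std::string, int>>;

// Sketch of the exchange behind send_total_partition_counts(): every sender
// tells every other node how many messages to expect from it, and each
// receiver sums those announcements into its expected total.
std::map<std::string, int> exchange_partition_counts(
        const SentCounts& sent, const std::vector<std::string>& nodes) {
    std::map<std::string, int> expected;  // receiver -> total messages
    for (const auto& receiver : nodes) {
        expected[receiver] = 0;
        for (const auto& sender : nodes) {
            if (sender == receiver) continue;  // counts go to other nodes only
            auto s = sent.find(sender);
            if (s == sent.end()) continue;
            auto r = s->second.find(receiver);
            if (r != s->second.end()) expected[receiver] += r->second;
        }
    }
    return expected;
}
```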
int get_total_partition_counts(std::size_t message_tracker_idx = 0)
Gets the total partition count associated with a message tracker. The returned total is usually passed to WaitingQueue::wait_for_count(), which waits for the arrival of a given number of messages.
- Parameters
message_tracker_idx
: The message tracker index.
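To show how such a total can be consumed, here is a minimal waiting queue built on `std::condition_variable`. `WaitingQueueSketch` is hypothetical: only the name and purpose of `WaitingQueue::wait_for_count()` come from the text above, not its implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical waiting queue; only the purpose of wait_for_count() is taken
// from the documentation, not its implementation.
class WaitingQueueSketch {
public:
    // Called by whoever receives a message; wakes any waiting thread.
    void put(std::string message) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            messages_.push_back(std::move(message));
        }
        cv_.notify_all();
    }

    // Blocks until at least `count` messages have arrived.
    void wait_for_count(std::size_t count) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this, count] { return messages_.size() >= count; });
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return messages_.size();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::vector<std::string> messages_;
};
```

In a pipeline, `put()` would be called by the thread receiving messages while the kernel thread calls `wait_for_count(total)` with the value obtained from `get_total_partition_counts()`; the wait returns once that many messages are present.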
void increment_node_count(std::string node_id, std::size_t message_tracker_idx = 0)
Increments by one the node counter associated with the given node identifier and message tracker index. The node counter must be incremented every time data is added to a cache.
- Parameters
node_id
: The node identifier.
message_tracker_idx
: The message tracker index.
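A minimal sketch of per-tracker node counters, assuming a counter is simply an integer per node id inside each tracker. `NodeCounters` is a hypothetical class whose `increment_node_count()` mirrors the documented signature, including the default tracker index of 0.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical per-tracker node counters: counters_[tracker][node_id].
class NodeCounters {
public:
    explicit NodeCounters(std::size_t num_trackers) : counters_(num_trackers) {}

    // Mirrors increment_node_count(node_id, message_tracker_idx): called once
    // each time data destined for node_id is added to a cache.
    void increment_node_count(const std::string& node_id,
                              std::size_t message_tracker_idx = 0) {
        ++counters_.at(message_tracker_idx)[node_id];
    }

    // Current count for a node; 0 if nothing was recorded yet.
    int count(const std::string& node_id,
              std::size_t message_tracker_idx = 0) const {
        const auto& per_node = counters_.at(message_tracker_idx);
        auto it = per_node.find(node_id);
        return it == per_node.end() ? 0 : it->second;
    }

private:
    std::vector<std::map<std::string, int>> counters_;
};
```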
-
virtual
~distributing_kernel
() = default¶ Destructor