Class kernel

Inheritance Relationships

Derived Types

Class Documentation

class ral::cache::kernel

This interface represents a computation unit in the execution graph. Each kernel has input and output ports and the expression associated with the computation unit. Each class that implements this interface should define how the computation is executed; see the do_process() method.

Subclassed by ral::batch::BindableTableScan, ral::batch::ComputeAggregateKernel, ral::batch::ComputeWindowKernel, ral::batch::Filter, ral::batch::MergeAggregateKernel, ral::batch::MergeStreamKernel, ral::batch::OutputKernel, ral::batch::OverlapGeneratorKernel, ral::batch::PartitionSingleNodeKernel, ral::batch::PartwiseJoin, ral::batch::Print, ral::batch::Projection, ral::batch::TableScan, ral::batch::UnionKernel, ral::cache::distributing_kernel
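The relationship between this interface and its subclasses can be illustrated with a small, self-contained sketch. The names mini_kernel, FilterLike and ProjectionLike are hypothetical stand-ins, not the real ral::batch classes. The sketch only shows derived kernels overriding a virtual function (here kernel_name()) and being owned and called through base-class pointers.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for ral::cache::kernel (not the real class).
class mini_kernel {
public:
    explicit mini_kernel(std::string expr) : expression(std::move(expr)) {}
    virtual ~mini_kernel() = default;
    // Each derived kernel reports its own name.
    virtual std::string kernel_name() { return "kernel"; }
    std::string expression;  // logical expression the kernel executes
};

// Hypothetical derived kernels, loosely named after ral::batch::Filter and ral::batch::Projection.
class FilterLike : public mini_kernel {
public:
    using mini_kernel::mini_kernel;
    std::string kernel_name() override { return "Filter"; }
};

class ProjectionLike : public mini_kernel {
public:
    using mini_kernel::mini_kernel;
    std::string kernel_name() override { return "Projection"; }
};

// Builds a small heterogeneous list of kernels owned through base-class pointers.
std::vector<std::unique_ptr<mini_kernel>> make_pipeline() {
    std::vector<std::unique_ptr<mini_kernel>> pipeline;
    pipeline.push_back(std::make_unique<FilterLike>("filter_expr"));
    pipeline.push_back(std::make_unique<ProjectionLike>("project_expr"));
    return pipeline;
}
```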

Public Functions

kernel(std::size_t kernel_id, std::string expr, std::shared_ptr<Context> context, kernel_type kernel_type_id)

Constructor for the kernel

Parameters
  • kernel_id: Current kernel identifier.

  • expr: Original logical expression that the kernel will execute.

  • context: Shared context associated with the running query.

  • kernel_type_id: Identifier representing the kernel type.

inline void set_parent(size_t id)

Sets its parent kernel.

Parameters
  • id: The identifier of its parent.

inline bool has_parent() const

Indicates if the kernel has a parent.

Return

true if the kernel has a parent, false otherwise.
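One plausible way to back set_parent() and has_parent() is a sentinel value stored in parent_id_. The sketch below assumes a sentinel of -1 and a hypothetical class name parent_tracking; the real initial value of parent_id_ is not stated in this documentation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical sketch: parent tracking with a sentinel (assumed to be -1).
class parent_tracking {
public:
    void set_parent(std::size_t id) { parent_id_ = static_cast<std::int32_t>(id); }
    bool has_parent() const { return parent_id_ != kNoParent; }
    std::int32_t parent_id() const { return parent_id_; }

private:
    static constexpr std::int32_t kNoParent = -1;  // assumed "no parent" sentinel
    std::int32_t parent_id_ = kNoParent;
};
```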

inline virtual ~kernel()

Destructor

virtual kstatus run() = 0

Executes the batch processing. Loads the data from the input port, processes it, and stores the results in the output port.

Return

kstatus ‘stop’ to halt processing, or ‘proceed’ to continue processing.
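The run() contract (consume from the input port, produce into the output port, report stop or proceed) can be illustrated with a toy kernel built on std::queue. The kstatus enum, the queues, the doubling_kernel class and the drive() loop are simplified, hypothetical stand-ins; in the real engine data flows through CacheMachine objects, and processing one batch per call is an assumption made only for this sketch.

```cpp
#include <cassert>
#include <queue>
#include <utility>
#include <vector>

enum class kstatus { stop, proceed };  // simplified stand-in for the real kstatus

// Hypothetical toy kernel: doubles every value of each input batch.
class doubling_kernel {
public:
    std::queue<std::vector<int>> input_;   // stand-in for the input port
    std::queue<std::vector<int>> output_;  // stand-in for the output port

    // Processes one batch per call (an assumption made for this sketch).
    kstatus run() {
        if (input_.empty()) return kstatus::stop;  // nothing left: halt
        std::vector<int> batch = std::move(input_.front());
        input_.pop();
        for (int &v : batch) v *= 2;
        output_.push(std::move(batch));
        return kstatus::proceed;
    }
};

// Drives the kernel until it reports stop; returns the number of batches processed.
int drive(doubling_kernel &k) {
    int batches = 0;
    while (k.run() == kstatus::proceed) ++batches;
    return batches;
}
```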

inline kernel_pair operator[](const std::string &portname)

inline std::int32_t get_id() const

Returns the kernel identifier.

Return

int32_t The kernel identifier.

inline kernel_type get_type_id() const

Returns the kernel type identifier.

Return

kernel_type The kernel type identifier.

inline void set_type_id(kernel_type kernel_type_id_)

Sets the kernel type identifier.

Parameters
  • kernel_type_id_: The new kernel type identifier.

std::shared_ptr<ral::cache::CacheMachine> input_cache()

Returns the input cache.

std::shared_ptr<ral::cache::CacheMachine> output_cache(std::string cache_id = "")

Returns the output cache associated with an identifier.

Parameters
  • cache_id: The identifier of the output cache.
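The cache_id parameter selects one of possibly several output caches. The sketch below models that lookup with a std::map. The cache and output_port types are hypothetical, and the rule that an empty cache_id falls back to a default cache is an assumption for illustration, not documented behavior.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

struct cache { std::string name; };  // hypothetical stand-in for CacheMachine

// Hypothetical output port: named caches plus an assumed default cache name.
class output_port {
public:
    explicit output_port(std::string default_id) : default_id_(std::move(default_id)) {}

    void register_cache(const std::string &id) {
        caches_[id] = std::make_shared<cache>(cache{id});
    }

    // Mirrors output_cache(cache_id = ""): an empty id falls back to the default (assumption).
    std::shared_ptr<cache> output_cache(const std::string &cache_id = "") const {
        const std::string &key = cache_id.empty() ? default_id_ : cache_id;
        auto it = caches_.find(key);
        if (it == caches_.end()) throw std::out_of_range("unknown cache id: " + key);
        return it->second;
    }

private:
    std::string default_id_;
    std::map<std::string, std::shared_ptr<cache>> caches_;
};
```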

bool add_to_output_cache(std::unique_ptr<ral::frame::BlazingTable> table, std::string cache_id = "", bool always_add = false)

Adds a BlazingTable into the output cache.

Parameters
  • table: The table that will be added to the output cache.

  • cache_id: The cache identifier.

bool add_to_output_cache(std::unique_ptr<ral::cache::CacheData> cache_data, std::string cache_id = "", bool always_add = false)

Adds a CacheData into the output cache.

Parameters
  • cache_data: The cache_data that will be added to the output cache.

  • cache_id: The cache identifier.

bool add_to_output_cache(std::unique_ptr<ral::frame::BlazingHostTable> host_table, std::string cache_id = "")

Adds a BlazingHostTable into the output cache.

Parameters
  • host_table: The host table that will be added to the output cache.

  • cache_id: The cache identifier.

inline Context *get_context() const

Returns the current context.

inline std::string get_message_id()

Returns the message identifier as a string.

inline bool input_all_finished()

Returns true if all the caches of the input port are finished.

inline uint64_t total_input_rows_added()

Returns the sum of all the rows added to all caches of the input port.

inline bool input_cache_finished(const std::string &port_name)

Returns true if a specific input cache is finished.

Parameters
  • port_name: Name of the port.

inline uint64_t input_cache_num_rows_added(const std::string &port_name)

Returns the number of rows added to a specific input cache.

Parameters
  • port_name: Name of the port.
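The input-port queries above can be read as simple folds over the caches attached to the input port. In this sketch a hypothetical input_cache_state record (a finished flag and a row counter per named cache) replaces the real CacheMachine, and the functions are free functions rather than members.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <numeric>
#include <string>

// Hypothetical per-cache state; the real CacheMachine tracks much more.
struct input_cache_state {
    bool finished = false;
    std::uint64_t rows_added = 0;
};

using input_port = std::map<std::string, input_cache_state>;

// True only if every cache on the input port is finished.
bool input_all_finished(const input_port &in) {
    return std::all_of(in.begin(), in.end(),
                       [](const auto &kv) { return kv.second.finished; });
}

// Sum of rows added across all caches of the input port.
std::uint64_t total_input_rows_added(const input_port &in) {
    return std::accumulate(in.begin(), in.end(), std::uint64_t{0},
                           [](std::uint64_t acc, const auto &kv) { return acc + kv.second.rows_added; });
}

// Per-port queries; at() throws std::out_of_range for an unknown port name.
bool input_cache_finished(const input_port &in, const std::string &port_name) {
    return in.at(port_name).finished;
}

std::uint64_t input_cache_num_rows_added(const input_port &in, const std::string &port_name) {
    return in.at(port_name).rows_added;
}
```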

virtual std::pair<bool, uint64_t> get_estimated_output_num_rows()

Returns the estimated number of output rows. By default, the estimate is the same as the number of input rows (e.g. project, sort, …).
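The default described above, where the output row estimate equals the input row count, can be sketched as a virtual function that derived kernels override. The class names are hypothetical, and reading the bool in the returned pair as "an estimate is available" is an assumption of this sketch, not something this page states.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Hypothetical base: by default, output rows are estimated as equal to input rows.
class estimating_kernel {
public:
    explicit estimating_kernel(std::uint64_t input_rows) : input_rows_(input_rows) {}
    virtual ~estimating_kernel() = default;

    // first: whether an estimate is available (assumed meaning); second: estimated rows.
    virtual std::pair<bool, std::uint64_t> get_estimated_output_num_rows() {
        return {true, input_rows_};
    }

protected:
    std::uint64_t input_rows_;
};

// Hypothetical override for a kernel whose output size cannot be predicted up front.
class unpredictable_kernel : public estimating_kernel {
public:
    using estimating_kernel::estimating_kernel;
    std::pair<bool, std::uint64_t> get_estimated_output_num_rows() override {
        return {false, 0};
    }
};
```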

ral::execution::task_result process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs, std::shared_ptr<ral::cache::CacheMachine> output, cudaStream_t stream, const std::map<std::string, std::string> &args)

Invokes the do_process function.

inline virtual ral::execution::task_result do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable>>, std::shared_ptr<ral::cache::CacheMachine>, cudaStream_t, const std::map<std::string, std::string>&)

Implemented by all derived classes; this is the function that actually performs the transformations on the dataframes.

Parameters
  • inputs: The data being operated on.

  • output: The output cache to write the results to.

  • stream: The CUDA stream to use.

  • args: Any additional arguments the kernel may need to perform its execution that may not be available to the kernel at instantiation.
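process() and do_process() can be pictured as the template-method pattern: a public entry point forwards its arguments to a virtual function that each derived kernel implements. The sketch keeps that shape but replaces BlazingTable, CacheMachine, cudaStream_t and task_result with hypothetical placeholder types (table, cache_sink, stream_t, task_status) so that it compiles without CUDA or the engine headers. The add_offset_kernel and its "offset" argument are invented for illustration.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical placeholders for the engine types used in the real signatures.
struct table { std::vector<int> values; };
struct cache_sink { std::vector<std::unique_ptr<table>> tables; };
using stream_t = void *;  // stands in for cudaStream_t
enum class task_status { success, fail };

class base_kernel {
public:
    virtual ~base_kernel() = default;

    // Entry point: forwards everything to the derived implementation.
    task_status process(std::vector<std::unique_ptr<table>> inputs,
                        std::shared_ptr<cache_sink> output, stream_t stream,
                        const std::map<std::string, std::string> &args) {
        return do_process(std::move(inputs), std::move(output), stream, args);
    }

    // Declared pure virtual here for simplicity.
    virtual task_status do_process(std::vector<std::unique_ptr<table>> inputs,
                                   std::shared_ptr<cache_sink> output, stream_t stream,
                                   const std::map<std::string, std::string> &args) = 0;
};

// Hypothetical derived kernel: adds an offset read from args to every value.
class add_offset_kernel : public base_kernel {
public:
    task_status do_process(std::vector<std::unique_ptr<table>> inputs,
                           std::shared_ptr<cache_sink> output, stream_t /*stream*/,
                           const std::map<std::string, std::string> &args) override {
        auto it = args.find("offset");
        if (it == args.end()) return task_status::fail;  // required argument missing
        const int offset = std::stoi(it->second);
        for (auto &t : inputs) {
            for (int &v : t->values) v += offset;
            output->tables.push_back(std::move(t));  // hand the result to the output cache
        }
        return task_status::success;
    }
};
```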

std::size_t estimate_output_bytes(const std::vector<std::unique_ptr<ral::cache::CacheData>> &inputs)

Given the inputs, estimates the number of bytes that will be needed to hold the output after performing the transformation. For many kernels this is an exact value rather than an estimate; for operations whose output size is indeterminate, it is an estimate.

Return

The number of bytes expected to be needed to hold the output after performing this kernel's transformations on the given inputs.

Parameters
  • inputs: the data that would be transformed

std::size_t estimate_operating_bytes(const std::vector<std::unique_ptr<ral::cache::CacheData>> &inputs)

Given the inputs, estimates the number of bytes that will be needed to perform the transformation. This can be thought of as the memory overhead of the transformation itself. For many kernels this is an exact value rather than an estimate; for operations that make allocations whose size depends on the contents of the inputs, it is an estimate.

Return

The number of bytes expected to be needed to perform this kernel's transformations on the given inputs.

Parameters
  • inputs: the data that would be transformed
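The two estimates can be contrasted with a sketch. It uses a hypothetical cache_data record that exposes only a byte size, assumes a size-preserving kernel (so the output estimate is exact), and assumes, purely for illustration, that the transformation needs one scratch buffer as large as its inputs. Neither rule is taken from the real engine.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for CacheData: only its size in bytes matters here.
struct cache_data { std::size_t size_in_bytes; };

// Sums the byte sizes of all inputs.
std::size_t total_input_bytes(const std::vector<std::unique_ptr<cache_data>> &inputs) {
    std::size_t total = 0;
    for (const auto &in : inputs) total += in->size_in_bytes;
    return total;
}

// Size-preserving kernel (assumption): the output is exactly as large as the input.
std::size_t estimate_output_bytes(const std::vector<std::unique_ptr<cache_data>> &inputs) {
    return total_input_bytes(inputs);
}

// Assumed overhead: one scratch buffer as large as the inputs while the transformation runs.
std::size_t estimate_operating_bytes(const std::vector<std::unique_ptr<cache_data>> &inputs) {
    return total_input_bytes(inputs);
}
```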

inline virtual std::string kernel_name()

void notify_complete(size_t task_id)

Notifies the kernel that a task it dispatched completed successfully.

void notify_fail(size_t task_id)

Notifies the kernel that a task it dispatched failed.

void add_task(size_t task_id)

Adds a task to the list of tasks the kernel is waiting to complete.

inline bool finished_tasks()

Checks whether all the tasks have completed.
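The task bookkeeping functions, together with the protected tasks, kernel_mutex and kernel_cv members, suggest a pattern like the sketch below. It is a hypothetical task_tracker class, not the real implementation. In particular, the real behavior of notify_fail() is not documented here: this sketch only counts the failure and leaves the task pending (as if it would be retried), and wait_for_tasks() is an invented helper.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <set>

// Hypothetical task tracker modeled on tasks / kernel_mutex / kernel_cv.
class task_tracker {
public:
    void add_task(std::size_t task_id) {
        std::lock_guard<std::mutex> lock(kernel_mutex);
        tasks.insert(task_id);
    }

    void notify_complete(std::size_t task_id) {
        {
            std::lock_guard<std::mutex> lock(kernel_mutex);
            tasks.erase(task_id);
        }
        kernel_cv.notify_all();  // wake anyone blocked in wait_for_tasks()
    }

    // Sketch only: counts the failure and leaves the task pending (assumed retry).
    void notify_fail(std::size_t /*task_id*/) {
        std::lock_guard<std::mutex> lock(kernel_mutex);
        ++failed;
    }

    bool finished_tasks() {
        std::lock_guard<std::mutex> lock(kernel_mutex);
        return tasks.empty();
    }

    // Hypothetical helper: blocks until every dispatched task has completed.
    void wait_for_tasks() {
        std::unique_lock<std::mutex> lock(kernel_mutex);
        kernel_cv.wait(lock, [this] { return tasks.empty(); });
    }

    std::size_t failed = 0;

private:
    std::set<std::size_t> tasks;
    std::mutex kernel_mutex;
    std::condition_variable kernel_cv;
};
```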

Public Members

std::string expression

Stores the logical expression being processed.

port input_ = {this}

Represents the input cache machines and their names.

port output_ = {this}

Represents the output cache machines and their names.

const std::size_t kernel_id

Stores the current kernel identifier.

std::int32_t parent_id_

Stores the parent kernel identifier, if any.

bool execution_done = false

Indicates whether the execution is complete.

kernel_type kernel_type_id

Stores the id of the kernel type.

std::shared_ptr<graph> query_graph

Stores a pointer to the current execution graph.

std::shared_ptr<Context> context

Shared context of the running query.

bool has_limit_

Indicates whether the logical plan contains only a LogicalTableScan (or BindableTableScan) and a LogicalLimit.

int64_t limit_rows_

Specifies the maximum number of rows to return.
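One way a kernel might honor has_limit_ and limit_rows_ is to truncate its stream of batches once the row budget is used up. The sketch below is hypothetical: apply_limit is an invented free function, and batches are modeled as std::vector<int> rather than BlazingTable.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical sketch: truncate a stream of batches to at most limit_rows rows in total.
std::vector<std::vector<int>> apply_limit(const std::vector<std::vector<int>> &batches,
                                          bool has_limit, std::int64_t limit_rows) {
    if (!has_limit) return batches;  // no limit: pass everything through
    std::vector<std::vector<int>> out;
    std::int64_t remaining = limit_rows;
    for (const auto &batch : batches) {
        if (remaining <= 0) break;  // limit reached: stop reading further batches
        const auto take = std::min<std::int64_t>(remaining, static_cast<std::int64_t>(batch.size()));
        out.emplace_back(batch.begin(), batch.begin() + take);
        remaining -= take;
    }
    return out;
}
```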

std::shared_ptr<spdlog::logger> logger

Protected Attributes

std::set<size_t> tasks
std::mutex kernel_mutex
std::condition_variable kernel_cv
std::atomic<std::size_t> total_input_bytes_processed
std::atomic<std::size_t> total_input_rows_processed