Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions omniscidb/QueryEngine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ set(query_engine_source_files
JoinHashTable/HashTable.cpp
JoinHashTable/PerfectJoinHashTable.cpp
JoinHashTable/Runtime/HashJoinRuntime.cpp
JoinHashTable/Runtime/HashJoinRuntimeCpu.cpp
L0Kernel.cpp
LogicalIR.cpp
LLVMFunctionAttributesUtil.cpp
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ JoinColumn ColumnFetcher::makeJoinColumn(
data_provider,
column_cache);
if (col_buff != nullptr) {
join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count, num_elems};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we force all fields to be initialized via a constructor?

num_elems += elem_count;
join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
} else {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include "QueryEngine/JoinHashTable/PerfectHashTable.h"
#include "QueryEngine/JoinHashTable/Runtime/HashJoinRuntimeCpu.h"

#include "Shared/scope.h"

Expand Down Expand Up @@ -166,8 +167,6 @@ class PerfectJoinHashTableBuilder {
0);

auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
const int thread_count = cpu_threads();
std::vector<std::thread> init_cpu_buff_threads;

{
auto timer_init = DEBUG_TIMER("CPU One-To-One Perfect-Hash: init_hash_join_buff");
Expand All @@ -176,54 +175,36 @@ class PerfectJoinHashTableBuilder {
hash_join_invalid_val);
}
const bool for_semi_join = for_semi_anti_join(join_type);
std::atomic<int> err{0};
{
auto timer_fill =
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized");
for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
&join_column,
str_proxy_translation_map,
thread_idx,
thread_count,
type,
&err,
&col_range,
&is_bitwise_eq,
&for_semi_join,
cpu_hash_table_buff,
hash_entry_info] {
int partial_err = fill_hash_join_buff_bucketized(
cpu_hash_table_buff,
hash_join_invalid_val,
for_semi_join,
join_column,
{static_cast<size_t>(type->size()),
col_range.getIntMin(),
col_range.getIntMax(),
inline_fixed_encoding_null_value(type),
is_bitwise_eq,
col_range.getIntMax() + 1,
get_join_column_type_kind(type)},
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
: 0, // 0 is dummy value
thread_idx,
thread_count,
hash_entry_info.bucket_normalization);
int zero{0};
err.compare_exchange_strong(zero, partial_err);
});
}
for (auto& t : init_cpu_buff_threads) {
t.join();
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized_cpu");

{
JoinColumnTypeInfo type_info{static_cast<size_t>(type->size()),
col_range.getIntMin(),
col_range.getIntMax(),
inline_fixed_encoding_null_value(type),
is_bitwise_eq,
col_range.getIntMax() + 1,
get_join_column_type_kind(type)};

int error = fill_hash_join_buff_bucketized_cpu(
cpu_hash_table_buff,
hash_join_invalid_val,
for_semi_join,
join_column,
type_info,
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
: 0, // 0 is dummy value
hash_entry_info.bucket_normalization);
if (error) {
// Too many hash entries, need to retry with a 1:many table
hash_table_ = nullptr; // clear the hash table buffer
throw NeedsOneToManyHash();
}
}
}
if (err) {
// Too many hash entries, need to retry with a 1:many table
hash_table_ = nullptr; // clear the hash table buffer
throw NeedsOneToManyHash();
}
}

void initOneToManyHashTableOnCpu(
Expand Down
51 changes: 1 addition & 50 deletions omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#else
#include "Logger/Logger.h"

#include "HashJoinRuntimeCpu.h"
#include "QueryEngine/RuntimeFunctions.h"
#include "Shared/likely.h"
#include "StringDictionary/StringDictionary.h"
Expand All @@ -53,56 +54,6 @@
#ifndef __CUDACC__
namespace {

/**
 * Joins between two dictionary encoded string columns without a shared string dictionary
 * are computed by translating the inner dictionary to the outer dictionary while filling
 * the hash table. The translation works as follows:
 *
 * Given two tables t1 and t2, with t1 the outer table and t2 the inner table, and two
 * columns t1.x and t2.x, both dictionary encoded strings without a shared dictionary, we
 * read each value in t2.x and do a lookup in the dictionary for t1.x. If the lookup
 * returns a valid ID, we insert that ID into the hash table. Otherwise, we skip adding an
 * entry into the hash table for the inner column. We can also skip adding any entries
 * that are outside the range of the outer column.
 *
 * Consider a join of the form SELECT x, n FROM (SELECT x, COUNT(*) n FROM t1 GROUP BY x
 * HAVING n > 10), t2 WHERE t1.x = t2.x; Let the result of the subquery be t1_s.
 * Due to the HAVING clause, the range of all IDs in t1_s must be less than or equal to
 * the range of all IDs in t1. Suppose we have an element a in t2.x that is also in
 * t1_s.x. Then the ID of a must be within the range of t1_s. Therefore it is safe to
 * ignore any element ID that is not in the dictionary corresponding to t1_s.x or is
 * outside the range of column t1_s.
 *
 * @param elem            string ID read from the inner column
 * @param min_elem        smallest outer ID accepted (inclusive)
 * @param max_elem        largest outer ID accepted (inclusive)
 * @param sd_inner_proxy  type-erased pointer, cast to const StringDictionaryProxy*;
 *                        used to resolve elem to its string. Must not be null.
 * @param sd_outer_proxy  type-erased pointer, cast to const StringDictionaryProxy*;
 *                        used to look up the ID of that string. Must not be null.
 * @return the ID reported by the outer proxy when it lies within [min_elem, max_elem],
 *         otherwise StringDictionary::INVALID_STR_ID.
 */
inline int64_t translate_str_id_to_outer_dict(const int64_t elem,
                                              const int64_t min_elem,
                                              const int64_t max_elem,
                                              const void* sd_inner_proxy,
                                              const void* sd_outer_proxy) {
  // Both proxies are dereferenced below, so both must be validated up front; a null
  // inner proxy would otherwise be a silent null dereference in getString().
  CHECK(sd_inner_proxy);
  CHECK(sd_outer_proxy);
  const auto sd_inner_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_inner_proxy);
  const auto sd_outer_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_outer_proxy);
  const auto elem_str = sd_inner_dict_proxy->getString(elem);
  const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str);
  // NOTE(review): presumably a string missing from the outer dictionary yields an ID
  // that fails this range test — confirm against StringDictionaryProxy::getIdOfString.
  if (outer_id > max_elem || outer_id < min_elem) {
    return StringDictionary::INVALID_STR_ID;
  }
  return outer_id;
}

/**
 * Translates an inner-dictionary string ID to an outer-dictionary ID using a
 * precomputed translation table.
 *
 * The table is indexed by (inner_elem - min_inner_elem). No bounds checking is
 * performed on that index here.
 *
 * @param inner_elem                      string ID from the inner column
 * @param min_inner_elem                  value subtracted from inner_elem to form the
 *                                        table index
 * @param min_outer_elem                  smallest outer ID accepted (inclusive)
 * @param max_outer_elem                  largest outer ID accepted (inclusive)
 * @param inner_to_outer_translation_map  table of outer IDs
 * @return the table entry when it lies within [min_outer_elem, max_outer_elem],
 *         otherwise StringDictionary::INVALID_STR_ID.
 */
inline int64_t map_str_id_to_outer_dict(const int64_t inner_elem,
                                        const int64_t min_inner_elem,
                                        const int64_t min_outer_elem,
                                        const int64_t max_outer_elem,
                                        const int32_t* inner_to_outer_translation_map) {
  const int64_t table_offset = inner_elem - min_inner_elem;
  const auto translated_id = inner_to_outer_translation_map[table_offset];
  const bool within_outer_range =
      translated_id >= min_outer_elem && translated_id <= max_outer_elem;
  if (within_outer_range) {
    return translated_id;
  }
  // Entry falls outside the outer column's ID range, so it cannot match.
  return StringDictionary::INVALID_STR_ID;
}

#if defined(_MSC_VER)
#define DEFAULT_TARGET_ATTRIBUTE
#else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ struct JoinChunk {
const int8_t*
col_buff; // actually from AbstractBuffer::getMemoryPtr() via Chunk_NS::Chunk
size_t num_elems;
size_t row_id;
};

struct JoinColumn {
Expand Down
Loading