diff --git a/src/catalog/catalog.cpp b/src/catalog/catalog.cpp index 90b9d13a62a..ca9398574e3 100644 --- a/src/catalog/catalog.cpp +++ b/src/catalog/catalog.cpp @@ -30,6 +30,8 @@ #include "function/date_functions.h" #include "function/decimal_functions.h" #include "function/old_engine_string_functions.h" +#include "function/string_functions.h" +#include "function/sequence_functions.h" #include "function/timestamp_functions.h" #include "index/index_factory.h" #include "settings/settings_manager.h" @@ -1204,6 +1206,20 @@ void Catalog::InitializeFunctions() { function::BuiltInFuncType{OperatorId::Like, function::OldEngineStringFunctions::Like}, txn); + // Sequence + AddBuiltinFunction( + "nextval", {type::TypeId::VARCHAR}, type::TypeId::INTEGER, + internal_lang, "Nextval", + function::BuiltInFuncType{OperatorId::Nextval, + function::SequenceFunctions::_Nextval}, + txn); + AddBuiltinFunction( + "currval", {type::TypeId::VARCHAR}, type::TypeId::INTEGER, + internal_lang, "Currval", + function::BuiltInFuncType{OperatorId::Currval, + function::SequenceFunctions::_Currval}, + txn); + /** * decimal functions diff --git a/src/catalog/catalog_cache.cpp b/src/catalog/catalog_cache.cpp index 54f0e1a3d13..411cb5706b3 100644 --- a/src/catalog/catalog_cache.cpp +++ b/src/catalog/catalog_cache.cpp @@ -13,8 +13,10 @@ #include #include "catalog/catalog_cache.h" +#include #include "catalog/database_catalog.h" +#include "catalog/sequence_catalog.h" #include "common/logger.h" namespace peloton { @@ -157,5 +159,76 @@ std::shared_ptr CatalogCache::GetCachedIndexObject( return nullptr; } +/*@brief insert sequence catalog object into cache + * @param sequence_object + * @return false only if sequence already exists in cache + */ +bool CatalogCache::InsertSequenceObject( + std::shared_ptr sequence_object) { + if (!sequence_object || sequence_object->seq_oid == INVALID_OID) { + return false; // invalid object + } + + std::size_t hash_key = GetHashKey(sequence_object->seq_name, + sequence_object->db_oid); + + // check if already in cache + if (sequence_objects_cache.find(hash_key) != + sequence_objects_cache.end()) { + LOG_DEBUG("Sequence %s already exists in cache!", + sequence_object->seq_name.c_str()); + return false; + } + + sequence_objects_cache.insert( + std::make_pair(hash_key, sequence_object)); + return true; +} + +/*@brief evict sequence catalog object from cache + * @param sequence_name, database_oid + * @return true if specified sequence is found and evicted; + * false if not found + */ +bool CatalogCache::EvictSequenceObject(const std::string & sequence_name, + oid_t database_oid) { + std::size_t hash_key = GetHashKey(sequence_name, database_oid); + + auto it = sequence_objects_cache.find(hash_key); + if (it == sequence_objects_cache.end()) { + return false; // sequence not found in cache + } + + auto sequence_object = it->second; + PELOTON_ASSERT(sequence_object); + sequence_objects_cache.erase(it); + return true; +} + +/*@brief get sequence catalog object from cache + * @param sequence_name, database_oid + * @return sequence catalog object; if not found return object with invalid oid + */ +std::shared_ptr CatalogCache::GetSequenceObject( + const std::string & sequence_name, oid_t database_oid) { + std::size_t hash_key = GetHashKey(sequence_name, database_oid); + auto it = sequence_objects_cache.find(hash_key); + if (it == sequence_objects_cache.end()) { + return nullptr; + } + return it->second; +} + +/*@brief get the hash key given the sequence information + * @param sequence_name, database_oid + * @return hash key + */ +std::size_t CatalogCache::GetHashKey(const std::string sequence_name, + oid_t database_oid) { + std::tuple key(sequence_name, database_oid); + boost::hash> key_hash; + return key_hash(key); +} + } // namespace catalog } // namespace peloton diff --git a/src/catalog/sequence_catalog.cpp b/src/catalog/sequence_catalog.cpp new file mode 100644 index 00000000000..19f7a409ae3 --- /dev/null +++ b/src/catalog/sequence_catalog.cpp @@ -0,0 +1,299 @@ +//===----------------------------------------------------------------------===// +// +// Peloton +// +// sequence_catalog.h +// +// Identification: src/catalog/sequence_catalog.cpp +// +// Copyright (c) 2015-17, Carnegie Mellon University Database Group +// +//===----------------------------------------------------------------------===// + +#include + +#include "catalog/sequence_catalog.h" + +#include "catalog/catalog.h" +#include "catalog/database_catalog.h" +#include "catalog/table_catalog.h" +#include "common/internal_types.h" +#include "storage/data_table.h" +#include "type/value_factory.h" +#include "function/functions.h" +#include "planner/update_plan.h" +#include "executor/update_executor.h" +#include "executor/executor_context.h" +#include "optimizer/optimizer.h" +#include "parser/postgresparser.h" + +namespace peloton { +namespace catalog { + +/* @brief Get the nextval of the sequence + * @return the next value of the sequence. + * @exception throws SequenceException if the sequence exceeds the upper/lower + * limit. + */ +int64_t SequenceCatalogObject::GetNextVal() { + int64_t result = seq_curr_val; + seq_prev_val = result; + if (seq_increment > 0) { + if ((seq_max >= 0 && seq_curr_val > seq_max - seq_increment) || + (seq_max < 0 && seq_curr_val + seq_increment > seq_max)) { + if (!seq_cycle) { + throw SequenceException( + StringUtil::Format( + "nextval: reached maximum value of sequence %s (%ld)", seq_name.c_str(), seq_max)); + } + seq_curr_val = seq_min; + } else + seq_curr_val += seq_increment; + } else { + if ((seq_min < 0 && seq_curr_val < seq_min - seq_increment) || + (seq_min >= 0 && seq_curr_val + seq_increment < seq_min)) { + if (!seq_cycle) { + throw SequenceException( + StringUtil::Format( + "nextval: reached minimum value of sequence %s (%ld)", seq_name.c_str(), seq_min)); + } + seq_curr_val = seq_max; + } else + seq_curr_val += seq_increment; + } + + bool status = Catalog::GetInstance() + ->GetSystemCatalogs(db_oid) + ->GetSequenceCatalog() + ->UpdateNextVal(seq_oid, seq_curr_val, txn_); + LOG_DEBUG("status of update pg_sequence: %d", status); + return result; +} + +SequenceCatalog::SequenceCatalog(const std::string &database_name, + concurrency::TransactionContext *txn) + : AbstractCatalog("CREATE TABLE " + database_name + + "." CATALOG_SCHEMA_NAME "." SEQUENCE_CATALOG_NAME + " (" + "oid INT NOT NULL PRIMARY KEY, " + "sqdboid INT NOT NULL, " + "sqname VARCHAR NOT NULL, " + "sqinc BIGINT NOT NULL, " + "sqmax BIGINT NOT NULL, " + "sqmin BIGINT NOT NULL, " + "sqstart BIGINT NOT NULL, " + "sqcycle BOOLEAN NOT NULL, " + "sqval BIGINT NOT NULL);", + txn) { + Catalog::GetInstance()->CreateIndex( + database_name, CATALOG_SCHEMA_NAME, SEQUENCE_CATALOG_NAME, + {ColumnId::DATABSE_OID, ColumnId::SEQUENCE_NAME}, + SEQUENCE_CATALOG_NAME "_skey0", false, IndexType::BWTREE, txn); +} + +SequenceCatalog::~SequenceCatalog() {} + +/* @brief Insert the sequence by name. + * @param database_oid the databse_oid associated with the sequence + * @param sequence_name the name of the sequence + * @param seq_increment the increment per step of the sequence + * @param seq_max the max value of the sequence + * @param seq_min the min value of the sequence + * @param seq_start the start of the sequence + * @param seq_cycle whether the sequence cycles + * @param pool an instance of abstract pool + * @param txn current transaction + * @return ResultType::SUCCESS if the sequence exists, ResultType::FAILURE + * otherwise. + * @exception throws SequenceException if the sequence already exists. + */ +bool SequenceCatalog::InsertSequence(oid_t database_oid, + std::string sequence_name, + int64_t seq_increment, int64_t seq_max, + int64_t seq_min, int64_t seq_start, + bool seq_cycle, type::AbstractPool *pool, + concurrency::TransactionContext *txn) { + LOG_DEBUG("Insert Sequence Database Oid: %u", database_oid); + LOG_DEBUG("Insert Sequence Sequence Name: %s", sequence_name.c_str()); + + ValidateSequenceArguments(seq_increment, seq_max, seq_min, seq_start); + if (GetSequence(database_oid, sequence_name, txn) != nullptr) { + throw SequenceException( + StringUtil::Format("Sequence %s already exists!", + sequence_name.c_str())); + } + + std::unique_ptr tuple( + new storage::Tuple(catalog_table_->GetSchema(), true)); + + auto val0 = type::ValueFactory::GetIntegerValue(GetNextOid()); + auto val1 = type::ValueFactory::GetIntegerValue(database_oid); + auto val2 = type::ValueFactory::GetVarcharValue(sequence_name); + auto val3 = type::ValueFactory::GetBigIntValue(seq_increment); + auto val4 = type::ValueFactory::GetBigIntValue(seq_max); + auto val5 = type::ValueFactory::GetBigIntValue(seq_min); + auto val6 = type::ValueFactory::GetBigIntValue(seq_start); + auto val7 = type::ValueFactory::GetBooleanValue(seq_cycle); + // When insert value, seqval = seq_start + auto val8 = type::ValueFactory::GetBigIntValue(seq_start); + + tuple->SetValue(ColumnId::SEQUENCE_OID, val0, pool); + tuple->SetValue(ColumnId::DATABSE_OID, val1, pool); + tuple->SetValue(ColumnId::SEQUENCE_NAME, val2, pool); + tuple->SetValue(ColumnId::SEQUENCE_INC, val3, pool); + tuple->SetValue(ColumnId::SEQUENCE_MAX, val4, pool); + tuple->SetValue(ColumnId::SEQUENCE_MIN, val5, pool); + tuple->SetValue(ColumnId::SEQUENCE_START, val6, pool); + tuple->SetValue(ColumnId::SEQUENCE_CYCLE, val7, pool); + tuple->SetValue(ColumnId::SEQUENCE_VALUE, val8, pool); + + // Insert the tuple + return InsertTuple(std::move(tuple), txn); +} + +/* @brief Delete the sequence by name. + * @param database_oid the databse_oid associated with the sequence + * @param sequence_name the name of the sequence + * @param txn current transaction + * @return ResultType::SUCCESS if the sequence exists, throw exception + * otherwise. + */ +ResultType SequenceCatalog::DropSequence(const std::string &database_name, + const std::string &sequence_name, + concurrency::TransactionContext *txn) { + if (txn == nullptr) { + throw CatalogException("Transaction is invalid!"); + } + + auto database_object = + Catalog::GetInstance()->GetDatabaseObject(database_name, txn); + + oid_t sequence_oid = Catalog::GetInstance() + ->GetSystemCatalogs(database_object->GetDatabaseOid()) + ->GetSequenceCatalog() + ->GetSequenceOid(sequence_name, database_object->GetDatabaseOid(), txn); + if (sequence_oid == INVALID_OID) { + throw SequenceException( + StringUtil::Format("Sequence %s does not exist!", + sequence_name.c_str())); + } + + LOG_INFO("sequence %d will be deleted!", sequence_oid); + + oid_t database_oid = database_object->GetDatabaseOid(); + DeleteSequenceByName(sequence_name, database_oid, txn); + + return ResultType::SUCCESS; +} + +/* @brief Delete the sequence by name. The sequence is guaranteed to exist. + * @param database_oid the databse_oid associated with the sequence + * @param sequence_name the name of the sequence + * @param txn current transaction + * @return The result of DeleteWithIndexScan. + */ +bool SequenceCatalog::DeleteSequenceByName( + const std::string &sequence_name, oid_t database_oid, + concurrency::TransactionContext *txn) { + oid_t index_offset = IndexId::DBOID_SEQNAME_KEY; + std::vector values; + values.push_back(type::ValueFactory::GetIntegerValue(database_oid).Copy()); + values.push_back(type::ValueFactory::GetVarcharValue(sequence_name).Copy()); + + return DeleteWithIndexScan(index_offset, values, txn); +} + +/* @brief get sequence from pg_sequence table + * @param database_oid the databse_oid associated with the sequence + * @param sequence_name the name of the sequence + * @param txn current transaction + * @return a SequenceCatalogObject if the sequence is found, nullptr otherwise + */ +std::shared_ptr SequenceCatalog::GetSequence( + oid_t database_oid, const std::string &sequence_name, + concurrency::TransactionContext *txn) { + std::vector column_ids( + {ColumnId::SEQUENCE_OID, ColumnId::SEQUENCE_START, + ColumnId::SEQUENCE_INC, ColumnId::SEQUENCE_MAX, + ColumnId::SEQUENCE_MIN, ColumnId::SEQUENCE_CYCLE, + ColumnId::SEQUENCE_VALUE}); + oid_t index_offset = IndexId::DBOID_SEQNAME_KEY; + std::vector values; + values.push_back(type::ValueFactory::GetIntegerValue(database_oid).Copy()); + values.push_back(type::ValueFactory::GetVarcharValue(sequence_name).Copy()); + + // the result is a vector of executor::LogicalTile + auto result_tiles = + GetResultWithIndexScan(column_ids, index_offset, values, txn); + // careful! the result tile could be null! + if (result_tiles == nullptr || result_tiles->size() == 0) { + LOG_INFO("no sequence on database %d and %s", database_oid, + sequence_name.c_str()); + return std::shared_ptr(nullptr); + } else { + LOG_INFO("size of the result tiles = %lu", result_tiles->size()); + } + + PELOTON_ASSERT(result_tiles->size() == 1); + size_t tuple_count = (*result_tiles)[0]->GetTupleCount(); + PELOTON_ASSERT(tuple_count == 1); + (void) tuple_count; + auto new_sequence = std::make_shared( + (*result_tiles)[0]->GetValue(0, 0).GetAs(), + database_oid, + sequence_name, + (*result_tiles)[0]->GetValue(0, 1).GetAs(), + (*result_tiles)[0]->GetValue(0, 2).GetAs(), + (*result_tiles)[0]->GetValue(0, 3).GetAs(), + (*result_tiles)[0]->GetValue(0, 4).GetAs(), + (*result_tiles)[0]->GetValue(0, 5).GetAs(), + (*result_tiles)[0]->GetValue(0, 6).GetAs(), txn); + + return new_sequence; +} + +bool SequenceCatalog::UpdateNextVal(oid_t sequence_oid, int64_t nextval, + concurrency::TransactionContext *txn){ + std::vector update_columns({SequenceCatalog::ColumnId::SEQUENCE_VALUE}); + std::vector update_values; + update_values.push_back(type::ValueFactory::GetBigIntValue(nextval).Copy()); + std::vector scan_values; + scan_values.push_back(type::ValueFactory::GetIntegerValue(sequence_oid).Copy()); + oid_t index_offset = SequenceCatalog::IndexId::PRIMARY_KEY; + + return UpdateWithIndexScan(update_columns, update_values, scan_values, index_offset, txn); +} + +/* @brief get sequence oid from pg_sequence table given sequence_name and + * database_oid + * @param database_oid the databse_oid associated with the sequence + * @param sequence_name the name of the sequence + * @param txn current transaction + * @return the oid_t of the sequence if the sequence is found, INVALID_OID + * otherwise + */ +oid_t SequenceCatalog::GetSequenceOid(std::string sequence_name, + oid_t database_oid, + concurrency::TransactionContext *txn) { + std::vector column_ids({ColumnId::SEQUENCE_OID}); + oid_t index_offset = IndexId::DBOID_SEQNAME_KEY; + std::vector values; + values.push_back(type::ValueFactory::GetIntegerValue(database_oid).Copy()); + values.push_back(type::ValueFactory::GetVarcharValue(sequence_name).Copy()); + + // the result is a vector of executor::LogicalTile + auto result_tiles = + GetResultWithIndexScan(column_ids, index_offset, values, txn); + // carefull! the result tile could be null! + if (result_tiles == nullptr || result_tiles->size() == 0) { + LOG_INFO("no sequence on database %d and %s", database_oid, + sequence_name.c_str()); + return INVALID_OID; + } + + PELOTON_ASSERT(result_tiles->size() == 1); + return (*result_tiles)[0]->GetValue(0, 0).GetAs(); +} + +} // namespace catalog +} // namespace peloton diff --git a/src/catalog/system_catalogs.cpp b/src/catalog/system_catalogs.cpp index b1371ddd379..dedea74271b 100644 --- a/src/catalog/system_catalogs.cpp +++ b/src/catalog/system_catalogs.cpp @@ -30,6 +30,7 @@ SystemCatalogs::SystemCatalogs(storage::Database *database, type::AbstractPool *pool, concurrency::TransactionContext *txn) : pg_trigger_(nullptr), + pg_sequence_(nullptr), pg_table_metrics_(nullptr), pg_index_metrics_(nullptr), pg_query_metrics_(nullptr) { @@ -69,6 +70,7 @@ SystemCatalogs::~SystemCatalogs() { delete pg_attribute_; delete pg_namespace_; if (pg_trigger_) delete pg_trigger_; + if (pg_sequence_) delete pg_sequence_; // if (pg_proc) delete pg_proc; if (pg_table_metrics_) delete pg_table_metrics_; if (pg_index_metrics_) delete pg_index_metrics_; @@ -87,6 +89,10 @@ void SystemCatalogs::Bootstrap(const std::string &database_name, pg_trigger_ = new TriggerCatalog(database_name, txn); } + if (!pg_sequence_) { + pg_sequence_ = new SequenceCatalog(database_name, txn); + } + // if (!pg_proc) { // pg_proc = new ProcCatalog(database_name, txn); // } diff --git a/src/codegen/proxy/sequence_functions_proxy.cpp b/src/codegen/proxy/sequence_functions_proxy.cpp new file mode 100644 index 00000000000..79694a33773 --- /dev/null +++ b/src/codegen/proxy/sequence_functions_proxy.cpp @@ -0,0 +1,24 @@ +//===----------------------------------------------------------------------===// +// +// Peloton +// +// sequence_functions_proxy.cpp +// +// Identification: src/codegen/proxy/sequence_functions_proxy.cpp +// +// Copyright (c) 2015-2017, Carnegie Mellon University Database Group +// +//===----------------------------------------------------------------------===// + +#include "codegen/proxy/sequence_functions_proxy.h" + +#include "codegen/proxy/executor_context_proxy.h" + +namespace peloton { +namespace codegen { + +DEFINE_METHOD(peloton::function, SequenceFunctions, Nextval); +DEFINE_METHOD(peloton::function, SequenceFunctions, Currval); + +} // namespace codegen +} // namespace peloton diff --git a/src/codegen/type/bigint_type.cpp b/src/codegen/type/bigint_type.cpp index e20e3e0396f..8c2613d08ba 100644 --- a/src/codegen/type/bigint_type.cpp +++ b/src/codegen/type/bigint_type.cpp @@ -14,10 +14,12 @@ #include "codegen/lang/if.h" #include "codegen/value.h" +#include "codegen/proxy/sequence_functions_proxy.h" #include "codegen/proxy/values_runtime_proxy.h" #include "codegen/type/boolean_type.h" #include "codegen/type/decimal_type.h" #include "codegen/type/integer_type.h" +#include "codegen/type/varchar_type.h" #include "common/exception.h" #include "type/limits.h" #include "util/string_util.h" @@ -503,6 +505,47 @@ struct Modulo : public TypeSystem::BinaryOperatorHandleNull { } }; +// Nextval +struct Nextval : public TypeSystem::UnaryOperatorHandleNull { + bool SupportsType(const Type &type) const override { + return type.GetSqlType() == Varchar::Instance(); + } + + Type ResultType(UNUSED_ATTRIBUTE const Type &val_type) const override { + return BigInt::Instance(); + } + + Value Impl(CodeGen &codegen, const Value &val, + const TypeSystem::InvocationContext &ctx) const override { + llvm::Value *executor_ctx = ctx.executor_context; + llvm::Value *raw_ret = + codegen.Call(SequenceFunctionsProxy::Nextval, + {executor_ctx, val.GetValue()}); + return Value{BigInt::Instance(), raw_ret}; + } +}; + +// Currval +struct Currval : public TypeSystem::UnaryOperatorHandleNull { + bool SupportsType(const Type &type) const override { + return type.GetSqlType() == Varchar::Instance(); + } + + Type ResultType(UNUSED_ATTRIBUTE const Type &val_type) const override { + return BigInt::Instance(); + } + + Value Impl(CodeGen &codegen, const Value &val, + const TypeSystem::InvocationContext &ctx) const override { + llvm::Value *executor_ctx = ctx.executor_context; + llvm::Value *raw_ret = + codegen.Call(SequenceFunctionsProxy::Currval, + {executor_ctx, val.GetValue()}); + return Value{BigInt::Instance(), raw_ret}; + } +}; + + //////////////////////////////////////////////////////////////////////////////// /// /// Function tables @@ -538,12 +581,16 @@ Abs kAbsOp; Ceil kCeilOp; Floor kFloorOp; Sqrt kSqrt; +Nextval kNextval; +Currval kCurrval; std::vector kUnaryOperatorTable = { {OperatorId::Negation, kNegOp}, {OperatorId::Abs, kAbsOp}, {OperatorId::Ceil, kCeilOp}, {OperatorId::Floor, kFloorOp}, - {OperatorId::Sqrt, kSqrt}}; + {OperatorId::Sqrt, kSqrt}, + {OperatorId::Nextval, kNextval}, + {OperatorId::Currval, kCurrval}}; // Binary operations Add kAddOp; diff --git a/src/common/internal_types.cpp b/src/common/internal_types.cpp index 6d95bf1a35c..1fb299783e8 100644 --- a/src/common/internal_types.cpp +++ b/src/common/internal_types.cpp @@ -329,6 +329,9 @@ std::string CreateTypeToString(CreateType type) { case CreateType::TRIGGER: { return "TRIGGER"; } + case CreateType::SEQUENCE: { + return "SEQUENCE"; + } case CreateType::SCHEMA: { return "SCHEMA"; } @@ -557,6 +560,8 @@ std::string QueryTypeToString(QueryType query_type) { return "CREATE TRIGGER"; case QueryType::QUERY_CREATE_SCHEMA: return "CREATE SCHEMA"; + case QueryType::QUERY_CREATE_SEQUENCE: + return "CREATE SEQUENCE"; case QueryType::QUERY_CREATE_VIEW: return "CREATE VIEW"; case QueryType::QUERY_DROP: @@ -618,6 +623,7 @@ QueryType StringToQueryType(const std::string &str) { {"CREATE TRIGGER", QueryType::QUERY_CREATE_TRIGGER}, {"CREATE SCHEMA", QueryType::QUERY_CREATE_SCHEMA}, {"CREATE VIEW", QueryType::QUERY_CREATE_VIEW}, + {"CREATE SEQUENCE", QueryType::QUERY_CREATE_SEQUENCE}, {"OTHER", QueryType::QUERY_OTHER}, }; std::unordered_map::iterator it = @@ -688,6 +694,9 @@ QueryType StatementTypeToQueryType(StatementType stmt_type, case parser::CreateStatement::CreateType::kView: query_type = QueryType::QUERY_CREATE_VIEW; break; + case parser::CreateStatement::CreateType::kSequence: + query_type = QueryType::QUERY_CREATE_SEQUENCE; + break; } break; } diff --git a/src/executor/create_executor.cpp b/src/executor/create_executor.cpp index 83e85c92c48..5ea732fc587 100644 --- a/src/executor/create_executor.cpp +++ b/src/executor/create_executor.cpp @@ -15,6 +15,7 @@ #include "catalog/catalog.h" #include "catalog/foreign_key.h" #include "catalog/system_catalogs.h" +#include "catalog/sequence_catalog.h" #include "concurrency/transaction_context.h" #include "executor/executor_context.h" #include "planner/create_plan.h" @@ -74,6 +75,12 @@ bool CreateExecutor::DExecute() { break; } + // if query was for creating sequence + case CreateType::SEQUENCE: { + result = CreateSequence(node); + break; + } + default: { std::string create_type = CreateTypeToString(node.GetCreateType()); LOG_ERROR("Not supported create type %s", create_type.c_str()); @@ -281,5 +288,37 @@ bool CreateExecutor::CreateTrigger(const planner::CreatePlan &node) { return (true); } +bool CreateExecutor::CreateSequence(const planner::CreatePlan &node) { + auto txn = context_->GetTransaction(); + std::string database_name = node.GetDatabaseName(); + std::string sequence_name = node.GetSequenceName(); + + auto database_object = catalog::Catalog::GetInstance()->GetDatabaseObject( + database_name, txn); + + catalog::Catalog::GetInstance() + ->GetSystemCatalogs(database_object->GetDatabaseOid()) + ->GetSequenceCatalog() + ->InsertSequence( + database_object->GetDatabaseOid(), sequence_name, + node.GetSequenceIncrement(), node.GetSequenceMaxValue(), + node.GetSequenceMinValue(), node.GetSequenceStart(), + node.GetSequenceCycle(), pool_.get(), txn); + + if (txn->GetResult() == ResultType::SUCCESS) { + LOG_DEBUG("Creating sequence succeeded!"); + } else if (txn->GetResult() == ResultType::FAILURE) { + LOG_DEBUG("Creating sequence failed!"); + } else { + LOG_DEBUG("Result is: %s", + ResultTypeToString(txn->GetResult()).c_str()); + } + + // Notice this action will always return true, since any exception + // will be handled in CreateSequence function in SequencCatalog. + return (true); +} + + } // namespace executor } // namespace peloton diff --git a/src/executor/drop_executor.cpp b/src/executor/drop_executor.cpp index b9413366c2e..0135a286480 100644 --- a/src/executor/drop_executor.cpp +++ b/src/executor/drop_executor.cpp @@ -65,6 +65,10 @@ bool DropExecutor::DExecute() { result = DropIndex(node, current_txn); break; } + case DropType::SEQUENCE:{ + result = DropSequence(node, current_txn); + break; + } default: { throw NotImplementedException( StringUtil::Format("Drop type %d not supported yet.\n", dropType)); @@ -215,6 +219,35 @@ bool DropExecutor::DropTrigger(const planner::DropPlan &node, return false; } +bool DropExecutor::DropSequence(const planner::DropPlan &node, + concurrency::TransactionContext *txn) { + std::string database_name = node.GetDatabaseName(); + std::string sequence_name = node.GetSequenceName(); + auto database_object = catalog::Catalog::GetInstance()->GetDatabaseObject( + database_name, txn); + + // drop sequence + ResultType result = + catalog::Catalog::GetInstance() + ->GetSystemCatalogs(database_object->GetDatabaseOid()) + ->GetSequenceCatalog() + ->DropSequence(database_name,sequence_name, txn); + txn->SetResult(result); + if (txn->GetResult() == ResultType::SUCCESS) { + LOG_DEBUG("Dropping sequence succeeded!"); + } else if (txn->GetResult() == ResultType::FAILURE && node.IsMissing()) { + txn->SetResult(ResultType::SUCCESS); + LOG_TRACE("Dropping Sequence Succeeded!"); + } else if (txn->GetResult() == ResultType::FAILURE && !node.IsMissing()) { + LOG_TRACE("Dropping Sequence Failed!"); + } else { + LOG_TRACE("Result is: %s", ResultTypeToString(txn->GetResult()).c_str()); + } + return false; +} + + + bool DropExecutor::DropIndex(const planner::DropPlan &node, concurrency::TransactionContext *txn) { std::string index_name = node.GetIndexName(); diff --git a/src/executor/executor_context.cpp b/src/executor/executor_context.cpp index ae9281c13fe..29fb3794538 100644 --- a/src/executor/executor_context.cpp +++ b/src/executor/executor_context.cpp @@ -16,12 +16,16 @@ #include "executor/executor_context.h" #include "concurrency/transaction_context.h" + namespace peloton { namespace executor { ExecutorContext::ExecutorContext(concurrency::TransactionContext *transaction, - codegen::QueryParameters parameters) - : transaction_(transaction), parameters_(std::move(parameters)) {} + codegen::QueryParameters parameters, + const std::string default_database_name) + : transaction_(transaction), parameters_(std::move(parameters)), + default_database_name_(default_database_name) { +} concurrency::TransactionContext *ExecutorContext::GetTransaction() const { return transaction_; @@ -43,5 +47,9 @@ type::EphemeralPool *ExecutorContext::GetPool() { return pool_.get(); } +std::string ExecutorContext::GetDatabaseName() const { + return default_database_name_; +} + } // namespace executor } // namespace peloton diff --git a/src/executor/plan_executor.cpp b/src/executor/plan_executor.cpp index 104aff1351c..0fb90e193db 100644 --- a/src/executor/plan_executor.cpp +++ b/src/executor/plan_executor.cpp @@ -36,8 +36,8 @@ static void CompileAndExecutePlan( std::shared_ptr plan, concurrency::TransactionContext *txn, const std::vector ¶ms, - std::function &&)> - on_complete) { + std::function &&)> on_complete, + std::string default_database_name) { LOG_TRACE("Compiling and executing query ..."); // Perform binding @@ -51,7 +51,8 @@ static void CompileAndExecutePlan( std::unique_ptr executor_context( new executor::ExecutorContext(txn, - codegen::QueryParameters(*plan, params))); + codegen::QueryParameters(*plan, params), + default_database_name)); // Compile the query codegen::Query *query = codegen::QueryCache::Instance().Find(plan); @@ -87,12 +88,13 @@ static void InterpretPlan( const std::vector ¶ms, const std::vector &result_format, std::function &&)> - on_complete) { + on_complete, + std::string default_database_name) { executor::ExecutionResult result; std::vector values; std::unique_ptr executor_context( - new executor::ExecutorContext(txn, params)); + new executor::ExecutorContext(txn, params, default_database_name)); bool status; std::unique_ptr executor_tree( @@ -142,8 +144,8 @@ void PlanExecutor::ExecutePlan( concurrency::TransactionContext *txn, const std::vector ¶ms, const std::vector &result_format, - std::function &&)> - on_complete) { + std::function &&)> on_complete, + std::string default_database_name) { PELOTON_ASSERT(plan != nullptr && txn != nullptr); LOG_TRACE("PlanExecutor Start (Txn ID=%" PRId64 ")", txn->GetTransactionId()); @@ -152,9 +154,9 @@ void PlanExecutor::ExecutePlan( try { if (codegen_enabled && codegen::QueryCompiler::IsSupported(*plan)) { - CompileAndExecutePlan(plan, txn, params, on_complete); + CompileAndExecutePlan(plan, txn, params, on_complete, default_database_name); } else { - InterpretPlan(plan, txn, params, result_format, on_complete); + InterpretPlan(plan, txn, params, result_format, on_complete, default_database_name); } } catch (Exception &e) { ExecutionResult result; diff --git a/src/expression/function_expression.cpp b/src/expression/function_expression.cpp index 9f74f30e475..eef472e025f 100644 --- a/src/expression/function_expression.cpp +++ b/src/expression/function_expression.cpp @@ -42,13 +42,17 @@ expression::FunctionExpression::FunctionExpression( type::Value FunctionExpression::Evaluate( const AbstractTuple *tuple1, const AbstractTuple *tuple2, - UNUSED_ATTRIBUTE executor::ExecutorContext *context) const { + executor::ExecutorContext *context) const { std::vector child_values; PELOTON_ASSERT(func_.impl != nullptr); for (auto &child : children_) { child_values.push_back(child->Evaluate(tuple1, tuple2, context)); } + if (func_name_ == "nextval" || func_name_ == "currval") { + uint64_t ctx = (uint64_t)context; + child_values.push_back(type::ValueFactory::GetBigIntValue(ctx)); + } type::Value ret = func_.impl(child_values); diff --git a/src/function/old_engine_string_functions.cpp b/src/function/old_engine_string_functions.cpp index 8add85a1fe1..06fe8a7205d 100644 --- a/src/function/old_engine_string_functions.cpp +++ b/src/function/old_engine_string_functions.cpp @@ -18,7 +18,9 @@ #include "executor/executor_context.h" #include "function/string_functions.h" -#include "type/value_factory.h" +#include "catalog/catalog.h" +#include "catalog/database_catalog.h" +#include "concurrency/transaction_context.h" namespace peloton { namespace function { @@ -234,6 +236,5 @@ type::Value OldEngineStringFunctions::Lower( UNUSED_ATTRIBUTE const std::vector &args) { throw Exception{"Lower not implemented in old engine"}; } - } // namespace function } // namespace peloton diff --git a/src/function/sequence_functions.cpp b/src/function/sequence_functions.cpp new file mode 100644 index 00000000000..1927377b814 --- /dev/null +++ b/src/function/sequence_functions.cpp @@ -0,0 +1,125 @@ +//===----------------------------------------------------------------------===// +// +// Peloton +// +// sequence_functions.cpp +// +// Identification: src/function/sequence_functions.cpp +// +// Copyright (c) 2015-2018, Carnegie Mellon University Database Group +// +//===----------------------------------------------------------------------===// + +#include "function/sequence_functions.h" + +#include "common/macros.h" +#include "executor/executor_context.h" +#include "catalog/catalog.h" +#include "catalog/database_catalog.h" +#include "catalog/sequence_catalog.h" +#include "concurrency/transaction_context.h" +#include "concurrency/transaction_manager_factory.h" +#include "type/value_factory.h" + +namespace peloton { +namespace function { + +/*@brief The actual implementation to get the incremented value for the specified sequence + * @param sequence name, executor context + * @return the next value for the sequence + * @exception the sequence does not exist + */ +uint32_t SequenceFunctions::Nextval(executor::ExecutorContext &ctx, + const char *sequence_name) { + PELOTON_ASSERT(sequence_name != nullptr); + concurrency::TransactionContext* txn = ctx.GetTransaction(); + // get the database oid for this transaction + oid_t database_oid = catalog::Catalog::GetInstance() + ->GetDatabaseObject(ctx.GetDatabaseName(), txn)->GetDatabaseOid(); + + // initialize a new transaction for incrementing sequence value + auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance(); + auto mini_txn = txn_manager.BeginTransaction(); + + // evict the old cached copy of sequence + txn->catalog_cache.EvictSequenceObject(sequence_name,database_oid); + auto sequence_object = + catalog::Catalog::GetInstance() + ->GetSystemCatalogs(database_oid) + ->GetSequenceCatalog() + ->GetSequence(database_oid, sequence_name, mini_txn); + if (sequence_object != nullptr) { + uint32_t val = sequence_object->GetNextVal(); + // insert the new copy of sequence into cache for future currval + bool insert = txn->catalog_cache.InsertSequenceObject(sequence_object); + PELOTON_ASSERT(insert); + auto ret = txn_manager.CommitTransaction(mini_txn); + while (ret != ResultType::SUCCESS) { + ret = txn_manager.CommitTransaction(mini_txn); + } + return val; + } else { + throw SequenceException( + StringUtil::Format("Sequence \"%s\" does not exist", sequence_name)); + } +} + +/*@brief The actual implementation to get the current value for the specified sequence + * @param sequence name, executor context + * @return the current value of a sequence + * @exception either the sequence does not exist, or 'call nextval before currval' + */ +uint32_t SequenceFunctions::Currval(executor::ExecutorContext &ctx, + const char *sequence_name) { + PELOTON_ASSERT(sequence_name != nullptr); + concurrency::TransactionContext* txn = ctx.GetTransaction(); + // get the database oid for this transaction + oid_t database_oid = catalog::Catalog::GetInstance() + ->GetDatabaseObject(ctx.GetDatabaseName(), txn)->GetDatabaseOid(); + // get the sequence copy from cache + auto sequence_object = txn-> + catalog_cache.GetSequenceObject(sequence_name, database_oid); + if (sequence_object != nullptr) { + return sequence_object->GetCurrVal(); + } else { + // get sequence from catalog + sequence_object = + catalog::Catalog::GetInstance() + ->GetSystemCatalogs(database_oid) + ->GetSequenceCatalog() + ->GetSequence(database_oid, sequence_name, txn); + if (sequence_object != nullptr) { + // nextval not called brefore + throw SequenceException( + StringUtil::Format("Nextval never called for sequence \"%s\"", + sequence_name)); + } else { + // sequence does not exist + throw SequenceException( + StringUtil::Format("Sequence \"%s\" does not exist", sequence_name)); + } + } +} + +/*@brief The wrapper function to get the incremented value for the specified sequence + * @param sequence name, executor context + * @return the result of executing NextVal + */ +type::Value SequenceFunctions::_Nextval(const std::vector &args) { + executor::ExecutorContext* ctx=(executor::ExecutorContext*)args[1].GetAs(); + uint32_t ret = SequenceFunctions::Nextval(*ctx, args[0].GetAs()); + return type::ValueFactory::GetIntegerValue(ret); +} + +/*@brief The wrapper function to get the current value for the specified sequence + * @param sequence name, executor context + * @return the result of executing CurrVal + */ +type::Value SequenceFunctions::_Currval(const std::vector &args) { + executor::ExecutorContext* ctx=(executor::ExecutorContext*)args[1].GetAs(); + uint32_t ret = SequenceFunctions::Currval(*ctx, args[0].GetAs()); + return type::ValueFactory::GetIntegerValue(ret); +} + +} // namespace function +} // namespace peloton diff --git a/src/include/catalog/catalog_cache.h b/src/include/catalog/catalog_cache.h index 5bde19760ad..b00ec7403c8 100644 --- a/src/include/catalog/catalog_cache.h +++ b/src/include/catalog/catalog_cache.h @@ -23,9 +23,14 @@ namespace planner { class PlanUtil; } // namespace planner +namespace function { +class SequenceFunctions; +} // namespace function + namespace catalog { class DatabaseCatalogObject; +class SequenceCatalogObject; class TableCatalogObject; class IndexCatalogObject; @@ -38,6 +43,8 @@ class CatalogCache { friend class TableCatalogObject; friend class IndexCatalogObject; friend class planner::PlanUtil; + friend class SequenceCatalogObject; + friend class function::SequenceFunctions; public: CatalogCache() {} @@ -60,11 +67,25 @@ class CatalogCache { bool EvictDatabaseObject(oid_t database_oid); bool EvictDatabaseObject(const std::string &database_name); + // sequence catalog cache interface + bool InsertSequenceObject( + std::shared_ptr sequence_object); + bool EvictSequenceObject(const std::string &sequence_name, + oid_t database_oid); + std::shared_ptr GetSequenceObject( + const std::string &sequence_name, oid_t database_oid); + std::size_t GetHashKey(std::string sequence_name, oid_t database_oid); + // cache for database catalog object std::unordered_map> database_objects_cache; std::unordered_map> database_name_cache; + + // cache for sequence catalog object + std::unordered_map> + sequence_objects_cache; + }; } // namespace catalog diff --git a/src/include/catalog/catalog_defaults.h b/src/include/catalog/catalog_defaults.h index 2cfcacbda70..17960b14391 100644 --- a/src/include/catalog/catalog_defaults.h +++ b/src/include/catalog/catalog_defaults.h @@ -32,6 +32,7 @@ namespace catalog { #define TABLE_CATALOG_NAME "pg_table" #define INDEX_CATALOG_NAME "pg_index" #define COLUMN_CATALOG_NAME "pg_attribute" +#define SEQUENCE_CATALOG_NAME "pg_sequence" // Local oids from START_OID = 0 to START_OID + OID_OFFSET are reserved #define OID_OFFSET 100 @@ -45,6 +46,7 @@ namespace catalog { #define TRIGGER_OID_MASK (static_cast(catalog::CatalogType::TRIGGER)) #define LANGUAGE_OID_MASK (static_cast(catalog::CatalogType::LANGUAGE)) #define PROC_OID_MASK (static_cast(catalog::CatalogType::PROC)) +#define SEQUENCE_OID_MASK (static_cast(catalog::CatalogType::SEQUENCE)) // Reserved peloton database oid #define CATALOG_DATABASE_OID (0 | DATABASE_OID_MASK) @@ -99,6 +101,7 @@ enum class CatalogType : uint32_t { TRIGGER = 6 << CATALOG_TYPE_OFFSET, LANGUAGE = 7 << CATALOG_TYPE_OFFSET, PROC = 8 << CATALOG_TYPE_OFFSET, + SEQUENCE = 9 << CATALOG_TYPE_OFFSET, // To be added }; diff --git a/src/include/catalog/sequence_catalog.h b/src/include/catalog/sequence_catalog.h new file mode 100644 index 00000000000..a833cb7df2e --- /dev/null +++ b/src/include/catalog/sequence_catalog.h @@ -0,0 +1,178 @@ +//===----------------------------------------------------------------------===// +// +// Peloton +// +// sequence_catalog.h +// +// Identification: src/include/catalog/sequence_catalog.h +// +// Copyright (c) 2015-17, Carnegie Mellon University Database Group +// +//===----------------------------------------------------------------------===// + +//===----------------------------------------------------------------------===// +// pg_trigger +// +// Schema: (column offset: column_name) +// 0: oid (pkey) +// 1: sqdboid : database_oid +// 2: sqname : sequence_name +// 3: sqinc : seq_increment +// 4: sqmax : seq_max +// 5: sqmin : seq_min +// 6: sqstart : seq_start +// 7: sqcycle : seq_cycle +// 7: sqval : seq_value +// +// Indexes: (index offset: indexed columns) +// 0: oid (primary key) +// 1: (sqdboid, sqname) (secondary key 0) +//===----------------------------------------------------------------------===// + +#pragma once + +#include +#include +#include +#include + +#include "catalog/abstract_catalog.h" +#include "catalog/catalog_defaults.h" +#include "catalog/system_catalogs.h" + +namespace peloton { + +namespace concurrency { +class TransactionContext; +} + +namespace catalog { + +class SequenceCatalogObject { + public: + SequenceCatalogObject(oid_t seqoid, oid_t dboid, const std::string &name, + const int64_t seqstart, const int64_t seqincrement, + const int64_t seqmax, const int64_t seqmin, + const bool seqcycle, const int64_t seqval, + concurrency::TransactionContext *txn) + : seq_oid(seqoid), + db_oid(dboid), + seq_name(name), + seq_start(seqstart), + seq_increment(seqincrement), + seq_max(seqmax), + seq_min(seqmin), + seq_cycle(seqcycle), + txn_(txn), + seq_curr_val(seqval){}; + + oid_t seq_oid; + oid_t db_oid; + std::string seq_name; + int64_t seq_start; // Start value of the sequence + int64_t seq_increment; // Increment value of the sequence + int64_t seq_max; // Maximum value of the sequence + int64_t seq_min; // Minimum value of the sequence + int64_t seq_cache; // Cache size of the sequence + bool seq_cycle; // Whether the sequence cycles + concurrency::TransactionContext *txn_; + + int64_t seq_prev_val; + + int64_t GetNextVal(); + + int64_t GetCurrVal() { + return seq_prev_val; + }; + + void SetCurrVal(int64_t curr_val) { + seq_curr_val = curr_val; + }; // only visible for test! + void SetCycle(bool cycle) { seq_cycle = cycle; }; + + private: + int64_t seq_curr_val; +}; + +class SequenceCatalog : public AbstractCatalog { + public: + SequenceCatalog(const std::string &database_name, + concurrency::TransactionContext *txn); + ~SequenceCatalog(); + + //===--------------------------------------------------------------------===// + // write Related API + //===--------------------------------------------------------------------===// + bool InsertSequence(oid_t database_oid, std::string sequence_name, + int64_t seq_increment, int64_t seq_max, int64_t seq_min, + int64_t seq_start, bool seq_cycle, + type::AbstractPool *pool, + concurrency::TransactionContext *txn); + + ResultType DropSequence(const std::string &database_name, + const std::string &sequence_name, + concurrency::TransactionContext *txn); + + bool DeleteSequenceByName(const std::string &sequence_name, + oid_t database_oid, + concurrency::TransactionContext *txn); + + std::shared_ptr GetSequence( + oid_t database_oid, const std::string &sequence_name, + concurrency::TransactionContext *txn); + + oid_t GetSequenceOid(std::string sequence_name, oid_t database_oid, + concurrency::TransactionContext *txn); + + bool UpdateNextVal(oid_t sequence_oid, int64_t nextval, + concurrency::TransactionContext *txn); + + enum ColumnId { + SEQUENCE_OID = 0, + DATABSE_OID = 1, + SEQUENCE_NAME = 2, + SEQUENCE_INC = 3, + SEQUENCE_MAX = 4, + SEQUENCE_MIN = 5, + SEQUENCE_START = 6, + SEQUENCE_CYCLE = 7, + SEQUENCE_VALUE = 8 + }; + + enum IndexId { + PRIMARY_KEY = 0, + DBOID_SEQNAME_KEY = 1 + }; + + private: + oid_t GetNextOid() { return oid_++ | SEQUENCE_OID_MASK; } + + void ValidateSequenceArguments(int64_t seq_increment, int64_t seq_max, + int64_t seq_min, int64_t seq_start) { + if (seq_min > seq_max) { + throw SequenceException( + StringUtil::Format( + "MINVALUE (%d) must be less than MAXVALUE (%d)", seq_min, seq_max)); + } + + if (seq_increment == 0) { + throw SequenceException( + StringUtil::Format("INCREMENT must not be zero")); + } + + if (seq_increment > 0 && seq_start < seq_min) { + throw SequenceException( + StringUtil::Format( + "START value (%d) cannot be less than MINVALUE (%d)", seq_start, seq_min)); + } + + if (seq_increment < 0 && seq_start > seq_max) { + throw SequenceException( + StringUtil::Format( + "START value (%d) cannot be greater than MAXVALUE (%d)", seq_start, seq_max)); + } + }; +}; + +} // namespace catalog +} // namespace peloton diff --git a/src/include/catalog/system_catalogs.h b/src/include/catalog/system_catalogs.h index 7791c019097..d8d8900480e 100644 --- a/src/include/catalog/system_catalogs.h +++ b/src/include/catalog/system_catalogs.h @@ -21,6 +21,7 @@ #include "catalog/table_catalog.h" #include "catalog/table_metrics_catalog.h" #include "catalog/trigger_catalog.h" +#include "catalog/sequence_catalog.h" namespace peloton { @@ -34,6 +35,7 @@ class SchemaCatalog; class TableCatalog; class IndexCatalog; class ColumnCatalog; +class SequenceCatalog; class SystemCatalogs { public: @@ -86,6 +88,13 @@ class SystemCatalogs { return pg_trigger_; } + SequenceCatalog *GetSequenceCatalog() { + if (!pg_sequence_) { + throw CatalogException("Sequence catalog catalog has not been initialized"); + } + return pg_sequence_; + } + TableMetricsCatalog *GetTableMetricsCatalog() { if (!pg_table_metrics_) { throw CatalogException("Table metrics catalog has not been initialized"); @@ -114,6 +123,8 @@ class SystemCatalogs { IndexCatalog *pg_index_; TriggerCatalog *pg_trigger_; + SequenceCatalog *pg_sequence_; + // ProcCatalog *pg_proc; TableMetricsCatalog *pg_table_metrics_; IndexMetricsCatalog *pg_index_metrics_; diff --git a/src/include/codegen/proxy/sequence_functions_proxy.h b/src/include/codegen/proxy/sequence_functions_proxy.h new file mode 100644 index 00000000000..222b6cf4fe2 --- /dev/null +++ b/src/include/codegen/proxy/sequence_functions_proxy.h @@ -0,0 +1,28 @@ +//===----------------------------------------------------------------------===// +// +// Peloton +// +// string_functions_proxy.h +// +// Identification: src/include/codegen/proxy/string_functions_proxy.h +// +// Copyright (c) 2015-2018, Carnegie Mellon University Database Group +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "codegen/proxy/proxy.h" +#include "codegen/proxy/type_builder.h" +#include "function/sequence_functions.h" + +namespace peloton { +namespace codegen { + +PROXY(SequenceFunctions) { + DECLARE_METHOD(Nextval); + DECLARE_METHOD(Currval); +}; + +} // namespace codegen +} // namespace peloton diff --git a/src/include/codegen/type/varchar_type.h b/src/include/codegen/type/varchar_type.h index 796d493772a..ffaf76b8c43 100644 --- a/src/include/codegen/type/varchar_type.h +++ b/src/include/codegen/type/varchar_type.h @@ -50,4 +50,4 @@ class Varchar : public SqlType, public Singleton { } // namespace type } // namespace codegen -} // namespace peloton \ No newline at end of file +} // namespace peloton diff --git a/src/include/common/exception.h b/src/include/common/exception.h index 4c201891751..d8e82565da1 100644 --- a/src/include/common/exception.h +++ b/src/include/common/exception.h @@ -59,7 +59,8 @@ enum class ExceptionType { SETTINGS = 23, // settings related BINDER = 24, // binder related NETWORK = 25, // network related - OPTIMIZER = 26 // optimizer related + OPTIMIZER = 26, // optimizer related + SEQUENCE = 27 // sequence related }; class Exception : public std::runtime_error { @@ -76,9 +77,7 @@ class Exception : public std::runtime_error { "\nMessage :: " + message; } - std::string GetMessage() { - return exception_message_; - } + std::string GetMessage() { return exception_message_; } std::string ExceptionTypeToString(ExceptionType type) { switch (type) { @@ -132,6 +131,8 @@ class Exception : public std::runtime_error { return "Settings"; case ExceptionType::OPTIMIZER: return "Optimizer"; + case ExceptionType::SEQUENCE: + return "Sequence"; default: return "Unknown"; } @@ -467,4 +468,12 @@ class OptimizerException : public Exception { : Exception(ExceptionType::OPTIMIZER, msg) {} }; +class SequenceException : public Exception { + SequenceException() = delete; + + public: + SequenceException(std::string msg) + : Exception(ExceptionType::SEQUENCE, msg) {} +}; + } // namespace peloton diff --git a/src/include/common/internal_types.h b/src/include/common/internal_types.h index 17020512944..3648cc2001b 100644 --- a/src/include/common/internal_types.h +++ b/src/include/common/internal_types.h @@ -616,6 +616,7 @@ enum class CreateType { CONSTRAINT = 4, // constraint create type TRIGGER = 5, // trigger create type SCHEMA = 6, // schema create type + SEQUENCE = 7 // sequencce create type }; std::string CreateTypeToString(CreateType type); CreateType StringToCreateType(const std::string &str); @@ -632,7 +633,8 @@ enum class DropType { INDEX = 3, // index drop type CONSTRAINT = 4, // constraint drop type TRIGGER = 5, // trigger drop type - SCHEMA = 6, // trigger drop type + SCHEMA = 6, // schema drop type + SEQUENCE = 7, // sequence drop type }; std::string DropTypeToString(DropType type); DropType StringToDropType(const std::string &str); @@ -712,7 +714,9 @@ enum class QueryType { QUERY_CREATE_TRIGGER = 21, QUERY_CREATE_SCHEMA = 22, QUERY_CREATE_VIEW = 23, - QUERY_EXPLAIN = 24 + QUERY_EXPLAIN = 24, + QUERY_CREATE_SEQUENCE = 25, + QUERY_DROP_SEQUENCE = 26 }; std::string QueryTypeToString(QueryType query_type); QueryType StringToQueryType(std::string str); @@ -1094,6 +1098,8 @@ enum class OperatorId : uint32_t { DateTrunc, Like, Now, + Nextval, + Currval, // Add more operators here, before the last "Invalid" entry Invalid diff --git a/src/include/executor/create_executor.h b/src/include/executor/create_executor.h index 19b29dd24aa..902e44bd6b4 100644 --- a/src/include/executor/create_executor.h +++ b/src/include/executor/create_executor.h @@ -54,6 +54,8 @@ class CreateExecutor : public AbstractExecutor { bool CreateTrigger(const planner::CreatePlan &node); + bool CreateSequence(const planner::CreatePlan &node); + private: ExecutorContext *context_; diff --git a/src/include/executor/drop_executor.h b/src/include/executor/drop_executor.h index 4454ebe2b5d..8e792083518 100644 --- a/src/include/executor/drop_executor.h +++ b/src/include/executor/drop_executor.h @@ -59,6 +59,9 @@ class DropExecutor : public AbstractExecutor { bool DropIndex(const planner::DropPlan &node, concurrency::TransactionContext *txn); + + bool DropSequence(const planner::DropPlan &node, + concurrency::TransactionContext *txn); private: ExecutorContext *context_; diff --git a/src/include/executor/executor_context.h b/src/include/executor/executor_context.h index 79cfe5cd19b..0e02a3e6d2a 100644 --- a/src/include/executor/executor_context.h +++ b/src/include/executor/executor_context.h @@ -30,7 +30,8 @@ namespace executor { class ExecutorContext { public: explicit ExecutorContext(concurrency::TransactionContext *transaction, - codegen::QueryParameters parameters = {}); + codegen::QueryParameters parameters = {}, + std::string default_database_name = ""); DISALLOW_COPY_AND_MOVE(ExecutorContext); @@ -44,6 +45,8 @@ class ExecutorContext { type::EphemeralPool *GetPool(); + std::string GetDatabaseName() const; + // Number of processed tuples during execution uint32_t num_processed = 0; @@ -54,6 +57,8 @@ class ExecutorContext { codegen::QueryParameters parameters_; // Temporary memory pool for allocations done during execution std::unique_ptr pool_; + // Default database name + std::string default_database_name_; }; } // namespace executor diff --git a/src/include/executor/plan_executor.h b/src/include/executor/plan_executor.h index 49d32b98b71..99d8d51eedd 100644 --- a/src/include/executor/plan_executor.h +++ b/src/include/executor/plan_executor.h @@ -61,7 +61,8 @@ class PlanExecutor { const std::vector ¶ms, const std::vector &result_format, std::function &&)> on_complete); + std::vector &&)> on_complete, + std::string default_database_name=""); /* * @brief When a peloton node recvs a query plan, this function is invoked diff --git a/src/include/function/old_engine_string_functions.h b/src/include/function/old_engine_string_functions.h index 7603ac14fd0..d80f2e6c150 100644 --- a/src/include/function/old_engine_string_functions.h +++ b/src/include/function/old_engine_string_functions.h @@ -20,6 +20,7 @@ namespace peloton { namespace function { class OldEngineStringFunctions { + public: // ASCII code of the first character of the argument. static type::Value Ascii(const std::vector &args); diff --git a/src/include/function/sequence_functions.h b/src/include/function/sequence_functions.h new file mode 100644 index 00000000000..a91faf5dffd --- /dev/null +++ b/src/include/function/sequence_functions.h @@ -0,0 +1,41 @@ +//===----------------------------------------------------------------------===// +// +// Peloton +// +// sequence_functions.h +// +// Identification: src/include/function/sequence_functions.h +// +// Copyright (c) 2015-2018, Carnegie Mellon University Database Group +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include +#include "type/value.h" + +namespace peloton { + +namespace executor { +class ExecutorContext; +} // namespace executor + +namespace function { + +class SequenceFunctions { + public: + + // Nextval will return the next value of the given sequence + static uint32_t Nextval(executor::ExecutorContext &ctx, const char *sequence_name); + + // Currval will return the current value of the given sequence + static uint32_t Currval(executor::ExecutorContext &ctx, const char *sequence_name); + + // Wrapper function used for AddBuiltin Functions + static type::Value _Nextval(const std::vector &args); + static type::Value _Currval(const std::vector &args); +}; + +} // namespace function +} // namespace peloton diff --git a/src/include/parser/create_statement.h b/src/include/parser/create_statement.h index f83b24222ef..a3b27a9dd30 100644 --- a/src/include/parser/create_statement.h +++ b/src/include/parser/create_statement.h @@ -13,6 +13,8 @@ #pragma once #include +#include + #include "common/internal_types.h" #include "common/sql_node_visitor.h" #include "expression/abstract_expression.h" @@ -215,7 +217,15 @@ struct ColumnDefinition { */ class CreateStatement : public TableRefStatement { public: - enum CreateType { kTable, kDatabase, kIndex, kTrigger, kSchema, kView }; + enum CreateType { + kTable, + kDatabase, + kIndex, + kTrigger, + kSchema, + kView, + kSequence + }; CreateStatement(CreateType type) : TableRefStatement(StatementType::CREATE), @@ -252,6 +262,14 @@ class CreateStatement : public TableRefStatement { std::unique_ptr trigger_when; int16_t trigger_type; // information about row, timing, events, access by // pg_trigger + + // attributes related to sequences + std::string sequence_name; + int64_t seq_start = 1; + int64_t seq_increment = 1; + int64_t seq_max_value = LONG_MAX; + int64_t seq_min_value = 1; + bool seq_cycle = false; }; } // namespace parser diff --git a/src/include/parser/drop_statement.h b/src/include/parser/drop_statement.h index 9612ed5fe7e..4bb693af3b0 100644 --- a/src/include/parser/drop_statement.h +++ b/src/include/parser/drop_statement.h @@ -31,7 +31,8 @@ class DropStatement : public TableRefStatement { kIndex, kView, kPreparedStatement, - kTrigger + kTrigger, + kSequence }; DropStatement(EntityType type) @@ -81,6 +82,13 @@ class DropStatement : public TableRefStatement { std::string GetTriggerTableName() { return GetTableName(); } + std::string &GetSequenceName() { return sequence_name_; } + + void SetSequenceName(std::string &sequence_name) { + sequence_name_ = sequence_name; + } + void SetSequenceName(char *sequence_name) { sequence_name_ = sequence_name; } + virtual ~DropStatement() {} virtual void Accept(SqlNodeVisitor *v) override { v->Visit(this); } @@ -105,6 +113,9 @@ class DropStatement : public TableRefStatement { // drop trigger std::string trigger_name_; + + // drop sequence + std::string sequence_name_; }; } // namespace parser diff --git a/src/include/parser/parsenodes.h b/src/include/parser/parsenodes.h index bf818ff6b86..a43c937076c 100644 --- a/src/include/parser/parsenodes.h +++ b/src/include/parser/parsenodes.h @@ -730,6 +730,16 @@ typedef struct CreateSchemaStmt bool if_not_exists; /* just do nothing if schema already exists? */ } CreateSchemaStmt; +typedef struct CreateSeqStmt +{ + NodeTag type; + RangeVar *sequence; /* the sequence to create */ + List *options; + Oid ownerId; /* ID of owner, or InvalidOid for default */ + bool for_identity; + bool if_not_exists; /* just do nothing if it already exists? */ +} CreateSeqStmt; + typedef enum RoleSpecType { ROLESPEC_CSTRING, /* role name is stored as a C string */ diff --git a/src/include/parser/postgresparser.h b/src/include/parser/postgresparser.h index decd43d9ee7..56497e10225 100644 --- a/src/include/parser/postgresparser.h +++ b/src/include/parser/postgresparser.h @@ -118,8 +118,8 @@ class PostgresParser { static parser::TableRef *FromTransform(SelectStmt *root); // transform helper for select targets - static std::vector> - *TargetTransform(List *root); + static std::vector> * + TargetTransform(List *root); // transform helper for all expr nodes static expression::AbstractExpression *ExprTransform(Node *root); @@ -167,7 +167,8 @@ class PostgresParser { static parser::OrderDescription *OrderByTransform(List *order); // transform helper for table column definitions - static void ColumnDefTransform(ColumnDef* root, parser::CreateStatement* stmt); + static void ColumnDefTransform(ColumnDef *root, + parser::CreateStatement *stmt); // transform helper for create statements static parser::SQLStatement *CreateTransform(CreateStmt *root); @@ -195,7 +196,8 @@ class PostgresParser { * @param Postgres CreateDatabaseStmt parsenode * @return a peloton CreateStatement node */ - static parser::SQLStatement *CreateDatabaseTransform(CreateDatabaseStmt *root); + static parser::SQLStatement *CreateDatabaseTransform( + CreateDatabaseStmt *root); // transform helper for create schema statements static parser::SQLStatement *CreateSchemaTransform(CreateSchemaStmt *root); @@ -203,13 +205,15 @@ class PostgresParser { // transform helper for create view statements static parser::SQLStatement *CreateViewTransform(ViewStmt *root); + static parser::SQLStatement *CreateSequenceTransform(CreateSeqStmt *root); + // transform helper for column name (for insert statement) static std::vector *ColumnNameTransform(List *root); // transform helper for ListsTransform (insert multiple rows) static std::vector< - std::vector>> - *ValueListsTransform(List *root); + std::vector>> * + ValueListsTransform(List *root); // transform helper for insert statements static parser::SQLStatement *InsertTransform(InsertStmt *root); @@ -233,8 +237,8 @@ class PostgresParser { static parser::UpdateStatement *UpdateTransform(UpdateStmt *update_stmt); // transform helper for update statement - static std::vector> - *UpdateTargetTransform(List *root); + static std::vector> * + UpdateTargetTransform(List *root); // transform helper for drop statement static parser::DropStatement *DropTransform(DropStmt *root); @@ -253,6 +257,9 @@ class PostgresParser { // transform helper for drop trigger statement static parser::DropStatement *DropTriggerTransform(DropStmt *root); + // transform helper for drop sequence statement + static parser::DropStatement *DropSequenceTransform(DropStmt *root); + // transform helper for drop schema statement static parser::DropStatement *DropSchemaTransform(DropStmt *root); @@ -282,13 +289,20 @@ class PostgresParser { static parser::CopyStatement *CopyTransform(CopyStmt *root); // transform helper for analyze statement - static parser::AnalyzeStatement *VacuumTransform(VacuumStmt* root); + static parser::AnalyzeStatement *VacuumTransform(VacuumStmt *root); - static parser::VariableSetStatement *VariableSetTransform(VariableSetStmt* root); + static parser::VariableSetStatement *VariableSetTransform( + VariableSetStmt *root); // transform helper for subquery expressions static expression::AbstractExpression *SubqueryExprTransform(SubLink *node); + static void ParseSequenceParams(List *options, + parser::CreateStatement *result); + + static int64_t GetLongInDefElem(DefElem *defel) { + return (int64_t)((reinterpret_cast(defel->arg))->val.ival); + }; }; } // namespace parser diff --git a/src/include/planner/create_plan.h b/src/include/planner/create_plan.h index ecf6a0524fe..3b5cda6e3df 100644 --- a/src/include/planner/create_plan.h +++ b/src/include/planner/create_plan.h @@ -14,6 +14,7 @@ #include "parser/create_statement.h" #include "planner/abstract_plan.h" +#include "common/exception.h" namespace peloton { namespace catalog { @@ -114,6 +115,14 @@ class CreatePlan : public AbstractPlan { int16_t GetTriggerType() const { return trigger_type; } + std::string GetSequenceName() const { return sequence_name; } + int64_t GetSequenceStart() const { return seq_start; }; + int64_t GetSequenceIncrement() const { return seq_increment; } + int64_t GetSequenceMaxValue() const { return seq_max_value; } + int64_t GetSequenceMinValue() const { return seq_min_value; } + int64_t GetSequenceCacheSize() const { return seq_cache; } + bool GetSequenceCycle() const { return seq_cycle; } + protected: // This is a helper method for extracting foreign key information // and storing it in an internal struct. @@ -159,6 +168,15 @@ class CreatePlan : public AbstractPlan { int16_t trigger_type; // information about row, timing, events, access by // pg_trigger + // information for sequences; + std::string sequence_name; + int64_t seq_start; + int64_t seq_increment; + int64_t seq_max_value; + int64_t seq_min_value; + int64_t seq_cache; // sequence cache size, not supported yet + bool seq_cycle; + private: DISALLOW_COPY_AND_MOVE(CreatePlan); }; diff --git a/src/include/planner/drop_plan.h b/src/include/planner/drop_plan.h index c5593680202..9ef92f73c93 100644 --- a/src/include/planner/drop_plan.h +++ b/src/include/planner/drop_plan.h @@ -57,6 +57,8 @@ class DropPlan : public AbstractPlan { std::string GetSchemaName() const { return schema_name; } std::string GetTriggerName() const { return trigger_name; } + + std::string GetSequenceName() const { return sequence_name; } std::string GetIndexName() const { return index_name; } @@ -77,6 +79,10 @@ class DropPlan : public AbstractPlan { std::string schema_name; std::string trigger_name; + + // sequence name + std::string sequence_name; + std::string index_name; bool missing; diff --git a/src/network/postgres_protocol_handler.cpp b/src/network/postgres_protocol_handler.cpp index ffbb786b88e..4aabc487cb1 100644 --- a/src/network/postgres_protocol_handler.cpp +++ b/src/network/postgres_protocol_handler.cpp @@ -1222,6 +1222,7 @@ void PostgresProtocolHandler::CompleteCommand(const QueryType &query_type, case QueryType::QUERY_CREATE_DB: case QueryType::QUERY_CREATE_INDEX: case QueryType::QUERY_CREATE_TRIGGER: + case QueryType::QUERY_CREATE_SEQUENCE: case QueryType::QUERY_PREPARE: break; default: diff --git a/src/parser/create_statement.cpp b/src/parser/create_statement.cpp index 40be9a437fa..bfe014885b6 100644 --- a/src/parser/create_statement.cpp +++ b/src/parser/create_statement.cpp @@ -72,6 +72,12 @@ const std::string CreateStatement::GetInfo(int num_indent) const { << StringUtil::Format("View name: %s", view_name.c_str()); break; } + case CreateStatement::CreateType::kSequence: { + os << "Create type: Sequence" << std::endl; + os << StringUtil::Indent(num_indent + 1) + << StringUtil::Format("Sequence name: %s", sequence_name.c_str()); + break; + } } os << std::endl; diff --git a/src/parser/drop_statement.cpp b/src/parser/drop_statement.cpp index feb266f89f2..348ed48e8b5 100644 --- a/src/parser/drop_statement.cpp +++ b/src/parser/drop_statement.cpp @@ -66,6 +66,14 @@ const std::string DropStatement::GetInfo(int num_indent) const { << "Trigger name: " << trigger_name_; break; } + case kSequence: { + os << "DropType: Sequence\n"; + os << StringUtil::Indent(num_indent + 1) + << "Sequence database name: " << GetDatabaseName() << std::endl; + os << StringUtil::Indent(num_indent + 1) + << "Sequence name: " << sequence_name_; + break; + } } os << std::endl; os << StringUtil::Indent(num_indent + 1) diff --git a/src/parser/postgresparser.cpp b/src/parser/postgresparser.cpp index 797b77406b5..453e2bcb651 100644 --- a/src/parser/postgresparser.cpp +++ b/src/parser/postgresparser.cpp @@ -284,12 +284,10 @@ expression::AbstractExpression *PostgresParser::ColumnRefTransform( ->val.str)); } else { result = new expression::TupleValueExpression( - std::string( - (reinterpret_cast(fields->head->next->data.ptr_value)) - ->val.str), - std::string( - (reinterpret_cast(fields->head->data.ptr_value)) - ->val.str)); + std::string((reinterpret_cast( + fields->head->next->data.ptr_value))->val.str), + std::string((reinterpret_cast( + fields->head->data.ptr_value))->val.str)); } break; } @@ -488,9 +486,8 @@ expression::AbstractExpression *PostgresParser::TypeCastTransform( } TypeName *type_name = root->typeName; - char *name = - (reinterpret_cast(type_name->names->tail->data.ptr_value) - ->val.str); + char *name = (reinterpret_cast( + type_name->names->tail->data.ptr_value)->val.str); type::VarlenType temp(StringToTypeId("INVALID")); result = new expression::ConstantValueExpression( temp.CastAs(source_value, ColumnDefinition::StrToValueType(name))); @@ -556,8 +553,8 @@ expression::AbstractExpression *PostgresParser::FuncCallTransform( // This function takes in the whereClause part of a Postgres SelectStmt // parsenode and transfers it into the select_list of a Peloton SelectStatement. // It checks the type of each target and call the corresponding helpers. -std::vector> - *PostgresParser::TargetTransform(List *root) { +std::vector> * +PostgresParser::TargetTransform(List *root) { // Statement like 'SELECT;' cannot detect by postgres parser and would lead to // null list if (root == nullptr) { @@ -863,9 +860,8 @@ expression::AbstractExpression *PostgresParser::WhenTransform(Node *root) { void PostgresParser::ColumnDefTransform(ColumnDef *root, parser::CreateStatement *stmt) { TypeName *type_name = root->typeName; - char *name = - (reinterpret_cast(type_name->names->tail->data.ptr_value) - ->val.str); + char *name = (reinterpret_cast( + type_name->names->tail->data.ptr_value)->val.str); parser::ColumnDefinition *result = nullptr; parser::ColumnDefinition::DataType data_type = @@ -1056,9 +1052,8 @@ parser::FuncParameter *PostgresParser::FunctionParameterTransform( FunctionParameter *root) { parser::FuncParameter::DataType data_type; TypeName *type_name = root->argType; - char *name = - (reinterpret_cast(type_name->names->tail->data.ptr_value) - ->val.str); + char *name = (reinterpret_cast( + type_name->names->tail->data.ptr_value)->val.str); parser::FuncParameter *result = nullptr; // Transform parameter type @@ -1352,12 +1347,83 @@ parser::SQLStatement *PostgresParser::CreateViewTransform(ViewStmt *root) { return result; } +parser::SQLStatement *PostgresParser::CreateSequenceTransform( + CreateSeqStmt *root) { + parser::CreateStatement *result = + new parser::CreateStatement(CreateStatement::kSequence); + result->sequence_name = std::string(root->sequence->relname); + ParseSequenceParams(root->options, result); + return result; +} + +void PostgresParser::ParseSequenceParams(List *options, + parser::CreateStatement *result) { + DefElem *start_value = NULL; + DefElem *increment_by = NULL; + DefElem *max_value = NULL; + DefElem *min_value = NULL; + DefElem *is_cycled = NULL; + if (!options) return; + + ListCell *option; + for (option = options->head; option != NULL; option = lnext(option)) { + DefElem *defel = (DefElem *)lfirst(option); + + if (strcmp(defel->defname, "increment") == 0) { + if (increment_by) + throw ParserException( + "Redundant definition of increment in defining sequence"); + increment_by = defel; + result->seq_increment = GetLongInDefElem(increment_by); + } else if (strcmp(defel->defname, "start") == 0) { + if (start_value) + throw ParserException( + "Redundant definition of start in defining sequence"); + start_value = defel; + result->seq_start = GetLongInDefElem(start_value); + } else if (strcmp(defel->defname, "maxvalue") == 0) { + if (max_value) + throw ParserException( + "Redundant definition of max in defining sequence"); + max_value = defel; + result->seq_max_value = GetLongInDefElem(max_value); + } else if (strcmp(defel->defname, "minvalue") == 0) { + if (min_value) + throw ParserException( + "Redundant definition of min in defining sequence"); + min_value = defel; + result->seq_min_value = GetLongInDefElem(min_value); + } else if (strcmp(defel->defname, "cycle") == 0) { + if (is_cycled) + throw ParserException( + "Redundant definition of cycle in defining sequence"); + is_cycled = defel; + result->seq_cycle = (bool)GetLongInDefElem(is_cycled); + } + else + throw ParserException( + StringUtil::Format("option \"%s\" not recognized\n", defel->defname)); + } + + // manually set the start value for a sequence + if (!start_value) { + if(result->seq_increment < 0 && max_value){ + result->seq_start = result->seq_max_value; + } + else if (result->seq_increment > 0 && min_value){ + result->seq_start = result->seq_min_value; + } + } +} + parser::DropStatement *PostgresParser::DropTransform(DropStmt *root) { switch (root->removeType) { case ObjectType::OBJECT_TABLE: return DropTableTransform(root); case ObjectType::OBJECT_TRIGGER: return DropTriggerTransform(root); + case ObjectType::OBJECT_SEQUENCE: + return DropSequenceTransform(root); case ObjectType::OBJECT_INDEX: return DropIndexTransform(root); case ObjectType::OBJECT_SCHEMA: @@ -1431,6 +1497,16 @@ parser::DropStatement *PostgresParser::DropTriggerTransform(DropStmt *root) { return result; } +parser::DropStatement *PostgresParser::DropSequenceTransform(DropStmt *root) { + auto result = new DropStatement(DropStatement::EntityType::kSequence); + auto cell = root->objects->head; + auto list = reinterpret_cast(cell->data.ptr_value); + // first, set sequence name + result->SetSequenceName( + reinterpret_cast(list->tail->data.ptr_value)->val.str); + return result; +} + parser::DropStatement *PostgresParser::DropSchemaTransform(DropStmt *root) { auto result = new DropStatement(DropStatement::EntityType::kSchema); result->SetCascade(root->behavior == DropBehavior::DROP_CASCADE); @@ -1559,8 +1635,8 @@ std::vector *PostgresParser::ColumnNameTransform(List *root) { // parsenode and transfers it into Peloton AbstractExpression. // This is a vector pointer of vector pointers because one InsertStmt can insert // multiple tuples. -std::vector>> - *PostgresParser::ValueListsTransform(List *root) { +std::vector>> * +PostgresParser::ValueListsTransform(List *root) { auto result = new std::vector< std::vector>>(); @@ -1671,8 +1747,8 @@ parser::SQLStatement *PostgresParser::InsertTransform(InsertStmt *root) { result = new parser::InsertStatement(InsertType::VALUES); PELOTON_ASSERT(select_stmt->valuesLists != NULL); - std::vector>> - *insert_values = nullptr; + std::vector>> * + insert_values = nullptr; try { insert_values = ValueListsTransform(select_stmt->valuesLists); } catch (Exception e) { @@ -1807,6 +1883,9 @@ parser::SQLStatement *PostgresParser::NodeTransform(Node *stmt) { result = CreateSchemaTransform(reinterpret_cast(stmt)); break; + case T_CreateSeqStmt: + result = CreateSequenceTransform(reinterpret_cast(stmt)); + break; case T_ViewStmt: result = CreateViewTransform(reinterpret_cast(stmt)); break; @@ -1887,8 +1966,8 @@ parser::SQLStatementList *PostgresParser::ListTransform(List *root) { return result; } -std::vector> - *PostgresParser::UpdateTargetTransform(List *root) { +std::vector> * +PostgresParser::UpdateTargetTransform(List *root) { auto result = new std::vector>(); for (auto cell = root->head; cell != NULL; cell = cell->next) { auto update_clause = new UpdateClause(); diff --git a/src/planner/create_plan.cpp b/src/planner/create_plan.cpp index 2a23a75abb4..83e6cc259ff 100644 --- a/src/planner/create_plan.cpp +++ b/src/planner/create_plan.cpp @@ -205,6 +205,18 @@ CreatePlan::CreatePlan(parser::CreateStatement *parse_tree) { break; } + case parser::CreateStatement::CreateType::kSequence: { + create_type = CreateType::SEQUENCE; + database_name = std::string(parse_tree->GetDatabaseName()); + + sequence_name = parse_tree->sequence_name; + seq_start = parse_tree->seq_start; + seq_increment = parse_tree->seq_increment; + seq_max_value = parse_tree->seq_max_value; + seq_min_value = parse_tree->seq_min_value; + seq_cycle = parse_tree->seq_cycle; + break; + } default: LOG_ERROR("UNKNOWN CREATE TYPE"); // TODO Should we handle this here? diff --git a/src/planner/drop_plan.cpp b/src/planner/drop_plan.cpp index 34240f85cad..21e92cfcc9a 100644 --- a/src/planner/drop_plan.cpp +++ b/src/planner/drop_plan.cpp @@ -64,6 +64,13 @@ DropPlan::DropPlan(parser::DropStatement *parse_tree) { drop_type = DropType::INDEX; break; } + case parser::DropStatement::EntityType::kSequence: { + database_name = parse_tree->GetDatabaseName(); + sequence_name = parse_tree->GetSequenceName(); + drop_type = DropType::SEQUENCE; + break; + } + default: { LOG_ERROR("Not supported Drop type"); } } } diff --git a/src/traffic_cop/traffic_cop.cpp b/src/traffic_cop/traffic_cop.cpp index a87d99c0ac5..6b6a03a43cc 100644 --- a/src/traffic_cop/traffic_cop.cpp +++ b/src/traffic_cop/traffic_cop.cpp @@ -191,9 +191,10 @@ executor::ExecutionResult TrafficCop::ExecuteHelper( }; auto &pool = threadpool::MonoQueuePool::GetInstance(); - pool.SubmitTask([plan, txn, ¶ms, &result_format, on_complete] { + std::string default_database_name = default_database_name_; + pool.SubmitTask([plan, txn, ¶ms, &result_format, on_complete, default_database_name] { executor::PlanExecutor::ExecutePlan(plan, txn, params, result_format, - on_complete); + on_complete, default_database_name); }); is_queuing_ = true; diff --git a/test/catalog/sequence_catalog_test.cpp b/test/catalog/sequence_catalog_test.cpp new file mode 100644 index 00000000000..3f9dda1611d --- /dev/null +++ b/test/catalog/sequence_catalog_test.cpp @@ -0,0 +1,210 @@ +//===----------------------------------------------------------------------===// +// +// Peloton +// +// sequence_test.cpp +// +// Identification: test/sequence/sequence_test.cpp +// +// Copyright (c) 2015-17, Carnegie Mellon University Database Group +// +//===----------------------------------------------------------------------===// + +#include "catalog/catalog.h" +#include "catalog/sequence_catalog.h" +#include "storage/abstract_table.h" +#include "common/harness.h" +#include "common/exception.h" +#include "executor/executors.h" +#include "parser/postgresparser.h" +#include "planner/create_plan.h" +#include "planner/insert_plan.h" +#include "concurrency/transaction_manager_factory.h" + +namespace peloton { +namespace test { + +class SequenceTests : public PelotonTest { + protected: + void CreateDatabaseHelper() { + auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance(); + auto txn = txn_manager.BeginTransaction(); + catalog::Catalog::GetInstance()->Bootstrap(); + catalog::Catalog::GetInstance()->CreateDatabase(DEFAULT_DB_NAME, txn); + txn_manager.CommitTransaction(txn); + } + + std::shared_ptr GetSequenceHelper( + std::string sequence_name, concurrency::TransactionContext *txn) { + // Check the effect of creation + oid_t database_oid = catalog::Catalog::GetInstance() + ->GetDatabaseWithName(DEFAULT_DB_NAME, txn) + ->GetOid(); + std::shared_ptr new_sequence = + catalog::SequenceCatalog::GetInstance().GetSequence(database_oid, + sequence_name, txn); + + return new_sequence; + } + + void CreateSequenceHelper(std::string query, + concurrency::TransactionContext *txn) { + auto parser = parser::PostgresParser::GetInstance(); + + std::unique_ptr stmt_list( + parser.BuildParseTree(query).release()); + EXPECT_TRUE(stmt_list->is_valid); + EXPECT_EQ(StatementType::CREATE, stmt_list->GetStatement(0)->GetType()); + auto create_sequence_stmt = + static_cast(stmt_list->GetStatement(0)); + + create_sequence_stmt->TryBindDatabaseName(DEFAULT_DB_NAME); + // Create plans + planner::CreatePlan plan(create_sequence_stmt); + + // plan type + EXPECT_EQ(CreateType::SEQUENCE, plan.GetCreateType()); + + // Execute the create sequence + std::unique_ptr context( + new executor::ExecutorContext(txn)); + executor::CreateExecutor createSequenceExecutor(&plan, context.get()); + createSequenceExecutor.Init(); + createSequenceExecutor.Execute(); + } +}; + +TEST_F(SequenceTests, BasicTest) { + CreateDatabaseHelper(); + auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance(); + auto txn = txn_manager.BeginTransaction(); + + // Create statement + std::string query = + "CREATE SEQUENCE seq " + "INCREMENT BY 2 " + "MINVALUE 10 MAXVALUE 50 " + "START 10 CYCLE;"; + std::string name = "seq"; + + CreateSequenceHelper(query, txn); + std::shared_ptr new_sequence = + GetSequenceHelper(name, txn); + + EXPECT_EQ(name, new_sequence->seq_name); + EXPECT_EQ(2, new_sequence->seq_increment); + EXPECT_EQ(10, new_sequence->seq_min); + EXPECT_EQ(50, new_sequence->seq_max); + EXPECT_EQ(10, new_sequence->seq_start); + EXPECT_EQ(true, new_sequence->seq_cycle); + EXPECT_EQ(10, new_sequence->GetCurrVal()); + + int64_t nextVal = new_sequence->GetNextVal(); + EXPECT_EQ(10, nextVal); + txn_manager.CommitTransaction(txn); +} + +TEST_F(SequenceTests, NoDuplicateTest) { + auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance(); + auto txn = txn_manager.BeginTransaction(); + + // Create statement + std::string query = + "CREATE SEQUENCE seq " + "INCREMENT BY 2 " + "MINVALUE 10 MAXVALUE 50 " + "START 10 CYCLE;"; + std::string name = "seq"; + + // Expect exception + try { + CreateSequenceHelper(query, txn); + EXPECT_EQ(0, 1); + } catch (const SequenceException &expected) { + ASSERT_STREQ("Insert Sequence with Duplicate Sequence Name: seq", + expected.what()); + } + txn_manager.CommitTransaction(txn); +} + +TEST_F(SequenceTests, NextValPosIncrementFunctionalityTest) { + auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance(); + auto txn = txn_manager.BeginTransaction(); + + std::string query = + "CREATE SEQUENCE seq1 " + "INCREMENT BY 1 " + "MINVALUE 10 MAXVALUE 50 " + "START 10 CYCLE;"; + std::string name = "seq1"; + + CreateSequenceHelper(query, txn); + std::shared_ptr new_sequence = + GetSequenceHelper(name, txn); + + int64_t nextVal = new_sequence->GetNextVal(); + EXPECT_EQ(10, nextVal); + nextVal = new_sequence->GetNextVal(); + EXPECT_EQ(11, nextVal); + + // test cycle + new_sequence->SetCurrVal(50); + nextVal = new_sequence->GetNextVal(); + nextVal = new_sequence->GetNextVal(); + EXPECT_EQ(10, nextVal); + + // test no cycle + new_sequence->SetCycle(false); + new_sequence->SetCurrVal(50); + + // Expect exception + try { + nextVal = new_sequence->GetNextVal(); + EXPECT_EQ(0, 1); + } catch (const SequenceException &expected) { + ASSERT_STREQ("Sequence exceeds upper limit!", expected.what()); + } + txn_manager.CommitTransaction(txn); +} + +TEST_F(SequenceTests, NextValNegIncrementFunctionalityTest) { + auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance(); + auto txn = txn_manager.BeginTransaction(); + + std::string query = + "CREATE SEQUENCE seq2 " + "INCREMENT BY -1 " + "MINVALUE 10 MAXVALUE 50 " + "START 10 CYCLE;"; + std::string name = "seq2"; + + CreateSequenceHelper(query, txn); + std::shared_ptr new_sequence = + GetSequenceHelper(name, txn); + + // test cycle + int64_t nextVal = new_sequence->GetNextVal(); + EXPECT_EQ(10, nextVal); + nextVal = new_sequence->GetNextVal(); + EXPECT_EQ(50, nextVal); + + new_sequence->SetCurrVal(49); + nextVal = new_sequence->GetNextVal(); + nextVal = new_sequence->GetNextVal(); + EXPECT_EQ(48, nextVal); + + // test no cycle + new_sequence->SetCycle(false); + new_sequence->SetCurrVal(10); + + // Expect exception + try { + nextVal = new_sequence->GetNextVal(); + EXPECT_EQ(0, 1); + } catch (const SequenceException &expected) { + ASSERT_STREQ("Sequence exceeds lower limit!", expected.what()); + } + txn_manager.CommitTransaction(txn); +} +} +} diff --git a/test/parser/postgresparser_test.cpp b/test/parser/postgresparser_test.cpp index 36910bdc9a9..91ff5039fdd 100644 --- a/test/parser/postgresparser_test.cpp +++ b/test/parser/postgresparser_test.cpp @@ -588,8 +588,7 @@ TEST_F(PostgresParserTests, InsertTest) { CmpBool res = five.CompareEquals( ((expression::ConstantValueExpression *)insert_stmt->insert_values.at(1) .at(1) - .get()) - ->GetValue()); + .get())->GetValue()); EXPECT_EQ(CmpBool::CmpTrue, res); // LOG_TRACE("%d : %s", ++ii, stmt_list->GetInfo().c_str()); @@ -1021,15 +1020,13 @@ TEST_F(PostgresParserTests, CreateTriggerTest) { EXPECT_EQ(ExpressionType::VALUE_TUPLE, left->GetExpressionType()); EXPECT_EQ("old", static_cast(left) ->GetTableName()); - EXPECT_EQ("balance", - static_cast(left) - ->GetColumnName()); + EXPECT_EQ("balance", static_cast( + left)->GetColumnName()); EXPECT_EQ(ExpressionType::VALUE_TUPLE, right->GetExpressionType()); EXPECT_EQ("new", static_cast(right) ->GetTableName()); - EXPECT_EQ("balance", - static_cast(right) - ->GetColumnName()); + EXPECT_EQ("balance", static_cast( + right)->GetColumnName()); // level // the level is for each row EXPECT_TRUE(TRIGGER_FOR_ROW(create_trigger_stmt->trigger_type)); @@ -1068,6 +1065,37 @@ TEST_F(PostgresParserTests, DropTriggerTest) { EXPECT_EQ("films", drop_trigger_stmt->GetTriggerTableName()); } +TEST_F(PostgresParserTests, CreateSequenceTest) { + auto parser = parser::PostgresParser::GetInstance(); + + // missing AS, CACHE and OWNED BY. + std::string query = + "CREATE SEQUENCE seq " + "INCREMENT BY 2 " + "MINVALUE 10 " + "MAXVALUE 50 " + "CYCLE " + "START 10;"; + std::unique_ptr stmt_list( + parser.BuildParseTree(query).release()); + EXPECT_TRUE(stmt_list->is_valid); + EXPECT_EQ(StatementType::CREATE, stmt_list->GetStatement(0)->GetType()); + auto create_sequence_stmt = + static_cast(stmt_list->GetStatement(0)); + + // The following code checks the arguments in the create statement + // are identical to what is specified in the query. + + // create type + EXPECT_EQ(parser::CreateStatement::CreateType::kSequence, + create_sequence_stmt->type); + EXPECT_EQ(10, create_sequence_stmt->seq_start); + EXPECT_EQ(2, create_sequence_stmt->seq_increment); + EXPECT_EQ(50, create_sequence_stmt->seq_max_value); + EXPECT_EQ(10, create_sequence_stmt->seq_min_value); + EXPECT_EQ(true, create_sequence_stmt->seq_cycle); +} + TEST_F(PostgresParserTests, FuncCallTest) { std::string query = "SELECT add(1,a), chr(99) FROM TEST WHERE FUN(b) > 2";