diff --git a/src/include/partition_filter.h b/src/include/partition_filter.h index 85ddcf91..0cd08c36 100644 --- a/src/include/partition_filter.h +++ b/src/include/partition_filter.h @@ -43,17 +43,15 @@ typedef struct } ResultRelInfoHolder; -/* Forward declaration (for on_new_rri_holder()) */ +/* Forward declaration (for on_rri_holder()) */ struct ResultPartsStorage; typedef struct ResultPartsStorage ResultPartsStorage; /* - * Callback to be fired at rri_holder creation. + * Callback to be fired at rri_holder creation/destruction. */ -typedef void (*on_new_rri_holder)(EState *estate, - ResultRelInfoHolder *rri_holder, - const ResultPartsStorage *rps_storage, - void *arg); +typedef void (*on_rri_holder)(ResultRelInfoHolder *rri_holder, + const ResultPartsStorage *rps_storage); /* * Cached ResultRelInfos of partitions. @@ -66,7 +64,7 @@ struct ResultPartsStorage bool speculative_inserts; /* for ExecOpenIndices() */ - on_new_rri_holder on_new_rri_holder_callback; + on_rri_holder on_new_rri_holder_callback; void *callback_arg; EState *estate; /* pointer to executor's state */ @@ -116,11 +114,11 @@ void init_result_parts_storage(ResultPartsStorage *parts_storage, EState *estate, bool speculative_inserts, Size table_entry_size, - on_new_rri_holder on_new_rri_holder_cb, + on_rri_holder on_new_rri_holder_cb, void *on_new_rri_holder_cb_arg); void fini_result_parts_storage(ResultPartsStorage *parts_storage, - bool close_rels); + bool close_rels, on_rri_holder hook); ResultRelInfoHolder * scan_result_parts_storage(Oid partid, ResultPartsStorage *storage); diff --git a/src/partition_filter.c b/src/partition_filter.c index 214b926a..a1886c4d 100644 --- a/src/partition_filter.c +++ b/src/partition_filter.c @@ -68,18 +68,12 @@ CustomScanMethods partition_filter_plan_methods; CustomExecMethods partition_filter_exec_methods; -static void prepare_rri_for_insert(EState *estate, - ResultRelInfoHolder *rri_holder, - const ResultPartsStorage *rps_storage, - void *arg); -static void prepare_rri_returning_for_insert(EState *estate, - ResultRelInfoHolder *rri_holder, - const ResultPartsStorage *rps_storage, - void *arg); -static void prepare_rri_fdw_for_insert(EState *estate, - ResultRelInfoHolder *rri_holder, - const ResultPartsStorage *rps_storage, - void *arg); +static void prepare_rri_for_insert(ResultRelInfoHolder *rri_holder, + const ResultPartsStorage *rps_storage); +static void prepare_rri_returning_for_insert(ResultRelInfoHolder *rri_holder, + const ResultPartsStorage *rps_storage); +static void prepare_rri_fdw_for_insert(ResultRelInfoHolder *rri_holder, + const ResultPartsStorage *rps_storage); static Node *fix_returning_list_mutator(Node *node, void *state); static Index append_rte_to_estate(EState *estate, RangeTblEntry *rte); @@ -143,7 +137,7 @@ init_result_parts_storage(ResultPartsStorage *parts_storage, EState *estate, bool speculative_inserts, Size table_entry_size, - on_new_rri_holder on_new_rri_holder_cb, + on_rri_holder on_new_rri_holder_cb, void *on_new_rri_holder_cb_arg) { HASHCTL *result_rels_table_config = &parts_storage->result_rels_table_config; @@ -177,16 +171,21 @@ init_result_parts_storage(ResultPartsStorage *parts_storage, /* Free ResultPartsStorage (close relations etc) */ void -fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels) +fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels, + on_rri_holder hook) { HASH_SEQ_STATUS stat; ResultRelInfoHolder *rri_holder; /* ResultRelInfo holder */ - /* Close partitions and free free conversion-related stuff */ - if (close_rels) + hash_seq_init(&stat, parts_storage->result_rels_table); + while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL) { - hash_seq_init(&stat, parts_storage->result_rels_table); - while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL) + /* Call destruction hook, if needed */ + if (hook != NULL) + hook(rri_holder, parts_storage); + + /* Close partitions and free free conversion-related stuff */ + if (close_rels) { ExecCloseIndices(rri_holder->result_rel_info); @@ -202,13 +201,8 @@ fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels) free_conversion_map(rri_holder->tuple_map); } - } - - /* Else just free conversion-related stuff */ - else - { - hash_seq_init(&stat, parts_storage->result_rels_table); - while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL) + /* Else just free conversion-related stuff */ + else { /* Skip if there's no map */ if (!rri_holder->tuple_map) @@ -329,10 +323,8 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage) /* Call on_new_rri_holder_callback() if needed */ if (parts_storage->on_new_rri_holder_callback) - parts_storage->on_new_rri_holder_callback(parts_storage->estate, - rri_holder, - parts_storage, - parts_storage->callback_arg); + parts_storage->on_new_rri_holder_callback(rri_holder, + parts_storage); /* Finally append ResultRelInfo to storage->es_alloc_result_rels */ append_rri_to_estate(parts_storage->estate, child_result_rel_info); @@ -702,7 +694,7 @@ partition_filter_end(CustomScanState *node) PartitionFilterState *state = (PartitionFilterState *) node; /* Executor will close rels via estate->es_result_relations */ - fini_result_parts_storage(&state->result_parts, false); + fini_result_parts_storage(&state->result_parts, false, NULL); Assert(list_length(node->custom_ps) == 1); ExecEndNode((PlanState *) linitial(node->custom_ps)); @@ -793,21 +785,17 @@ pfilter_build_tlist(Relation parent_rel, List *tlist) /* Main trigger */ static void -prepare_rri_for_insert(EState *estate, - ResultRelInfoHolder *rri_holder, - const ResultPartsStorage *rps_storage, - void *arg) +prepare_rri_for_insert(ResultRelInfoHolder *rri_holder, + const ResultPartsStorage *rps_storage) { - prepare_rri_returning_for_insert(estate, rri_holder, rps_storage, arg); - prepare_rri_fdw_for_insert(estate, rri_holder, rps_storage, arg); + prepare_rri_returning_for_insert(rri_holder, rps_storage); + prepare_rri_fdw_for_insert(rri_holder, rps_storage); } /* Prepare 'RETURNING *' tlist & projection */ static void -prepare_rri_returning_for_insert(EState *estate, - ResultRelInfoHolder *rri_holder, - const ResultPartsStorage *rps_storage, - void *arg) +prepare_rri_returning_for_insert(ResultRelInfoHolder *rri_holder, + const ResultPartsStorage *rps_storage) { PartitionFilterState *pfstate; List *returning_list; @@ -815,12 +803,15 @@ prepare_rri_returning_for_insert(EState *estate, *parent_rri; Index parent_rt_idx; TupleTableSlot *result_slot; + EState *estate; + + estate = rps_storage->estate; /* We don't need to do anything ff there's no map */ if (!rri_holder->tuple_map) return; - pfstate = (PartitionFilterState *) arg; + pfstate = (PartitionFilterState *) rps_storage->callback_arg; returning_list = pfstate->returning_list; /* Exit if there's no RETURNING list */ @@ -857,14 +848,15 @@ prepare_rri_returning_for_insert(EState *estate, /* Prepare FDW access structs */ static void -prepare_rri_fdw_for_insert(EState *estate, - ResultRelInfoHolder *rri_holder, - const ResultPartsStorage *rps_storage, - void *arg) +prepare_rri_fdw_for_insert(ResultRelInfoHolder *rri_holder, + const ResultPartsStorage *rps_storage) { ResultRelInfo *rri = rri_holder->result_rel_info; FdwRoutine *fdw_routine = rri->ri_FdwRoutine; Oid partid; + EState *estate; + + estate = rps_storage->estate; /* Nothing to do if not FDW */ if (fdw_routine == NULL) diff --git a/src/utility_stmt_hooking.c b/src/utility_stmt_hooking.c index 103f194e..e64c1542 100644 --- a/src/utility_stmt_hooking.c +++ b/src/utility_stmt_hooking.c @@ -22,6 +22,7 @@ #include "access/xact.h" #include "catalog/namespace.h" #include "commands/copy.h" +#include "commands/defrem.h" #include "commands/trigger.h" #include "commands/tablecmds.h" #include "foreign/fdwapi.h" @@ -64,10 +65,10 @@ static uint64 PathmanCopyFrom(CopyState cstate, List *range_table, bool old_protocol); -static void prepare_rri_for_copy(EState *estate, - ResultRelInfoHolder *rri_holder, - const ResultPartsStorage *rps_storage, - void *arg); +static void prepare_rri_for_copy(ResultRelInfoHolder *rri_holder, + const ResultPartsStorage *rps_storage); +static void finish_rri_copy(ResultRelInfoHolder *rri_holder, + const ResultPartsStorage *rps_storage); /* @@ -110,12 +111,18 @@ is_pathman_related_copy(Node *parsetree) /* Analyze options list */ foreach (lc, copy_stmt->options) { - DefElem *defel = (DefElem *) lfirst(lc); - - Assert(IsA(defel, DefElem)); + DefElem *defel = lfirst_node(DefElem, lc); /* We do not support freeze */ - if (strcmp(defel->defname, "freeze") == 0) + /* + * It would be great to allow copy.c extract option value and + * check it ready. However, there is no possibility (hooks) to do + * that before messaging 'ok, begin streaming data' to the client, + * which is ugly and confusing: e.g. it would require us to + * actually send something in regression tests before we notice + * the error. + */ + if (strcmp(defel->defname, "freeze") == 0 && defGetBoolean(defel)) elog(ERROR, "freeze is not supported for partitioned tables"); } @@ -481,7 +488,6 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel, uint64 processed = 0; - tupDesc = RelationGetDescr(parent_rel); parent_result_rel = makeNode(ResultRelInfo); @@ -499,7 +505,7 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel, /* Initialize ResultPartsStorage */ init_result_parts_storage(&parts_storage, estate, false, ResultPartsStorageStandard, - prepare_rri_for_copy, NULL); + prepare_rri_for_copy, cstate); parts_storage.saved_rel_info = parent_result_rel; /* Set up a tuple slot too */ @@ -634,13 +640,22 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel, /* Check the constraints of the tuple */ if (child_result_rel->ri_RelationDesc->rd_att->constr) ExecConstraints(child_result_rel, slot, estate); + if (!child_result_rel->ri_FdwRoutine) + { + /* OK, store the tuple and create index entries for it */ + simple_heap_insert(child_result_rel->ri_RelationDesc, tuple); - /* OK, store the tuple and create index entries for it */ - simple_heap_insert(child_result_rel->ri_RelationDesc, tuple); - - if (child_result_rel->ri_NumIndices > 0) - recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), - estate, false, NULL, NIL); + if (child_result_rel->ri_NumIndices > 0) + recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), + estate, false, NULL, NIL); + } +#ifdef PG_SHARDMAN + else /* FDW table */ + { + child_result_rel->ri_FdwRoutine->ForeignNextCopyFrom( + estate, child_result_rel, cstate); + } +#endif /* AFTER ROW INSERT Triggers (FIXME: NULL transition) */ ExecARInsertTriggersCompat(estate, child_result_rel, tuple, @@ -678,7 +693,7 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel, ExecResetTupleTable(estate->es_tupleTable, false); /* Close partitions and destroy hash table */ - fini_result_parts_storage(&parts_storage, true); + fini_result_parts_storage(&parts_storage, true, finish_rri_copy); /* Close parent's indices */ ExecCloseIndices(parent_result_rel); @@ -689,20 +704,58 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel, } /* - * COPY FROM does not support FDWs, emit ERROR. + * Init COPY FROM, if supported. */ static void -prepare_rri_for_copy(EState *estate, - ResultRelInfoHolder *rri_holder, - const ResultPartsStorage *rps_storage, - void *arg) +prepare_rri_for_copy(ResultRelInfoHolder *rri_holder, + const ResultPartsStorage *rps_storage) { - ResultRelInfo *rri = rri_holder->result_rel_info; - FdwRoutine *fdw_routine = rri->ri_FdwRoutine; + ResultRelInfo *rri = rri_holder->result_rel_info; + FdwRoutine *fdw_routine = rri->ri_FdwRoutine; if (fdw_routine != NULL) + { + /* + * If this Postgres has no idea about shardman, behave as usual: + * vanilla Postgres doesn't support COPY FROM to foreign partitions. + * However, shardman patches to core extend FDW API to allow it. + */ +#ifdef PG_SHARDMAN + /* shardman COPY FROM requested? */ + if (*find_rendezvous_variable( + "shardman_pathman_copy_from_rendezvous") != NULL && + FdwCopyFromIsSupported(fdw_routine)) + { + CopyState cstate = (CopyState) rps_storage->callback_arg; + ResultRelInfo *parent_rri = rps_storage->saved_rel_info; + EState *estate = rps_storage->estate; + + fdw_routine->BeginForeignCopyFrom(estate, rri, cstate, parent_rri); + return; + } +#endif + elog(ERROR, "cannot copy to foreign partition \"%s\"", get_rel_name(RelationGetRelid(rri->ri_RelationDesc))); + } +} + +/* + * Shut down FDWs. + */ +static void +finish_rri_copy(ResultRelInfoHolder *rri_holder, + const ResultPartsStorage *rps_storage) +{ +#ifdef PG_SHARDMAN + ResultRelInfo *resultRelInfo = rri_holder->result_rel_info; + + if (resultRelInfo->ri_FdwRoutine) + { + resultRelInfo->ri_FdwRoutine->EndForeignCopyFrom( + rps_storage->estate, resultRelInfo); + } +#endif } /*