Document version: 2.0 · On-prem · Python-based
This document covers: goals & value proposition, high-level architecture, artifacts & file roles, the filesystem contract structure, the step-by-step batch lifecycle, operating modes, the retry/idempotency/recovery model, the mapping to Prefect (migration map), a troubleshooting runbook, example DDL, a production-readiness checklist, and a prioritized roadmap.
This platform is an on-prem, manifest-driven, multi-tenant (per-client) pipeline that:
- Guarantees traceability of every run through `batch_id` and `batch_info.json` (the manifest).
- Provides idempotent transforms (stored procedures) so re-runs are safe and reproducible.
- Ships an operator UI (webapp) that manages driver metadata and reads logs straight from PostgreSQL for monitoring, audit, and manual recovery.
- Enables migration to a more scalable orchestrator/engine (Prefect, PySpark) thanks to the modular design and the artifact contract (manifest, `parquet_name`, `batch_id`).

Value: reproducibility, reduced operator toil, per-client isolation, auditability, and a gradual migration path to centralized orchestration.
- Current orchestrator: pure Python scripts (`batch_processing.py` as the entrypoint).
- Storage: structured local filesystem: `raw/`, `data/`, `batch_info/`.
- Metadata & logs: PostgreSQL (schema `tools`) — metadata, mappings, configs, and a log table per stage.
- Transform/Integration/MV: stored procedures in Postgres (schema `tools`) move data Bronze → Silver → Gold.
- Webapp: operator UI for client/config/mapping/required-columns/transformation/integration/MV refresh & dashboards.

The modular components allow the orchestrator to be swapped (e.g. for Prefect) without changing the file/DB contract.
- `batch_info` (JSON manifest) — single source of truth for a run.
- `handlers/batch_processing.py` — entrypoint; modes: `start | restart | reprocessing`. Scans incoming → creates/renames → upserts the manifest → spawns handlers → records logs.
- `handlers/convert_to_parquet.py` — converts CSV/XLSX/JSON → Parquet (pandas → pyarrow/snappy); updates `batch_info.parquet_name`.
- `scripts/validate_mapping.py` — reads the Parquet (pyarrow), compares columns against `tools.column_mapping`; on mismatch → moves the Parquet to `failed`.
- `scripts/validate_row.py` — DuckDB null/duplicate checks based on `tools.required_columns`.
- `scripts/load_to_bronze.py` — DuckDB → CSV → COPY into the Postgres bronze table; idempotent: `DELETE WHERE dwh_batch_id = <batch_id>` before `COPY`.
- `scripts/silver_clean_transform.py` — calls the transformation stored procedures (Bronze → Silver) per `tools.transformation_config` (see the call sketch after this list).
- `scripts/gold_integration.py` — calls the integration procedures (Silver → Gold) per `tools.integration_config` + dependency checks.
- `scripts/refresh_mv.py` — calls the MV refresh procedures (names in `tools.mv_refresh_config`).
- Example stored procedures: `tools.load_crm_cust_info_v1`, `tools.load_fact_sales_v1`, `tools.refresh_mv_customer_churn`.
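The transformation/integration/MV scripts are essentially thin wrappers around procedure calls. A minimal sketch, assuming psycopg2 and zero-argument procedures (the real scripts read the procedure list from `tools.transformation_config` / `tools.integration_config` and may pass parameters):

```python
import psycopg2

def run_procedures(dsn: str, procedure_names: list[str]) -> None:
    """Call each stored procedure in order, committing after each one."""
    with psycopg2.connect(dsn) as conn:
        with conn.cursor() as cur:
            for proc in procedure_names:
                cur.execute(f"CALL {proc}()")  # e.g. tools.load_crm_cust_info_v1
                conn.commit()                  # isolate each procedure's work

if __name__ == "__main__":
    # hypothetical DSN; procedure names come from trusted config tables
    run_procedures("dbname=dwh user=etl host=localhost",
                   ["tools.load_crm_cust_info_v1", "tools.load_fact_sales_v1"])
```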
raw/{client_schema}/{source_system}/{incoming,success,failed,archive}
data/{client_schema}/{source_system}/{incoming,failed,archive}
batch_info/{client_schema}/{incoming,success,failed,archive,refreshed}
handlers/
scripts/
sql/ (procedure documentation, transformation SELECTs, and DDL; once the migration to Prefect is done the procedures are no longer needed — the transformation/integration/MV-refresh `INSERT INTO ... SELECT` statements can be run directly)
webapp/
- On `start`, `physical_file_name` gets the suffix `_BATCH######` (see the naming sketch after this list).
- The Parquet file is written to `data/{client_schema}/{source_system}/incoming/{parquet_name}`.
- The `batch_info` JSON is recorded at `batch_info/{client_schema}/incoming/batch_output_{client}_{BATCH}.json`.
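A minimal sketch of that naming convention, assuming `last_batch_id` is read from `tools.client_reference` and batch numbers are zero-padded to six digits (both helper names are hypothetical):

```python
from pathlib import Path

def next_batch_id(last_batch_id: str | None) -> str:
    """Increment a BATCH###### identifier; start at BATCH000001 when none exists."""
    seq = int(last_batch_id.removeprefix("BATCH")) + 1 if last_batch_id else 1
    return f"BATCH{seq:06d}"

def rename_with_batch_suffix(raw_file: Path, batch_id: str) -> Path:
    """cust_info.csv + BATCH000014 -> cust_info_BATCH000014.csv"""
    renamed = raw_file.with_name(f"{raw_file.stem}_{batch_id}{raw_file.suffix}")
    raw_file.rename(renamed)
    return renamed

# Example: next_batch_id("BATCH000013") -> "BATCH000014"
```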
- The operator drops source files into `raw/{client_schema}/{source_system}/incoming`.
- Trigger: `python batch_processing.py <client_schema> start` (or via a webapp button — this feature does not exist yet but will be added).
- `batch_processing`:
  - Reads `last_batch_id` from `tools.client_reference`.
  - In `start` mode → generates `new_batch_id` (increment) and renames the file to `{base}_BATCH######`.
  - Inserts/records into `tools.file_audit_log`.
  - Upserts/writes the `batch_info` manifest to `batch_info/{client}/incoming` (atomic).
- Convert: `convert_to_parquet.py` reads `raw/.../success/{file}` → writes Parquet to `data/.../incoming/` → updates `batch_info.parquet_name` (atomic write + retry).
- Validate Mapping: `validate_mapping.py` reads the Parquet schema → compares it against `tools.column_mapping`; on mismatch → moves the Parquet to `data/.../failed` and logs to `tools.mapping_validation_log`.
- Validate Row: `validate_row.py` (DuckDB) checks nulls on required columns & duplicate rules → results go to `tools.row_validation_log`. Policy: a per-file/per-client policy decides fatal vs warning.
- Load to Bronze: `load_to_bronze.py` materializes a CSV via DuckDB → `DELETE FROM bronze_table WHERE dwh_batch_id = <batch_id>` → `COPY` → on success moves the Parquet to `archive` (see the load sketch after this list).
- Transform (Silver): `silver_clean_transform.py` calls procedures per `tools.transformation_config`; logs to `tools.transformation_log`.
- Integrate (Gold): `gold_integration.py` runs procedures per `tools.integration_config` — groups dimensions & facts, dependency checks via `tools.integration_dependencies`; logs results.
- Refresh MV: `refresh_mv.py` calls the refresh procedures, logs to `tools.mv_refresh_log`.
- Finalize: update `tools.job_execution_log` (final status), move `batch_info` to `success`/`failed`/`refreshed` depending on outcome. The webapp displays KPI aggregations from the logs.
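A minimal sketch of the idempotent Load to Bronze pattern, assuming DuckDB and psycopg2; table names, DSN, and staging paths come from the manifest/config in the real script, and stamping `dwh_batch_id` during CSV materialization is an assumption:

```python
import duckdb
import psycopg2

def load_to_bronze(parquet_path: str, csv_path: str, dsn: str,
                   bronze_table: str, batch_id: str) -> None:
    """Materialize Parquet to CSV, then DELETE this batch's rows and COPY, so re-runs are safe."""
    # 1. Parquet -> CSV staging file via DuckDB, stamping the batch id on every row
    duckdb.sql(
        f"COPY (SELECT *, '{batch_id}' AS dwh_batch_id "
        f"      FROM read_parquet('{parquet_path}')) "
        f"TO '{csv_path}' (HEADER, DELIMITER ',')"
    )
    # 2. Idempotent load: wipe any rows from a previous attempt, then COPY the CSV in
    #    (assumes the CSV column order matches the bronze table definition)
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(f"DELETE FROM {bronze_table} WHERE dwh_batch_id = %s", (batch_id,))
        with open(csv_path) as f:
            cur.copy_expert(f"COPY {bronze_table} FROM STDIN WITH CSV HEADER", f)
    # psycopg2's connection context manager commits on success and rolls back on error
```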
- start: increments `batch_id`, renames the files, creates `batch_info`, runs the full pipeline.
- restart: reuses the same `batch_id` (no increment) and updates the existing audit rows — used after fixing a mapping/configuration. Files in `failed` must be moved back to `incoming` manually where needed.
- reprocessing: re-runs the downstream steps (validate → load → transform → integrate) using the existing Parquet; does not increment `batch_id`.
- Checkpointing: granular via `tools.file_audit_log` (per file), `tools.job_execution_log` (per job), and the `batch_info` manifest (atomic updates).
- I/O retries: helper functions `read_json_retry`, `write_atomic`, and `wait_for_parquet_name` with limited polling (see the sketch after this list).
- Downstream retry policy: currently manual — if convert/validate/load fails, the file is moved to `failed`. Recovery is done via `restart`/`reprocessing`.
- Idempotency: loads & stored procedures use `DELETE WHERE dwh_batch_id = ...` or upsert logic; the stored procedures are designed so that re-runs do not duplicate data.
- Concurrency: safe when instances handle different `{client_schema, source_system}` pairs. Not safe when multiple instances write to the same folder/manifest (no distributed lock is present).
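A minimal sketch of what `write_atomic` and `read_json_retry` could look like (the real helpers may differ); atomicity relies on `os.replace`, which is atomic when the temp file sits on the same filesystem as the target:

```python
import json
import os
import tempfile
import time

def write_atomic(path: str, payload: dict) -> None:
    """Write JSON to a temp file in the same directory, then atomically replace the target."""
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(payload, tmp, indent=2)
        os.replace(tmp_path, path)  # atomic on POSIX when on the same filesystem
    except BaseException:
        os.unlink(tmp_path)
        raise

def read_json_retry(path: str, attempts: int = 5, delay: float = 2.0) -> dict:
    """Poll for a JSON file that another step may still be writing."""
    for attempt in range(1, attempts + 1):
        try:
            with open(path) as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            if attempt == attempts:
                raise
            time.sleep(delay)
```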
A concise list of the tables & their purpose:
- `tools.client_reference` — client_id, client_schema, last_batch_id.
- `tools.client_config` — maps file configs → logical_source_file, target_schema/table.
- `tools.column_mapping` — source column → target column mapping.
- `tools.required_columns` — required columns per file.
- `tools.file_audit_log` — per-file audit (convert_status, mapping_validation_status, load_status, total_rows, physical_file_name, batch_id).
- `tools.job_execution_log` — per-job execution.
- `tools.mapping_validation_log`, `tools.row_validation_log`, `tools.load_error_log`, `tools.transformation_log`, `tools.integration_log`, `tools.mv_refresh_log`.
- `tools.transformation_config`, `tools.integration_config`, `tools.mv_refresh_config`, `tools.integration_dependencies`.

The stored procedures write to their respective log tables as part of the contract.
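For illustration, a sketch of how a step script might record its per-file status in `tools.file_audit_log`, assuming psycopg2 and the columns listed above (the real UPDATE may key on additional columns):

```python
import psycopg2

def record_load_status(dsn: str, batch_id: str, physical_file_name: str,
                       status: str, total_rows: int) -> None:
    """Update the per-file audit row for this batch with the load outcome."""
    sql = """
        UPDATE tools.file_audit_log
           SET load_status = %s,
               total_rows  = %s
         WHERE batch_id = %s
           AND physical_file_name = %s
    """
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(sql, (status, total_rows, batch_id, physical_file_name))
```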
{
"client_schema": "client1",
"client_id": 2,
"batch_id": "BATCH000014",
"files": [
{
"physical_file_name": "cust_info_BATCH000014.csv",
"logical_source_file": "cust_info",
"source_system": "crm",
"source_type": "csv",
"target_schema": "bronze_client1",
"target_table": "crm_cust_info",
"source_config": null,
"parquet_name": "cust_info_BATCH000014.parquet",
}
],
"transformation_procedure": [
"tools.load_crm_cust_info_v1",
"tools.load_crm_prd_info_v1"
],
"integration_procedure": [
"tools.load_dim_customers_v1",
"tools.load_fact_sales_v1"
],
"refresh_procedure": [
"mv_customer_lifetime_value",
"mv_customer_order_gap"
]
}
- Storage paths: `raw/...`, `data/...` (Parquet), `batch_info/...` (manifest).
- Manifest contract: `logical_source_file`, `source_type`, `parquet_name`, `batch_id` must be present (see the validation sketch after the CLI examples).
- CLI examples:
  - `python batch_processing.py client1 start`
  - `python batch_processing.py client1 restart`
  - `python batch_processing.py client1 reprocessing`
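A minimal sketch of enforcing that manifest contract before any downstream step runs (top-level key names are taken from the example manifest above; the helper itself is hypothetical):

```python
import json

REQUIRED_TOP_KEYS = {"client_schema", "batch_id", "files"}
REQUIRED_FILE_KEYS = {"logical_source_file", "source_type", "parquet_name"}

def validate_manifest(path: str) -> dict:
    """Load batch_info JSON and fail fast if contract keys are missing."""
    with open(path) as f:
        manifest = json.load(f)
    missing_top = REQUIRED_TOP_KEYS - set(manifest)
    if missing_top:
        raise ValueError(f"manifest missing {sorted(missing_top)}")
    for entry in manifest["files"]:
        missing = REQUIRED_FILE_KEYS - set(entry)
        if missing:
            raise ValueError(
                f"{entry.get('logical_source_file', '<unknown>')}: missing {sorted(missing)}"
            )
    return manifest
```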
A. Convert fail
- Symptom: `job_execution_log` FAILED, `parquet_name` empty in `batch_info`.
- Action: inspect the raw file's encoding/format; fix the file or config; re-ingest and run `start`/`restart` as appropriate.
B. Mapping mismatch (validate_mapping FAILED)
- Symptom: `mapping_validation_log` entry, Parquet → `data/.../failed`.
- Action: fix the mapping via the webapp (`tools.column_mapping`), move the Parquet back to `data/.../incoming`, and run `python batch_processing.py <client> reprocessing`.
C. Load to Bronze fail
- Symptom: `load_error_log` entry; type cast error or missing column.
- Action: check `tools.column_mapping` against the target table schema; add the missing columns or update the mapping; re-run `reprocessing` if the Parquet exists.
D. Transform/Integration fail
- Symptom: `transformation_log` or `integration_log` FAILED.
- Action: inspect the procedure error, fix the procedure or the upstream data, re-run the transform/integration steps.
E. MV refresh fail
- Symptom: `mv_refresh_log` FAILED.
- Action: inspect underlying table availability and procedure dependencies, re-run `refresh_mv.py`.
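When triaging any of the cases above, a quick look at the per-file audit rows usually identifies the failing stage; a sketch, assuming psycopg2 and the `tools.file_audit_log` columns listed earlier:

```python
import psycopg2

def audit_summary(dsn: str, batch_id: str) -> list[tuple]:
    """Return per-file statuses for one batch so the failing stage is obvious."""
    sql = """
        SELECT physical_file_name, convert_status,
               mapping_validation_status, load_status, total_rows
          FROM tools.file_audit_log
         WHERE batch_id = %s
         ORDER BY physical_file_name
    """
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(sql, (batch_id,))
        return cur.fetchall()
```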
- Single-node local filesystem → durability & concurrency risk.
- Secrets in `.env` (plaintext) — needs a vault or secure K/V store.
- No downstream auto-retry & alerting yet — MTTR is manual.
- Stored procedures are sometimes hardcoded to client schemas — need parametrization.
- Caller/procedure contract inconsistency (CALL vs FUNCTION return) — standardize.
Strategy: keep the artifact contract (manifest JSON, `parquet_name`, `batch_id`, logs). Implement a templated flow per client or a single flow with dynamic task mapping.
Task mapping (example):
- sensor: `wait_for_files` (a file-polling task, the Prefect equivalent of a FileSensor)
- task: `start_batch` — increments batch_id, writes `batch_info`
- mapped tasks per file: `convert` → `validate_mapping` → `validate_row` → `load_to_bronze`
- subflow/task group: `call_transformations`
- subflow/task group: `call_integrations` (dependency-aware)
- task: `refresh_mv`
- finalize: update logs + archive
Benefits: use Prefect retries and SLAs/automations, and pass task results (the Prefect analogue of XCom) to carry `parquet_name` and other artifacts between tasks; polling/sensor tasks add resilience. Make sure the idempotency contract is preserved (DELETE by `dwh_batch_id`).
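A minimal Prefect 2 sketch of that shape, assuming the existing scripts are wrapped as importable functions (the wrapper names here are hypothetical):

```python
from prefect import flow, task

@task(retries=3, retry_delay_seconds=60)
def convert(file_entry: dict) -> str:
    """Wrap convert_to_parquet.py; return parquet_name for downstream tasks."""
    ...

@task(retries=3, retry_delay_seconds=60)
def validate_and_load(file_entry: dict, parquet_name: str) -> None:
    """Wrap validate_mapping.py, validate_row.py, load_to_bronze.py.
    The idempotency contract (DELETE WHERE dwh_batch_id = <batch_id> before COPY) stays inside."""
    ...

@task
def run_procedures(procedure_names: list[str]) -> None:
    """Wrap silver_clean_transform.py / gold_integration.py / refresh_mv.py."""
    ...

@flow(name="batch-pipeline")
def batch_pipeline(manifest: dict) -> None:
    files = manifest["files"]
    parquet_names = convert.map(files)                  # one mapped task per file
    loads = validate_and_load.map(files, parquet_names)
    run_procedures(manifest["transformation_procedure"], wait_for=loads)
    run_procedures(manifest["integration_procedure"])
    run_procedures(manifest["refresh_procedure"])
```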
- Schema drift — mitigation: a `schema_registry` per `logical_source_file` + pre-load validation (see the sketch after this list).
- Scale & durability — mitigation: a storage abstraction (NFS / S3 adapter).
- Operational MTTR — mitigation: limited automatic retries + alerting + metrics.
- Security — mitigation: migrate secrets to an on-prem vault; webapp auth & RBAC.
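One way the pre-load validation could look, assuming pyarrow and a hypothetical `tools.schema_registry` table keyed by `logical_source_file` that stores the expected column names:

```python
import psycopg2
import pyarrow.parquet as pq

def check_against_registry(dsn: str, logical_source_file: str, parquet_path: str) -> None:
    """Compare the Parquet columns with the registered schema before loading."""
    actual = set(pq.read_schema(parquet_path).names)
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT column_name FROM tools.schema_registry WHERE logical_source_file = %s",
            (logical_source_file,),
        )
        expected = {row[0] for row in cur.fetchall()}
    missing, unexpected = expected - actual, actual - expected
    if missing or unexpected:
        raise ValueError(f"schema drift: missing={sorted(missing)} unexpected={sorted(unexpected)}")
```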
Quick wins
- Implement a `schema_registry` per `logical_source_file`.
- Add limited automatic retries (3 attempts, exponential backoff) to convert/validate/load (see the sketch below).
- Implement a retention/purge job (default 90 days).
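A minimal sketch of the limited-retry quick win, assuming the step entrypoints are plain Python callables (the decorator is hypothetical):

```python
import functools
import logging
import time

def with_retries(attempts: int = 3, base_delay: float = 2.0):
    """Retry a step up to `attempts` times with exponential backoff (2s, 4s, 8s, ...)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise
                    delay = base_delay * 2 ** (attempt - 1)
                    logging.warning("%s failed (attempt %d/%d), retrying in %.0fs",
                                    func.__name__, attempt, attempts, delay)
                    time.sleep(delay)
        return wrapper
    return decorator

# Usage (hypothetical): decorate the convert/validate/load entrypoints
# @with_retries(attempts=3)
# def convert_to_parquet(...): ...
```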
Mid-term
- Secrets vault (HashiCorp or on‑prem K/V).
- LDAP/AD + RBAC for webapp.
- Metrics → Prometheus + Grafana.
Strategic
- Abstraction layer for storage (NFS/S3 adapter).
- Orchestrator: migrate to Prefect and adapt the stored procedure invocation into idempotent SQL tasks.