Orchestration Framework

The platform uses a custom metadata-driven orchestration framework built on top of Airflow. DAGs contain no hardcoded logic — the framework module reads aud_* tables at runtime and dynamically builds the full execution graph.


Core Design Principles

  • Metadata-driven: all pipeline behavior defined in aud_* tables, not in code
  • Serialized by chain: source extractions run sequentially per chain to protect source DBs
  • Parallelized by group: models within a group run concurrently
  • Decoupled control/execution: Airflow is the control plane, Dataflow is the execution plane
  • Config-driven Dataflow: a {modelo}.config file in GCS is the contract between both planes
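The first three principles can be sketched as a simple execution plan: chains yield a sequence of steps, and each step is a set of models that may run concurrently. This is an illustrative sketch, not the framework's actual data structure:

```python
def build_plan(groups):
    """Hypothetical sketch: groups run one after another (serialized by
    chain), while the models inside each group share one parallel step."""
    return [{"parallel": list(models)} for models in groups]

plan = build_plan([["cliente", "poliza"], ["siniestro"]])
# plan[0] runs first; "cliente" and "poliza" execute concurrently,
# then plan[1] runs "siniestro".
```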

DAG Hierarchy

Chain DAG (cad_{source})

  • One per source system (e.g. cad_polaris, cad_salesforce)
  • Reads aud_modelo to discover all group DAGs for this chain
  • Triggers group DAGs sequentially, order defined by aud_dependencia
  • Protects source systems from concurrent load
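Sequential triggering in chain order might look like the sketch below. The `dag_grupo` and `orden` column names on `aud_dependencia` are assumptions for illustration:

```python
def group_run_order(dependencias):
    """Return group DAG ids in trigger order, sorted by the 'orden'
    column from aud_dependencia (column names are illustrative)."""
    return [d["dag_grupo"] for d in sorted(dependencias, key=lambda d: d["orden"])]

order = group_run_order([
    {"dag_grupo": "dag_grupo_polaris_02", "orden": 2},
    {"dag_grupo": "dag_grupo_polaris_01", "orden": 1},
])
# order == ["dag_grupo_polaris_01", "dag_grupo_polaris_02"]
```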

Group DAG (dag_grupo_{source}_{nn})

  • One per logical model group within a chain
  • Contains N models, each following raw_ → his_ → act_ sequence
  • Models run in parallel across the group
  • Within each model, layers are serialized: raw → his → act
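The group-level wiring reduces to one rule: chain the three layers per model and add no edges between models. A minimal sketch of the resulting task-dependency edges:

```python
LAYERS = ["raw", "his", "act"]

def group_edges(models):
    """For each model, serialize raw_ -> his_ -> act_. Models share no
    edges, so they run in parallel. Illustrative helper, not framework code."""
    edges = []
    for m in models:
        for upstream, downstream in zip(LAYERS, LAYERS[1:]):
            edges.append((f"{upstream}_{m}", f"{downstream}_{m}"))
    return edges

group_edges(["poliza_detalle"])
# [("raw_poliza_detalle", "his_poliza_detalle"),
#  ("his_poliza_detalle", "act_poliza_detalle")]
```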

Framework Module — How It Works

Every DAG imports the framework module. This module reads the DAG name, determines whether it is a chain or group, and builds the entire task graph dynamically.
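The dispatch step — inferring the DAG's role from its id — follows directly from the naming conventions (`cad_{source}` for chains, `dag_grupo_{source}_{nn}` for groups). A sketch of that classification:

```python
import re

def classify_dag(dag_id: str) -> dict:
    """Infer a DAG's role from its id, per the platform naming
    conventions. Illustrative sketch of the framework's dispatch step."""
    m = re.fullmatch(r"cad_(\w+)", dag_id)
    if m:
        return {"kind": "chain", "source": m.group(1)}
    m = re.fullmatch(r"dag_grupo_(\w+?)_(\d{2})", dag_id)
    if m:
        return {"kind": "group", "source": m.group(1), "nn": m.group(2)}
    raise ValueError(f"unrecognized DAG id: {dag_id}")
```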


Config File — {modelo}.config

Before executing any extraction, the framework resolves all metadata, generates a self-contained config file, and uploads it to GCS.

This is the contract between the control plane (Airflow) and execution plane (Dataflow). Dataflow reads this file and needs no further access to aud_* tables.

Structure of {modelo}.config:

```json
{
  "modelo": "poliza_detalle",
  "fecha_lote": "2024-01-15",
  "source": {
    "type": "oracle",
    "connection": { "secret_ref": "projects/.../secrets/oracle-polaris-prd" },
    "query": "SELECT NRO_POLIZA, FECHA_INICIO ... WHERE FECHA_MOD >= :watermark",
    "extraction_mode": "incremental"
  },
  "avro_schema": {
    "type": "record",
    "name": "poliza_detalle",
    "fields": [
      { "name": "NRO_POLIZA", "type": "string" },
      { "name": "FECHA_INICIO", "type": { "type": "int", "logicalType": "date" } }
    ]
  },
  "destination": {
    "gcs_path": "gs://prd-platform-raw/polizas/poliza_detalle/fecha_lote=2024-01-15/",
    "format": "avro"
  },
  "validations": [
    { "type": "row_count", "threshold": 0, "severity": "error" },
    { "type": "not_null", "column": "NRO_POLIZA", "severity": "error" }
  ]
}
```
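Because the config is the only input the execution plane sees, it is worth validating the contract up front. A minimal loader sketch that checks the top-level keys shown above (the strictness and error handling are assumptions):

```python
import json

REQUIRED_KEYS = {"modelo", "fecha_lote", "source", "avro_schema",
                 "destination", "validations"}

def load_config(text: str) -> dict:
    """Parse a {modelo}.config payload and verify the top-level fields
    the Dataflow side relies on. Hypothetical sketch."""
    cfg = json.loads(text)
    missing = REQUIRED_KEYS - cfg.keys()
    if missing:
        raise ValueError(f"config missing keys: {sorted(missing)}")
    return cfg
```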

How the config is built from aud_*:

| Config field         | Source table    | Field / derivation                                  |
|----------------------|-----------------|-----------------------------------------------------|
| source.type          | aud_entidad     | source_type                                         |
| source.query         | aud_columna     | built from columns where incluir_extraccion = TRUE  |
| source.connection    | aud_conexion    | resolved by source_type + environment               |
| avro_schema.fields   | aud_columna     | nombre_columna + tipo_dato per entity               |
| destination.gcs_path | aud_entidad     | gcs_destination + fecha_lote                        |
| validations          | aud_validacion  | all active rules for this entity                    |
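The mapping above can be sketched as a single assembly function. The row shapes (`tabla_origen`, `activo`, and the dict keys generally) are assumptions for illustration, and the generated query omits the incremental watermark clause a real extraction would add:

```python
def build_config(entidad, columnas, validaciones, fecha_lote):
    """Assemble a {modelo}.config dict from aud_* rows, mirroring the
    field-mapping table. Column/row names are illustrative."""
    cols = [c for c in columnas if c["incluir_extraccion"]]
    col_names = ", ".join(c["nombre_columna"] for c in cols)
    return {
        "modelo": entidad["nombre"],
        "fecha_lote": fecha_lote,
        "source": {
            "type": entidad["source_type"],
            "query": f"SELECT {col_names} FROM {entidad['tabla_origen']}",
        },
        "avro_schema": {
            "type": "record",
            "name": entidad["nombre"],
            "fields": [{"name": c["nombre_columna"], "type": c["tipo_dato"]}
                       for c in cols],
        },
        "destination": {
            "gcs_path": f"{entidad['gcs_destination']}fecha_lote={fecha_lote}/",
            "format": "avro",
        },
        "validations": [v for v in validaciones if v.get("activo", True)],
    }
```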

Operator Selection

The framework dynamically loads the correct operator for each entity based on its source_type and executor_type.
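A registry keyed by the two metadata fields is one natural shape for that lookup. The operator class names below are hypothetical placeholders:

```python
# Hypothetical operator registry keyed by (source_type, executor_type).
OPERATORS = {
    ("oracle", "dataflow"): "DataflowOracleOperator",
    ("salesforce", "dataflow"): "DataflowSalesforceOperator",
    ("oracle", "local"): "LocalOracleOperator",
}

def select_operator(source_type: str, executor_type: str) -> str:
    """Resolve the operator for an entity; unknown pairs fail fast."""
    try:
        return OPERATORS[(source_type, executor_type)]
    except KeyError:
        raise ValueError(f"no operator for {source_type}/{executor_type}")
```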

All operators share the same interface:

```python
def execute(entity: EntityConfig, fecha_lote: date) -> ExtractResult: ...
```
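A fuller sketch of that interface as a `typing.Protocol`, with a trivial conforming implementation. The field sets of `EntityConfig` and `ExtractResult` are assumptions, not the framework's actual dataclasses:

```python
from dataclasses import dataclass
from datetime import date
from typing import Protocol

@dataclass
class EntityConfig:
    modelo: str
    source_type: str  # illustrative fields only

@dataclass
class ExtractResult:
    rows: int
    gcs_path: str

class ExtractOperator(Protocol):
    def execute(self, entity: EntityConfig, fecha_lote: date) -> ExtractResult: ...

class NullOperator:
    """Trivial stand-in that satisfies the shared operator interface."""
    def execute(self, entity: EntityConfig, fecha_lote: date) -> ExtractResult:
        # date formats as ISO (YYYY-MM-DD) in the f-string
        return ExtractResult(rows=0,
                             gcs_path=f"gs://bucket/{entity.modelo}/fecha_lote={fecha_lote}/")
```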

Dataflow Execution

When executor_type = dataflow, the framework triggers a serverless Dataflow FlexTemplate running a Docker image with the Beam pipeline code.

The Docker image is source-agnostic — it supports all source types. The config file tells it which extractor to use and how to configure it.
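Launching the Flex Template goes through the Dataflow `flexTemplates.launch` API; a sketch of building its request body is below. The job name follows the naming conventions in this document, while the `config_path` pipeline parameter name is an assumption:

```python
def flex_template_launch_body(modelo, fecha_lote, config_gcs_uri, template_gcs_uri):
    """Build the body for a Dataflow flexTemplates.launch call.
    The 'config_path' parameter name is illustrative."""
    return {
        "launchParameter": {
            # Dataflow job names use hyphens: {modelo}-{fecha_lote}
            "jobName": f"{modelo.replace('_', '-')}-{fecha_lote}",
            "containerSpecGcsPath": template_gcs_uri,
            "parameters": {"config_path": config_gcs_uri},
        }
    }
```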


Full Execution Flow per Model


Connections — aud_conexion

Connection details are stored in aud_conexion, not hardcoded in DAGs or Airflow connections. Credentials are resolved at runtime via GCP Secret Manager.

This enables:

  • Environment-specific connections (dev / stg / prd) using the same DAG code
  • Credential rotation without code or DAG changes
  • Multiple connections per source type
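Runtime resolution then has two steps: pick the right `aud_conexion` row for the source type and environment, and expand its `secret_ref` into a fully qualified Secret Manager version name for `access_secret_version`. The row/column shapes below are assumptions:

```python
def resolve_connection(rows, source_type, env):
    """Pick the aud_conexion row for this source type and environment.
    Column names are illustrative."""
    for r in rows:
        if r["source_type"] == source_type and r["environment"] == env:
            return r
    raise LookupError(f"no connection for {source_type}/{env}")

def secret_version_name(secret_ref: str, version: str = "latest") -> str:
    """Expand a stored secret_ref into the Secret Manager version name
    expected by access_secret_version."""
    return f"{secret_ref}/versions/{version}"
```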

Naming Conventions

| Artifact          | Pattern                                       | Example                          |
|-------------------|-----------------------------------------------|----------------------------------|
| Chain DAG         | cad_{source}                                  | cad_polaris                      |
| Group DAG         | dag_grupo_{source}_{nn}                       | dag_grupo_polaris_01             |
| Config file (GCS) | gs://.../configs/{modelo}_{fecha_lote}.config | poliza_detalle_2024-01-15.config |
| Dataflow job      | {modelo}-{fecha_lote}                         | poliza-detalle-2024-01-15        |
| Raw task          | raw_{entidad}                                 | raw_poliza_detalle               |
| History task      | his_{entidad}                                 | his_poliza_detalle               |
| Active task       | act_{entidad}                                 | act_poliza_detalle               |
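The conventions above are mechanical enough to derive in one helper — a sketch (note the Dataflow job name swaps underscores for hyphens, per the example column):

```python
def artifact_names(source, nn, modelo, fecha_lote):
    """Derive artifact names from the naming-convention table.
    Illustrative helper, not framework code."""
    return {
        "chain_dag": f"cad_{source}",
        "group_dag": f"dag_grupo_{source}_{nn}",
        "config_file": f"{modelo}_{fecha_lote}.config",
        "dataflow_job": f"{modelo.replace('_', '-')}-{fecha_lote}",
        "raw_task": f"raw_{modelo}",
        "his_task": f"his_{modelo}",
        "act_task": f"act_{modelo}",
    }
```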