Version: devel

helpers.airflow_helper

PipelineTasksGroup Objects

class PipelineTasksGroup(TaskGroup)

Represents a dlt Airflow pipeline task group.

__init__

def __init__(pipeline_name: str,
use_data_folder: bool = False,
local_data_folder: str = None,
use_task_logger: bool = True,
log_progress_period: float = 30.0,
buffer_max_items: int = 1000,
retry_policy: Retrying = DEFAULT_RETRY_NO_RETRY,
retry_pipeline_steps: Sequence[TPipelineStep] = ("load", ),
fail_task_if_any_job_failed: bool = True,
abort_task_if_any_job_failed: bool = False,
wipe_local_data: bool = True,
save_load_info: bool = False,
save_trace_info: bool = False,
**kwargs: Any) -> None

Creates a task group to which you can add pipeline runs.

The run environment is prepared as follows:

  • the .dlt folder (the project folder) is searched under dags as configured by Airflow
  • the data folder where pipelines are stored is always unique

The data_folder is available only in certain Airflow deployments. In the case of Composer, it is a location on the GCS bucket. use_data_folder is disabled by default and should be enabled only when needed: operations on the bucket are non-atomic and much slower than on local storage, so they should be avoided.

fail_task_if_any_job_failed will raise an exception if any of the loading jobs failed permanently and thus fail the current Airflow task. This happens after all dlt loading jobs have executed. See more here: https://dlthub.com/docs/running-in-production/running#failed-jobs

abort_task_if_any_job_failed will abort the other dlt loading jobs and fail the Airflow task if any of the jobs failed. This may leave your warehouse in an inconsistent state, so the option is disabled by default.

The load info and trace info can be optionally saved to the destination. See https://dlthub.com/docs/running-in-production/running#inspect-and-save-the-load-info-and-trace

Arguments:

  • pipeline_name str - Name of the task group
  • use_data_folder bool, optional - If a well-defined 'data' folder is present, it will be used. Currently only the data folder on Composer is supported. Defaults to False.
  • local_data_folder str, optional - Path to a local folder on the worker machine in which to store data. Used if use_data_folder is False or there is no well-defined data folder. Defaults to gettempdir.
  • use_task_logger bool, optional - Will redirect dlt logger into task logger. Defaults to True.
  • log_progress_period float, optional - If progress is not configured for a pipeline, the log progress is used with a given period. Set 0 to disable. Defaults to 30.0.
  • buffer_max_items int, optional - Maximum number of buffered items. Use 0 to keep dlt built-in limit. Defaults to 1000.
  • retry_policy Retrying, optional - Tenacity retry policy. Defaults to no retry.
  • retry_pipeline_steps Sequence[TPipelineStep], optional - Which pipeline steps are eligible for retry. Defaults to ("load", ).
  • fail_task_if_any_job_failed bool, optional - Will fail a task if any of the dlt load jobs failed. Defaults to True.
  • abort_task_if_any_job_failed bool, optional - Will abort the other dlt load jobs and fail the task if any of the jobs failed. Defaults to False.
  • wipe_local_data bool, optional - Will wipe all the data created by the pipeline, also in case of exception. Defaults to True.
  • save_load_info bool, optional - Will save extensive load info to the destination. Defaults to False.
  • save_trace_info bool, optional - Will save trace info to the destination. Defaults to False.

run

def run(pipeline: Pipeline,
data: Any,
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None,
pipeline_name: str = None,
on_before_run: Callable[[], None] = None,
**kwargs: Any) -> PythonOperator

Create a task to run the given pipeline with the given data in Airflow.

Arguments:

  • pipeline Pipeline - The pipeline to run.
  • data Any - The data to run the pipeline with. If a non-resource callable is given, it is evaluated during DAG execution, right before the actual pipeline run.
  • NOTE - If on_before_run is provided, on_before_run is evaluated first, and then the callable data.
  • table_name str, optional - The name of the table to which the data should be loaded within the dataset.
  • write_disposition TWriteDispositionConfig, optional - Same as in run command.
  • loader_file_format TLoaderFileFormat, optional - The file format the loader will use to create the load package.
  • schema_contract TSchemaContract, optional - An override for the schema contract settings; this will replace the schema contract settings for all tables in the schema.
  • pipeline_name str, optional - The name of the derived pipeline.
  • on_before_run Callable, optional - A callable to be executed right before the actual pipeline run.

Returns:

  • PythonOperator - Airflow task instance.
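The evaluation order described for data and on_before_run can be illustrated with a small stand-alone sketch. It does not use dlt or Airflow; execute_task, prepare and fetch_rows are hypothetical names introduced only for this illustration, and the real helper additionally checks that a callable is not a dlt resource before calling it.

```python
from typing import Any, Callable, List, Optional


def execute_task(data: Any,
                 on_before_run: Optional[Callable[[], None]] = None) -> Any:
    """Hypothetical stand-in for the body of the Airflow task.

    Resolves the inputs in the documented order: on_before_run first,
    then the callable data. The real task would pass the result to the
    pipeline afterwards.
    """
    if on_before_run is not None:
        on_before_run()
    if callable(data):
        # Deferred evaluation: happens at task execution, not at DAG parsing.
        data = data()
    return data


events: List[str] = []


def prepare() -> None:
    events.append("on_before_run")


def fetch_rows() -> List[dict]:
    events.append("data evaluated")
    return [{"id": 1}, {"id": 2}]


rows = execute_task(fetch_rows, on_before_run=prepare)
print(events)  # ['on_before_run', 'data evaluated']
print(rows)    # [{'id': 1}, {'id': 2}]
```

Passing a callable keeps expensive work, such as opening connections, out of DAG parsing and inside the task run.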

add_run

@with_telemetry("helper", "airflow_add_run", False, "decompose")
def add_run(pipeline: Pipeline,
data: Any,
*,
decompose: Literal["none", "serialize", "parallel",
"parallel-isolated"] = "none",
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None,
on_before_run: Callable[[], None] = None,
**kwargs: Any) -> List[PythonOperator]

Creates a task or a group of tasks to run data with pipeline.

Creates an Airflow task that extracts, normalizes and loads data with the passed pipeline instance pipeline. If data is a source and decompose is serialize, it will decompose the source into disjoint connected components (isolated groups of resources) and execute them one after another as separate Airflow tasks. The decomposition makes sure that each resource or transformer is extracted only once. It preserves the order of resources declared in the source when creating the graph of tasks.
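To make the idea of disjoint connected components concrete, here is a minimal sketch in plain Python. It is not dlt's implementation: decompose_components and the resource names are hypothetical, and the sketch assumes each transformer reads from exactly one parent resource.

```python
from typing import Dict, List, Optional


def decompose_components(parents: Dict[str, Optional[str]]) -> List[List[str]]:
    """Group resources into disjoint connected components.

    `parents` maps each resource or transformer name to the resource it
    reads from (None for a root resource). The dict order stands in for
    the declaration order in the source.
    """
    def root_of(name: str) -> str:
        # Follow parent links until a root resource is reached.
        parent = parents[name]
        while parent is not None:
            name, parent = parent, parents[parent]
        return name

    components: Dict[str, List[str]] = {}
    for name in parents:
        # Each name lands in exactly one component, keyed by its root.
        components.setdefault(root_of(name), []).append(name)
    return list(components.values())


# Hypothetical source: two roots with one transformer each, plus a lone root.
source = {
    "customers": None,
    "orders": None,
    "customer_details": "customers",
    "order_items": "orders",
    "currencies": None,
}
components = decompose_components(source)
print(components)
# [['customers', 'customer_details'], ['orders', 'order_items'], ['currencies']]
```

Each inner list would become one Airflow task. Because a transformer always travels with its parent, no resource needs to be extracted twice.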

The kwargs are passed as arguments to all Airflow task instances created.

Arguments:

  • pipeline Pipeline - An instance of pipeline used to run the source.

  • data Any - Any data supported by the run method of the pipeline. If a non-resource callable is given, it is called before the load to get the data.

  • decompose Literal["none", "serialize", "parallel", "parallel-isolated"], optional - A source decomposition strategy into Airflow tasks:

    • none - no decomposition, default value.
    • serialize - decompose the source into a sequence of Airflow tasks.
    • parallel - decompose the source into a parallel Airflow task group, except that the first resource must be completed first. All tasks that are run in parallel share the same pipeline state. If two of them modify the state, part of the state may be lost.
    • parallel-isolated - decompose the source into a parallel Airflow task group, with the same exception as above. All tasks have separate pipeline state (via separate pipeline names) but share the same dataset, schemas and tables.

  • NOTE - The first component of the source in both parallel models is done first, after that the rest are executed in parallel to each other.

  • NOTE - In case the SequentialExecutor is used by Airflow, the tasks will remain sequential despite 'parallel' or 'parallel-isolated' mode. Use another executor (e.g. CeleryExecutor) to make tasks parallel!

    Parallel tasks are executed in different pipelines, all derived from the original one, but with the state isolated from each other.

  • table_name str, optional - The name of the table to which the data should be loaded within the dataset.

  • write_disposition TWriteDispositionConfig, optional - Same as in run command. Defaults to None.

  • loader_file_format Literal["jsonl", "insert_values", "parquet"], optional - The file format the loader will use to create the load package. Not all file_formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.

  • schema_contract TSchemaContract, optional - An override for the schema contract settings; this will replace the schema contract settings for all tables in the schema. Defaults to None.

  • on_before_run Callable, optional - A callable to be executed right before the actual pipeline run.
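The task ordering implied by the decompose strategies can be sketched as a dependency map. This is only an illustration of the ordering described above, not dlt code: upstream_tasks and the task names are hypothetical.

```python
from typing import Dict, List


def upstream_tasks(tasks: List[str], decompose: str) -> Dict[str, List[str]]:
    """Return, for each task, the tasks that must finish before it starts."""
    if decompose == "serialize":
        # A chain: every task waits for the one created just before it.
        return {
            task: ([tasks[i - 1]] if i > 0 else [])
            for i, task in enumerate(tasks)
        }
    if decompose in ("parallel", "parallel-isolated"):
        # The first component runs alone; the rest fan out after it.
        first = tasks[0]
        return {task: ([] if task == first else [first]) for task in tasks}
    raise ValueError(f"unsupported decompose value: {decompose!r}")


tasks = ["customers", "orders", "currencies"]
serial = upstream_tasks(tasks, "serialize")
parallel = upstream_tasks(tasks, "parallel")
print(serial)    # {'customers': [], 'orders': ['customers'], 'currencies': ['orders']}
print(parallel)  # {'customers': [], 'orders': ['customers'], 'currencies': ['customers']}
```

The two parallel modes produce the same graph shape. They differ in whether the tasks share pipeline state, which this sketch does not model.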

Returns:

  • List[PythonOperator] - Airflow tasks created, in order of creation.
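A possible way to wire the task group into a DAG is sketched below. It assumes Airflow 2.4 or later (for the schedule argument), an installed dlt with the duckdb destination available, and uses made-up names (build_dag, demo_source, dlt_demo, demo_group). The imports are placed inside the function only so that the snippet can be imported without Airflow present; a real DAG file would normally import at module level and build the DAG there.

```python
def build_dag():
    """Build a DAG that loads a small two-resource source with dlt."""
    import datetime

    import dlt
    from airflow import DAG
    from dlt.helpers.airflow_helper import PipelineTasksGroup

    @dlt.resource(name="numbers")
    def numbers():
        yield from ({"value": i} for i in range(3))

    @dlt.resource(name="letters")
    def letters():
        yield from ({"value": c} for c in "abc")

    @dlt.source
    def demo_source():
        return numbers, letters

    with DAG(
        dag_id="dlt_demo",
        start_date=datetime.datetime(2024, 1, 1),
        schedule=None,
        catchup=False,
    ) as dag:
        # The task group must be created inside a DAG context.
        tasks = PipelineTasksGroup(
            "demo_group", use_data_folder=False, wipe_local_data=True
        )
        pipeline = dlt.pipeline(
            pipeline_name="demo",
            dataset_name="demo_data",
            destination="duckdb",
        )
        # Extra keyword arguments (here: retries) go to the Airflow tasks.
        tasks.add_run(pipeline, demo_source(), decompose="serialize", retries=0)
    return dag
```

In a DAG file, the result would be assigned at module level (for example `dag = build_dag()`) so that the scheduler can discover it.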
