Dagster & Teradata
This is a community-maintained integration. To report bugs or leave feedback, open an issue in the Dagster community integrations repo.
The community-supported Teradata package provides an integration with Teradata Vantage.
For more information, see the dagster-teradata GitHub repository.
To begin integrating Dagster with Teradata Vantage for building and managing ETL pipelines, this guide provides step-by-step instructions on installing and configuring the required packages, setting up a Dagster project, and implementing a pipeline that interacts with Teradata Vantage.
Prerequisites
- Access to a Teradata Vantage instance.
  Note: If you need a test instance of Vantage, you can provision one for free at https://clearscape.teradata.com
- Python 3.9 or higher. Python 3.13 is recommended.
- pip installed
Step 1: Install dagster-teradata
With your virtual environment active, the next step is to install dagster and the Teradata provider package (dagster-teradata) to interact with Teradata Vantage.
- Install the required packages:

  With uv:

  uv add dagster-teradata

  With pip:

  pip install dagster-teradata

- Note about optional dependencies:

  a) dagster-teradata relies on dagster-aws for ingesting data from an S3 bucket into Teradata Vantage. Since dagster-aws is an optional dependency, you can install it by running:

  With uv:

  uv add dagster-teradata[aws]

  With pip:

  pip install dagster-teradata[aws]

  b) dagster-teradata also relies on dagster-azure for ingesting data from an Azure Blob Storage container into Teradata Vantage. To install this dependency, run:

  With uv:

  uv add dagster-teradata[azure]

  With pip:

  pip install dagster-teradata[azure]

- Verify the installation:

  To confirm that Dagster is correctly installed, run:

  dagster --version

  If the installation succeeded, this prints the installed Dagster version.
Step 2: Initialize a Dagster project
Now that you have the necessary packages installed, the next step is to create a new Dagster project.
Scaffold a new Dagster project
Run the following command:
dagster project scaffold --name dagster-quickstart
This command will create a new project named dagster-quickstart. It will automatically generate the following directory structure:
dagster-quickstart
│ pyproject.toml
│ README.md
│ setup.cfg
│ setup.py
│
├───dagster_quickstart
│ assets.py
│ definitions.py
│ __init__.py
│
└───dagster_quickstart_tests
test_assets.py
__init__.py
Refer here to learn more about this directory structure.
Step 3: Create sample data
To simulate an ETL pipeline, create a CSV file with sample data that your pipeline will process.
Create the CSV File: Inside the dagster_quickstart/data/ directory, create a file named sample_data.csv with the following content:
id,name,age,city
1,Alice,28,New York
2,Bob,35,San Francisco
3,Charlie,42,Chicago
4,Diana,31,Los Angeles
This file represents sample data that will be used as input for your ETL pipeline.
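If you prefer to create the file programmatically, the following sketch writes the same sample data into the expected location (the `dagster_quickstart/data/` path assumes the scaffolded project layout from Step 2):

```python
import csv
from pathlib import Path

# Sample rows matching the CSV shown above.
rows = [
    ("id", "name", "age", "city"),
    (1, "Alice", 28, "New York"),
    (2, "Bob", 35, "San Francisco"),
    (3, "Charlie", 42, "Chicago"),
    (4, "Diana", 31, "Los Angeles"),
]

# Create the data directory if it doesn't exist, then write the CSV.
data_dir = Path("dagster_quickstart/data")
data_dir.mkdir(parents=True, exist_ok=True)
with open(data_dir / "sample_data.csv", "w", newline="") as f:
    csv.writer(f).writerows(rows)
```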
Step 4: Define assets for the ETL pipeline
Now, we'll define a series of assets for the ETL pipeline inside the assets.py file.
Open the dagster_quickstart/assets.py file and add the following code to define the pipeline:
import pandas as pd
from dagster import asset


@asset(required_resource_keys={"teradata"})
def read_csv_file(context):
    df = pd.read_csv("dagster_quickstart/data/sample_data.csv")
    context.log.info(df)
    return df


@asset(required_resource_keys={"teradata"})
def drop_table(context):
    result = context.resources.teradata.drop_table(["tmp_table"])
    context.log.info(result)


@asset(required_resource_keys={"teradata"})
def create_table(context, drop_table):
    result = context.resources.teradata.execute_query(
        """CREATE TABLE tmp_table (
            id INTEGER,
            name VARCHAR(50),
            age INTEGER,
            city VARCHAR(50));"""
    )
    context.log.info(result)


@asset(required_resource_keys={"teradata"}, deps=[read_csv_file])
def insert_rows(context, create_table, read_csv_file):
    data_tuples = [tuple(row) for row in read_csv_file.to_numpy()]
    for row in data_tuples:
        result = context.resources.teradata.execute_query(
            f"INSERT INTO tmp_table (id, name, age, city) VALUES ({row[0]}, '{row[1]}', {row[2]}, '{row[3]}');"
        )
        context.log.info(result)


@asset(required_resource_keys={"teradata"})
def read_table(context, insert_rows):
    result = context.resources.teradata.execute_query("select * from tmp_table;", True)
    context.log.info(result)
This Dagster pipeline defines a series of assets that interact with Teradata. It starts by reading data from a CSV file, then drops and recreates a table in Teradata. After that, it inserts rows from the CSV into the table and finally retrieves the data from the table.
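One caveat with the insert step above: interpolating values directly into the INSERT string breaks on data containing quotes (for example, a name like O'Brien). A safer pattern is to build parameter tuples from the DataFrame once and hand them to a parameterized query. The sketch below shows the tuple-building part, which runs standalone; the commented-out connection usage is illustrative, so check the dagster-teradata documentation for the exact connection API your version exposes:

```python
import pandas as pd

# Sample data; the second row would break a naive f-string INSERT.
df = pd.DataFrame(
    [(1, "Alice", 28, "New York"), (2, "O'Brien", 35, "San Francisco")],
    columns=["id", "name", "age", "city"],
)

# Convert numpy scalars to plain Python types so a DB driver can bind them.
params = [
    tuple(x.item() if hasattr(x, "item") else x for x in row)
    for row in df.itertuples(index=False)
]

# Question-mark placeholders; the driver handles quoting and escaping.
sql = "INSERT INTO tmp_table (id, name, age, city) VALUES (?, ?, ?, ?)"

# Illustrative usage inside an asset (connection API may differ by version):
# with context.resources.teradata.get_connection() as conn:
#     conn.cursor().executemany(sql, params)
```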
Step 5: Define the pipeline definitions
The next step is to configure the pipeline by defining the necessary resources and jobs.
Edit the definitions.py file: Open dagster_quickstart/definitions.py and define your Dagster pipeline as follows:
from dagster import EnvVar, Definitions
from dagster_teradata import TeradataResource
from .assets import read_csv_file, read_table, create_table, drop_table, insert_rows
# Define the pipeline and resources
defs = Definitions(
    assets=[read_csv_file, read_table, create_table, drop_table, insert_rows],
    resources={
        "teradata": TeradataResource(
            host=EnvVar("TERADATA_HOST"),
            user=EnvVar("TERADATA_USER"),
            password=EnvVar("TERADATA_PASSWORD"),
            database=EnvVar("TERADATA_DATABASE"),
        )
    },
)
This code sets up a Dagster project that interacts with Teradata by defining assets and resources:
- It imports necessary modules, including pandas, Dagster, and dagster-teradata.
- It imports asset functions (read_csv_file, read_table, create_table, drop_table, insert_rows) from the assets.py module.
- It registers these assets with Dagster using Definitions, allowing Dagster to track and execute them.
- It defines a Teradata resource (TeradataResource) that reads database connection details from environment variables (TERADATA_HOST, TERADATA_USER, TERADATA_PASSWORD, TERADATA_DATABASE).
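Because the resource reads its settings through EnvVar, the four variables must be set in the shell before starting Dagster. For example (the values below are placeholders; substitute your own Vantage credentials):

```shell
# Export the connection settings that TeradataResource reads via EnvVar.
export TERADATA_HOST="your-host.clearscape.teradata.com"
export TERADATA_USER="your_user"
export TERADATA_PASSWORD="your_password"
export TERADATA_DATABASE="your_database"
```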
Step 6: Run the pipeline
After setting up the project, you can now run your Dagster pipeline:
- Start the Dagster dev server: In your terminal, navigate to the root directory of your project and run:

  dagster dev

  After executing this command, the Dagster logs will be displayed directly in the terminal, and any errors encountered during startup will be logged there as well. Once you see a message similar to:

  2025-02-04 09:15:46 +0530 - dagster-webserver - INFO - Serving dagster-webserver on http://127.0.0.1:3000 in process 32564

  the Dagster webserver is running successfully, and you can proceed to the next step.

- Access the Dagster UI: Open a web browser and navigate to http://127.0.0.1:3000. This will open the Dagster UI, where you can manage and monitor your pipelines.

- Run the pipeline:
  - In the top navigation of the Dagster UI, click Assets > View global asset lineage.
  - Click Materialize to execute the pipeline.
  - In the popup window, click View to see the details of the pipeline run.

- Monitor the run: The Dagster UI lets you visualize the pipeline's progress, view logs, and inspect the status of each step. You can switch between views to see the execution logs and metadata for each asset.
Further reading
- dagster-teradata with Teradata Vantage
- Data Transfer from AWS S3 to Teradata Vantage Using dagster-teradata
- Data Transfer from Azure Blob to Teradata Vantage Using dagster-teradata
- Manage VantageCloud Lake Compute Clusters with dagster-teradata
- Teradata Authorization
- Teradata VantageCloud Lake Compute Clusters