🔧
dbtとデータ変換パイプライン
モダンデータスタックの核心であるdbt(data build tool)のモデル定義・依存関係・テスト・ドキュメント生成の仕組み。ELTパターンでのデータ変換設計を理解する
定義
dbt(data build tool):データウェアハウス内でのデータ変換をSQLとYAMLで管理するツール。「Tのあるバージョン管理されたELT」と言われ、データエンジニアリングにソフトウェアエンジニアリングの手法(テスト・バージョン管理・ドキュメント)を持ち込む。
ETLからELTへ
ETL(従来):
Extract(抽出)→ Transform(変換)→ Load(ロード)
変換してからDWHに入れる
変換処理が別のシステムで動く(Spark等)
ELT(モダン):
Extract(抽出)→ Load(ロード)→ Transform(変換)
生データをそのままDWHに入れ、DWH内で変換する
BigQuery・Snowflakeの計算パワーを活用
dbtはこのTを担当する
dbtのプロジェクト構造
my_project/
dbt_project.yml # プロジェクト設定
models/ # SQLモデル(変換ロジック)
staging/ # 生データの軽い変換
stg_orders.sql
stg_users.sql
intermediate/ # 中間集計
int_order_items.sql
marts/ # ビジネス向けの最終モデル
finance/
fct_revenue.sql
dim_customers.sql
tests/ # データテスト
seeds/ # CSVの静的データ
macros/ # 再利用可能なSQLスニペット
モデルの定義
-- models/staging/stg_orders.sql
-- 生データの型変換・命名規則の統一
SELECT
id AS order_id,
customer_id,
CAST(total_amount AS DECIMAL(10,2)) AS total_amount,
LOWER(status) AS status,
created_at AT TIME ZONE 'UTC' AS created_at_utc
FROM {{ source('raw', 'orders') }} -- {{ }} はJinjaテンプレート
WHERE created_at >= '2020-01-01' -- 古すぎるデータを除外
-- models/marts/finance/fct_revenue.sql
-- ビジネスロジックを適用した最終モデル
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }} -- 他のモデルを参照
),
customers AS (
SELECT * FROM {{ ref('stg_customers') }}
)
SELECT
o.order_id,
o.total_amount,
o.created_at_utc,
c.customer_name,
c.country,
DATE_TRUNC('month', o.created_at_utc) AS revenue_month
FROM orders o
JOIN customers c ON c.customer_id = o.customer_id
WHERE o.status = 'completed'
依存関係グラフ(DAG)
{{ ref() }}と{{ source() }}を使うと、dbtが自動的にモデル間の依存関係を解析してDAGを構築する。
raw.orders ──→ stg_orders ──→ int_order_items ──→ fct_revenue
raw.users ──→ stg_users ──→ dim_customers ──┘
実行: dbt run
→ DAGに従って依存関係の順番に実行
→ 依存のないモデルは並列実行
# 全モデルを実行
dbt run
# 特定モデルだけ
dbt run --select fct_revenue
# 依存するモデルも含めて実行
dbt run --select +fct_revenue # 上流も含む
dbt run --select fct_revenue+ # 下流も含む
マテリアライゼーション戦略
# dbt_project.yml
models:
my_project:
staging:
+materialized: view # 毎回クエリを実行(軽い変換向け)
intermediate:
+materialized: ephemeral # CTEとして展開(実体化しない)
marts:
+materialized: table # テーブルとして保存(重い集計向け)
finance:
+materialized: incremental # 差分だけ追加(大規模テーブル向け)
インクリメンタルモデル
-- models/marts/finance/fct_daily_revenue.sql
{{
config(
materialized='incremental',
unique_key='revenue_date',
on_schema_change='sync_all_columns'
)
}}
SELECT
DATE_TRUNC('day', created_at_utc) AS revenue_date,
SUM(total_amount) AS total_revenue,
COUNT(*) AS order_count
FROM {{ ref('stg_orders') }}
WHERE status = 'completed'
{% if is_incremental() %}
-- 増分実行時: 直近3日分だけ再計算(遅延到着データに対応)
AND created_at_utc >= (SELECT MAX(revenue_date) - INTERVAL '3 days' FROM {{ this }})
{% endif %}
GROUP BY 1
データテスト
# models/staging/schema.yml
version: 2
models:
- name: stg_orders
description: "生注文データの標準化"
columns:
- name: order_id
description: "注文の一意ID"
tests:
- unique # 重複チェック
- not_null # NULL禁止
- name: status
tests:
- accepted_values:
values: ['pending', 'completed', 'cancelled']
- name: customer_id
tests:
- relationships: # 参照整合性チェック
to: ref('stg_customers')
field: customer_id
- name: total_amount
tests:
- not_null
- dbt_utils.accepted_range: # 拡張パッケージのテスト
min_value: 0
# テスト実行
dbt test
# 出力例
PASS test.stg_orders.unique_order_id
PASS test.stg_orders.not_null_order_id
FAIL test.stg_orders.accepted_values_status ← 'refunded'が入っていた
ソースの鮮度チェック
# models/staging/sources.yml
sources:
- name: raw
database: my_warehouse
schema: raw
tables:
- name: orders
freshness:
warn_after: {count: 6, period: hour} # 6時間更新なしで警告
error_after: {count: 24, period: hour} # 24時間でエラー
loaded_at_field: _loaded_at
# ソースの鮮度確認
dbt source freshness
ドキュメント自動生成
# ドキュメントサイトを生成
dbt docs generate
dbt docs serve # ローカルでWebサーバー起動
# → モデルの説明・カラム定義・依存関係グラフがWebで確認できる
モダンデータスタックの全体像
データソース(アプリDB・SaaS)
↓ Fivetran / Airbyte(Extract + Load)
BigQuery / Snowflake / Redshift(生データ保存)
↓ dbt(Transform)
BigQuery / Snowflake(変換済みデータ)
↓ Metabase / Tableau / Looker(可視化)
dbt Cloud vs dbt Core
| 観点 | dbt Core | dbt Cloud |
|---|---|---|
| 費用 | 無料(OSS) | 有料 |
| 実行 | ローカル or CI/CD | マネージドスケジューラー |
| IDE | VS Code等 | ブラウザIDE |
| オーケストレーション | Airflow / Prefect と組み合わせ | 内蔵 |
関連概念
- → バッチ処理(ELTパイプラインとの関係)
- → 列指向ストレージ(DWHとスタースキーマ)
- → マテリアライズドビュー(dbtのマテリアライゼーションとの類比)
- → データシステムの統合設計(モダンデータスタックの文脈)
出典・参考文献
- dbt Documentation — docs.getdbt.com
- Claire Carroll, “What is Analytics Engineering?” — getdbt.com/blog
- dbt-utils package — github.com/dbt-labs/dbt-utils
- 1. 🗄️データ志向アプリケーション設計:概要
- 2. 🧩データモデルとクエリ言語
- 3. 💾ストレージエンジンとインデックス
- 4. 🔁レプリケーション
- 5. 🍕パーティショニング(シャーディング)
- 6. 🔒トランザクションとACID
- 7. ⚡分散システムの本質的な問題
- 8. 🤝一貫性と分散合意
- 9. 📦バッチ処理
- 10. 🌊ストリーム処理
- 11. 📋エンコーディングとスキーマ進化
- 12. 🔗Sagaパターンと分散トランザクション
- 13. 🏗️データシステムの統合設計
- 14. 📸MVCC(多版型同時実行制御)
- 15. 📊列指向ストレージとOLAP設計
- 16. 🕰️ベクタークロックと因果順序
- 17. 🔀CRDT(競合なし複製データ型)
- 18. 🔍クエリオプティマイザーと実行計画
- 19. ⚡キャッシュ戦略とRedis設計
- 20. 🔎全文検索と転置インデックス
- 21. 🌐NewSQL(分散ACIDデータベース)
- 22. 📝WALと論理レプリケーション
- 23. 🔌コネクションプーリング
- 24. 🚧ゼロダウンタイムマイグレーション
- 25. 🆔分散ID生成
- 26. 🔄N+1問題とDataLoaderパターン
- 27. 📈タイムシリーズDB
- 28. 🛡️Row Level Security(行レベルセキュリティ)
- 29. 📤Outboxパターン(トランザクショナルアウトボックス)
- 30. 💾DBバックアップとPITR
- 31. ⚠️データベース設計アンチパターン
- 32. 🕸️グラフDB深掘り
- 33. 🔋バックプレッシャーとサーキットブレーカー
- 34. 🔵コンシステントハッシング
- 35. 📋マテリアライズドビュー
- 36. 📡DBモニタリングとオブザーバビリティ
- 37. 🔐データプライバシーとCrypto-Shredding
- 38. ✂️垂直分割(Vertical Partitioning)
- 39. 🪟ウィンドウ関数
- 40. 🧲ベクトルDBとpgvector
- 41. 🔧dbtとデータ変換パイプライン
- 42. 📬ジョブキューの設計
- 43. 📐正規化理論(1NF〜BCNF)
- 44. ☁️クラウドDBサービスの設計思想と選択基準
- 45. 🗺️地理空間データとPostGIS
- 46. 🔑DBセキュリティと権限管理
- 47. 🏔️Lakehouse(Apache Iceberg / Delta Lake)
- 48. 📜データコントラクト
- 49. 🔭OpenTelemetryとDBトレーシング
出典: dbt Documentation / The Analytics Engineering Guide