📜
データコントラクト
Producer/Consumer間のスキーマ契約を組織的に管理する仕組み。「誰かがスキーマを変えたら下流が壊れた」問題を防ぐデータコントラクトの設計と運用を理解する
定義
データコントラクト(Data Contract):データのProducer(生成者)とConsumer(消費者)の間で合意された、データの形式・品質・鮮度・意味論の明示的な契約。コードのAPIドキュメントに相当するデータの世界の取り決め。
なぜデータコントラクトが必要か
典型的な問題(コントラクトなし):
注文サービスチーム:
「orders.total_amountを税込みに変えたので念のため連絡します」
→ Slackに流れていたが誰も見ていなかった
3週間後:
データサイエンスチーム: 「売上レポートの数値がおかしい」
BI チーム: 「ダッシュボードの計算が全部ずれている」
請求システム: 「二重請求が発生していた」
原因: total_amountの意味が変わったことを誰も知らなかった
API(コード)の世界:
型定義・OpenAPI仕様・テスト → 変更があれば静的解析・CIで検出
データパイプラインの世界(コントラクトなし):
Kafkaのスキーマ・DBのカラム → 変更しても誰も気づかない
下流のパイプラインが壊れてから初めて発覚
データコントラクトの構成要素
# data-contract.yaml(例)
apiVersion: v2.0.0
kind: DataContract
id: orders-v1
info:
title: Orders Data Contract
version: 1.2.0
owner: order-team@example.com
description: "注文データのProducer/Consumer間の契約"
servers:
- type: kafka
topic: orders.completed
- type: bigquery
project: my-project
dataset: analytics
table: orders
schema:
type: object
required: [order_id, customer_id, total_amount, status, created_at]
properties:
order_id:
type: string
format: uuid
description: "注文の一意ID"
customer_id:
type: string
format: uuid
total_amount:
type: number
description: "税抜き価格(円)" ← 意味論を明示
minimum: 0
status:
type: string
enum: [pending, completed, cancelled]
created_at:
type: string
format: date-time
quality:
- type: not_null
columns: [order_id, customer_id, total_amount]
- type: unique
columns: [order_id]
- type: freshness
column: created_at
max_delay: PT1H # 最大1時間遅延を許容
terms:
usage: "請求・分析目的に限定"
limitations: "個人情報を含む。GDPRポリシー準拠必須"
noticePeriod: P3M # 変更時は3ヶ月前に通知
スキーマレジストリとの違い
スキーマレジストリ(Confluent Schema Registry等):
技術的な契約:バイナリフォーマットの互換性チェック
「このAvro/ProtobufのフィールドをConsumerが読めるか」
データコントラクト:
ビジネス的な契約:意味・品質・SLA・ガバナンス
「このフィールドは何を意味するか、いつ更新されるか、誰が責任者か」
両方を組み合わせるのが現代的なアプローチ
データコントラクトのテスト
# Great Expectations(データ品質テストフレームワーク)
import great_expectations as gx
context = gx.get_context()
datasource = context.sources.add_pandas_filesystem(
name="orders",
base_directory="./data"
)
# コントラクトをテストとして定義
suite = context.add_or_update_expectation_suite("orders_contract")
validator = context.get_validator(...)
validator.expect_column_to_exist("order_id")
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between(
"total_amount", min_value=0
)
validator.expect_column_values_to_be_in_set(
"status", ["pending", "completed", "cancelled"]
)
# パイプラインの最初と最後で実行
results = validator.validate()
if not results.success:
raise DataContractViolationError(results)
変更管理プロセス
コントラクトの変更が必要になった場合:
後方互換な変更(Consumerへの通知のみ):
✅ オプションフィールドの追加
✅ フィールドの説明文の変更
✅ 品質基準の緩和
非互換な変更(3ヶ月前の通知必須):
❌ フィールドの削除
❌ 型の変更(string → int)
❌ 意味論の変更(税抜き → 税込み)
❌ enumの値の削除
Expand-Contractパターンとの対応:
1. 新フィールド total_amount_with_tax を追加(Expand)
2. Consumerが新フィールドに移行(Migrate)
3. total_amount を deprecated にする(3ヶ月待つ)
4. 旧フィールドを削除(Contract)
データコントラクトツール
| ツール | 特徴 |
|---|---|
| soda-core | SQLベースのデータ品質テスト。YAMLでコントラクトを定義 |
| Great Expectations | Pythonベース。テストとドキュメントを生成 |
| datacontract-cli | OpenDataContract仕様のCLIツール。lint・test・export |
| dbt tests | dbtパイプライン内での品質チェック |
| Monte Carlo | データオブザーバビリティ(ML異常検知) |
# datacontract-cliの使い方
pip install datacontract-cli
# コントラクトの検証
datacontract lint datacontract.yaml
# 実際のデータに対してテスト
datacontract test datacontract.yaml
# HTMLドキュメントの生成
datacontract export --format html datacontract.yaml
データメッシュとの関係
データシステムの統合設計で触れたデータメッシュでは、データコントラクトが「データプロダクト」の仕様書として機能する。
データプロダクト = データ + コントラクト + オーナーシップ
注文ドメインチーム(Producer):
data-contract.yaml に署名
→ SLAを保証(鮮度・品質)
→ 変更時はnoticePeriodを守る
BIチーム・データサイエンスチーム(Consumer):
コントラクトに合意してからパイプラインを構築
→ コントラクト違反があれば自動通知される
→ Producerに変更を要求できる
関連概念
- → エンコーディングとスキーマ進化(技術的な互換性管理)
- → dbtとデータ変換パイプライン(dbt testsとの組み合わせ)
- → ストリーム処理(KafkaのSchema Registryとの補完)
- → データシステムの統合設計(データメッシュの文脈)
出典・参考文献
- Chad Sanderson, “Data Contracts: An Introduction” (2022) — datacontract.com
- Open Data Contract Standard — bitol.io/open-data-contract-standard
- dbt Documentation, “Data Tests” — docs.getdbt.com/docs/build/data-tests
- 1. 🗄️データ志向アプリケーション設計:概要
- 2. 🧩データモデルとクエリ言語
- 3. 💾ストレージエンジンとインデックス
- 4. 🔁レプリケーション
- 5. 🍕パーティショニング(シャーディング)
- 6. 🔒トランザクションとACID
- 7. ⚡分散システムの本質的な問題
- 8. 🤝一貫性と分散合意
- 9. 📦バッチ処理
- 10. 🌊ストリーム処理
- 11. 📋エンコーディングとスキーマ進化
- 12. 🔗Sagaパターンと分散トランザクション
- 13. 🏗️データシステムの統合設計
- 14. 📸MVCC(多版型同時実行制御)
- 15. 📊列指向ストレージとOLAP設計
- 16. 🕰️ベクタークロックと因果順序
- 17. 🔀CRDT(競合なし複製データ型)
- 18. 🔍クエリオプティマイザーと実行計画
- 19. ⚡キャッシュ戦略とRedis設計
- 20. 🔎全文検索と転置インデックス
- 21. 🌐NewSQL(分散ACIDデータベース)
- 22. 📝WALと論理レプリケーション
- 23. 🔌コネクションプーリング
- 24. 🚧ゼロダウンタイムマイグレーション
- 25. 🆔分散ID生成
- 26. 🔄N+1問題とDataLoaderパターン
- 27. 📈タイムシリーズDB
- 28. 🛡️Row Level Security(行レベルセキュリティ)
- 29. 📤Outboxパターン(トランザクショナルアウトボックス)
- 30. 💾DBバックアップとPITR
- 31. ⚠️データベース設計アンチパターン
- 32. 🕸️グラフDB深掘り
- 33. 🔋バックプレッシャーとサーキットブレーカー
- 34. 🔵コンシステントハッシング
- 35. 📋マテリアライズドビュー
- 36. 📡DBモニタリングとオブザーバビリティ
- 37. 🔐データプライバシーとCrypto-Shredding
- 38. ✂️垂直分割(Vertical Partitioning)
- 39. 🪟ウィンドウ関数
- 40. 🧲ベクトルDBとpgvector
- 41. 🔧dbtとデータ変換パイプライン
- 42. 📬ジョブキューの設計
- 43. 📐正規化理論(1NF〜BCNF)
- 44. ☁️クラウドDBサービスの設計思想と選択基準
- 45. 🗺️地理空間データとPostGIS
- 46. 🔑DBセキュリティと権限管理
- 47. 🏔️Lakehouse(Apache Iceberg / Delta Lake)
- 48. 📜データコントラクト
- 49. 🔭OpenTelemetryとDBトレーシング
出典: Chad Sanderson, 'Data Contracts: An Introduction' / Martin Kleppmann, DDIA Chapter 4