バッチ処理設計書の概要
バッチ処理はオンライン処理と比較して「定刻起動」「大量データ」「無人運用」という特性を持つ。設計書ではこの3特性を意識した定義が必要だ。
特にエラー時のリカバリ方針が曖昧なまま本番稼働すると、夜間バッチの失敗時に担当者が復旧手順を一から考えなければならず、重大インシデントに発展しやすい。設計段階でリカバリシナリオまで定義しておくことが鉄則だ。
💡 バッチ処理設計書が必要な場面
日次集計・月次請求・ファイル連携・データ移行・定期メール送信・インデックス再構築など、スケジュール実行が必要な処理はすべてバッチ処理設計書の対象となる。
定義すべき項目一覧
| 分類 | 定義項目 | 必須度 |
|---|---|---|
| 基本情報 | バッチID / バッチ名 | ◎ |
| 基本情報 | 処理概要(目的・対象データ・業務上の役割) | ◎ |
| スケジュール | 実行スケジュール(cron 式) | ◎ |
| スケジュール | 実行環境(サーバー・ユーザー) | ◎ |
| スケジュール | 実行時間の目安(通常・最大) | ◎ |
| 前後処理 | 前提バッチ(先行ジョブ) | ◎ |
| 前後処理 | 後続バッチ(後続ジョブ) | ◎ |
| 前後処理 | 使用するファイル・キュー | ○ |
| 処理フロー | 主処理のステップ(番号付き) | ◎ |
| 処理フロー | チェックポイント / コミットタイミング | ◎ |
| DB操作 | 参照・更新・挿入・削除テーブル | ◎ |
| エラー処理 | エラー検知条件と終了コード | ◎ |
| エラー処理 | リカバリ手順(手動 / 自動リトライ) | ◎ |
| エラー処理 | アラート通知先・通知条件 | ◎ |
| パフォーマンス | 処理件数上限 / タイムアウト値 | ○ |
| パフォーマンス | コミット単位(バルクサイズ) | ○ |
スケジュール定義
cron 式は必ず「実行タイミングの意図」も一緒に記述する。cron 式だけでは将来の担当者が意図を読み取れない。
┌─────────────── 分
│ ┌──────────── 時
│ │ ┌───────── 日
│ │ │ ┌────── 月
│ │ │ │ ┌─── 曜日(0=日曜)
│ │ │ │ │
0 2 * * * /opt/app/batch/daily_summary.sh
→ 意味: 毎日 02:00 に実行(深夜バッチ。オンライン停止後に実行)
0 3 1 * * /opt/app/batch/monthly_invoice.sh
→ 意味: 毎月1日 03:00 に実行(月次請求バッチ)
*/30 8-18 * * 1-5 /opt/app/batch/stock_sync.sh
→ 意味: 平日 8〜18時の間 30分ごとに実行(在庫同期)
【注意事項】
- 実行時間の最大見積: 30分(通常 5分)
→ 03:30 開始の後続バッチまでに必ず完了させること
- タイムゾーン: Asia/Tokyo(サーバー設定確認必須)
- 同一時刻の重複起動防止: flock を使用(flock -n /var/lock/batch.lock)
処理フロー定義
【前処理】
1. 処理開始ログを batch_logs テーブルに INSERT(status=RUNNING)
2. 前回実行時刻を batch_control テーブルから取得
2a. 前回実行日 = 今日の場合 → 二重起動エラー → E01
3. ロックファイル(/var/lock/daily_summary.lock)を確認
3a. 存在する場合 → 前回バッチが継続中 → E02
【主処理】
4. 処理対象日付を確定(前日: TODAY-1)
5. orders テーブルから対象日付の注文を取得
WHERE order_date = :target_date
AND status IN (20, 30, 40) ← 確定済み・出荷済み・完了
6. 1000件ずつ FETCH してループ処理
7. 各 1000件ごとに daily_summaries テーブルへ UPSERT
ON CONFLICT (summary_date, category_id) DO UPDATE
8. コミット(1000件ごと)
【後処理】
9. batch_control.last_run_date を TODAY に更新
10. 処理完了ログ(status=SUCCESS, processed_count=N)を batch_logs に UPDATE
11. ロックファイルを削除
【異常終了時】
- ロールバック(STEP 7〜8 のコミット分は保持、途中処理のみロールバック)
- status=FAILED で batch_logs を更新
- Slack #alert チャンネルに通知(エラーコード・処理済み件数・スタックトレース)
- ロックファイルを削除(次回実行を妨げないこと)
ジョブ依存関係定義
複数バッチが連携する場合、依存関係を明示する。依存関係の記述がないと、前後のバッチが同時実行されてデータ不整合が発生する。
| バッチID | バッチ名 | 先行バッチ | 後続バッチ | スケジュール |
|---|---|---|---|---|
| BTH-001 | 受注データ取込 | なし | BTH-002 | 毎日 01:00 |
| BTH-002 | 在庫引当 | BTH-001 | BTH-003 | BTH-001 完了後 |
| BTH-003 | 日次売上集計 | BTH-002 | BTH-004 | 毎日 02:00 |
| BTH-004 | 管理者レポート生成 | BTH-003 | なし | 毎日 03:00 |
エラー・リカバリ方針
| エラーコード | 発生条件 | 終了コード | 自動リカバリ | 手動リカバリ手順 |
|---|---|---|---|---|
| E01 | 当日分がすでに処理済み | 1 | なし | 確認のみ(正常終了扱い) |
| E02 | ロックファイルが存在 | 2 | 30分後に1回リトライ | プロセス確認→ロック解除→再実行 |
| E03 | DB接続エラー | 3 | 5分後に3回リトライ | DB状態確認→回復後に手動再実行 |
| E04 | 処理件数 0件(データ異常) | 4 | なし | データ確認→原因調査→判断後に再実行 |
パフォーマンス設計
バッチ処理は大量データを扱うため、パフォーマンス設計が特に重要だ。以下の観点を定義する。
■ 処理件数想定
通常時: 50,000件/日
ピーク時: 200,000件/日(年末シーズン)
最大タイムアウト: 60分(60分超でアラート)
■ コミット戦略
バルクサイズ: 1,000件ごとにコミット
理由: メモリ使用量を抑えつつ、障害時の再実行範囲を限定
■ インデックス利用確認
orders テーブル: idx_orders_date(order_date, status)
実行計画で INDEX SCAN であることを確認すること
■ テーブルロック回避
SKIP LOCKED を使用して他セッションにロックされた行をスキップ
SELECT ... FOR UPDATE SKIP LOCKED
■ 並列実行設計
カテゴリID を分割キーとして 4並列で実行
各プロセスに担当カテゴリ範囲を引数で渡す
結果を集約プロセスが最後にマージ
Python Tips — cron 設定の自動検証
cronファイルの設定を Python で解析し、実行スケジュールの衝突・過密・設定ミスを検出できる。
"""
crontab ファイルを解析し、スケジュール情報を一覧化する
pip install croniter
"""
from croniter import croniter
from datetime import datetime
CRONTAB_SAMPLE = """
# 受注取込
0 1 * * * /opt/app/batch/import_orders.sh
# 在庫引当(受注取込後に実行想定)
0 1 * * * /opt/app/batch/allocate_stock.sh
# 日次集計
0 2 * * * /opt/app/batch/daily_summary.sh
# 月次請求
0 3 1 * * /opt/app/batch/monthly_invoice.sh
"""
def parse_crontab(content: str) -> list[dict]:
jobs = []
for line in content.strip().splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split(None, 5)
if len(parts) < 6:
continue
cron_expr = " ".join(parts[:5])
command = parts[5]
base = datetime(2026, 1, 1)
try:
it = croniter(cron_expr, base)
next_runs = [it.get_next(datetime) for _ in range(3)]
jobs.append({
"cron": cron_expr,
"command": command,
"next_3_runs": [str(r) for r in next_runs],
})
except Exception as e:
jobs.append({"cron": cron_expr, "command": command, "error": str(e)})
return jobs
def check_conflicts(jobs: list[dict]) -> list[str]:
"""同一分に複数ジョブが起動する衝突を検出"""
from collections import defaultdict
schedule_map = defaultdict(list)
base = datetime(2026, 1, 1)
for job in jobs:
if "error" in job:
continue
it = croniter(job["cron"], base)
for _ in range(24 * 7): # 1週間分チェック
t = it.get_next(datetime)
schedule_map[t].append(job["command"])
conflicts = []
for t, cmds in schedule_map.items():
if len(cmds) > 1:
conflicts.append(f"{t}: {cmds}")
return conflicts[:5] # 最初の5件を返す
jobs = parse_crontab(CRONTAB_SAMPLE)
for j in jobs:
print(f"[{j['cron']}] {j['command']}")
if "next_3_runs" in j:
for r in j["next_3_runs"]:
print(f" 次回実行: {r}")
conflicts = check_conflicts(jobs)
if conflicts:
print("\n⚠️ 同時刻起動の検出:")
for c in conflicts:
print(f" {c}")
レビューチェックリスト
| # | チェック項目 |
|---|---|
| 1 | cron 式とその意味・タイムゾーンが記述されているか |
| 2 | 先行バッチ・後続バッチの依存関係が定義されているか |
| 3 | 二重起動防止の仕組みが定義されているか |
| 4 | コミット単位(バルクサイズ)が定義されているか |
| 5 | エラーコード・終了コードが定義されているか |
| 6 | 手動リカバリ手順が具体的に記述されているか |
| 7 | 処理件数上限・タイムアウト値が定義されているか |
| 8 | アラート通知先・通知条件が定義されているか |