バッチ処理設計書とは
バッチ処理設計書は、定期実行や大量データ処理を行うバッチジョブの仕様を定義した設計書です。夜間バッチ・ファイル連携バッチ・集計バッチ・クリーニングバッチなどをカバーします。
バッチ処理はオンライン処理と異なり、実行中のエラー対応が難しく、失敗すると翌日の業務に影響が出るケースが多いです。このため、冪等性設計・エラーハンドリング・再実行方針を基本設計段階で確定させることが重要です。
💡 冪等性(Idempotency)とは
同じバッチを複数回実行しても、最終的なデータ状態が同じになることを「冪等性がある」と言います。バッチが途中で失敗した場合でも、最初から再実行すれば正しい結果が得られる設計を目指します。「冪等性のないバッチ」は再実行するとデータが二重登録されるため危険です。
① バッチジョブ一覧・スケジュール
| バッチID | バッチ名 | スケジュール(cron) | 処理時間目標 | 優先度 | 関連IF ID |
|---|---|---|---|---|---|
| B001 | 在庫送信バッチ | 0 22 * * *(毎日22:00) | 30分以内 | Must | IF-004 |
| B002 | 在庫更新バッチ | 0 8 * * *(毎日08:00) | 30分以内 | Must | IF-005 |
| B003 | 売上日報生成バッチ | 59 23 * * *(毎日23:59) | 10分以内 | Must | — |
| B004 | 月次売上集計バッチ | 0 1 1 * *(毎月1日01:00) | 1時間以内 | Must | IF-006 |
| B005 | ログアーカイブバッチ | 0 3 * * 0(毎週日曜03:00) | 2時間以内 | Should | — |
| B006 | セッションクリーニングバッチ | 0 4 * * *(毎日04:00) | 5分以内 | Must | — |
② 処理フローの定義
各バッチの処理フローを定義します。「在庫送信バッチ(B001)」の処理フロー例を以下に示します。
- 前処理チェック:前回バッチの実行ステータスを確認。異常終了のまま放置されている場合はアラート送信して処理中断
- 実行開始ログ出力:バッチ実行テーブル(
l_batch_log)に実行開始レコードを挿入(ステータス:実行中) - データ抽出:前回送信後に更新された在庫データをDBから抽出
- CSV生成:抽出データをSFTP送信用CSV形式にフォーマット
- 一時ファイル出力:ローカルディレクトリに一時CSVファイルを出力
- SFTP送信:一時CSVファイルをSFTPサーバーの送信ディレクトリへアップロード
- 完了通知ファイル配置:
.doneファイルをSFTPに配置 - 一時ファイル削除:ローカルの一時ファイルを削除
- 実行完了ログ出力:バッチ実行テーブルを成功ステータスで更新
⚠️ 処理フローは「障害発生ポイント」を意識して設計する
SFTP送信後に.doneファイル配置が失敗した場合、CSVは送信済みだが連携先は「未受信」と判断します。このような「部分的成功」ケースのリカバリー方法を設計段階で定義しておきます。
③ 冪等性の設計
各バッチの冪等性設計方針を定義します。
| バッチID | 冪等性の保証方法 | 再実行時の動作 |
|---|---|---|
| B001 在庫送信 | バッチ実行IDをファイル名に含める。同一実行IDのファイルは上書き送信 | CSVを再生成・再送信。連携先は最新ファイルを使用するため問題なし |
| B002 在庫更新 | DBへのUPSERT(INSERT ON CONFLICT DO UPDATE)を使用 | 同じCSVを再取込しても在庫数量は同じ値に更新される |
| B003 売上日報 | 日付をキーにしてUPSERT。既存レコードがあれば上書き | 同日の日報を再生成しても最終結果は同じ |
| B006 セッションクリーニング | 期限切れセッションを削除(DELETE WHERE)。重複実行しても既に削除済みのレコードには影響なし | 冪等性あり(再実行で問題なし) |
④ エラーハンドリング・再実行方針
| エラー種別 | 発生条件 | バッチの動作 | 再実行方針 |
|---|---|---|---|
| DB接続エラー | DB接続失敗・タイムアウト | 5秒間隔で3回リトライ。失敗時は異常終了・アラートメール送信 | DB復旧後に手動再実行(冪等性あり) |
| SFTP接続エラー | SFTP接続失敗 | 30秒間隔で3回リトライ。失敗時は異常終了 | SFTP復旧後に手動再実行 |
| データバリデーションエラー | 送信データに異常値が含まれる | エラーレコードをスキップしてログ出力。正常データのみ送信 | エラーレコードを手動修正後に差分再送 |
| 処理タイムアウト | 処理時間が目標値×2を超過 | 強制終了・アラート送信 | 原因調査後に手動再実行 |
⑤ ジョブ依存関係の定義
複数バッチが順序依存関係を持つ場合、ジョブ実行順序と依存条件を定義します。
| バッチID | 前提バッチ | 起動条件 | 備考 |
|---|---|---|---|
| B003 売上日報 | なし(独立実行) | 23:59 cron起動 | — |
| B004 月次売上集計 | B003が当月全日分正常終了していること | 毎月1日01:00 cron起動。B003の月間完了確認後に起動 | B003異常終了日がある場合はアラートのみ。手動で起動 |
| B006 セッションクリーニング | なし(独立実行) | 毎日04:00 cron起動 | — |
Python Tips — Pythonバッチの共通フレームワークを作る
"""
全バッチに共通するログ・エラーハンドリング・実行記録を提供する基底クラス。
各バッチはこのクラスを継承してexecute()メソッドを実装するだけでよい。
"""
import abc
import logging
import psycopg2
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
logger = logging.getLogger(__name__)
class BatchStatus(str, Enum):
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
SKIPPED = "SKIPPED"
@dataclass
class BatchResult:
status: BatchStatus
processed_count: int = 0
error_count: int = 0
message: str = ""
class BaseBatch(abc.ABC):
"""バッチ基底クラス。全バッチはこのクラスを継承する。"""
batch_id: str # サブクラスで定義: 例 "B001"
batch_name: str # サブクラスで定義: 例 "在庫送信バッチ"
def __init__(self, db_config: dict):
self.db_config = db_config
self.run_id: int | None = None
self.started_at: datetime | None = None
def run(self) -> BatchResult:
"""バッチ実行エントリーポイント(オーバーライド不可)"""
self.started_at = datetime.now()
logger.info(f"[{self.batch_id}] {self.batch_name} 開始")
try:
self._check_previous_run()
self._record_start()
result = self.execute()
self._record_end(result)
logger.info(f"[{self.batch_id}] 完了: {result}")
return result
except Exception as e:
logger.exception(f"[{self.batch_id}] 異常終了: {e}")
error_result = BatchResult(
status=BatchStatus.FAILED,
message=str(e)
)
self._record_end(error_result)
self._send_alert(str(e))
raise
@abc.abstractmethod
def execute(self) -> BatchResult:
"""バッチ本体処理。サブクラスで実装する。"""
...
def _check_previous_run(self) -> None:
"""前回の実行が異常終了中のままでないか確認する"""
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT run_id FROM l_batch_log WHERE batch_id=%s AND status=%s LIMIT 1",
(self.batch_id, BatchStatus.RUNNING)
)
row = cur.fetchone()
if row:
raise RuntimeError(
f"前回の実行(run_id={row[0]})が RUNNING のままです。"
"手動でステータスを確認してください。"
)
def _record_start(self) -> None:
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO l_batch_log (batch_id, batch_name, status, started_at) "
"VALUES (%s, %s, %s, %s) RETURNING run_id",
(self.batch_id, self.batch_name, BatchStatus.RUNNING, self.started_at)
)
self.run_id = cur.fetchone()[0]
conn.commit()
def _record_end(self, result: BatchResult) -> None:
if self.run_id is None:
return
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor() as cur:
cur.execute(
"UPDATE l_batch_log SET status=%s, processed_count=%s, "
"error_count=%s, message=%s, ended_at=%s WHERE run_id=%s",
(result.status, result.processed_count, result.error_count,
result.message, datetime.now(), self.run_id)
)
conn.commit()
def _send_alert(self, message: str) -> None:
"""エラー時のアラート送信(メール等)。必要に応じてオーバーライド。"""
logger.error(f"ALERT: [{self.batch_id}] {message}")
# ── 在庫送信バッチの実装例 ─────────────────────────────
class StockSendBatch(BaseBatch):
batch_id = "B001"
batch_name = "在庫送信バッチ"
def execute(self) -> BatchResult:
# 1. DBから更新在庫データを抽出
# 2. CSV生成
# 3. SFTP送信
# ... 実際の処理を実装
logger.info("在庫データを抽出中...")
return BatchResult(status=BatchStatus.SUCCESS, processed_count=1500)
定義チェックリスト
| チェック項目 | 確認ポイント |
|---|---|
| □ 全バッチが一覧化されているか | スケジュール・処理時間目標・優先度が全バッチに定義されているか |
| □ 処理フローが定義されているか | 開始〜終了までの処理ステップが全バッチに定義されているか |
| □ 冪等性が設計されているか | 同一バッチを複数回実行しても安全なように設計されているか |
| □ エラーハンドリングが定義されているか | 各種障害ケースの動作とリカバリー方法が定義されているか |
| □ ジョブ依存関係が定義されているか | 順序依存のあるバッチ間の依存条件が定義されているか |
| □ バッチ実行ログが記録されるか | 実行開始・完了・処理件数・エラー件数がDBまたはファイルに記録されるか |
| □ 処理時間ウィンドウが確認されているか | 夜間バッチがサービス時間帯と重複しないか。データ更新停止時間帯が影響ないか |