バッチ処理設計書とは

バッチ処理設計書は、定期実行や大量データ処理を行うバッチジョブの仕様を定義した設計書です。夜間バッチ・ファイル連携バッチ・集計バッチ・クリーニングバッチなどをカバーします。

バッチ処理はオンライン処理と異なり、実行中のエラー対応が難しく、失敗すると翌日の業務に影響が出るケースが多いです。このため、冪等性設計・エラーハンドリング・再実行方針を基本設計段階で確定させることが重要です。

💡 冪等性(Idempotency)とは

同じバッチを複数回実行しても、最終的なデータ状態が同じになることを「冪等性がある」と言います。バッチが途中で失敗した場合でも、最初から再実行すれば正しい結果が得られる設計を目指します。「冪等性のないバッチ」は再実行するとデータが二重登録されるため危険です。

① バッチジョブ一覧・スケジュール

バッチIDバッチ名スケジュール(cron)処理時間目標優先度関連IF ID
B001在庫送信バッチ0 22 * * *(毎日22:00)30分以内MustIF-004
B002在庫更新バッチ0 8 * * *(毎日08:00)30分以内MustIF-005
B003売上日報生成バッチ59 23 * * *(毎日23:59)10分以内Must
B004月次売上集計バッチ0 1 1 * *(毎月1日01:00)1時間以内MustIF-006
B005ログアーカイブバッチ0 3 * * 0(毎週日曜03:00)2時間以内Should
B006セッションクリーニングバッチ0 4 * * *(毎日04:00)5分以内Must

② 処理フローの定義

各バッチの処理フローを定義します。「在庫送信バッチ(B001)」の処理フロー例を以下に示します。

  1. 前処理チェック:前回バッチの実行ステータスを確認。異常終了のまま放置されている場合はアラート送信して処理中断
  2. 実行開始ログ出力:バッチ実行テーブル(l_batch_log)に実行開始レコードを挿入(ステータス:実行中)
  3. データ抽出:前回送信後に更新された在庫データをDBから抽出
  4. CSV生成:抽出データをSFTP送信用CSV形式にフォーマット
  5. 一時ファイル出力:ローカルディレクトリに一時CSVファイルを出力
  6. SFTP送信:一時CSVファイルをSFTPサーバーの送信ディレクトリへアップロード
  7. 完了通知ファイル配置.doneファイルをSFTPに配置
  8. 一時ファイル削除:ローカルの一時ファイルを削除
  9. 実行完了ログ出力:バッチ実行テーブルを成功ステータスで更新

⚠️ 処理フローは「障害発生ポイント」を意識して設計する

SFTP送信後に.doneファイル配置が失敗した場合、CSVは送信済みだが連携先は「未受信」と判断します。このような「部分的成功」ケースのリカバリー方法を設計段階で定義しておきます。

③ 冪等性の設計

各バッチの冪等性設計方針を定義します。

バッチID冪等性の保証方法再実行時の動作
B001 在庫送信バッチ実行IDをファイル名に含める。同一実行IDのファイルは上書き送信CSVを再生成・再送信。連携先は最新ファイルを使用するため問題なし
B002 在庫更新DBへのUPSERT(INSERT ON CONFLICT DO UPDATE)を使用同じCSVを再取込しても在庫数量は同じ値に更新される
B003 売上日報日付をキーにしてUPSERT。既存レコードがあれば上書き同日の日報を再生成しても最終結果は同じ
B006 セッションクリーニング期限切れセッションを削除(DELETE WHERE)。重複実行しても既に削除済みのレコードには影響なし冪等性あり(再実行で問題なし)

④ エラーハンドリング・再実行方針

エラー種別発生条件バッチの動作再実行方針
DB接続エラーDB接続失敗・タイムアウト5秒間隔で3回リトライ。失敗時は異常終了・アラートメール送信DB復旧後に手動再実行(冪等性あり)
SFTP接続エラーSFTP接続失敗30秒間隔で3回リトライ。失敗時は異常終了SFTP復旧後に手動再実行
データバリデーションエラー送信データに異常値が含まれるエラーレコードをスキップしてログ出力。正常データのみ送信エラーレコードを手動修正後に差分再送
処理タイムアウト処理時間が目標値×2を超過強制終了・アラート送信原因調査後に手動再実行

⑤ ジョブ依存関係の定義

複数バッチが順序依存関係を持つ場合、ジョブ実行順序と依存条件を定義します。

バッチID前提バッチ起動条件備考
B003 売上日報なし(独立実行)23:59 cron起動
B004 月次売上集計B003が当月全日分正常終了していること毎月1日01:00 cron起動。B003の月間完了確認後に起動B003異常終了日がある場合はアラートのみ。手動で起動
B006 セッションクリーニングなし(独立実行)毎日04:00 cron起動

Python Tips — Pythonバッチの共通フレームワークを作る

Python — バッチ共通基底クラス(ログ・エラーハンドリング・実行記録)
"""
全バッチに共通するログ・エラーハンドリング・実行記録を提供する基底クラス。
各バッチはこのクラスを継承してexecute()メソッドを実装するだけでよい。
"""
import abc
import logging
import psycopg2
from dataclasses import dataclass
from datetime import datetime
from enum import Enum

logger = logging.getLogger(__name__)


class BatchStatus(str, Enum):
    RUNNING  = "RUNNING"
    SUCCESS  = "SUCCESS"
    FAILED   = "FAILED"
    SKIPPED  = "SKIPPED"


@dataclass
class BatchResult:
    status: BatchStatus
    processed_count: int = 0
    error_count: int = 0
    message: str = ""


class BaseBatch(abc.ABC):
    """バッチ基底クラス。全バッチはこのクラスを継承する。"""

    batch_id: str        # サブクラスで定義: 例 "B001"
    batch_name: str      # サブクラスで定義: 例 "在庫送信バッチ"

    def __init__(self, db_config: dict):
        self.db_config = db_config
        self.run_id: int | None = None
        self.started_at: datetime | None = None

    def run(self) -> BatchResult:
        """バッチ実行エントリーポイント(オーバーライド不可)"""
        self.started_at = datetime.now()
        logger.info(f"[{self.batch_id}] {self.batch_name} 開始")

        try:
            self._check_previous_run()
            self._record_start()
            result = self.execute()
            self._record_end(result)
            logger.info(f"[{self.batch_id}] 完了: {result}")
            return result

        except Exception as e:
            logger.exception(f"[{self.batch_id}] 異常終了: {e}")
            error_result = BatchResult(
                status=BatchStatus.FAILED,
                message=str(e)
            )
            self._record_end(error_result)
            self._send_alert(str(e))
            raise

    @abc.abstractmethod
    def execute(self) -> BatchResult:
        """バッチ本体処理。サブクラスで実装する。"""
        ...

    def _check_previous_run(self) -> None:
        """前回の実行が異常終了中のままでないか確認する"""
        with psycopg2.connect(**self.db_config) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT run_id FROM l_batch_log WHERE batch_id=%s AND status=%s LIMIT 1",
                    (self.batch_id, BatchStatus.RUNNING)
                )
                row = cur.fetchone()
                if row:
                    raise RuntimeError(
                        f"前回の実行(run_id={row[0]})が RUNNING のままです。"
                        "手動でステータスを確認してください。"
                    )

    def _record_start(self) -> None:
        with psycopg2.connect(**self.db_config) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO l_batch_log (batch_id, batch_name, status, started_at) "
                    "VALUES (%s, %s, %s, %s) RETURNING run_id",
                    (self.batch_id, self.batch_name, BatchStatus.RUNNING, self.started_at)
                )
                self.run_id = cur.fetchone()[0]
            conn.commit()

    def _record_end(self, result: BatchResult) -> None:
        if self.run_id is None:
            return
        with psycopg2.connect(**self.db_config) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE l_batch_log SET status=%s, processed_count=%s, "
                    "error_count=%s, message=%s, ended_at=%s WHERE run_id=%s",
                    (result.status, result.processed_count, result.error_count,
                     result.message, datetime.now(), self.run_id)
                )
            conn.commit()

    def _send_alert(self, message: str) -> None:
        """エラー時のアラート送信(メール等)。必要に応じてオーバーライド。"""
        logger.error(f"ALERT: [{self.batch_id}] {message}")


# ── 在庫送信バッチの実装例 ─────────────────────────────
class StockSendBatch(BaseBatch):
    batch_id   = "B001"
    batch_name = "在庫送信バッチ"

    def execute(self) -> BatchResult:
        # 1. DBから更新在庫データを抽出
        # 2. CSV生成
        # 3. SFTP送信
        # ... 実際の処理を実装
        logger.info("在庫データを抽出中...")
        return BatchResult(status=BatchStatus.SUCCESS, processed_count=1500)

定義チェックリスト

チェック項目確認ポイント
□ 全バッチが一覧化されているかスケジュール・処理時間目標・優先度が全バッチに定義されているか
□ 処理フローが定義されているか開始〜終了までの処理ステップが全バッチに定義されているか
□ 冪等性が設計されているか同一バッチを複数回実行しても安全なように設計されているか
□ エラーハンドリングが定義されているか各種障害ケースの動作とリカバリー方法が定義されているか
□ ジョブ依存関係が定義されているか順序依存のあるバッチ間の依存条件が定義されているか
□ バッチ実行ログが記録されるか実行開始・完了・処理件数・エラー件数がDBまたはファイルに記録されるか
□ 処理時間ウィンドウが確認されているか夜間バッチがサービス時間帯と重複しないか。データ更新停止時間帯が影響ないか