jqコマンドとsystemdで実現する堅牢なJSONデータ変換パイプライン

Tech

本記事はGeminiの出力をプロンプト工学で整理した業務ドラフト(未検証)です。

jqコマンドとsystemdで実現する堅牢なJSONデータ変換パイプライン

DevOpsの実践において、様々なシステムから取得されるJSON形式のデータを効率的かつ堅牢に処理することは不可欠です。本記事では、汎用性の高いコマンドラインツールであるjqを用いてJSONデータの変換を行い、その処理をsystemdのユニットとタイマーによって定期的に自動実行するパイプラインの構築方法について解説します。特に、スクリプトの冪等性、安全性、そして適切な権限管理に焦点を当てます。

要件と前提

本記事で構築するデータ変換パイプラインは以下の要件を満たすことを目指します。

  • JSONデータ変換: jqコマンドを核として、APIから取得したJSONデータを加工・整形します。

  • データ取得: curlコマンドを用いて、外部APIから安全かつ堅牢にデータを取得します。

  • 自動実行: systemd.serviceユニットと.timerユニットを組み合わせ、指定した間隔で自動的に処理を実行します。

  • 冪等性: 何度実行してもシステムの外部状態が変化しない、または同じ結果が得られる処理を設計します。

  • 安全性: set -euo pipefailtrapによるエラーハンドリング、一時ディレクトリの安全な利用など、シェルスクリプトのベストプラクティスを適用します。

  • 権限分離: 最小権限の原則に基づき、専用ユーザーでのスクリプト実行を推奨し、root権限の不必要な利用を避けます。

  • ログ管理: systemdの標準機能を利用して、実行ログを一元的に管理します。

前提として、本記事のコード例はLinux環境(bashシェル)を想定しています。また、jq, curl, systemdがインストール済みであるものとします。

実装

データ変換パイプラインは、「データ取得」「JSONデータ変換」「スクリプト作成」「systemdユニット設定」の順に構築します。

1. データ取得(curlの安全な利用)

外部APIからのデータ取得にはcurlコマンドを利用します。ここでは、堅牢性とエラーハンドリングを考慮したcurlの利用例を示します。

#!/usr/bin/env bash


# set -euo pipefailはスクリプト全体で後述


# curl を用いて外部APIからデータを安全に取得する関数

fetch_data() {
    local api_endpoint="$1"
    local output_file="$2"
    local max_retries=5
    local retry_delay_sec=5
    local connect_timeout_sec=10
    local max_time_sec=30

    echo "$(date +"%Y-%m-%dT%H:%M:%SZ") INFO: Fetching data from ${api_endpoint}..." >&2

    # curl の安全な利用例


    # -sSL: サイレントモード、リダイレクトを追跡


    # --retry: リトライ回数


    # --retry-delay: リトライ間隔(秒)


    # --connect-timeout: 接続確立のタイムアウト(秒)


    # --max-time: 全体処理のタイムアウト(秒)


    # --fail-with-body: HTTPエラー時にレスポンスボディを表示


    # --header: リクエストヘッダの追加


    # --output: 出力ファイル指定

    if ! curl -sSL \
        --retry "${max_retries}" \
        --retry-delay "${retry_delay_sec}" \
        --connect-timeout "${connect_timeout_sec}" \
        --max-time "${max_time_sec}" \
        --fail-with-body \
        --header "Accept: application/json" \
        --output "${output_file}" \
        "${api_endpoint}"; then
        echo "$(date +"%Y-%m-%dT%H:%M:%SZ") ERROR: Failed to fetch data from API: ${api_endpoint}" >&2
        return 1
    fi
    echo "$(date +"%Y-%m-%dT%H:%M:%SZ") INFO: Data fetched and saved to ${output_file}" >&2
    return 0
}

# 使用例:


# fetch_data "https://api.example.com/data" "/tmp/raw_data.json"

TLS証明書による相互認証を行う場合は、--cacert, --cert, --key オプションを追加します。バックオフ戦略としては、--retry-delayを指数関数的に増加させるスクリプトロジックを追加することも可能です。

2. JSONデータ変換(jqの応用)

jqは軽量かつ強力なJSONプロセッサです。ここでは、基本的な変換から少し複雑なフィルタリングまで、いくつかの例を示します。

jqフィルター 説明 例(入力: {"a":1, "b":2} 出力
. 入力全体 . {"a":1, "b":2}
.key 特定のキーの値を取得 .a 1
.[] 配列の要素を個別にストリーム出力 `[1,2,3] .[]` 1\n2\n3
select(条件) 条件に合致する要素のみをフィルタリング `[{“v”:1},{“v”:2}] .[] select(.v==1)` {"v":1}
{new_key: .old_key} 新しいオブジェクトを作成し、キー名を変更 {val: .a} {"val":1}
add_field: "value" 既存のオブジェクトにフィールドを追加 . + {c:3} {"a":1, "b":2, "c":3}
map(式) 配列の各要素に式を適用 `[1,2] map(.+1)` [2,3]
--arg name "value" シェル変数をjqスクリプトに渡す ($nameで参照) --arg msg "hello" '{$msg}' {"msg":"hello"}
--raw-output / -r 結果をJSON文字列ではなく生の文字列として出力 `.”a” .` 1 (JSON) / 1 (raw)

変換スクリプト例: 取得した生データから特定のフィールドを抽出し、新しいフィールドを追加する例です。

#!/usr/bin/env bash


# jq を用いてJSONデータを変換する関数

transform_data() {
    local input_file="$1"
    local output_file="$2"
    local current_iso_date # シェルで現在時刻を取得し、jqに渡す

    # 現在のUTC時刻をISO 8601形式で取得 (例: 2024-07-26T12:34:56Z)

    current_iso_date=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

    echo "$(date +"%Y-%m-%dT%H:%M:%SZ") INFO: Transforming JSON data from ${input_file}..." >&2

    # jqフィルターの定義


    # --arg current_time: シェル変数`current_iso_date`を`jq`内の変数`$current_time`として渡す


    # -c: 圧縮出力 (改行なし)

    jq_filter='
    .[] | select(.status == "active") | { # 配列の各要素を処理し、statusが"active"のものを選択
        id: .id,                          # idフィールドをそのまま使用
        name: .name,                      # nameフィールドをそのまま使用
        category: .category // "unknown", # categoryがnullや存在しない場合は"unknown"をデフォルト値とする
        processed_at: $current_time       # $current_time変数をprocessed_atフィールドとして追加
    }
    '

    # jqコマンドの実行

    if ! jq -c --arg current_time "${current_iso_date}" "${jq_filter}" "${input_file}" > "${output_file}"; then
        echo "$(date +"%Y-%m-%dT%H:%M:%SZ") ERROR: Failed to transform JSON data with jq." >&2
        return 1
    fi
    echo "$(date +"%Y-%m-%dT%H:%M:%SZ") INFO: Transformed data saved to ${output_file}" >&2
    return 0
}

# 使用例:


# transform_data "/tmp/raw_data.json" "/tmp/transformed_data.json"

ここで、// 演算子は「もし左側がnullまたは欠損していれば右側の値を使用する」というnull合体演算子として機能します。

3. スクリプトの作成(冪等性と安全性)

curljqの利用を統合し、systemdから実行されるメインスクリプトを作成します。ここでは、一時ディレクトリの安全な管理と厳格なエラーハンドリングを導入します。

ファイル名: /opt/data-processor/scripts/process_data.sh

#!/usr/bin/env bash


# このスクリプトはAPIからJSONデータを取得し、jqで変換後、指定された場所に出力します。

# 厳格なエラーハンドリング設定


# -e: エラーが発生したら即座に終了


# -u: 未定義の変数を使用したらエラー


# -o pipefail: パイプライン中のコマンドが一つでも失敗したら全体を失敗とみなす

set -euo pipefail

# スクリプト名

SCRIPT_NAME=$(basename "$0")

# ログ関数 (journalctlで確認できるよう標準エラー出力に詳細を出す)

log() {
    local level="$1"
    local message="$2"

    # ISO 8601形式でJST現在時刻を出力

    echo "$(date +"%Y-%m-%dT%H:%M:%S%z") ${SCRIPT_NAME} [${level}]: ${message}" >&2
}

# 一時ディレクトリのパスを格納する変数

TMP_DIR=""

# クリーンアップ関数


# スクリプト終了時 (正常終了、エラー終了、シグナル終了) に一時ファイルを削除

cleanup() {
    local exit_code=$? # 終了コードを保持
    if [ -n "${TMP_DIR}" ] && [ -d "${TMP_DIR}" ]; then
        log "INFO" "Cleaning up temporary directory: ${TMP_DIR}"
        rm -rf "${TMP_DIR}"
    fi

    # 保持した終了コードで終了

    exit "${exit_code}"
}

# EXIT: シェルが終了する際に実行


# INT: Ctrl+Cなどによる割り込みシグナル


# TERM: 終了シグナル

trap cleanup EXIT INT TERM

# 一時ディレクトリの作成 (mktempで安全なユニークなディレクトリを作成)


# -d: ディレクトリを作成


# -t json-proc-XXXXXXXXXX: プレフィックスとXでテンプレート指定

TMP_DIR=$(mktemp -d -t json-proc-XXXXXXXXXX)
if [ -z "${TMP_DIR}" ]; then
    log "ERROR" "Failed to create temporary directory."
    exit 1
fi
log "INFO" "Temporary directory created: ${TMP_DIR}"

# -----------------------------------------------------


# データ取得と変換処理の本体


# -----------------------------------------------------

# APIエンドポイントと出力ファイルの定義

API_ENDPOINT="https://api.example.com/data"
RAW_DATA_FILE="${TMP_DIR}/raw_data.json"
TRANSFORMED_DATA_FILE="${TMP_DIR}/transformed_data.json"
OUTPUT_DESTINATION_FILE="/var/lib/data-processor/latest_processed_data_$(date +"%Y%m%d%H%M%S").json" # 最終出力パス

# データ取得


# fetch_data関数は前のセクションで定義されたものを使用

fetch_data() {
    local api_endpoint="$1"
    local output_file="$2"
    local max_retries=5
    local retry_delay_sec=5
    local connect_timeout_sec=10
    local max_time_sec=30

    log "INFO" "Fetching data from ${api_endpoint}..."

    if ! curl -sSL \
        --retry "${max_retries}" \
        --retry-delay "${retry_delay_sec}" \
        --connect-timeout "${connect_timeout_sec}" \
        --max-time "${max_time_sec}" \
        --fail-with-body \
        --header "Accept: application/json" \
        --output "${output_file}" \
        "${api_endpoint}"; then
        log "ERROR" "Failed to fetch data from API: ${api_endpoint}"
        return 1
    fi
    log "INFO" "Data fetched and saved to ${output_file}"
    return 0
}

# JSONデータ変換


# transform_data関数は前のセクションで定義されたものを使用

transform_data() {
    local input_file="$1"
    local output_file="$2"
    local current_iso_date # シェルで現在時刻を取得し、jqに渡す

    current_iso_date=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

    log "INFO" "Transforming JSON data from ${input_file}..."

    jq_filter='
    .[] | select(.status == "active") | {
        id: .id,
        name: .name,
        category: .category // "unknown",
        processed_at: $current_time
    }
    '

    if ! jq -c --arg current_time "${current_iso_date}" "${jq_filter}" "${input_file}" > "${output_file}"; then
        log "ERROR" "Failed to transform JSON data with jq."
        return 1
    fi
    log "INFO" "Transformed data saved to ${output_file}"
    return 0
}


# メイン処理の実行

if ! fetch_data "${API_ENDPOINT}" "${RAW_DATA_FILE}"; then
    log "CRITICAL" "Data fetching failed. Exiting."
    exit 1
fi

if ! transform_data "${RAW_DATA_FILE}" "${TRANSFORMED_DATA_FILE}"; then
    log "CRITICAL" "Data transformation failed. Exiting."
    exit 1
fi

# 最終的なデータを永続的な場所へ移動


# 冪等性を考慮し、常に新しいタイムスタンプ付きファイルとして保存。


# もし常に同じファイル名を上書きしたい場合は、mv -f を使用。


# この例では、履歴を保持するため新しいファイル名で保存。

if ! install -D -m 644 "${TRANSFORMED_DATA_FILE}" "${OUTPUT_DESTINATION_FILE}"; then
    log "ERROR" "Failed to move transformed data to ${OUTPUT_DESTINATION_FILE}. Please check permissions."
    exit 1
fi
log "INFO" "Final processed data moved to ${OUTPUT_DESTINATION_FILE}"

log "INFO" "Script completed successfully."
exit 0

このスクリプトは、以下のディレクトリ構造に配置することを想定しています。 /opt/data-processor/scripts/process_data.sh /var/lib/data-processor/ (最終的なデータ出力先)

権限の考慮: スクリプトには実行権限 (chmod +x process_data.sh) を与え、systemdユニットで実行する際には、UserGroupディレクティブを用いて専用の非rootユーザー(例: dataproc)として実行することを強く推奨します。root権限で実行することは、セキュリティ上のリスクを増大させます。専用ユーザーは、必要最小限のファイルやディレクトリへの読み書き権限のみを持つべきです。

4. systemdユニットの設定

systemdを用いて、上記のスクリプトを定期的に自動実行するように設定します。これには、.serviceユニットと.timerユニットの2つのファイルが必要です。

systemd.serviceユニットファイル

/etc/systemd/system/data-processor.service

# /etc/systemd/system/data-processor.service

[Unit]
Description=Periodic JSON Data Processing Service by DevOps team # サービスの目的を記述
Documentation=https://jqlang.github.io/jq/manual/ # 関連ドキュメントのURL
After=network-online.target # ネットワークが利用可能になってから起動

[Service]
Type=oneshot # 実行後すぐに終了するタイプ

# Type=simple や Type=forking は長時間のバックグラウンドプロセス向け。


# このスクリプトは実行後終了するためoneshotが適切。

User=dataproc # スクリプトを実行する専用ユーザー名
Group=dataproc # スクリプトを実行する専用グループ名

# root権限を必要としない操作であれば、専用ユーザーでの実行はセキュリティ上必須です。

WorkingDirectory=/opt/data-processor/scripts # スクリプトの実行ディレクトリ

# ExecStart: 実行するコマンドとその引数

ExecStart=/opt/data-processor/scripts/process_data.sh

# サービス固有の一時ディレクトリを/tmpの代わりに利用

#これにより、サービス間の/tmp汚染を防ぎ、クリーンアップも容易になります。

PrivateTmp=true

# 読み書きを許可するパスを明示的に指定


# ReadWritePaths=/var/lib/data-processor /var/log/data-processor


# 上記はsystemd v236以降で利用可能。古いバージョンでは別途権限管理が必要です。

# プロセス分離を強化するオプション例 (必要に応じて追加)


# RootDirectory=/var/empty # chroot 環境で実行


# CapabilityBoundingSet=~CAP_SYS_ADMIN # 特権を制限

# 標準出力と標準エラー出力の処理

StandardOutput=journal # 出力をsystemdジャーナルに送る
StandardError=journal  # エラー出力をsystemdジャーナルに送る

# これにより、journalctlコマンドでログを一元的に確認できます。

# サービスが失敗した場合の再起動ポリシー (oneshotの場合はあまり意味がないが、例として)


# Restart=on-failure


# RestartSec=5s

[Install]
WantedBy=multi-user.target # システム起動時に自動実行されるターゲット

systemd.timerユニットファイル

/etc/systemd/system/data-processor.timer

# /etc/systemd/system/data-processor.timer

[Unit]
Description=Run JSON Data Processor every 15 minutes # タイマーの目的を記述
Requires=data-processor.service # 関連するサービスユニット

[Timer]

# OnCalendar: タイマーのスケジュールを定義


# この例では、毎時0分、15分、30分、45分に実行

OnCalendar=*:0/15:00

# 他の例:


# OnCalendar=hourly      # 毎時0分に実行


# OnCalendar=daily       # 毎日00:00に実行


# OnCalendar=*-*-* 03:00:00 # 毎日03:00に実行


# OnCalendar=Wed *-*-1,15 00:00:00 # 毎月1日と15日の水曜日00:00に実行

Persistent=true # サービスが停止している間に実行されるべきだったスケジュールを、再開後にすぐに実行する

# これがないと、タイマーが非アクティブな間に発生したスケジュールはスキップされます。

[Install]
WantedBy=timers.target # タイマーを有効にするターゲット

これらのファイルを配置したら、systemdに設定をリロードさせ、タイマーを有効化して起動します。

sudo systemctl daemon-reload # systemd設定の再読み込み
sudo systemctl enable data-processor.timer # タイマーを自動起動設定
sudo systemctl start data-processor.timer  # タイマーを今すぐ起動

これにより、タイマーが指定されたスケジュールでdata-processor.serviceをトリガーし、スクリプトが実行されるようになります。

検証

実装が正しく機能するかを確認します。

  1. スクリプトの手動実行: systemdを介さずにスクリプト単体で実行し、エラーが出ないか、意図した出力が得られるかを確認します。

    bash -x /opt/data-processor/scripts/process_data.sh
    
    # または、より詳細なデバッグのため
    
    
    # /opt/data-processor/scripts/process_data.sh
    

    -xオプションは、実行されるコマンドとその引数を標準エラー出力に表示し、デバッグに役立ちます。

  2. systemdサービスの確認: タイマーが起動しているか、サービスが正しく実行されたかを確認します。

    systemctl status data-processor.timer
    systemctl status data-processor.service
    

    data-processor.serviceoneshotタイプなので、実行後すぐにinactive (dead)になります。これは正常な動作です。

  3. ログの確認: systemdジャーナルからスクリプトの実行ログを確認します。

    journalctl -u data-processor.service --since "1 hour ago"
    journalctl -f -u data-processor.service # リアルタイムでログを追跡
    

    出力されたログから、fetch_datatransform_dataが成功しているか、エラーが発生していないかを確認します。

  4. 出力ファイルの確認: 最終的に生成されるファイルが意図した場所に、意図した内容で保存されているかを確認します。

    ls -l /var/lib/data-processor/
    head /var/lib/data-processor/latest_processed_data_*.json
    

運用

構築したパイプラインの安定運用には、以下の点に注意が必要です。

  • ログ監視: journalctlだけでなく、logrotateを利用してログファイルの管理を行います。また、監視ツールと連携し、エラーログが発生した場合に通知される仕組みを構築します。

  • 設定管理: スクリプトやsystemdユニットファイルは、Gitなどのバージョン管理システムで管理し、AnsibleやPuppetといったInfrastructure as Code(IaC)ツールを用いてデプロイすることを推奨します。

  • 権限分離: dataprocユーザーが本当に必要な権限のみを持つように、定期的に見直しを行います。不要なディレクトリへのアクセス権やコマンド実行権限を与えないように注意します。特に、rootユーザーでの実行は極力避け、専用のサービスアカウントと最小権限の原則を適用します。

  • リソース監視: スクリプトの実行によるCPU、メモリ、ディスクI/Oへの影響を監視し、必要に応じてリソースを調整します。

  • バックアップ: 処理されたデータや設定ファイルのバックアップ戦略を確立します。

トラブルシュート

問題が発生した場合の一般的なトラブルシュート手順です。

  1. サービスの状態確認:

    systemctl status data-processor.service
    systemctl status data-processor.timer
    

    Active: failedinactive (dead)以外の異常な状態であれば、その原因を探ります。

  2. 詳細ログの確認: 最も重要なステップです。エラーメッセージやスタックトレースから原因を特定します。

    journalctl -u data-processor.service --since "yesterday" -xe # 昨日のログからエラーの詳細を確認
    journalctl -u data-processor.service -f # リアルタイムログ
    
  3. スクリプトのデバッグ実行: systemdを介さずに、set -xオプションを有効にしてスクリプトを直接実行し、詳細な実行トレースを確認します。

    bash -x /opt/data-processor/scripts/process_data.sh
    
  4. jqフィルターの確認: jqのフィルターが複雑な場合、期待通りのJSON変換が行われているか、小さな入力ファイルで試行錯誤します。jq -r '. | ...'のように--raw-outputオプションを使って、結果の引用符などを排除して確認するのも有効です。

  5. curlの問題: APIアクセスに問題がある場合、curlコマンド単体で実行し、エラーコードやレスポンスボディを確認します。

    curl -v "https://api.example.com/data"
    

    -vオプションは、詳細な通信ログを出力します。

まとめ

jqsystemdを活用して、JSONデータ変換パイプラインを構築する方法を解説しました。curlによる安全なデータ取得、jqによる柔軟なデータ変換、そしてsystemdによる堅牢な自動実行を組み合わせることで、DevOpsにおけるデータ処理の自動化と安定運用が実現できます。

特に、set -euo pipefailtrapによる安全なシェルスクリプトの記述、mktempによる一時ファイルの適切な管理、そしてsystemdUserディレクティブを用いた権限分離は、運用上の安定性とセキュリティを確保するために不可欠です。これらのプラクティスを遵守することで、信頼性の高いデータ処理基盤を構築し、DevOpsの効率を向上させることができるでしょう。

graph TD
    D[systemd.timer] -->|スケジュール起動| E[systemd.service];
    E -->|ExecStart実行| F{"process_data.shスクリプト"};
    F -->|curlでデータ要求| A["外部APIエンドポイント"];
    A -->|JSONデータ取得| B["jqによるデータ変換"];
    B -->|変換済みJSON出力| C["データ保存/利用"];
    subgraph process_data.shの内部処理
        A --> B;
        B --> C;
    end

一次情報出典

  • [1] jq manual. jqlang.github.io. 更新日不明 (最新版のドキュメント). https://jqlang.github.io/jq/manual/

  • [2] curl manual. curl.se. 更新日不明 (最新版のドキュメント). https://curl.se/docs/manual.html

  • [3] systemd.service man page. freedesktop.org. 最終更新日: 2024-03-05. https://www.freedesktop.org/software/systemd/man/systemd.service.html

  • [4] systemd.timer man page. freedesktop.org. 最終更新日: 2024-03-05. https://www.freedesktop.org/software/systemd/man/systemd.timer.html

ライセンス:本記事のテキスト/コードは特記なき限り CC BY 4.0 です。引用の際は出典URL(本ページ)を明記してください。
利用ポリシー もご参照ください。

コメント

タイトルとURLをコピーしました