【第10回】故障はいつ起きる?振動データで劣化予測に挑戦

加速度センサー

※当サイトは、アフィリエイト広告を利用しています

概要

加速度センサーシリーズの続編です。

今回は、振動データを使った劣化予測に挑戦します。
前回までは、正常か異常か、どの故障モードかを判定する仕組みを作ってきました。
しかし、使用者が本当に知りたいのは、「どれくらい劣化しているのか」「いつ頃故障しそうなのか」です。
そこで今回は、吸気異常を4段階に変化させたデータを使い、特徴量がどのように変化するのか、あとどのくらいで故障に至るのか予測モデルの構築に挑戦してみました。

詳しくは、以下のYouTube動画をご覧ください。

【第10回】故障はいつ起きる?振動データで劣化予測に挑戦

配線

プログラムコード

マイコン(ESP32)

・MPU6050で、X・Y・Z方向の加速度を計測
・合成加速度 acc_total を計算
・1000Hz周期でデータ取得
・CSV形式でシリアル出力

#include <Wire.h>
#include <MPU6050_tockn.h>

MPU6050 mpu(Wire);

const unsigned long SAMPLE_US = 1000; // 1000us = 1000Hz
unsigned long last_us = 0;

void setup() {
  Serial.begin(115200);
  Wire.begin(21, 22);
  Wire.setClock(400000); 

  mpu.begin();
  mpu.calcGyroOffsets(true);

  Serial.println("time_us,ax,ay,az,acc_total");
}

void loop() {
  unsigned long now = micros();
  if (now - last_us < SAMPLE_US) return;
  last_us = now;

  mpu.update();

  float ax = mpu.getAccX();
  float ay = mpu.getAccY();
  float az = mpu.getAccZ();
  float acc_total = sqrt(ax*ax + ay*ay + az*az);

  Serial.print(now);
  Serial.print(",");
  Serial.print(ax, 4);
  Serial.print(",");
  Serial.print(ay, 4);
  Serial.print(",");
  Serial.print(az, 4);
  Serial.print(",");
  Serial.println(acc_total, 4);
}

Python

次の3つのコードを使用しています。①⇒②⇒③の順に出力されたファイルが次のコードのインプットになっています。

①ステージ別特徴量の作成・比較

"""
-----------------------------------------
正常状態から劣化状態までの振動データを比較し、
・どの特徴量が劣化とともに変化するのか
・劣化度合いを表せる特徴量は何か
を確認します。

今回の実験では、normal/stage1/stage2/stage3/stage4の5段階のデータを比較します。

処理の流れ
-----------------------------------------
① CSV読み込み
② time_usを使って等間隔データへ補正
③ FFT、Welch PSDを計算
④ FFT特徴量を抽出
    ・支配周波数
    ・ピーク幅
    ・バンドパワー
    ・スペクトル重心
    ・高周波比
⑤ ステージごとの平均を比較
⑥ 箱ひげ図でばらつきを確認
⑦ all_stage_features.csv を保存
このCSVを次のコードで読み込み、劣化スコアを作成します。
=========================================================
"""

import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.interpolate import interp1d
from scipy.signal import welch, find_peaks, peak_widths


# =============================
# ▼基本設定
# =============================

FS = 1000.0
# 第2回で1000Hz測定した前提です

TARGET_DURATION_S = 10.0
# 各CSVファイルの先頭10秒を解析します

N_SAMPLES = int(FS * TARGET_DURATION_S)

CHANNEL = "acc_total"
# 3軸合成加速度を解析対象にします

F_MIN = 5.0
# 5Hz未満は低周波の揺れや姿勢変化の影響を受けやすいため除外します

F_MAX_PLOT = 500.0
# PSDグラフの表示上限です

F_MIN_DOM = 5.0
F_MAX_DOM = 200.0
# 支配周波数を探す範囲です

N_PERSEG = 2048
# Welch法でPSDを計算するときの分割長です

BANDS_HZ = [
    (5.0, 10.0),
    (10.0, 30.0),
    (30.0, 80.0),
    (80.0, 200.0),
    (200.0, 400.0),
    (400.0, 500.0),
]

EPS = 1e-12


# =============================
# ▼劣化レベルごとのフォルダ
# =============================
#
# このPythonファイルと同じ場所に、
#
# normal
# stage1
# stage2
# stage3
# stage4
#
# というフォルダを作成し、
# それぞれのCSVを入れてください。
#
# stage番号が大きいほど、
# 劣化が進んでいる想定です。
#

GROUP_DIRS = {
    "normal": "normal",
    "stage1": "stage1",
    "stage2": "stage2",
    "stage3": "stage3",
    "stage4": "stage4",
}

FILE_PATTERN = "acc_usb_highrate_*.csv"


DISPLAY_NAMES = {
    "normal": "Normal",
    "stage1": "Stage 1",
    "stage2": "Stage 2",
    "stage3": "Stage 3",
    "stage4": "Stage 4",
}

GROUP_COLORS = {
    "normal": "blue",
    "stage1": "green",
    "stage2": "orange",
    "stage3": "red",
    "stage4": "purple",
}


# =============================
# ▼劣化評価に使う特徴量
# =============================
#
# どの特徴量が劣化に反応するかを確認します。
#
# dom_freq_hz
#     支配周波数
#
# peak_width_hz
#     ピーク幅
#
# band_psd_sum_30_80
#     30~80Hz帯の振動エネルギー
#
# band_psd_sum_200_400
#     200~400Hz帯の高周波エネルギー
#
# hi_lo_ratio_200_500_over_5_200
#     高周波ノイズ比
#
# spec_centroid_hz_ge5
#     スペクトル重心
#

COMPARE_FEATURES = [
    "dom_freq_hz",
    "peak_width_hz",
    "band_psd_sum_30_80",
    "band_psd_sum_200_400",
    "hi_lo_ratio_200_500_over_5_200",
    "spec_centroid_hz_ge5",
]


# =============================
# ▼前処理:time_usから等間隔化し、先頭10秒を抽出
# =============================

def load_and_resample_10s(csv_path: str) -> pd.DataFrame:
    """
    ESP32の測定データは、完全な等間隔とは限りません。

    FFTを正しく行うために、
    time_us を使って1000Hzの等間隔データへ補正します。

    さらに、各CSVで比較条件をそろえるため、
    先頭10秒だけを解析対象にします。
    """

    df = pd.read_csv(csv_path)

    required_cols = ["time_us", "ax", "ay", "az", "acc_total"]

    for c in required_cols:
        if c not in df.columns:
            raise ValueError(f"列 {c} が見つかりません。file={csv_path}")

    # time_usを秒に変換し、開始時刻を0秒にそろえます
    t = df["time_us"].to_numpy(dtype=np.float64) / 1e6
    t = t - t[0]

    # 時刻の重複や逆行を除外します
    keep = np.ones_like(t, dtype=bool)
    keep[1:] = t[1:] > t[:-1]

    t = t[keep]
    df = df.loc[keep].reset_index(drop=True)

    if t.size < 2:
        raise ValueError(f"time列が短すぎます。file={csv_path}")

    # 1000Hz、10秒分の等間隔時間軸を作ります
    t_uniform = np.arange(N_SAMPLES, dtype=np.float64) / FS

    out = pd.DataFrame({"time_s": t_uniform})

    t_min = t[0]
    t_max = t[-1]

    # ax, ay, az, acc_totalを等間隔データに補間します
    for col in ["ax", "ay", "az", "acc_total"]:
        y = df[col].to_numpy(dtype=np.float64)

        f = interp1d(
            t,
            y,
            kind="linear",
            bounds_error=False,
            fill_value="extrapolate",
        )

        y_uniform = f(t_uniform)

        # 実測範囲外は0にします
        valid = (t_uniform >= t_min) & (t_uniform <= t_max)
        y_uniform = np.where(valid, y_uniform, 0.0)

        out[col] = y_uniform

    return out


# =============================
# ▼Welch PSD
# =============================

def compute_psd(x: np.ndarray):
    """
    Welch法でPSDを計算します。

    PSDを見ることで、
    どの周波数にどれだけ振動エネルギーがあるかを確認できます。

    Welch法は、単純なFFTよりもノイズの影響を抑えやすく、
    安定したスペクトルを得やすい方法です。
    """

    x = np.asarray(x, dtype=np.float64)

    # 平均値を引いてDC成分を除去します
    x = x - np.mean(x)

    f, Pxx = welch(
        x,
        fs=FS,
        nperseg=N_PERSEG,
    )

    return f, Pxx


# =============================
# ▼支配ピークの特徴量
# =============================

def peak_features_from_psd(f: np.ndarray, Pxx: np.ndarray):
    """
    最も強い周波数成分を調べます。

    抽出する特徴量:

    dom_freq_hz
        支配周波数

    dom_psd
        支配周波数の強さ

    peak_width_hz
        ピークの広がり

    peak_sharpness
        ピークの鋭さ

    劣化が進むと、ピーク位置や鋭さが変化することがあります。
    """

    m_dom = (f >= F_MIN_DOM) & (f <= F_MAX_DOM)

    f_dom = f[m_dom]
    P_dom = Pxx[m_dom]

    feats = {}

    if f_dom.size < 3:
        feats["dom_freq_hz"] = 0.0
        feats["dom_psd"] = 0.0
        feats["peak_width_hz"] = 0.0
        feats["peak_sharpness"] = 0.0
        return feats

    # PSDが最大となる周波数を支配周波数とします
    idx_peak_local = int(np.argmax(P_dom))

    dom_freq = float(f_dom[idx_peak_local])
    dom_psd = float(P_dom[idx_peak_local])

    # ピークを検出します
    peaks, _ = find_peaks(P_dom)

    if peaks.size == 0:
        peaks = np.array([idx_peak_local], dtype=int)

    # 支配周波数に最も近いピークを採用します
    peaks_sorted = peaks[np.argsort(np.abs(peaks - idx_peak_local))]
    pidx = int(peaks_sorted[0])

    # ピーク幅を計算します
    widths, _, _, _ = peak_widths(P_dom, [pidx], rel_height=0.5)

    width_bins = float(widths[0])
    df_hz = float(f_dom[1] - f_dom[0])
    peak_width_hz = width_bins * df_hz

    # ピーク周辺と比較して、ピークの鋭さを計算します
    span_hz = 4.0
    span_bins = max(1, int(round(span_hz / df_hz)))

    lo = max(0, pidx - span_bins)
    hi = min(P_dom.size, pidx + span_bins + 1)

    if hi - lo >= 3:
        neighborhood = np.concatenate([P_dom[lo:pidx], P_dom[pidx + 1:hi]])
    else:
        neighborhood = P_dom[lo:hi]

    if neighborhood.size > 0:
        neigh_mean = float(np.mean(neighborhood))
    else:
        neigh_mean = float(np.mean(P_dom))

    peak_sharpness = np.log10(dom_psd / (neigh_mean + EPS) + 1.0)

    feats["dom_freq_hz"] = dom_freq
    feats["dom_psd"] = dom_psd
    feats["peak_width_hz"] = float(peak_width_hz)
    feats["peak_sharpness"] = float(peak_sharpness)

    return feats


# =============================
# ▼スペクトル特徴量
# =============================

def spectral_features(f: np.ndarray, Pxx: np.ndarray):
    """
    FFTスペクトル全体から特徴量を作ります。

    劣化が進むと、
    高周波成分や特定帯域のエネルギーが増えることがあります。
    """

    feats = {}

    m_ge = f >= F_MIN

    f_ge = f[m_ge]
    P_ge = Pxx[m_ge]

    # 5Hz以上の全体エネルギーです
    feats["psd_sum_ge5"] = float(np.sum(P_ge))

    # 周波数帯ごとのエネルギーです
    for f1, f2 in BANDS_HZ:
        m = (f >= f1) & (f < f2)
        feats[f"band_psd_sum_{f1:g}_{f2:g}"] = float(np.sum(Pxx[m]))

    # スペクトル重心です
    # エネルギーの中心がどの周波数にあるかを表します
    denom = float(np.sum(P_ge))

    if denom > 0:
        feats["spec_centroid_hz_ge5"] = float(np.sum(f_ge * P_ge) / denom)
    else:
        feats["spec_centroid_hz_ge5"] = 0.0

    # 80Hz以上の高周波比です
    p_hi_80_500 = float(np.sum(Pxx[f >= 80.0]))
    p_lo_5_80 = float(np.sum(Pxx[(f >= 5.0) & (f < 80.0)]))

    feats["hi_lo_ratio_ge80_over_5to80"] = p_hi_80_500 / (p_lo_5_80 + EPS)

    # 200〜500Hzの高周波比です
    p_hi_200_500 = float(np.sum(Pxx[(f >= 200.0) & (f <= 500.0)]))
    p_lo_5_200 = float(np.sum(Pxx[(f >= 5.0) & (f < 200.0)]))

    feats["hi_lo_ratio_200_500_over_5_200"] = p_hi_200_500 / (p_lo_5_200 + EPS)

    return feats


# =============================
# ▼1つのCSVファイルから特徴量を抽出
# =============================

def extract_all_features(csv_path: str):
    prep = load_and_resample_10s(csv_path)

    x = prep[CHANNEL].to_numpy(dtype=np.float64)

    f, Pxx = compute_psd(x)

    feats = {}
    feats.update(peak_features_from_psd(f, Pxx))
    feats.update(spectral_features(f, Pxx))

    return feats, f, Pxx, prep


# =============================
# ▼グループごとの特徴量を読み込む
# =============================

def load_group_features(group_name: str, folder_path: str):
    """
    normal、stage1、stage2…の各フォルダからCSVを読み込み、
    グループごとの特徴量、PSD、時間波形をまとめます。
    """

    files = sorted(glob.glob(os.path.join(folder_path, FILE_PATTERN)))

    if len(files) == 0:
        raise RuntimeError(f"{group_name} のCSVが見つかりません: {folder_path}")

    feat_rows = []
    psd_list = []
    waveform_list = []
    f_ref = None
    time_ref = None

    for p in files:
        feats, f, Pxx, prep = extract_all_features(p)

        x = prep[CHANNEL].to_numpy(dtype=np.float64)
        t = prep["time_s"].to_numpy(dtype=np.float64)

        # 周波数軸が全ファイルで同じか確認します
        if f_ref is None:
            f_ref = f
        else:
            if f.size != f_ref.size or np.max(np.abs(f - f_ref)) > 1e-9:
                raise RuntimeError(f"Welchの周波数軸が一致しません: {p}")

        # 時間軸が全ファイルで同じか確認します
        if time_ref is None:
            time_ref = t
        else:
            if t.size != time_ref.size or np.max(np.abs(t - time_ref)) > 1e-12:
                raise RuntimeError(f"時間軸が一致しません: {p}")

        row = {
            "group": group_name,
            "file": os.path.basename(p),
            "path": p,
        }

        row.update(feats)

        feat_rows.append(row)
        psd_list.append(Pxx)
        waveform_list.append(x)

    df = pd.DataFrame(feat_rows).sort_values("file").reset_index(drop=True)

    psd_arr = np.vstack(psd_list)
    waveform_arr = np.vstack(waveform_list)

    return df, f_ref, psd_arr, time_ref, waveform_arr


# =============================
# ▼グラフ1:平均時間波形の比較
# =============================

def plot_mean_time_waveform(groups_waveform, time_ref):
    """
    各ステージの平均時間波形を比較します。

    時系列波形の見た目が変わるかを確認できます。
    """

    plt.figure(figsize=(12, 7))

    for group_name, info in groups_waveform.items():
        mean_waveform = np.mean(info["waveform"], axis=0)

        color = GROUP_COLORS.get(group_name, None)
        label_mean = DISPLAY_NAMES.get(group_name, group_name)

        plt.plot(
            time_ref,
            mean_waveform,
            label=label_mean,
            color=color,
            linewidth=1.0,
        )

    plt.xlim(0, TARGET_DURATION_S)
    plt.title("Mean Time Waveform comparison")
    plt.xlabel("Time [s]")
    plt.ylabel(CHANNEL)
    plt.legend(fontsize=9)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


# =============================
# ▼グラフ2:平均PSD比較
# =============================

def plot_mean_psd(groups_psd, f_ref):
    """
    各ステージの平均PSDを比較します。

    正常からstage4に向けて、
    どの周波数帯が変化するかを見るグラフです。

    第10回コード①の一番重要なグラフです。
    """

    plt.figure(figsize=(12, 7))

    for group_name, info in groups_psd.items():
        mean_psd = np.mean(info["psd"], axis=0)

        color = GROUP_COLORS.get(group_name, None)
        label_mean = DISPLAY_NAMES.get(group_name, group_name)

        plt.semilogy(
            f_ref,
            mean_psd,
            label=label_mean,
            color=color,
            linewidth=1.0,
        )

    plt.xlim(0, F_MAX_PLOT)
    plt.title("Mean PSD comparison")
    plt.xlabel("Frequency [Hz]")
    plt.ylabel("PSD")
    plt.legend(fontsize=9)
    plt.tight_layout()
    plt.show()


# =============================
# ▼グラフ3:特徴量の箱ひげ図
# =============================

def plot_feature_boxplots(all_df: pd.DataFrame, feature_names):
    """
    劣化ステージごとの特徴量分布を比較します。

    平均値だけでなく、
    ばらつきやグループ間の重なりを確認します。

    劣化指標として使いやすい特徴量を探すためのグラフです。
    """

    n = len(feature_names)

    ncols = 2
    nrows = int(np.ceil(n / ncols))

    fig, axes = plt.subplots(nrows, ncols, figsize=(12, 4 * nrows))

    axes = np.array(axes).reshape(-1)

    group_order = list(GROUP_DIRS.keys())

    for i, feat in enumerate(feature_names):
        ax = axes[i]

        data = []
        labels = []

        for g in group_order:
            vals = all_df.loc[all_df["group"] == g, feat].to_numpy(dtype=np.float64)

            if len(vals) > 0:
                data.append(vals)
                labels.append(DISPLAY_NAMES.get(g, g))

        ax.boxplot(
            data,
            labels=labels,
            showmeans=True,
            showfliers=False,
            meanprops=dict(
                marker="^",
                markerfacecolor="green",
                markeredgecolor="green",
                markersize=8,
            ),
        )

        ax.set_title(feat)
        ax.tick_params(axis="x", rotation=20)
        ax.grid(axis="y", alpha=0.3)

    for j in range(i + 1, len(axes)):
        fig.delaxes(axes[j])

    plt.tight_layout()
    plt.show()


# =============================
# ▼メイン処理
# =============================

def main():
    """
    処理の流れ:

    1. 各ステージのCSVを読み込む
    2. FFT特徴量を抽出する
    3. 平均時間波形を比較する
    4. 平均PSDを比較する
    5. 箱ひげ図で特徴量を比較する
    6. all_stage_features.csv を保存する

    このCSVが、
    第10回② 劣化スコア作成コードの入力になります。
    """

    script_dir = os.path.dirname(os.path.abspath(__file__))

    group_feature_tables = {}
    groups_psd = {}
    groups_waveform = {}

    f_master = None
    time_master = None

    # 各ステージフォルダを順番に読み込みます
    for group_name, folder_name in GROUP_DIRS.items():
        folder_path = os.path.join(script_dir, folder_name)

        if not os.path.isdir(folder_path):
            raise RuntimeError(f"フォルダが見つかりません: {folder_path}")

        df, f_ref, psd_arr, time_ref, waveform_arr = load_group_features(
            group_name,
            folder_path,
        )

        if f_master is None:
            f_master = f_ref
        else:
            if f_ref.size != f_master.size or np.max(np.abs(f_ref - f_master)) > 1e-9:
                raise RuntimeError("グループ間で周波数軸が一致しません。")

        if time_master is None:
            time_master = time_ref
        else:
            if time_ref.size != time_master.size or np.max(np.abs(time_ref - time_master)) > 1e-12:
                raise RuntimeError("グループ間で時間軸が一致しません。")

        group_feature_tables[group_name] = df
        groups_psd[group_name] = {"psd": psd_arr}
        groups_waveform[group_name] = {"waveform": waveform_arr}

        print(f"{group_name}: {len(df)} files loaded")

    # 全ステージの特徴量を1つの表に結合します
    all_df = pd.concat(
        group_feature_tables.values(),
        axis=0,
        ignore_index=True,
    )

    # ステージごとの平均・標準偏差を作成します
    summary_rows = []

    for group_name, df in group_feature_tables.items():
        row = {
            "group": group_name,
            "n_files": len(df),
        }

        for feat in COMPARE_FEATURES:
            vals = df[feat].to_numpy(dtype=np.float64)

            row[f"{feat}_mean"] = float(np.mean(vals))
            row[f"{feat}_std"] = float(np.std(vals, ddof=1)) if len(vals) >= 2 else 0.0

        summary_rows.append(row)

    summary_df = pd.DataFrame(summary_rows)

    print("\n=== Group summary ===")

    show_cols = ["group", "n_files"] + [f"{f}_mean" for f in COMPARE_FEATURES]

    print(summary_df[show_cols])

    # グラフを表示します
    plot_mean_time_waveform(groups_waveform, time_master)
    plot_mean_psd(groups_psd, f_master)
    plot_feature_boxplots(all_df, COMPARE_FEATURES)

    # 結果を保存します
    out_dir = os.path.join(script_dir, "fft_stage_comparison")
    os.makedirs(out_dir, exist_ok=True)

    all_df.to_csv(
        os.path.join(out_dir, "all_stage_features.csv"),
        index=False,
    )

    summary_df.to_csv(
        os.path.join(out_dir, "stage_summary.csv"),
        index=False,
    )

    # ステージごとの平均PSDも保存します
    psd_save_rows = []

    for group_name, info in groups_psd.items():
        mean_psd = np.mean(info["psd"], axis=0)

        tmp = pd.DataFrame(
            {
                "frequency_hz": f_master,
                "group": group_name,
                "mean_psd": mean_psd,
            }
        )

        psd_save_rows.append(tmp)

    mean_psd_df = pd.concat(psd_save_rows, axis=0, ignore_index=True)

    mean_psd_df.to_csv(
        os.path.join(out_dir, "mean_psd_by_stage.csv"),
        index=False,
    )

    # ステージごとの平均時間波形も保存します
    waveform_save_rows = []

    for group_name, info in groups_waveform.items():
        mean_waveform = np.mean(info["waveform"], axis=0)

        tmp = pd.DataFrame(
            {
                "time_s": time_master,
                "group": group_name,
                f"mean_{CHANNEL}": mean_waveform,
            }
        )

        waveform_save_rows.append(tmp)

    mean_waveform_df = pd.concat(waveform_save_rows, axis=0, ignore_index=True)

    mean_waveform_df.to_csv(
        os.path.join(out_dir, "mean_time_waveform_by_stage.csv"),
        index=False,
    )

    print(f"\nSaved to: {out_dir}")


if __name__ == "__main__":
    main()

②劣化スコア作成、劣化ステージ予測モデル作成

"""
-----------------------------------------
コード①で作成した all_stage_features.csv を読み込み、
・正常基準で特徴量をZスコア化する
・劣化が進むほど大きくなるスコアを作る
・ステージごとのスコア変化を確認する
・スコアから劣化ステージを推定するモデルを作る
ことを目的にします。

このコードの位置付け
-----------------------------------------
コード①:劣化ステージごとのFFT特徴量を抽出する
コード②:特徴量を使って、劣化スコアと予測式を作る
コード③:日次点検データに劣化スコアと予測式を適用する

処理の流れ
-----------------------------------------
① all_stage_features.csv を読み込む
② normal、stage1、stage2、stage3、stage4 を数値に変換する
③ 正常データを基準にZスコア化する
④ 劣化が進むほど大きくなる単調劣化スコアを作る
⑤ ステージごとのスコア分布を確認する
⑥ 線形回帰で「スコア → 劣化ステージ」の予測式を作る
⑦ 結果CSVを保存する
=========================================================
"""

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.linear_model import LinearRegression


# =========================
# ▼基本設定
# =========================

INPUT_CSV = "all_stage_features.csv"
# コード①で作成したCSVです

OUTPUT_DIR = "degradation_monotonic_analysis"
# 結果の保存先フォルダです

EPS = 1e-12


# =========================
# ▼劣化スコアに使う特徴量
# =========================
#
# 正常基準でZスコア化したうえで、
# 劣化が進むほどスコアが増えるようにします。
#
# dom_freq_hz
#     支配周波数です。
#     今回の実験では、劣化が進むほど下がる想定です。
#     そのため符号を反転して使います。
#
# band_psd_sum_30_80
#     30~80Hz帯の振動エネルギーです。
#     今回の実験では、劣化が進むほど増える想定です。
#     そのためそのまま使います。
#
# 係数が -1.0 の特徴量:
#     劣化で値が下がるため、符号を反転します。
#
# 係数が 1.0 の特徴量:
#     劣化で値が上がるため、そのまま使います。
#

MONOTONIC_FEATURE_RULES = {
    "dom_freq_hz": -1.0,
    "band_psd_sum_30_80": 1.0,
}


# =========================
# ▼ステージ番号の定義
# =========================
#
# normal を0、
# stage1~stage4 を1~4として扱います。
#
# この数値を教師データとして、
# スコアからステージを予測するモデルを作ります。
#

STAGE_MAP = {
    "normal": 0,
    "stage1": 1,
    "stage2": 2,
    "stage3": 3,
    "stage4": 4,
}


# =========================
# ▼データ読み込み
# =========================

def load_data(input_csv: str) -> pd.DataFrame:
    """
    コード①で作成した all_stage_features.csv を読み込みます。

    group列には、
    normal、stage1、stage2、stage3、stage4
    が入っている前提です。
    """

    df = pd.read_csv(input_csv)

    required_cols = ["group"] + list(MONOTONIC_FEATURE_RULES.keys())

    for c in required_cols:
        if c not in df.columns:
            raise ValueError(f"必要列 {c} が見つかりません。")

    df = df.copy()

    # group名を数値ステージに変換します
    df["stage_num"] = df["group"].map(STAGE_MAP)

    if df["stage_num"].isna().any():
        bad = df.loc[df["stage_num"].isna(), "group"].unique()
        raise ValueError(f"stage_num に変換できない group があります: {bad}")

    return df


# =========================
# ▼正常基準でZスコア化
# =========================

def add_normal_based_zscores(
    df: pd.DataFrame,
    feature_names,
):
    """
    normalデータの平均と標準偏差を基準にして、
    各特徴量をZスコア化します。

    Zスコアとは、

        Z = (測定値 - 正常平均) / 正常標準偏差

    です。

    これにより、
    単位や大きさが異なる特徴量を同じ尺度で比較できます。
    """

    out = df.copy()

    normal_df = out[out["group"] == "normal"].copy()

    if len(normal_df) == 0:
        raise ValueError("normal データがありません。")

    stats_rows = []

    for feat in feature_names:
        mu = float(normal_df[feat].mean())
        sd = float(normal_df[feat].std(ddof=1))

        if sd <= EPS:
            sd = EPS

        out[f"{feat}_z"] = (out[feat] - mu) / sd

        stats_rows.append(
            {
                "feature": feat,
                "normal_mean": mu,
                "normal_std": sd,
            }
        )

    stats_df = pd.DataFrame(stats_rows)

    return out, stats_df


# =========================
# ▼単調劣化スコア作成
# =========================

def add_monotonic_degradation_score(
    df: pd.DataFrame,
    feature_rules: dict,
):
    """
    複数の特徴量を組み合わせて、
    劣化が進むほど大きくなるスコアを作ります。

    今回は、

        score = -1 × dom_freq_hz_z
                +1 × band_psd_sum_30_80_z

    としています。

    支配周波数が下がる変化と、
    30~80Hz帯エネルギーが増える変化を、
    どちらも「劣化が進む方向」として加算します。
    """

    out = df.copy()

    score = np.zeros(len(out), dtype=np.float64)
    detail_rows = []

    for feat, coef in feature_rules.items():
        z_col = f"{feat}_z"

        if z_col not in out.columns:
            raise ValueError(f"{z_col} が見つかりません。")

        contribution = coef * out[z_col].to_numpy(dtype=np.float64)
        score += contribution

        detail_rows.append(
            {
                "feature": feat,
                "z_column": z_col,
                "coefficient": coef,
                "meaning": (
                    "increase_with_degradation"
                    if coef > 0
                    else "decrease_with_degradation_then_sign_flipped"
                ),
            }
        )

    out["score_degradation_monotonic"] = score

    detail_df = pd.DataFrame(detail_rows)

    return out, detail_df


# =========================
# ▼ステージごとのスコア集計
# =========================

def summarize_by_stage(df: pd.DataFrame):
    """
    各ステージごとに、
    劣化スコアの平均、標準偏差、最小値、最大値を集計します。

    ステージが進むほどスコア平均が大きくなっていれば、
    劣化スコアとして使いやすいことが分かります。
    """

    rows = []

    for g, sub in df.groupby("group"):
        row = {
            "group": g,
            "stage_num": int(sub["stage_num"].iloc[0]),
            "n_files": len(sub),
            "score_mean": float(sub["score_degradation_monotonic"].mean()),
            "score_std": (
                float(sub["score_degradation_monotonic"].std(ddof=1))
                if len(sub) >= 2
                else 0.0
            ),
            "score_min": float(sub["score_degradation_monotonic"].min()),
            "score_max": float(sub["score_degradation_monotonic"].max()),
        }

        rows.append(row)

    summary_df = pd.DataFrame(rows).sort_values("stage_num").reset_index(drop=True)

    return summary_df


# =========================
# ▼可視化①:スコア分布
# =========================

def plot_score_by_stage(df: pd.DataFrame):
    """
    各ステージの劣化スコアを散布図で確認します。

    stage番号が大きくなるほど、
    点が上に移動していれば、
    劣化スコアとして機能していると判断できます。
    """

    groups_in_order = ["normal", "stage1", "stage2", "stage3", "stage4"]

    plt.figure(figsize=(9, 5))

    for g in groups_in_order:
        sub = df[df["group"] == g]

        if len(sub) == 0:
            continue

        plt.scatter(
            [g] * len(sub),
            sub["score_degradation_monotonic"],
            s=70,
        )

    plt.title("Monotonic Degradation Score by Stage")
    plt.ylabel("Score")
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


# =========================
# ▼可視化②:劣化カーブ
# =========================

def plot_degradation_curve(summary_df: pd.DataFrame):
    """
    ステージごとの平均劣化スコアを線で結びます。

    劣化が進むほどスコアが単調に上がっているかを確認します。
    """

    plt.figure(figsize=(8, 5))

    plt.plot(
        summary_df["stage_num"],
        summary_df["score_mean"],
        marker="o",
    )

    plt.xlabel("Stage")
    plt.ylabel("Mean Degradation Score")
    plt.title("Monotonic Degradation Curve")
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


# =========================
# ▼可視化③:ステージ予測モデル
# =========================

def fit_and_plot_prediction(df: pd.DataFrame):
    """
    劣化スコアからステージ番号を推定するモデルを作ります。

    今回は分かりやすさを優先して、
    線形回帰を使います。

        stage = a × score + b

    という形で、
    劣化スコアからステージを推定します。
    """

    X = df[["score_degradation_monotonic"]].to_numpy(dtype=np.float64)
    y = df["stage_num"].to_numpy(dtype=np.float64)

    model = LinearRegression()
    model.fit(X, y)

    coef = float(model.coef_[0])
    intercept = float(model.intercept_)

    score_min = float(df["score_degradation_monotonic"].min())
    score_max = float(df["score_degradation_monotonic"].max())

    x_line = np.linspace(
        score_min - 0.1 * abs(score_min),
        score_max + 0.1 * abs(score_max),
        100,
    )

    y_line = model.predict(x_line.reshape(-1, 1))

    plt.figure(figsize=(8, 5))

    plt.scatter(
        df["score_degradation_monotonic"],
        df["stage_num"],
        label="data",
        s=70,
    )

    plt.plot(
        x_line,
        y_line,
        color="red",
        label="prediction",
    )

    plt.xlabel("Monotonic Degradation Score")
    plt.ylabel("Stage")
    plt.title("Prediction Model")
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

    return model, coef, intercept


# =========================
# ▼可視化④:特徴量寄与の確認
# =========================

def plot_feature_contributions(
    df: pd.DataFrame,
    feature_rules: dict,
):
    """
    劣化スコアに対して、
    どの特徴量がどれだけ寄与しているかを確認します。

    今回は、

    ・支配周波数の低下
    ・30~80Hz帯エネルギーの増加

    の2つが、スコアにどのように効いているかを見ます。
    """

    groups_in_order = ["normal", "stage1", "stage2", "stage3", "stage4"]

    rows = []

    for g in groups_in_order:
        sub = df[df["group"] == g]

        if len(sub) == 0:
            continue

        row = {"group": g}

        for feat, coef in feature_rules.items():
            row[f"{feat}_contribution_mean"] = float(
                (coef * sub[f"{feat}_z"]).mean()
            )

        rows.append(row)

    contrib_df = pd.DataFrame(rows)

    plt.figure(figsize=(9, 5))

    x = np.arange(len(contrib_df))
    bottom = np.zeros(len(contrib_df), dtype=np.float64)

    for feat, coef in feature_rules.items():
        col = f"{feat}_contribution_mean"
        vals = contrib_df[col].to_numpy(dtype=np.float64)

        plt.bar(
            x,
            vals,
            bottom=bottom,
            label=feat,
            width=0.65,
        )

        bottom += vals

    plt.xticks(x, contrib_df["group"])
    plt.ylabel("Mean Contribution to Score")
    plt.title("Feature Contributions to Monotonic Score")
    plt.legend()
    plt.grid(axis="y", alpha=0.3)
    plt.tight_layout()
    plt.show()

    return contrib_df


# =========================
# ▼メイン処理
# =========================

def main():
    script_dir = os.path.dirname(os.path.abspath(__file__))

    input_csv = os.path.join(script_dir, INPUT_CSV)

    out_dir = os.path.join(script_dir, OUTPUT_DIR)
    os.makedirs(out_dir, exist_ok=True)

    # 1. コード①で作成した特徴量CSVを読み込みます
    df = load_data(input_csv)

    # 2. 正常基準でZスコア化します
    feature_names = list(MONOTONIC_FEATURE_RULES.keys())

    df_z, normal_stats_df = add_normal_based_zscores(
        df,
        feature_names,
    )

    # 3. 劣化が進むほど大きくなるスコアを作ります
    df_score, score_rule_df = add_monotonic_degradation_score(
        df_z,
        MONOTONIC_FEATURE_RULES,
    )

    # 4. ステージごとにスコアを集計します
    summary_df = summarize_by_stage(df_score)

    print("\n=== Normal-based statistics ===")
    print(normal_stats_df)

    print("\n=== Monotonic score rules ===")
    print(score_rule_df)

    print("\n=== Stage summary ===")
    print(summary_df)

    # 5. グラフを表示します
    plot_score_by_stage(df_score)
    plot_degradation_curve(summary_df)

    model, coef, intercept = fit_and_plot_prediction(df_score)

    contrib_df = plot_feature_contributions(
        df_score,
        MONOTONIC_FEATURE_RULES,
    )

    print("\n=== Prediction model ===")
    print(
        f"stage = {coef:.6f} "
        f"* score_degradation_monotonic + {intercept:.6f}"
    )

    # 6. 結果を保存します
    df_score.to_csv(
        os.path.join(out_dir, "all_stage_features_with_monotonic_score.csv"),
        index=False,
    )

    normal_stats_df.to_csv(
        os.path.join(out_dir, "normal_stats_for_monotonic_score.csv"),
        index=False,
    )

    score_rule_df.to_csv(
        os.path.join(out_dir, "monotonic_score_rule.csv"),
        index=False,
    )

    summary_df.to_csv(
        os.path.join(out_dir, "monotonic_score_stage_summary.csv"),
        index=False,
    )

    contrib_df.to_csv(
        os.path.join(out_dir, "monotonic_score_feature_contributions.csv"),
        index=False,
    )

    model_df = pd.DataFrame(
        [
            {
                "model": "LinearRegression",
                "target": "stage_num",
                "input": "score_degradation_monotonic",
                "coef": coef,
                "intercept": intercept,
            }
        ]
    )

    model_df.to_csv(
        os.path.join(out_dir, "monotonic_prediction_model.csv"),
        index=False,
    )

    print(f"\nSaved to: {out_dir}")


if __name__ == "__main__":
    main()

③日次点検データの劣化判定・残寿命予測

"""
-----------------------------------------
コード②で作成した、
・正常基準統計
・劣化スコアルール
・ステージ予測モデル
を使って、daily_checkフォルダ内の測定データを評価します。

このコードで分かること
-----------------------------------------
・現在の劣化スコア
・前回からの変化量
・推定劣化ステージ
・Normal / Caution / Warning / Critical 判定
・危険レベルに到達する予測日
・残り日数の目安

処理の流れ
-----------------------------------------
① daily_check フォルダのCSVを読み込む
② FFT特徴量を抽出する
③ 正常基準でZスコア化する
④ 劣化スコアを計算する
⑤ ステージ予測モデルで劣化ステージを推定する
⑥ スコア・ステージ・急変量から最終判定する
⑦ 劣化スコアのトレンドから危険到達日を予測する
⑧ グラフとCSVを出力する
=========================================================
"""

import os
import re
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

from scipy.interpolate import interp1d
from scipy.signal import welch, find_peaks, peak_widths
from sklearn.linear_model import LinearRegression


# =============================
# ▼基本設定
# =============================

FS = 1000.0
# 第2回で1000Hz測定した前提です

TARGET_DURATION_S = 10.0
# 各CSVファイルの先頭10秒を解析します

N_SAMPLES = int(FS * TARGET_DURATION_S)

CHANNEL = "acc_total"
# 3軸合成加速度を解析対象にします

F_MIN = 5.0
# 5Hz未満は低周波の揺れや姿勢変化の影響を受けやすいため除外します

F_MAX_PLOT = 500.0

F_MIN_DOM = 5.0
F_MAX_DOM = 200.0
# 支配周波数を探す範囲です

N_PERSEG = 2048
# Welch法でPSDを計算するときの分割長です

EPS = 1e-12

BANDS_HZ = [
    (5.0, 10.0),
    (10.0, 30.0),
    (30.0, 80.0),
    (80.0, 200.0),
    (200.0, 400.0),
    (400.0, 500.0),
]


# =============================
# ▼ユーザーが主に変更する場所
# =============================

DAILY_CHECK_DIR = "daily_check"
# 日次点検データを入れるフォルダです

FILE_PATTERN = "*.csv"
# daily_checkフォルダ内で読み込むCSVのパターンです

NORMAL_STATS_CSV = "normal_stats_for_monotonic_score.csv"
# コード②で作成した正常基準統計です

SCORE_RULE_CSV = "monotonic_score_rule.csv"
# コード②で作成した劣化スコアルールです

PRED_MODEL_CSV = "monotonic_prediction_model.csv"
# コード②で作成したステージ予測モデルです

OUTPUT_DIR = "daily_check_result"
# 結果保存先です

USE_MOVING_AVERAGE = True
# トレンドグラフに移動平均を表示するかどうかです

MOVING_AVERAGE_WINDOW = 3
# 移動平均の点数です


# =============================
# ▼スコアの閾値
# =============================
#
# score_degradation_monotonic の値によって、
# Normal / Caution / Warning / Critical を判定します。
#

THRESHOLD_CAUTION = -5.0
THRESHOLD_WARNING = 0.0
THRESHOLD_DANGER = 5.0


# =============================
# ▼急変判定の閾値
# =============================
#
# 前回測定からスコアが急に上がった場合は、
# 通常の劣化レベルよりも優先して警告します。
#

RAPID_CHANGE_THRESHOLD_WARNING = 2.5
RAPID_CHANGE_THRESHOLD_CRITICAL = 4.0


# =============================
# ▼ステージ判定の閾値
# =============================

STAGE_WARNING_THRESHOLD = 2.0
STAGE_CRITICAL_THRESHOLD = 3.0


# =============================
# ▼残寿命予測の設定
# =============================

MIN_POINTS_FOR_FORECAST = 3
# 予測に必要な最小データ点数です

FORECAST_HORIZON_DAYS = 30
# 予測線を描く最大日数です

FORECAST_USE_LAST_N = None
# Noneなら全点で回帰します
# 例:5にすると直近5点だけで回帰します


# =============================
# ▼表示名
# =============================

STAGE_NAME_MAP = {
    0: "Normal",
    1: "Stage 1",
    2: "Stage 2",
    3: "Stage 3",
    4: "Stage 4",
}

LEVEL_NAME_MAP = {
    0: "Normal",
    1: "Caution",
    2: "Warning",
    3: "Critical",
}

LEVEL_COLOR_MAP = {
    0: "blue",
    1: "green",
    2: "orange",
    3: "red",
}

STAGE_COLOR_MAP = {
    0: "blue",
    1: "green",
    2: "orange",
    3: "red",
    4: "purple",
}


# =============================
# ▼前処理:1000Hz、10秒にそろえる
# =============================

def load_and_resample_10s(csv_path: str) -> pd.DataFrame:
    """
    CSVを読み込み、time_usを使って1000Hzの等間隔データに補正します。

    ESP32の測定データは、厳密には完全な等間隔ではないため、
    FFT前に時間軸をそろえます。
    """

    df = pd.read_csv(csv_path)

    required_cols = ["time_us", "ax", "ay", "az", "acc_total"]

    for c in required_cols:
        if c not in df.columns:
            raise ValueError(f"列 {c} が見つかりません。file={csv_path}")

    t = df["time_us"].to_numpy(dtype=np.float64) / 1e6
    t = t - t[0]

    # 時刻の重複や逆行を除外します
    keep = np.ones_like(t, dtype=bool)
    keep[1:] = t[1:] > t[:-1]

    t = t[keep]
    df = df.loc[keep].reset_index(drop=True)

    if t.size < 2:
        raise ValueError(f"time列が短すぎます。file={csv_path}")

    t_uniform = np.arange(N_SAMPLES, dtype=np.float64) / FS
    out = pd.DataFrame({"time_s": t_uniform})

    t_min = t[0]
    t_max = t[-1]

    for col in ["ax", "ay", "az", "acc_total"]:
        y = df[col].to_numpy(dtype=np.float64)

        f = interp1d(
            t,
            y,
            kind="linear",
            bounds_error=False,
            fill_value="extrapolate",
        )

        y_uniform = f(t_uniform)

        valid = (t_uniform >= t_min) & (t_uniform <= t_max)
        y_uniform = np.where(valid, y_uniform, 0.0)

        out[col] = y_uniform

    return out


# =============================
# ▼Welch法でPSDを計算
# =============================

def compute_psd(x: np.ndarray):
    """
    Welch法でPSDを計算します。

    PSDを見ることで、
    どの周波数帯にどれだけ振動エネルギーがあるかを確認できます。
    """

    x = np.asarray(x, dtype=np.float64)

    # DC成分を除去します
    x = x - np.mean(x)

    f, Pxx = welch(
        x,
        fs=FS,
        nperseg=N_PERSEG,
    )

    return f, Pxx


# =============================
# ▼支配ピークの特徴量
# =============================

def peak_features_from_psd(f: np.ndarray, Pxx: np.ndarray):
    """
    支配周波数、支配周波数の強さ、ピーク幅、ピークの鋭さを計算します。
    """

    m_dom = (f >= F_MIN_DOM) & (f <= F_MAX_DOM)

    f_dom = f[m_dom]
    P_dom = Pxx[m_dom]

    feats = {}

    if f_dom.size < 3:
        feats["dom_freq_hz"] = 0.0
        feats["dom_psd"] = 0.0
        feats["peak_width_hz"] = 0.0
        feats["peak_sharpness"] = 0.0
        return feats

    idx_peak_local = int(np.argmax(P_dom))

    dom_freq = float(f_dom[idx_peak_local])
    dom_psd = float(P_dom[idx_peak_local])

    peaks, _ = find_peaks(P_dom)

    if peaks.size == 0:
        peaks = np.array([idx_peak_local], dtype=int)

    peaks_sorted = peaks[np.argsort(np.abs(peaks - idx_peak_local))]
    pidx = int(peaks_sorted[0])

    widths, _, _, _ = peak_widths(P_dom, [pidx], rel_height=0.5)

    width_bins = float(widths[0])
    df_hz = float(f_dom[1] - f_dom[0])
    peak_width_hz = width_bins * df_hz

    span_hz = 4.0
    span_bins = max(1, int(round(span_hz / df_hz)))

    lo = max(0, pidx - span_bins)
    hi = min(P_dom.size, pidx + span_bins + 1)

    if hi - lo >= 3:
        neighborhood = np.concatenate([P_dom[lo:pidx], P_dom[pidx + 1:hi]])
    else:
        neighborhood = P_dom[lo:hi]

    if neighborhood.size > 0:
        neigh_mean = float(np.mean(neighborhood))
    else:
        neigh_mean = float(np.mean(P_dom))

    peak_sharpness = np.log10(dom_psd / (neigh_mean + EPS) + 1.0)

    feats["dom_freq_hz"] = dom_freq
    feats["dom_psd"] = dom_psd
    feats["peak_width_hz"] = float(peak_width_hz)
    feats["peak_sharpness"] = float(peak_sharpness)

    return feats


# =============================
# ▼スペクトル特徴量
# =============================

def spectral_features(f: np.ndarray, Pxx: np.ndarray):
    """
    FFTスペクトル全体から特徴量を作ります。

    コード②で使った特徴量と同じ定義にしておくことで、
    日次データにも同じ劣化スコアを適用できます。
    """

    feats = {}

    m_ge = f >= F_MIN

    f_ge = f[m_ge]
    P_ge = Pxx[m_ge]

    feats["psd_sum_ge5"] = float(np.sum(P_ge))

    for f1, f2 in BANDS_HZ:
        m = (f >= f1) & (f < f2)
        feats[f"band_psd_sum_{f1:g}_{f2:g}"] = float(np.sum(Pxx[m]))

    denom = float(np.sum(P_ge))

    if denom > 0:
        feats["spec_centroid_hz_ge5"] = float(np.sum(f_ge * P_ge) / denom)
    else:
        feats["spec_centroid_hz_ge5"] = 0.0

    p_hi_80_500 = float(np.sum(Pxx[f >= 80.0]))
    p_lo_5_80 = float(np.sum(Pxx[(f >= 5.0) & (f < 80.0)]))

    feats["hi_lo_ratio_ge80_over_5to80"] = p_hi_80_500 / (p_lo_5_80 + EPS)

    p_hi_200_500 = float(np.sum(Pxx[(f >= 200.0) & (f <= 500.0)]))
    p_lo_5_200 = float(np.sum(Pxx[(f >= 5.0) & (f < 200.0)]))

    feats["hi_lo_ratio_200_500_over_5_200"] = p_hi_200_500 / (p_lo_5_200 + EPS)

    return feats


# =============================
# ▼1ファイル分の特徴量抽出
# =============================

def extract_all_features(csv_path: str):
    prep = load_and_resample_10s(csv_path)

    x = prep[CHANNEL].to_numpy(dtype=np.float64)

    f, Pxx = compute_psd(x)

    feats = {}
    feats.update(peak_features_from_psd(f, Pxx))
    feats.update(spectral_features(f, Pxx))

    return feats, f, Pxx


# =============================
# ▼ファイル名から日付を抽出
# =============================

def extract_date_from_filename(file_name: str):
    """
    ファイル名から日付を抽出します。

    対応例:
    acc_2026-06-01.csv
    acc_20260601.csv

    日付が取れない場合は NaT になります。
    """

    m1 = re.search(r"(20\d{2}-\d{2}-\d{2})", file_name)

    if m1:
        return pd.to_datetime(m1.group(1), format="%Y-%m-%d")

    m2 = re.search(r"(20\d{6})", file_name)

    if m2:
        return pd.to_datetime(m2.group(1), format="%Y%m%d")

    return pd.NaT


# =============================
# ▼コード②の出力ファイルを読み込む
# =============================

def load_normal_stats(csv_path: str):
    df = pd.read_csv(csv_path)

    required_cols = ["feature", "normal_mean", "normal_std"]

    for c in required_cols:
        if c not in df.columns:
            raise ValueError(f"{csv_path} に必要列 {c} がありません。")

    return df


def load_score_rule(csv_path: str):
    df = pd.read_csv(csv_path)

    required_cols = ["feature", "z_column", "coefficient"]

    for c in required_cols:
        if c not in df.columns:
            raise ValueError(f"{csv_path} に必要列 {c} がありません。")

    return df


def load_prediction_model(csv_path: str):
    df = pd.read_csv(csv_path)

    required_cols = ["coef", "intercept"]

    for c in required_cols:
        if c not in df.columns:
            raise ValueError(f"{csv_path} に必要列 {c} がありません。")

    return float(df.loc[0, "coef"]), float(df.loc[0, "intercept"])


# =============================
# ▼日次点検データを特徴量テーブルに変換
# =============================

def build_daily_feature_table(daily_dir: str, file_pattern: str):
    files = sorted(glob.glob(os.path.join(daily_dir, file_pattern)))

    if len(files) == 0:
        raise RuntimeError(f"CSVが見つかりません: {daily_dir}")

    rows = []

    for p in files:
        file_name = os.path.basename(p)

        check_date = extract_date_from_filename(file_name)

        feats, _, _ = extract_all_features(p)

        row = {
            "file": file_name,
            "date": check_date,
        }

        row.update(feats)

        rows.append(row)

    df = pd.DataFrame(rows)
    df = df.sort_values(["date", "file"]).reset_index(drop=True)

    return df


# =============================
# ▼正常基準でZスコア化
# =============================

def add_normal_based_zscores(
    df: pd.DataFrame,
    normal_stats_df: pd.DataFrame,
):
    """
    コード②で作成した正常基準を使って、
    daily_checkデータをZスコア化します。
    """

    out = df.copy()

    for _, row in normal_stats_df.iterrows():
        feat = row["feature"]
        mu = float(row["normal_mean"])
        sd = float(row["normal_std"])

        if sd <= EPS:
            sd = EPS

        if feat not in out.columns:
            raise ValueError(f"特徴量 {feat} が daily_check データにありません。")

        out[f"{feat}_z"] = (out[feat] - mu) / sd

    return out


# =============================
# ▼劣化スコア計算
# =============================

def add_monotonic_score(
    df: pd.DataFrame,
    score_rule_df: pd.DataFrame,
):
    """
    コード②で作成したスコアルールを使い、
    劣化スコアを計算します。

    例:
    score = -1 × dom_freq_hz_z
            +1 × band_psd_sum_30_80_z
    """

    out = df.copy()

    score = np.zeros(len(out), dtype=np.float64)

    for _, row in score_rule_df.iterrows():
        z_col = row["z_column"]
        coef = float(row["coefficient"])

        if z_col not in out.columns:
            raise ValueError(f"Zスコア列 {z_col} がありません。")

        score += coef * out[z_col].to_numpy(dtype=np.float64)

    out["score_degradation_monotonic"] = score

    return out


# =============================
# ▼ステージ推定
# =============================

def add_predicted_stage(
    df: pd.DataFrame,
    coef: float,
    intercept: float,
):
    """
    コード②で作成した線形回帰モデルを使い、
    劣化スコアからステージを推定します。
    """

    out = df.copy()

    out["pred_stage_continuous"] = (
        coef * out["score_degradation_monotonic"] + intercept
    )

    out["pred_stage_continuous_clipped"] = out["pred_stage_continuous"].clip(
        lower=0.0,
        upper=4.0,
    )

    out["pred_stage_rounded"] = np.round(
        out["pred_stage_continuous_clipped"]
    ).astype(int)

    out["pred_stage_label"] = out["pred_stage_rounded"].map(STAGE_NAME_MAP)

    return out


# =============================
# ▼レベル判定
# =============================

def score_to_level(score: float) -> int:
    if score >= THRESHOLD_DANGER:
        return 3
    elif score >= THRESHOLD_WARNING:
        return 2
    elif score >= THRESHOLD_CAUTION:
        return 1
    else:
        return 0


def stage_to_level(pred_stage: float) -> int:
    if pred_stage >= STAGE_CRITICAL_THRESHOLD:
        return 3
    elif pred_stage >= STAGE_WARNING_THRESHOLD:
        return 2
    elif pred_stage >= 1.0:
        return 1
    else:
        return 0


def delta_to_level(delta_score: float) -> int:
    if delta_score >= RAPID_CHANGE_THRESHOLD_CRITICAL:
        return 3
    elif delta_score >= RAPID_CHANGE_THRESHOLD_WARNING:
        return 2
    else:
        return 0


# =============================
# ▼最終判定
# =============================

def add_final_judgment(df: pd.DataFrame):
    """
    最終判定は、次の3つの観点の最大値で決めます。

    ① スコア水準
    ② 推定ステージ
    ③ 前回からの急変量

    急変がある場合は、劣化スコアがまだ低くても警告します。
    """

    out = df.copy().sort_values(["date", "file"]).reset_index(drop=True)

    score_based_levels = []
    stage_based_levels = []
    rapid_change_levels = []
    delta_scores = []
    final_levels = []
    comments = []

    prev_score = None

    for _, row in out.iterrows():
        score = float(row["score_degradation_monotonic"])
        pred_stage = float(row["pred_stage_continuous_clipped"])

        score_level = score_to_level(score)
        stage_level = stage_to_level(pred_stage)

        if prev_score is None:
            delta_score = np.nan
            rapid_level = 0
        else:
            delta_score = score - prev_score
            rapid_level = delta_to_level(delta_score)

        final_level = max(score_level, stage_level, rapid_level)

        if rapid_level >= 2:
            if final_level == 3:
                comment = "急変が検出されました。重大異常の可能性があります。至急点検してください。"
            else:
                comment = "急変が検出されました。通常劣化より優先して確認してください。"
        else:
            if final_level == 0:
                comment = "Normal. Continue routine monitoring."
            elif final_level == 1:
                comment = "Caution. A slight change is observed. Continue trend monitoring."
            elif final_level == 2:
                comment = "Warning. Degradation is progressing or abnormal change is detected. Increase inspection frequency."
            else:
                comment = "Critical. Severe abnormality or rapid change is detected. Immediate inspection is recommended."

        score_based_levels.append(score_level)
        stage_based_levels.append(stage_level)
        rapid_change_levels.append(rapid_level)
        delta_scores.append(delta_score)
        final_levels.append(final_level)
        comments.append(comment)

        prev_score = score

    out["delta_score_from_previous"] = delta_scores

    out["score_based_level"] = score_based_levels
    out["score_based_label"] = [LEVEL_NAME_MAP[x] for x in score_based_levels]

    out["stage_based_level"] = stage_based_levels
    out["stage_based_label"] = [LEVEL_NAME_MAP[x] for x in stage_based_levels]

    out["rapid_change_level"] = rapid_change_levels
    out["rapid_change_label"] = [LEVEL_NAME_MAP[x] for x in rapid_change_levels]

    out["final_level"] = final_levels
    out["final_label"] = [LEVEL_NAME_MAP[x] for x in final_levels]

    out["judgment_comment"] = comments

    return out


# =============================
# ▼残寿命予測
# =============================

def fit_score_forecast(df: pd.DataFrame):
    """
    劣化スコアの時系列に線形回帰を当てはめ、
    Dangerしきい値に到達する日を予測します。

    これは簡易的な残寿命予測です。
    実設備では、より多くのデータと妥当性検証が必要です。
    """

    plot_df = df.copy().sort_values("date").reset_index(drop=True)

    if len(plot_df) < MIN_POINTS_FOR_FORECAST:
        return {
            "forecast_available": False,
            "reason": f"Need at least {MIN_POINTS_FOR_FORECAST} points.",
        }

    if plot_df["date"].isna().any():
        return {
            "forecast_available": False,
            "reason": "Some dates are missing.",
        }

    fit_df = plot_df.copy()

    if FORECAST_USE_LAST_N is not None and len(fit_df) > FORECAST_USE_LAST_N:
        fit_df = fit_df.iloc[-FORECAST_USE_LAST_N:].copy()

    start_date = fit_df["date"].min()

    fit_df["day_index"] = (fit_df["date"] - start_date).dt.days.astype(float)

    X = fit_df[["day_index"]].to_numpy(dtype=np.float64)
    y = fit_df["score_degradation_monotonic"].to_numpy(dtype=np.float64)

    model = LinearRegression()
    model.fit(X, y)

    slope = float(model.coef_[0])
    intercept = float(model.intercept_)

    latest_date = plot_df["date"].max()

    latest_score = float(
        plot_df.loc[
            plot_df["date"] == latest_date,
            "score_degradation_monotonic",
        ].iloc[-1]
    )

    if latest_score >= THRESHOLD_DANGER:
        return {
            "forecast_available": True,
            "already_in_danger": True,
            "danger_date": latest_date,
            "days_to_danger": 0.0,
            "slope_per_day": slope,
            "intercept": intercept,
            "start_date": start_date,
            "fit_df": fit_df,
            "model": model,
            "forecast_df": None,
            "reason": "Already in danger zone.",
        }

    if slope <= 0:
        return {
            "forecast_available": True,
            "already_in_danger": False,
            "danger_date": pd.NaT,
            "days_to_danger": np.inf,
            "slope_per_day": slope,
            "intercept": intercept,
            "start_date": start_date,
            "fit_df": fit_df,
            "model": model,
            "forecast_df": None,
            "reason": "Trend is flat or decreasing; danger entry is not predicted.",
        }

    danger_day_index = (THRESHOLD_DANGER - intercept) / slope

    if danger_day_index < 0:
        danger_date = start_date
    else:
        danger_date = start_date + pd.to_timedelta(danger_day_index, unit="D")

    days_to_danger = (danger_date - latest_date) / pd.Timedelta(days=1)

    future_end_date = min(
        latest_date + pd.Timedelta(days=FORECAST_HORIZON_DAYS),
        danger_date + pd.Timedelta(days=5),
    )

    future_dates = pd.date_range(
        start=latest_date,
        end=future_end_date,
        freq="D",
    )

    future_day_index = (future_dates - start_date) / pd.Timedelta(days=1)

    future_scores = model.predict(
        np.array(future_day_index).reshape(-1, 1)
    )

    forecast_df = pd.DataFrame(
        {
            "date": future_dates,
            "forecast_score": future_scores,
        }
    )

    return {
        "forecast_available": True,
        "already_in_danger": False,
        "danger_date": danger_date,
        "days_to_danger": float(days_to_danger),
        "slope_per_day": slope,
        "intercept": intercept,
        "start_date": start_date,
        "fit_df": fit_df,
        "model": model,
        "forecast_df": forecast_df,
        "reason": "Forecast generated.",
    }


# =============================
# ▼グラフ1:劣化スコアトレンド
# =============================

def plot_daily_trend(df: pd.DataFrame, forecast_info: dict):
    plot_df = df.copy().sort_values("date")

    fig, ax = plt.subplots(figsize=(12, 5))

    y_min = min(
        plot_df["score_degradation_monotonic"].min() - 1.0,
        THRESHOLD_CAUTION - 1.0,
    )

    y_max_candidate = max(
        plot_df["score_degradation_monotonic"].max() + 1.0,
        THRESHOLD_DANGER + 1.0,
    )

    if forecast_info.get("forecast_df") is not None:
        y_max_candidate = max(
            y_max_candidate,
            forecast_info["forecast_df"]["forecast_score"].max() + 1.0,
        )

    ax.axhspan(y_min, THRESHOLD_CAUTION, facecolor="lightgreen", alpha=0.25, zorder=0)
    ax.axhspan(THRESHOLD_CAUTION, THRESHOLD_WARNING, facecolor="red", alpha=0.08, zorder=0)
    ax.axhspan(THRESHOLD_WARNING, THRESHOLD_DANGER, facecolor="red", alpha=0.16, zorder=0)
    ax.axhspan(THRESHOLD_DANGER, y_max_candidate, facecolor="red", alpha=0.28, zorder=0)

    x_left = plot_df["date"].min()

    ax.text(x_left, (y_min + THRESHOLD_CAUTION) / 2, "Normal", va="center", ha="left", fontsize=10)
    ax.text(x_left, (THRESHOLD_CAUTION + THRESHOLD_WARNING) / 2, "Caution", va="center", ha="left", fontsize=10)
    ax.text(x_left, (THRESHOLD_WARNING + THRESHOLD_DANGER) / 2, "Warning", va="center", ha="left", fontsize=10)
    ax.text(x_left, (THRESHOLD_DANGER + y_max_candidate) / 2, "Danger", va="center", ha="left", fontsize=10)

    for level, sub in plot_df.groupby("final_level"):
        ax.scatter(
            sub["date"],
            sub["score_degradation_monotonic"],
            s=70,
            color=LEVEL_COLOR_MAP.get(level, "gray"),
            label=LEVEL_NAME_MAP.get(level, str(level)),
            zorder=3,
        )

    ax.plot(
        plot_df["date"],
        plot_df["score_degradation_monotonic"],
        linewidth=1.4,
        color="black",
        alpha=0.8,
        label="Measured score",
        zorder=2,
    )

    if USE_MOVING_AVERAGE and len(plot_df) >= MOVING_AVERAGE_WINDOW:
        ma = plot_df["score_degradation_monotonic"].rolling(
            MOVING_AVERAGE_WINDOW
        ).mean()

        ax.plot(
            plot_df["date"],
            ma,
            linewidth=2.0,
            linestyle="--",
            color="tab:orange",
            label=f"{MOVING_AVERAGE_WINDOW}-point moving average",
            zorder=4,
        )

    rapid_df = plot_df[plot_df["rapid_change_level"] >= 2]

    if len(rapid_df) > 0:
        ax.scatter(
            rapid_df["date"],
            rapid_df["score_degradation_monotonic"],
            s=180,
            facecolors="none",
            edgecolors="red",
            linewidths=2.0,
            label="Rapid change",
            zorder=5,
        )

    latest = plot_df.iloc[-1]

    ax.scatter(
        [latest["date"]],
        [latest["score_degradation_monotonic"]],
        s=220,
        facecolors="none",
        edgecolors="black",
        linewidths=2.2,
        label="Latest",
        zorder=6,
    )

    if forecast_info.get("forecast_available", False):
        if forecast_info.get("already_in_danger", False):
            ax.text(
                latest["date"],
                latest["score_degradation_monotonic"] + 0.4,
                "Already in Danger",
                color="red",
                fontsize=11,
                ha="left",
            )

        elif forecast_info.get("forecast_df") is not None:
            forecast_df = forecast_info["forecast_df"].copy()
            forecast_df = forecast_df[forecast_df["date"] >= latest["date"]].copy()

            ax.plot(
                forecast_df["date"],
                forecast_df["forecast_score"],
                linestyle=":",
                linewidth=2.2,
                color="red",
                label="Forecast score",
                zorder=4,
            )

            danger_date = forecast_info.get("danger_date", pd.NaT)

            if pd.notna(danger_date):
                ax.scatter(
                    [danger_date],
                    [THRESHOLD_DANGER],
                    s=160,
                    marker="X",
                    color="red",
                    label="Predicted danger entry",
                    zorder=7,
                )

                ax.text(
                    danger_date,
                    THRESHOLD_DANGER + 0.35,
                    f"Predicted danger: {danger_date.strftime('%m-%d')}",
                    color="red",
                    fontsize=10,
                    ha="left",
                )

    ax.xaxis.set_major_locator(mdates.DayLocator())
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d"))

    if forecast_info.get("forecast_df") is not None:
        ax.set_xlim(
            plot_df["date"].min() - pd.Timedelta(days=0.2),
            forecast_info["forecast_df"]["date"].max() + pd.Timedelta(days=0.2),
        )
    else:
        ax.set_xlim(
            plot_df["date"].min() - pd.Timedelta(days=0.2),
            plot_df["date"].max() + pd.Timedelta(days=0.2),
        )

    ax.set_ylim(y_min, y_max_candidate)

    ax.set_title("Daily Degradation Score Trend")
    ax.set_xlabel("Date")
    ax.set_ylabel("Monotonic Degradation Score")
    ax.grid(True, alpha=0.3)

    handles, labels = ax.get_legend_handles_labels()
    unique = dict(zip(labels, handles))

    ax.legend(unique.values(), unique.keys(), loc="best")

    plt.tight_layout()
    plt.show()


# =============================
# ▼グラフ2:推定ステージトレンド
# =============================

def plot_stage_trend(
    df: pd.DataFrame,
    forecast_info: dict,
    pred_model_coef: float,
    pred_model_intercept: float,
):
    plot_df = df.copy().sort_values("date")

    fig, ax = plt.subplots(figsize=(12, 5))

    colors = [
        STAGE_COLOR_MAP.get(int(s), "gray")
        for s in plot_df["pred_stage_rounded"]
    ]

    ax.scatter(
        plot_df["date"],
        plot_df["pred_stage_continuous_clipped"],
        s=80,
        c=colors,
    )

    ax.plot(
        plot_df["date"],
        plot_df["pred_stage_continuous_clipped"],
        linewidth=1.4,
        color="black",
        alpha=0.8,
        label="Measured stage",
    )

    latest = plot_df.iloc[-1]

    ax.scatter(
        [latest["date"]],
        [latest["pred_stage_continuous_clipped"]],
        s=220,
        facecolors="none",
        edgecolors="black",
        linewidths=2.2,
        label="Latest",
    )

    if forecast_info.get("forecast_df") is not None:
        forecast_df = forecast_info["forecast_df"].copy()

        forecast_df["forecast_stage"] = (
            pred_model_coef * forecast_df["forecast_score"] + pred_model_intercept
        )

        forecast_df["forecast_stage"] = forecast_df["forecast_stage"].clip(
            lower=0.0,
            upper=4.0,
        )

        ax.plot(
            forecast_df["date"],
            forecast_df["forecast_stage"],
            linestyle=":",
            linewidth=2.2,
            color="red",
            label="Forecast stage",
        )

    ax.xaxis.set_major_locator(mdates.DayLocator())
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d"))

    if forecast_info.get("forecast_df") is not None:
        ax.set_xlim(
            plot_df["date"].min() - pd.Timedelta(days=0.2),
            forecast_info["forecast_df"]["date"].max() + pd.Timedelta(days=0.2),
        )
    else:
        ax.set_xlim(
            plot_df["date"].min() - pd.Timedelta(days=0.2),
            plot_df["date"].max() + pd.Timedelta(days=0.2),
        )

    ax.set_yticks([0, 1, 2, 3, 4])
    ax.set_yticklabels(["Normal", "Stage1", "Stage2", "Stage3", "Stage4"])

    ax.set_ylim(-0.2, 4.2)

    ax.set_title("Predicted Degradation Stage Trend")
    ax.set_xlabel("Date")
    ax.set_ylabel("Predicted Stage")
    ax.grid(True, alpha=0.3)
    ax.legend(loc="best")

    plt.tight_layout()
    plt.show()


# =============================
# ▼最新状態表示
# =============================

def print_current_status(df: pd.DataFrame, forecast_info: dict):
    latest = df.sort_values(["date", "file"]).iloc[-1]

    print("\n=== Current status ===")
    print(f"File                 : {latest['file']}")
    print(f"Date                 : {latest['date'].strftime('%Y-%m-%d') if pd.notna(latest['date']) else latest['date']}")
    print(f"Degradation score    : {latest['score_degradation_monotonic']:.4f}")

    if pd.notna(latest["delta_score_from_previous"]):
        print(f"Delta from previous  : {latest['delta_score_from_previous']:.4f}")
    else:
        print("Delta from previous  : N/A")

    print(f"Predicted stage      : {latest['pred_stage_continuous_clipped']:.2f}")
    print(f"Stage label          : {latest['pred_stage_label']}")
    print(f"Score-based level    : {latest['score_based_label']}")
    print(f"Stage-based level    : {latest['stage_based_label']}")
    print(f"Rapid-change level   : {latest['rapid_change_label']}")
    print(f"Final judgment       : {latest['final_label']}")
    print(f"Comment              : {latest['judgment_comment']}")

    print("\n=== Remaining useful life estimate ===")

    if not forecast_info.get("forecast_available", False):
        print(f"Forecast unavailable : {forecast_info.get('reason', 'Unknown reason')}")

    elif forecast_info.get("already_in_danger", False):
        print("Danger status        : Already in danger zone now.")
        print(f"Danger date          : {forecast_info['danger_date'].strftime('%Y-%m-%d')}")

    else:
        danger_date = forecast_info.get("danger_date", pd.NaT)
        days_to_danger = forecast_info.get("days_to_danger", np.inf)

        if pd.isna(danger_date) or np.isinf(days_to_danger):
            print("Danger forecast      : Not predicted")
            print(f"Reason               : {forecast_info.get('reason', '')}")

        else:
            print(f"Days to danger       : {days_to_danger:.1f} days")
            print(f"Predicted danger date: {danger_date.strftime('%Y-%m-%d')}")
            print(f"Trend slope          : {forecast_info.get('slope_per_day', np.nan):.4f} score/day")


# =============================
# ▼メイン処理
# =============================

def main():
    script_dir = os.path.dirname(os.path.abspath(__file__))

    daily_dir = os.path.join(script_dir, DAILY_CHECK_DIR)

    normal_stats_csv = os.path.join(script_dir, NORMAL_STATS_CSV)
    score_rule_csv = os.path.join(script_dir, SCORE_RULE_CSV)
    pred_model_csv = os.path.join(script_dir, PRED_MODEL_CSV)

    out_dir = os.path.join(script_dir, OUTPUT_DIR)

    os.makedirs(out_dir, exist_ok=True)

    if not os.path.isdir(daily_dir):
        raise RuntimeError(f"daily_check フォルダが見つかりません: {daily_dir}")

    normal_stats_df = load_normal_stats(normal_stats_csv)
    score_rule_df = load_score_rule(score_rule_csv)
    model_coef, model_intercept = load_prediction_model(pred_model_csv)

    daily_df = build_daily_feature_table(daily_dir, FILE_PATTERN)
    daily_z_df = add_normal_based_zscores(daily_df, normal_stats_df)
    daily_score_df = add_monotonic_score(daily_z_df, score_rule_df)
    daily_pred_df = add_predicted_stage(daily_score_df, model_coef, model_intercept)
    daily_result_df = add_final_judgment(daily_pred_df)

    forecast_info = fit_score_forecast(daily_result_df)

    print("\n=== Daily check results ===")

    show_cols = [
        "date",
        "file",
        "score_degradation_monotonic",
        "delta_score_from_previous",
        "pred_stage_continuous_clipped",
        "pred_stage_label",
        "score_based_label",
        "stage_based_label",
        "rapid_change_label",
        "final_label",
    ]

    print(daily_result_df[show_cols])

    print_current_status(daily_result_df, forecast_info)

    plot_daily_trend(daily_result_df, forecast_info)

    plot_stage_trend(
        daily_result_df,
        forecast_info,
        model_coef,
        model_intercept,
    )

    daily_result_df.to_csv(
        os.path.join(out_dir, "daily_check_scored_results.csv"),
        index=False,
    )

    latest = daily_result_df.sort_values(["date", "file"]).iloc[-1:]

    latest.to_csv(
        os.path.join(out_dir, "latest_status.csv"),
        index=False,
    )

    forecast_summary = pd.DataFrame(
        [
            {
                "forecast_available": forecast_info.get("forecast_available", False),
                "already_in_danger": forecast_info.get("already_in_danger", False),
                "danger_date": forecast_info.get("danger_date", pd.NaT),
                "days_to_danger": forecast_info.get("days_to_danger", np.nan),
                "slope_per_day": forecast_info.get("slope_per_day", np.nan),
                "reason": forecast_info.get("reason", ""),
            }
        ]
    )

    forecast_summary.to_csv(
        os.path.join(out_dir, "rul_forecast_summary.csv"),
        index=False,
    )

    if forecast_info.get("forecast_df") is not None:
        forecast_info["forecast_df"].to_csv(
            os.path.join(out_dir, "rul_forecast_curve.csv"),
            index=False,
        )

    print(f"\nSaved to: {out_dir}")


if __name__ == "__main__":
    main()

スポンサーリンク
加速度センサー
Follow
この記事を書いた人

【経歴】
関東在住、40代、製造業(品質部門)。
これまで、研究開発、設計、生産技術、仕入先の品質管理を手掛ける。

【保有知識・技術分野】
統計学、信頼性工学、品質工学。
半導体、基板、有機材料、金属、セラミックスの材料、製造、加工技術。
部品加工(機械加工、化学処理)、組立・実装技術、分析・物理解析技術。
QC検定1級保有。

【当サイトについて】
品質・生産の基礎知識をテーマに、用語の解説、使い方(作り方)、メリット、考え方のポイントを分かりやすく解説しています。
某メーカ様の品質教育用の資料としてもご活用いただいております。
QC検定(品質管理検定)の試験対策、おすすめ勉強法も紹介しています。

Follow
QCとらのまき

コメント