【第6回】異常は見つけられるのか?振動データで異常検知してみた

加速度センサー

※当サイトは、アフィリエイト広告を利用しています

概要

加速度センサーシリーズの続編です。

前回は、振動データから定速運転区間を自動で抽出しました。

これにより、人がグラフを見て解析区間を選ばなくても、安定したデータを自動で取り出せるようになりました。
しかし、定速区間が抽出できても、正常か異常か判断できません。
そこで今回は、定速区間から特徴量を計算し、正常状態と比較することで異常を検知する仕組みを作ります。

詳しくは、以下のYouTube動画をご覧ください。

【第6回】異常は見つけられるのか?振動データで異常検知してみた

配線

プログラムコード

マイコン(ESP32)

・MPU6050で、X・Y・Z方向の加速度を計測
・合成加速度 acc_total を計算
・1000Hz周期でデータ取得
・CSV形式でシリアル出力

#include <Wire.h>
#include <MPU6050_tockn.h>

MPU6050 mpu(Wire);

const unsigned long SAMPLE_US = 1000; // 1000us = 1000Hz
unsigned long last_us = 0;

void setup() {
  Serial.begin(115200);
  Wire.begin(21, 22);
  Wire.setClock(400000); 

  mpu.begin();
  mpu.calcGyroOffsets(true);

  Serial.println("time_us,ax,ay,az,acc_total");
}

void loop() {
  unsigned long now = micros();
  if (now - last_us < SAMPLE_US) return;
  last_us = now;

  mpu.update();

  float ax = mpu.getAccX();
  float ay = mpu.getAccY();
  float az = mpu.getAccZ();
  float acc_total = sqrt(ax*ax + ay*ay + az*az);

  Serial.print(now);
  Serial.print(",");
  Serial.print(ax, 4);
  Serial.print(",");
  Serial.print(ay, 4);
  Serial.print(",");
  Serial.print(az, 4);
  Serial.print(",");
  Serial.println(acc_total, 4);
}

Python

次の2つのコードを使っています。

正常データから閾値を作るコード

import os
import glob
import numpy as np
import pandas as pd

from scipy.interpolate import interp1d
from scipy.signal import welch, find_peaks, peak_widths


# ==================================================
# ▼ここだけ設定すればOK
# ==================================================

FS = 1000.0
# 1000Hzで測定した前提です

TARGET_DURATION_S = 10.0
# 各CSVファイルの先頭10秒を解析します

N_SAMPLES = int(FS * TARGET_DURATION_S)

CHANNEL = "acc_total"
# 今回は3軸合成加速度を使って特徴量を作ります

F_MIN = 5.0
# 5Hz未満は低周波の揺れや傾きの影響を受けやすいため除外します

F_MIN_DOM = 5.0
F_MAX_DOM = 200.0
# 支配周波数を探す範囲です

N_PERSEG = 2048
# Welch法でPSDを計算するときの分割長です

BANDS_HZ = [
    (5.0, 10.0),
    (10.0, 30.0),
    (30.0, 80.0),
    (80.0, 200.0),
    (200.0, 400.0),
    (400.0, 500.0),
]
# 周波数帯ごとの振動エネルギーを計算します

EPS = 1e-12

SAVE_PREPROCESSED_SIGNALS = True
# Trueにすると、前処理後の波形も保存します

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = SCRIPT_DIR

OUT_DIR = os.path.join(SCRIPT_DIR, "fft_normal_model_rulebased_hf")
os.makedirs(OUT_DIR, exist_ok=True)


# ==================================================
# ▼前処理:CSVを読み込み、10秒分の等間隔データに補正
# ==================================================

def load_and_resample_10s(csv_path: str) -> pd.DataFrame:
    df = pd.read_csv(csv_path)

    required_cols = ["time_us", "ax", "ay", "az", "acc_total"]

    for c in required_cols:
        if c not in df.columns:
            raise ValueError(f"列 {c} が見つかりません。file={csv_path}")

    # マイクロ秒を秒に変換します
    t = df["time_us"].to_numpy(dtype=np.float64) / 1e6

    # 開始時刻を0秒にそろえます
    t = t - t[0]

    # 時刻が逆転・重複している行を除外します
    keep = np.ones_like(t, dtype=bool)
    keep[1:] = t[1:] > t[:-1]

    t = t[keep]
    df = df.loc[keep].reset_index(drop=True)

    if t.size < 2:
        raise ValueError(f"time列が短すぎます。file={csv_path}")

    # 1000Hz、10秒分の等間隔時間軸を作ります
    t_uniform = np.arange(N_SAMPLES, dtype=np.float64) / FS

    out = pd.DataFrame({"time_s": t_uniform})

    t_min = t[0]
    t_max = t[-1]

    # ax, ay, az, acc_totalを等間隔データに補間します
    for col in ["ax", "ay", "az", "acc_total"]:
        y = df[col].to_numpy(dtype=np.float64)

        f = interp1d(
            t,
            y,
            kind="linear",
            bounds_error=False,
            fill_value="extrapolate",
        )

        y_uniform = f(t_uniform)

        # 実測範囲外は0にします
        valid = (t_uniform >= t_min) & (t_uniform <= t_max)
        y_uniform = np.where(valid, y_uniform, 0.0)

        out[col] = y_uniform

    return out


# ==================================================
# ▼Welch法でPSDを計算
# ==================================================

def compute_psd(x: np.ndarray):
    x = np.asarray(x, dtype=np.float64)

    # DC成分を除去します
    x = x - np.mean(x)

    f, Pxx = welch(
        x,
        fs=FS,
        nperseg=N_PERSEG,
    )

    return f, Pxx


# ==================================================
# ▼支配周波数・ピーク幅・ピークの鋭さを計算
# ==================================================

def peak_features_from_psd(f: np.ndarray, Pxx: np.ndarray):
    m_dom = (f >= F_MIN_DOM) & (f <= F_MAX_DOM)

    f_dom = f[m_dom]
    P_dom = Pxx[m_dom]

    feats = {}

    if f_dom.size < 3:
        feats["dom_freq_hz"] = 0.0
        feats["dom_psd"] = 0.0
        feats["peak_width_hz"] = 0.0
        feats["peak_sharpness"] = 0.0
        return feats

    # PSDが最大となる周波数を支配周波数とします
    idx_peak_local = int(np.argmax(P_dom))

    dom_freq = float(f_dom[idx_peak_local])
    dom_psd = float(P_dom[idx_peak_local])

    # ピークを検出します
    peaks, _ = find_peaks(P_dom)

    if peaks.size == 0:
        peaks = np.array([idx_peak_local], dtype=int)

    # 支配周波数に最も近いピークを採用します
    peaks_sorted = peaks[np.argsort(np.abs(peaks - idx_peak_local))]
    pidx = int(peaks_sorted[0])

    # ピーク幅を計算します
    widths, _, _, _ = peak_widths(P_dom, [pidx], rel_height=0.5)

    width_bins = float(widths[0])
    df_hz = float(f_dom[1] - f_dom[0])

    peak_width_hz = width_bins * df_hz

    # ピーク周辺と比較して、どれだけ鋭いピークかを計算します
    span_hz = 4.0
    span_bins = max(1, int(round(span_hz / df_hz)))

    lo = max(0, pidx - span_bins)
    hi = min(P_dom.size, pidx + span_bins + 1)

    if hi - lo >= 3:
        neighborhood = np.concatenate([P_dom[lo:pidx], P_dom[pidx + 1:hi]])
    else:
        neighborhood = P_dom[lo:hi]

    if neighborhood.size > 0:
        neigh_mean = float(np.mean(neighborhood))
    else:
        neigh_mean = float(np.mean(P_dom))

    peak_sharpness = np.log10(dom_psd / (neigh_mean + EPS) + 1.0)

    feats["dom_freq_hz"] = dom_freq
    feats["dom_psd"] = dom_psd
    feats["peak_width_hz"] = float(peak_width_hz)
    feats["peak_sharpness"] = float(peak_sharpness)

    return feats


# ==================================================
# ▼周波数帯ごとの特徴量を計算
# ==================================================

def spectral_features(f: np.ndarray, Pxx: np.ndarray):
    feats = {}

    # 5Hz以上のPSD合計を計算します
    m_ge = f >= F_MIN

    f_ge = f[m_ge]
    P_ge = Pxx[m_ge]

    feats["psd_sum_ge5"] = float(np.sum(P_ge))

    # 周波数帯ごとのPSD合計を計算します
    for f1, f2 in BANDS_HZ:
        m = (f >= f1) & (f < f2)

        feats[f"band_psd_sum_{f1:g}_{f2:g}"] = float(np.sum(Pxx[m]))

    # スペクトル重心を計算します
    # どの周波数帯にエネルギーの中心があるかを表します
    denom = float(np.sum(P_ge))

    if denom > 0:
        feats["spec_centroid_hz_ge5"] = float(np.sum(f_ge * P_ge) / denom)
    else:
        feats["spec_centroid_hz_ge5"] = 0.0

    # 高周波成分の割合を計算します
    p_hi_200_500 = float(np.sum(Pxx[(f >= 200.0) & (f <= 500.0)]))
    p_lo_5_200 = float(np.sum(Pxx[(f >= 5.0) & (f < 200.0)]))

    feats["hi_lo_ratio_200_500_over_5_200"] = p_hi_200_500 / (p_lo_5_200 + EPS)

    return feats


# ==================================================
# ▼1つのCSVファイルから特徴量を抽出
# ==================================================

def extract_all_features(csv_path: str):
    prep = load_and_resample_10s(csv_path)

    x = prep[CHANNEL].to_numpy(dtype=np.float64)

    f, Pxx = compute_psd(x)

    feats = {}
    feats.update(peak_features_from_psd(f, Pxx))
    feats.update(spectral_features(f, Pxx))

    return feats, f, Pxx, prep


# ==================================================
# ▼正常データの統計情報を作成
# ==================================================

def build_summary(features_df: pd.DataFrame) -> pd.DataFrame:
    numeric_cols = [c for c in features_df.columns if c != "file"]

    mu = features_df[numeric_cols].mean(axis=0)
    sd = features_df[numeric_cols].std(axis=0, ddof=1)

    summary = pd.DataFrame(
        {
            "feature": numeric_cols,
            "mean": mu.values,
            "std": sd.values,
            "upper_3sigma": (mu + 3.0 * sd).values,
            "p95": [np.percentile(features_df[c], 95) for c in numeric_cols],
            "p99": [np.percentile(features_df[c], 99) for c in numeric_cols],
            "min": [features_df[c].min() for c in numeric_cols],
            "max": [features_df[c].max() for c in numeric_cols],
        }
    )

    summary["std"] = summary["std"].fillna(0.0)
    summary["upper_3sigma"] = summary["upper_3sigma"].fillna(summary["mean"])

    return summary


# ==================================================
# ▼メイン処理
# ==================================================

def main():
    # acc_usb_highrate_001.csv のような正常データをまとめて読み込みます
    pattern = os.path.join(DATA_DIR, "acc_usb_highrate_*.csv")

    csv_files = sorted(glob.glob(pattern))

    if len(csv_files) == 0:
        raise RuntimeError(f"対象CSVが見つかりません。pattern={pattern}")

    print(f"Found {len(csv_files)} files.")
    print(f"FS = {FS} Hz")
    print(f"window = {TARGET_DURATION_S} s")
    print(f"target samples = {N_SAMPLES}")
    print(f"channel = {CHANNEL}")

    all_feats = []

    for path in csv_files:
        base = os.path.basename(path)

        print(f"Processing: {base}")

        feats, _, _, prep = extract_all_features(path)

        feats["file"] = base
        all_feats.append(feats)

        # 前処理後の波形を保存します
        if SAVE_PREPROCESSED_SIGNALS:
            out_prep = os.path.join(OUT_DIR, f"prep_{base}")
            prep.to_csv(out_prep, index=False)

    # ① 各CSVファイルごとのFFT特徴量
    features_df = pd.DataFrame(all_feats).sort_values("file").reset_index(drop=True)

    # ② 正常モデルの統計情報
    summary_df = build_summary(features_df)

    features_path = os.path.join(OUT_DIR, "normal_features_rulebased_hf.csv")
    summary_path = os.path.join(OUT_DIR, "normal_summary_rulebased_hf.csv")

    features_df.to_csv(features_path, index=False)
    summary_df.to_csv(summary_path, index=False)

    print("=== 完了 ===")
    print(f"features: {features_path}")
    print(f"summary : {summary_path}")


if __name__ == "__main__":
    main()

【出力されるファイル】
①CSVファイルごとのFFT特徴量
②正常モデルの平均・標準偏差

異常を判定するコード

import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.interpolate import interp1d
from scipy.signal import welch, find_peaks, peak_widths


# ==================================================
# ▼基本設定
# ==================================================

FS = 1000.0
# 第2回で1000Hz測定した前提で解析します

TARGET_DURATION_S = 10.0
# 各CSVファイルの先頭10秒を解析対象にします

N_SAMPLES = int(FS * TARGET_DURATION_S)

CHANNEL = "acc_total"
# 今回は3軸合成加速度を使って異常判定します

F_MIN = 5.0
# 5Hz未満は低周波の揺れや姿勢変化の影響を受けやすいため除外します

F_MAX_PLOT = 500.0
# PSDグラフの表示上限です

F_MIN_DOM = 5.0
F_MAX_DOM = 200.0
# 支配周波数を探す範囲です

N_PERSEG = 2048
# Welch法でPSDを計算するときの分割長です

BANDS_HZ = [
    (5.0, 10.0),
    (10.0, 30.0),
    (30.0, 80.0),
    (80.0, 200.0),
    (200.0, 400.0),
    (400.0, 500.0),
]
# 周波数帯ごとの振動エネルギーを計算します

EPS = 1e-12


# ==================================================
# ▼フォルダ設定
# ==================================================

NORMAL_DIR_NAME = "normal"
# 正常データを入れるフォルダ名です

ABNORMAL_DIR_NAME = "abnormal"
# 判定対象データを入れるフォルダ名です

ABNORMAL_FILE_NAME = "acc_usb_highrate_judge.csv"
# 判定したいCSVファイル名です


# ==================================================
# ▼しきい値設定
# ==================================================

DOM_K_SIGMA = 3.0
PW_K_SIGMA = 3.0
BP3080_K_SIGMA = 3.0
BP200400_K_SIGMA = 3.0
HI200500_K_SIGMA = 3.0
# 平均±3σ、または平均+3σをしきい値にします

MIN_VIOLATIONS_TO_ABNORMAL = 2
# 2項目以上しきい値を超えたら異常と判定します

MIN_VIOLATIONS_TO_WARNING = 1
# 1項目だけ超えた場合は要注意と判定します


# ==================================================
# ▼前処理:10秒分の等間隔データに補正
# ==================================================

def load_and_resample_10s(csv_path: str) -> pd.DataFrame:
    df = pd.read_csv(csv_path)

    required_cols = ["time_us", "ax", "ay", "az", "acc_total"]

    for c in required_cols:
        if c not in df.columns:
            raise ValueError(f"列 {c} が見つかりません。file={csv_path}")

    # time_usを秒に変換し、開始時刻を0秒にそろえます
    t = df["time_us"].to_numpy(dtype=np.float64) / 1e6
    t = t - t[0]

    # 時刻の重複や逆転を除外します
    keep = np.ones_like(t, dtype=bool)
    keep[1:] = t[1:] > t[:-1]

    t = t[keep]
    df = df.loc[keep].reset_index(drop=True)

    if t.size < 2:
        raise ValueError(f"time列が短すぎます。file={csv_path}")

    # 1000Hz、10秒分の等間隔時間軸を作ります
    t_uniform = np.arange(N_SAMPLES, dtype=np.float64) / FS

    out = pd.DataFrame({"time_s": t_uniform})

    t_min = t[0]
    t_max = t[-1]

    for col in ["ax", "ay", "az", "acc_total"]:
        y = df[col].to_numpy(dtype=np.float64)

        f = interp1d(
            t,
            y,
            kind="linear",
            bounds_error=False,
            fill_value="extrapolate",
        )

        y_uniform = f(t_uniform)

        # 実測範囲外は0にします
        valid = (t_uniform >= t_min) & (t_uniform <= t_max)
        y_uniform = np.where(valid, y_uniform, 0.0)

        out[col] = y_uniform

    return out


# ==================================================
# ▼Welch法でPSDを計算
# ==================================================

def compute_psd(x: np.ndarray):
    x = np.asarray(x, dtype=np.float64)

    # 平均値を引いてDC成分を除去します
    x = x - np.mean(x)

    f, Pxx = welch(
        x,
        fs=FS,
        nperseg=N_PERSEG,
    )

    return f, Pxx


# ==================================================
# ▼支配周波数、ピーク幅、ピークの鋭さを計算
# ==================================================

def peak_features_from_psd(f: np.ndarray, Pxx: np.ndarray):
    m_dom = (f >= F_MIN_DOM) & (f <= F_MAX_DOM)

    f_dom = f[m_dom]
    P_dom = Pxx[m_dom]

    feats = {}

    if f_dom.size < 3:
        feats["dom_freq_hz"] = 0.0
        feats["dom_psd"] = 0.0
        feats["peak_width_hz"] = 0.0
        feats["peak_sharpness"] = 0.0
        return feats

    # PSDが最大となる周波数を支配周波数とします
    idx_peak_local = int(np.argmax(P_dom))

    dom_freq = float(f_dom[idx_peak_local])
    dom_psd = float(P_dom[idx_peak_local])

    # ピークを検出します
    peaks, _ = find_peaks(P_dom)

    if peaks.size == 0:
        peaks = np.array([idx_peak_local], dtype=int)

    # 支配周波数に最も近いピークを採用します
    peaks_sorted = peaks[np.argsort(np.abs(peaks - idx_peak_local))]
    pidx = int(peaks_sorted[0])

    # ピーク幅を計算します
    widths, _, _, _ = peak_widths(P_dom, [pidx], rel_height=0.5)

    width_bins = float(widths[0])
    df_hz = float(f_dom[1] - f_dom[0])

    peak_width_hz = width_bins * df_hz

    # ピーク周辺と比較して、どれだけ鋭いピークかを計算します
    span_hz = 4.0
    span_bins = max(1, int(round(span_hz / df_hz)))

    lo = max(0, pidx - span_bins)
    hi = min(P_dom.size, pidx + span_bins + 1)

    if hi - lo >= 3:
        neighborhood = np.concatenate([P_dom[lo:pidx], P_dom[pidx + 1:hi]])
    else:
        neighborhood = P_dom[lo:hi]

    if neighborhood.size > 0:
        neigh_mean = float(np.mean(neighborhood))
    else:
        neigh_mean = float(np.mean(P_dom))

    peak_sharpness = np.log10(dom_psd / (neigh_mean + EPS) + 1.0)

    feats["dom_freq_hz"] = dom_freq
    feats["dom_psd"] = dom_psd
    feats["peak_width_hz"] = float(peak_width_hz)
    feats["peak_sharpness"] = float(peak_sharpness)

    return feats


# ==================================================
# ▼周波数帯ごとの特徴量を計算
# ==================================================

def spectral_features(f: np.ndarray, Pxx: np.ndarray):
    feats = {}

    m_ge = f >= F_MIN

    f_ge = f[m_ge]
    P_ge = Pxx[m_ge]

    # 5Hz以上の全体エネルギーです
    feats["psd_sum_ge5"] = float(np.sum(P_ge))

    # 周波数帯ごとのエネルギーです
    for f1, f2 in BANDS_HZ:
        m = (f >= f1) & (f < f2)
        feats[f"band_psd_sum_{f1:g}_{f2:g}"] = float(np.sum(Pxx[m]))

    # スペクトル重心です
    denom = float(np.sum(P_ge))

    if denom > 0:
        feats["spec_centroid_hz_ge5"] = float(np.sum(f_ge * P_ge) / denom)
    else:
        feats["spec_centroid_hz_ge5"] = 0.0

    # 80Hz以上の高周波比です。参考値として保存します
    p_hi_80_500 = float(np.sum(Pxx[f >= 80.0]))
    p_lo_5_80 = float(np.sum(Pxx[(f >= 5.0) & (f < 80.0)]))

    feats["hi_lo_ratio_ge80_over_5to80"] = p_hi_80_500 / (p_lo_5_80 + EPS)

    # 今回の判定に使う高周波比です
    p_hi_200_500 = float(np.sum(Pxx[(f >= 200.0) & (f <= 500.0)]))
    p_lo_5_200 = float(np.sum(Pxx[(f >= 5.0) & (f < 200.0)]))

    feats["hi_lo_ratio_200_500_over_5_200"] = p_hi_200_500 / (p_lo_5_200 + EPS)

    return feats


# ==================================================
# ▼1つのCSVファイルから特徴量を抽出
# ==================================================

def extract_all_features(csv_path: str):
    prep = load_and_resample_10s(csv_path)

    x = prep[CHANNEL].to_numpy(dtype=np.float64)

    f, Pxx = compute_psd(x)

    feats = {}
    feats.update(peak_features_from_psd(f, Pxx))
    feats.update(spectral_features(f, Pxx))

    return feats, f, Pxx


# ==================================================
# ▼正常データから平均・標準偏差・しきい値を作る
# ==================================================

def calc_rule_stats(values: np.ndarray, k_sigma: float, mode: str):
    mu = float(np.mean(values))
    sd = float(np.std(values, ddof=1))

    # 標準偏差が0に近い場合のゼロ割対策です
    sd = sd if sd > EPS else EPS

    if mode == "two-sided":
        # 支配周波数のように、上にも下にも外れる可能性がある指標です
        lower = mu - k_sigma * sd
        upper = mu + k_sigma * sd

    elif mode == "upper":
        # 振動エネルギーや高周波比のように、増加側を異常と見る指標です
        lower = np.nan
        upper = mu + k_sigma * sd

    else:
        raise ValueError(f"unknown mode: {mode}")

    return mu, sd, lower, upper


# ==================================================
# ▼メイン処理
# ==================================================

def main():
    script_dir = os.path.dirname(os.path.abspath(__file__))

    normal_dir = os.path.join(script_dir, NORMAL_DIR_NAME)
    abnormal_dir = os.path.join(script_dir, ABNORMAL_DIR_NAME)

    if not os.path.isdir(normal_dir):
        raise RuntimeError(f"正常フォルダが見つかりません: {normal_dir}")

    if not os.path.isdir(abnormal_dir):
        raise RuntimeError(f"異常フォルダが見つかりません: {abnormal_dir}")

    normal_files = sorted(glob.glob(os.path.join(normal_dir, "acc_usb_highrate_*.csv")))

    if len(normal_files) < 5:
        raise RuntimeError("正常CSVが少なすぎます。少なくとも5本以上を推奨します。")

    abnormal_file = os.path.join(abnormal_dir, ABNORMAL_FILE_NAME)

    if not os.path.exists(abnormal_file):
        raise RuntimeError(f"判定対象CSVが見つかりません: {abnormal_file}")

    print(f"Normal files: {len(normal_files)}")
    print(f"Target      : {os.path.basename(abnormal_file)}")

    # ==================================================
    # 正常データの平均PSDを作成
    # ==================================================

    normal_psd_list = []
    f_ref = None

    for p in normal_files:
        prep = load_and_resample_10s(p)

        x = prep[CHANNEL].to_numpy(dtype=np.float64)

        f, Pxx = compute_psd(x)

        if f_ref is None:
            f_ref = f
        else:
            if f.size != f_ref.size or np.max(np.abs(f - f_ref)) > 1e-9:
                raise RuntimeError("Welchの周波数軸が一致しません。設定を確認してください。")

        normal_psd_list.append(Pxx)

    normal_psd = np.vstack(normal_psd_list)
    mean_psd = np.mean(normal_psd, axis=0)

    # ==================================================
    # 正常データの特徴量を作成
    # ==================================================

    normal_feats = []

    for p in normal_files:
        feats, _, _ = extract_all_features(p)

        feats["file"] = os.path.basename(p)
        normal_feats.append(feats)

    normal_df = pd.DataFrame(normal_feats).sort_values("file").reset_index(drop=True)

    # ==================================================
    # 判定対象データの特徴量を作成
    # ==================================================

    target_feats, f_target, Pxx_target = extract_all_features(abnormal_file)

    # ==================================================
    # 異常判定に使う5つの実用指標
    # ==================================================

    rule_defs = [
        {"name": "dom_freq_hz", "mode": "two-sided", "k": DOM_K_SIGMA},
        {"name": "peak_width_hz", "mode": "upper", "k": PW_K_SIGMA},
        {"name": "band_psd_sum_30_80", "mode": "upper", "k": BP3080_K_SIGMA},
        {"name": "band_psd_sum_200_400", "mode": "upper", "k": BP200400_K_SIGMA},
        {"name": "hi_lo_ratio_200_500_over_5_200", "mode": "upper", "k": HI200500_K_SIGMA},
    ]

    rule_results = []

    print("\n=== Target features ===")

    for k, v in target_feats.items():
        print(f"{k:<32} {v:.6g}")

    print("\n=== Rule-based judgment ===")

    violation_count = 0

    for rule in rule_defs:
        name = rule["name"]
        mode = rule["mode"]
        k = rule["k"]

        vals = normal_df[name].to_numpy(dtype=np.float64)

        mu, sd, lower, upper = calc_rule_stats(vals, k_sigma=k, mode=mode)

        x = float(target_feats[name])

        if mode == "two-sided":
            violation = (x < lower) or (x > upper)

            print(
                f"{name:<32} "
                f"target={x:.6g}  range=[{lower:.6g}, {upper:.6g}]  "
                f"violation={violation}"
            )

        else:
            violation = x > upper

            print(
                f"{name:<32} "
                f"target={x:.6g}  upper={upper:.6g}  "
                f"violation={violation}"
            )

        if violation:
            violation_count += 1

        z = (x - mu) / sd if sd > EPS else 0.0

        rule_results.append(
            {
                "feature": name,
                "mode": mode,
                "mean": mu,
                "std": sd,
                "lower": lower,
                "upper": upper,
                "target_value": x,
                "z_score": z,
                "violation": bool(violation),
            }
        )

    # ==================================================
    # 最終判定
    # ==================================================

    if violation_count >= MIN_VIOLATIONS_TO_ABNORMAL:
        judgment = "ABNORMAL"
    elif violation_count >= MIN_VIOLATIONS_TO_WARNING:
        judgment = "WARNING"
    else:
        judgment = "NORMAL"

    print("\n=== Final judgment ===")
    print(f"violation_count = {violation_count}")

    if judgment == "ABNORMAL":
        print("→ 異常です。")
    elif judgment == "WARNING":
        print("→ 要注意です。")
    else:
        print("→ 正常です。")

    # ==================================================
    # グラフ1:Zスコア表示
    # ==================================================

    plot_names = [r["feature"] for r in rule_results]
    plot_z = [r["z_score"] for r in rule_results]

    plt.figure(figsize=(11, 4))
    plt.bar(plot_names, plot_z)
    plt.axhline(3.0, linestyle="--")
    plt.axhline(-3.0, linestyle="--")
    plt.title("Feature Z-score")
    plt.xticks(rotation=30, ha="right")
    plt.tight_layout()
    plt.show()

    # ==================================================
    # グラフ2:正常平均PSDと判定対象PSDの比較
    # ==================================================

    plt.figure(figsize=(8, 5))
    plt.semilogy(f_target, Pxx_target, label="Target")
    plt.semilogy(f_ref, mean_psd, label="Normal Mean")
    plt.xlim(0, F_MAX_PLOT)
    plt.title("PSD Comparison")
    plt.xlabel("Frequency [Hz]")
    plt.ylabel("PSD")
    plt.legend()
    plt.tight_layout()
    plt.show()

    # ==================================================
    # 判定結果を表示
    # ==================================================

    rule_df = pd.DataFrame(rule_results)

    print("\n=== Rule summary ===")
    print(
        rule_df[
            [
                "feature",
                "target_value",
                "mean",
                "std",
                "lower",
                "upper",
                "z_score",
                "violation",
            ]
        ]
    )

    # ==================================================
    # CSV保存
    # ==================================================

    out_dir = os.path.join(script_dir, "fft_abnormal_analysis_rulebased_hf")
    os.makedirs(out_dir, exist_ok=True)

    normal_df.to_csv(
        os.path.join(out_dir, "normal_features_rulebased_hf.csv"),
        index=False,
    )

    target_row = pd.DataFrame(
        [
            {
                **{"file": os.path.basename(abnormal_file)},
                **target_feats,
            }
        ]
    )

    target_row.to_csv(
        os.path.join(out_dir, "target_features_rulebased_hf.csv"),
        index=False,
    )

    rule_df.to_csv(
        os.path.join(out_dir, "rule_thresholds_and_results_hf.csv"),
        index=False,
    )

    summary = pd.DataFrame(
        [
            {
                "target_file": os.path.basename(abnormal_file),
                "judgment": judgment,
                "violation_count": int(violation_count),
                "min_violations_to_abnormal": int(MIN_VIOLATIONS_TO_ABNORMAL),
                "min_violations_to_warning": int(MIN_VIOLATIONS_TO_WARNING),
            }
        ]
    )

    summary.to_csv(
        os.path.join(out_dir, "summary_rulebased_hf.csv"),
        index=False,
    )

    print(f"\nSaved to: {out_dir}")


if __name__ == "__main__":
    main()

【注意点】

このコード自体にも正常データから閾値を計算するコードを埋め込んでいます。

一つ目のコードと内容が重複していますが、これ一本で閾値の設定から異常判定まで一括で回せるように動作確認用として作りました。

ただし、実際の現場で運用する際には、あらかじめ正常データを読み込ませてから、日々の稼働データを判定する流れと思いますので、実際には分割して運用するほうが適切と思います。

スポンサーリンク
加速度センサー
Follow
この記事を書いた人

【経歴】
関東在住、40代、製造業(品質部門)。
これまで、研究開発、設計、生産技術、仕入先の品質管理を手掛ける。

【保有知識・技術分野】
統計学、信頼性工学、品質工学。
半導体、基板、有機材料、金属、セラミックスの材料、製造、加工技術。
部品加工(機械加工、化学処理)、組立・実装技術、分析・物理解析技術。
QC検定1級保有。

【当サイトについて】
品質・生産の基礎知識をテーマに、用語の解説、使い方(作り方)、メリット、考え方のポイントを分かりやすく解説しています。
某メーカ様の品質教育用の資料としてもご活用いただいております。
QC検定(品質管理検定)の試験対策、おすすめ勉強法も紹介しています。

Follow
QCとらのまき

コメント