【第9回】異常の原因は分かるのか?故障モード分類に挑戦

加速度センサー

※当サイトは、アフィリエイト広告を利用しています

概要

加速度センサーシリーズの続編です。

前回までは、正常か異常かを判定する仕組みを作りました。

しかし実際の設備保全では、異常と分かっただけでは十分ではありません。

保全担当者が本当に知りたいのは、「何が起きているのか」です。

そこで今回は、PCAとSVMを使って、振動データの特徴から異常の自動判定に挑戦してみました。

詳しくは、以下のYouTube動画をご覧ください。

【第9回】異常の原因は分かるのか?故障モード分類に挑戦

配線

プログラムコード

マイコン(ESP32)

・MPU6050で、X・Y・Z方向の加速度を計測
・合成加速度 acc_total を計算
・1000Hz周期でデータ取得
・CSV形式でシリアル出力

#include <Wire.h>
#include <MPU6050_tockn.h>

MPU6050 mpu(Wire);

const unsigned long SAMPLE_US = 1000; // 1000us = 1000Hz
unsigned long last_us = 0;

void setup() {
  Serial.begin(115200);
  Wire.begin(21, 22);
  Wire.setClock(400000); 

  mpu.begin();
  mpu.calcGyroOffsets(true);

  Serial.println("time_us,ax,ay,az,acc_total");
}

void loop() {
  unsigned long now = micros();
  if (now - last_us < SAMPLE_US) return;
  last_us = now;

  mpu.update();

  float ax = mpu.getAccX();
  float ay = mpu.getAccY();
  float az = mpu.getAccZ();
  float acc_total = sqrt(ax*ax + ay*ay + az*az);

  Serial.print(now);
  Serial.print(",");
  Serial.print(ax, 4);
  Serial.print(",");
  Serial.print(ay, 4);
  Serial.print(",");
  Serial.print(az, 4);
  Serial.print(",");
  Serial.println(acc_total, 4);
}

Python

正常モード、故障モードA、故障モードBの3種類のデータをあらかじめ用意し、検証用のデータがどれに分類されるか確かめるためのコードです。

import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.interpolate import interp1d
from scipy.signal import welch, find_peaks, peak_widths
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC


# =========================================================
# ① ここだけ設定すればOK
# =========================================================

BASE_CLASS_DIR = r"C:\Users\**************************************************"
# ↑ normal / ModeA / ModeB / validation フォルダを置いている場所に変更してください

TRAIN_DATA_DIRS = {
    "normal": os.path.join(BASE_CLASS_DIR, "normal"),
    "ModeA":  os.path.join(BASE_CLASS_DIR, "ModeA"),
    "ModeB":  os.path.join(BASE_CLASS_DIR, "ModeB"),
}

VALIDATION_DIR = os.path.join(BASE_CLASS_DIR, "validation")

FILE_PATTERN = "*.csv"

FS = 1000.0
# 第2回で1000Hz測定した前提です

CHANNEL = "acc_total"
# 3軸合成加速度を使って特徴量を作ります

N_PER_SEG = 1024
N_OVERLAP = 512

F_MIN = 5.0
F_MAX = 500.0
# 解析する周波数範囲です

BANDS = [
    ("band_psd_sum_5_10", 5.0, 10.0),
    ("band_psd_sum_10_30", 10.0, 30.0),
    ("band_psd_sum_30_80", 30.0, 80.0),
    ("band_psd_sum_80_200", 80.0, 200.0),
    ("band_psd_sum_200_400", 200.0, 400.0),
    ("band_psd_sum_400_500", 400.0, 500.0),
]

CLASS_COLORS = {
    "normal": "blue",
    "ModeA": "red",
    "ModeB": "green",
    "ModeC": "orange",
}

CLASS_DISPLAY_NAMES = {
    "normal": "Normal",
    "ModeA": "Abnormal A",
    "ModeB": "Abnormal B",
    "ModeC": "Abnormal C",
}

OUTPUT_RESULT_CSV = os.path.join(BASE_CLASS_DIR, "validation_result.csv")


# =========================================================
# ② CSV読み込み・等間隔化
# =========================================================

def load_and_resample_csv(csv_path, channel=CHANNEL, fs=FS):
    """
    CSVを読み込み、time_usを使って等間隔データに補正します。
    ESP32の測定データは完全な等間隔ではないため、
    FFT前に時間軸をそろえます。
    """

    df = pd.read_csv(csv_path)

    required_cols = ["time_us", channel]
    for c in required_cols:
        if c not in df.columns:
            raise ValueError(f"{csv_path} に必要列 '{c}' がありません。")

    t = df["time_us"].to_numpy(dtype=float) * 1e-6
    x = df[channel].to_numpy(dtype=float)

    mask = np.isfinite(t) & np.isfinite(x)
    t = t[mask]
    x = x[mask]

    if len(t) < 10:
        raise ValueError(f"{csv_path} の有効データが少なすぎます。")

    idx = np.argsort(t)
    t = t[idx]
    x = x[idx]

    uniq_mask = np.diff(t, prepend=t[0] - 1.0) > 0
    t = t[uniq_mask]
    x = x[uniq_mask]

    if len(t) < 10:
        raise ValueError(f"{csv_path} は重複除去後の有効データが少なすぎます。")

    dt = 1.0 / fs
    t_uniform = np.arange(t[0], t[-1], dt)

    if len(t_uniform) < 256:
        raise ValueError(f"{csv_path} は解析に必要な長さが不足しています。")

    f_interp = interp1d(
        t,
        x,
        kind="linear",
        fill_value="extrapolate"
    )

    x_uniform = f_interp(t_uniform)

    # 平均値を引いてDC成分を除去します
    x_uniform = x_uniform - np.mean(x_uniform)

    return t_uniform, x_uniform


# =========================================================
# ③ FFT特徴量を抽出
# =========================================================

def calc_psd_features(x, fs=FS):
    """
    Welch法でPSDを計算し、故障モード分類に使う特徴量を作ります。
    """

    f, pxx = welch(
        x,
        fs=fs,
        window="hann",
        nperseg=min(N_PER_SEG, len(x)),
        noverlap=min(N_OVERLAP, max(0, len(x) // 2 - 1)),
        detrend="constant",
        scaling="density"
    )

    band_mask = (f >= F_MIN) & (f <= F_MAX)
    f_sel = f[band_mask]
    p_sel = pxx[band_mask]

    if len(f_sel) < 10:
        raise ValueError("有効な周波数点が不足しています。")

    features = {}

    # 支配周波数とその強さ
    dom_idx = np.argmax(p_sel)
    dom_freq_hz = f_sel[dom_idx]
    dom_psd = p_sel[dom_idx]

    features["dom_freq_hz"] = dom_freq_hz
    features["dom_psd"] = dom_psd

    # 支配ピークの幅
    peaks, _ = find_peaks(p_sel)

    if len(peaks) > 0:
        peak_idx = peaks[np.argmin(np.abs(peaks - dom_idx))]
        widths, _, _, _ = peak_widths(p_sel, [peak_idx], rel_height=0.5)
        dfreq = f_sel[1] - f_sel[0] if len(f_sel) > 1 else 1.0
        peak_width_hz = widths[0] * dfreq
    else:
        peak_width_hz = np.nan

    features["peak_width_hz"] = peak_width_hz

    # ピークの鋭さ
    bg = np.median(p_sel)
    features["peak_sharpness"] = dom_psd / (bg + 1e-12)

    # 5Hz以上の全体エネルギー
    features["psd_sum_ge5"] = np.sum(p_sel)

    # 周波数帯ごとのエネルギー
    for name, fl, fh in BANDS:
        m = (f >= fl) & (f < fh)
        features[name] = np.sum(pxx[m])

    # スペクトル重心
    features["spec_centroid_hz_ge5"] = np.sum(f_sel * p_sel) / (np.sum(p_sel) + 1e-12)

    # 高周波比
    low_5_80 = np.sum(pxx[(f >= 5) & (f < 80)])
    high_80_500 = np.sum(pxx[(f >= 80) & (f <= 500)])
    features["hi_lo_ratio_ge80_over_5to80"] = high_80_500 / (low_5_80 + 1e-12)

    low_5_200 = np.sum(pxx[(f >= 5) & (f < 200)])
    high_200_500 = np.sum(pxx[(f >= 200) & (f <= 500)])
    features["hi_lo_ratio_200_500_over_5_200"] = high_200_500 / (low_5_200 + 1e-12)

    return features


def extract_features_from_file(csv_path):
    _, x = load_and_resample_csv(csv_path, channel=CHANNEL, fs=FS)
    features = calc_psd_features(x, fs=FS)
    return features


# =========================================================
# ④ 学習用・検証用データを読み込む
# =========================================================

def build_feature_table_for_training(data_dirs, file_pattern="*.csv"):
    rows = []

    for class_name, folder in data_dirs.items():
        paths = sorted(glob.glob(os.path.join(folder, file_pattern)))

        if len(paths) == 0:
            print(f"[WARNING] {class_name}: ファイルが見つかりません -> {folder}")
            continue

        for path in paths:
            try:
                feat = extract_features_from_file(path)
                feat["label"] = class_name
                feat["file_name"] = os.path.basename(path)
                feat["source"] = "train"
                rows.append(feat)

            except Exception as e:
                print(f"[SKIP] {path}")
                print(f"       reason: {e}")

    if len(rows) == 0:
        raise RuntimeError("学習用CSVが1件も読み込めませんでした。")

    return pd.DataFrame(rows)


def build_feature_table_for_validation(validation_dir, file_pattern="*.csv"):
    rows = []

    paths = sorted(glob.glob(os.path.join(validation_dir, file_pattern)))

    if len(paths) == 0:
        raise RuntimeError(f"validation フォルダにCSVが見つかりませんでした。 -> {validation_dir}")

    for path in paths:
        try:
            feat = extract_features_from_file(path)
            feat["file_name"] = os.path.basename(path)
            feat["source"] = "validation"
            rows.append(feat)

        except Exception as e:
            print(f"[SKIP] {path}")
            print(f"       reason: {e}")

    if len(rows) == 0:
        raise RuntimeError("検証用CSVが1件も読み込めませんでした。")

    return pd.DataFrame(rows)


# =========================================================
# ⑤ PCA + SVM
# =========================================================

def prepare_training_matrix(df_train):
    meta_cols = ["label", "file_name", "source"]

    feature_cols = [c for c in df_train.columns if c not in meta_cols]

    X = df_train[feature_cols].copy()

    X = X.replace([np.inf, -np.inf], np.nan)
    X = X.fillna(X.median(numeric_only=True))

    return X, feature_cols


def train_pca_svm(df_train):
    X_train, feature_cols = prepare_training_matrix(df_train)

    y_train = df_train["label"].to_numpy()

    # 特徴量は単位や桁が異なるため標準化します
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)

    # PCAで2次元に圧縮します
    # 動画ではこの2次元マップを使うと説明しやすいです
    pca = PCA(n_components=2, random_state=42)
    X_train_pca = pca.fit_transform(X_train_scaled)

    # PCA空間上でSVMを学習します
    clf = SVC(kernel="rbf", C=3.0, gamma="scale")
    clf.fit(X_train_pca, y_train)

    df_train_plot = df_train[["label", "file_name", "source"]].copy()
    df_train_plot["PC1"] = X_train_pca[:, 0]
    df_train_plot["PC2"] = X_train_pca[:, 1]

    return df_train_plot, scaler, pca, clf, feature_cols


def transform_validation(df_val, scaler, pca, feature_cols):
    X_val = df_val[feature_cols].copy()

    X_val = X_val.replace([np.inf, -np.inf], np.nan)
    X_val = X_val.fillna(X_val.median(numeric_only=True))

    X_val_scaled = scaler.transform(X_val)
    X_val_pca = pca.transform(X_val_scaled)

    df_val_plot = df_val[["file_name", "source"]].copy()
    df_val_plot["PC1"] = X_val_pca[:, 0]
    df_val_plot["PC2"] = X_val_pca[:, 1]

    return df_val_plot, X_val_pca


def classify_validation(df_val_plot, X_val_pca, clf):
    pred = clf.predict(X_val_pca)

    df_result = df_val_plot.copy()
    df_result["pred_label"] = pred

    try:
        scores = clf.decision_function(X_val_pca)

        if scores.ndim == 1:
            df_result["decision_score"] = scores
        else:
            df_result["decision_score_max"] = np.max(scores, axis=1)

    except Exception:
        pass

    return df_result


# =========================================================
# ⑥ 可視化
# =========================================================

def plot_pca_map(df_train_plot):
    plt.figure(figsize=(10, 8))

    for cls in df_train_plot["label"].unique():
        sub = df_train_plot[df_train_plot["label"] == cls]

        color = CLASS_COLORS.get(cls, "gray")
        disp = CLASS_DISPLAY_NAMES.get(cls, cls)

        plt.scatter(
            sub["PC1"],
            sub["PC2"],
            c=color,
            label=f"{disp} (n={len(sub)})",
            s=70,
            alpha=0.85,
            edgecolors="k"
        )

    plt.title("PCA 2D Map")
    plt.xlabel("PC1")
    plt.ylabel("PC2")
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.show()


def plot_svm_map_with_validation(df_train_plot, df_val_result, clf):
    all_pc1 = np.concatenate([
        df_train_plot["PC1"].to_numpy(),
        df_val_result["PC1"].to_numpy()
    ])

    all_pc2 = np.concatenate([
        df_train_plot["PC2"].to_numpy(),
        df_val_result["PC2"].to_numpy()
    ])

    x_min, x_max = all_pc1.min() - 1.0, all_pc1.max() + 1.0
    y_min, y_max = all_pc2.min() - 1.0, all_pc2.max() + 1.0

    xx, yy = np.meshgrid(
        np.linspace(x_min, x_max, 400),
        np.linspace(y_min, y_max, 400)
    )

    grid = np.c_[xx.ravel(), yy.ravel()]
    Z = clf.predict(grid)

    class_list = sorted(df_train_plot["label"].unique())
    class_to_num = {c: i for i, c in enumerate(class_list)}

    Z_num = np.array([class_to_num[z] for z in Z]).reshape(xx.shape)

    plt.figure(figsize=(11, 8))

    plt.contourf(xx, yy, Z_num, alpha=0.18, levels=len(class_list))

    for cls in class_list:
        sub = df_train_plot[df_train_plot["label"] == cls]

        color = CLASS_COLORS.get(cls, "gray")
        disp = CLASS_DISPLAY_NAMES.get(cls, cls)

        plt.scatter(
            sub["PC1"],
            sub["PC2"],
            c=color,
            label=f"{disp} train (n={len(sub)})",
            s=70,
            alpha=0.9,
            edgecolors="k"
        )

    for _, row in df_val_result.iterrows():
        pred_cls = row["pred_label"]
        color = CLASS_COLORS.get(pred_cls, "black")

        plt.scatter(
            row["PC1"],
            row["PC2"],
            c=color,
            s=160,
            marker="*",
            edgecolors="k",
            linewidths=1.2
        )

        plt.text(
            row["PC1"] + 0.03,
            row["PC2"] + 0.03,
            f"{row['file_name']} -> {row['pred_label']}",
            fontsize=8
        )

    plt.title("SVM Decision Map on PCA 2D Space + Validation")
    plt.xlabel("PC1")
    plt.ylabel("PC2")
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.show()


# =========================================================
# ⑦ 実行
# =========================================================

def main():
    print("学習用CSVを読み込み、特徴量を抽出しています...")
    df_train = build_feature_table_for_training(TRAIN_DATA_DIRS, FILE_PATTERN)

    print("\n=== 学習用特徴量テーブル(先頭5行) ===")
    print(df_train.head())

    print("\n学習用クラス別件数:")
    print(df_train["label"].value_counts())

    print("\nPCA + SVM を学習しています...")
    df_train_plot, scaler, pca, clf, feature_cols = train_pca_svm(df_train)

    print("\n=== PCA寄与率 ===")
    print(f"PC1: {pca.explained_variance_ratio_[0]:.4f}")
    print(f"PC2: {pca.explained_variance_ratio_[1]:.4f}")
    print(f"合計: {pca.explained_variance_ratio_.sum():.4f}")

    print("\n使用特徴量:")
    for c in feature_cols:
        print(" -", c)

    print("\nvalidation CSVを読み込み、同じ変換で判定しています...")
    df_val = build_feature_table_for_validation(VALIDATION_DIR, FILE_PATTERN)

    df_val_plot, X_val_pca = transform_validation(
        df_val,
        scaler,
        pca,
        feature_cols
    )

    df_val_result = classify_validation(
        df_val_plot,
        X_val_pca,
        clf
    )

    print("\n=== validation 判定結果 ===")
    print(df_val_result.sort_values("file_name"))

    df_val_result.to_csv(
        OUTPUT_RESULT_CSV,
        index=False,
        encoding="utf-8-sig"
    )

    print(f"\n判定結果を保存しました: {OUTPUT_RESULT_CSV}")

    plot_pca_map(df_train_plot)
    plot_svm_map_with_validation(df_train_plot, df_val_result, clf)


if __name__ == "__main__":
    main()
スポンサーリンク
加速度センサー
Follow
この記事を書いた人

【経歴】
関東在住、40代、製造業(品質部門)。
これまで、研究開発、設計、生産技術、仕入先の品質管理を手掛ける。

【保有知識・技術分野】
統計学、信頼性工学、品質工学。
半導体、基板、有機材料、金属、セラミックスの材料、製造、加工技術。
部品加工(機械加工、化学処理)、組立・実装技術、分析・物理解析技術。
QC検定1級保有。

【当サイトについて】
品質・生産の基礎知識をテーマに、用語の解説、使い方(作り方)、メリット、考え方のポイントを分かりやすく解説しています。
某メーカ様の品質教育用の資料としてもご活用いただいております。
QC検定(品質管理検定)の試験対策、おすすめ勉強法も紹介しています。

Follow
QCとらのまき

コメント