概要
加速度センサーシリーズの続編です。
前回は、振動データの特徴量を利用し、正常データとの比較による異常判定を行いました。
しかし、実際の設備では、異常パターンが事前に分かっているとは限りません。
また、複数の特徴量が少しずつ変化する場合、単純な閾値判定では検出が難しいことがあります。
そこで今回は、機械学習を利用して正常データから「正常な状態の範囲」を学習し、そこから外れたデータを異常として検出します。
詳しくは、以下のYouTube動画をご覧ください。
配線

プログラムコード
マイコン(ESP32)
・MPU6050で、X・Y・Z方向の加速度を計測
・合成加速度 acc_total を計算
・1000Hz周期でデータ取得
・CSV形式でシリアル出力
#include <Wire.h>
#include <MPU6050_tockn.h>
MPU6050 mpu(Wire);
const unsigned long SAMPLE_US = 1000; // 1000us = 1000Hz
unsigned long last_us = 0;
void setup() {
Serial.begin(115200);
Wire.begin(21, 22);
Wire.setClock(400000);
mpu.begin();
mpu.calcGyroOffsets(true);
Serial.println("time_us,ax,ay,az,acc_total");
}
void loop() {
unsigned long now = micros();
if (now - last_us < SAMPLE_US) return;
last_us = now;
mpu.update();
float ax = mpu.getAccX();
float ay = mpu.getAccY();
float az = mpu.getAccZ();
float acc_total = sqrt(ax*ax + ay*ay + az*az);
Serial.print(now);
Serial.print(",");
Serial.print(ax, 4);
Serial.print(",");
Serial.print(ay, 4);
Serial.print(",");
Serial.print(az, 4);
Serial.print(",");
Serial.println(acc_total, 4);
}Python
Isolation Forest,SVM,PCAの3つの処理を一つのコードにまとめています。
正常データの分布と判定対象のデータの位置関係、PCAの二次元マップをグラフ描画しています。
import os
import glob
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy.signal import welch, find_peaks, peak_widths
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest
from sklearn.decomposition import PCA
# ==================================================
# ▼ここだけ設定すればOK
# ==================================================
FS = 1000.0
TARGET_DURATION_S = 10.0
N_SAMPLES = int(FS * TARGET_DURATION_S)
CHANNEL = "acc_total"
F_MIN = 5.0
F_MIN_DOM = 5.0
F_MAX_DOM = 200.0
N_PERSEG = 2048
BANDS_HZ = [
(5.0, 10.0),
(10.0, 30.0),
(30.0, 80.0),
(80.0, 200.0),
(200.0, 400.0),
(400.0, 500.0),
]
EPS = 1e-12
NORMAL_DIR_NAME = "normal"
ABNORMAL_DIR_NAME = "abnormal"
ABNORMAL_GLOB = "acc_usb_highrate_*.csv"
THRESH_PERCENTILE = 99.0
USE_ROBUST_SCALER = False
HIST_BINS = 18
# ==================================================
# ▼前処理:1000Hz、10秒にそろえる
# ==================================================
def load_and_resample_10s(csv_path: str) -> pd.DataFrame:
df = pd.read_csv(csv_path)
required_cols = ["time_us", "ax", "ay", "az", "acc_total"]
for c in required_cols:
if c not in df.columns:
raise ValueError(f"列 {c} が見つかりません。file={csv_path}")
t = df["time_us"].to_numpy(dtype=np.float64) / 1e6
t = t - t[0]
# 時刻の重複・逆行を除外します
keep = np.ones_like(t, dtype=bool)
keep[1:] = t[1:] > t[:-1]
t = t[keep]
df = df.loc[keep].reset_index(drop=True)
if t.size < 2:
raise ValueError(f"time列が短すぎます。file={csv_path}")
# 1000Hz、10秒分の等間隔時間軸を作ります
t_uniform = np.arange(N_SAMPLES, dtype=np.float64) / FS
out = pd.DataFrame({"time_s": t_uniform})
t_min = t[0]
t_max = t[-1]
for col in ["ax", "ay", "az", "acc_total"]:
y = df[col].to_numpy(dtype=np.float64)
f = interp1d(
t,
y,
kind="linear",
bounds_error=False,
fill_value="extrapolate",
)
y_uniform = f(t_uniform)
# 実測範囲外は0にします
valid = (t_uniform >= t_min) & (t_uniform <= t_max)
y_uniform = np.where(valid, y_uniform, 0.0)
out[col] = y_uniform
return out
# ==================================================
# ▼Welch法でPSDを計算
# ==================================================
def compute_psd(x: np.ndarray):
x = np.asarray(x, dtype=np.float64)
# DC成分を除去します
x = x - np.mean(x)
f, Pxx = welch(
x,
fs=FS,
nperseg=N_PERSEG,
)
return f, Pxx
# ==================================================
# ▼支配周波数、ピーク幅、ピークの鋭さ
# ==================================================
def peak_features_from_psd(f: np.ndarray, Pxx: np.ndarray):
m_dom = (f >= F_MIN_DOM) & (f <= F_MAX_DOM)
f_dom = f[m_dom]
P_dom = Pxx[m_dom]
feats = {}
if f_dom.size < 3:
feats["dom_freq_hz"] = 0.0
feats["dom_psd"] = 0.0
feats["peak_width_hz"] = 0.0
feats["peak_sharpness"] = 0.0
return feats
# PSDが最大となる周波数を支配周波数とします
idx_peak_local = int(np.argmax(P_dom))
dom_freq = float(f_dom[idx_peak_local])
dom_psd = float(P_dom[idx_peak_local])
peaks, _ = find_peaks(P_dom)
if peaks.size == 0:
peaks = np.array([idx_peak_local], dtype=int)
peaks_sorted = peaks[np.argsort(np.abs(peaks - idx_peak_local))]
pidx = int(peaks_sorted[0])
widths, _, _, _ = peak_widths(P_dom, [pidx], rel_height=0.5)
df_hz = float(f_dom[1] - f_dom[0])
peak_width_hz = float(widths[0]) * df_hz
# ピーク周辺と比較して、ピークの鋭さを計算します
span_hz = 2.0
span_bins = max(1, int(round(span_hz / df_hz)))
lo = max(0, pidx - span_bins)
hi = min(P_dom.size, pidx + span_bins + 1)
if hi - lo >= 3:
neighborhood = np.concatenate([P_dom[lo:pidx], P_dom[pidx + 1:hi]])
else:
neighborhood = P_dom[lo:hi]
if neighborhood.size > 0:
neigh_mean = float(np.mean(neighborhood))
else:
neigh_mean = float(np.mean(P_dom))
peak_sharpness = dom_psd / (neigh_mean + EPS)
feats["dom_freq_hz"] = dom_freq
feats["dom_psd"] = dom_psd
feats["peak_width_hz"] = float(peak_width_hz)
feats["peak_sharpness"] = float(peak_sharpness)
return feats
# ==================================================
# ▼スペクトル特徴量
# ==================================================
def spectral_features(
f: np.ndarray,
Pxx: np.ndarray,
mean_log_psd: np.ndarray | None,
):
feats = {}
m_ge = f >= F_MIN
f_ge = f[m_ge]
P_ge = Pxx[m_ge]
# 5Hz以上の全振動エネルギーです
feats["psd_sum_ge5"] = float(np.sum(P_ge))
# 周波数帯ごとの振動エネルギーです
for f1, f2 in BANDS_HZ:
m = (f >= f1) & (f < f2)
feats[f"band_psd_sum_{f1:g}_{f2:g}"] = float(np.sum(Pxx[m]))
# スペクトル重心です
denom = float(np.sum(P_ge))
if denom > 0:
feats["spec_centroid_hz_ge5"] = float(np.sum(f_ge * P_ge) / denom)
else:
feats["spec_centroid_hz_ge5"] = 0.0
# 高周波ノイズ比です
p_hi_200_500 = float(np.sum(Pxx[(f >= 200.0) & (f <= 500.0)]))
p_lo_5_200 = float(np.sum(Pxx[(f >= 5.0) & (f < 200.0)]))
feats["hi_lo_ratio_200_500_over_5_200"] = p_hi_200_500 / (p_lo_5_200 + EPS)
# 正常平均PSDとの差です
# FFTピークだけではなく、PSD全体の形状の違いを見るための特徴量です
log_psd = np.log10(Pxx + 1e-18)
if mean_log_psd is None:
feats["psd_shape_dist_logL2"] = 0.0
else:
feats["psd_shape_dist_logL2"] = float(np.linalg.norm(log_psd - mean_log_psd))
return feats
# ==================================================
# ▼1つのCSVファイルから特徴量を抽出
# ==================================================
def extract_all_features(
csv_path: str,
mean_log_psd: np.ndarray | None,
):
prep = load_and_resample_10s(csv_path)
x = prep[CHANNEL].to_numpy(dtype=np.float64)
f, Pxx = compute_psd(x)
feats = {}
feats.update(peak_features_from_psd(f, Pxx))
feats.update(spectral_features(f, Pxx, mean_log_psd))
return feats
# ==================================================
# ▼特徴量テーブルを作る
# ==================================================
def build_feature_table(
file_list: list[str],
mean_log_psd: np.ndarray | None,
) -> pd.DataFrame:
rows = []
for p in file_list:
feats = extract_all_features(p, mean_log_psd)
feats["file"] = os.path.basename(p)
rows.append(feats)
return pd.DataFrame(rows).sort_values("file").reset_index(drop=True)
# ==================================================
# ▼異常スコア分布を表示
# ==================================================
def plot_score_panel(ax, normal_vals, abnormal_vals, thr, title, score_col):
normal_vals = np.asarray(normal_vals, dtype=np.float64)
normal_vals = normal_vals[~np.isnan(normal_vals)]
counts, bins, _ = ax.hist(
normal_vals,
bins=HIST_BINS,
alpha=0.65,
label="Normal distribution",
)
q1 = np.percentile(normal_vals, 25)
q2 = np.percentile(normal_vals, 50)
q3 = np.percentile(normal_vals, 75)
ax.axvspan(q1, q3, alpha=0.15, label="Normal IQR")
ax.axvline(q2, linestyle=":", label="Median")
ax.axvline(thr, linestyle="--", linewidth=2, label="Threshold")
ymax = float(np.max(counts)) if len(counts) > 0 else 1.0
if ymax <= 0:
ymax = 1.0
n_exceed = 0
n_ab = 0
if abnormal_vals is not None and len(abnormal_vals) > 0:
abnormal_vals = np.asarray(abnormal_vals, dtype=np.float64)
abnormal_vals = abnormal_vals[~np.isnan(abnormal_vals)]
n_ab = len(abnormal_vals)
if n_ab > 0:
y_pos = ymax * 1.05
ax.scatter(
abnormal_vals,
np.full(n_ab, y_pos),
s=80,
zorder=5,
label="Abnormal",
)
n_exceed = int(np.sum(abnormal_vals > thr))
ax.set_ylim(0, ymax * 1.25)
else:
ax.set_ylim(0, ymax * 1.15)
else:
ax.set_ylim(0, ymax * 1.15)
ax.text(
0.02,
0.93,
f"Abnormal > thr: {n_exceed}/{n_ab}",
transform=ax.transAxes,
ha="left",
va="top",
)
ax.set_title(title, fontsize=12)
ax.set_xlabel(score_col)
ax.set_ylabel("Count")
xmax = max(np.max(normal_vals), thr)
xmin = min(np.min(normal_vals), thr)
if abnormal_vals is not None and len(abnormal_vals) > 0:
xmax = max(xmax, np.max(abnormal_vals))
xmin = min(xmin, np.min(abnormal_vals))
pad = (xmax - xmin) * 0.08 if xmax > xmin else 1.0
ax.set_xlim(xmin - pad, xmax + pad)
def plot_score_summary(
normal_scores_path: str,
abnormal_scores_path: str | None,
out_dir: str,
):
normal_df = pd.read_csv(normal_scores_path)
abnormal_df = None
if abnormal_scores_path is not None and os.path.exists(abnormal_scores_path):
abnormal_df = pd.read_csv(abnormal_scores_path)
metrics = [
("iso_anom_score", "Isolation Forest"),
("ocsvm_anom_score", "One-Class SVM"),
("pca_recon_err", "PCA Reconstruction Error"),
]
fig, axes = plt.subplots(1, 3, figsize=(18, 4.8))
thresholds = {
"iso_anom_score": float(np.percentile(normal_df["iso_anom_score"], THRESH_PERCENTILE)),
"ocsvm_anom_score": float(np.percentile(normal_df["ocsvm_anom_score"], THRESH_PERCENTILE)),
"pca_recon_err": float(np.percentile(normal_df["pca_recon_err"], THRESH_PERCENTILE)),
}
for ax, (col, title) in zip(axes, metrics):
normal_vals = normal_df[col].to_numpy()
abnormal_vals = None
if abnormal_df is not None and col in abnormal_df.columns:
abnormal_vals = abnormal_df[col].to_numpy()
plot_score_panel(
ax,
normal_vals,
abnormal_vals,
thresholds[col],
f"{title} | threshold=P{THRESH_PERCENTILE:g}",
col,
)
fig.subplots_adjust(
top=0.82,
bottom=0.18,
left=0.06,
right=0.98,
wspace=0.30,
)
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(
handles,
labels,
loc="lower center",
ncol=4,
frameon=True,
)
fig.suptitle(
"Anomaly Score Distribution",
fontsize=14,
y=0.92,
)
fig.savefig(
os.path.join(out_dir, "anomaly_score_distribution.png"),
dpi=150,
bbox_inches="tight",
)
plt.show()
# ==================================================
# ▼PCA 2次元散布図を表示
# ==================================================
def plot_pca_2d(
Xn_s: np.ndarray,
Xa_s: np.ndarray | None,
normal_files: pd.Series,
abnormal_files: pd.Series | None,
out_dir: str,
):
"""
PCAを2次元に圧縮して、正常データと異常データの位置関係を見える化します。
これは判定用ではなく、動画で説明しやすくするための可視化です。
"""
pca2 = PCA(n_components=2)
Zn = pca2.fit_transform(Xn_s)
plt.figure(figsize=(8, 6))
plt.scatter(
Zn[:, 0],
Zn[:, 1],
s=70,
label="Normal",
alpha=0.8,
)
if Xa_s is not None and Xa_s.shape[0] > 0:
Za = pca2.transform(Xa_s)
plt.scatter(
Za[:, 0],
Za[:, 1],
s=110,
marker="x",
label="Abnormal",
)
for i, fname in enumerate(abnormal_files):
plt.text(
Za[i, 0],
Za[i, 1],
fname,
fontsize=8,
ha="left",
va="bottom",
)
evr1 = pca2.explained_variance_ratio_[0] * 100
evr2 = pca2.explained_variance_ratio_[1] * 100
plt.axhline(0, linewidth=0.8)
plt.axvline(0, linewidth=0.8)
plt.xlabel(f"PC1 ({evr1:.1f}%)")
plt.ylabel(f"PC2 ({evr2:.1f}%)")
plt.title("PCA 2D Map: Normal vs Abnormal")
plt.grid(alpha=0.3)
plt.legend()
plt.tight_layout()
plt.savefig(
os.path.join(out_dir, "pca_2d_map.png"),
dpi=150,
bbox_inches="tight",
)
plt.show()
# PCA座標もCSV保存します
normal_pca_df = pd.DataFrame(
{
"file": normal_files,
"class": "normal",
"PC1": Zn[:, 0],
"PC2": Zn[:, 1],
}
)
rows = [normal_pca_df]
if Xa_s is not None and Xa_s.shape[0] > 0:
Za = pca2.transform(Xa_s)
abnormal_pca_df = pd.DataFrame(
{
"file": abnormal_files,
"class": "abnormal",
"PC1": Za[:, 0],
"PC2": Za[:, 1],
}
)
rows.append(abnormal_pca_df)
pca_pos_df = pd.concat(rows, axis=0).reset_index(drop=True)
pca_pos_df.to_csv(
os.path.join(out_dir, "pca_2d_positions.csv"),
index=False,
)
# ==================================================
# ▼メイン処理
# ==================================================
def main():
script_dir = os.path.dirname(os.path.abspath(__file__))
normal_dir = os.path.join(script_dir, NORMAL_DIR_NAME)
abnormal_dir = os.path.join(script_dir, ABNORMAL_DIR_NAME)
normal_files = sorted(glob.glob(os.path.join(normal_dir, "acc_usb_highrate_*.csv")))
abnormal_files = sorted(glob.glob(os.path.join(abnormal_dir, ABNORMAL_GLOB)))
if len(normal_files) < 10:
raise RuntimeError("正常データが少ないです。10本以上を推奨します。")
print(f"Normal files : {len(normal_files)}")
print(f"Abnormal files: {len(abnormal_files)}")
# ==================================================
# 正常平均log-PSDを作成
# ==================================================
psds = []
f_ref = None
for p in normal_files:
x = load_and_resample_10s(p)[CHANNEL].to_numpy(dtype=np.float64)
f, Pxx = compute_psd(x)
if f_ref is None:
f_ref = f
else:
if f.size != f_ref.size or np.max(np.abs(f - f_ref)) > 1e-9:
raise RuntimeError("Welchの周波数軸が一致しません。設定を確認してください。")
psds.append(Pxx)
psds = np.vstack(psds)
mean_log_psd = np.mean(np.log10(psds + 1e-18), axis=0)
# ==================================================
# 特徴量表を作成
# ==================================================
normal_df = build_feature_table(normal_files, mean_log_psd)
if len(abnormal_files) > 0:
ab_df = build_feature_table(abnormal_files, mean_log_psd)
else:
ab_df = pd.DataFrame()
feature_cols = [c for c in normal_df.columns if c != "file"]
Xn = normal_df[feature_cols].to_numpy(dtype=np.float64)
# ==================================================
# スケーリング
# ==================================================
if USE_ROBUST_SCALER:
scaler = RobustScaler()
else:
scaler = StandardScaler()
Xn_s = scaler.fit_transform(Xn)
# ==================================================
# モデル1:Isolation Forest
# ==================================================
iso = IsolationForest(
n_estimators=500,
contamination="auto",
random_state=42,
)
iso.fit(Xn_s)
# score_samplesは大きいほど正常です
# 異常度にするため符号を反転します
iso_score_n = -iso.score_samples(Xn_s)
iso_thr = float(np.percentile(iso_score_n, THRESH_PERCENTILE))
# ==================================================
# モデル2:One-Class SVM
# ==================================================
oc = OneClassSVM(
kernel="rbf",
nu=0.05,
gamma="scale",
)
oc.fit(Xn_s)
# decision_functionは大きいほど正常です
# 異常度にするため符号を反転します
oc_score_n = -oc.decision_function(Xn_s).ravel()
oc_thr = float(np.percentile(oc_score_n, THRESH_PERCENTILE))
# ==================================================
# モデル3:PCA再構成誤差
# ==================================================
# 判定用PCAです
# 2次元ではなく、最大5成分を使って再構成誤差を計算します
pca = PCA(n_components=min(5, Xn_s.shape[1]))
pca.fit(Xn_s)
Xn_rec = pca.inverse_transform(pca.transform(Xn_s))
pca_err_n = np.mean((Xn_s - Xn_rec) ** 2, axis=1)
pca_thr = float(np.percentile(pca_err_n, THRESH_PERCENTILE))
# ==================================================
# 保存先
# ==================================================
out_dir = os.path.join(script_dir, "ml_anomaly_output")
os.makedirs(out_dir, exist_ok=True)
# ==================================================
# 正常データの異常スコア保存
# ==================================================
normal_scores = pd.DataFrame(
{
"file": normal_df["file"],
"iso_anom_score": iso_score_n,
"ocsvm_anom_score": oc_score_n,
"pca_recon_err": pca_err_n,
}
)
normal_scores_path = os.path.join(out_dir, "normal_scores.csv")
normal_scores.to_csv(normal_scores_path, index=False)
# ==================================================
# 異常データを評価
# ==================================================
abnormal_scores_path = None
Xa_s = None
if len(ab_df) > 0:
Xa = ab_df[feature_cols].to_numpy(dtype=np.float64)
Xa_s = scaler.transform(Xa)
iso_score_a = -iso.score_samples(Xa_s)
oc_score_a = -oc.decision_function(Xa_s).ravel()
Xa_rec = pca.inverse_transform(pca.transform(Xa_s))
pca_err_a = np.mean((Xa_s - Xa_rec) ** 2, axis=1)
ab_scores = pd.DataFrame(
{
"file": ab_df["file"],
"iso_anom_score": iso_score_a,
"iso_is_abnormal": iso_score_a > iso_thr,
"ocsvm_anom_score": oc_score_a,
"ocsvm_is_abnormal": oc_score_a > oc_thr,
"pca_recon_err": pca_err_a,
"pca_is_abnormal": pca_err_a > pca_thr,
}
)
abnormal_scores_path = os.path.join(out_dir, "abnormal_scores.csv")
ab_scores.to_csv(abnormal_scores_path, index=False)
print("\n=== Thresholds ===")
print(f"Isolation Forest threshold : {iso_thr}")
print(f"One-Class SVM threshold : {oc_thr}")
print(f"PCA recon error threshold : {pca_thr}")
print("\n=== Abnormal results ===")
print(ab_scores.sort_values("file").to_string(index=False))
# ==================================================
# グラフ1:異常スコア分布
# ==================================================
plot_score_summary(
normal_scores_path,
abnormal_scores_path,
out_dir,
)
# ==================================================
# グラフ2:PCA 2次元マップ
# ==================================================
if len(ab_df) > 0:
plot_pca_2d(
Xn_s=Xn_s,
Xa_s=Xa_s,
normal_files=normal_df["file"],
abnormal_files=ab_df["file"],
out_dir=out_dir,
)
else:
plot_pca_2d(
Xn_s=Xn_s,
Xa_s=None,
normal_files=normal_df["file"],
abnormal_files=None,
out_dir=out_dir,
)
# ==================================================
# モデル設定を保存
# ==================================================
meta = {
"FS": FS,
"TARGET_DURATION_S": TARGET_DURATION_S,
"N_SAMPLES": N_SAMPLES,
"CHANNEL": CHANNEL,
"F_MIN": F_MIN,
"F_MIN_DOM": F_MIN_DOM,
"F_MAX_DOM": F_MAX_DOM,
"N_PERSEG": N_PERSEG,
"BANDS_HZ": BANDS_HZ,
"THRESH_PERCENTILE": THRESH_PERCENTILE,
"USE_ROBUST_SCALER": USE_ROBUST_SCALER,
"thresholds": {
"iso_thr": iso_thr,
"oc_thr": oc_thr,
"pca_thr": pca_thr,
},
"feature_cols": feature_cols,
}
with open(
os.path.join(out_dir, "model_meta.json"),
"w",
encoding="utf-8",
) as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
normal_df.to_csv(
os.path.join(out_dir, "normal_features_used.csv"),
index=False,
)
if len(ab_df) > 0:
ab_df.to_csv(
os.path.join(out_dir, "abnormal_features_used.csv"),
index=False,
)
print("\nSaved to:", out_dir)
if __name__ == "__main__":
main()

コメント