概要
加速度センサーシリーズの続編です。
前回は、振動データから定速運転区間を自動で抽出しました。
これにより、人がグラフを見て解析区間を選ばなくても、安定したデータを自動で取り出せるようになりました。
しかし、定速区間が抽出できても、正常か異常か判断できません。
そこで今回は、定速区間から特徴量を計算し、正常状態と比較することで異常を検知する仕組みを作ります。
詳しくは、以下のYouTube動画をご覧ください。
配線

プログラムコード
マイコン(ESP32)
・MPU6050で、X・Y・Z方向の加速度を計測
・合成加速度 acc_total を計算
・1000Hz周期でデータ取得
・CSV形式でシリアル出力
#include <Wire.h>
#include <MPU6050_tockn.h>
MPU6050 mpu(Wire);
const unsigned long SAMPLE_US = 1000; // 1000us = 1000Hz
unsigned long last_us = 0;
void setup() {
Serial.begin(115200);
Wire.begin(21, 22);
Wire.setClock(400000);
mpu.begin();
mpu.calcGyroOffsets(true);
Serial.println("time_us,ax,ay,az,acc_total");
}
void loop() {
unsigned long now = micros();
if (now - last_us < SAMPLE_US) return;
last_us = now;
mpu.update();
float ax = mpu.getAccX();
float ay = mpu.getAccY();
float az = mpu.getAccZ();
float acc_total = sqrt(ax*ax + ay*ay + az*az);
Serial.print(now);
Serial.print(",");
Serial.print(ax, 4);
Serial.print(",");
Serial.print(ay, 4);
Serial.print(",");
Serial.print(az, 4);
Serial.print(",");
Serial.println(acc_total, 4);
}Python
次の2つのコードを使っています。
正常データから閾値を作るコード
import os
import glob
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from scipy.signal import welch, find_peaks, peak_widths
# ==================================================
# ▼ここだけ設定すればOK
# ==================================================
FS = 1000.0
# 1000Hzで測定した前提です
TARGET_DURATION_S = 10.0
# 各CSVファイルの先頭10秒を解析します
N_SAMPLES = int(FS * TARGET_DURATION_S)
CHANNEL = "acc_total"
# 今回は3軸合成加速度を使って特徴量を作ります
F_MIN = 5.0
# 5Hz未満は低周波の揺れや傾きの影響を受けやすいため除外します
F_MIN_DOM = 5.0
F_MAX_DOM = 200.0
# 支配周波数を探す範囲です
N_PERSEG = 2048
# Welch法でPSDを計算するときの分割長です
BANDS_HZ = [
(5.0, 10.0),
(10.0, 30.0),
(30.0, 80.0),
(80.0, 200.0),
(200.0, 400.0),
(400.0, 500.0),
]
# 周波数帯ごとの振動エネルギーを計算します
EPS = 1e-12
SAVE_PREPROCESSED_SIGNALS = True
# Trueにすると、前処理後の波形も保存します
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = SCRIPT_DIR
OUT_DIR = os.path.join(SCRIPT_DIR, "fft_normal_model_rulebased_hf")
os.makedirs(OUT_DIR, exist_ok=True)
# ==================================================
# ▼前処理:CSVを読み込み、10秒分の等間隔データに補正
# ==================================================
def load_and_resample_10s(csv_path: str) -> pd.DataFrame:
df = pd.read_csv(csv_path)
required_cols = ["time_us", "ax", "ay", "az", "acc_total"]
for c in required_cols:
if c not in df.columns:
raise ValueError(f"列 {c} が見つかりません。file={csv_path}")
# マイクロ秒を秒に変換します
t = df["time_us"].to_numpy(dtype=np.float64) / 1e6
# 開始時刻を0秒にそろえます
t = t - t[0]
# 時刻が逆転・重複している行を除外します
keep = np.ones_like(t, dtype=bool)
keep[1:] = t[1:] > t[:-1]
t = t[keep]
df = df.loc[keep].reset_index(drop=True)
if t.size < 2:
raise ValueError(f"time列が短すぎます。file={csv_path}")
# 1000Hz、10秒分の等間隔時間軸を作ります
t_uniform = np.arange(N_SAMPLES, dtype=np.float64) / FS
out = pd.DataFrame({"time_s": t_uniform})
t_min = t[0]
t_max = t[-1]
# ax, ay, az, acc_totalを等間隔データに補間します
for col in ["ax", "ay", "az", "acc_total"]:
y = df[col].to_numpy(dtype=np.float64)
f = interp1d(
t,
y,
kind="linear",
bounds_error=False,
fill_value="extrapolate",
)
y_uniform = f(t_uniform)
# 実測範囲外は0にします
valid = (t_uniform >= t_min) & (t_uniform <= t_max)
y_uniform = np.where(valid, y_uniform, 0.0)
out[col] = y_uniform
return out
# ==================================================
# ▼Welch法でPSDを計算
# ==================================================
def compute_psd(x: np.ndarray):
x = np.asarray(x, dtype=np.float64)
# DC成分を除去します
x = x - np.mean(x)
f, Pxx = welch(
x,
fs=FS,
nperseg=N_PERSEG,
)
return f, Pxx
# ==================================================
# ▼支配周波数・ピーク幅・ピークの鋭さを計算
# ==================================================
def peak_features_from_psd(f: np.ndarray, Pxx: np.ndarray):
m_dom = (f >= F_MIN_DOM) & (f <= F_MAX_DOM)
f_dom = f[m_dom]
P_dom = Pxx[m_dom]
feats = {}
if f_dom.size < 3:
feats["dom_freq_hz"] = 0.0
feats["dom_psd"] = 0.0
feats["peak_width_hz"] = 0.0
feats["peak_sharpness"] = 0.0
return feats
# PSDが最大となる周波数を支配周波数とします
idx_peak_local = int(np.argmax(P_dom))
dom_freq = float(f_dom[idx_peak_local])
dom_psd = float(P_dom[idx_peak_local])
# ピークを検出します
peaks, _ = find_peaks(P_dom)
if peaks.size == 0:
peaks = np.array([idx_peak_local], dtype=int)
# 支配周波数に最も近いピークを採用します
peaks_sorted = peaks[np.argsort(np.abs(peaks - idx_peak_local))]
pidx = int(peaks_sorted[0])
# ピーク幅を計算します
widths, _, _, _ = peak_widths(P_dom, [pidx], rel_height=0.5)
width_bins = float(widths[0])
df_hz = float(f_dom[1] - f_dom[0])
peak_width_hz = width_bins * df_hz
# ピーク周辺と比較して、どれだけ鋭いピークかを計算します
span_hz = 4.0
span_bins = max(1, int(round(span_hz / df_hz)))
lo = max(0, pidx - span_bins)
hi = min(P_dom.size, pidx + span_bins + 1)
if hi - lo >= 3:
neighborhood = np.concatenate([P_dom[lo:pidx], P_dom[pidx + 1:hi]])
else:
neighborhood = P_dom[lo:hi]
if neighborhood.size > 0:
neigh_mean = float(np.mean(neighborhood))
else:
neigh_mean = float(np.mean(P_dom))
peak_sharpness = np.log10(dom_psd / (neigh_mean + EPS) + 1.0)
feats["dom_freq_hz"] = dom_freq
feats["dom_psd"] = dom_psd
feats["peak_width_hz"] = float(peak_width_hz)
feats["peak_sharpness"] = float(peak_sharpness)
return feats
# ==================================================
# ▼周波数帯ごとの特徴量を計算
# ==================================================
def spectral_features(f: np.ndarray, Pxx: np.ndarray):
feats = {}
# 5Hz以上のPSD合計を計算します
m_ge = f >= F_MIN
f_ge = f[m_ge]
P_ge = Pxx[m_ge]
feats["psd_sum_ge5"] = float(np.sum(P_ge))
# 周波数帯ごとのPSD合計を計算します
for f1, f2 in BANDS_HZ:
m = (f >= f1) & (f < f2)
feats[f"band_psd_sum_{f1:g}_{f2:g}"] = float(np.sum(Pxx[m]))
# スペクトル重心を計算します
# どの周波数帯にエネルギーの中心があるかを表します
denom = float(np.sum(P_ge))
if denom > 0:
feats["spec_centroid_hz_ge5"] = float(np.sum(f_ge * P_ge) / denom)
else:
feats["spec_centroid_hz_ge5"] = 0.0
# 高周波成分の割合を計算します
p_hi_200_500 = float(np.sum(Pxx[(f >= 200.0) & (f <= 500.0)]))
p_lo_5_200 = float(np.sum(Pxx[(f >= 5.0) & (f < 200.0)]))
feats["hi_lo_ratio_200_500_over_5_200"] = p_hi_200_500 / (p_lo_5_200 + EPS)
return feats
# ==================================================
# ▼1つのCSVファイルから特徴量を抽出
# ==================================================
def extract_all_features(csv_path: str):
prep = load_and_resample_10s(csv_path)
x = prep[CHANNEL].to_numpy(dtype=np.float64)
f, Pxx = compute_psd(x)
feats = {}
feats.update(peak_features_from_psd(f, Pxx))
feats.update(spectral_features(f, Pxx))
return feats, f, Pxx, prep
# ==================================================
# ▼正常データの統計情報を作成
# ==================================================
def build_summary(features_df: pd.DataFrame) -> pd.DataFrame:
numeric_cols = [c for c in features_df.columns if c != "file"]
mu = features_df[numeric_cols].mean(axis=0)
sd = features_df[numeric_cols].std(axis=0, ddof=1)
summary = pd.DataFrame(
{
"feature": numeric_cols,
"mean": mu.values,
"std": sd.values,
"upper_3sigma": (mu + 3.0 * sd).values,
"p95": [np.percentile(features_df[c], 95) for c in numeric_cols],
"p99": [np.percentile(features_df[c], 99) for c in numeric_cols],
"min": [features_df[c].min() for c in numeric_cols],
"max": [features_df[c].max() for c in numeric_cols],
}
)
summary["std"] = summary["std"].fillna(0.0)
summary["upper_3sigma"] = summary["upper_3sigma"].fillna(summary["mean"])
return summary
# ==================================================
# ▼メイン処理
# ==================================================
def main():
# acc_usb_highrate_001.csv のような正常データをまとめて読み込みます
pattern = os.path.join(DATA_DIR, "acc_usb_highrate_*.csv")
csv_files = sorted(glob.glob(pattern))
if len(csv_files) == 0:
raise RuntimeError(f"対象CSVが見つかりません。pattern={pattern}")
print(f"Found {len(csv_files)} files.")
print(f"FS = {FS} Hz")
print(f"window = {TARGET_DURATION_S} s")
print(f"target samples = {N_SAMPLES}")
print(f"channel = {CHANNEL}")
all_feats = []
for path in csv_files:
base = os.path.basename(path)
print(f"Processing: {base}")
feats, _, _, prep = extract_all_features(path)
feats["file"] = base
all_feats.append(feats)
# 前処理後の波形を保存します
if SAVE_PREPROCESSED_SIGNALS:
out_prep = os.path.join(OUT_DIR, f"prep_{base}")
prep.to_csv(out_prep, index=False)
# ① 各CSVファイルごとのFFT特徴量
features_df = pd.DataFrame(all_feats).sort_values("file").reset_index(drop=True)
# ② 正常モデルの統計情報
summary_df = build_summary(features_df)
features_path = os.path.join(OUT_DIR, "normal_features_rulebased_hf.csv")
summary_path = os.path.join(OUT_DIR, "normal_summary_rulebased_hf.csv")
features_df.to_csv(features_path, index=False)
summary_df.to_csv(summary_path, index=False)
print("=== 完了 ===")
print(f"features: {features_path}")
print(f"summary : {summary_path}")
if __name__ == "__main__":
main()【出力されるファイル】
①CSVファイルごとのFFT特徴量
②正常モデルの平均・標準偏差
異常を判定するコード
import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy.signal import welch, find_peaks, peak_widths
# ==================================================
# ▼基本設定
# ==================================================
FS = 1000.0
# 第2回で1000Hz測定した前提で解析します
TARGET_DURATION_S = 10.0
# 各CSVファイルの先頭10秒を解析対象にします
N_SAMPLES = int(FS * TARGET_DURATION_S)
CHANNEL = "acc_total"
# 今回は3軸合成加速度を使って異常判定します
F_MIN = 5.0
# 5Hz未満は低周波の揺れや姿勢変化の影響を受けやすいため除外します
F_MAX_PLOT = 500.0
# PSDグラフの表示上限です
F_MIN_DOM = 5.0
F_MAX_DOM = 200.0
# 支配周波数を探す範囲です
N_PERSEG = 2048
# Welch法でPSDを計算するときの分割長です
BANDS_HZ = [
(5.0, 10.0),
(10.0, 30.0),
(30.0, 80.0),
(80.0, 200.0),
(200.0, 400.0),
(400.0, 500.0),
]
# 周波数帯ごとの振動エネルギーを計算します
EPS = 1e-12
# ==================================================
# ▼フォルダ設定
# ==================================================
NORMAL_DIR_NAME = "normal"
# 正常データを入れるフォルダ名です
ABNORMAL_DIR_NAME = "abnormal"
# 判定対象データを入れるフォルダ名です
ABNORMAL_FILE_NAME = "acc_usb_highrate_judge.csv"
# 判定したいCSVファイル名です
# ==================================================
# ▼しきい値設定
# ==================================================
DOM_K_SIGMA = 3.0
PW_K_SIGMA = 3.0
BP3080_K_SIGMA = 3.0
BP200400_K_SIGMA = 3.0
HI200500_K_SIGMA = 3.0
# 平均±3σ、または平均+3σをしきい値にします
MIN_VIOLATIONS_TO_ABNORMAL = 2
# 2項目以上しきい値を超えたら異常と判定します
MIN_VIOLATIONS_TO_WARNING = 1
# 1項目だけ超えた場合は要注意と判定します
# ==================================================
# ▼前処理:10秒分の等間隔データに補正
# ==================================================
def load_and_resample_10s(csv_path: str) -> pd.DataFrame:
df = pd.read_csv(csv_path)
required_cols = ["time_us", "ax", "ay", "az", "acc_total"]
for c in required_cols:
if c not in df.columns:
raise ValueError(f"列 {c} が見つかりません。file={csv_path}")
# time_usを秒に変換し、開始時刻を0秒にそろえます
t = df["time_us"].to_numpy(dtype=np.float64) / 1e6
t = t - t[0]
# 時刻の重複や逆転を除外します
keep = np.ones_like(t, dtype=bool)
keep[1:] = t[1:] > t[:-1]
t = t[keep]
df = df.loc[keep].reset_index(drop=True)
if t.size < 2:
raise ValueError(f"time列が短すぎます。file={csv_path}")
# 1000Hz、10秒分の等間隔時間軸を作ります
t_uniform = np.arange(N_SAMPLES, dtype=np.float64) / FS
out = pd.DataFrame({"time_s": t_uniform})
t_min = t[0]
t_max = t[-1]
for col in ["ax", "ay", "az", "acc_total"]:
y = df[col].to_numpy(dtype=np.float64)
f = interp1d(
t,
y,
kind="linear",
bounds_error=False,
fill_value="extrapolate",
)
y_uniform = f(t_uniform)
# 実測範囲外は0にします
valid = (t_uniform >= t_min) & (t_uniform <= t_max)
y_uniform = np.where(valid, y_uniform, 0.0)
out[col] = y_uniform
return out
# ==================================================
# ▼Welch法でPSDを計算
# ==================================================
def compute_psd(x: np.ndarray):
x = np.asarray(x, dtype=np.float64)
# 平均値を引いてDC成分を除去します
x = x - np.mean(x)
f, Pxx = welch(
x,
fs=FS,
nperseg=N_PERSEG,
)
return f, Pxx
# ==================================================
# ▼支配周波数、ピーク幅、ピークの鋭さを計算
# ==================================================
def peak_features_from_psd(f: np.ndarray, Pxx: np.ndarray):
m_dom = (f >= F_MIN_DOM) & (f <= F_MAX_DOM)
f_dom = f[m_dom]
P_dom = Pxx[m_dom]
feats = {}
if f_dom.size < 3:
feats["dom_freq_hz"] = 0.0
feats["dom_psd"] = 0.0
feats["peak_width_hz"] = 0.0
feats["peak_sharpness"] = 0.0
return feats
# PSDが最大となる周波数を支配周波数とします
idx_peak_local = int(np.argmax(P_dom))
dom_freq = float(f_dom[idx_peak_local])
dom_psd = float(P_dom[idx_peak_local])
# ピークを検出します
peaks, _ = find_peaks(P_dom)
if peaks.size == 0:
peaks = np.array([idx_peak_local], dtype=int)
# 支配周波数に最も近いピークを採用します
peaks_sorted = peaks[np.argsort(np.abs(peaks - idx_peak_local))]
pidx = int(peaks_sorted[0])
# ピーク幅を計算します
widths, _, _, _ = peak_widths(P_dom, [pidx], rel_height=0.5)
width_bins = float(widths[0])
df_hz = float(f_dom[1] - f_dom[0])
peak_width_hz = width_bins * df_hz
# ピーク周辺と比較して、どれだけ鋭いピークかを計算します
span_hz = 4.0
span_bins = max(1, int(round(span_hz / df_hz)))
lo = max(0, pidx - span_bins)
hi = min(P_dom.size, pidx + span_bins + 1)
if hi - lo >= 3:
neighborhood = np.concatenate([P_dom[lo:pidx], P_dom[pidx + 1:hi]])
else:
neighborhood = P_dom[lo:hi]
if neighborhood.size > 0:
neigh_mean = float(np.mean(neighborhood))
else:
neigh_mean = float(np.mean(P_dom))
peak_sharpness = np.log10(dom_psd / (neigh_mean + EPS) + 1.0)
feats["dom_freq_hz"] = dom_freq
feats["dom_psd"] = dom_psd
feats["peak_width_hz"] = float(peak_width_hz)
feats["peak_sharpness"] = float(peak_sharpness)
return feats
# ==================================================
# ▼周波数帯ごとの特徴量を計算
# ==================================================
def spectral_features(f: np.ndarray, Pxx: np.ndarray):
feats = {}
m_ge = f >= F_MIN
f_ge = f[m_ge]
P_ge = Pxx[m_ge]
# 5Hz以上の全体エネルギーです
feats["psd_sum_ge5"] = float(np.sum(P_ge))
# 周波数帯ごとのエネルギーです
for f1, f2 in BANDS_HZ:
m = (f >= f1) & (f < f2)
feats[f"band_psd_sum_{f1:g}_{f2:g}"] = float(np.sum(Pxx[m]))
# スペクトル重心です
denom = float(np.sum(P_ge))
if denom > 0:
feats["spec_centroid_hz_ge5"] = float(np.sum(f_ge * P_ge) / denom)
else:
feats["spec_centroid_hz_ge5"] = 0.0
# 80Hz以上の高周波比です。参考値として保存します
p_hi_80_500 = float(np.sum(Pxx[f >= 80.0]))
p_lo_5_80 = float(np.sum(Pxx[(f >= 5.0) & (f < 80.0)]))
feats["hi_lo_ratio_ge80_over_5to80"] = p_hi_80_500 / (p_lo_5_80 + EPS)
# 今回の判定に使う高周波比です
p_hi_200_500 = float(np.sum(Pxx[(f >= 200.0) & (f <= 500.0)]))
p_lo_5_200 = float(np.sum(Pxx[(f >= 5.0) & (f < 200.0)]))
feats["hi_lo_ratio_200_500_over_5_200"] = p_hi_200_500 / (p_lo_5_200 + EPS)
return feats
# ==================================================
# ▼1つのCSVファイルから特徴量を抽出
# ==================================================
def extract_all_features(csv_path: str):
prep = load_and_resample_10s(csv_path)
x = prep[CHANNEL].to_numpy(dtype=np.float64)
f, Pxx = compute_psd(x)
feats = {}
feats.update(peak_features_from_psd(f, Pxx))
feats.update(spectral_features(f, Pxx))
return feats, f, Pxx
# ==================================================
# ▼正常データから平均・標準偏差・しきい値を作る
# ==================================================
def calc_rule_stats(values: np.ndarray, k_sigma: float, mode: str):
mu = float(np.mean(values))
sd = float(np.std(values, ddof=1))
# 標準偏差が0に近い場合のゼロ割対策です
sd = sd if sd > EPS else EPS
if mode == "two-sided":
# 支配周波数のように、上にも下にも外れる可能性がある指標です
lower = mu - k_sigma * sd
upper = mu + k_sigma * sd
elif mode == "upper":
# 振動エネルギーや高周波比のように、増加側を異常と見る指標です
lower = np.nan
upper = mu + k_sigma * sd
else:
raise ValueError(f"unknown mode: {mode}")
return mu, sd, lower, upper
# ==================================================
# ▼メイン処理
# ==================================================
def main():
script_dir = os.path.dirname(os.path.abspath(__file__))
normal_dir = os.path.join(script_dir, NORMAL_DIR_NAME)
abnormal_dir = os.path.join(script_dir, ABNORMAL_DIR_NAME)
if not os.path.isdir(normal_dir):
raise RuntimeError(f"正常フォルダが見つかりません: {normal_dir}")
if not os.path.isdir(abnormal_dir):
raise RuntimeError(f"異常フォルダが見つかりません: {abnormal_dir}")
normal_files = sorted(glob.glob(os.path.join(normal_dir, "acc_usb_highrate_*.csv")))
if len(normal_files) < 5:
raise RuntimeError("正常CSVが少なすぎます。少なくとも5本以上を推奨します。")
abnormal_file = os.path.join(abnormal_dir, ABNORMAL_FILE_NAME)
if not os.path.exists(abnormal_file):
raise RuntimeError(f"判定対象CSVが見つかりません: {abnormal_file}")
print(f"Normal files: {len(normal_files)}")
print(f"Target : {os.path.basename(abnormal_file)}")
# ==================================================
# 正常データの平均PSDを作成
# ==================================================
normal_psd_list = []
f_ref = None
for p in normal_files:
prep = load_and_resample_10s(p)
x = prep[CHANNEL].to_numpy(dtype=np.float64)
f, Pxx = compute_psd(x)
if f_ref is None:
f_ref = f
else:
if f.size != f_ref.size or np.max(np.abs(f - f_ref)) > 1e-9:
raise RuntimeError("Welchの周波数軸が一致しません。設定を確認してください。")
normal_psd_list.append(Pxx)
normal_psd = np.vstack(normal_psd_list)
mean_psd = np.mean(normal_psd, axis=0)
# ==================================================
# 正常データの特徴量を作成
# ==================================================
normal_feats = []
for p in normal_files:
feats, _, _ = extract_all_features(p)
feats["file"] = os.path.basename(p)
normal_feats.append(feats)
normal_df = pd.DataFrame(normal_feats).sort_values("file").reset_index(drop=True)
# ==================================================
# 判定対象データの特徴量を作成
# ==================================================
target_feats, f_target, Pxx_target = extract_all_features(abnormal_file)
# ==================================================
# 異常判定に使う5つの実用指標
# ==================================================
rule_defs = [
{"name": "dom_freq_hz", "mode": "two-sided", "k": DOM_K_SIGMA},
{"name": "peak_width_hz", "mode": "upper", "k": PW_K_SIGMA},
{"name": "band_psd_sum_30_80", "mode": "upper", "k": BP3080_K_SIGMA},
{"name": "band_psd_sum_200_400", "mode": "upper", "k": BP200400_K_SIGMA},
{"name": "hi_lo_ratio_200_500_over_5_200", "mode": "upper", "k": HI200500_K_SIGMA},
]
rule_results = []
print("\n=== Target features ===")
for k, v in target_feats.items():
print(f"{k:<32} {v:.6g}")
print("\n=== Rule-based judgment ===")
violation_count = 0
for rule in rule_defs:
name = rule["name"]
mode = rule["mode"]
k = rule["k"]
vals = normal_df[name].to_numpy(dtype=np.float64)
mu, sd, lower, upper = calc_rule_stats(vals, k_sigma=k, mode=mode)
x = float(target_feats[name])
if mode == "two-sided":
violation = (x < lower) or (x > upper)
print(
f"{name:<32} "
f"target={x:.6g} range=[{lower:.6g}, {upper:.6g}] "
f"violation={violation}"
)
else:
violation = x > upper
print(
f"{name:<32} "
f"target={x:.6g} upper={upper:.6g} "
f"violation={violation}"
)
if violation:
violation_count += 1
z = (x - mu) / sd if sd > EPS else 0.0
rule_results.append(
{
"feature": name,
"mode": mode,
"mean": mu,
"std": sd,
"lower": lower,
"upper": upper,
"target_value": x,
"z_score": z,
"violation": bool(violation),
}
)
# ==================================================
# 最終判定
# ==================================================
if violation_count >= MIN_VIOLATIONS_TO_ABNORMAL:
judgment = "ABNORMAL"
elif violation_count >= MIN_VIOLATIONS_TO_WARNING:
judgment = "WARNING"
else:
judgment = "NORMAL"
print("\n=== Final judgment ===")
print(f"violation_count = {violation_count}")
if judgment == "ABNORMAL":
print("→ 異常です。")
elif judgment == "WARNING":
print("→ 要注意です。")
else:
print("→ 正常です。")
# ==================================================
# グラフ1:Zスコア表示
# ==================================================
plot_names = [r["feature"] for r in rule_results]
plot_z = [r["z_score"] for r in rule_results]
plt.figure(figsize=(11, 4))
plt.bar(plot_names, plot_z)
plt.axhline(3.0, linestyle="--")
plt.axhline(-3.0, linestyle="--")
plt.title("Feature Z-score")
plt.xticks(rotation=30, ha="right")
plt.tight_layout()
plt.show()
# ==================================================
# グラフ2:正常平均PSDと判定対象PSDの比較
# ==================================================
plt.figure(figsize=(8, 5))
plt.semilogy(f_target, Pxx_target, label="Target")
plt.semilogy(f_ref, mean_psd, label="Normal Mean")
plt.xlim(0, F_MAX_PLOT)
plt.title("PSD Comparison")
plt.xlabel("Frequency [Hz]")
plt.ylabel("PSD")
plt.legend()
plt.tight_layout()
plt.show()
# ==================================================
# 判定結果を表示
# ==================================================
rule_df = pd.DataFrame(rule_results)
print("\n=== Rule summary ===")
print(
rule_df[
[
"feature",
"target_value",
"mean",
"std",
"lower",
"upper",
"z_score",
"violation",
]
]
)
# ==================================================
# CSV保存
# ==================================================
out_dir = os.path.join(script_dir, "fft_abnormal_analysis_rulebased_hf")
os.makedirs(out_dir, exist_ok=True)
normal_df.to_csv(
os.path.join(out_dir, "normal_features_rulebased_hf.csv"),
index=False,
)
target_row = pd.DataFrame(
[
{
**{"file": os.path.basename(abnormal_file)},
**target_feats,
}
]
)
target_row.to_csv(
os.path.join(out_dir, "target_features_rulebased_hf.csv"),
index=False,
)
rule_df.to_csv(
os.path.join(out_dir, "rule_thresholds_and_results_hf.csv"),
index=False,
)
summary = pd.DataFrame(
[
{
"target_file": os.path.basename(abnormal_file),
"judgment": judgment,
"violation_count": int(violation_count),
"min_violations_to_abnormal": int(MIN_VIOLATIONS_TO_ABNORMAL),
"min_violations_to_warning": int(MIN_VIOLATIONS_TO_WARNING),
}
]
)
summary.to_csv(
os.path.join(out_dir, "summary_rulebased_hf.csv"),
index=False,
)
print(f"\nSaved to: {out_dir}")
if __name__ == "__main__":
main()【注意点】
このコード自体にも正常データから閾値を計算するコードを埋め込んでいます。
一つ目のコードと内容が重複していますが、これ一本で閾値の設定から異常判定まで一括で回せるように動作確認用として作りました。
ただし、実際の現場で運用する際には、あらかじめ正常データを読み込ませてから、日々の稼働データを判定する流れと思いますので、実際には分割して運用するほうが適切と思います。


コメント