概要
加速度センサーシリーズの続編です。
今回は、振動データを使った劣化予測に挑戦します。
前回までは、正常か異常か、どの故障モードかを判定する仕組みを作ってきました。
しかし、使用者が本当に知りたいのは、「どれくらい劣化しているのか」「いつ頃故障しそうなのか」です。
そこで今回は、吸気異常を4段階に変化させたデータを使い、特徴量がどのように変化するのか、あとどのくらいで故障に至るのか予測モデルの構築に挑戦してみました。
詳しくは、以下のYouTube動画をご覧ください。
配線

プログラムコード
マイコン(ESP32)
・MPU6050で、X・Y・Z方向の加速度を計測
・合成加速度 acc_total を計算
・1000Hz周期でデータ取得
・CSV形式でシリアル出力
#include <Wire.h>
#include <MPU6050_tockn.h>
MPU6050 mpu(Wire);
const unsigned long SAMPLE_US = 1000; // 1000us = 1000Hz
unsigned long last_us = 0;
void setup() {
Serial.begin(115200);
Wire.begin(21, 22);
Wire.setClock(400000);
mpu.begin();
mpu.calcGyroOffsets(true);
Serial.println("time_us,ax,ay,az,acc_total");
}
void loop() {
unsigned long now = micros();
if (now - last_us < SAMPLE_US) return;
last_us = now;
mpu.update();
float ax = mpu.getAccX();
float ay = mpu.getAccY();
float az = mpu.getAccZ();
float acc_total = sqrt(ax*ax + ay*ay + az*az);
Serial.print(now);
Serial.print(",");
Serial.print(ax, 4);
Serial.print(",");
Serial.print(ay, 4);
Serial.print(",");
Serial.print(az, 4);
Serial.print(",");
Serial.println(acc_total, 4);
}Python
次の3つのコードを使用しています。①⇒②⇒③の順に出力されたファイルが次のコードのインプットになっています。
①ステージ別特徴量の作成・比較
"""
-----------------------------------------
正常状態から劣化状態までの振動データを比較し、
・どの特徴量が劣化とともに変化するのか
・劣化度合いを表せる特徴量は何か
を確認します。
今回の実験では、normal/stage1/stage2/stage3/stage4の5段階のデータを比較します。
処理の流れ
-----------------------------------------
① CSV読み込み
② time_usを使って等間隔データへ補正
③ FFT、Welch PSDを計算
④ FFT特徴量を抽出
・支配周波数
・ピーク幅
・バンドパワー
・スペクトル重心
・高周波比
⑤ ステージごとの平均を比較
⑥ 箱ひげ図でばらつきを確認
⑦ all_stage_features.csv を保存
このCSVを次のコードで読み込み、劣化スコアを作成します。
=========================================================
"""
import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy.signal import welch, find_peaks, peak_widths
# =============================
# ▼基本設定
# =============================
FS = 1000.0
# 第2回で1000Hz測定した前提です
TARGET_DURATION_S = 10.0
# 各CSVファイルの先頭10秒を解析します
N_SAMPLES = int(FS * TARGET_DURATION_S)
CHANNEL = "acc_total"
# 3軸合成加速度を解析対象にします
F_MIN = 5.0
# 5Hz未満は低周波の揺れや姿勢変化の影響を受けやすいため除外します
F_MAX_PLOT = 500.0
# PSDグラフの表示上限です
F_MIN_DOM = 5.0
F_MAX_DOM = 200.0
# 支配周波数を探す範囲です
N_PERSEG = 2048
# Welch法でPSDを計算するときの分割長です
BANDS_HZ = [
(5.0, 10.0),
(10.0, 30.0),
(30.0, 80.0),
(80.0, 200.0),
(200.0, 400.0),
(400.0, 500.0),
]
EPS = 1e-12
# =============================
# ▼劣化レベルごとのフォルダ
# =============================
#
# このPythonファイルと同じ場所に、
#
# normal
# stage1
# stage2
# stage3
# stage4
#
# というフォルダを作成し、
# それぞれのCSVを入れてください。
#
# stage番号が大きいほど、
# 劣化が進んでいる想定です。
#
GROUP_DIRS = {
"normal": "normal",
"stage1": "stage1",
"stage2": "stage2",
"stage3": "stage3",
"stage4": "stage4",
}
FILE_PATTERN = "acc_usb_highrate_*.csv"
DISPLAY_NAMES = {
"normal": "Normal",
"stage1": "Stage 1",
"stage2": "Stage 2",
"stage3": "Stage 3",
"stage4": "Stage 4",
}
GROUP_COLORS = {
"normal": "blue",
"stage1": "green",
"stage2": "orange",
"stage3": "red",
"stage4": "purple",
}
# =============================
# ▼劣化評価に使う特徴量
# =============================
#
# どの特徴量が劣化に反応するかを確認します。
#
# dom_freq_hz
# 支配周波数
#
# peak_width_hz
# ピーク幅
#
# band_psd_sum_30_80
# 30~80Hz帯の振動エネルギー
#
# band_psd_sum_200_400
# 200~400Hz帯の高周波エネルギー
#
# hi_lo_ratio_200_500_over_5_200
# 高周波ノイズ比
#
# spec_centroid_hz_ge5
# スペクトル重心
#
COMPARE_FEATURES = [
"dom_freq_hz",
"peak_width_hz",
"band_psd_sum_30_80",
"band_psd_sum_200_400",
"hi_lo_ratio_200_500_over_5_200",
"spec_centroid_hz_ge5",
]
# =============================
# ▼前処理:time_usから等間隔化し、先頭10秒を抽出
# =============================
def load_and_resample_10s(csv_path: str) -> pd.DataFrame:
"""
ESP32の測定データは、完全な等間隔とは限りません。
FFTを正しく行うために、
time_us を使って1000Hzの等間隔データへ補正します。
さらに、各CSVで比較条件をそろえるため、
先頭10秒だけを解析対象にします。
"""
df = pd.read_csv(csv_path)
required_cols = ["time_us", "ax", "ay", "az", "acc_total"]
for c in required_cols:
if c not in df.columns:
raise ValueError(f"列 {c} が見つかりません。file={csv_path}")
# time_usを秒に変換し、開始時刻を0秒にそろえます
t = df["time_us"].to_numpy(dtype=np.float64) / 1e6
t = t - t[0]
# 時刻の重複や逆行を除外します
keep = np.ones_like(t, dtype=bool)
keep[1:] = t[1:] > t[:-1]
t = t[keep]
df = df.loc[keep].reset_index(drop=True)
if t.size < 2:
raise ValueError(f"time列が短すぎます。file={csv_path}")
# 1000Hz、10秒分の等間隔時間軸を作ります
t_uniform = np.arange(N_SAMPLES, dtype=np.float64) / FS
out = pd.DataFrame({"time_s": t_uniform})
t_min = t[0]
t_max = t[-1]
# ax, ay, az, acc_totalを等間隔データに補間します
for col in ["ax", "ay", "az", "acc_total"]:
y = df[col].to_numpy(dtype=np.float64)
f = interp1d(
t,
y,
kind="linear",
bounds_error=False,
fill_value="extrapolate",
)
y_uniform = f(t_uniform)
# 実測範囲外は0にします
valid = (t_uniform >= t_min) & (t_uniform <= t_max)
y_uniform = np.where(valid, y_uniform, 0.0)
out[col] = y_uniform
return out
# =============================
# ▼Welch PSD
# =============================
def compute_psd(x: np.ndarray):
"""
Welch法でPSDを計算します。
PSDを見ることで、
どの周波数にどれだけ振動エネルギーがあるかを確認できます。
Welch法は、単純なFFTよりもノイズの影響を抑えやすく、
安定したスペクトルを得やすい方法です。
"""
x = np.asarray(x, dtype=np.float64)
# 平均値を引いてDC成分を除去します
x = x - np.mean(x)
f, Pxx = welch(
x,
fs=FS,
nperseg=N_PERSEG,
)
return f, Pxx
# =============================
# ▼支配ピークの特徴量
# =============================
def peak_features_from_psd(f: np.ndarray, Pxx: np.ndarray):
"""
最も強い周波数成分を調べます。
抽出する特徴量:
dom_freq_hz
支配周波数
dom_psd
支配周波数の強さ
peak_width_hz
ピークの広がり
peak_sharpness
ピークの鋭さ
劣化が進むと、ピーク位置や鋭さが変化することがあります。
"""
m_dom = (f >= F_MIN_DOM) & (f <= F_MAX_DOM)
f_dom = f[m_dom]
P_dom = Pxx[m_dom]
feats = {}
if f_dom.size < 3:
feats["dom_freq_hz"] = 0.0
feats["dom_psd"] = 0.0
feats["peak_width_hz"] = 0.0
feats["peak_sharpness"] = 0.0
return feats
# PSDが最大となる周波数を支配周波数とします
idx_peak_local = int(np.argmax(P_dom))
dom_freq = float(f_dom[idx_peak_local])
dom_psd = float(P_dom[idx_peak_local])
# ピークを検出します
peaks, _ = find_peaks(P_dom)
if peaks.size == 0:
peaks = np.array([idx_peak_local], dtype=int)
# 支配周波数に最も近いピークを採用します
peaks_sorted = peaks[np.argsort(np.abs(peaks - idx_peak_local))]
pidx = int(peaks_sorted[0])
# ピーク幅を計算します
widths, _, _, _ = peak_widths(P_dom, [pidx], rel_height=0.5)
width_bins = float(widths[0])
df_hz = float(f_dom[1] - f_dom[0])
peak_width_hz = width_bins * df_hz
# ピーク周辺と比較して、ピークの鋭さを計算します
span_hz = 4.0
span_bins = max(1, int(round(span_hz / df_hz)))
lo = max(0, pidx - span_bins)
hi = min(P_dom.size, pidx + span_bins + 1)
if hi - lo >= 3:
neighborhood = np.concatenate([P_dom[lo:pidx], P_dom[pidx + 1:hi]])
else:
neighborhood = P_dom[lo:hi]
if neighborhood.size > 0:
neigh_mean = float(np.mean(neighborhood))
else:
neigh_mean = float(np.mean(P_dom))
peak_sharpness = np.log10(dom_psd / (neigh_mean + EPS) + 1.0)
feats["dom_freq_hz"] = dom_freq
feats["dom_psd"] = dom_psd
feats["peak_width_hz"] = float(peak_width_hz)
feats["peak_sharpness"] = float(peak_sharpness)
return feats
# =============================
# ▼スペクトル特徴量
# =============================
def spectral_features(f: np.ndarray, Pxx: np.ndarray):
"""
FFTスペクトル全体から特徴量を作ります。
劣化が進むと、
高周波成分や特定帯域のエネルギーが増えることがあります。
"""
feats = {}
m_ge = f >= F_MIN
f_ge = f[m_ge]
P_ge = Pxx[m_ge]
# 5Hz以上の全体エネルギーです
feats["psd_sum_ge5"] = float(np.sum(P_ge))
# 周波数帯ごとのエネルギーです
for f1, f2 in BANDS_HZ:
m = (f >= f1) & (f < f2)
feats[f"band_psd_sum_{f1:g}_{f2:g}"] = float(np.sum(Pxx[m]))
# スペクトル重心です
# エネルギーの中心がどの周波数にあるかを表します
denom = float(np.sum(P_ge))
if denom > 0:
feats["spec_centroid_hz_ge5"] = float(np.sum(f_ge * P_ge) / denom)
else:
feats["spec_centroid_hz_ge5"] = 0.0
# 80Hz以上の高周波比です
p_hi_80_500 = float(np.sum(Pxx[f >= 80.0]))
p_lo_5_80 = float(np.sum(Pxx[(f >= 5.0) & (f < 80.0)]))
feats["hi_lo_ratio_ge80_over_5to80"] = p_hi_80_500 / (p_lo_5_80 + EPS)
# 200〜500Hzの高周波比です
p_hi_200_500 = float(np.sum(Pxx[(f >= 200.0) & (f <= 500.0)]))
p_lo_5_200 = float(np.sum(Pxx[(f >= 5.0) & (f < 200.0)]))
feats["hi_lo_ratio_200_500_over_5_200"] = p_hi_200_500 / (p_lo_5_200 + EPS)
return feats
# =============================
# ▼1つのCSVファイルから特徴量を抽出
# =============================
def extract_all_features(csv_path: str):
prep = load_and_resample_10s(csv_path)
x = prep[CHANNEL].to_numpy(dtype=np.float64)
f, Pxx = compute_psd(x)
feats = {}
feats.update(peak_features_from_psd(f, Pxx))
feats.update(spectral_features(f, Pxx))
return feats, f, Pxx, prep
# =============================
# ▼グループごとの特徴量を読み込む
# =============================
def load_group_features(group_name: str, folder_path: str):
"""
normal、stage1、stage2…の各フォルダからCSVを読み込み、
グループごとの特徴量、PSD、時間波形をまとめます。
"""
files = sorted(glob.glob(os.path.join(folder_path, FILE_PATTERN)))
if len(files) == 0:
raise RuntimeError(f"{group_name} のCSVが見つかりません: {folder_path}")
feat_rows = []
psd_list = []
waveform_list = []
f_ref = None
time_ref = None
for p in files:
feats, f, Pxx, prep = extract_all_features(p)
x = prep[CHANNEL].to_numpy(dtype=np.float64)
t = prep["time_s"].to_numpy(dtype=np.float64)
# 周波数軸が全ファイルで同じか確認します
if f_ref is None:
f_ref = f
else:
if f.size != f_ref.size or np.max(np.abs(f - f_ref)) > 1e-9:
raise RuntimeError(f"Welchの周波数軸が一致しません: {p}")
# 時間軸が全ファイルで同じか確認します
if time_ref is None:
time_ref = t
else:
if t.size != time_ref.size or np.max(np.abs(t - time_ref)) > 1e-12:
raise RuntimeError(f"時間軸が一致しません: {p}")
row = {
"group": group_name,
"file": os.path.basename(p),
"path": p,
}
row.update(feats)
feat_rows.append(row)
psd_list.append(Pxx)
waveform_list.append(x)
df = pd.DataFrame(feat_rows).sort_values("file").reset_index(drop=True)
psd_arr = np.vstack(psd_list)
waveform_arr = np.vstack(waveform_list)
return df, f_ref, psd_arr, time_ref, waveform_arr
# =============================
# ▼グラフ1:平均時間波形の比較
# =============================
def plot_mean_time_waveform(groups_waveform, time_ref):
"""
各ステージの平均時間波形を比較します。
時系列波形の見た目が変わるかを確認できます。
"""
plt.figure(figsize=(12, 7))
for group_name, info in groups_waveform.items():
mean_waveform = np.mean(info["waveform"], axis=0)
color = GROUP_COLORS.get(group_name, None)
label_mean = DISPLAY_NAMES.get(group_name, group_name)
plt.plot(
time_ref,
mean_waveform,
label=label_mean,
color=color,
linewidth=1.0,
)
plt.xlim(0, TARGET_DURATION_S)
plt.title("Mean Time Waveform comparison")
plt.xlabel("Time [s]")
plt.ylabel(CHANNEL)
plt.legend(fontsize=9)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# =============================
# ▼グラフ2:平均PSD比較
# =============================
def plot_mean_psd(groups_psd, f_ref):
"""
各ステージの平均PSDを比較します。
正常からstage4に向けて、
どの周波数帯が変化するかを見るグラフです。
第10回コード①の一番重要なグラフです。
"""
plt.figure(figsize=(12, 7))
for group_name, info in groups_psd.items():
mean_psd = np.mean(info["psd"], axis=0)
color = GROUP_COLORS.get(group_name, None)
label_mean = DISPLAY_NAMES.get(group_name, group_name)
plt.semilogy(
f_ref,
mean_psd,
label=label_mean,
color=color,
linewidth=1.0,
)
plt.xlim(0, F_MAX_PLOT)
plt.title("Mean PSD comparison")
plt.xlabel("Frequency [Hz]")
plt.ylabel("PSD")
plt.legend(fontsize=9)
plt.tight_layout()
plt.show()
# =============================
# ▼グラフ3:特徴量の箱ひげ図
# =============================
def plot_feature_boxplots(all_df: pd.DataFrame, feature_names):
"""
劣化ステージごとの特徴量分布を比較します。
平均値だけでなく、
ばらつきやグループ間の重なりを確認します。
劣化指標として使いやすい特徴量を探すためのグラフです。
"""
n = len(feature_names)
ncols = 2
nrows = int(np.ceil(n / ncols))
fig, axes = plt.subplots(nrows, ncols, figsize=(12, 4 * nrows))
axes = np.array(axes).reshape(-1)
group_order = list(GROUP_DIRS.keys())
for i, feat in enumerate(feature_names):
ax = axes[i]
data = []
labels = []
for g in group_order:
vals = all_df.loc[all_df["group"] == g, feat].to_numpy(dtype=np.float64)
if len(vals) > 0:
data.append(vals)
labels.append(DISPLAY_NAMES.get(g, g))
ax.boxplot(
data,
labels=labels,
showmeans=True,
showfliers=False,
meanprops=dict(
marker="^",
markerfacecolor="green",
markeredgecolor="green",
markersize=8,
),
)
ax.set_title(feat)
ax.tick_params(axis="x", rotation=20)
ax.grid(axis="y", alpha=0.3)
for j in range(i + 1, len(axes)):
fig.delaxes(axes[j])
plt.tight_layout()
plt.show()
# =============================
# ▼メイン処理
# =============================
def main():
"""
処理の流れ:
1. 各ステージのCSVを読み込む
2. FFT特徴量を抽出する
3. 平均時間波形を比較する
4. 平均PSDを比較する
5. 箱ひげ図で特徴量を比較する
6. all_stage_features.csv を保存する
このCSVが、
第10回② 劣化スコア作成コードの入力になります。
"""
script_dir = os.path.dirname(os.path.abspath(__file__))
group_feature_tables = {}
groups_psd = {}
groups_waveform = {}
f_master = None
time_master = None
# 各ステージフォルダを順番に読み込みます
for group_name, folder_name in GROUP_DIRS.items():
folder_path = os.path.join(script_dir, folder_name)
if not os.path.isdir(folder_path):
raise RuntimeError(f"フォルダが見つかりません: {folder_path}")
df, f_ref, psd_arr, time_ref, waveform_arr = load_group_features(
group_name,
folder_path,
)
if f_master is None:
f_master = f_ref
else:
if f_ref.size != f_master.size or np.max(np.abs(f_ref - f_master)) > 1e-9:
raise RuntimeError("グループ間で周波数軸が一致しません。")
if time_master is None:
time_master = time_ref
else:
if time_ref.size != time_master.size or np.max(np.abs(time_ref - time_master)) > 1e-12:
raise RuntimeError("グループ間で時間軸が一致しません。")
group_feature_tables[group_name] = df
groups_psd[group_name] = {"psd": psd_arr}
groups_waveform[group_name] = {"waveform": waveform_arr}
print(f"{group_name}: {len(df)} files loaded")
# 全ステージの特徴量を1つの表に結合します
all_df = pd.concat(
group_feature_tables.values(),
axis=0,
ignore_index=True,
)
# ステージごとの平均・標準偏差を作成します
summary_rows = []
for group_name, df in group_feature_tables.items():
row = {
"group": group_name,
"n_files": len(df),
}
for feat in COMPARE_FEATURES:
vals = df[feat].to_numpy(dtype=np.float64)
row[f"{feat}_mean"] = float(np.mean(vals))
row[f"{feat}_std"] = float(np.std(vals, ddof=1)) if len(vals) >= 2 else 0.0
summary_rows.append(row)
summary_df = pd.DataFrame(summary_rows)
print("\n=== Group summary ===")
show_cols = ["group", "n_files"] + [f"{f}_mean" for f in COMPARE_FEATURES]
print(summary_df[show_cols])
# グラフを表示します
plot_mean_time_waveform(groups_waveform, time_master)
plot_mean_psd(groups_psd, f_master)
plot_feature_boxplots(all_df, COMPARE_FEATURES)
# 結果を保存します
out_dir = os.path.join(script_dir, "fft_stage_comparison")
os.makedirs(out_dir, exist_ok=True)
all_df.to_csv(
os.path.join(out_dir, "all_stage_features.csv"),
index=False,
)
summary_df.to_csv(
os.path.join(out_dir, "stage_summary.csv"),
index=False,
)
# ステージごとの平均PSDも保存します
psd_save_rows = []
for group_name, info in groups_psd.items():
mean_psd = np.mean(info["psd"], axis=0)
tmp = pd.DataFrame(
{
"frequency_hz": f_master,
"group": group_name,
"mean_psd": mean_psd,
}
)
psd_save_rows.append(tmp)
mean_psd_df = pd.concat(psd_save_rows, axis=0, ignore_index=True)
mean_psd_df.to_csv(
os.path.join(out_dir, "mean_psd_by_stage.csv"),
index=False,
)
# ステージごとの平均時間波形も保存します
waveform_save_rows = []
for group_name, info in groups_waveform.items():
mean_waveform = np.mean(info["waveform"], axis=0)
tmp = pd.DataFrame(
{
"time_s": time_master,
"group": group_name,
f"mean_{CHANNEL}": mean_waveform,
}
)
waveform_save_rows.append(tmp)
mean_waveform_df = pd.concat(waveform_save_rows, axis=0, ignore_index=True)
mean_waveform_df.to_csv(
os.path.join(out_dir, "mean_time_waveform_by_stage.csv"),
index=False,
)
print(f"\nSaved to: {out_dir}")
if __name__ == "__main__":
main()②劣化スコア作成、劣化ステージ予測モデル作成
"""
-----------------------------------------
コード①で作成した all_stage_features.csv を読み込み、
・正常基準で特徴量をZスコア化する
・劣化が進むほど大きくなるスコアを作る
・ステージごとのスコア変化を確認する
・スコアから劣化ステージを推定するモデルを作る
ことを目的にします。
このコードの位置付け
-----------------------------------------
コード①:劣化ステージごとのFFT特徴量を抽出する
コード②:特徴量を使って、劣化スコアと予測式を作る
コード③:日次点検データに劣化スコアと予測式を適用する
処理の流れ
-----------------------------------------
① all_stage_features.csv を読み込む
② normal、stage1、stage2、stage3、stage4 を数値に変換する
③ 正常データを基準にZスコア化する
④ 劣化が進むほど大きくなる単調劣化スコアを作る
⑤ ステージごとのスコア分布を確認する
⑥ 線形回帰で「スコア → 劣化ステージ」の予測式を作る
⑦ 結果CSVを保存する
=========================================================
"""
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
# =========================
# ▼基本設定
# =========================
INPUT_CSV = "all_stage_features.csv"
# コード①で作成したCSVです
OUTPUT_DIR = "degradation_monotonic_analysis"
# 結果の保存先フォルダです
EPS = 1e-12
# =========================
# ▼劣化スコアに使う特徴量
# =========================
#
# 正常基準でZスコア化したうえで、
# 劣化が進むほどスコアが増えるようにします。
#
# dom_freq_hz
# 支配周波数です。
# 今回の実験では、劣化が進むほど下がる想定です。
# そのため符号を反転して使います。
#
# band_psd_sum_30_80
# 30~80Hz帯の振動エネルギーです。
# 今回の実験では、劣化が進むほど増える想定です。
# そのためそのまま使います。
#
# 係数が -1.0 の特徴量:
# 劣化で値が下がるため、符号を反転します。
#
# 係数が 1.0 の特徴量:
# 劣化で値が上がるため、そのまま使います。
#
MONOTONIC_FEATURE_RULES = {
"dom_freq_hz": -1.0,
"band_psd_sum_30_80": 1.0,
}
# =========================
# ▼ステージ番号の定義
# =========================
#
# normal を0、
# stage1~stage4 を1~4として扱います。
#
# この数値を教師データとして、
# スコアからステージを予測するモデルを作ります。
#
STAGE_MAP = {
"normal": 0,
"stage1": 1,
"stage2": 2,
"stage3": 3,
"stage4": 4,
}
# =========================
# ▼データ読み込み
# =========================
def load_data(input_csv: str) -> pd.DataFrame:
"""
コード①で作成した all_stage_features.csv を読み込みます。
group列には、
normal、stage1、stage2、stage3、stage4
が入っている前提です。
"""
df = pd.read_csv(input_csv)
required_cols = ["group"] + list(MONOTONIC_FEATURE_RULES.keys())
for c in required_cols:
if c not in df.columns:
raise ValueError(f"必要列 {c} が見つかりません。")
df = df.copy()
# group名を数値ステージに変換します
df["stage_num"] = df["group"].map(STAGE_MAP)
if df["stage_num"].isna().any():
bad = df.loc[df["stage_num"].isna(), "group"].unique()
raise ValueError(f"stage_num に変換できない group があります: {bad}")
return df
# =========================
# ▼正常基準でZスコア化
# =========================
def add_normal_based_zscores(
df: pd.DataFrame,
feature_names,
):
"""
normalデータの平均と標準偏差を基準にして、
各特徴量をZスコア化します。
Zスコアとは、
Z = (測定値 - 正常平均) / 正常標準偏差
です。
これにより、
単位や大きさが異なる特徴量を同じ尺度で比較できます。
"""
out = df.copy()
normal_df = out[out["group"] == "normal"].copy()
if len(normal_df) == 0:
raise ValueError("normal データがありません。")
stats_rows = []
for feat in feature_names:
mu = float(normal_df[feat].mean())
sd = float(normal_df[feat].std(ddof=1))
if sd <= EPS:
sd = EPS
out[f"{feat}_z"] = (out[feat] - mu) / sd
stats_rows.append(
{
"feature": feat,
"normal_mean": mu,
"normal_std": sd,
}
)
stats_df = pd.DataFrame(stats_rows)
return out, stats_df
# =========================
# ▼単調劣化スコア作成
# =========================
def add_monotonic_degradation_score(
df: pd.DataFrame,
feature_rules: dict,
):
"""
複数の特徴量を組み合わせて、
劣化が進むほど大きくなるスコアを作ります。
今回は、
score = -1 × dom_freq_hz_z
+1 × band_psd_sum_30_80_z
としています。
支配周波数が下がる変化と、
30~80Hz帯エネルギーが増える変化を、
どちらも「劣化が進む方向」として加算します。
"""
out = df.copy()
score = np.zeros(len(out), dtype=np.float64)
detail_rows = []
for feat, coef in feature_rules.items():
z_col = f"{feat}_z"
if z_col not in out.columns:
raise ValueError(f"{z_col} が見つかりません。")
contribution = coef * out[z_col].to_numpy(dtype=np.float64)
score += contribution
detail_rows.append(
{
"feature": feat,
"z_column": z_col,
"coefficient": coef,
"meaning": (
"increase_with_degradation"
if coef > 0
else "decrease_with_degradation_then_sign_flipped"
),
}
)
out["score_degradation_monotonic"] = score
detail_df = pd.DataFrame(detail_rows)
return out, detail_df
# =========================
# ▼ステージごとのスコア集計
# =========================
def summarize_by_stage(df: pd.DataFrame):
"""
各ステージごとに、
劣化スコアの平均、標準偏差、最小値、最大値を集計します。
ステージが進むほどスコア平均が大きくなっていれば、
劣化スコアとして使いやすいことが分かります。
"""
rows = []
for g, sub in df.groupby("group"):
row = {
"group": g,
"stage_num": int(sub["stage_num"].iloc[0]),
"n_files": len(sub),
"score_mean": float(sub["score_degradation_monotonic"].mean()),
"score_std": (
float(sub["score_degradation_monotonic"].std(ddof=1))
if len(sub) >= 2
else 0.0
),
"score_min": float(sub["score_degradation_monotonic"].min()),
"score_max": float(sub["score_degradation_monotonic"].max()),
}
rows.append(row)
summary_df = pd.DataFrame(rows).sort_values("stage_num").reset_index(drop=True)
return summary_df
# =========================
# ▼可視化①:スコア分布
# =========================
def plot_score_by_stage(df: pd.DataFrame):
"""
各ステージの劣化スコアを散布図で確認します。
stage番号が大きくなるほど、
点が上に移動していれば、
劣化スコアとして機能していると判断できます。
"""
groups_in_order = ["normal", "stage1", "stage2", "stage3", "stage4"]
plt.figure(figsize=(9, 5))
for g in groups_in_order:
sub = df[df["group"] == g]
if len(sub) == 0:
continue
plt.scatter(
[g] * len(sub),
sub["score_degradation_monotonic"],
s=70,
)
plt.title("Monotonic Degradation Score by Stage")
plt.ylabel("Score")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# =========================
# ▼可視化②:劣化カーブ
# =========================
def plot_degradation_curve(summary_df: pd.DataFrame):
"""
ステージごとの平均劣化スコアを線で結びます。
劣化が進むほどスコアが単調に上がっているかを確認します。
"""
plt.figure(figsize=(8, 5))
plt.plot(
summary_df["stage_num"],
summary_df["score_mean"],
marker="o",
)
plt.xlabel("Stage")
plt.ylabel("Mean Degradation Score")
plt.title("Monotonic Degradation Curve")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# =========================
# ▼可視化③:ステージ予測モデル
# =========================
def fit_and_plot_prediction(df: pd.DataFrame):
"""
劣化スコアからステージ番号を推定するモデルを作ります。
今回は分かりやすさを優先して、
線形回帰を使います。
stage = a × score + b
という形で、
劣化スコアからステージを推定します。
"""
X = df[["score_degradation_monotonic"]].to_numpy(dtype=np.float64)
y = df["stage_num"].to_numpy(dtype=np.float64)
model = LinearRegression()
model.fit(X, y)
coef = float(model.coef_[0])
intercept = float(model.intercept_)
score_min = float(df["score_degradation_monotonic"].min())
score_max = float(df["score_degradation_monotonic"].max())
x_line = np.linspace(
score_min - 0.1 * abs(score_min),
score_max + 0.1 * abs(score_max),
100,
)
y_line = model.predict(x_line.reshape(-1, 1))
plt.figure(figsize=(8, 5))
plt.scatter(
df["score_degradation_monotonic"],
df["stage_num"],
label="data",
s=70,
)
plt.plot(
x_line,
y_line,
color="red",
label="prediction",
)
plt.xlabel("Monotonic Degradation Score")
plt.ylabel("Stage")
plt.title("Prediction Model")
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return model, coef, intercept
# =========================
# ▼可視化④:特徴量寄与の確認
# =========================
def plot_feature_contributions(
df: pd.DataFrame,
feature_rules: dict,
):
"""
劣化スコアに対して、
どの特徴量がどれだけ寄与しているかを確認します。
今回は、
・支配周波数の低下
・30~80Hz帯エネルギーの増加
の2つが、スコアにどのように効いているかを見ます。
"""
groups_in_order = ["normal", "stage1", "stage2", "stage3", "stage4"]
rows = []
for g in groups_in_order:
sub = df[df["group"] == g]
if len(sub) == 0:
continue
row = {"group": g}
for feat, coef in feature_rules.items():
row[f"{feat}_contribution_mean"] = float(
(coef * sub[f"{feat}_z"]).mean()
)
rows.append(row)
contrib_df = pd.DataFrame(rows)
plt.figure(figsize=(9, 5))
x = np.arange(len(contrib_df))
bottom = np.zeros(len(contrib_df), dtype=np.float64)
for feat, coef in feature_rules.items():
col = f"{feat}_contribution_mean"
vals = contrib_df[col].to_numpy(dtype=np.float64)
plt.bar(
x,
vals,
bottom=bottom,
label=feat,
width=0.65,
)
bottom += vals
plt.xticks(x, contrib_df["group"])
plt.ylabel("Mean Contribution to Score")
plt.title("Feature Contributions to Monotonic Score")
plt.legend()
plt.grid(axis="y", alpha=0.3)
plt.tight_layout()
plt.show()
return contrib_df
# =========================
# ▼メイン処理
# =========================
def main():
script_dir = os.path.dirname(os.path.abspath(__file__))
input_csv = os.path.join(script_dir, INPUT_CSV)
out_dir = os.path.join(script_dir, OUTPUT_DIR)
os.makedirs(out_dir, exist_ok=True)
# 1. コード①で作成した特徴量CSVを読み込みます
df = load_data(input_csv)
# 2. 正常基準でZスコア化します
feature_names = list(MONOTONIC_FEATURE_RULES.keys())
df_z, normal_stats_df = add_normal_based_zscores(
df,
feature_names,
)
# 3. 劣化が進むほど大きくなるスコアを作ります
df_score, score_rule_df = add_monotonic_degradation_score(
df_z,
MONOTONIC_FEATURE_RULES,
)
# 4. ステージごとにスコアを集計します
summary_df = summarize_by_stage(df_score)
print("\n=== Normal-based statistics ===")
print(normal_stats_df)
print("\n=== Monotonic score rules ===")
print(score_rule_df)
print("\n=== Stage summary ===")
print(summary_df)
# 5. グラフを表示します
plot_score_by_stage(df_score)
plot_degradation_curve(summary_df)
model, coef, intercept = fit_and_plot_prediction(df_score)
contrib_df = plot_feature_contributions(
df_score,
MONOTONIC_FEATURE_RULES,
)
print("\n=== Prediction model ===")
print(
f"stage = {coef:.6f} "
f"* score_degradation_monotonic + {intercept:.6f}"
)
# 6. 結果を保存します
df_score.to_csv(
os.path.join(out_dir, "all_stage_features_with_monotonic_score.csv"),
index=False,
)
normal_stats_df.to_csv(
os.path.join(out_dir, "normal_stats_for_monotonic_score.csv"),
index=False,
)
score_rule_df.to_csv(
os.path.join(out_dir, "monotonic_score_rule.csv"),
index=False,
)
summary_df.to_csv(
os.path.join(out_dir, "monotonic_score_stage_summary.csv"),
index=False,
)
contrib_df.to_csv(
os.path.join(out_dir, "monotonic_score_feature_contributions.csv"),
index=False,
)
model_df = pd.DataFrame(
[
{
"model": "LinearRegression",
"target": "stage_num",
"input": "score_degradation_monotonic",
"coef": coef,
"intercept": intercept,
}
]
)
model_df.to_csv(
os.path.join(out_dir, "monotonic_prediction_model.csv"),
index=False,
)
print(f"\nSaved to: {out_dir}")
if __name__ == "__main__":
main()③日次点検データの劣化判定・残寿命予測
"""
-----------------------------------------
コード②で作成した、
・正常基準統計
・劣化スコアルール
・ステージ予測モデル
を使って、daily_checkフォルダ内の測定データを評価します。
このコードで分かること
-----------------------------------------
・現在の劣化スコア
・前回からの変化量
・推定劣化ステージ
・Normal / Caution / Warning / Critical 判定
・危険レベルに到達する予測日
・残り日数の目安
処理の流れ
-----------------------------------------
① daily_check フォルダのCSVを読み込む
② FFT特徴量を抽出する
③ 正常基準でZスコア化する
④ 劣化スコアを計算する
⑤ ステージ予測モデルで劣化ステージを推定する
⑥ スコア・ステージ・急変量から最終判定する
⑦ 劣化スコアのトレンドから危険到達日を予測する
⑧ グラフとCSVを出力する
=========================================================
"""
import os
import re
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from scipy.interpolate import interp1d
from scipy.signal import welch, find_peaks, peak_widths
from sklearn.linear_model import LinearRegression
# =============================
# ▼基本設定
# =============================
FS = 1000.0
# 第2回で1000Hz測定した前提です
TARGET_DURATION_S = 10.0
# 各CSVファイルの先頭10秒を解析します
N_SAMPLES = int(FS * TARGET_DURATION_S)
CHANNEL = "acc_total"
# 3軸合成加速度を解析対象にします
F_MIN = 5.0
# 5Hz未満は低周波の揺れや姿勢変化の影響を受けやすいため除外します
F_MAX_PLOT = 500.0
F_MIN_DOM = 5.0
F_MAX_DOM = 200.0
# 支配周波数を探す範囲です
N_PERSEG = 2048
# Welch法でPSDを計算するときの分割長です
EPS = 1e-12
BANDS_HZ = [
(5.0, 10.0),
(10.0, 30.0),
(30.0, 80.0),
(80.0, 200.0),
(200.0, 400.0),
(400.0, 500.0),
]
# =============================
# ▼ユーザーが主に変更する場所
# =============================
DAILY_CHECK_DIR = "daily_check"
# 日次点検データを入れるフォルダです
FILE_PATTERN = "*.csv"
# daily_checkフォルダ内で読み込むCSVのパターンです
NORMAL_STATS_CSV = "normal_stats_for_monotonic_score.csv"
# コード②で作成した正常基準統計です
SCORE_RULE_CSV = "monotonic_score_rule.csv"
# コード②で作成した劣化スコアルールです
PRED_MODEL_CSV = "monotonic_prediction_model.csv"
# コード②で作成したステージ予測モデルです
OUTPUT_DIR = "daily_check_result"
# 結果保存先です
USE_MOVING_AVERAGE = True
# トレンドグラフに移動平均を表示するかどうかです
MOVING_AVERAGE_WINDOW = 3
# 移動平均の点数です
# =============================
# ▼スコアの閾値
# =============================
#
# score_degradation_monotonic の値によって、
# Normal / Caution / Warning / Critical を判定します。
#
THRESHOLD_CAUTION = -5.0
THRESHOLD_WARNING = 0.0
THRESHOLD_DANGER = 5.0
# =============================
# ▼急変判定の閾値
# =============================
#
# 前回測定からスコアが急に上がった場合は、
# 通常の劣化レベルよりも優先して警告します。
#
RAPID_CHANGE_THRESHOLD_WARNING = 2.5
RAPID_CHANGE_THRESHOLD_CRITICAL = 4.0
# =============================
# ▼ステージ判定の閾値
# =============================
STAGE_WARNING_THRESHOLD = 2.0
STAGE_CRITICAL_THRESHOLD = 3.0
# =============================
# ▼残寿命予測の設定
# =============================
MIN_POINTS_FOR_FORECAST = 3
# 予測に必要な最小データ点数です
FORECAST_HORIZON_DAYS = 30
# 予測線を描く最大日数です
FORECAST_USE_LAST_N = None
# Noneなら全点で回帰します
# 例:5にすると直近5点だけで回帰します
# =============================
# ▼表示名
# =============================
STAGE_NAME_MAP = {
0: "Normal",
1: "Stage 1",
2: "Stage 2",
3: "Stage 3",
4: "Stage 4",
}
LEVEL_NAME_MAP = {
0: "Normal",
1: "Caution",
2: "Warning",
3: "Critical",
}
LEVEL_COLOR_MAP = {
0: "blue",
1: "green",
2: "orange",
3: "red",
}
STAGE_COLOR_MAP = {
0: "blue",
1: "green",
2: "orange",
3: "red",
4: "purple",
}
# =============================
# ▼前処理:1000Hz、10秒にそろえる
# =============================
def load_and_resample_10s(csv_path: str) -> pd.DataFrame:
"""
CSVを読み込み、time_usを使って1000Hzの等間隔データに補正します。
ESP32の測定データは、厳密には完全な等間隔ではないため、
FFT前に時間軸をそろえます。
"""
df = pd.read_csv(csv_path)
required_cols = ["time_us", "ax", "ay", "az", "acc_total"]
for c in required_cols:
if c not in df.columns:
raise ValueError(f"列 {c} が見つかりません。file={csv_path}")
t = df["time_us"].to_numpy(dtype=np.float64) / 1e6
t = t - t[0]
# 時刻の重複や逆行を除外します
keep = np.ones_like(t, dtype=bool)
keep[1:] = t[1:] > t[:-1]
t = t[keep]
df = df.loc[keep].reset_index(drop=True)
if t.size < 2:
raise ValueError(f"time列が短すぎます。file={csv_path}")
t_uniform = np.arange(N_SAMPLES, dtype=np.float64) / FS
out = pd.DataFrame({"time_s": t_uniform})
t_min = t[0]
t_max = t[-1]
for col in ["ax", "ay", "az", "acc_total"]:
y = df[col].to_numpy(dtype=np.float64)
f = interp1d(
t,
y,
kind="linear",
bounds_error=False,
fill_value="extrapolate",
)
y_uniform = f(t_uniform)
valid = (t_uniform >= t_min) & (t_uniform <= t_max)
y_uniform = np.where(valid, y_uniform, 0.0)
out[col] = y_uniform
return out
# =============================
# ▼Welch法でPSDを計算
# =============================
def compute_psd(x: np.ndarray):
"""
Welch法でPSDを計算します。
PSDを見ることで、
どの周波数帯にどれだけ振動エネルギーがあるかを確認できます。
"""
x = np.asarray(x, dtype=np.float64)
# DC成分を除去します
x = x - np.mean(x)
f, Pxx = welch(
x,
fs=FS,
nperseg=N_PERSEG,
)
return f, Pxx
# =============================
# ▼支配ピークの特徴量
# =============================
def peak_features_from_psd(f: np.ndarray, Pxx: np.ndarray):
"""
支配周波数、支配周波数の強さ、ピーク幅、ピークの鋭さを計算します。
"""
m_dom = (f >= F_MIN_DOM) & (f <= F_MAX_DOM)
f_dom = f[m_dom]
P_dom = Pxx[m_dom]
feats = {}
if f_dom.size < 3:
feats["dom_freq_hz"] = 0.0
feats["dom_psd"] = 0.0
feats["peak_width_hz"] = 0.0
feats["peak_sharpness"] = 0.0
return feats
idx_peak_local = int(np.argmax(P_dom))
dom_freq = float(f_dom[idx_peak_local])
dom_psd = float(P_dom[idx_peak_local])
peaks, _ = find_peaks(P_dom)
if peaks.size == 0:
peaks = np.array([idx_peak_local], dtype=int)
peaks_sorted = peaks[np.argsort(np.abs(peaks - idx_peak_local))]
pidx = int(peaks_sorted[0])
widths, _, _, _ = peak_widths(P_dom, [pidx], rel_height=0.5)
width_bins = float(widths[0])
df_hz = float(f_dom[1] - f_dom[0])
peak_width_hz = width_bins * df_hz
span_hz = 4.0
span_bins = max(1, int(round(span_hz / df_hz)))
lo = max(0, pidx - span_bins)
hi = min(P_dom.size, pidx + span_bins + 1)
if hi - lo >= 3:
neighborhood = np.concatenate([P_dom[lo:pidx], P_dom[pidx + 1:hi]])
else:
neighborhood = P_dom[lo:hi]
if neighborhood.size > 0:
neigh_mean = float(np.mean(neighborhood))
else:
neigh_mean = float(np.mean(P_dom))
peak_sharpness = np.log10(dom_psd / (neigh_mean + EPS) + 1.0)
feats["dom_freq_hz"] = dom_freq
feats["dom_psd"] = dom_psd
feats["peak_width_hz"] = float(peak_width_hz)
feats["peak_sharpness"] = float(peak_sharpness)
return feats
# =============================
# ▼スペクトル特徴量
# =============================
def spectral_features(f: np.ndarray, Pxx: np.ndarray):
"""
FFTスペクトル全体から特徴量を作ります。
コード②で使った特徴量と同じ定義にしておくことで、
日次データにも同じ劣化スコアを適用できます。
"""
feats = {}
m_ge = f >= F_MIN
f_ge = f[m_ge]
P_ge = Pxx[m_ge]
feats["psd_sum_ge5"] = float(np.sum(P_ge))
for f1, f2 in BANDS_HZ:
m = (f >= f1) & (f < f2)
feats[f"band_psd_sum_{f1:g}_{f2:g}"] = float(np.sum(Pxx[m]))
denom = float(np.sum(P_ge))
if denom > 0:
feats["spec_centroid_hz_ge5"] = float(np.sum(f_ge * P_ge) / denom)
else:
feats["spec_centroid_hz_ge5"] = 0.0
p_hi_80_500 = float(np.sum(Pxx[f >= 80.0]))
p_lo_5_80 = float(np.sum(Pxx[(f >= 5.0) & (f < 80.0)]))
feats["hi_lo_ratio_ge80_over_5to80"] = p_hi_80_500 / (p_lo_5_80 + EPS)
p_hi_200_500 = float(np.sum(Pxx[(f >= 200.0) & (f <= 500.0)]))
p_lo_5_200 = float(np.sum(Pxx[(f >= 5.0) & (f < 200.0)]))
feats["hi_lo_ratio_200_500_over_5_200"] = p_hi_200_500 / (p_lo_5_200 + EPS)
return feats
# =============================
# ▼1ファイル分の特徴量抽出
# =============================
def extract_all_features(csv_path: str):
prep = load_and_resample_10s(csv_path)
x = prep[CHANNEL].to_numpy(dtype=np.float64)
f, Pxx = compute_psd(x)
feats = {}
feats.update(peak_features_from_psd(f, Pxx))
feats.update(spectral_features(f, Pxx))
return feats, f, Pxx
# =============================
# ▼ファイル名から日付を抽出
# =============================
def extract_date_from_filename(file_name: str):
"""
ファイル名から日付を抽出します。
対応例:
acc_2026-06-01.csv
acc_20260601.csv
日付が取れない場合は NaT になります。
"""
m1 = re.search(r"(20\d{2}-\d{2}-\d{2})", file_name)
if m1:
return pd.to_datetime(m1.group(1), format="%Y-%m-%d")
m2 = re.search(r"(20\d{6})", file_name)
if m2:
return pd.to_datetime(m2.group(1), format="%Y%m%d")
return pd.NaT
# =============================
# ▼コード②の出力ファイルを読み込む
# =============================
def load_normal_stats(csv_path: str):
df = pd.read_csv(csv_path)
required_cols = ["feature", "normal_mean", "normal_std"]
for c in required_cols:
if c not in df.columns:
raise ValueError(f"{csv_path} に必要列 {c} がありません。")
return df
def load_score_rule(csv_path: str):
df = pd.read_csv(csv_path)
required_cols = ["feature", "z_column", "coefficient"]
for c in required_cols:
if c not in df.columns:
raise ValueError(f"{csv_path} に必要列 {c} がありません。")
return df
def load_prediction_model(csv_path: str):
df = pd.read_csv(csv_path)
required_cols = ["coef", "intercept"]
for c in required_cols:
if c not in df.columns:
raise ValueError(f"{csv_path} に必要列 {c} がありません。")
return float(df.loc[0, "coef"]), float(df.loc[0, "intercept"])
# =============================
# ▼日次点検データを特徴量テーブルに変換
# =============================
def build_daily_feature_table(daily_dir: str, file_pattern: str):
files = sorted(glob.glob(os.path.join(daily_dir, file_pattern)))
if len(files) == 0:
raise RuntimeError(f"CSVが見つかりません: {daily_dir}")
rows = []
for p in files:
file_name = os.path.basename(p)
check_date = extract_date_from_filename(file_name)
feats, _, _ = extract_all_features(p)
row = {
"file": file_name,
"date": check_date,
}
row.update(feats)
rows.append(row)
df = pd.DataFrame(rows)
df = df.sort_values(["date", "file"]).reset_index(drop=True)
return df
# =============================
# ▼正常基準でZスコア化
# =============================
def add_normal_based_zscores(
df: pd.DataFrame,
normal_stats_df: pd.DataFrame,
):
"""
コード②で作成した正常基準を使って、
daily_checkデータをZスコア化します。
"""
out = df.copy()
for _, row in normal_stats_df.iterrows():
feat = row["feature"]
mu = float(row["normal_mean"])
sd = float(row["normal_std"])
if sd <= EPS:
sd = EPS
if feat not in out.columns:
raise ValueError(f"特徴量 {feat} が daily_check データにありません。")
out[f"{feat}_z"] = (out[feat] - mu) / sd
return out
# =============================
# ▼劣化スコア計算
# =============================
def add_monotonic_score(
df: pd.DataFrame,
score_rule_df: pd.DataFrame,
):
"""
コード②で作成したスコアルールを使い、
劣化スコアを計算します。
例:
score = -1 × dom_freq_hz_z
+1 × band_psd_sum_30_80_z
"""
out = df.copy()
score = np.zeros(len(out), dtype=np.float64)
for _, row in score_rule_df.iterrows():
z_col = row["z_column"]
coef = float(row["coefficient"])
if z_col not in out.columns:
raise ValueError(f"Zスコア列 {z_col} がありません。")
score += coef * out[z_col].to_numpy(dtype=np.float64)
out["score_degradation_monotonic"] = score
return out
# =============================
# ▼ステージ推定
# =============================
def add_predicted_stage(
df: pd.DataFrame,
coef: float,
intercept: float,
):
"""
コード②で作成した線形回帰モデルを使い、
劣化スコアからステージを推定します。
"""
out = df.copy()
out["pred_stage_continuous"] = (
coef * out["score_degradation_monotonic"] + intercept
)
out["pred_stage_continuous_clipped"] = out["pred_stage_continuous"].clip(
lower=0.0,
upper=4.0,
)
out["pred_stage_rounded"] = np.round(
out["pred_stage_continuous_clipped"]
).astype(int)
out["pred_stage_label"] = out["pred_stage_rounded"].map(STAGE_NAME_MAP)
return out
# =============================
# ▼レベル判定
# =============================
def score_to_level(score: float) -> int:
if score >= THRESHOLD_DANGER:
return 3
elif score >= THRESHOLD_WARNING:
return 2
elif score >= THRESHOLD_CAUTION:
return 1
else:
return 0
def stage_to_level(pred_stage: float) -> int:
if pred_stage >= STAGE_CRITICAL_THRESHOLD:
return 3
elif pred_stage >= STAGE_WARNING_THRESHOLD:
return 2
elif pred_stage >= 1.0:
return 1
else:
return 0
def delta_to_level(delta_score: float) -> int:
if delta_score >= RAPID_CHANGE_THRESHOLD_CRITICAL:
return 3
elif delta_score >= RAPID_CHANGE_THRESHOLD_WARNING:
return 2
else:
return 0
# =============================
# ▼最終判定
# =============================
def add_final_judgment(df: pd.DataFrame):
"""
最終判定は、次の3つの観点の最大値で決めます。
① スコア水準
② 推定ステージ
③ 前回からの急変量
急変がある場合は、劣化スコアがまだ低くても警告します。
"""
out = df.copy().sort_values(["date", "file"]).reset_index(drop=True)
score_based_levels = []
stage_based_levels = []
rapid_change_levels = []
delta_scores = []
final_levels = []
comments = []
prev_score = None
for _, row in out.iterrows():
score = float(row["score_degradation_monotonic"])
pred_stage = float(row["pred_stage_continuous_clipped"])
score_level = score_to_level(score)
stage_level = stage_to_level(pred_stage)
if prev_score is None:
delta_score = np.nan
rapid_level = 0
else:
delta_score = score - prev_score
rapid_level = delta_to_level(delta_score)
final_level = max(score_level, stage_level, rapid_level)
if rapid_level >= 2:
if final_level == 3:
comment = "急変が検出されました。重大異常の可能性があります。至急点検してください。"
else:
comment = "急変が検出されました。通常劣化より優先して確認してください。"
else:
if final_level == 0:
comment = "Normal. Continue routine monitoring."
elif final_level == 1:
comment = "Caution. A slight change is observed. Continue trend monitoring."
elif final_level == 2:
comment = "Warning. Degradation is progressing or abnormal change is detected. Increase inspection frequency."
else:
comment = "Critical. Severe abnormality or rapid change is detected. Immediate inspection is recommended."
score_based_levels.append(score_level)
stage_based_levels.append(stage_level)
rapid_change_levels.append(rapid_level)
delta_scores.append(delta_score)
final_levels.append(final_level)
comments.append(comment)
prev_score = score
out["delta_score_from_previous"] = delta_scores
out["score_based_level"] = score_based_levels
out["score_based_label"] = [LEVEL_NAME_MAP[x] for x in score_based_levels]
out["stage_based_level"] = stage_based_levels
out["stage_based_label"] = [LEVEL_NAME_MAP[x] for x in stage_based_levels]
out["rapid_change_level"] = rapid_change_levels
out["rapid_change_label"] = [LEVEL_NAME_MAP[x] for x in rapid_change_levels]
out["final_level"] = final_levels
out["final_label"] = [LEVEL_NAME_MAP[x] for x in final_levels]
out["judgment_comment"] = comments
return out
# =============================
# ▼残寿命予測
# =============================
def fit_score_forecast(df: pd.DataFrame):
"""
劣化スコアの時系列に線形回帰を当てはめ、
Dangerしきい値に到達する日を予測します。
これは簡易的な残寿命予測です。
実設備では、より多くのデータと妥当性検証が必要です。
"""
plot_df = df.copy().sort_values("date").reset_index(drop=True)
if len(plot_df) < MIN_POINTS_FOR_FORECAST:
return {
"forecast_available": False,
"reason": f"Need at least {MIN_POINTS_FOR_FORECAST} points.",
}
if plot_df["date"].isna().any():
return {
"forecast_available": False,
"reason": "Some dates are missing.",
}
fit_df = plot_df.copy()
if FORECAST_USE_LAST_N is not None and len(fit_df) > FORECAST_USE_LAST_N:
fit_df = fit_df.iloc[-FORECAST_USE_LAST_N:].copy()
start_date = fit_df["date"].min()
fit_df["day_index"] = (fit_df["date"] - start_date).dt.days.astype(float)
X = fit_df[["day_index"]].to_numpy(dtype=np.float64)
y = fit_df["score_degradation_monotonic"].to_numpy(dtype=np.float64)
model = LinearRegression()
model.fit(X, y)
slope = float(model.coef_[0])
intercept = float(model.intercept_)
latest_date = plot_df["date"].max()
latest_score = float(
plot_df.loc[
plot_df["date"] == latest_date,
"score_degradation_monotonic",
].iloc[-1]
)
if latest_score >= THRESHOLD_DANGER:
return {
"forecast_available": True,
"already_in_danger": True,
"danger_date": latest_date,
"days_to_danger": 0.0,
"slope_per_day": slope,
"intercept": intercept,
"start_date": start_date,
"fit_df": fit_df,
"model": model,
"forecast_df": None,
"reason": "Already in danger zone.",
}
if slope <= 0:
return {
"forecast_available": True,
"already_in_danger": False,
"danger_date": pd.NaT,
"days_to_danger": np.inf,
"slope_per_day": slope,
"intercept": intercept,
"start_date": start_date,
"fit_df": fit_df,
"model": model,
"forecast_df": None,
"reason": "Trend is flat or decreasing; danger entry is not predicted.",
}
danger_day_index = (THRESHOLD_DANGER - intercept) / slope
if danger_day_index < 0:
danger_date = start_date
else:
danger_date = start_date + pd.to_timedelta(danger_day_index, unit="D")
days_to_danger = (danger_date - latest_date) / pd.Timedelta(days=1)
future_end_date = min(
latest_date + pd.Timedelta(days=FORECAST_HORIZON_DAYS),
danger_date + pd.Timedelta(days=5),
)
future_dates = pd.date_range(
start=latest_date,
end=future_end_date,
freq="D",
)
future_day_index = (future_dates - start_date) / pd.Timedelta(days=1)
future_scores = model.predict(
np.array(future_day_index).reshape(-1, 1)
)
forecast_df = pd.DataFrame(
{
"date": future_dates,
"forecast_score": future_scores,
}
)
return {
"forecast_available": True,
"already_in_danger": False,
"danger_date": danger_date,
"days_to_danger": float(days_to_danger),
"slope_per_day": slope,
"intercept": intercept,
"start_date": start_date,
"fit_df": fit_df,
"model": model,
"forecast_df": forecast_df,
"reason": "Forecast generated.",
}
# =============================
# ▼グラフ1:劣化スコアトレンド
# =============================
def plot_daily_trend(df: pd.DataFrame, forecast_info: dict):
plot_df = df.copy().sort_values("date")
fig, ax = plt.subplots(figsize=(12, 5))
y_min = min(
plot_df["score_degradation_monotonic"].min() - 1.0,
THRESHOLD_CAUTION - 1.0,
)
y_max_candidate = max(
plot_df["score_degradation_monotonic"].max() + 1.0,
THRESHOLD_DANGER + 1.0,
)
if forecast_info.get("forecast_df") is not None:
y_max_candidate = max(
y_max_candidate,
forecast_info["forecast_df"]["forecast_score"].max() + 1.0,
)
ax.axhspan(y_min, THRESHOLD_CAUTION, facecolor="lightgreen", alpha=0.25, zorder=0)
ax.axhspan(THRESHOLD_CAUTION, THRESHOLD_WARNING, facecolor="red", alpha=0.08, zorder=0)
ax.axhspan(THRESHOLD_WARNING, THRESHOLD_DANGER, facecolor="red", alpha=0.16, zorder=0)
ax.axhspan(THRESHOLD_DANGER, y_max_candidate, facecolor="red", alpha=0.28, zorder=0)
x_left = plot_df["date"].min()
ax.text(x_left, (y_min + THRESHOLD_CAUTION) / 2, "Normal", va="center", ha="left", fontsize=10)
ax.text(x_left, (THRESHOLD_CAUTION + THRESHOLD_WARNING) / 2, "Caution", va="center", ha="left", fontsize=10)
ax.text(x_left, (THRESHOLD_WARNING + THRESHOLD_DANGER) / 2, "Warning", va="center", ha="left", fontsize=10)
ax.text(x_left, (THRESHOLD_DANGER + y_max_candidate) / 2, "Danger", va="center", ha="left", fontsize=10)
for level, sub in plot_df.groupby("final_level"):
ax.scatter(
sub["date"],
sub["score_degradation_monotonic"],
s=70,
color=LEVEL_COLOR_MAP.get(level, "gray"),
label=LEVEL_NAME_MAP.get(level, str(level)),
zorder=3,
)
ax.plot(
plot_df["date"],
plot_df["score_degradation_monotonic"],
linewidth=1.4,
color="black",
alpha=0.8,
label="Measured score",
zorder=2,
)
if USE_MOVING_AVERAGE and len(plot_df) >= MOVING_AVERAGE_WINDOW:
ma = plot_df["score_degradation_monotonic"].rolling(
MOVING_AVERAGE_WINDOW
).mean()
ax.plot(
plot_df["date"],
ma,
linewidth=2.0,
linestyle="--",
color="tab:orange",
label=f"{MOVING_AVERAGE_WINDOW}-point moving average",
zorder=4,
)
rapid_df = plot_df[plot_df["rapid_change_level"] >= 2]
if len(rapid_df) > 0:
ax.scatter(
rapid_df["date"],
rapid_df["score_degradation_monotonic"],
s=180,
facecolors="none",
edgecolors="red",
linewidths=2.0,
label="Rapid change",
zorder=5,
)
latest = plot_df.iloc[-1]
ax.scatter(
[latest["date"]],
[latest["score_degradation_monotonic"]],
s=220,
facecolors="none",
edgecolors="black",
linewidths=2.2,
label="Latest",
zorder=6,
)
if forecast_info.get("forecast_available", False):
if forecast_info.get("already_in_danger", False):
ax.text(
latest["date"],
latest["score_degradation_monotonic"] + 0.4,
"Already in Danger",
color="red",
fontsize=11,
ha="left",
)
elif forecast_info.get("forecast_df") is not None:
forecast_df = forecast_info["forecast_df"].copy()
forecast_df = forecast_df[forecast_df["date"] >= latest["date"]].copy()
ax.plot(
forecast_df["date"],
forecast_df["forecast_score"],
linestyle=":",
linewidth=2.2,
color="red",
label="Forecast score",
zorder=4,
)
danger_date = forecast_info.get("danger_date", pd.NaT)
if pd.notna(danger_date):
ax.scatter(
[danger_date],
[THRESHOLD_DANGER],
s=160,
marker="X",
color="red",
label="Predicted danger entry",
zorder=7,
)
ax.text(
danger_date,
THRESHOLD_DANGER + 0.35,
f"Predicted danger: {danger_date.strftime('%m-%d')}",
color="red",
fontsize=10,
ha="left",
)
ax.xaxis.set_major_locator(mdates.DayLocator())
ax.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d"))
if forecast_info.get("forecast_df") is not None:
ax.set_xlim(
plot_df["date"].min() - pd.Timedelta(days=0.2),
forecast_info["forecast_df"]["date"].max() + pd.Timedelta(days=0.2),
)
else:
ax.set_xlim(
plot_df["date"].min() - pd.Timedelta(days=0.2),
plot_df["date"].max() + pd.Timedelta(days=0.2),
)
ax.set_ylim(y_min, y_max_candidate)
ax.set_title("Daily Degradation Score Trend")
ax.set_xlabel("Date")
ax.set_ylabel("Monotonic Degradation Score")
ax.grid(True, alpha=0.3)
handles, labels = ax.get_legend_handles_labels()
unique = dict(zip(labels, handles))
ax.legend(unique.values(), unique.keys(), loc="best")
plt.tight_layout()
plt.show()
# =============================
# ▼グラフ2:推定ステージトレンド
# =============================
def plot_stage_trend(
df: pd.DataFrame,
forecast_info: dict,
pred_model_coef: float,
pred_model_intercept: float,
):
plot_df = df.copy().sort_values("date")
fig, ax = plt.subplots(figsize=(12, 5))
colors = [
STAGE_COLOR_MAP.get(int(s), "gray")
for s in plot_df["pred_stage_rounded"]
]
ax.scatter(
plot_df["date"],
plot_df["pred_stage_continuous_clipped"],
s=80,
c=colors,
)
ax.plot(
plot_df["date"],
plot_df["pred_stage_continuous_clipped"],
linewidth=1.4,
color="black",
alpha=0.8,
label="Measured stage",
)
latest = plot_df.iloc[-1]
ax.scatter(
[latest["date"]],
[latest["pred_stage_continuous_clipped"]],
s=220,
facecolors="none",
edgecolors="black",
linewidths=2.2,
label="Latest",
)
if forecast_info.get("forecast_df") is not None:
forecast_df = forecast_info["forecast_df"].copy()
forecast_df["forecast_stage"] = (
pred_model_coef * forecast_df["forecast_score"] + pred_model_intercept
)
forecast_df["forecast_stage"] = forecast_df["forecast_stage"].clip(
lower=0.0,
upper=4.0,
)
ax.plot(
forecast_df["date"],
forecast_df["forecast_stage"],
linestyle=":",
linewidth=2.2,
color="red",
label="Forecast stage",
)
ax.xaxis.set_major_locator(mdates.DayLocator())
ax.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d"))
if forecast_info.get("forecast_df") is not None:
ax.set_xlim(
plot_df["date"].min() - pd.Timedelta(days=0.2),
forecast_info["forecast_df"]["date"].max() + pd.Timedelta(days=0.2),
)
else:
ax.set_xlim(
plot_df["date"].min() - pd.Timedelta(days=0.2),
plot_df["date"].max() + pd.Timedelta(days=0.2),
)
ax.set_yticks([0, 1, 2, 3, 4])
ax.set_yticklabels(["Normal", "Stage1", "Stage2", "Stage3", "Stage4"])
ax.set_ylim(-0.2, 4.2)
ax.set_title("Predicted Degradation Stage Trend")
ax.set_xlabel("Date")
ax.set_ylabel("Predicted Stage")
ax.grid(True, alpha=0.3)
ax.legend(loc="best")
plt.tight_layout()
plt.show()
# =============================
# ▼最新状態表示
# =============================
def print_current_status(df: pd.DataFrame, forecast_info: dict):
latest = df.sort_values(["date", "file"]).iloc[-1]
print("\n=== Current status ===")
print(f"File : {latest['file']}")
print(f"Date : {latest['date'].strftime('%Y-%m-%d') if pd.notna(latest['date']) else latest['date']}")
print(f"Degradation score : {latest['score_degradation_monotonic']:.4f}")
if pd.notna(latest["delta_score_from_previous"]):
print(f"Delta from previous : {latest['delta_score_from_previous']:.4f}")
else:
print("Delta from previous : N/A")
print(f"Predicted stage : {latest['pred_stage_continuous_clipped']:.2f}")
print(f"Stage label : {latest['pred_stage_label']}")
print(f"Score-based level : {latest['score_based_label']}")
print(f"Stage-based level : {latest['stage_based_label']}")
print(f"Rapid-change level : {latest['rapid_change_label']}")
print(f"Final judgment : {latest['final_label']}")
print(f"Comment : {latest['judgment_comment']}")
print("\n=== Remaining useful life estimate ===")
if not forecast_info.get("forecast_available", False):
print(f"Forecast unavailable : {forecast_info.get('reason', 'Unknown reason')}")
elif forecast_info.get("already_in_danger", False):
print("Danger status : Already in danger zone now.")
print(f"Danger date : {forecast_info['danger_date'].strftime('%Y-%m-%d')}")
else:
danger_date = forecast_info.get("danger_date", pd.NaT)
days_to_danger = forecast_info.get("days_to_danger", np.inf)
if pd.isna(danger_date) or np.isinf(days_to_danger):
print("Danger forecast : Not predicted")
print(f"Reason : {forecast_info.get('reason', '')}")
else:
print(f"Days to danger : {days_to_danger:.1f} days")
print(f"Predicted danger date: {danger_date.strftime('%Y-%m-%d')}")
print(f"Trend slope : {forecast_info.get('slope_per_day', np.nan):.4f} score/day")
# =============================
# ▼メイン処理
# =============================
def main():
script_dir = os.path.dirname(os.path.abspath(__file__))
daily_dir = os.path.join(script_dir, DAILY_CHECK_DIR)
normal_stats_csv = os.path.join(script_dir, NORMAL_STATS_CSV)
score_rule_csv = os.path.join(script_dir, SCORE_RULE_CSV)
pred_model_csv = os.path.join(script_dir, PRED_MODEL_CSV)
out_dir = os.path.join(script_dir, OUTPUT_DIR)
os.makedirs(out_dir, exist_ok=True)
if not os.path.isdir(daily_dir):
raise RuntimeError(f"daily_check フォルダが見つかりません: {daily_dir}")
normal_stats_df = load_normal_stats(normal_stats_csv)
score_rule_df = load_score_rule(score_rule_csv)
model_coef, model_intercept = load_prediction_model(pred_model_csv)
daily_df = build_daily_feature_table(daily_dir, FILE_PATTERN)
daily_z_df = add_normal_based_zscores(daily_df, normal_stats_df)
daily_score_df = add_monotonic_score(daily_z_df, score_rule_df)
daily_pred_df = add_predicted_stage(daily_score_df, model_coef, model_intercept)
daily_result_df = add_final_judgment(daily_pred_df)
forecast_info = fit_score_forecast(daily_result_df)
print("\n=== Daily check results ===")
show_cols = [
"date",
"file",
"score_degradation_monotonic",
"delta_score_from_previous",
"pred_stage_continuous_clipped",
"pred_stage_label",
"score_based_label",
"stage_based_label",
"rapid_change_label",
"final_label",
]
print(daily_result_df[show_cols])
print_current_status(daily_result_df, forecast_info)
plot_daily_trend(daily_result_df, forecast_info)
plot_stage_trend(
daily_result_df,
forecast_info,
model_coef,
model_intercept,
)
daily_result_df.to_csv(
os.path.join(out_dir, "daily_check_scored_results.csv"),
index=False,
)
latest = daily_result_df.sort_values(["date", "file"]).iloc[-1:]
latest.to_csv(
os.path.join(out_dir, "latest_status.csv"),
index=False,
)
forecast_summary = pd.DataFrame(
[
{
"forecast_available": forecast_info.get("forecast_available", False),
"already_in_danger": forecast_info.get("already_in_danger", False),
"danger_date": forecast_info.get("danger_date", pd.NaT),
"days_to_danger": forecast_info.get("days_to_danger", np.nan),
"slope_per_day": forecast_info.get("slope_per_day", np.nan),
"reason": forecast_info.get("reason", ""),
}
]
)
forecast_summary.to_csv(
os.path.join(out_dir, "rul_forecast_summary.csv"),
index=False,
)
if forecast_info.get("forecast_df") is not None:
forecast_info["forecast_df"].to_csv(
os.path.join(out_dir, "rul_forecast_curve.csv"),
index=False,
)
print(f"\nSaved to: {out_dir}")
if __name__ == "__main__":
main()

コメント