概要
加速度センサーシリーズの続編です。
前回は、回転が安定している定速区間を切り出し、FFT解析で支配周波数や回転数を確認しました。
しかし、実際の設備では、常に一定回転で動いているとは限りません。
そこで今回は、スペクトログラムを使って、
・時間とともに周波数がどう変わるか
・加速中の支配周波数をどう追跡するか
・回転数変化をどのように見える化するか
・インバータ制御らしい挙動が振動データに現れるか
を確認します。
詳しくは、以下のYouTube動画をご覧ください。
配線

プログラムコード
マイコン(ESP32)
・MPU6050で、X・Y・Z方向の加速度を計測
・合成加速度 acc_total を計算
・1000Hz周期でデータ取得
・CSV形式でシリアル出力
#include <Wire.h>
#include <MPU6050_tockn.h>
MPU6050 mpu(Wire);
const unsigned long SAMPLE_US = 1000; // 1000us = 1000Hz
unsigned long last_us = 0;
void setup() {
Serial.begin(115200);
Wire.begin(21, 22);
Wire.setClock(400000);
mpu.begin();
mpu.calcGyroOffsets(true);
Serial.println("time_us,ax,ay,az,acc_total");
}
void loop() {
unsigned long now = micros();
if (now - last_us < SAMPLE_US) return;
last_us = now;
mpu.update();
float ax = mpu.getAccX();
float ay = mpu.getAccY();
float az = mpu.getAccZ();
float acc_total = sqrt(ax*ax + ay*ay + az*az);
Serial.print(now);
Serial.print(",");
Serial.print(ax, 4);
Serial.print(",");
Serial.print(ay, 4);
Serial.print(",");
Serial.print(az, 4);
Serial.print(",");
Serial.println(acc_total, 4);
}Python
- CSV読み込み
- 時間の前処理(実サンプリング周波数の推定)
- 等間隔データに補正
- トレンド除去
- スペクトログラムで全区間の時間変化を可視化
- 解析区間を手動で指定して切り出す
- RMS・PSD・バンドパワー・支配周波数を計算
- グラフ出力(スペクトログラム、PSD)
- 結果をCSVに保存
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy.signal import detrend, welch, spectrogram
# ===============================
# ▼ここだけ設定すればOK
# ===============================
INPUT_CSV = "acc_usb_highrate.csv"
# 第2回で保存した1000Hz測定データです
TIME_COL = "time_us"
# ESP32側の時刻です。単位はマイクロ秒です
TARGET_MAIN = "acc_total"
# メインで解析する加速度です
TARGET_AXES = ["ax", "ay", "az"]
# X・Y・Z方向も比較します
PLOT_MAIN_WITH_AXES = True
# Trueにすると、acc_totalも各軸と一緒に重ね描きします
FMAX_PLOT = 80
# PSDグラフの表示上限周波数です
WELCH_NPERSEG = 2048
# Welch法でPSDを計算するときの分割長です
BAND_LOW = 10.0
BAND_HIGH = 30.0
# バンドパワーを計算する周波数範囲です
F_SEARCH_MIN = 5
F_SEARCH_MAX = 40
# 支配周波数を探す範囲です
# ★解析したい時間区間をここで指定します
# 形式:(開始秒, 終了秒, ラベル)
# end_s=None にすると、最後まで解析します
PSD_WINDOWS = [
(60, 70, "phase_A"),
# (20, 30, "phase_ramp"),
# (30, None, "phase_after_30s"),
]
PLOT_SPECTROGRAM_FULL = True
# Trueにすると、全区間のスペクトログラムを表示します
NPERSEG_STFT = 4096
NOVERLAP_STFT = 3072
F_SPEC_MAX = 80
# スペクトログラム表示用の設定です
OUT_SUMMARY_CSV = "wash_psd_windows_summary.csv"
# 解析結果を保存するCSVファイルです
# ===============================
# ▼CSV読み込み、等間隔補正、トレンド除去
# ===============================
def load_and_resample(csv_path, time_col, value_col):
"""
CSVを読み込み、
実サンプリング周波数の推定、
等間隔データへの補間、
トレンド除去を行います。
"""
df = pd.read_csv(csv_path)
if time_col not in df.columns:
raise ValueError(f"'{time_col}' 列が見つかりません。")
if value_col not in df.columns:
raise ValueError(f"'{value_col}' 列が見つかりません。")
# マイクロ秒を秒に変換します
t = df[time_col].to_numpy(dtype=np.float64) * 1e-6
x = df[value_col].to_numpy(dtype=np.float64)
# 欠損値や無限大を除外します
mask = np.isfinite(t) & np.isfinite(x)
t = t[mask]
x = x[mask]
# 時刻順に並べます
order = np.argsort(t)
t = t[order]
x = x[order]
# 同じ時刻が重複している場合は、最初の値だけ使います
t, idx = np.unique(t, return_index=True)
x = x[idx]
# 実際のサンプリング周波数を推定します
dt = np.diff(t)
dt = dt[(dt > 0) & np.isfinite(dt)]
if len(dt) == 0:
raise ValueError("有効なtime差分が得られません。time_us列を確認してください。")
fs = 1.0 / np.median(dt)
# 等間隔の時間軸を作ります
t_u = np.arange(t[0], t[-1], 1.0 / fs)
# 実測データを等間隔データに補間します
f = interp1d(
t,
x,
kind="linear",
bounds_error=False,
fill_value="extrapolate",
)
x_u = f(t_u)
# ゆっくりした傾き成分を除去します
x_u = detrend(x_u, type="linear")
return t_u, x_u, fs
# ===============================
# ▼指定した時間区間を切り出す関数
# ===============================
def slice_by_time(t_u, x_u, start_s, end_s):
"""
開始からの秒数で指定した区間を切り出します。
end_s=None の場合は、最後まで切り出します。
"""
t0 = t_u[0]
if start_s is None:
t_start = t_u[0]
else:
t_start = t0 + float(start_s)
if end_s is None:
t_end = t_u[-1]
else:
t_end = t0 + float(end_s)
if t_end <= t_start:
raise ValueError(f"時間区間が不正です: start={start_s}, end={end_s}")
mask = (t_u >= t_start) & (t_u <= t_end)
t_seg = t_u[mask]
x_seg = x_u[mask]
return t_seg, x_seg, (t_start - t0), (t_end - t0)
# ===============================
# ▼RMS、PSD、バンドパワー、支配周波数を計算
# ===============================
def compute_metrics_psd(x_seg, fs):
"""
指定区間の振動データから、
RMS、PSD、バンドパワー、支配周波数を計算します。
"""
if len(x_seg) < 64:
return np.nan, np.nan, (np.array([]), np.array([])), np.nan
# RMSは振動の大きさを表す代表値です
rms = float(np.sqrt(np.mean(x_seg ** 2)))
nperseg = int(min(WELCH_NPERSEG, len(x_seg)))
# Welch法でPSDを計算します
f_w, pxx = welch(
x_seg,
fs=fs,
window="hann",
nperseg=nperseg,
noverlap=nperseg // 2,
detrend=False,
scaling="density",
)
# 指定周波数帯のエネルギーを計算します
sel_band = (f_w >= BAND_LOW) & (f_w <= BAND_HIGH)
if np.any(sel_band):
band_power = float(np.trapz(pxx[sel_band], f_w[sel_band]))
else:
band_power = np.nan
# 指定範囲の中で最も強い周波数を支配周波数とします
sel_peak = (f_w >= F_SEARCH_MIN) & (f_w <= F_SEARCH_MAX)
if np.any(sel_peak):
k = int(np.argmax(pxx[sel_peak]))
f_dom = float(f_w[sel_peak][k])
else:
f_dom = np.nan
return rms, band_power, (f_w, pxx), f_dom
# ===============================
# ▼必要な信号を読み込み
# ===============================
signals = []
if PLOT_MAIN_WITH_AXES:
signals.append(TARGET_MAIN)
signals += TARGET_AXES
data = {}
for sig in signals:
t_u, x_u, fs = load_and_resample(INPUT_CSV, TIME_COL, sig)
data[sig] = {
"t_u": t_u,
"x_u": x_u,
"fs": float(fs),
}
ref_sig = TARGET_MAIN if PLOT_MAIN_WITH_AXES else TARGET_AXES[0]
duration = float(data[ref_sig]["t_u"][-1] - data[ref_sig]["t_u"][0])
print(f"[INFO] loaded signals: {signals}")
print(f"[INFO] duration = {duration:.1f} s")
print("[INFO] PSD windows:")
for s0, s1, lab in PSD_WINDOWS:
print(f" - {lab}: {s0} ~ {s1} [s]")
# ===============================
# ▼グラフ1:全区間スペクトログラム
# ===============================
if PLOT_SPECTROGRAM_FULL:
x_u = data[ref_sig]["x_u"]
fs = data[ref_sig]["fs"]
nperseg = int(min(NPERSEG_STFT, len(x_u)))
noverlap = int(min(NOVERLAP_STFT, nperseg - 1))
f_sp, t_sp, Sxx = spectrogram(
x_u,
fs=fs,
window="hann",
nperseg=nperseg,
noverlap=noverlap,
detrend=False,
scaling="density",
mode="psd",
)
sel_f = f_sp <= F_SPEC_MAX
Sxx_db = 10 * np.log10(Sxx + 1e-12)
plt.figure(figsize=(12, 5))
plt.pcolormesh(t_sp, f_sp[sel_f], Sxx_db[sel_f, :], shading="auto")
# 指定した解析区間の境界を縦線で表示します
for s0, s1, lab in PSD_WINDOWS:
if s0 is not None:
plt.axvline(float(s0), linestyle="--", alpha=0.7)
if s1 is not None:
plt.axvline(float(s1), linestyle="--", alpha=0.7)
plt.xlabel("Time [s] from start")
plt.ylabel("Frequency [Hz]")
plt.title(f"Spectrogram full target={ref_sig} fs={fs:.1f} Hz")
plt.ylim(0, F_SPEC_MAX)
plt.colorbar(label="Power [dB]")
plt.tight_layout()
plt.show()
# ===============================
# ▼指定区間ごとにPSDを計算・表示
# ===============================
window_results = []
for s0, s1, lab in PSD_WINDOWS:
plt.figure(figsize=(12, 5))
for sig in signals:
t_u = data[sig]["t_u"]
x_u = data[sig]["x_u"]
fs = data[sig]["fs"]
# 指定した時間区間だけを切り出します
t_seg, x_seg, s0r, s1r = slice_by_time(t_u, x_u, s0, s1)
# RMS、PSD、バンドパワー、支配周波数を計算します
rms, bp, (f_w, pxx), f_dom = compute_metrics_psd(x_seg, fs)
window_results.append(
{
"label": lab,
"start_s": float(s0r),
"end_s": float(s1r),
"duration_s": float(s1r - s0r),
"signal": sig,
"fs_Hz": float(fs),
"rms_g": rms,
f"bandpower_{int(BAND_LOW)}_{int(BAND_HIGH)}Hz_g2": bp,
"dominant_Hz": f_dom,
"dominant_rpm": float(f_dom * 60.0) if np.isfinite(f_dom) else np.nan,
}
)
# PSDを重ね描きします
plt.semilogy(f_w, pxx, label=sig)
if np.isfinite(f_dom):
print(
f"[RESULT] {lab} ({s0r:.1f}-{s1r:.1f}s) {sig}: "
f"RMS={rms:.6f} g, "
f"BandPower({BAND_LOW:.0f}-{BAND_HIGH:.0f}Hz)={bp:.3e} g^2, "
f"Dom={f_dom:.2f} Hz ({f_dom * 60:.0f} rpm)"
)
else:
print(
f"[RESULT] {lab} ({s0r:.1f}-{s1r:.1f}s) {sig}: "
f"RMS={rms:.6f} g, "
f"BandPower({BAND_LOW:.0f}-{BAND_HIGH:.0f}Hz)={bp:.3e} g^2, "
f"Dom=nan"
)
plt.xlim(0, FMAX_PLOT)
plt.xlabel("Frequency [Hz]")
plt.ylabel("PSD [g^2/Hz]")
plt.title(f"Welch PSD by time window: {lab} [{s0r:.0f}-{s1r:.0f}s]")
plt.grid(True, which="both", alpha=0.4)
plt.legend()
plt.tight_layout()
plt.show()
# ===============================
# ▼グラフ3:時間波形と指定区間
# ===============================
t_u = data[ref_sig]["t_u"]
x_u = data[ref_sig]["x_u"]
plt.figure(figsize=(12, 4))
plt.plot(t_u - t_u[0], x_u, linewidth=0.8)
for s0, s1, lab in PSD_WINDOWS:
s0p = 0 if s0 is None else float(s0)
s1p = duration if s1 is None else float(s1)
# 指定した解析区間を背景色で表示します
plt.axvspan(s0p, s1p, alpha=0.12)
plt.xlabel("Time [s] from start")
plt.ylabel(f"{ref_sig} [g] detrended")
plt.title("Time series with selected PSD windows")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# ===============================
# ▼結果をCSVに保存
# ===============================
pd.DataFrame(window_results).to_csv(
OUT_SUMMARY_CSV,
index=False,
encoding="utf-8",
)
print(f"[OK] summary CSV: {OUT_SUMMARY_CSV}")

コメント