概要
加速度センサーシリーズの続編です。
マイコン(ESP32)と加速度センサー(MPU6050)を使って、加速度データを取得しました。
前回は、洗濯機の脱水工程をサンプリング周期1000Hzで測定しました。
今回は、そのデータをPythonで高速フーリエ変換(FFT)し、周波数成分の観点からグラフ化してみました。
詳しくは、以下のYouTube動画をご覧ください。
配線

プログラムコード
マイコン(ESP32)
・MPU6050で、X・Y・Z方向の加速度を計測
・合成加速度 acc_total を計算
・1000Hz周期でデータ取得
・CSV形式でシリアル出力
#include <Wire.h>
#include <MPU6050_tockn.h>
MPU6050 mpu(Wire);
const unsigned long SAMPLE_US = 1000; // 1000us = 1000Hz
unsigned long last_us = 0;
void setup() {
Serial.begin(115200);
Wire.begin(21, 22);
Wire.setClock(400000);
mpu.begin();
mpu.calcGyroOffsets(true);
Serial.println("time_us,ax,ay,az,acc_total");
}
void loop() {
unsigned long now = micros();
if (now - last_us < SAMPLE_US) return;
last_us = now;
mpu.update();
float ax = mpu.getAccX();
float ay = mpu.getAccY();
float az = mpu.getAccZ();
float acc_total = sqrt(ax*ax + ay*ay + az*az);
Serial.print(now);
Serial.print(",");
Serial.print(ax, 4);
Serial.print(",");
Serial.print(ay, 4);
Serial.print(",");
Serial.print(az, 4);
Serial.print(",");
Serial.println(acc_total, 4);
}Python
- CSV読み込み
- 時間の前処理(実サンプリング周波数の推定)
- 等間隔データに補正
- トレンド除去
- スペクトログラムで「定速区間」を自動検出
- 定速区間だけを切り出す
- RMS・PSD・バンドパワー・支配周波数を計算
- グラフ出力(スペクトログラム、PSD)
- 結果をCSVに保存
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy.signal import detrend, spectrogram, welch, find_peaks
# ==================================================
# ▼ここだけ設定すればOK
# ==================================================
INPUT_CSV = "acc_usb_highrate.csv"
# 第2回で保存した1000Hz測定データを読み込みます
TIME_COL = "time_us"
# ESP32側の時刻です。単位はマイクロ秒です
TARGET_MAIN = "acc_total"
# メインで解析する加速度です
# acc_totalは、X・Y・Z方向を合成した加速度です
TARGET_AXES = ["ax", "ay", "az"]
# 各軸ごとの違いも確認するため、X・Y・Z方向も解析します
OUT_SUMMARY_CSV = "wash_summary.csv"
# 解析結果を保存するCSVファイル名です
# ==================================================
# ▼解析条件
# ==================================================
FMAX_PLOT = 80
# グラフに表示する最大周波数です
F_SEARCH_MIN = 5
F_SEARCH_MAX = 40
# 回転に関係しそうな周波数を探す範囲です
BAND_LOW = 10.0
BAND_HIGH = 30.0
# バンドパワーを計算する周波数範囲です
WELCH_NPERSEG = 2048
# PSD計算に使うデータ分割長です
# ==================================================
# ▼スペクトログラムと定速区間検出の条件
# ==================================================
NPERSEG_STFT = 4096
NOVERLAP_STFT = 3072
IGNORE_EDGE_START_SEC = 6.0
IGNORE_EDGE_END_SEC = 6.0
# 測定開始直後と終了直前は不安定になりやすいため、判定から除外します
MIN_STEADY_SEC = 10.0
# 定速区間として採用する最短時間です
F_STABILITY_HZ = 0.6
DFDT_HZ_PER_S = 0.4
# 周波数がどの程度安定しているかを判定する条件です
RPM_SLOPE_MAX = 2.0
RPM_STD_MAX = 40.0
MIN_RPM_FOR_CHECK = 300.0
# 回転数に換算したときの安定性を確認する条件です
# ==================================================
# ▼ピーク検出条件
# ==================================================
PEAK_PROM_REL = 0.15
N_TOP = 6
F_TOL = 0.6
H_MAX = 4
CONTINUITY_HZ = 1.2
SMOOTH_K = 3
# 基本周波数を安定して追跡するための条件です
# 直接の最大ピークだけでなく、高調波や前後の連続性も考慮します
# ==================================================
# ▼CSVを読み込み、等間隔データに補正する関数
# ==================================================
def load_and_resample(csv_path, time_col, value_col):
df = pd.read_csv(csv_path)
# マイクロ秒を秒に変換します
t = df[time_col].to_numpy(dtype=np.float64) * 1e-6
x = df[value_col].to_numpy(dtype=np.float64)
# 欠損値や異常値を除外します
mask = np.isfinite(t) & np.isfinite(x)
t = t[mask]
x = x[mask]
# 時刻順に並べます
order = np.argsort(t)
t = t[order]
x = x[order]
# 同じ時刻が重複している場合は、最初の値だけを使います
t, idx = np.unique(t, return_index=True)
x = x[idx]
# 実際のサンプリング周波数を推定します
dt = np.diff(t)
dt = dt[(dt > 0) & np.isfinite(dt)]
fs = 1.0 / np.median(dt)
# 等間隔の時刻データを作ります
t_u = np.arange(t[0], t[-1], 1.0 / fs)
# 実測データを等間隔データに補間します
f = interp1d(
t,
x,
kind="linear",
bounds_error=False,
fill_value="extrapolate",
)
x_u = f(t_u)
# ゆっくりした傾き成分を除去します
# FFTでは、トレンドが残ると低周波成分が強く出すぎることがあります
x_u = detrend(x_u, type="linear")
return t_u, x_u, fs
# ==================================================
# ▼RMS・PSD・バンドパワー・支配周波数を計算する関数
# ==================================================
def compute_metrics(x_seg, fs):
if len(x_seg) < 64:
return np.nan, np.nan, (np.array([]), np.array([])), np.nan
# RMSは振動の大きさを表す代表値です
rms = float(np.sqrt(np.mean(x_seg ** 2)))
nperseg = int(min(WELCH_NPERSEG, len(x_seg)))
# Welch法でPSDを計算します
# PSDは、どの周波数にどれだけ振動成分があるかを表します
f_w, pxx = welch(
x_seg,
fs=fs,
window="hann",
nperseg=nperseg,
noverlap=nperseg // 2,
detrend=False,
scaling="density",
)
# 指定した周波数範囲のエネルギーを計算します
sel_band = (f_w >= BAND_LOW) & (f_w <= BAND_HIGH)
band_power = float(np.trapz(pxx[sel_band], f_w[sel_band]))
# 指定範囲の中で最も強い周波数を支配周波数とします
sel_peak = (f_w >= F_SEARCH_MIN) & (f_w <= F_SEARCH_MAX)
f_dom = float(f_w[sel_peak][np.argmax(pxx[sel_peak])])
return rms, band_power, (f_w, pxx), f_dom
# ==================================================
# ▼回転数の傾きとばらつきを計算する関数
# ==================================================
def segment_slope_and_std(t, rpm):
slope = float(np.polyfit(t, rpm, 1)[0])
std = float(np.std(rpm))
return slope, std
# ==================================================
# ▼スペクトルから基本周波数を選ぶ関数
# ==================================================
def pick_fundamental_from_spectrum(p, f, f_prev):
if not np.all(np.isfinite(p)) or float(np.max(p)) <= 0:
return np.nan, 0.0, np.nan
pmax = float(np.max(p))
prom = max(pmax * PEAK_PROM_REL, 1e-18)
# スペクトルのピークを検出します
idx_peaks, props = find_peaks(p, prominence=prom)
# ピークが検出できない場合は、最大値の周波数を使います
if len(idx_peaks) == 0:
k = int(np.argmax(p))
return float(f[k]), 0.2, float(f[k])
prominences = props.get("prominences", np.zeros_like(idx_peaks, dtype=float))
order = np.argsort(prominences)[::-1]
idx_peaks = idx_peaks[order][:N_TOP]
peaks_f = f[idx_peaks]
peaks_p = p[idx_peaks]
f_raw = float(f[int(np.argmax(p))])
candidates = []
# 高調波の可能性を考慮して、基本周波数の候補を作ります
for fp, pp in zip(peaks_f, peaks_p):
for m in range(1, H_MAX + 1):
f0 = float(fp) / m
if F_SEARCH_MIN <= f0 <= F_SEARCH_MAX:
candidates.append((f0, float(pp), m))
if not candidates:
return f_raw, 0.2, f_raw
best_f0 = None
best_score = -1e18
for f0, base_p, m0 in candidates:
harm_score = 0.0
# 基本周波数だけでなく、2倍、3倍などの高調波も見ます
for m in range(1, H_MAX + 1):
target = m * f0
dist = np.abs(peaks_f - target)
if np.any(dist <= F_TOL):
j = int(np.argmin(dist))
harm_score += float(peaks_p[j]) / pmax
# 前回の周波数から急に飛びすぎないかを確認します
cont_score = 0.0
if f_prev is not None and np.isfinite(f_prev):
df = abs(f0 - float(f_prev))
if df <= CONTINUITY_HZ:
cont_score = 1.0 - df / CONTINUITY_HZ
else:
cont_score = -df / CONTINUITY_HZ
score = harm_score + 0.6 * cont_score + 0.15 / max(int(m0), 1)
if score > best_score:
best_score = score
best_f0 = f0
conf = max(0.0, min(1.0, best_score / (H_MAX + 1.0)))
return float(best_f0), float(conf), f_raw
# ==================================================
# ▼スペクトログラムから定速区間を自動検出する関数
# ==================================================
def find_steady_segment(t_u, x_u, fs):
nperseg = int(min(NPERSEG_STFT, len(x_u)))
noverlap = int(min(NOVERLAP_STFT, nperseg - 1))
# スペクトログラムを計算します
# 時間ごとに、どの周波数が強いかを見るための処理です
f_sp, t_sp, Sxx = spectrogram(
x_u,
fs=fs,
window="hann",
nperseg=nperseg,
noverlap=noverlap,
detrend=False,
scaling="density",
mode="psd",
)
# 開始直後と終了直前を除外します
if len(t_sp) >= 2:
t_total = float(t_sp[-1])
t0 = IGNORE_EDGE_START_SEC
t1 = t_total - IGNORE_EDGE_END_SEC
if t1 > t0:
keep = (t_sp >= t0) & (t_sp <= t1)
t_sp = t_sp[keep]
Sxx = Sxx[:, keep]
# 回転に関係しそうな周波数範囲だけを見ます
band = (f_sp >= F_SEARCH_MIN) & (f_sp <= F_SEARCH_MAX)
f_band = f_sp[band]
S_band = Sxx[band, :]
f_fund = np.full(len(t_sp), np.nan)
conf = np.zeros(len(t_sp))
f_rawmax = np.full(len(t_sp), np.nan)
f_prev = None
# 各時刻ごとに基本周波数を推定します
for i in range(len(t_sp)):
ff, cc, fr = pick_fundamental_from_spectrum(S_band[:, i], f_band, f_prev)
f_fund[i] = ff
conf[i] = cc
f_rawmax[i] = fr
if np.isfinite(ff):
f_prev = ff
# 推定周波数を少し平滑化します
if SMOOTH_K >= 3 and len(f_fund) >= SMOOTH_K:
kernel = np.ones(SMOOTH_K) / SMOOTH_K
f_tmp = f_fund.copy()
ok = np.isfinite(f_tmp)
if np.any(ok):
idx = np.arange(len(f_tmp))
f_tmp[~ok] = np.interp(idx[~ok], idx[ok], f_tmp[ok])
f_fund = np.convolve(f_tmp, kernel, mode="same")
# 周波数の変化量を計算します
df = np.abs(np.diff(f_fund, prepend=f_fund[0]))
dt_sp = np.median(np.diff(t_sp)) if len(t_sp) > 1 else 1.0
dfdt = df / max(dt_sp, 1e-9)
# 周波数が安定しているところを候補にします
stable_f = (
(df <= F_STABILITY_HZ)
& (dfdt <= DFDT_HZ_PER_S)
& np.isfinite(f_fund)
)
# Hzをrpmに変換します
rpm = f_fund * 60.0
best = None
start = None
# 安定している区間の中から、条件に合う最長区間を選びます
for i, ok in enumerate(stable_f):
if ok and start is None:
start = i
if (not ok or i == len(stable_f) - 1) and start is not None:
end = i if ok and i == len(stable_f) - 1 else i - 1
dur = float(t_sp[end] - t_sp[start])
if dur >= MIN_STEADY_SEC:
t_seg = t_sp[start:end + 1]
rpm_seg = rpm[start:end + 1]
rpm_mean = float(np.mean(rpm_seg))
slope, std = segment_slope_and_std(t_seg, rpm_seg)
if rpm_mean < MIN_RPM_FOR_CHECK:
ok_rpm = True
else:
ok_rpm = abs(slope) <= RPM_SLOPE_MAX and std <= RPM_STD_MAX
if ok_rpm:
if best is None or dur > best[2]:
best = (start, end, dur, slope, std)
start = None
# 条件に合う定速区間が見つからない場合は、中央付近を仮の解析区間にします
if best is None:
total = float(t_u[-1] - t_u[0])
mid = float((t_u[0] + t_u[-1]) / 2)
half = float(min(7.5, total / 4))
t0_abs = mid - half
t1_abs = mid + half
mode = "FALLBACK"
best_detail = None
else:
s, e, dur, slope, std = best
t0_abs = float(t_u[0] + t_sp[s])
t1_abs = float(t_u[0] + t_sp[e])
mode = "AUTO"
best_detail = {
"s": s,
"e": e,
"dur": dur,
"slope_rpm_per_s": slope,
"std_rpm": std,
}
info = {
"f_sp": f_sp,
"t_sp": t_sp,
"Sxx": Sxx,
"f_peak_s": f_fund,
"rpm": rpm,
"stable_f": stable_f,
"best_detail": best_detail,
"conf": conf,
"f_rawmax": f_rawmax,
}
return (t0_abs, t1_abs), info, mode
# ==================================================
# ▼メイン処理
# ==================================================
# acc_totalを読み込み、等間隔データに補正します
t_u, x_u, fs = load_and_resample(INPUT_CSV, TIME_COL, TARGET_MAIN)
print(f"[INFO] fs = {fs:.1f} Hz")
print(f"[INFO] samples = {len(x_u)}")
# スペクトログラムから定速区間を自動検出します
(seg_t0, seg_t1), det_info, seg_mode = find_steady_segment(t_u, x_u, fs)
print(
f"[INFO] steady segment: "
f"{seg_t0 - t_u[0]:.1f}s - {seg_t1 - t_u[0]:.1f}s"
)
# 定速区間だけを切り出します
seg_mask = (t_u >= seg_t0) & (t_u <= seg_t1)
x_seg = x_u[seg_mask]
# acc_totalの特徴量を計算します
rms_main, bp_main, (f_w_main, pxx_main), f_dom_main = compute_metrics(x_seg, fs)
print(f"[RESULT] {TARGET_MAIN} RMS = {rms_main:.6f} g")
print(f"[RESULT] {TARGET_MAIN} BandPower = {bp_main:.6e} g^2")
print(f"[RESULT] {TARGET_MAIN} DominantFreq = {f_dom_main:.2f} Hz")
print(f"[RESULT] {TARGET_MAIN} DominantRPM = {f_dom_main * 60:.0f} rpm")
# ==================================================
# ▼X・Y・Z軸も同じ定速区間で解析
# ==================================================
axis_results = []
for axis in TARGET_AXES:
t_u2, x_u2, fs2 = load_and_resample(INPUT_CSV, TIME_COL, axis)
seg_mask2 = (t_u2 >= seg_t0) & (t_u2 <= seg_t1)
x_seg2 = x_u2[seg_mask2]
rms_ax, bp_ax, (f_w_ax, pxx_ax), f_dom_ax = compute_metrics(x_seg2, fs2)
print(f"[RESULT] {axis} RMS = {rms_ax:.6f} g")
print(f"[RESULT] {axis} BandPower = {bp_ax:.6e} g^2")
print(f"[RESULT] {axis} DominantFreq = {f_dom_ax:.2f} Hz")
axis_results.append(
{
"name": axis,
"rms": rms_ax,
"band_power": bp_ax,
"f_dom": f_dom_ax,
"f_w": f_w_ax,
"pxx": pxx_ax,
}
)
# ==================================================
# ▼グラフ1:推定回転数の推移
# ==================================================
t_sp = det_info["t_sp"]
rpm = det_info["rpm"]
t0_rel = float(seg_t0 - t_u[0])
t1_rel = float(seg_t1 - t_u[0])
plt.figure(figsize=(12, 4))
plt.plot(t_sp, rpm, linewidth=2)
plt.axvspan(t0_rel, t1_rel, alpha=0.15)
plt.xlabel("Time [s]")
plt.ylabel("Rotation speed [rpm]")
plt.title("Estimated rotation speed")
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()
# ==================================================
# ▼グラフ2:スペクトログラム
# ==================================================
f_sp = det_info["f_sp"]
Sxx = det_info["Sxx"]
sel_f = f_sp <= FMAX_PLOT
Sxx_db = 10 * np.log10(Sxx + 1e-12)
plt.figure(figsize=(12, 5))
plt.pcolormesh(t_sp, f_sp[sel_f], Sxx_db[sel_f, :], shading="auto")
plt.axvline(t0_rel, linestyle="--")
plt.axvline(t1_rel, linestyle="--")
plt.ylim(0, FMAX_PLOT)
plt.xlabel("Time [s]")
plt.ylabel("Frequency [Hz]")
plt.title("Spectrogram")
plt.colorbar(label="Power [dB]")
plt.tight_layout()
plt.show()
# ==================================================
# ▼グラフ3:acc_totalのPSD
# ==================================================
plt.figure(figsize=(12, 5))
plt.semilogy(f_w_main, pxx_main)
plt.xlim(0, FMAX_PLOT)
plt.xlabel("Frequency [Hz]")
plt.ylabel("PSD [g^2/Hz]")
plt.title(f"Welch PSD: {TARGET_MAIN}")
plt.grid(True, which="both", alpha=0.4)
plt.tight_layout()
plt.show()
# ==================================================
# ▼グラフ4:acc_totalと各軸のPSD比較
# ==================================================
plt.figure(figsize=(12, 5))
plt.semilogy(f_w_main, pxx_main, label=TARGET_MAIN, linewidth=2)
for r in axis_results:
plt.semilogy(r["f_w"], r["pxx"], label=r["name"])
plt.xlim(0, FMAX_PLOT)
plt.xlabel("Frequency [Hz]")
plt.ylabel("PSD [g^2/Hz]")
plt.title("Welch PSD: main vs axes")
plt.grid(True, which="both", alpha=0.4)
plt.legend()
plt.tight_layout()
plt.show()
# ==================================================
# ▼グラフ5:時系列波形と定速区間
# ==================================================
plt.figure(figsize=(12, 4))
plt.plot(t_u - t_u[0], x_u, linewidth=0.8)
plt.axvspan(t0_rel, t1_rel, alpha=0.15)
plt.xlabel("Time [s]")
plt.ylabel(f"{TARGET_MAIN} [g]")
plt.title("Time series with steady segment")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# ==================================================
# ▼解析結果をCSVに保存
# ==================================================
summary = {
"input_csv": INPUT_CSV,
"fs_Hz": float(fs),
"steady_mode": seg_mode,
"steady_start_s": float(t0_rel),
"steady_end_s": float(t1_rel),
"steady_duration_s": float(seg_t1 - seg_t0),
f"{TARGET_MAIN}_rms_g": rms_main,
f"{TARGET_MAIN}_bandpower_g2": bp_main,
f"{TARGET_MAIN}_dominant_Hz": f_dom_main,
f"{TARGET_MAIN}_dominant_rpm": float(f_dom_main * 60.0),
}
for r in axis_results:
name = r["name"]
summary.update(
{
f"{name}_rms_g": r["rms"],
f"{name}_bandpower_g2": r["band_power"],
f"{name}_dominant_Hz": r["f_dom"],
f"{name}_dominant_rpm": float(r["f_dom"] * 60.0),
}
)
pd.DataFrame([summary]).to_csv(
OUT_SUMMARY_CSV,
index=False,
encoding="utf-8",
)
print(f"[OK] saved: {OUT_SUMMARY_CSV}")

コメント