概要
マイコン(ESP32)と加速度センサー(MPU6050)を使って、加速度データをリアルタイムで取得・可視化してみました。
ESP32側では、MPU6050からX・Y・Z方向の加速度を取得し、シリアル通信でCSV形式のデータを出力します。
Python側では、受信したデータをリアルタイムでグラフ表示しながら、CSVファイルへ保存します。
今回の構成を使うことで、
- 振動データの可視化
- FFT解析
- 異常検知
- AIによる設備診断
- 予知保全
などの基礎となるデータ取得環境を、比較的安価に構築できます。
私自身もプログラミングの初心者ですが、生成AIを活用してコーディングしたので、今回の実験は高度な専門知識も不要でした。
ちょっとしたデジタル化のトライアルをお考えの方に、少しでも参考になればうれしいです。
詳しくは、以下のYouTube動画をご覧ください。
配線

プログラムコード
マイコン(ESP32)
・MPU6050で、X・Y・Z方向の加速度を計測
・合成加速度 acc_total を計算
・10Hz周期でデータ取得
・CSV形式でシリアル出力
#include <Wire.h>
#include <MPU6050_tockn.h>
MPU6050 mpu(Wire);
const unsigned long SAMPLE_US = 100000; // 100,000us = 10Hz
unsigned long last_us = 0;
void setup() {
Serial.begin(115200);
Wire.begin(21, 22);
Wire.setClock(400000);
mpu.begin();
mpu.calcGyroOffsets(true);
Serial.println("time_us,ax,ay,az,acc_total");
}
void loop() {
unsigned long now = micros();
if (now - last_us < SAMPLE_US) return;
last_us = now;
mpu.update();
float ax = mpu.getAccX();
float ay = mpu.getAccY();
float az = mpu.getAccZ();
float acc_total = sqrt(ax*ax + ay*ay + az*az);
Serial.print(now);
Serial.print(",");
Serial.print(ax, 4);
Serial.print(",");
Serial.print(ay, 4);
Serial.print(",");
Serial.print(az, 4);
Serial.print(",");
Serial.println(acc_total, 4);
}
Python
・ESP32からシリアル通信で加速度データを取得
・X・Y・Z方向と合成加速度を読み込み
・リアルタイムでグラフ表示
・一定時間分の波形をスクロール表示
・測定データをCSV形式で保存
import serial
import csv
import time
from collections import deque
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
# ===============================
# ▼ここだけ設定すればOK
# ===============================
PORT = "COM5"
# ↑ お使いのPCに合わせて変更してください
# Arduino IDEの「ツール→ポート」で確認できます
BAUD = 115200
# 通信速度(ESP32側と同じ値にします)
CSV_FILE = "acc_demo_10hz.csv"
# 測定データを保存するファイル名
PLOT_SEC = 15
# グラフに表示する時間の長さ(秒)
ASSUME_FS = 10
# 1秒あたりの測定回数(Hz)
UI_HZ = 25
# グラフの更新スピード
# グラフの縦軸範囲
Y_MIN = -2.0
Y_MAX = 2.0
# データの項目名
COLS = ["time_us","ax","ay","az","acc_total"]
# 線の色
COLORS = {
"ax":"#1f77b4", # 青
"ay":"#2ca02c", # 緑
"az":"#d62728", # 赤
"total":"#9467bd" # 紫(合成)
}
# グラフの見た目
plt.rcParams.update({
"font.size":12,
"axes.spines.top":False,
"axes.spines.right":False
})
# ===============================
# ▼ESP32と接続
# ===============================
ser = serial.Serial(PORT, BAUD, timeout=0)
time.sleep(2)
# 接続直後はESP32が再起動するので少し待機
# ===============================
# ▼CSV保存の準備
# ===============================
f=open(CSV_FILE,"w",newline="",encoding="utf-8")
writer=csv.writer(f)
writer.writerow(COLS)
# ===============================
# ▼データを一時保存する箱
# ===============================
maxlen=int(PLOT_SEC*ASSUME_FS)+20
t_buf=deque(maxlen=maxlen)
ax_buf=deque(maxlen=maxlen)
ay_buf=deque(maxlen=maxlen)
az_buf=deque(maxlen=maxlen)
at_buf=deque(maxlen=maxlen)
t0_us=None
rx=""
# ===============================
# ▼1行分のデータを読み取る関数
# ===============================
def parse(line):
if not line or line.startswith("time_us"):
return None
sp=line.strip().split(",")
if len(sp)!=5:
return None
try:
return [float(v) for v in sp]
except:
return None
# ===============================
# ▼グラフの準備
# ===============================
fig,ax=plt.subplots(figsize=(12,5))
ax.set_title("Realtime Acceleration",fontsize=16)
ln_ax,=ax.plot([],[],color=COLORS["ax"],lw=1.5,label="ax")
ln_ay,=ax.plot([],[],color=COLORS["ay"],lw=1.5,label="ay")
ln_az,=ax.plot([],[],color=COLORS["az"],lw=1.5,label="az")
ln_at,=ax.plot([],[],color=COLORS["total"],lw=2.5,label="total")
ax.set_ylim(Y_MIN,Y_MAX)
ax.set_xlabel("Time [s]")
ax.set_ylabel("Acceleration [g]")
ax.grid(alpha=0.3)
ax.legend(loc="upper right")
info=fig.text(0.01,0.01,"",fontsize=10)
# ===============================
# ▼ESP32からデータを読む
# ===============================
def read_fast():
global rx,t0_us
n=ser.in_waiting
if n<=0:
return
data=ser.read(n).decode(errors="ignore")
rx+=data
lines=rx.split("\n")
rx=lines[-1]
for line in lines[:-1]:
vals=parse(line)
if vals is None:
continue
# CSV保存
writer.writerow(vals)
t_us=vals[0]
if t0_us is None:
t0_us=t_us
# 秒に変換
t_s=(t_us-t0_us)*1e-6
# グラフ用に保存
t_buf.append(t_s)
ax_buf.append(vals[1])
ay_buf.append(vals[2])
az_buf.append(vals[3])
at_buf.append(vals[4])
# ===============================
# ▼グラフ更新処理
# ===============================
def update(_):
read_fast()
if len(t_buf)<3:
return
t=np.array(t_buf)
ln_ax.set_data(t,np.array(ax_buf))
ln_ay.set_data(t,np.array(ay_buf))
ln_az.set_data(t,np.array(az_buf))
ln_at.set_data(t,np.array(at_buf))
# 表示時間をスライド
t_now=t[-1]
ax.set_xlim(max(0,t_now-PLOT_SEC),max(PLOT_SEC,t_now))
info.set_text(f"time={t_now:.1f}s")
return ln_ax,ln_ay,ln_az,ln_at
ani=FuncAnimation(fig,update,interval=int(1000/UI_HZ))
plt.tight_layout()
plt.show()
# 終了処理
f.close()
ser.close()


コメント