【第1回】ESP32で予知保全に挑戦!加速度センサの振動をリアルタイム表示してみた

加速度センサー

※当サイトは、アフィリエイト広告を利用しています

概要

マイコン(ESP32)と加速度センサー(MPU6050)を使って、加速度データをリアルタイムで取得・可視化してみました。

ESP32側では、MPU6050からX・Y・Z方向の加速度を取得し、シリアル通信でCSV形式のデータを出力します。

Python側では、受信したデータをリアルタイムでグラフ表示しながら、CSVファイルへ保存します。

今回の構成を使うことで、

  • 振動データの可視化
  • FFT解析
  • 異常検知
  • AIによる設備診断
  • 予知保全

などの基礎となるデータ取得環境を、比較的安価に構築できます。

私自身もプログラミングの初心者ですが、生成AIを活用してコーディングしたので、今回の実験は高度な専門知識も不要でした。

ちょっとしたデジタル化のトライアルをお考えの方に、少しでも参考になればうれしいです。

詳しくは、以下のYouTube動画をご覧ください。

【第1回】ESP32で予知保全に挑戦!加速度センサの振動をリアルタイム表示してみた

配線

プログラムコード

マイコン(ESP32)

・MPU6050で、X・Y・Z方向の加速度を計測
・合成加速度 acc_total を計算
・10Hz周期でデータ取得
・CSV形式でシリアル出力

#include <Wire.h>
#include <MPU6050_tockn.h>

MPU6050 mpu(Wire);

const unsigned long SAMPLE_US = 100000; // 100,000us = 10Hz
unsigned long last_us = 0;

void setup() {
  Serial.begin(115200);
  Wire.begin(21, 22);
  Wire.setClock(400000);

  mpu.begin();
  mpu.calcGyroOffsets(true);

  Serial.println("time_us,ax,ay,az,acc_total");
}

void loop() {
  unsigned long now = micros();
  if (now - last_us < SAMPLE_US) return;
  last_us = now;

  mpu.update();

  float ax = mpu.getAccX();
  float ay = mpu.getAccY();
  float az = mpu.getAccZ();
  float acc_total = sqrt(ax*ax + ay*ay + az*az);

  Serial.print(now);
  Serial.print(",");
  Serial.print(ax, 4);
  Serial.print(",");
  Serial.print(ay, 4);
  Serial.print(",");
  Serial.print(az, 4);
  Serial.print(",");
  Serial.println(acc_total, 4);
}

Python

・ESP32からシリアル通信で加速度データを取得
・X・Y・Z方向と合成加速度を読み込み
・リアルタイムでグラフ表示
・一定時間分の波形をスクロール表示
・測定データをCSV形式で保存

import serial
import csv
import time
from collections import deque

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

# ===============================
# ▼ここだけ設定すればOK
# ===============================

PORT = "COM5"  
# ↑ お使いのPCに合わせて変更してください
# Arduino IDEの「ツール→ポート」で確認できます

BAUD = 115200  
# 通信速度(ESP32側と同じ値にします)

CSV_FILE = "acc_demo_10hz.csv"  
# 測定データを保存するファイル名

PLOT_SEC = 15  
# グラフに表示する時間の長さ(秒)

ASSUME_FS = 10  
# 1秒あたりの測定回数(Hz)

UI_HZ = 25  
# グラフの更新スピード

# グラフの縦軸範囲
Y_MIN = -2.0  
Y_MAX = 2.0  

# データの項目名
COLS = ["time_us","ax","ay","az","acc_total"]

# 線の色
COLORS = {
    "ax":"#1f77b4",   # 青
    "ay":"#2ca02c",   # 緑
    "az":"#d62728",   # 赤
    "total":"#9467bd" # 紫(合成)
}

# グラフの見た目
plt.rcParams.update({
    "font.size":12,
    "axes.spines.top":False,
    "axes.spines.right":False
})

# ===============================
# ▼ESP32と接続
# ===============================
ser = serial.Serial(PORT, BAUD, timeout=0)
time.sleep(2)  
# 接続直後はESP32が再起動するので少し待機

# ===============================
# ▼CSV保存の準備
# ===============================
f=open(CSV_FILE,"w",newline="",encoding="utf-8")
writer=csv.writer(f)
writer.writerow(COLS)

# ===============================
# ▼データを一時保存する箱
# ===============================
maxlen=int(PLOT_SEC*ASSUME_FS)+20

t_buf=deque(maxlen=maxlen)
ax_buf=deque(maxlen=maxlen)
ay_buf=deque(maxlen=maxlen)
az_buf=deque(maxlen=maxlen)
at_buf=deque(maxlen=maxlen)

t0_us=None
rx=""

# ===============================
# ▼1行分のデータを読み取る関数
# ===============================
def parse(line):
    if not line or line.startswith("time_us"):
        return None

    sp=line.strip().split(",")

    if len(sp)!=5:
        return None

    try:
        return [float(v) for v in sp]
    except:
        return None

# ===============================
# ▼グラフの準備
# ===============================
fig,ax=plt.subplots(figsize=(12,5))

ax.set_title("Realtime Acceleration",fontsize=16)

ln_ax,=ax.plot([],[],color=COLORS["ax"],lw=1.5,label="ax")
ln_ay,=ax.plot([],[],color=COLORS["ay"],lw=1.5,label="ay")
ln_az,=ax.plot([],[],color=COLORS["az"],lw=1.5,label="az")
ln_at,=ax.plot([],[],color=COLORS["total"],lw=2.5,label="total")

ax.set_ylim(Y_MIN,Y_MAX)
ax.set_xlabel("Time [s]")
ax.set_ylabel("Acceleration [g]")
ax.grid(alpha=0.3)

ax.legend(loc="upper right")

info=fig.text(0.01,0.01,"",fontsize=10)

# ===============================
# ▼ESP32からデータを読む
# ===============================
def read_fast():
    global rx,t0_us

    n=ser.in_waiting
    if n<=0:
        return

    data=ser.read(n).decode(errors="ignore")
    rx+=data

    lines=rx.split("\n")
    rx=lines[-1]

    for line in lines[:-1]:

        vals=parse(line)
        if vals is None:
            continue

        # CSV保存
        writer.writerow(vals)

        t_us=vals[0]

        if t0_us is None:
            t0_us=t_us

        # 秒に変換
        t_s=(t_us-t0_us)*1e-6

        # グラフ用に保存
        t_buf.append(t_s)
        ax_buf.append(vals[1])
        ay_buf.append(vals[2])
        az_buf.append(vals[3])
        at_buf.append(vals[4])

# ===============================
# ▼グラフ更新処理
# ===============================
def update(_):

    read_fast()

    if len(t_buf)<3:
        return

    t=np.array(t_buf)

    ln_ax.set_data(t,np.array(ax_buf))
    ln_ay.set_data(t,np.array(ay_buf))
    ln_az.set_data(t,np.array(az_buf))
    ln_at.set_data(t,np.array(at_buf))

    # 表示時間をスライド
    t_now=t[-1]
    ax.set_xlim(max(0,t_now-PLOT_SEC),max(PLOT_SEC,t_now))

    info.set_text(f"time={t_now:.1f}s")

    return ln_ax,ln_ay,ln_az,ln_at

ani=FuncAnimation(fig,update,interval=int(1000/UI_HZ))

plt.tight_layout()
plt.show()

# 終了処理
f.close()
ser.close()
スポンサーリンク
加速度センサー
Follow
この記事を書いた人

【経歴】
関東在住、40代、製造業(品質部門)。
これまで、研究開発、設計、生産技術、仕入先の品質管理を手掛ける。

【保有知識・技術分野】
統計学、信頼性工学、品質工学。
半導体、基板、有機材料、金属、セラミックスの材料、製造、加工技術。
部品加工(機械加工、化学処理)、組立・実装技術、分析・物理解析技術。
QC検定1級保有。

【当サイトについて】
品質・生産の基礎知識をテーマに、用語の解説、使い方(作り方)、メリット、考え方のポイントを分かりやすく解説しています。
某メーカ様の品質教育用の資料としてもご活用いただいております。
QC検定(品質管理検定)の試験対策、おすすめ勉強法も紹介しています。

Follow
QCとらのまき

コメント