はじめに
RaspberryPi Pico W のBluetoothで、オムロンのUSB型環境センサ「2JCIE-BU」を使う方法を解説します。言語はMicroPythonです。
「2JCIE-BU」はオムロン社製のUSB型環境センサーで、温度・湿度・CO2濃度・不快指数…などなど、さまざまな環境情報が取得できるすごいセンサーです。
今回その「2JCIE-BU」を、Pico Wから使う方法について質問をいただいたので温度情報を取得方法する例に、解説してみたいと思います。
リクエストいただいた ナカさん、ありがとうございます。
~ この記事の内容 / Contents ~
環境
この記事で使用する環境は以下の通りです。
環境 | バージョンなど | 備考 |
開発用PCのOS | Windows11 | Windows10でもOKです |
言語 | MicroPython | |
開発環境 Thonny | 4.0.2 | |
ボード | RaspberryPi Pico W |
USB型環境センサ「2JCIE-BU」
今回使用するオムロンのUSB型環境センサです。USB・Bluetooth接続で、PC・スマホ・ボードコンピュータなどから使えます。
小型のUSBメモリのような見た目ですが、後述する10種類ものデータが取得でき、60,000回分のデータも保存可能。USBに差すだけで使えるので、IoTやPoC(概念実証)に最適なセンサーです。
2JCIE-BUで取得できる環境データ
2JCIE-BUで取得できるデータは以下の通りです。その他詳細な仕様は、オムロンの公式サイトをご覧ください。
- 温度
- 湿度
- 照度
- 気圧
- 騒音
- 3軸加速度
- eTVOC
- 不快指数
- 熱中症警戒度
- 振動情報(地震回数・振動回数など)
Pico Wと「2JCIE-BU」の接続
Bluetooh(BLE)を使って接続します。配線や回路は必要ありません。
使用する部品
今回の記事で使用する部品は以下の通りです。
Pico W 本体
Raspberry Pi Pico W本体です。技適対応品かどうか心配…という方は、以下のページで出品者を「共立エレショップAセレクト」(国内の有名ショップ)にして購入してください。
環境センサ 2JCIE-BU
前述した、オムロン製のUSB型環境センサです。値段は少々お高めですが、他のセンサで代替えする場合、複数のセンサの準備と実装、配置スペースも必要になるので、トータルコストは断然安いと思います。
プログラム概要
今回のプログラムの概要は以下の通りです。
Pico WをBLEの「セントラル」、2JCIE-BUを「ペリフェラル」とし、最新値(Latest Data)を取得できるサービス「0x5010」のキャラクタリスティック「0x5012」から温度情報を取得します。
- Pico WをBLEのセントラルとして起動する。
- 2JCIEをBLEのペリフェラルとしてスキャンする
- 最新のデータ(Latest Data)のサービス(0x5010)から温度情報を取得する。
- Pico Wで温度情報を表示する。
実行結果
後述するプログラムを実行すると、以下のようにThonnyのシェル上に、2JCIEの温度情報が表示されます。
コードのベース
今回のコードは、PicoWの公式チュートリアルの「BLEのセントラル」のコード(picow_ble_temp_reader.py)をベースにします。コードのほとんどの部分が2JCIE-BUでそのまま使えるので、修正(改造)するのはUUIDなどほんの一部でOKです。
なお、オリジナルの時に使用した、ユーティリティー用のコード「ble_advertising.py」は改造後でも同じように必要になるので、以下から取得してPico W内に保存して下さい。
チュートリアルのコードについてはこちらの記事で詳しく解説しています。
全体コード
全体コードは以下の通りです。詳細な内容は後述する「コードのポイント」で解説します。
import bluetooth
import random
import struct
import time
import micropython
from ble_advertising import decode_services, decode_name
from micropython import const
from machine import Pin
import ubinascii
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)
# センサの最新値を取得する「Latest Dataサービス(0x5010)」のUUIDを定義します
_ENV_SENSE_UUID = bluetooth.UUID('AB705010-0A3A-11E8-BA89-0ED5F89F718B')
# センシングデータを取得するキャラクタリスティック「Latest sensing data(0x5012)」のUUIDを定義します
_TEMP_UUID = bluetooth.UUID('AB705012-0A3A-11E8-BA89-0ED5F89F718B')
_TEMP_CHAR = (
_TEMP_UUID,
bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_ENV_SENSE_SERVICE = (
_ENV_SENSE_UUID,
(_TEMP_CHAR,),
)
class BLETemperatureCentral:
# メンバを初期化して、bleのオブジェクトをアクティブにします。
def __init__(self, ble):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
self._reset()
self._led = Pin('LED', Pin.OUT)
# メンバを空にします
def _reset(self):
self._name = None
self._addr_type = None
self._addr = None
self._value = None
self._scan_callback = None
self._conn_callback = None
self._read_callback = None
self._notify_callback = None
self._conn_handle = None
self._start_handle = None
self._end_handle = None
self._value_handle = None
#
# イベント発生時にライブラリから呼び出される関数です
#
def _irq(self, event, data):
#
# スキャンの結果
#
if event == _IRQ_SCAN_RESULT:
# アドバタイズの各種データを取得します
addr_type, addr, adv_type, rssi, adv_data = data
if adv_type in (_ADV_IND, _ADV_DIRECT_IND):
# アドバタイズのデータ内容をデコードします
type_list = decode_services(adv_data)
# データを出力します(デバッグ用)
#print('type:{} addr:{} rssi:{} data:{}'.format(addr_type, ubinascii.hexlify(addr,':'), rssi, ubinascii.hexlify(adv_data)))
#
# MACアドレスを使って、スキャン相手が2JCIE-BUかどうかを判定します。
# アドレスはお使いの機器に合わせて書き換えて下さい
#
if ubinascii.hexlify(addr,':') == b"d8:39:2d:24:7e:80":
# スキャンしたペリフェラルが、Pico WのUUIDかどうかチェックします
# if _ENV_SENSE_UUID in type_list:
# アドレスのタイプを取得します
self._addr_type = addr_type
#
# Bluetoothアドレスを取得します。データは変わることがあるので、
# 実体をコピー(deep copy)が必要です。
#
self._addr = bytes(addr)
# サービス名を取得します。
self._name = decode_name(adv_data) or "?"
# スキャンを停止します。
self._ble.gap_scan(None)
#
# スキャン完了
#
elif event == _IRQ_SCAN_DONE:
if self._scan_callback:
# Bluetoothアドレスが取得されている(タイムアウトではない)
if self._addr:
self._scan_callback(self._addr_type, self._addr, self._name)
self._scan_callback = None
# タイムアウト
else:
self._scan_callback(None, None, None)
#
# ペリフェラルへの接続成功
#
elif event == _IRQ_PERIPHERAL_CONNECT:
# dataの取得
conn_handle, addr_type, addr = data
# アドレスタイプ・MACアドレスがペリフェラルのPicoWと同じ
if addr_type == self._addr_type and addr == self._addr:
# 接続ハンドルを取得し、ペリフェラルのサービスを探します
self._conn_handle = conn_handle
self._ble.gattc_discover_services(self._conn_handle)
#
# ペリフェラルと切断した場合
#
elif event == _IRQ_PERIPHERAL_DISCONNECT:
# 切断したのが、接続していたペリフェアルだった場合
conn_handle, _, _ = data
if conn_handle == self._conn_handle:
# メンバ変数で保持していた、接続先の情報をリセットします
self._reset()
#
# ペリフェアルのサービスを取得
#
elif event == _IRQ_GATTC_SERVICE_RESULT:
# dataの取得
conn_handle, start_handle, end_handle, uuid = data
# ペリフェラルのPico WのサービスのUUIDと同じ場合
if conn_handle == self._conn_handle and uuid == _ENV_SENSE_UUID:
# スタートハンドルとエンドハンドルを取得する
self._start_handle, self._end_handle = start_handle, end_handle
#
# ペリフェラルのサービス取得が完了
#
elif event == _IRQ_GATTC_SERVICE_DONE:
if self._start_handle and self._end_handle:
# サービスの特徴(Characteristics)を取得する。
self._ble.gattc_discover_characteristics(
self._conn_handle, self._start_handle, self._end_handle
)
else:
print("Failed to find environmental sensing service.")
#
# ペリフェラルの特徴(Characteristic)を取得
#
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
#dataの取得
conn_handle, def_handle, value_handle, properties, uuid = data
# 特徴(Characteristics)のUUIDが、ペリフェラルのPicoWのUUIDと同じだったら。
if conn_handle == self._conn_handle and uuid == _TEMP_UUID:
# 特性のValueのATTへのハンドルを保持する。
self._value_handle = value_handle
#
# ペリフェラルの特徴(Characteristic)を取得完了
#
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
# ペリフェラルのPicoWの特性情報を取得できていたら、
if self._value_handle:
if self._conn_callback:
# コールバック関数を呼び出します。
self._conn_callback()
else:
print("Failed to find temperature characteristic.")
#
# 特徴(Characteristic)へのReadの習得
# demo関数からRead実行後にイベントが発生します。
#
elif event == _IRQ_GATTC_READ_RESULT:
# dataの取得
conn_handle, value_handle, char_data = data
# Read元がPico Wであったら場合。
if conn_handle == self._conn_handle and value_handle == self._value_handle:
# 取得したデータで温度情報変換・表示・設定します。
#self._update_value(char_data)
self._update_omron_temp_value(char_data)
# コールバック関数が設定されていたら呼び出します。
if self._read_callback:
self._read_callback(self._value)
self._read_callback = None
# READが完了した場合
elif event == _IRQ_GATTC_READ_DONE:
# dataを取得します
conn_handle, value_handle, status = data
# ペリフェラルからNotifyが送られてきた場合
elif event == _IRQ_GATTC_NOTIFY:
# dataを取得します
conn_handle, value_handle, notify_data = data
# Readと同様に、接続済みのPico Wからの物であれば、
if conn_handle == self._conn_handle and value_handle == self._value_handle:
# 値を変換して表示します。
self._update_value(notify_data)
if self._notify_callback:
self._notify_callback(self._value)
def is_connected(self):
# ペリフェラルへの接続ハンドルと、特徴のValueのATTへのハンドルが取得済みであれば
# ペリフェラルへ接続していると判定します。
return self._conn_handle is not None and self._value_handle is not None
# アドバタイズしているペリフェラルをスキャンします
def scan(self, callback=None):
self._addr_type = None
self._addr = None
self._scan_callback = callback
self._ble.gap_scan(2000, 30000, 30000)
# スキャンで取得した情報で、ペリフェラルに接続します。
def connect(self, addr_type=None, addr=None, callback=None):
self._addr_type = addr_type or self._addr_type
self._addr = addr or self._addr
self._conn_callback = callback
if self._addr_type is None or self._addr is None:
return False
self._ble.gap_connect(self._addr_type, self._addr)
return True
# ペリフェラルと切断します
def disconnect(self):
if not self._conn_handle:
return
self._ble.gap_disconnect(self._conn_handle)
self._reset()
# Readを行います
def read(self, callback):
if not self.is_connected():
return
self._read_callback = callback
try:
self._ble.gattc_read(self._conn_handle, self._value_handle)
except OSError as error:
print(error)
# notifyが着た際のコールバックを設定します(未使用)
def on_notify(self, callback):
self._notify_callback = callback
# 温度情報の変換を行います
def _update_value(self, data):
try:
self._value = struct.unpack("<h", data)[0] / 100
except OSError as error:
print(error)
# 2JCIE-BUの温度情報の変換を行います
def _update_omron_temp_value(self, data):
# characteristicの全データを出力
# print( 'data:{}'.format(ubinascii.hexlify(data)) )
# dataをバイト型の配列に変換
raw_data = bytes( data )
# リトルエンディアンなので2->1の順番で結合
upper = int(data << 8 )
lower = int(data )
temp_raw = -100
temp_raw = upper + lower
temp = temp_raw * 0.01
self._value = temp
# メンバ変数Valueを取得する関数です
def value(self):
return self._value
# Pico本体のLEDを点滅させる関数です
def sleep_ms_flash_led(self, flash_count, delay_ms):
self._led.off()
while(delay_ms > 0):
for i in range(flash_count):
self._led.on()
time.sleep_ms(100)
self._led.off()
time.sleep_ms(100)
delay_ms -= 200
time.sleep_ms(1000)
delay_ms -= 1000
def print_temp(result):
print("read temp: %.2f degc" % result)
# 中心となる処理を行う関数です。
def demo(ble, central):
not_found = False
#スキャン完了時に呼び出して貰う関数です。
#ペリフェラルの名前を表示して、接続を行います。
def on_scan(addr_type, addr, name):
if addr_type is not None:
print("Found sensor: %s" % name)
central.connect()
else:
nonlocal not_found
not_found = True
print("No sensor found.")
# ペリフェラルのスキャンを行います。
central.scan(callback=on_scan)
# ライブラリ側が接続処理をしてくれるまで待ちます。
while not central.is_connected():
time.sleep_ms(100)
if not_found:
return
print("Connected")
time.sleep_ms(1000)
# 接続が続いている間、ペリフェラルの温度の値を読み取ります。
while central.is_connected():
central.read(callback=print_temp)
sleep_ms_flash_led(central, 2, 2000)
print("Disconnected")
#メイン関数です
if __name__ == "__main__":
ble = bluetooth.BLE()
central = BLETemperatureCentral(ble)
while(True):
demo(ble, central)
sleep_ms_flash_led(central, 1, 10000)
コードのポイント
UUIDの定義
最新値を取得できるサービスのUUID(0x5010)と、その中のセンシングデータを扱うキャラクタリスティックのUUID(0x5012)を定義します。
2JCIE-BUのUUIDは、どのサービスやキャラクタリスティックであっても、XXXX以外の部分はすべて同じです。
そのため、下記のXXXXの部分に、マニュアルに記載されている「0x5010」などの数字を当てはめてUUIDを定義してください。
UUID 「AB70XXXX-0A3A-11E8-BA89-0ED5F89F718B」
# センサの最新値を取得する「Latest Dataサービス(0x5010)」のUUIDを定義します
_ENV_SENSE_UUID = bluetooth.UUID('AB705010-0A3A-11E8-BA89-0ED5F89F718B')
# センシングデータを取得するキャラクタリスティック「Latest sensing data(0x5012)」のUUIDを定義します
_TEMP_UUID = bluetooth.UUID('AB705012-0A3A-11E8-BA89-0ED5F89F718B')
UUIDではなく、MACアドレスで確認する
改造前のチュートリアルのコードでは、スキャンした相手の確認にアドバタイズデータ中のUUIDを使用していましたが、2JCIE-BUではアドバタイズデータ中にUUIDは含まれていません。そのため、今回のプログラムでは、2JCIE-BUのMACアドレスを使って相手の確認を行います。
MACアドレスはセンサの個体ごとに異なるので、LightBlueなどBluetooth用のアプリを使って、お手持ちの2JCIE-BUのMACアドレスを確認し、コードの内のアドレスを書き換えて下さい。
#
# スキャンの結果
#
if event == _IRQ_SCAN_RESULT:
# アドバタイズの各種データを取得します
addr_type, addr, adv_type, rssi, adv_data = data
if adv_type in (_ADV_IND, _ADV_DIRECT_IND):
# アドバタイズのデータ内容をデコードします
type_list = decode_services(adv_data)
# データを出力します(デバッグ用)
#print('type:{} addr:{} rssi:{} data:{}'.format(addr_type, ubinascii.hexlify(addr,':'), rssi, ubinascii.hexlify(adv_data)))
#
# MACアドレスを使って、スキャン相手が2JCIE-BUかどうかを判定します。
# アドレスはお使いの機器に合わせて書き換えて下さい
#
if ubinascii.hexlify(addr,':') == b"d8:39:2d:24:7e:80":
# スキャンしたペリフェラルが、Pico WのUUIDかどうかチェックします
# if _ENV_SENSE_UUID in type_list:
# アドレスのタイプを取得します
self._addr_type = addr_type
#
# Bluetoothアドレスを取得します。データは変わることがあるので、
# 実体をコピー(deep copy)が必要です。
#
self._addr = bytes(addr)
# サービス名を取得します。
self._name = decode_name(adv_data) or "?"
# スキャンを停止します。
self._ble.gap_scan(None)
温度を計算する
温度の計算の部分はチュートリアルのコードと大きく異なるため、新規に関数として作成しています。
キャラクタリスティック(0x5012)から送られるセンシングデータは、17バイトで構成されており、温度情報は(0始まりで)1バイト目と2バイト目、符号付きの16ビットのデータとなっています。
センシングデータのデータ構成
byte | 内容 | フォーマット |
0 | Sequence number | UInt8 |
1-2 | Temperature | Uint16 |
3-4 | Relative humidity | Uint16 |
5-6 | Ambient light | Uint16 |
7-10 | Barometric pressure | Uint32 |
11-12 | Sound noise | Uint16 |
13-14 | eTVOC | Uint16 |
15-16 | eCO2 | Uint16 |
※ 2JCIE-BUのマニュアルより抜粋
そのため、配列から「1バイト目」と「2バイト目」を取り出して結合。データの並びはリトルエンディアンとなっているため、結合は2バイト目を上位ビットとして扱います。
結合後は、データの単位である[0.01度]を掛けてあげれば、温度情報の完成です。
# 2JCIE-BUの温度情報の変換を行います
def _update_omron_temp_value(self, data):
# characteristicの全データを出力
# print( 'data:{}'.format(ubinascii.hexlify(data)) )
# dataをバイト型の配列に変換
raw_data = bytes( data )
# リトルエンディアンなので2->1の順番で結合
upper = int(data << 8 )
lower = int(data )
temp_raw = -100
temp_raw = upper + lower
temp = temp_raw * 0.01
self._value = temp
読み取り間隔を調整してエラーを予防
オリジナルのコードにはありませんが、そのまま使うと初回のReadの際に「[Errno 114] EALREADY」の例外が発生してしまうので、1秒のsleepを追加しています。
# ライブラリ側が接続処理をしてくれるまで待ちます。
while not central.is_connected():
time.sleep_ms(100)
if not_found:
return
print("Connected")
time.sleep_ms(1000)
# 接続が続いている間、ペリフェラルの温度の値を読み取ります。
while central.is_connected():
central.read(callback=print_temp)
sleep_ms_flash_led(central, 2, 2000)
まとめ
RaspberryPi Pico W のBluetoothで、オムロンのUSB型環境センサ「2JCIE-BU」を使う方法を解説しました。
「2JCIE-BU」は、Pico Wのチュートリアルのコードも使えて、必要な情報もマニュアルにきちんと掲載されているので、とても使いやすいセンサーです。
値段は少々高いですが、他のセンサーで10種類もの環境データを取ろうとすると、複数のセンサーのマニュアル確認、配線、プログラミング・・・となってしまうので、その時間と労力、それと回路のサイズ考えると、とてもパフォーマンスのいいセンサーだと思います。
サクッといろいろな環境データを取りたい方は、是非検討してみてください。
今回は温度情報を取得してみましたが、「他のデータがとり方が知りたい」「こんな使い方が知りたい」などあったら、コメント欄かX(Twitter)で遠慮なく教えてください。
参考になればうれしいです。
お知らせ
MicroPythonのプログラミングガイドブックが遂に発売!
「MicroPython」の本が遂にでました。
この一冊で、MicroPythonの言語仕様から、プログラミングの仕方まで”ガッツリ”学べます!。
内容は、普段別言語で開発している人や、これからマイコンを始める(工学系の)学生を対象としているので「初心者向け」ではありません。しかし、「自前のライブラリの作成」が目標なので、これ一冊で「ガッツリ」とMicroPythonを学ぶことができます。
全ての内容はここでは紹介しきれないので、詳細は以下のAmazonページをご覧ください。目次だけでも”ガッツリ”なのが確認できると思います。
Pico/Pico W関連のおすすめ本
RaspberryPi Pico / Pico W関連のおすすめ本を独断と偏見で3つ選んでみました。Picoやるならとりあえずこれ買っとけ的な本や、電子工作全般で使える本などを厳選しています。
質問・要望 大歓迎です
「こんな解説記事作って」「こんなことがしたいけど、〇〇で困ってる」など、コメント欄で教えてください。 質問・要望に、中の人ができる限り対応します。
使えたよ・設定できたよの一言コメントも大歓迎。気軽に足跡を残してみてください。記事を紹介したい方はブログ、SNSにバシバシ貼ってもらってOKです。