動作確認した環境
- Raspberry Pi Zero WH
- Python3
- 温湿度・気圧センサモジュールキット 秋月電子[AE-BME280]
#!/usr/bin/env python
# coding: UTF-8
try:
from smbus import SMBus
except ImportError:
from smbus2 import SMBus
from time import sleep
# I2C bus number; used as the default bus argument of BME280.__init__.
I2CBus = 1
# The device address can be checked with:
# $ sudo i2cdetect -r -y 1
SlaveAddress = 0x76 # I2C address when SDO=GND
# SlaveAddress = 0x77 # I2C address when SDO=VDD
# If I2C communication is unreliable, lower the baud rate:
# add the following line to /boot/config.txt and reboot.
# dtparam=i2c_arm_baudrate=80000
# Memo:
# a short wait is required after WriteReg.

# BME280 register addresses.
REGADDR_hum_lsb = 0xFE # Read only Humidity
REGADDR_hum_msb = 0xFD # Read only
REGADDR_temp_xlsb = 0xFC # Read only Temperature
REGADDR_temp_lsb = 0xFB # Read only
REGADDR_temp_msb = 0xFA # Read only
REGADDR_press_xlsb = 0xF9 # Read only Barometric pressure
REGADDR_press_lsb = 0xF8 # Read only
REGADDR_press_msb = 0xF7 # Read only
REGADDR_config = 0xF5 # Read/Write Control
REGADDR_ctrl_meas = 0xF4 # Read/Write Control
REGADDR_status = 0xF3 # Read only
REGADDR_ctrl_hum = 0xF2 # Read/Write Control
REGADDR_calib26_41 = 0xE1 # Read only Top of Calibration data[26..41]
REGADDR_reset = 0xE0 # Write only Reset
REGADDR_id = 0xD0 # Read only chip ID
REGADDR_calib00_25 = 0x88 # Read only Top of Calibration data[0..25]
REGADDR_calib25 = 0xA1 # Read only Calibration data[25]

# Calibration coefficients, filled in by BME280.get_calib_param():
# temperature (digT), pressure (digP) and humidity (digH).
digT = []
digP = []
digH = []
# Fine temperature value written by BME280.compensate_T and read by
# BME280.compensate_P / BME280.compensate_H.
t_fine = 0.0
class BME280:
    """Driver for a BME280 temperature / humidity / pressure sensor on I2C.

    The calibration coefficients and the fine temperature value are kept in
    the module-level variables ``digT``, ``digP``, ``digH`` and ``t_fine``,
    so they are shared by every instance; drive one sensor at a time.
    """

    def __init__(self, i2cbus=I2CBus, addr=SlaveAddress):
        """Open the I2C bus and configure the sensor.

        Args:
            i2cbus: I2C bus number passed to ``SMBus``.
            addr: I2C address of the sensor.
        """
        self.i2c = SMBus(i2cbus)
        self.addr = addr
        self._setup()

    def _setup(self):
        """Reset the sensor, write its configuration and load calibration.

        Kept separate from ``__init__`` so that error recovery can
        reconfigure the device on the already-open bus and address.
        """
        self.reset()

        # Humidity oversampling code: 1=x1, 2=x2, 3=x4, 4=x8, 5=x16
        osrs_h = 1
        self.writeReg(REGADDR_ctrl_hum, osrs_h)

        # Temperature and pressure oversampling (1 = x1)
        osrs_t = 1
        osrs_p = 1
        # Sensor mode: 0=sleep, 1=forced, 3=normal
        mode = 3
        self.writeReg(REGADDR_ctrl_meas, (osrs_t << 5) | (osrs_p << 2) | mode)

        # Standby time (roughly the measurement period):
        # 0=0.5ms, 1=62.5ms, 2=125ms, 3=250ms, 4=500ms, 5=1000ms, 6=10ms, 7=20ms
        t_sb = 5
        # IIR filter coefficient: 0=off, 1=2, 2=4, 3=8, 4=16
        filter_coef = 3
        # 0 = use I2C, 1 = 3-wire SPI
        spi3w_en = 0
        self.writeReg(REGADDR_config, (t_sb << 5) | (filter_coef << 2) | spi3w_en)

        self.get_calib_param()
        # Give normal mode time to start cycling before the first read.
        sleep(0.5)

    def reset(self):
        """Soft-reset the sensor and block until the status register is idle."""
        self.writeReg(REGADDR_reset, 0xB6)  # soft reset command
        # Wait for the start-up time (2 ms).
        sleep(0.002)
        while True:
            status = self.readReg(REGADDR_status)
            # Bit 2 is apparently undocumented, so mask it out and look
            # only at bit 3 and bit 0 (both must be 0).
            if (status & 0x09) == 0x00:
                break
            print('status register 0x%02x' % status)

    def writeReg(self, reg_address, data):
        """Write one byte to a register, then pause for 0.1 s."""
        self.i2c.write_byte_data(self.addr, reg_address, data)
        sleep(0.1)  # required: the device needs a short wait after a write

    def readReg(self, reg_address):
        """Return the byte read from a single register."""
        return self.i2c.read_byte_data(self.addr, reg_address)

    def readBlockReg(self, reg_address, byte):
        """Return ``byte`` consecutive register values starting at ``reg_address``."""
        return self.i2c.read_i2c_block_data(self.addr, reg_address, byte)

    @staticmethod
    def _signed(value, bits):
        """Interpret ``value`` as a two's-complement integer ``bits`` wide."""
        sign_bit = 1 << (bits - 1)
        return (value & (sign_bit - 1)) - (value & sign_bit)

    def get_calib_param(self):
        """Read the calibration registers into ``digT``, ``digP`` and ``digH``.

        The module-level lists are replaced in place, so calling this again
        (for example after a reset) refreshes the coefficients instead of
        appending stale duplicates.
        """
        # dig_T1..3, dig_P1..9 and dig_H1, read in one burst that also
        # covers the undocumented register 0xA0.
        calib = list(self.readBlockReg(REGADDR_calib00_25, 26))
        del calib[-2]  # drop the value of address 0xA0 (second to last)
        # dig_H2..6
        calib += self.readBlockReg(REGADDR_calib26_41, 7)

        def word(index):
            # Little-endian 16-bit value starting at calib[index].
            return (calib[index + 1] << 8) | calib[index]

        signed = self._signed

        # dig_T1 is unsigned; dig_T2 and dig_T3 are signed 16-bit.
        digT[:] = [word(0), signed(word(2), 16), signed(word(4), 16)]

        # dig_P1 is unsigned; dig_P2..dig_P9 are signed 16-bit.
        digP[:] = [word(6)] + [signed(word(i), 16) for i in range(8, 24, 2)]

        digH[:] = [
            calib[24],                                                      # dig_H1: unsigned 8-bit
            signed(word(25), 16),                                           # dig_H2: signed 16-bit
            calib[27],                                                      # dig_H3: unsigned 8-bit
            signed((calib[28] << 4) | (calib[29] & 0x0F), 12),              # dig_H4: signed 12-bit
            signed((calib[30] << 4) | ((calib[29] >> 4) & 0x0F), 12),       # dig_H5: signed 12-bit
            signed(calib[31], 8),                                           # dig_H6: signed 8-bit
        ]

    def compensate_P(self, adc_P):
        """Convert a raw pressure reading to hPa.

        Uses the 64-bit integer formula from the datasheet, whose result is
        pressure in Pa in Q24.8 format (e.g. 24674867 / 256 = 96386.2 Pa).
        ``compensate_T`` must have been called for the same sample first,
        because this reads the shared ``t_fine`` value.

        Returns:
            Pressure in hPa, or 0 when the intermediate divisor is zero.
        """
        var1 = int(t_fine) - 128000
        var2 = var1 * var1 * digP[5]
        var2 += (var1 * digP[4]) << 17
        var2 += digP[3] << 35
        var1 = ((var1 * var1 * digP[2]) >> 8) + ((var1 * digP[1]) << 12)
        var1 = (((1 << 47) + var1) * digP[0]) >> 33
        if var1 == 0:
            return 0  # avoid division by zero

        numerator = ((int(1048576 - adc_P) << 31) - var2) * 3125
        # Exact integer division truncating toward zero (as in the C
        # reference); float division would round these ~2**63 operands.
        p = abs(numerator) // abs(var1)
        if (numerator < 0) != (var1 < 0):
            p = -p

        var1 = (digP[8] * (p >> 13) * (p >> 13)) >> 25
        var2 = (digP[7] * p) >> 19
        p = ((p + var1 + var2) >> 8) + (digP[6] << 4)
        pressure = p / 256.0  # [Pa]
        return pressure / 100  # [hPa]

    def compensate_T(self, adc_T):
        """Convert a raw temperature reading to degrees Celsius.

        Uses the integer formula from the datasheet (resolution 0.01 degC,
        e.g. 5123 means 51.23 degC) and stores the intermediate fine
        temperature in the module-level ``t_fine`` for the pressure and
        humidity compensation.
        """
        global t_fine
        var1 = (((adc_T >> 3) - (digT[0] << 1)) * digT[1]) >> 11
        delta = (adc_T >> 4) - digT[0]
        var2 = (((delta * delta) >> 12) * digT[2]) >> 14
        t_fine = var1 + var2
        T = (t_fine * 5 + 128) >> 8
        return T / 100.0

    def compensate_H(self, adc_H):
        """Convert a raw humidity reading to %RH.

        Uses the integer formula from the datasheet, whose result is in
        Q22.10 format (e.g. 47445 / 1024 = 46.333 %RH). ``compensate_T``
        must have been called for the same sample first, because this reads
        the shared ``t_fine`` value.
        """
        v_x1_u32r = int(t_fine - 76800.0)
        v_x1_u32r = (((((adc_H << 14) - (digH[3] << 20) - (digH[4] * v_x1_u32r)) + 16384) >> 15) * (((((((v_x1_u32r * digH[5]) >> 10) * (((v_x1_u32r * digH[2]) >> 11) + 32768)) >> 10) + 2097152) * digH[1] + 8192) >> 14))
        v_x1_u32r = (v_x1_u32r - (((((v_x1_u32r >> 15) * (v_x1_u32r >> 15)) >> 7) * digH[0]) >> 4))
        # Clamp to the 0..100 %RH range before scaling down.
        v_x1_u32r = min(max(0, v_x1_u32r), 419430400)
        v_x1_u32r = v_x1_u32r >> 12
        return v_x1_u32r / 1024.0

    def readData(self):
        """Wait for a fresh measurement and return the compensated values.

        Returns:
            ``[pressure_hPa, temperature_degC, humidity_percent]``, or an
            empty list when the sensor had to be reconfigured because status
            bit 1 was set; callers should simply retry.
        """
        val = []
        # Wait for the 1 -> 0 edge of the "measuring" flag so the data
        # registers hold a freshly transferred result.
        while True:
            status = self.readReg(REGADDR_status)
            # Bit 2 is apparently undocumented, so mask: only bit 3 = 1 matters.
            if (status & 0x08) == 0x08:
                break
        while True:
            status = self.readReg(REGADDR_status)
            # Bit 2 is apparently undocumented, so mask: bit 3 and bit 0 must be 0.
            if (status & 0x09) == 0x00:
                break
            # Bit 1 is undocumented, yet a reset seems to be needed when it is 1.
            if (status & 0x02) != 0x00:
                print('status register 0x%02x' % status)
                sleep(1)
                # Reconfigure on the existing bus/address instead of
                # re-running __init__, which would open another bus handle
                # and fall back to the default address.
                self._setup()
                return val

        data = self.readBlockReg(REGADDR_press_msb, 8)  # 0xF7 .. 0xFE
        press_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
        temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
        hum_raw = (data[6] << 8) | data[7]

        # Temperature must be compensated first: it updates t_fine, which
        # the pressure and humidity formulas depend on.
        temperature = self.compensate_T(temp_raw)
        val.append(self.compensate_P(press_raw))
        val.append(temperature)
        val.append(self.compensate_H(hum_raw))
        return val
if __name__ == '__main__':
    # Demo: continuously print pressure, temperature and humidity in place.
    import sys

    try:
        sensor = BME280()
        print('\n\n')
        while True:
            val = sensor.readData()
            if not val:
                # Empty result: the sensor was reconfigured, so try again.
                continue
            text = '\033[3A'  # move the cursor up so the readings overwrite in place
            text += ' %7.2f[hPa]\n' % (val[0])
            text += ' %7.2f[°C]\n' % (val[1])
            text += ' %7.2f[%%]' % (val[2])
            print(text)
            # No sleep here: readData polls the status register, so the
            # loop follows the configured t_sb standby period.
    except IOError as err:
        # Report the I2C failure instead of exiting silently.
        sys.stderr.write('I2C communication failed: %s\n' % err)
        sys.exit(1)
    except KeyboardInterrupt:
        pass
    sys.exit()