Raspberry Piで温湿度計を使う

動作確認した環境

  • Raspberry Pi Zero WH
  • Python3
  • 温湿度・気圧センサモジュールキット 秋月電子[AE-BME280]

#!/usr/bin/env python
# coding: UTF-8
try:
    from smbus import SMBus
except ImportError:
    from smbus2 import SMBus
from time import sleep

I2CBus = 1
# $ sudo i2cdetect -r -y 1
SlaveAddress = 0x76  # SDO=GND
# SlaveAddress = 0x77  # SDO=VDD

# I2C通信がうまくいかない場合はボーレートを遅くする
# /boot/config.txt に下記を追記して reboot
# dtparam=i2c_arm_baudrate=80000

# 覚え書き
# WriteRegのあと少し待つ必要がある

REGADDR_hum_lsb    = 0xFE  # Read only  Humidity
REGADDR_hum_msb    = 0xFD  # Read only
REGADDR_temp_xlsb  = 0xFC  # Read only  Temperature
REGADDR_temp_lsb   = 0xFB  # Read only
REGADDR_temp_msb   = 0xFA  # Read only
REGADDR_press_xlsb = 0xF9  # Read only  Barometric pressure
REGADDR_press_lsb  = 0xF8  # Read only
REGADDR_press_msb  = 0xF7  # Read only
REGADDR_config     = 0xF5  # Read/Write Control
REGADDR_ctrl_meas  = 0xF4  # Read/Write Control
REGADDR_status     = 0xF3  # Read only
REGADDR_ctrl_hum   = 0xF2  # Read/Write Control
REGADDR_calib26_41 = 0xE1  # Read only  Top of Calibration data[26..41]
REGADDR_reset      = 0xE0  # Write only Reset
REGADDR_id         = 0xD0  # Read only  chip ID
REGADDR_calib00_25 = 0x88  # Read only  Top of Calibration data[0..25]
REGADDR_calib25    = 0xA1  # Read only  Calibration data[25]

digT = []
digP = []
digH = []
t_fine = 0.0
class BME280:
    def __init__(self, i2cbus=I2CBus, addr=SlaveAddress):
#        print('BME280 init')
        self.i2c = SMBus(i2cbus)
        self.addr = addr

        self.reset()

        # Humidity oversampling
        osrs_h = 1  # x1
#        osrs_h = 2  # x2
#        osrs_h = 3  # x4
#        osrs_h = 4  # x8
#        osrs_h = 5  # x16
        self.writeReg(REGADDR_ctrl_hum, osrs_h)

        # Temperature oversampling
        osrs_t = 1  # x1
        # Pressure oversampling
        osrs_p = 1  # x1
        # Sensor mode
#        mode = 0  # Sleep mode
#        mode = 1  # Force mode
        mode = 3  # Normal mode
        self.writeReg(REGADDR_ctrl_meas, (osrs_t << 5) | (osrs_p << 2) | mode)

        # Tstandby ≒ 計測周期
#        t_sb = 0  # 0.5ms
#        t_sb = 1  # 62.5ms
#        t_sb = 2  # 125ms
#        t_sb = 3  # 250ms
#        t_sb = 4  # 500ms
        t_sb = 5  # 1000ms
#        t_sb = 6  # 10ms
#        t_sb = 7  # 20ms
        # IIR Filter coefficien
#        filter_coef = 0  # Filter off
#        filter_coef = 1  # 2
#        filter_coef = 2  # 4
        filter_coef = 3  # 8
#        filter_coef = 4  # 16
        spi3w_en = 0  # 0=Use I2C, 1=3-wire SPI
        self.writeReg(REGADDR_config, (t_sb << 5) | (filter_coef << 2) | spi3w_en)
#        print('setup done.')
        self.get_calib_param()
#        print('get calibration data done.')
        # wait normal mode 1st tstandby = 0.5ms
        sleep(0.5)

    def reset(self):
#        print('BME280 soft reset')
        self.writeReg(REGADDR_reset, 0xB6)  # soft reset
        # wait Start-up-time = 2ms
        sleep(0.002)
        while True:
            status = self.readReg(REGADDR_status)
            if (status & 0x09) == 0x00:  # bit2非公開らしいのでマスク必須(bit3=bit0=0だけ見たい)
                break
            else:
                print('status register 0x%02x' % status)

    def writeReg(self, reg_address, data):
        self.i2c.write_byte_data(self.addr, reg_address, data)
#        print('reg 0x%02x <- data 0x%02x' % (reg_address, data))
        sleep(0.1)  # 必須

    def readReg(self, reg_address):
        data = self.i2c.read_byte_data(self.addr, reg_address)
#        print('reg 0x%02x -> data 0x%02x' % (reg_address,data))
#        sleep(0.25)
        return data

    def readBlockReg(self, reg_address, byte):
        data_list = []
        data_list = self.i2c.read_i2c_block_data(self.addr, reg_address, byte)
#        print(data_list)
#        sleep(0.25)
        return data_list

    def get_calib_param(self):
        calib = []
        global digT
        # digT1〜3, digP1〜9, digH1 非公開レジスタ0xA0も含めてまとめて読み出す
        calib = self.readBlockReg(REGADDR_calib00_25, 26)
        del calib[-2]  # skip address 0xA0 後ろから2番目の要素(アドレスA0の値)を捨てる
        # digH2〜6
        calib += self.readBlockReg(REGADDR_calib26_41, 7)

        digT.append((calib[1] << 8) | calib[0])
        digT.append((calib[3] << 8) | calib[2])
        digT.append((calib[5] << 8) | calib[4])
        digP.append((calib[7] << 8) | calib[6])
        digP.append((calib[9] << 8) | calib[8])
        digP.append((calib[11]<< 8) | calib[10])
        digP.append((calib[13]<< 8) | calib[12])
        digP.append((calib[15]<< 8) | calib[14])
        digP.append((calib[17]<< 8) | calib[16])
        digP.append((calib[19]<< 8) | calib[18])
        digP.append((calib[21]<< 8) | calib[20])
        digP.append((calib[23]<< 8) | calib[22])
        digH.append(                  calib[24])
        digH.append((calib[26]<< 8) | calib[25])
        digH.append( calib[27] )
        digH.append((calib[28]<< 4) | ( calib[29] & 0x0F))
        digH.append((calib[30]<< 4) | ((calib[29] >> 4) & 0x0F))
        digH.append( calib[31] )
#        print(digT)
#        print(digP)
#        print(digH)
        for i in range(1, 2):
            if digT[i] & 0x8000:
                digT[i] = (-digT[i] ^ 0xFFFF) + 1
        for i in range(1, 8):
            if digP[i] & 0x8000:
                digP[i] = (-digP[i] ^ 0xFFFF) + 1
        for i in range(0, 6):
            if digH[i] & 0x8000:
                digH[i] = (-digH[i] ^ 0xFFFF) + 1

    def compensate_P(self, adc_P):
        global t_fine
        global digP
        """
        # Device Plusサンプル
        pressure = 0.0
        v1 = (t_fine / 2.0) - 64000.0
        v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * digP[5]
        v2 = v2 + ((v1 * digP[4]) * 2.0)
        v2 = (v2 / 4.0) + (digP[3] * 65536.0)
        v1 = (((digP[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8)  + ((digP[1] * v1) / 2.0)) / 262144
        v1 = ((32768 + v1) * digP[0]) / 32768
        if v1 == 0:
            return 0
        pressure = ((1048576 - adc_P) - (v2 / 4096)) * 3125
        if pressure < 0x80000000:
            pressure = (pressure * 2.0) / v1
        else:
            pressure = (pressure / v1) * 2
        v1 = (digP[8] * (((pressure / 8.0) * (pressure / 8.0)) / 8192.0)) / 4096
        v2 = ((pressure / 4.0) * digP[7]) / 8192.0
        pressure = pressure + ((v1 + v2 + digP[6]) / 16.0)
        """
        # データシートをもとに作成
        # Returns pressure in Pa as unsigned 32bit integer in Q24.8 format (24 integer bits and 8 fractional bits).
        # Output value of "24674867" represents 24674867/256=96386.2 Pa = 963.862hPa
        var1 = int(t_fine) - 128000
        var2 = var1 * var1 * digP[5]
        var2 = var2 + (var1 * digP[4] << 17)
        var2 = var2 + (digP[3] << 35)
        var1 = ((var1 * var1 * digP[2]) >> 8) + ((var1 * digP[1]) << 12)
        var1 = (((1 << 47) + var1)) * digP[0] >> 33
        if var1 == 0:
            return 0  # avoid exception caused by division by zero
        p = int(1048576 - adc_P)
        p = int((((p << 31) - var2) * 3125) / var1)
        var1 = (digP[8] * (p >> 13) * (p >> 13)) >> 25
        var2 = (digP[7] * p) >> 19
        p = ((p + var1 + var2) >> 8) + (digP[6] << 4)
        pressure = p / 256.0   # [P]
        #"""
#        print('Barometric pressure\t:%7.2fhPa' % (pressure/100))
        return (pressure/100)  # [hPa]

    def compensate_T(self, adc_T):
        global t_fine
        global digT
        """
        # Device Plusサンプル
        v1 = (adc_T / 16384.0 - digT[0] / 1024.0) * digT[1]
        v2 = (adc_T / 131072.0 - digT[0] / 8192.0) * (adc_T / 131072.0 - digT[0] / 8192.0) * digT[2]
        t_fine = v1 + v2
        temperature = t_fine / 5120.0
        """
        # データシートをもとに作成
        # Returns temperature in DegC, resolution is 0.01 DegC.
        # Output value of "5123" = 51.23 DegC.
        var1 = ((((adc_T>>3) - (digT[0]<<1))) * (digT[1])) >> 11
        var2 = (((((adc_T>>4) - (digT[0])) * ((adc_T>>4) - (digT[0]))) >> 12) * (digT[2])) >> 14
        t_fine = var1 + var2
        T = (t_fine * 5 + 128) >> 8
        temperature = T / 100.0
        #"""
#        print('Temperature\t\t:%7.2f℃' % temperature)
        return temperature

    def compensate_H(self, adc_H):
        global t_fine
        global digH
        """
        # Device Plusサンプル
        var_h = t_fine - 76800.0
        if var_h != 0:
            var_h = (adc_H - (digH[3] * 64.0 + digH[4]/16384.0 * var_h)) * (digH[1] / 65536.0 * (1.0 + digH[5] / 67108864.0 * var_h * (1.0 + digH[2] / 67108864.0 * var_h)))
        else:
            return 0
        var_h = var_h * (1.0 - digH[0] * var_h / 524288.0)
        if var_h > 100.0:
            var_h = 100.0
        elif var_h < 0.0:
            var_h = 0.0
        """
        # データシートをもとに作成
        # Returns humidity in %RH as unsigned 32bit integer in Q22.10 format (22 integer and 10 fractional bit).
        # Output value of "47445" represents 47445/1024 = 46.333 %RH
        v_x1_u32r = int(t_fine - 76800.0)
        v_x1_u32r = (((((adc_H << 14) - (digH[3] << 20) - (digH[4] * v_x1_u32r)) + 16384) >> 15) * (((((((v_x1_u32r * digH[5]) >> 10) * (((v_x1_u32r * digH[2]) >> 11) + 32768)) >> 10) + 2097152) * digH[1] + 8192) >> 14))
        v_x1_u32r = (v_x1_u32r - (((((v_x1_u32r >> 15) * (v_x1_u32r >> 15)) >> 7) * digH[0]) >> 4))
        v_x1_u32r = min(max(0, v_x1_u32r), 419430400)
        v_x1_u32r = v_x1_u32r >> 12
        var_h = v_x1_u32r / 1024.0
        #"""
#        print('Humidity\t\t:%7.2f%' % var_h)
        return var_h


    def readData(self):
        data = []
        val = []
        # wait for the results have been transferred to the data registers.
        # 1->0エッジ待ち
        while True:
            status = self.readReg(REGADDR_status)
            if (status & 0x08) == 0x08:  # bit2非公開らしいのでマスク必須(bit3=1だけ見たい)
                break
        while True:
            status = self.readReg(REGADDR_status)
            if (status & 0x09) == 0x00:  # bit2非公開らしいのでマスク必須(bit3=bit0=0だけ見たい)
                break
            else:
#                print('status register 0x%02x' % status)
                if (status & 0x02) != 0x00:  # bit1非公開のくせに1ならreset必要らしい
                    print('status register 0x%02x' % status)
                    sleep(1)
                    self.__init__()
                    return val
#        data  = self.readBlockReg(REGADDR_press_msb, 3)  # 0xF7,0xF8,0xF9
#        data += self.readBlockReg(REGADDR_temp_msb, 3)   # 0xFA,0xFB,0xFC
#        data += self.readBlockReg(REGADDR_hum_msb, 2)    # 0xFD,0xFE
        data = self.readBlockReg(REGADDR_press_msb, 8)  # 0xF7〜0xFE

        press_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
        temp_raw  = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
        hum_raw   = (data[6] << 8)  |  data[7]
        val.append(self.compensate_P(press_raw))
        val.append(self.compensate_T(temp_raw))
        val.append(self.compensate_H(hum_raw))
        return val


if __name__ == '__main__':
    try:
        Sensor = BME280()
        print('\n\n')
        while True:
            val = Sensor.readData()
            if len(val) == 0:
                continue
            text  = '\033[3A'  # 同じ場所に表示する場合
            text += ' %7.2f[hPa]\n' % (val[0])
            text += ' %7.2f[°C]\n' % (val[1])
            text += ' %7.2f[%%]' % (val[2])
            print(text)
#            sleep(1.0)  # status registerポーリングでt_sb設定時間に従う
    except IOError:
        pass
    except KeyboardInterrupt:
        pass
exit()