Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,8 @@ ENV/
.ropeproject
*~

# VS Code
.vscode/

# OSX
.DS_Store
242 changes: 145 additions & 97 deletions bme280/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,41 +27,75 @@
Raspberry Pi BME280 Driver.
"""

__version__ = "0.2.4"

from __future__ import annotations
from dataclasses import dataclass

import datetime
import time
import uuid
from enum import IntEnum
from functools import lru_cache

from bme280.reader import Reader

from bme280.reader import reader
import bme280.const as oversampling
import pytz
__version__ = "0.2.4"

# Oversampling modes
oversampling.x1 = 1
oversampling.x2 = 2
oversampling.x4 = 3
oversampling.x8 = 4
oversampling.x16 = 5

DEFAULT_PORT = 0x76
class Oversampling(IntEnum):
x1 = 1
x2 = 2
x4 = 3
x8 = 4
x16 = 5


class uncompensated_readings(object):
DEFAULT_PORT = 0x76

def __init__(self, block):

@dataclass
class CompensationParams:
dig_T1: int
dig_T2: int
dig_T3: int
dig_P1: int
dig_P2: int
dig_P3: int
dig_P4: int
dig_P5: int
dig_P6: int
dig_P7: int
dig_P8: int
dig_P9: int
dig_H1: int
dig_H2: int
dig_H3: int
dig_H4: int
dig_H5: int
dig_H6: int


class UncompensatedReadings:
def __init__(self, block: tuple | list) -> None:
self._block = block
self.pressure = (block[0] << 16 | block[1] << 8 | block[2]) >> 4
self.temperature = (block[3] << 16 | block[4] << 8 | block[5]) >> 4
self.humidity = block[6] << 8 | block[7]

def __repr__(self):
return "uncompensated_reading(temp=0x{0:08X}, pressure=0x{1:08X}, humidity=0x{2:08X}, block={3})".format(
self.temperature, self.pressure, self.humidity,
":".join("{0:02X}".format(c) for c in self._block))
def __repr__(self) -> str:
return (
"uncompensated_reading(temp=0x{0:08X}, "
"pressure=0x{1:08X}, "
"humidity=0x{2:08X}, block={3})"
).format(
self.temperature,
self.pressure,
self.humidity,
":".join("{0:02X}".format(c) for c in self._block),
)


class compensated_readings(object):
class CompensatedReadings:
"""
Compensation formulas translated from Appendix A (8.1) of BME280 datasheet:

Expand All @@ -74,34 +108,53 @@ class compensated_readings(object):
* Humidity in %rH as as double. Output value of "46.332" represents
46.332 %rH
"""
def __init__(self, raw_readings, compensation_params):

def __init__(
self,
raw_readings: UncompensatedReadings,
compensation_params: CompensationParams,
) -> None:
self._comp = compensation_params
self.id = uuid.uuid4()
self.uncompensated = raw_readings
self.timestamp = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
self.temperature = self.__tfine(raw_readings.temperature) / 5120.0
self.humidity = self.__calc_humidity(raw_readings.humidity,
raw_readings.temperature)
self.pressure = self.__calc_pressure(raw_readings.pressure,
raw_readings.temperature) / 100.0

def __tfine(self, t):
self.timestamp = datetime.datetime.now(datetime.timezone.utc)
self.temperature = self._tfine(raw_readings.temperature) / 5120.0
self.humidity = self._calc_humidity(
raw_readings.humidity, raw_readings.temperature
)
self.pressure = (
self._calc_pressure(raw_readings.pressure, raw_readings.temperature) / 100.0
)

def _tfine(self, t: float) -> float:
v1 = (t / 16384.0 - self._comp.dig_T1 / 1024.0) * self._comp.dig_T2
v2 = ((t / 131072.0 - self._comp.dig_T1 / 8192.0) ** 2) * self._comp.dig_T3
return v1 + v2

def __calc_humidity(self, h, t):
res = self.__tfine(t) - 76800.0
res = (h - (self._comp.dig_H4 * 64.0 + self._comp.dig_H5 / 16384.0 * res)) * (self._comp.dig_H2 / 65536.0 * (1.0 + self._comp.dig_H6 / 67108864.0 * res * (1.0 + self._comp.dig_H3 / 67108864.0 * res)))
def _calc_humidity(self, h: float, t: float) -> float:
res = self._tfine(t) - 76800.0
res = (h - (self._comp.dig_H4 * 64.0 + self._comp.dig_H5 / 16384.0 * res)) * (
self._comp.dig_H2
/ 65536.0
* (
1.0
+ self._comp.dig_H6
/ 67108864.0
* res
* (1.0 + self._comp.dig_H3 / 67108864.0 * res)
)
)
res = res * (1.0 - (self._comp.dig_H1 * res / 524288.0))
return max(0.0, min(res, 100.0))

def __calc_pressure(self, p, t):
v1 = self.__tfine(t) / 2.0 - 64000.0
def _calc_pressure(self, p: float, t: float) -> float:
v1 = self._tfine(t) / 2.0 - 64000.0
v2 = v1 * v1 * self._comp.dig_P6 / 32768.0
v2 = v2 + v1 * self._comp.dig_P5 * 2.0
v2 = v2 / 4.0 + self._comp.dig_P4 * 65536.0
v1 = (self._comp.dig_P3 * v1 * v1 / 524288.0 + self._comp.dig_P2 * v1) / 524288.0
v1 = (
self._comp.dig_P3 * v1 * v1 / 524288.0 + self._comp.dig_P2 * v1
) / 524288.0
v1 = (1.0 + v1 / 32768.0) * self._comp.dig_P1

# Prevent divide by zero
Expand All @@ -115,29 +168,20 @@ def __calc_pressure(self, p, t):
res = res + (v1 + v2 + self._comp.dig_P7) / 16.0
return res

def __repr__(self):
return "compensated_reading(id={0}, timestamp={1:%Y-%m-%d %H:%M:%S.%f%Z}, temp={2:0.3f} °C, pressure={3:0.2f} hPa, humidity={4:0.2f} % rH)".format(
self.id, self.timestamp, self.temperature, self.pressure, self.humidity)


class params(dict):
__getattr__ = dict.__getitem__
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__

def __repr__(self) -> str:
return (
f"compensated_reading(id={self.id}), "
f"timestamp={self.timestamp}, "
f"temp={self.temperature:.3f} °C, "
f"pressure={self.pressure:.2f} hPa, "
f"humidity={self.humidity:.2f} % rH)"
)

class memoize:
def __init__(self, f):
self.f = f
self.memo = {}

def __call__(self, *args):
if args not in self.memo:
self.memo[args] = self.f(*args)
return self.memo[args]


def load_calibration_params(bus, address=DEFAULT_PORT):
@lru_cache
def load_calibration_params(
bus: int, address: int = DEFAULT_PORT
) -> CompensationParams:
"""
The BME280 output consists of the ADC output values. However, each sensing
element behaves differently. Therefore, the actual pressure and temperature
Expand All @@ -147,52 +191,54 @@ def load_calibration_params(bus, address=DEFAULT_PORT):
formula to perform temperature readout in degC, humidity in % and pressure
in hPA.
"""
read = reader(bus, address)
compensation_params = params()

# Temperature trimming params
compensation_params.dig_T1 = read.unsigned_short(0x88)
compensation_params.dig_T2 = read.signed_short(0x8A)
compensation_params.dig_T3 = read.signed_short(0x8C)

# Pressure trimming params
compensation_params.dig_P1 = read.unsigned_short(0x8E)
compensation_params.dig_P2 = read.signed_short(0x90)
compensation_params.dig_P3 = read.signed_short(0x92)
compensation_params.dig_P4 = read.signed_short(0x94)
compensation_params.dig_P5 = read.signed_short(0x96)
compensation_params.dig_P6 = read.signed_short(0x98)
compensation_params.dig_P7 = read.signed_short(0x9A)
compensation_params.dig_P8 = read.signed_short(0x9C)
compensation_params.dig_P9 = read.signed_short(0x9E)

# Humidity trimming params
compensation_params.dig_H1 = read.unsigned_byte(0xA1)
compensation_params.dig_H2 = read.signed_short(0xE1)
compensation_params.dig_H3 = read.signed_byte(0xE3)
read = Reader(bus, address)

e4 = read.signed_byte(0xE4)
e5 = read.signed_byte(0xE5)
e6 = read.signed_byte(0xE6)

compensation_params.dig_H4 = e4 << 4 | e5 & 0x0F
compensation_params.dig_H5 = ((e5 >> 4) & 0x0F) | (e6 << 4)
compensation_params.dig_H6 = read.signed_byte(0xE7)

return compensation_params


__cache_calibration_params = memoize(load_calibration_params)


def __calc_delay(t_oversampling, h_oversampling, p_oversampling):
# Temperature trimming params
return CompensationParams(
dig_T1=read.unsigned_short(0x88),
dig_T2=read.signed_short(0x8A),
dig_T3=read.signed_short(0x8C),
# Pressure trimming params
dig_P1=read.unsigned_short(0x8E),
dig_P2=read.signed_short(0x90),
dig_P3=read.signed_short(0x92),
dig_P4=read.signed_short(0x94),
dig_P5=read.signed_short(0x96),
dig_P6=read.signed_short(0x98),
dig_P7=read.signed_short(0x9A),
dig_P8=read.signed_short(0x9C),
dig_P9=read.signed_short(0x9E),
# Humidity trimming params
dig_H1=read.unsigned_byte(0xA1),
dig_H2=read.signed_short(0xE1),
dig_H3=read.signed_byte(0xE3),
dig_H4=e4 << 4 | e5 & 0x0F,
dig_H5=((e5 >> 4) & 0x0F) | (e6 << 4),
dig_H6=read.signed_byte(0xE7),
)


def _calc_delay(
t_oversampling: Oversampling,
h_oversampling: Oversampling,
p_oversampling: Oversampling,
) -> float:
t_delay = 0.000575 + 0.0023 * (1 << t_oversampling)
h_delay = 0.000575 + 0.0023 * (1 << h_oversampling)
p_delay = 0.001250 + 0.0023 * (1 << p_oversampling)
return t_delay + h_delay + p_delay


def sample(bus, address=DEFAULT_PORT, compensation_params=None, sampling=oversampling.x1):
def sample(
bus,
address: int = DEFAULT_PORT,
compensation_params: CompensationParams | None = None,
sampling: Oversampling = Oversampling.x1,
) -> CompensatedReadings:
"""
Primes the sensor for reading (defaut: x1 oversampling), pauses for a set
amount of time so that the reading stabilizes, and then returns a
Expand All @@ -204,18 +250,20 @@ def sample(bus, address=DEFAULT_PORT, compensation_params=None, sampling=oversam
* pressure (in hPa)
"""
if compensation_params is None:
compensation_params = __cache_calibration_params(bus, address)
compensation_params = load_calibration_params(bus, address)

mode = 1 # forced
t_oversampling = sampling or oversampling.x1
h_oversampling = sampling or oversampling.x1
p_oversampling = sampling or oversampling.x1
t_oversampling = sampling
h_oversampling = sampling
p_oversampling = sampling

bus.write_byte_data(address, 0xF2, h_oversampling) # ctrl_hum
bus.write_byte_data(address, 0xF4, t_oversampling << 5 | p_oversampling << 2 | mode) # ctrl
delay = __calc_delay(t_oversampling, h_oversampling, p_oversampling)
bus.write_byte_data(
address, 0xF4, t_oversampling << 5 | p_oversampling << 2 | mode
) # ctrl
delay = _calc_delay(t_oversampling, h_oversampling, p_oversampling)
time.sleep(delay)

block = bus.read_i2c_block_data(address, 0xF7, 8)
raw_data = uncompensated_readings(block)
return compensated_readings(raw_data, compensation_params)
raw_data = UncompensatedReadings(block)
return CompensatedReadings(raw_data, compensation_params)
49 changes: 0 additions & 49 deletions bme280/const.py

This file was deleted.

Loading