from typing import TYPE_CHECKING
from QVideo.lib import QCamera, QVideoSource
import logging
if TYPE_CHECKING:
from picamera2 import Picamera2
try:
from picamera2 import Picamera2
except (ImportError, ModuleNotFoundError):
Picamera2 = None
logger = logging.getLogger(__name__)
__all__ = ['QPicamera', 'QPicameraSource']
# picamera2 control names and their Python types.
# Only controls listed here are probed and registered as properties.
_CONTROL_TYPES: dict[str, type] = {
'AeEnable': bool,
'AwbEnable': bool,
'Brightness': float,
'Contrast': float,
'Saturation': float,
'Sharpness': float,
'ExposureTime': int,
'AnalogueGain': float,
'AfMode': int,
'AfRange': int,
'AfSpeed': int,
'LensPosition': float,
}
[docs]
class QPicamera(QCamera):
'''Camera backed by the Raspberry Pi camera module via picamera2.
Supports all CSI-connected camera modules on a Raspberry Pi SBC,
including the HQ Camera, Camera Module 3, and similar sensors.
Frames are delivered as RGB arrays.
Requires the ``picamera2`` package, which is pre-installed on
Raspberry Pi OS. Install manually with::
pip install picamera2
Parameters
----------
cameraID : int
Index of the camera to open when multiple modules are attached.
Default: ``0``.
width : int
Initial frame width in pixels. Default: ``1280``.
height : int
Initial frame height in pixels. Default: ``960``.
gray : bool
``True`` convert frames to grayscale.
Default: ``False``.
*args :
Forwarded to :class:`~QVideo.lib.QCamera`.
**kwargs :
Forwarded to :class:`~QVideo.lib.QCamera`.
'''
def __init__(self, *args,
cameraID: int = 0,
width: int = 1280,
height: int = 960,
gray: bool = False,
**kwargs) -> None:
super().__init__(*args, **kwargs)
self._cameraID = cameraID
self._width = int(width)
self._height = int(height)
self._gray = bool(gray)
self._controlValues: dict[str, object] = {}
# Tracks whether the picamera2 hardware is actively streaming.
# Distinct from self._isOpen so that _setControl can cache values
# between a stop/restart cycle (e.g. when fps changes while closed).
self._deviceOpen: bool = False
self._device: 'Picamera2 | None' = None
self.open()
def _initialize(self) -> bool:
'''Open the Raspberry Pi camera and register available controls.
Returns
-------
bool
``True`` if the camera was opened and delivering frames.
'''
if Picamera2 is None:
logger.warning(
'picamera2 is not available. '
'Install it on Raspberry Pi with: pip install picamera2')
return False
try:
self._device = Picamera2(camera_num=self._cameraID)
except Exception as ex:
logger.warning('Could not open Raspberry Pi camera'
f'{self._cameraID}: {ex}')
return False
# Save controls set before this restart (e.g. fps changed while
# the device was closed). _probeControls will overwrite
# _controlValues from fresh metadata, so we save them here and
# re-apply them afterwards.
prior_controls = dict(self._controlValues)
fmt = 'YUV420' if self._gray else 'BGR888'
config = self._device.create_preview_configuration(
main={'size': (self._width, self._height), 'format': fmt})
self._device.configure(config)
self._device.start()
self._deviceOpen = True
try:
self._device.capture_array()
except Exception as ex:
logger.warning(f'Camera did not deliver a frame: {ex}')
self._deviceOpen = False
self._device.stop()
self._device.close()
return False
register = self.registerProperty
register('width', getter=self._getWidth,
setter=self._setWidth, ptype=int)
register('height', getter=self._getHeight,
setter=self._setHeight, ptype=int)
register('color', getter=self._getColor,
setter=self._setColor, ptype=bool)
metadata = self._device.capture_metadata()
self._probeControls(metadata)
self._registerFrameRate(metadata)
self._probeFocus(metadata)
# Re-apply any controls the user set before this restart.
# Filter to keys still valid under the new configuration.
pending = {k: v for k, v in prior_controls.items()
if k in self._controlValues}
if pending:
self._device.set_controls(pending)
self._controlValues.update(pending)
return True
def _probeControls(self, metadata: dict[str, object]) -> None:
'''Register picamera2 controls supported by this camera.
For each name in :data:`_CONTROL_TYPES` that the camera reports in
:attr:`~picamera2.Picamera2.camera_controls`, a read-write property
is registered with the hardware-reported range.
Parameters
----------
metadata : dict
Frame metadata from :meth:`~picamera2.Picamera2.capture_metadata`,
used to initialise cached control values.
'''
for name, ptype in _CONTROL_TYPES.items():
if name not in self._device.camera_controls:
continue
lo, hi, default = self._device.camera_controls[name]
current = metadata.get(name, default)
self._controlValues[name] = ptype(current)
meta = {}
if ptype is not bool:
if lo is not None:
meta['minimum'] = lo
if hi is not None:
meta['maximum'] = hi
self.registerProperty(
name,
getter=lambda n=name, t=ptype: t(self._controlValues[n]),
setter=lambda v, n=name, t=ptype: self._setControl(n, t(v)),
ptype=ptype,
**meta)
def _registerFrameRate(self, metadata: dict[str, object]) -> None:
'''Register the ``fps`` property if the camera supports it.
Frame rate is controlled via ``FrameDurationLimits``, a picamera2
control that accepts a ``(min_µs, max_µs)`` tuple. Setting both
elements to the same value pins the frame rate.
Parameters
----------
metadata : dict
Frame metadata from :meth:`~picamera2.Picamera2.capture_metadata`,
used to read the current ``FrameDuration``.
'''
if 'FrameDurationLimits' not in self._device.camera_controls:
return
lo, hi, default = self._device.camera_controls['FrameDurationLimits']
duration = metadata.get('FrameDuration', None)
if duration is None:
duration = default[0] if isinstance(default, tuple) else lo
self._controlValues['FrameDurationLimits'] = (duration, duration)
self.registerProperty(
'fps',
getter=self._getFps,
setter=self._setFps,
ptype=float,
minimum=1_000_000 / hi,
maximum=1_000_000 / lo,
)
def _probeFocus(self, metadata: dict[str, object]) -> None:
'''Register autofocus controls if the camera supports AF.
AF support is detected by the presence of ``AfTrigger`` in
``camera_controls``. When supported, ``AfState`` is registered
as a read-only property (updated per frame in :meth:`read`) and
``autofocus`` is registered as a trigger method.
Parameters
----------
metadata : dict
Frame metadata from :meth:`~picamera2.Picamera2.capture_metadata`,
used to initialise the cached ``AfState`` value.
'''
if 'AfTrigger' not in self._device.camera_controls:
return
self._controlValues['AfState'] = int(metadata.get('AfState', 0))
self.registerProperty(
'AfState',
getter=lambda: int(self._controlValues.get('AfState', 0)),
setter=None,
ptype=int,
)
self.registerMethod('autofocus', self._triggerAutofocus)
def _triggerAutofocus(self) -> None:
'''Send ``AfTrigger=Start`` to begin a single autofocus sweep.'''
if self._deviceOpen:
self._device.set_controls({'AfTrigger': 0})
def _getFps(self) -> float:
duration = self._controlValues['FrameDurationLimits'][0]
return 1_000_000 / duration
def _setFps(self, fps: float) -> None:
duration = int(1_000_000 / fps)
self._setControl('FrameDurationLimits', (duration, duration))
def _setControl(self, name: str, value: object) -> None:
'''Update the control cache and apply to the device if it is open.
When called while the device is closed (e.g. fps changed between
stop and restart), the value is stored so that :meth:`_initialize`
can re-apply it once the device is reopened.
'''
self._controlValues[name] = value
if self._deviceOpen:
self._device.set_controls({name: value})
def _getColor(self) -> bool:
return not self._gray
def _setColor(self, value: bool) -> None:
self._gray = not bool(value)
self._reconfigure()
self.shapeChanged.emit(self.shape)
def _getWidth(self) -> int:
return self._device.camera_config['main']['size'][0]
def _setWidth(self, value: int) -> None:
'''Store the requested width so the next
:meth:`_initialize` applies it.
Resolution changes require stopping and restarting the picamera2
stream, handled by
:class:`~QVideo.lib.QVideoSource.QVideoSource`.
Storing the value here lets :meth:`_initialize` apply it on
restart via
:meth:`~picamera2.Picamera2.create_preview_configuration`.
'''
self._width = int(value)
def _getHeight(self) -> int:
return self._device.camera_config['main']['size'][1]
def _setHeight(self, value: int) -> None:
'''Store the requested height so the next
:meth:`_initialize` applies it.
See :meth:`_setWidth` for rationale.
'''
self._height = int(value)
def _reconfigure(self,
width: int | None = None,
height: int | None = None) -> None:
'''Reconfigure the camera stream at a new resolution.
Stops acquisition, applies the new resolution, restarts, and
reapplies any controls that were set before reconfiguring.
Parameters
----------
width : int or None
New frame width in pixels. ``None`` keeps the current width.
height : int or None
New frame height in pixels. ``None`` keeps the current height.
'''
w, h = self._device.camera_config['main']['size']
if width is not None:
w = int(width)
if height is not None:
h = int(height)
self._device.stop()
fmt = 'YUV420' if self._gray else 'BGR888'
config = self._device.create_preview_configuration(
main={'size': (w, h), 'format': fmt})
self._device.configure(config)
self._device.start()
# Re-apply cached controls. Note: _reconfigure does not rebuild
# _properties, so if the new configuration changes available controls
# or their ranges the stale registrations will persist silently.
if self._controlValues:
self._device.set_controls(self._controlValues)
def _deinitialize(self) -> None:
'''Stop acquisition and close the Raspberry Pi camera.'''
self._deviceOpen = False
self._device.stop()
self._device.close()
[docs]
def read(self) -> QCamera.CameraData:
'''Read one frame from the camera.
Uses :meth:`~picamera2.Picamera2.capture_request` for direct buffer
access, which avoids an extra copy compared to
:meth:`~picamera2.Picamera2.capture_array`.
Returns
-------
tuple[bool, ndarray or None]
``(True, frame)`` on success, ``(False, None)`` on failure.
'''
if not self.isOpen():
return False, None
try:
request = self._device.capture_request()
frame = request.make_array('main').copy()
if 'AfState' in self._controlValues:
req_meta = request.get_metadata()
if 'AfState' in req_meta:
self._controlValues['AfState'] = int(req_meta['AfState'])
request.release()
except Exception as ex:
logger.warning(f'Frame read failed: {ex}')
return False, None
if self._gray:
# Convert YUV420 to grayscale by taking the Y channel.
frame = frame[:self.height, :self.width]
return True, frame
[docs]
class QPicameraSource(QVideoSource):
'''Threaded video source backed by :class:`QPicamera`.
Parameters
----------
camera : QPicamera or None
Camera instance to wrap. If ``None``, a new :class:`QPicamera`
is created from the remaining arguments.
*args :
Forwarded to :class:`QPicamera` when *camera* is ``None``.
**kwargs :
Forwarded to :class:`QPicamera` when *camera* is ``None``.
'''
def __init__(self, *args,
camera: QPicamera | None = None,
**kwargs) -> None:
camera = camera if camera is not None else QPicamera(*args, **kwargs)
super().__init__(camera)
if __name__ == '__main__': # pragma: no cover
QPicamera.example()