Source code for QVideo.lib.QVideoSource
'''QThread wrapper that drives a QCamera and emits newFrame signals.'''
from qtpy import QtCore
from QVideo.lib.QCamera import QCamera
from QVideo.lib.QVideoReader import QVideoReader
from QVideo.lib.videotypes import Image
from typing import TypeAlias
import numpy as np
import logging
logger = logging.getLogger(__name__)
VideoSourceType: TypeAlias = QCamera | QVideoReader
__all__ = ['QVideoSource', 'VideoSourceType']
[docs]
class QVideoSource(QtCore.QThread):
'''A threaded video source that reads frames from a camera
or video file in a separate thread.
Parameters
----------
source : QCamera | QVideoReader
The video source to read frames from.
Signals
-------
newFrame(Image)
Emitted when a new video frame is available.
Properties
----------
source : QCamera | QVideoReader
The video source object.
fps : float
Frame rate of the video source [frames per second].
shape : QSize
The shape of the video frames (width, height).
Methods
-------
isOpen() -> bool
Check if the video source is open.
start() -> QVideoSource
Start the video source thread.
stop() -> None
Stop the video source thread.
pause() -> None
Pause video readout.
resume() -> None
Resume video readout after pause().
isPaused() -> bool
Check if video readout is paused.
example(*args) -> None
Demonstrate basic operation of a threaded video source.
Notes
-----
The source is moved to this thread via ``moveToThread`` so that its
slots are delivered in the capture thread rather than the main thread.
State variables ``_running`` and ``_paused`` are protected by
:attr:`mutex`; :attr:`waitcondition` is used to block the capture
loop while paused and to wake it on :meth:`resume` or :meth:`stop`.
'''
#: Emitted when a new video frame is available.
newFrame = QtCore.Signal(np.ndarray)
def __init__(self, source: VideoSourceType) -> None:
'''Initialise the video source thread.
Parameters
----------
source : QCamera | QVideoReader
The video source to read frames from. It is moved to this
thread so that its context manager (open/close) runs in the
capture thread.
'''
super().__init__()
self.source = source
self.source.moveToThread(self)
self.mutex = QtCore.QMutex()
self.waitcondition = QtCore.QWaitCondition()
self._paused = False
self._running = True
@property
def source(self) -> VideoSourceType:
'''The underlying QCamera or QVideoReader.'''
return self._source
@source.setter
def source(self, source: VideoSourceType) -> None:
self._source = source
self.shapeChanged = source.shapeChanged
[docs]
def isOpen(self) -> bool:
'''Return whether the source is open.'''
return self.source.isOpen()
@property
def fps(self) -> float:
'''Frame rate of the video source [frames per second].'''
return self.source.fps
@property
def shape(self) -> QtCore.QSize:
'''Shape of the video frames as ``QSize(width, height)``.'''
return self.source.shape
[docs]
def run(self) -> None:
'''Capture loop: open the source, read frames, emit :attr:`newFrame`.
Opens the source via its context manager, then loops calling
:meth:`~QCamera.saferead` and emitting :attr:`newFrame` for each
successful frame. The loop blocks when :meth:`pause` is called and
resumes when :meth:`resume` or :meth:`stop` is called.
This method is invoked automatically by :meth:`start` in a new
thread and should not be called directly in production code.
'''
logger.debug('streaming started')
with self.source:
while True:
with QtCore.QMutexLocker(self.mutex):
if self._paused:
self.waitcondition.wait(self.mutex)
self._paused = False
if not self._running:
break
ok, frame = self.source.saferead()
if ok:
self.newFrame.emit(frame)
logger.debug('streaming finished')
[docs]
@QtCore.Slot()
def start(self) -> 'QVideoSource':
'''Start the capture thread.
Safe to call after :meth:`stop` / :meth:`wait` to restart the source.
Resets internal running state before launching the thread so that a
previously-stopped source can be started again.
Returns
-------
QVideoSource
``self``, to allow chaining
e.g. ``src = QVideoSource(cam).start()``.
'''
logger.debug('starting')
with QtCore.QMutexLocker(self.mutex):
self._running = True
self._paused = False
super().start()
return self
[docs]
@QtCore.Slot()
def stop(self) -> None:
'''Stop the capture thread.
Sets ``_running`` to ``False`` and wakes any thread blocked in
:meth:`pause`, so that :meth:`run` exits cleanly at the next
loop iteration.
'''
logger.debug('stopping')
with QtCore.QMutexLocker(self.mutex):
self._running = False
self._paused = False
self.waitcondition.wakeAll()
[docs]
@QtCore.Slot()
def pause(self) -> None:
'''Pause frame readout.
The capture loop will block after the current frame completes.
Has no effect if the thread is not running. Call :meth:`resume`
to continue.
'''
logger.debug('pausing')
with QtCore.QMutexLocker(self.mutex):
if self._running:
self._paused = True
[docs]
@QtCore.Slot()
def resume(self) -> None:
'''Resume frame readout after :meth:`pause`.'''
self.waitcondition.wakeAll()
[docs]
def isPaused(self) -> bool:
'''Return ``True`` if the capture loop is currently paused.'''
return self._paused
[docs]
@classmethod
def example(cls: type['QVideoSource'], *args) -> None: # pragma: no cover
'''Demonstrate basic operation of a threaded video source.'''
from pprint import pprint
source = cls(*args).start()
print(source.source.name)
pprint(source.source.settings)
source.stop()
source.quit()
source.wait()