Source code for QVideo.dvr.QDVRWidget
'''Composite DVR widget for recording and playing back video streams.'''
from collections.abc import Callable
from qtpy import QtCore, QtGui, QtWidgets
from pathlib import Path
import numpy as np
from QVideo.lib import clickable, QVideoSource
from QVideo.lib.videotypes import Image
from .QOpenCVWriter import QOpenCVWriter
from .QOpenCVReader import QOpenCVSource
from .QHDF5Writer import QHDF5Writer
from .QHDF5Reader import QHDF5Source
try:
import h5py as _h5py
_h5py_available = True
except (ImportError, ModuleNotFoundError):
_h5py_available = False
import logging
__all__ = ['QDVRWidget']
logger = logging.getLogger(__name__)
try:
from .icons_rc import *
except Exception: # pragma: no cover
logger.debug(
'Could not load DVR icons; buttons will show text labels only')
[docs]
class QDVRWidget(QtWidgets.QFrame):
'''Widget providing record, play, pause, stop and rewind controls
for a digital video recorder.
Connects to a :class:`~QVideo.lib.QVideoSource.QVideoSource` to
capture incoming frames to a file, and can play back previously
recorded files. Supported formats are determined by the
:attr:`Writer` and :attr:`Player` class attributes; by default
AVI (``.avi``), MKV (``.mkv``), and MP4 (``.mp4``) are supported;
HDF5 (``.h5``) is also supported when ``h5py`` is installed.
Requesting an unsupported extension is logged
as an error and silently ignored.
Recording and playback stop automatically when the widget is closed
or the application is about to quit.
Parameters
----------
source : QVideoSource or None
Video source supplying frames to record.
filename : str or None
Default file path for saving. If ``None``, defaults to
``~/default.mkv``.
*args :
Forwarded to :class:`~qtpy.QtWidgets.QFrame`.
**kwargs :
Forwarded to :class:`~qtpy.QtWidgets.QFrame`.
Signals
-------
newFrame(Image)
Emitted for each frame during playback.
recording(bool)
Emitted when recording starts (``True``) or stops (``False``).
playing(bool)
Emitted when playback starts (``True``) or stops (``False``).
'''
#: Emitted for each frame during playback.
newFrame = QtCore.Signal(np.ndarray)
#: Emitted when recording starts (``True``) or stops (``False``).
recording = QtCore.Signal(bool)
#: Emitted when playback starts (``True``) or stops (``False``).
playing = QtCore.Signal(bool)
FILENAME = 'default.mkv'
Writer: dict[str, type] = {'.avi': QOpenCVWriter,
'.mkv': QOpenCVWriter,
'.mp4': QOpenCVWriter}
Player: dict[str, type] = {'.avi': QOpenCVSource,
'.mkv': QOpenCVSource,
'.mp4': QOpenCVSource}
FileGroups: dict[str, set[str]] = {
'Lossless Video': {'.avi', '.mkv'},
'Video': {'.mp4'}}
if _h5py_available:
Writer['.h5'] = QHDF5Writer
Player['.h5'] = QHDF5Source
FileGroups['HDF5 files'] = {'.h5'}
@classmethod
def _buildFilter(cls, save: bool) -> str:
'''Build a file-dialog filter string from supported formats.
Parameters
----------
save : bool
If ``True``, derive extensions from :attr:`Writer`;
otherwise from :attr:`Player`.
Returns
-------
str
A ``;;``-separated filter string suitable for
``QFileDialog``, with extensions grouped by
:attr:`FileGroups`.
'''
formats = set(cls.Writer if save else cls.Player)
parts = []
covered = set()
for label, exts in cls.FileGroups.items():
matching = sorted(exts & formats)
if matching:
covered |= set(matching)
parts.append(
f'{label} ({" ".join("*" + e for e in matching)})')
ungrouped = sorted(formats - covered)
if ungrouped:
parts.append(
f'Other files ({" ".join("*" + e for e in ungrouped)})')
return ';;'.join(parts) if parts else 'All files (*)'
def __init__(self,
*args,
source: QVideoSource | None = None,
filename: str = '',
**kwargs) -> None:
super().__init__(*args, **kwargs)
self._source: QVideoSource | None = None
self._writer: object | None = None
self._player: QVideoSource | None = None
self._thread: QtCore.QThread | None = None
self._setupUi()
self._connectSignals()
self.source = source
self.filename = filename if filename else str(
Path.home() / self.FILENAME)
def _setupUi(self) -> None:
self.setFrameShape(QtWidgets.QFrame.Shape.Box)
self.recordButton = QtWidgets.QPushButton('&Record', self)
self.recordButton.setStatusTip('Record video')
self.recordButton.setIcon(
QtGui.QIcon(':/icons/icons/media-record.svg'))
self.recordButton.setShortcut('R')
self.stopButton = QtWidgets.QPushButton('&Stop', self)
self.stopButton.setStatusTip('Stop recording')
self.stopButton.setIcon(
QtGui.QIcon(':/icons/icons/media-playback-stop.svg'))
self.stopButton.setShortcut('S')
self.frameNumber = QtWidgets.QLCDNumber(self)
self.frameNumber.setSegmentStyle(
QtWidgets.QLCDNumber.SegmentStyle.Flat)
recordRow = QtWidgets.QHBoxLayout()
recordRow.setSpacing(2)
recordRow.setContentsMargins(0, 0, 6, 0)
recordRow.addWidget(self.recordButton)
recordRow.addWidget(self.stopButton)
recordRow.addWidget(self.frameNumber)
saveLabel = QtWidgets.QLabel('Save As', self)
self.saveEdit = QtWidgets.QLineEdit(self)
self.saveEdit.setReadOnly(True)
self.saveEdit.setStatusTip('Video file name')
saveLabel.setBuddy(self.saveEdit)
saveRow = QtWidgets.QHBoxLayout()
saveRow.setSpacing(6)
saveRow.setContentsMargins(6, 0, 6, 0)
saveRow.addWidget(saveLabel)
saveRow.addWidget(self.saveEdit)
self.rewindButton = QtWidgets.QPushButton('&Rewind', self)
self.rewindButton.setStatusTip('Rewind video file')
self.rewindButton.setIcon(
QtGui.QIcon(':/icons/icons/media-skip-backward.svg'))
self.pauseButton = QtWidgets.QPushButton('&Pause', self)
self.pauseButton.setStatusTip('Pause video playback')
self.pauseButton.setIcon(
QtGui.QIcon(':/icons/icons/media-playback-pause.svg'))
self.playButton = QtWidgets.QPushButton('P&lay', self)
self.playButton.setStatusTip('Play video file')
self.playButton.setIcon(
QtGui.QIcon(':/icons/icons/media-playback-start.svg'))
playRow = QtWidgets.QHBoxLayout()
playRow.setSpacing(2)
playRow.setContentsMargins(0, 1, 0, 1)
playRow.addWidget(self.rewindButton)
playRow.addWidget(self.pauseButton)
playRow.addWidget(self.playButton)
labelPlayFile = QtWidgets.QLabel('Play', self)
self.playEdit = QtWidgets.QLineEdit(self)
self.playEdit.setReadOnly(True)
self.playEdit.setStatusTip('Video file')
labelPlayFile.setBuddy(self.playEdit)
playFileRow = QtWidgets.QHBoxLayout()
playFileRow.setSpacing(6)
playFileRow.setContentsMargins(6, 0, 6, 0)
playFileRow.addWidget(labelPlayFile)
playFileRow.addWidget(self.playEdit)
labelNFrames = QtWidgets.QLabel('Duration', self)
self.nframes = QtWidgets.QSpinBox(self)
self.nframes.setToolTip('number of frames to record')
self.nframes.setAlignment(
QtCore.Qt.AlignmentFlag.AlignRight)
self.nframes.setRange(10, 99000)
self.nframes.setSingleStep(10)
self.nframes.setValue(10000)
labelNFrames.setBuddy(self.nframes)
labelInterval = QtWidgets.QLabel('Interval', self)
self.nskip = QtWidgets.QSpinBox(self)
self.nskip.setToolTip('Record every Nth frame')
self.nskip.setAlignment(
QtCore.Qt.AlignmentFlag.AlignRight)
self.nskip.setRange(1, 999)
labelInterval.setBuddy(self.nskip)
framesRow = QtWidgets.QHBoxLayout()
framesRow.setSpacing(6)
framesRow.setContentsMargins(6, 4, 6, 4)
framesRow.addWidget(labelNFrames)
framesRow.addWidget(self.nframes)
framesRow.addWidget(labelInterval)
framesRow.addWidget(self.nskip)
layout = QtWidgets.QVBoxLayout(self)
layout.setSpacing(6)
layout.setContentsMargins(6, 6, 6, 6)
layout.addLayout(recordRow)
layout.addLayout(saveRow)
layout.addLayout(playRow)
layout.addLayout(playFileRow)
layout.addLayout(framesRow)
def _connectSignals(self) -> None:
clickable(self.playEdit).connect(lambda: self.getFileName(False))
clickable(self.saveEdit).connect(lambda: self.getFileName(True))
self.recordButton.clicked.connect(self.record)
self.stopButton.clicked.connect(self.stop)
self.rewindButton.clicked.connect(self.rewind)
self.pauseButton.clicked.connect(self.pause)
self.playButton.clicked.connect(self.play)
QtCore.QCoreApplication.instance().aboutToQuit.connect(self.stop)
[docs]
def isRecording(self) -> bool:
'''Return ``True`` if recording is in progress.'''
return self._writer is not None
[docs]
def isPlaying(self) -> bool:
'''Return ``True`` if playback is in progress.'''
return self._player is not None
[docs]
def isPaused(self) -> bool:
'''Return ``True`` if playback is paused.'''
return self.isPlaying() and self._player.isPaused()
[docs]
def getFileName(self, save: bool = False) -> str:
'''Open a file dialog and update the filename fields.
Parameters
----------
save : bool
If ``True``, open a save dialog; otherwise open an open
dialog.
Returns
-------
str
Selected filename, or empty string if cancelled.
'''
if self.isPlaying() or self.isRecording():
return ''
get = (QtWidgets.QFileDialog.getSaveFileName if save
else QtWidgets.QFileDialog.getOpenFileName)
try:
options = QtWidgets.QFileDialog.Option.DontUseNativeDialog
except AttributeError:
options = QtWidgets.QFileDialog.DontUseNativeDialog
filename, _ = get(self, 'Video File Name',
str(self.filename),
self._buildFilter(save),
options=options)
if filename:
if save:
self.filename = filename
else:
self.playname = filename
return filename
[docs]
@QtCore.Slot()
def record(self) -> None:
'''Start recording, or stop if already recording.
Does nothing if ``source`` is ``None``, if playback is active,
or if the save filename has an unsupported extension.
'''
if self.source is None or self.isPlaying():
return
if self.isRecording():
self.stop()
return
if not (self.filename or self.getFileName(save=True)):
return
suffix = Path(self.filename).suffix
if suffix not in self.Writer:
logger.error(f'Unsupported file format: {suffix!r}')
return
logger.debug(f'Recording: {self.filename}')
writer_class = self.Writer[suffix]
self._writer = writer_class(self.filename,
fps=self.source.fps,
nframes=self.nframes.value(),
nskip=self.nskip.value())
self._writer.frameNumber.connect(self.setFrameNumber)
self._writer.finished.connect(self.stop)
self._thread = QtCore.QThread()
self._writer.moveToThread(self._thread)
self._thread.start()
self.source.newFrame.connect(self._writer.write)
self.recording.emit(True)
[docs]
@QtCore.Slot()
def play(self) -> None:
'''Start playback, or resume if paused.
Does nothing if recording is active, if playback is already
running, or if the playback filename has an unsupported
extension.
'''
if self.isPaused():
self._player.resume()
return
if self.isRecording() or self.isPlaying():
return
if not (self.playname or self.getFileName(save=False)):
return
suffix = Path(self.playname).suffix
if suffix not in self.Player:
logger.error(f'Unsupported file format: {suffix!r}')
return
self.framenumber = 0
logger.debug(f'Starting Playback: {self.playname}')
player_class = self.Player[suffix]
self._player = player_class(self.playname)
if self._player.isOpen():
logger.debug('connecting signals')
self._player.newFrame.connect(self.stepFrameNumber)
self._player.newFrame.connect(self.newFrame)
self.playing.emit(True)
self._player.start()
else:
self._player = None
[docs]
@QtCore.Slot()
def pause(self) -> None:
'''Pause or resume playback.'''
if self.isPlaying():
self._player.resume() if self.isPaused() else self._player.pause()
[docs]
@QtCore.Slot()
def rewind(self) -> None:
'''Rewind to the first frame and pause.'''
if self.isPlaying():
self._player.source.rewind()
self._player.pause()
self.framenumber = 0
[docs]
@QtCore.Slot()
def stop(self) -> None:
'''Stop recording or playback.'''
if self.isRecording():
logger.debug('Stopping Recording')
try:
self.source.newFrame.disconnect(self._writer.write)
self._writer.frameNumber.disconnect(self.setFrameNumber)
self._writer.finished.disconnect(self.stop)
except (RuntimeError, TypeError):
logger.debug(
'Some recording signals were already disconnected')
self._thread.quit()
self._thread.wait()
self._writer.close()
self._thread = None
self._writer = None
self.recording.emit(False)
if self.isPlaying():
logger.debug('Stopping Playback')
try:
self._player.newFrame.disconnect(self.stepFrameNumber)
self._player.newFrame.disconnect(self.newFrame)
except (RuntimeError, TypeError):
logger.debug('Playback signal was already disconnected')
self._player.stop()
self._player = None
self.playing.emit(False)
self.framenumber = 0
[docs]
def closeEvent(self, event: QtGui.QCloseEvent) -> None:
'''Stop recording or playback when the widget is closed.'''
self.stop()
super().closeEvent(event)
[docs]
@QtCore.Slot(int)
def setFrameNumber(self, framenumber: int) -> None:
'''Set the displayed frame number.'''
self.framenumber = framenumber
[docs]
@QtCore.Slot()
def stepFrameNumber(self) -> None:
'''Increment the displayed frame number.'''
self.framenumber += 1
@property
def source(self) -> QVideoSource | None:
'''The :class:`~QVideo.lib.QVideoSource.QVideoSource` being
recorded.'''
return self._source
@source.setter
def source(self, source: QVideoSource | None) -> None:
'''Set the video source. Disables the record button when ``None``.'''
logger.debug(f'Setting source {type(source)}')
self._source = source
self.recordButton.setDisabled(source is None)
@property
def player(self) -> QVideoSource | None:
'''The active playback source, or ``None`` when not playing.'''
return self._player
@QtCore.Property(str)
def filename(self) -> str:
'''Current save filename.'''
return str(self.saveEdit.text())
@filename.setter
def filename(self, filename: str | None) -> None:
if filename is None:
return
if not (self.isRecording() or self.isPlaying()):
self.saveEdit.setText(filename)
self.playname = filename
@QtCore.Property(str)
def playname(self) -> str:
'''Current playback filename.'''
return str(self.playEdit.text())
@playname.setter
def playname(self, filename: str) -> None:
if not self.isPlaying():
self.playEdit.setText(filename)
@property
def framenumber(self) -> int:
'''Current frame number displayed in the LCD.'''
return self.frameNumber.intValue()
@framenumber.setter
def framenumber(self, number: int) -> None:
self.frameNumber.display(number)