Source code for QVideo.lib.AsyncVideoFilter

'''Async VideoFilter base for computationally expensive operations.'''
import weakref
from qtpy import QtCore
from QVideo.lib.QVideoFilter import VideoFilter
from QVideo.lib.videotypes import Image
import numpy as np


__all__ = ['AsyncVideoFilter']


class _AsyncWorker(QtCore.QObject):
    '''Runs filter.process in a background thread and reports the result.

    Holds a *weakref* to the owning filter rather than a bound method
    (``filter.process``).  A bound method would create a strong reference
    cycle — ``AsyncVideoFilter → _worker → _fn → AsyncVideoFilter`` — that
    Python's reference-counting GC cannot break.  PyQt5 uses weak references
    for signal-slot connections, so the ``destroyed`` and ``aboutToQuit``
    connections do *not* keep the filter alive; only the external reference
    from the rack does.  When the cyclic GC runs (triggered by any allocation
    that crosses the GC threshold, e.g. a mouse-move event), it detects the
    unreachable cycle and collects it, destroying ``_thread`` while it is
    still running and causing Qt to abort.  The weakref breaks the cycle so
    that Python's refcount mechanism handles collection instead, which fires
    ``destroyed`` reliably before ``_thread`` is freed.
    '''

    def __init__(self, filter_ref: 'weakref.ref[AsyncVideoFilter]') -> None:
        super().__init__()
        self._ref = filter_ref

    @QtCore.Slot(np.ndarray)
    def run(self, image: Image) -> None:
        f = self._ref()
        if f is None:
            return
        result = f.process(image)
        f = self._ref()      # re-check: process() may have released the GIL
        if f is not None:
            f._result = result
            f._ready = True


[docs] class AsyncVideoFilter(VideoFilter): '''VideoFilter base for computationally expensive operations. Runs :meth:`process` in a dedicated background :class:`~qtpy.QtCore.QThread` so that heavy computation does not block the GUI event loop. The standard :meth:`add` / :meth:`get` interface is preserved with two behavioural differences: - **Drop-frame strategy**: :meth:`add` submits a frame to the worker only when the worker is idle. If the worker is still processing the previous frame the incoming frame is discarded rather than queued, preventing unbounded latency build-up. - **Cached result**: :meth:`get` returns the result of the last *completed* :meth:`process` call. Before any result is ready it returns the raw input frame as a passthrough so the pipeline always has something to display. Subclasses override :meth:`process` instead of :meth:`add` / :meth:`get`. :meth:`process` runs on the worker thread; it may read instance attributes freely (the GIL makes Python reads safe) but must not write to them. Parameters ---------- None. Subclass constructors should call ``super().__init__()`` after initialising any attributes that :meth:`process` will read, so the worker thread starts with a fully initialised object. ''' _submit = QtCore.Signal(np.ndarray) def __init__(self) -> None: super().__init__() self._ready = True self._result: Image | None = None self._worker = _AsyncWorker(weakref.ref(self)) self._thread = QtCore.QThread() self._worker.moveToThread(self._thread) self._submit.connect(self._worker.run) self._thread.start() self.destroyed.connect(self._cleanup) app = QtCore.QCoreApplication.instance() if app is not None: app.aboutToQuit.connect(self._cleanup)
[docs] def process(self, image: Image) -> Image: '''Perform the heavy computation on *image*. Called in the background thread. The default implementation is a passthrough; subclasses should override this method. Parameters ---------- image : Image Input frame. Returns ------- Image Processed frame. ''' return image
[docs] def add(self, image: Image) -> None: '''Cache *image* and submit it to the worker if idle. If the worker is busy the frame is dropped to prevent unbounded queue growth. Parameters ---------- image : Image Input frame. ''' self.data = image if self._ready: self._ready = False self._submit.emit(image)
[docs] def get(self) -> Image | None: '''Return the most recently processed frame. Returns the cached result of the last completed :meth:`process` call. If no result is available yet (before the first :meth:`process` completes), returns the raw input frame so the pipeline has something to display immediately. Returns ------- Image or None Processed frame, raw input frame, or ``None`` if :meth:`add` has never been called. ''' if self._result is not None: return self._result return self.data
[docs] def shutdown(self) -> None: '''Stop the background thread synchronously. Called by the pipeline when this filter is removed. Safe to call multiple times. ''' self._cleanup()
@QtCore.Slot() def _cleanup(self) -> None: if not self._thread.isRunning(): return self._thread.quit() self._thread.wait()