Stream Infer

Real-time/offline inference framework for video and streaming media

English | 简体中文

Stream Infer is a Python library designed for streaming inference in video processing applications, enabling the integration of various image algorithms for video structuring. It supports both real-time and offline inference modes.

It is important to note that Stream Infer serves as a runtime framework; specific algorithms or image processing methods need to be implemented by the user.

Installation

pip install -U stream-infer

Quick Start

All examples depend on YOLOv8 and opencv-python; install these additional packages before running them:

pip install ultralytics opencv-python

The video files used in the examples are available at sample-videos

Rapid Visualization Development & Debugging

It is implemented as a visual web application built with streamlit.

This mode is primarily used for algorithm development and debugging on local/remote servers, supporting custom frame drawing and data display components.

View and run the demo here: examples/server.py

Offline Inference

https://github.com/zaigie/stream_infer/assets/17232619/32aef0c9-89c7-4bc8-9dd6-25035bee2074

Offline inference processes a finite-length video or stream at the speed the computer can handle, performing inference serially while capturing frames.

Since inference always takes time, the overall processing duration may be longer or shorter than the video's length, depending on machine performance.

Besides debugging during the development phase, offline inference can also be used for video structuring analysis in production environments where real-time processing is not essential, such as:

  • Post-meeting video analysis
  • Surgical video review
  • ...

View and run the demo here: examples/offline.py

Real-time Inference

Real-time inference handles a finite/infinite-length video or stream, playing at normal speed if finite.

In this mode, a frame track of configurable size is maintained; the current frame is continuously appended to the track, and the playback process runs independently of the inference process.

Since inference takes time and playback does not wait for inference results, there will inevitably be a delay between the inference results and the current scene.

Real-time inference is not suitable for the development phase but is often used in production environments for real-time scenarios like RTMP/RTSP/camera feeds:

  • Various live broadcast scenarios
  • Real-time monitoring
  • Live meetings
  • Clinical surgeries
  • ...

View and run the demo here: examples/realtime.py

Modules

Please read the following content in conjunction with examples.

BaseAlgo

Stream Infer encapsulates and abstracts all algorithms into classes with init() and run() functions that implement BaseAlgo.

Although Stream Infer provides a framework for streaming inference, the actual algorithm functionality still needs to be developed by the user and wrapped in a class that inherits from BaseAlgo for uniform encapsulation and invocation.

For instance, if you have completed a real-time head detection algorithm, the official invocation method is:

# https://modelscope.cn/models/damo/cv_tinynas_head-detection_damoyolo/summary
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks

model_id = 'damo/cv_tinynas_head-detection_damoyolo'
input_location = 'https://modelscope.oss-cn-beijing.aliyuncs.com/test/images/image_detection.jpg'

head_detection = pipeline(Tasks.domain_specific_object_detection, model=model_id)
result = head_detection(input_location)
print("result is : ", result)

To use it in Stream Infer, encapsulate it like this:

from stream_infer.algo import BaseAlgo
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks

class HeadDetectionAlgo(BaseAlgo):
    def init(self):
        self.model_id = 'damo/cv_tinynas_head-detection_damoyolo'
        self.head_detection = pipeline(Tasks.domain_specific_object_detection, model=self.model_id)

    def run(self, frames):
        return self.head_detection(frames)

With this, the encapsulation is complete, and the algorithm can be loaded and invoked by the framework.

[!CAUTION] In many cases, cuda or mps is used to accelerate inference. When using these accelerations and running real-time inference in a production environment after development, keep the following in mind:

The run() function inherited from BaseAlgo must not return any tensors! Try to manually convert to standard Python data formats, like dicts.

Or copy the tensor (to CPU) for sharing between processes, as GPU tensors cannot be directly shared in a multi-process environment in real-time inference.
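For example, a minimal sketch of what run() could do in that situation (assuming the ModelScope pipeline above returns a dict of tensor or array values; init() is as shown earlier, and to_plain() is an illustrative helper, not part of Stream Infer):

from stream_infer.algo import BaseAlgo

def to_plain(value):
    # Illustrative helper: move tensor-like values to the CPU and convert them
    # to plain Python structures so results can be pickled across processes.
    if hasattr(value, "cpu"):
        value = value.cpu()
    if hasattr(value, "numpy"):
        value = value.numpy()
    return value.tolist() if hasattr(value, "tolist") else value

class HeadDetectionAlgo(BaseAlgo):
    def run(self, frames):
        result = self.head_detection(frames)
        # Return only standard Python types, never raw GPU tensors.
        return {key: to_plain(value) for key, value in result.items()}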

Dispatcher

Dispatcher is the central service linking playback and inference: it caches inference frames, distributes them to algorithms, and collects inference times and result data.

Dispatcher provides functions for adding/getting frames, adding/getting inference results and times.

You don't need to worry about most of its internals, but to retrieve results and conveniently print or store them elsewhere, pay attention to the collect_result() function, implemented as follows:

def collect_result(self, inference_result):
    if inference_result is not None:
        time = str(inference_result[0])
        name = inference_result[1]
        data = inference_result[2]
        if self.collect_results.get(name) is None:
            self.collect_results[name] = {}
        self.collect_results[name][time] = data

Here, inference_result is the original result returned from inference, and the final collect_results format is roughly as follows:

{
  "HeadDetectionAlgo": {
    "1": { "scores": [], "boxes": [] },
    "2": { "scores": [], "boxes": [] }
  },
  "other": {
    "60": { "a": 1 },
    "120": { "a": 2 }
  }
}
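For instance, after an offline run you can walk this structure directly (a sketch using the collect_results attribute seen in collect_result() above; in real-time inference the dispatcher is created through DispatcherManager, so direct attribute access may differ):

# Print every collected result, grouped by algorithm name and keyed by time.
for algo_name, results in dispatcher.collect_results.items():
    for time_key in sorted(results, key=int):
        print(f"[{algo_name}] t={time_key}s -> {results[time_key]}")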

Based on this, if you wish to send the results to a REST service, or to perform other operations on the collected data before sending, you can do so by inheriting from the Dispatcher class and overriding its functions:

from stream_infer.dispatcher import Dispatcher, DispatcherManager
import requests
...
class RequestDispatcher(Dispatcher):
    def __init__(self, max_size: int = 120):
        super().__init__(max_size)
        self.sess = requests.Session()
        ...

    def collect_result(self, inference_result):
        super().collect_result(inference_result)
        req_data = {
            "time": inference_result[0],
            "name": inference_result[1],
            "data": inference_result[2]
        }
        self.sess.post("http://xxx.com/result/", json=req_data)
...

# Offline inference
dispatcher = RequestDispatcher()

# Real-time inference
dispatcher = DispatcherManager(RequestDispatcher).create(max_size=150)

[!CAUTION] You may have noticed that the instantiation of the dispatcher differs between offline and real-time inference. In real-time inference, playback and inference run in different processes and both need to share the same dispatcher, hence the use of the DispatcherManager proxy.

Inference

Inference is the core of the framework, implementing functions such as loading algorithms and running inference.

An Inference object requires a Dispatcher object for frame retrieval and sending inference results.

from stream_infer import Inference

...

inference = Inference(dispatcher)

When you need to load an algorithm, for example from the BaseAlgo section:

from anywhere_algo import HeadDetectionAlgo, AnyOtherAlgo

...

inference = Inference(dispatcher)
inference.load_algo(HeadDetectionAlgo("head"), frame_count=1, frame_step=fps, interval=1)
inference.load_algo(AnyOtherAlgo("other"), 5, 6, 60)

Here, we give HeadDetectionAlgo a name to identify the running algorithm (needed when collecting results in the Dispatcher and to avoid name clashes).

The parameters for loading an algorithm are the framework's core functionality, allowing you to freely implement frame retrieval logic:

  • frame_count: The number of frames the algorithm receives, i.e. the number of frames passed to its run() function.
  • frame_step: Take 1 frame every frame_step frames, up to frame_count frames in total. If set to fps, it means taking the last frame_count frames of each second.
  • interval: In seconds, indicating how often the algorithm is called; for example, AnyOtherAlgo above is called only once per minute to save resources. See the sketch below for how these parameters combine.
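Concretely, assuming a 30 fps source, the two load_algo() calls above can be read as follows (the comments are an interpretation of the parameters, not framework output):

fps = 30

# HeadDetectionAlgo: every 1 second (interval=1), take 1 frame (frame_count=1),
# sampling 1 frame every `fps` frames (frame_step=fps), i.e. roughly the last
# frame of each second.
inference.load_algo(HeadDetectionAlgo("head"), frame_count=1, frame_step=fps, interval=1)

# AnyOtherAlgo: every 60 seconds (interval=60), take 5 frames (frame_count=5),
# one frame every 6 frames (frame_step=6), so run() receives 5 frames spanning
# about 30 source frames, roughly the last second before each call.
inference.load_algo(AnyOtherAlgo("other"), frame_count=5, frame_step=6, interval=60)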

Producer

Producer loads videos or streams using different methods, such as PyAV, OpenCV, etc., and adjusts or transforms the frames in terms of width, height, and color space, eventually returning each frame as a numpy array.

Instantiating a Producer often requires inputting the frame width, height, and color order required for inference. The default color order is the same as the BGR order returned by cv2.imread().

from stream_infer.producer import PyAVProducer, OpenCVProducer

producer = PyAVProducer(1920, 1080)
# or
producer = OpenCVProducer(1920, 1080)

[!WARNING] When using PyAVProducer (based on ffmpeg) to load some videos or streams that OpenCV cannot decode, please install PyAV first: pip install av

Player

Player takes the dispatcher, the producer, and the video/stream address, and handles playback and feeding frames for inference.

from stream_infer import Player

...

player = Player(dispatcher, producer, video_path)

Run

Simply run the entire script through Inference's start().

inference.start(player, fps=fps, position=0, offline=True)
  • fps: The desired playback frame rate. If the video source's frame rate is higher than this number, frames are skipped to force playback at the specified rate, saving some processing.
  • position: Accepts a parameter in seconds, which can specify the start position for inference (only available in offline inference; how could you specify a position in real-time inference, right?).
  • offline: Whether it is offline inference. Set to False if you wish to run real-time inference.
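Putting the modules together, a minimal offline pipeline might look like the sketch below (the video path and the anywhere_algo module are placeholders, and Dispatcher is instantiated with its defaults; see examples/offline.py for the complete version):

from stream_infer import Inference, Player
from stream_infer.dispatcher import Dispatcher
from stream_infer.producer import OpenCVProducer

from anywhere_algo import HeadDetectionAlgo  # placeholder: your BaseAlgo subclass

fps = 30
video_path = "path/to/video.mp4"  # placeholder

dispatcher = Dispatcher()
producer = OpenCVProducer(1920, 1080)

inference = Inference(dispatcher)
inference.load_algo(HeadDetectionAlgo("head"), frame_count=1, frame_step=fps, interval=1)

player = Player(dispatcher, producer, video_path)
inference.start(player, fps=fps, position=0, offline=True)

# Inspect everything collected during the run.
print(dispatcher.collect_results)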

It is worth mentioning that you may need to process frames or inference results during inference; the set_custom_progress() function is provided for this purpose.

For specific usage, you can refer to the example codes in examples/offline.py and examples/realtime.py.

Streamlit Service App

A web application based on streamlit is provided for development and debugging. It also allows customizing frame drawing and data display components. For specific usage, please refer to examples/server.py.

License

Stream Infer is licensed under the Apache License.
