Skip to main content

ipc-worker: Inter-Process Communication; a multi-process worker that works via shared memory or MQ.

Project description

ipc-worker: Inter-Process Communication; a multi-process worker that works via shared memory or MQ.

Supports shared-memory process workers (Python >= 3.8 on Linux) and MQ process workers (Python >= 3.6).
# -*- coding: utf-8 -*-
# @Time    : 2021/11/23 9:35

'''
Shared-memory demo.
Recommended environment: Linux and Python >= 3.8
    recommended OS: Linux
    recommended Python: 3.8

Running on Windows is not recommended; it reports an error like the following:
    RuntimeError:
            An attempt has been made to start a new process before the
            current process has finished its bootstrapping phase.

'''
import multiprocessing
import os
import signal

from ipc_worker import logger
from ipc_worker.ipc_shm_loader import IPC_shm,SHM_process_worker

class My_worker(SHM_process_worker):
    """Demo worker for the shared-memory loader.

    The first positional argument is the user-supplied ``config``; every
    remaining argument is forwarded unchanged to ``SHM_process_worker``.
    """

    def __init__(self, config, *args, **kwargs):
        """Forward the loader arguments to the base class and keep ``config``.

        ``self._idx``, ``self._group_name`` and ``self._shm_name`` are read
        right after the base initializer runs (presumably set there --
        confirm against ``SHM_process_worker``).
        """
        super().__init__(*args, **kwargs)

        banner = 'Process id {}, group name {} ,shm name {}'
        logger.info(banner.format(self._idx, self._group_name, self._shm_name))
        logger.info(config)

        # User-defined settings; use them however the worker needs.
        self.config = config

    def run_begin(self):
        """Hook triggered when the worker process begins."""
        logger.info('worker pid {}...'.format(os.getpid()))
        # Placeholder for a per-process resource (model, connection, ...).
        self.handle = None

    def run_end(self):
        """Hook triggered when the worker process ends."""
        if self.handle is None:
            return
        # Release ``self.handle`` here once the demo holds a real resource.

    def run_once(self, request_data):
        """Handle one request and return the result.

        Dict payloads get the key ``'b'`` set to ``200`` in place; any other
        payload is returned untouched.
        """
        payload_is_dict = isinstance(request_data, dict)
        if payload_is_dict:
            request_data['b'] = 200

        handle = self.handle
        if handle is not None:
            # Do the real work with ``handle`` here.
            pass

        return request_data


# Demo entry point: start a pool of shared-memory workers, push ten requests
# through it, then wait for the pool and tear it down on Ctrl-C or error.
if __name__ == '__main__':
    # Arbitrary user configuration; handed to every My_worker via worker_args.
    config = {
        "anything" : "anything",
        "aa": 100
    }

    # Manager-backed event that this script sets when it shuts down.
    # NOTE(review): evt_quit is not passed to IPC_shm below -- confirm
    # whether the constructor expects it.
    evt_quit = multiprocessing.Manager().Event()

    # group_name is the shared-memory group name and must be unique
    # a manager is an agent and acts as a load balancer
    # a worker does the real work
    instance = IPC_shm(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker Process
        manager_num=2,  # number of agent Process
        group_name='serving_shm',  # share memory name
        shm_size=1 * 1024 * 1024,  # share memory size
        queue_size=20,  # recv queue size
        is_log_time=False,  # whether log compute time
        daemon=False,
    )

    instance.start()

    # Demo: produce and consume messages; in real use this could sit behind HTTP.
    for i in range(10):
        print('produce message')
        data = {"a" : 100}
        # put() returns an id that get() uses to fetch the matching result.
        request_id = instance.put(data)
        data = instance.get(request_id)
        print('get process result',data)


    # SIGINT handler: flag shutdown, terminate the pool, then raise
    # KeyboardInterrupt in the main thread.
    # NOTE(review): the handler is installed only after the request loop
    # above, so a Ctrl-C during that loop is not routed through it.
    def signal_handler(signum, frame):
        evt_quit.set()
        instance.terminate()
        raise KeyboardInterrupt
    signal.signal(signal.SIGINT, signal_handler)

    try:
        instance.join()
    # KeyboardInterrupt is not a subclass of Exception, so the interrupt
    # raised by signal_handler is not caught here; the handler has already
    # done the same cleanup before raising.
    except Exception as e:
        evt_quit.set()
        instance.terminate()

    # Drop the reference to the manager-backed event.
    del evt_quit
# -*- coding: utf-8 -*-
# @Time    : 2021/11/29 15:06
# @Author  : tk
import multiprocessing
import os
import signal
import torch
from ipc_worker import logger
from ipc_worker.ipc_zmq_loader import IPC_zmq, ZMQ_process_worker




class My_worker(ZMQ_process_worker):
    """Demo worker for the ZeroMQ loader.

    The first positional argument is the user-supplied ``config``; every
    remaining argument is forwarded unchanged to ``ZMQ_process_worker``.
    """

    def __init__(self, config, *args, **kwargs):
        """Forward the loader arguments to the base class and keep ``config``.

        ``self._idx``, ``self._group_name`` and ``self._identity`` are read
        right after the base initializer runs (presumably set there --
        confirm against ``ZMQ_process_worker``).
        """
        super(My_worker, self).__init__(*args, **kwargs)
        # config info, use it however you need
        logger.info('Process id {}, group name {} , identity {}'.format(self._idx, self._group_name, self._identity))
        logger.info(config)
        self.config = config

    # Process begin triggers this func
    def run_begin(self):
        """Log the worker pid and reset the resource placeholder."""
        logger.info('worker pid {}...'.format(os.getpid()))
        # Placeholder for a per-process resource (model, connection, ...).
        self.handle = None

    # Process end triggers this func
    def run_end(self):
        """Release the per-process resource, if the demo ever holds one."""
        if self.handle is not None:
            pass

    # any data put will trigger this func
    def run_once(self, request_data):
        """Handle one request and return the result.

        Prints the CUDA device count and the current device index, then sets
        ``request_data['b'] = 200`` in place when the payload is a dict. Any
        other payload is returned untouched.
        """
        # torch.cuda.current_device() initializes CUDA and raises on hosts
        # without a usable GPU, which would fail every request. Query it
        # only when CUDA is available and print None otherwise.
        device_count = torch.cuda.device_count()
        current_device = torch.cuda.current_device() if torch.cuda.is_available() else None
        print(device_count, current_device)

        if isinstance(request_data, dict):
            request_data['b'] = 200
        if self.handle is not None:
            # do the real work with self.handle here
            pass
        return request_data

# Demo entry point: start a pool of ZeroMQ workers, push ten requests
# through it, then wait for the pool and tear it down on Ctrl-C or error.
if __name__ == '__main__':


    # torch.multiprocessing.set_start_method('spawn', force=True)
    '''
        demo ZMQ depend zmq
        pip install pyzmq
        test pass >= python3.6
    '''

    # Make sure the local ./tmp directory exists and publish its path through
    # the ZEROMQ_SOCK_TMP_DIR environment variable (presumably read by
    # ipc_worker to place its ZeroMQ socket files -- confirm).
    tmp_dir = './tmp'
    if not os.path.exists(tmp_dir):
        os.mkdir(tmp_dir)

    os.environ['ZEROMQ_SOCK_TMP_DIR'] = tmp_dir


    # Arbitrary user configuration; handed to every My_worker via worker_args.
    config = {
        "anything" : "anything",
        "aa": 100
    }

    # Manager-backed event, passed to IPC_zmq and set by this script on shutdown.
    evt_quit = multiprocessing.Manager().Event()

    # group_name is the shared-memory group name and must be unique
    # NOTE(review): the wording mentions shared memory although this demo
    # uses IPC_zmq -- confirm what group_name identifies here.
    # a manager is an agent and acts as a load balancer
    # a worker does the real work
    instance = IPC_zmq(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker Process
        group_name='serving_zmq',  # share memory name
        evt_quit=evt_quit,
        queue_size=20,  # recv queue size
        is_log_time=True,  # whether log compute time
        daemon=False,
    )
    instance.start()

    # Demo: produce and consume messages; in real use this could sit behind HTTP.
    for i in range(10):
        data = {"a" : 100}
        # put() returns an id that get() uses to fetch the matching result.
        request_id = instance.put(data)

        data = instance.get(request_id)
        print('get process result',request_id,data)


    # SIGINT handler: flag shutdown, terminate the pool, then raise
    # KeyboardInterrupt in the main thread.
    # NOTE(review): the handler is installed only after the request loop
    # above, so a Ctrl-C during that loop is not routed through it.
    def signal_handler(signum, frame):
        evt_quit.set()
        instance.terminate()
        raise KeyboardInterrupt
    signal.signal(signal.SIGINT, signal_handler)

    try:
        instance.join()
    # KeyboardInterrupt is not a subclass of Exception, so the interrupt
    # raised by signal_handler is not caught here; the handler has already
    # done the same cleanup before raising.
    except Exception as e:
        evt_quit.set()
        instance.terminate()
    # Drop the reference to the manager-backed event.
    del evt_quit

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release. See tutorial on generating distribution archives.

Built Distribution

ipc_worker-0.1.7-py3-none-any.whl (19.1 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page