Skip to main content

This library will help you easily parallelize your python code for all kind of data transformations.

Project description

MLMR

PyPI version Downloads License: MIT

This library will help you easily parallelize your python code for all kind of data transformations. Core functions are built on Map-Reduce paradigm. In this library Map part is parallelized using native python multiprocessing module.

Installation

pip install mlmr

Usage

In order to find out library API specification and advanced usage I recommend you to start with these short tutorials:

  1. Functional API tutorial
  2. Sklearn integration tutorial

Here I'll post several real world mlmr API applications.

Sum of squares in MapReduce fashion example

import numpy as np
from mlmr.function import map_reduce

arr = [1, 2, 3, 4, 5]

def squares_of_slice(arr_slice): # our map function, with partial reduction
    return sum(map(lambda x: x**2, arr_slice))

def get_split_data_func(n_slices): # wrapper function of split data function
    def split_data(data):
        return np.array_split(data, n_slices)
    return split_data

n_jobs = 2

result = map_reduce(
    data=arr,
    data_split_func=get_split_data_func(n_jobs), # split data into n_jobs slices
    map_func=squares_of_slice,
    reduce_func=sum,
    n_jobs=n_jobs
)

Pandas apply parallelization in MapReduce fashion example

In this example function performs parallel data transformations on df (pd.DataFrame, pd.Series). From n_jobs argument, number of processes to run in parallel is calculated. Data is evenly divided into number of processes slices. Then our_transform_func is applied on each slice in parallel (every process has its own slice). After calculation is complete all transformation results are flattened. Flattened result is returned.

from mlmr.function import transform_concat

def comutation_costly_transformation(*_):
    pass

def our_transform_func(df):
    return df.apply(cosly_computation_func)

df_transformed = transform_concat(df, transform_func=our_transform_func, n_jobs=-1)

Sklearn MapReduce transformer integration into Pipeline

from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from mlmr.transformers import BaseMapReduceTransformer

def comutation_costly_text_transformation(df):
    pass

class TextPreprocessor(BaseMapReduceTransformer):
    
    def transform_part(self, X):
        return comutation_costly_text_transformation(X)

n_jobs = 4

text_classification_pipeline = Pipeline([
     ('text_preprocessor', TextPreprocessor(n_jobs=n_jobs)),
     ('vectorizer', TfidfVectorizer(analyzer = "word", max_features=10000)),
     ('classifier', RandomForestClassifier(n_estimators=100, n_jobs=n_jobs))
])

Alternative implementation:

import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from mlmr.transformers import FunctionMapReduceTransformer

def get_split_data_func(n_slices): # wrapper function of split data function
    def split_data(data):
        return np.array_split(data, n_slices)
    return split_data

def comutation_costly_text_transformation(df):
    pass

n_jobs = 4

text_classification_pipeline = Pipeline([
     ('text_preprocessor', FunctionMapReduceTransformer(
         map_func=comutation_costly_text_transformation,
         reduce_func=pd.concat,
         data_split_func=get_split_data_func(n_jobs),
         n_jobs=n_jobs
     )),
     ('vectorizer', TfidfVectorizer(analyzer = "word", max_features=10000)),
     ('classifier', RandomForestClassifier(n_estimators=100, n_jobs=n_jobs))
])

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mlmr-1.0.0.tar.gz (4.6 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page