A stream processing framework modeled after Yahoo! Pipes.

Project description

Index

Introduction | Requirements | Word Count | Motivation | Usage | Installation | Project Structure | Design Principles | Scripts | Contributing | Credits | More Info | License

Introduction

riko is a Python framework for analyzing and processing streams of structured data. It has synchronous and asynchronous APIs, supports parallel execution, and is well suited for processing rss feeds [1].

With riko, you can

  • Read csv/xml/json/html files

  • Create text and data processing workflows via modular pipes

  • Parse, extract, and process rss feeds

  • Create awesome mashups [2], APIs, and maps

  • Perform parallel processing via cpus/processors or threads

  • and much more…

Requirements

riko has been tested and is known to work on Python 2.7 and PyPy 4.0.

Optional Dependencies

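=======================  ==========  =======================
Feature                  Dependency  Installation
=======================  ==========  =======================
Async API                Twisted     pip install riko[async]
Accelerated xml parsing  lxml [3]    pip install lxml
=======================  ==========  =======================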

Word Count

In this example, we use several pipes to count the words on a webpage.

>>> ### Create a SyncPipe workflow ###
>>> #
>>> # `SyncPipe` is a workflow convenience class that enables method
>>> # chaining and parallel processing
>>> from riko import get_path
>>> from riko.lib.collections import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> #   1. `get_path` just looks up a file in the `data` directory
>>> #   2. the `detag` option will strip all html tags from the result
>>> url = get_path('users.jyu.fi.html')                                            # 1
>>> fetch_conf = {'url': url, 'start': '<body>', 'end': '</body>', 'detag': True}  # 2
>>> replace_conf = {'rule': {'find': '\n', 'replace': ' '}}
>>>
>>> counts = (SyncPipe('fetchpage', conf=fetch_conf)
...     .strreplace(conf=replace_conf, assign='content')
...     .stringtokenizer(conf={'delimiter': ' '}, emit=True)
...     .count()
...     .output)
>>>
>>> next(counts)
{'count': 70}
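
To see what each stage of this workflow contributes, the same steps can be sketched in plain Python. This is only an illustration built on the standard library, not riko's implementation: the `html` sample and the `count_words` helper are hypothetical stand-ins for the fetched page and the pipe chain, and replacing tags with spaces and dropping empty tokens are assumptions made for the sketch.

```python
import re

# Hypothetical stand-in for the page that `fetchpage` would retrieve.
html = "<html><body><h1>Hello</h1>\n<p>stream processing\nwith pipes</p></body></html>"


def count_words(page, start="<body>", end="</body>"):
    """Count space-delimited tokens between `start` and `end`, ignoring tags."""
    # Keep only the text between the two markers (the `start`/`end` options).
    body = page.split(start, 1)[1].split(end, 1)[0]
    # Replace each html tag with a space so adjacent words stay separated
    # (an assumption; the `detag` option itself may simply drop the tags).
    text = re.sub(r"<[^>]+>", " ", body)
    # Replace newlines with spaces (the `strreplace` step).
    text = text.replace("\n", " ")
    # Split on spaces and drop empty tokens (the `stringtokenizer` step).
    tokens = [token for token in text.split(" ") if token]
    # Aggregate the tokens into a single item (the `count` step).
    return {"count": len(tokens)}


result = count_words(html)
print(result)
```

Each line of the helper corresponds to one configuration entry or pipe in the workflow above, which is why the riko version can express the same thing as a chain of method calls.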

Motivation

Why I built riko

Yahoo! Pipes [4] was a user-friendly web application used to “aggregate, manipulate, and mashup content from around the web.” Wanting to create custom pipes, I came across pipe2py, which translated Yahoo! Pipes workflows into Python code. pipe2py suited my needs at the time but was unmaintained and lacked asynchronous and parallel processing APIs.

riko addresses the shortcomings of pipe2py but breaks compatibility with Yahoo! Pipes workflows. riko contains ~40 built-in modules, aka pipes, that allow you to programmatically recreate much of what you previously could on Yahoo! Pipes.

Why you should use riko

riko differs from other stream processing applications such as Huginn, Flink, Spark, and Storm [5] in several ways. Namely, it offers:

  • a small footprint (CPU and memory usage)

  • native RSS support

  • simple installation and usage

  • pypy support

  • modular pipes to filter, sort, and modify feeds

The tradeoffs riko makes in return are:

  • not distributed (it cannot run on a cluster of servers)

  • no GUI for creating workflows

  • doesn’t continually monitor feeds for new data

  • can’t react to specific events

  • iterator (pull) based so only supports a single consumer
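
The last tradeoff follows from how Python iterators behave: once one consumer has pulled the items, nothing is left for a second one. A minimal sketch using only the standard library, where `make_feed` is a hypothetical stand-in for a riko feed. `itertools.tee` (or materializing the feed into a list) is a common workaround, at the cost of buffering items in memory.

```python
from itertools import tee


def make_feed():
    """Yield a few items lazily, the way a pull-based feed would."""
    for number in range(3):
        yield {"title": "item %d" % number}


feed = make_feed()
first_pass = [item["title"] for item in feed]   # pulls every item
second_pass = [item["title"] for item in feed]  # the iterator is now exhausted

# Workaround: split a fresh feed into two independent iterators.
left, right = tee(make_feed())
left_titles = [item["title"] for item in left]
right_titles = [item["title"] for item in right]

print(first_pass, second_pass)
print(left_titles == right_titles)
```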

The following table summarizes these observations:

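=========  ===========  =========  ===  =======================  =======  ===========
Framework  Stream Type  Footprint  RSS  no outside dependencies  CEP [6]  distributed
=========  ===========  =========  ===  =======================  =======  ===========
riko       pull         small
Huginn     push         med
Others     push         large
=========  ===========  =========  ===  =======================  =======  ===========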

For more detailed information, please check out the FAQ.

Usage

riko is intended to be used directly as a Python library.

Fetching feeds

riko can read both local and remote filepaths via source pipes. All source pipes return an equivalent feed iterator of dictionaries, aka items.

>>> from itertools import chain
>>> from riko import get_path
>>> from riko.modules.pipefetch import pipe as fetch
>>> from riko.modules.pipefetchpage import pipe as fetchpage
>>> from riko.modules.pipefetchdata import pipe as fetchdata
>>> from riko.modules.pipefetchsitefeed import pipe as fetchsitefeed
>>> from riko.modules.pipefeedautodiscovery import pipe as autodiscovery
>>>
>>> ### Fetch a url ###
>>> feed = fetchpage(conf={'url': 'https://news.ycombinator.com'})
>>>
>>> ### Fetch a filepath ###
>>> #
>>> # Note: `get_path` just looks up a file in the `data` directory
>>> feed = fetchdata(conf={'url': get_path('quote.json')})
>>>
>>> ### View the fetched data ###
>>> item = next(feed)
>>> item['list']['resources'][0]['resource']['fields']['symbol']
'KRW=X'

>>> ### Fetch an rss feed ###
>>> feed = fetch(conf={'url': 'https://news.ycombinator.com/rss'})
>>>
>>> ### Fetch the first rss feed found ###
>>> feed = fetchsitefeed(conf={'url': 'http://www.bbc.com/news'})
>>>
>>> ### Find all rss links and fetch the feeds ###
>>> entries = autodiscovery(conf={'url': 'http://edition.cnn.com/services/rss/'})
>>> urls = (e['link'] for e in entries)
>>> feed = chain.from_iterable(fetch(conf={'url': url}) for url in urls)
>>>
>>> ### Alternatively, create a SyncCollection ###
>>> #
>>> # `SyncCollection` is a url fetching convenience class with support for
>>> # parallel processing
>>> from riko.lib.collections import SyncCollection
>>>
>>> sources = [{'url': url} for url in urls]
>>> feed = SyncCollection(sources).fetch()
>>>
>>> ### View the fetched rss feed(s) ###
>>> #
>>> # Note: regardless of how you fetch an rss feed, it will have the same
>>> # structure
>>> item = next(feed)
>>> sorted(item.keys())
[
    'author', 'author.name', 'author.uri', 'comments', 'content',
    'dc:creator', 'id', 'link', 'pubDate', 'summary', 'title',
    'updated', 'updated_parsed', 'y:id', 'y:published', 'y:title']
>>> item['title'], item['author'], item['link']
(
    u'Using NFC tags in the car', u'Liam Green-Hughes',
    u'http://www.greenhughes.com/content/using-nfc-tags-car')
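
Because every source returns an iterator of dictionaries, merging several feeds needs nothing beyond `itertools.chain`. The sketch below uses hypothetical in-memory sources (`SOURCES`, `fake_fetch`) instead of network calls. It is meant to show the lazy merging pattern used above, not riko's fetching code.

```python
from itertools import chain

# Hypothetical feeds keyed by url; a real source pipe would fetch and parse these.
SOURCES = {
    "file://a.xml": [{"title": "a1"}, {"title": "a2"}],
    "file://b.xml": [{"title": "b1"}],
}

fetched = []  # records which urls have actually been read


def fake_fetch(conf):
    """Yield the items for conf['url'], noting when the source is first read."""
    fetched.append(conf["url"])
    for item in SOURCES[conf["url"]]:
        yield item


urls = ["file://a.xml", "file://b.xml"]
feed = chain.from_iterable(fake_fetch({"url": url}) for url in urls)

first = next(feed)                   # only the first source is touched so far
fetched_after_first = list(fetched)
rest = list(feed)                    # pulling the rest reads the second source

print(first, fetched_after_first)
print(rest, fetched)
```

Nothing is read until an item is requested, so a long list of urls costs nothing up front. The flip side is that a generator of urls can be consumed only once, so build a list first if the same urls are needed again.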

Please see the FAQ for a complete list of supported file types and protocols.

Synchronous processing

riko can modify feeds by combining any of its ~40 built-in pipes.

>>> from itertools import chain
>>> from riko import get_path
>>> from riko.modules.pipefetch import pipe as fetch
>>> from riko.modules.pipefilter import pipe as pfilter
>>> from riko.modules.pipesubelement import pipe as subelement
>>> from riko.modules.piperegex import pipe as regex
>>> from riko.modules.pipesort import pipe as sort
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> #   1. `get_path` just looks up a file in the `data` directory
>>> #   2. the `dotall` option is used to match `.*` across newlines
>>> fetch_conf = {'url': get_path('feed.xml')}                                          # 1
>>> filter_rule = {'field': 'y:published', 'op': 'before', 'value': '2/5/09'}
>>> sub_conf = {'path': 'content.value'}
>>> match = r'(.*href=")([\w:/.@]+)(".*)'
>>> regex_rule = {'field': 'content', 'match': match, 'replace': '$2', 'dotall': True}  # 2
>>> sort_conf = {'rule': {'sort_key': 'content', 'sort_dir': 'desc'}}
>>>
>>> ### Create a SyncPipe workflow ###
>>> #
>>> # `SyncPipe` is a workflow convenience class that enables method
>>> # chaining and parallel processing.
>>> #
>>> # The following workflow will:
>>> #   1. fetch the rss feed
>>> #   2. filter for items published before 2/5/2009
>>> #   3. extract the path `content.value` from each feed item
>>> #   4. replace the extracted text with the last href url contained
>>> #      within it
>>> #   5. reverse sort the items by the replaced url
>>> #   6. return the raw feed iterator
>>> #
>>> # Note: sorting is not lazy so take caution when using this pipe
>>> from riko.lib.collections import SyncPipe
>>>
>>> output = (SyncPipe('fetch', conf=fetch_conf)  # 1
...     .filter(conf={'rule': filter_rule})       # 2
...     .subelement(conf=sub_conf, emit=True)     # 3
...     .regex(conf={'rule': regex_rule})         # 4
...     .sort(conf=sort_conf)                     # 5
...     .output)                                  # 6
>>>
>>> next(output)
{'content': 'mailto:mail@writetoreply.org'}
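
Step 4 relies on two regex details: the leading `.*` is greedy, so it backtracks only as far as the last `href="`, and `dotall` lets `.` cross newlines so the whole multi-line text is replaced in one match. The sketch below shows this with Python's own `re` module, assuming the regex pipe behaves like `re.sub` here. The `content` string is a hypothetical item value.

```python
import re

# Same pattern as `match` in the example above.
match = r'(.*href=")([\w:/.@]+)(".*)'

# Hypothetical item content spanning two lines, with two links.
content = (
    '<p>See <a href="http://example.com/page">the page</a>\n'
    'or <a href="mailto:mail@example.org">write to us</a></p>'
)

# The riko rule writes the second group as `$2`; Python's `re` spells it `\2`.
last_href = re.sub(match, r"\2", content, flags=re.DOTALL)
print(last_href)
```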

Please see Design Principles for an alternative (function based) workflow. Please see pipes for a complete list of available pipes.

Parallel processing

An example using riko’s ThreadPool based parallel API

>>> from riko import get_path
>>> from riko.lib.collections import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> #   1. `get_path` just looks up a file in the `data` directory
>>> #   2. the `dotall` option is used to match `.*` across newlines
>>> url = get_path('feed.xml')                                                          # 1
>>> filter_rule1 = {'field': 'y:published', 'op': 'before', 'value': '2/5/09'}
>>> sub_conf = {'path': 'content.value'}
>>> match = r'(.*href=")([\w:/.@]+)(".*)'
>>> regex_rule = {'field': 'content', 'match': match, 'replace': '$2', 'dotall': True}  # 2
>>> filter_rule2 = {'field': 'content', 'op': 'contains', 'value': 'file'}
>>> strtransform_conf = {'rule': {'transform': 'rstrip', 'args': '/'}}
>>>
>>> ### Create a parallel SyncPipe workflow ###
>>> #
>>> # The following workflow will:
>>> #   1. fetch the rss feed
>>> #   2. filter for items published before 2/5/2009
>>> #   3. extract the path `content.value` from each feed item
>>> #   4. replace the extracted text with the last href url contained
>>> #      within it
>>> #   5. filter for items with local file urls (which happen to be rss
>>> #      feeds)
>>> #   6. strip any trailing `/` from the url
>>> #   7. remove duplicate urls
>>> #   8. fetch each rss feed
>>> #   9. Merge the rss feeds into a list
>>> feed = (SyncPipe('fetch', conf={'url': url}, parallel=True)  # 1
...     .filter(conf={'rule': filter_rule1})                     # 2
...     .subelement(conf=sub_conf, emit=True)                    # 3
...     .regex(conf={'rule': regex_rule})                        # 4
...     .filter(conf={'rule': filter_rule2})                     # 5
...     .strtransform(conf=strtransform_conf)                    # 6
...     .uniq(conf={'uniq_key': 'strtransform'})                 # 7
...     .fetch(conf={'url': {'subkey': 'strtransform'}})         # 8
...     .list)                                                   # 9
>>>
>>> len(feed)
25
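
The reason a per-item step such as the final fetch can be parallelized is that each call is independent of the others. A minimal thread pool sketch under that assumption, using the standard library's `ThreadPool`. `fake_fetch` and the urls are hypothetical, and this is not how riko wires its own pool.

```python
from multiprocessing.pool import ThreadPool


def fake_fetch(url):
    """Hypothetical per-item step; a real pipe would download and parse `url`."""
    return [{"source": url, "title": "entry %d" % n} for n in range(2)]


urls = ["file://feed-a.xml", "file://feed-b.xml", "file://feed-c.xml"]

pool = ThreadPool(processes=3)
try:
    # `map` preserves input order even though the calls may run concurrently.
    feeds = pool.map(fake_fetch, urls)
finally:
    pool.close()
    pool.join()

# Merge the per-url results into one list, like the final `.list` step.
merged = [item for feed in feeds for item in feed]
print(len(merged))
```

Threads pay off when the per-item work waits on I/O, such as fetching urls. CPU-bound steps generally need processes instead.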

Asynchronous processing

To enable asynchronous processing, you must install the async module.

pip install riko[async]

An example using riko’s optional Twisted powered asynchronous API.

>>> from twisted.internet.task import react
>>> from twisted.internet.defer import inlineCallbacks
>>> from riko import get_path
>>> from riko.twisted.collections import AsyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> #   1. `get_path` just looks up a file in the `data` directory
>>> #   2. the `dotall` option is used to match `.*` across newlines
>>> url = get_path('feed.xml')                                                          # 1
>>> filter_rule1 = {'field': 'y:published', 'op': 'before', 'value': '2/5/09'}
>>> sub_conf = {'path': 'content.value'}
>>> match = r'(.*href=")([\w:/.@]+)(".*)'
>>> regex_rule = {'field': 'content', 'match': match, 'replace': '$2', 'dotall': True}  # 2
>>> filter_rule2 = {'field': 'content', 'op': 'contains', 'value': 'file'}
>>> strtransform_conf = {'rule': {'transform': 'rstrip', 'args': '/'}}
>>>
>>> ### Create an AsyncPipe workflow ###
>>> #
>>> # See `Parallel processing` above for an explanation of the steps this
>>> # performs
>>> @inlineCallbacks
... def run(reactor):
...     feed = yield (AsyncPipe('fetch', conf={'url': url})
...         .filter(conf={'rule': filter_rule1})
...         .subelement(conf=sub_conf, emit=True)
...         .regex(conf={'rule': regex_rule})
...         .filter(conf={'rule': filter_rule2})
...         .strtransform(conf=strtransform_conf)
...         .uniq(conf={'uniq_key': 'strtransform'})
...         .fetch(conf={'url': {'subkey': 'strtransform'}})
...         .list)
...
...     print(len(feed))
...
>>> react(run)
25
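
The idea behind the asynchronous API is that slow fetches can overlap instead of running one after another. The sketch below illustrates that idea with the standard library's `asyncio` rather than Twisted, so it is a different technique from the one riko uses and requires Python 3.7+. `fake_fetch` is a hypothetical coroutine in which `asyncio.sleep` stands in for network I/O.

```python
import asyncio


async def fake_fetch(url, delay):
    """Hypothetical non-blocking fetch; the sleep stands in for network I/O."""
    await asyncio.sleep(delay)
    return [{"source": url}]


async def run(urls):
    # Start every fetch at once and wait for all of them to finish.
    feeds = await asyncio.gather(*(fake_fetch(url, 0.01) for url in urls))
    # `gather` returns results in the order the awaitables were passed in.
    return [item for feed in feeds for item in feed]


items = asyncio.run(run(["file://a.xml", "file://b.xml"]))
print(items)
```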

Cookbook

Please see the cookbook or ipython notebook for more examples.

Installation

(You are using a virtualenv, right?)

At the command line, install riko using either pip (recommended)

pip install riko

or easy_install

easy_install riko

Please see the installation doc for more details.

Project Structure

┌── CONTRIBUTING.rst
├── LICENSE
├── MANIFEST.in
├── Makefile
├── README.rst
├── bin
│   └── run
├── data/*
├── dev-requirements.txt
├── docs
│   ├── AUTHORS.rst
│   ├── CHANGES.rst
│   ├── COOKBOOK.rst
│   ├── FAQ.rst
│   ├── INSTALLATION.rst
│   └── TODO.rst
├── examples
│   ├── __init__.py
│   ├── pipe_base.py
│   ├── pipe_gigs.py
│   ├── pipe_test.py
│   ├── usage.ipynb
│   └── usage.py
├── helpers/*
├── manage.py
├── py2-requirements.txt
├── requirements.txt
├── riko
│   ├── __init__.py
│   ├── lib
│   │   ├── __init__.py
│   │   ├── autorss.py
│   │   ├── collections.py
│   │   ├── dotdict.py
│   │   ├── log.py
│   │   └── utils.py
│   ├── modules/*
│   └── twisted
│       ├── __init__.py
│       ├── collections.py
│       └── utils.py
├── setup.cfg
├── setup.py
├── tests
│   ├── __init__.py
│   └── standard.rc
└── tox.ini

Design Principles

The primary data structures in riko are the item and the feed. An item is a simple dictionary, and a feed is an iterator of items. You can create a feed manually with something as simple as [{'content': 'hello world'}]. The primary way to manipulate a feed in riko is via a pipe. A pipe is simply a function that accepts either a feed or an item and returns an iterator of items. You can create a workflow by using the output of one pipe as the input to another pipe.

riko pipes come in two flavors: operator and processor. Operators operate on an entire feed at once and are unable to handle individual items. Example operators include pipecount, pipefilter, and pipereverse.

>>> from riko.modules.pipereverse import pipe
>>>
>>> feed = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
>>> next(pipe(feed))
{'title': 'riko pt. 2'}

Processors process individual feed items and can be parallelized across threads or processes. Example processors include pipefetchsitefeed, pipehash, pipeitembuilder, and piperegex.

>>> from riko.modules.pipehash import pipe
>>>
>>> item = {'title': 'riko pt. 1'}
>>> feed = pipe(item, field='title')
>>> next(feed)
{'title': 'riko pt. 1', 'hash': 2853617420}

Some processors, e.g., pipestringtokenizer, return multiple results.

>>> from riko.modules.pipestringtokenizer import pipe
>>>
>>> item = {'title': 'riko pt. 1'}
>>> tokenizer_conf = {'delimiter': ' '}
>>> feed = pipe(item, conf=tokenizer_conf, field='title')
>>> next(feed)
{
    'title': 'riko pt. 1',
    'stringtokenizer': [
        {'content': 'riko'},
        {'content': 'pt.'},
        {'content': '1'}]}

>>> # In this case, if we just want the result, we can `emit` it instead
>>> feed = pipe(item, conf=tokenizer_conf, field='title', emit=True)
>>> next(feed)
{'content': 'riko'}
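
The difference between assigning a result and emitting it can be sketched with a plain generator function. The `tokenize` function and its `assign` parameter below are hypothetical and only mimic the behavior shown above. This is not riko's implementation.

```python
def tokenize(item, field="title", delimiter=" ", assign="tokens", emit=False):
    """Hypothetical processor: split item[field] into token items."""
    tokens = [{"content": word} for word in item[field].split(delimiter)]

    if emit:
        # Emit mode: the results themselves become the output feed.
        for token in tokens:
            yield token
    else:
        # Assign mode: yield one copy of the item with the results attached.
        merged = dict(item)
        merged[assign] = tokens
        yield merged


item = {"title": "riko pt. 1"}
assigned = list(tokenize(item))
emitted = list(tokenize(item, emit=True))

print(assigned)
print(emitted)
```

Assign mode keeps the original fields available to later pipes, while emit mode is the natural choice when only the results matter, as in the word count example.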

Operators are split into the sub-types aggregator and composer. Aggregators, e.g., pipecount, aggregate all items of a feed into a single value, while composers, e.g., pipefilter, compose a new feed from a subset or all of the available items.

>>> from riko.modules.pipecount import pipe
>>>
>>> feed = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
>>> next(pipe(feed))
{'count': 2}
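
The two operator sub-types can be sketched as ordinary generator functions that take a whole feed. The `count` and `keep_if` functions below are hypothetical illustrations of the aggregator and composer roles, not riko's own pipes.

```python
def count(feed):
    """Aggregator sketch: reduce the whole feed to one summary item."""
    yield {"count": sum(1 for _ in feed)}


def keep_if(feed, field, value):
    """Composer sketch: build a new feed from the items that contain `value`."""
    for item in feed:
        if value in item.get(field, ""):
            yield item


feed = [{"title": "riko pt. 1"}, {"title": "riko pt. 2"}, {"title": "other"}]

total = next(count(feed))
riko_items = list(keep_if(feed, "title", "riko"))
# Pipes compose: the composer's output becomes the aggregator's input.
riko_total = next(count(keep_if(feed, "title", "riko")))

print(total, riko_total)
```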

Processors are split into the sub-types source and transformer. Sources, e.g., pipeitembuilder, can create a feed, while transformers, e.g., pipehash, can only transform items in a feed.

>>> from riko.modules.pipeitembuilder import pipe
>>>
>>> attrs = {'key': 'title', 'value': 'riko pt. 1'}
>>> next(pipe(conf={'attrs': attrs}))
{'title': 'riko pt. 1'}

The following table summarizes these observations:

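=========  ===========  =====  ===========  ==================  ================
type       sub-type     input  output       is parallelizable?  is feed creator?
=========  ===========  =====  ===========  ==================  ================
operator   aggregator   feed   aggregation
operator   composer     feed   feed
processor  source       item   feed         yes                 yes
processor  transformer  item   feed         yes
=========  ===========  =====  ===========  ==================  ================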

If you are unsure of the type of pipe you have, check its metadata.

>>> from riko.modules.pipefetchpage import asyncPipe
>>> from riko.modules.pipecount import pipe
>>>
>>> asyncPipe.__dict__
{'type': 'processor', 'name': 'fetchpage', 'sub_type': 'source'}
>>> pipe.__dict__
{'type': 'operator', 'name': 'count', 'sub_type': 'aggregator'}
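
Metadata like this can live directly on a function object, since Python functions carry their own `__dict__`. The sketch below shows one way to attach and read it. The `pipe_meta` decorator, the `upper` pipe, and `is_parallelizable` are hypothetical names, and riko may attach its metadata differently.

```python
def pipe_meta(**metadata):
    """Hypothetical decorator that stores metadata as function attributes."""
    def decorator(func):
        func.__dict__.update(metadata)
        return func
    return decorator


@pipe_meta(type="operator", name="count", sub_type="aggregator")
def count(feed):
    yield {"count": len(list(feed))}


@pipe_meta(type="processor", name="upper", sub_type="transformer")
def upper(item):
    yield {key: str(value).upper() for key, value in item.items()}


def is_parallelizable(pipe):
    """Only processors work item by item, so only they can be parallelized."""
    return pipe.__dict__.get("type") == "processor"


print(count.__dict__)
print(is_parallelizable(upper), is_parallelizable(count))
```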

The SyncPipe and AsyncPipe classes (among other things) perform this check for you to allow for convenient method chaining and transparent parallelization.

>>> from riko.lib.collections import SyncPipe
>>>
>>> attrs = [
...     {'key': 'title', 'value': 'riko pt. 1'},
...     {'key': 'content', 'value': "Let's talk about riko!"}]
>>> sync_pipe = SyncPipe('itembuilder', conf={'attrs': attrs})
>>> sync_pipe.hash().list
[
    {
        'title': 'riko pt. 1',
        'content': "Let's talk about riko!",
        'hash': 1346301218}]
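
To make the idea of type-aware method chaining concrete, here is a small sketch of a chaining wrapper that looks up a pipe by name and applies it per item or per feed depending on its type. Everything in it (`Chain`, `PIPES`, and the three toy pipes) is hypothetical. It is a simplified illustration, not how `SyncPipe` is implemented.

```python
def itembuilder(item, attrs=()):
    """Source sketch: build one item from key/value pairs."""
    yield {attr["key"]: attr["value"] for attr in attrs}


def tag(item, label="seen"):
    """Transformer sketch: add a field to a copy of the item."""
    yield dict(item, tag=label)


def count(feed):
    """Aggregator sketch: summarize the whole feed."""
    yield {"count": len(list(feed))}


# Hypothetical registry: pipe name -> (pipe type, function).
PIPES = {
    "itembuilder": ("processor", itembuilder),
    "tag": ("processor", tag),
    "count": ("operator", count),
}


class Chain(object):
    """Hypothetical stand-in for a chaining class; not riko's SyncPipe."""

    def __init__(self, name=None, feed=None, **kwargs):
        # Start from an existing feed, or from a source applied to an empty item.
        self.output = feed if feed is not None else PIPES[name][1]({}, **kwargs)

    def __getattr__(self, name):
        # Only called for attributes not found normally, i.e. pipe names.
        if name not in PIPES:
            raise AttributeError(name)
        kind, func = PIPES[name]

        def step(**kwargs):
            if kind == "processor":
                # Processors run once per item; their results are flattened.
                feed = (out for item in self.output for out in func(item, **kwargs))
            else:
                # Operators receive the whole feed at once.
                feed = func(self.output, **kwargs)
            return Chain(feed=feed)

        return step

    @property
    def list(self):
        return list(self.output)


attrs = [{"key": "title", "value": "riko pt. 1"}]
tagged = Chain("itembuilder", attrs=attrs).tag(label="demo").list
counted = Chain("itembuilder", attrs=attrs).tag().count().list

print(tagged)
print(counted)
```

Because the wrapper knows which steps are processors, it is also the natural place to decide which steps could be handed to a thread or process pool.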

Please see the cookbook for advanced examples, including how to wire in values from other pipes or accept user input.

Scripts

riko comes with a built-in task manager, manage.py.

Setup

pip install -r dev-requirements.txt

Examples

Run python linter and nose tests

manage lint
manage test

Contributing

Please mimic the coding style/conventions used in this repo. If you add new classes or functions, please add the appropriate doc blocks with examples. Also, make sure the python linter and nose tests pass.

Please see the contributing doc for more details.

Credits

Shoutout to pipe2py for heavily inspiring riko. riko started out as a fork of pipe2py, but has since diverged so much that little (if any) of the original code-base remains.

More Info

License

riko is distributed under the MIT License.
