Lazily-evaluated stream with pipelining via the '>>' operator
Introduction
Streams are generalized iterators with a pipelining mechanism to enable data-flow programming.
The idea is to take the output of a function that turns an iterable into another iterable and plug it in as the input of another such function. While you can already do this using function composition, this package provides an elegant notation for it by overloading the '>>' operator.
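To make the notation concrete, here is a minimal sketch of how '>>' can be overloaded to chain iterable-to-iterable functions. It is not the package's actual implementation: the Pipe class and the stage names are hypothetical, and it relies only on Python's reflected-operator hook `__rrshift__`.

```python
class Pipe:
    """Hypothetical wrapper: turns an iterable-to-iterable function into a stage."""

    def __init__(self, func):
        self.func = func

    def __rrshift__(self, iterable):
        # Called for `iterable >> stage` when the left operand does not
        # implement `>>` itself (lists, ranges, generators, ...).
        return self.func(iterable)


# Illustrative stages built from plain generator expressions.
evens = Pipe(lambda it: (x for x in it if x % 2 == 0))
squares = Pipe(lambda it: (x * x for x in it))
to_list = Pipe(list)

result = range(10) >> evens >> squares >> to_list
print(result)  # [0, 4, 16, 36, 64]
```

Because '>>' is left-associative, each stage receives the iterable produced by the stage before it, which is what makes the left-to-right reading possible.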
This approach focuses the programming on processing streams of data, step by step. A pipeline usually starts with a producer, then passes through a number of filters. Multiple streams can be branched and combined. Finally, the output is fed to an accumulator, which can be any function of one iterable argument.
- Producers: anything iterable
    - from this module: seq, gseq, repeatcall, chaincall
- Filters:
    - by index: take, drop, cut
    - by condition: filter, takewhile, dropwhile
    - by transformation: map, apply, fold
    - special purpose: attrgetter, itemgetter, methodcaller, splitter
- Combinators: prepend, takei, dropi, tee, flatten
- Accumulators: item, maximum, minimum, reduce
    - from Python: list, sum, dict, max, min …
Values are computed only when an accumulator forces some or all of the evaluation, not when the streams are set up.
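The same laziness can be observed with plain Python generators, which is a reasonable mental model for it. The sketch below uses only the standard library; the `producer` function and the `log` list are hypothetical names introduced to show when items are actually requested.

```python
from itertools import islice

log = []  # records which items the producer was actually asked for


def producer():
    n = 0
    while True:
        log.append(n)
        yield n
        n += 1


# Setting up the pipeline computes nothing yet.
pipeline = (x * 10 for x in producer())
assert log == []

# An accumulator (here, list over a bounded slice) forces evaluation.
first_three = list(islice(pipeline, 3))
print(first_three)  # [0, 10, 20]
print(log)          # [0, 1, 2]
```

Only the three items that the accumulator asked for were ever produced, even though the producer is infinite.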
When a producer is doing blocking I/O, it is possible to use a ThreadedFeeder or ForkedFeeder to improve performance. The feeder will start a thread or process to run the producer and feed generated items to a cache, minimizing the time that the whole pipeline has to wait when the producer is blocking in system calls.
For an article on the module's motivation and implementation, see this blog post: <http://blog.onideas.ws/stream.py>.
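The general idea behind a threaded feeder can be sketched with the standard library's threading and queue modules. This is an illustration under assumptions, not the package's ThreadedFeeder: the names `threaded_feed` and `slow_producer` are hypothetical, and error handling is left out (an exception in the producer would leave the consumer waiting).

```python
import queue
import threading
import time

_DONE = object()  # sentinel marking the end of the producer


def threaded_feed(iterable, maxsize=0):
    """Hypothetical helper: run `iterable` in a background thread."""
    buffer = queue.Queue(maxsize)

    def worker():
        for item in iterable:
            buffer.put(item)
        buffer.put(_DONE)

    # Start producing right away, before any consumer asks for items.
    threading.Thread(target=worker, daemon=True).start()
    # iter(callable, sentinel) calls buffer.get() until it returns _DONE.
    return iter(buffer.get, _DONE)


def slow_producer(n, delay=0.01):
    # Stand-in for blocking I/O such as a network read.
    for i in range(n):
        time.sleep(delay)
        yield i


items = list(threaded_feed(slow_producer(5)))
print(items)  # [0, 1, 2, 3, 4]
```

While the consumer is busy with one item, the worker thread can already be blocked waiting for the next, so the two waits overlap instead of adding up.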
Examples
Better itertools.islice
    >>> from itertools import count
    >>> c = count()
    >>> c >> item[1:10:2]
    [1, 3, 5, 7, 9]
    >>> c >> item[:5]
    [10, 11, 12, 13, 14]
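For comparison, the same two slices written with itertools.islice directly, using only the standard library. The variable names are illustrative.

```python
from itertools import count, islice

c = count()

# Equivalent of item[1:10:2]: consumes the items at indexes 0 through 9.
first = list(islice(c, 1, 10, 2))
print(first)   # [1, 3, 5, 7, 9]

# Equivalent of item[:5]: continues from where the shared iterator stopped.
second = list(islice(c, 5))
print(second)  # [10, 11, 12, 13, 14]
```

The slice notation carries the same information as the start, stop and step arguments, but reads like ordinary list indexing.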
String processing
Grep some lines matching a regex from a file, cut out the 4th field separated by ' ', ':' or '.', strip leading zeroes, then save as a list:
    import re
    # A raw string keeps the backslash in the pattern from being read as a string escape.
    s = open('file') \
      >> filter(re.compile(regex).search) \
      >> map(re.compile(r' |:|\.').split) \
      >> map(itemgetter(3)) \
      >> map(methodcaller('lstrip', '0')) \
      >> list
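To see what each stage does, the same steps can be written with Python's built-in lazy filter and map. The sample lines and the `ok$` pattern below are hypothetical, chosen only so that the 4th field has leading zeroes to strip.

```python
import re
from operator import itemgetter, methodcaller

# Hypothetical sample data standing in for the lines of a file.
lines = [
    "2024.01.05 0042 ok",
    "2024.01.05 0007 failed",
    "2024.01.06 0100 ok",
]
pattern = re.compile(r"ok$")      # assumed regex, for illustration only
splitter = re.compile(r" |:|\.")  # split on ' ', ':' or '.'

matching = filter(pattern.search, lines)             # keep matching lines
fields = map(splitter.split, matching)               # split into fields
fourth = map(itemgetter(3), fields)                  # pick the 4th field
stripped = map(methodcaller("lstrip", "0"), fourth)  # strip leading zeroes
result = list(stripped)                              # force evaluation
print(result)  # ['42', '100']
```

Each intermediate name corresponds to one '>>' stage in the pipeline above; nothing is read or split until `list` consumes the chain.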
Partial sums
Compute the first few partial sums of the geometric series 1 + 1/2 + 1/4 + ...:
    >>> import operator
    >>> gseq(0.5) >> fold(operator.add) >> item[:5]
    [1, 1.5, 1.75, 1.875, 1.9375]
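The same running sum can be reproduced with itertools.accumulate, which may help clarify what the fold step computes. The `geometric` generator is a hypothetical stand-in for gseq, assumed here to start at 1.

```python
import operator
from itertools import accumulate, islice


def geometric(ratio, first=1):
    # Hypothetical stand-in for gseq: first, first*ratio, first*ratio**2, ...
    term = first
    while True:
        yield term
        term *= ratio


partial_sums = list(islice(accumulate(geometric(0.5), operator.add), 5))
print(partial_sums)  # [1, 1.5, 1.75, 1.875, 1.9375]
```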
Random Walk in 2D
Generate an infinite stream of coordinates representing the position of a random walker in 2D:
    import operator
    from random import choice
    vectoradd = lambda u,v: zip(u, v) >> apply(operator.add) >> list
    directions = [[1,0], [0,1], [-1,0], [0,-1]]
    rw = lambda: repeatcall(choice, directions) >> fold(vectoradd, [0, 0])
Calling choice repeatedly yields a series of unit vectors representing the directions that the walker takes; fold then adds these vectors cumulatively to produce a series of coordinates.
To instantiate a random walk and get the first 10 coordinates:
    walk = rw()
    walk >> item[:10]
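A standard-library sketch of the same construction, for readers who want to run it without the package. It assumes Python 3.8 or later for the `initial` argument of accumulate, uses tuples instead of lists for coordinates, and passes an explicit random.Random instance so runs are reproducible; the function names are hypothetical.

```python
import random
from itertools import accumulate, islice

directions = [(1, 0), (0, 1), (-1, 0), (0, -1)]


def steps(rng):
    # Endless stream of randomly chosen unit vectors.
    while True:
        yield rng.choice(directions)


def vector_add(u, v):
    return (u[0] + v[0], u[1] + v[1])


def random_walk(rng):
    # Running vector sum of the steps, starting from the origin.
    return accumulate(steps(rng), vector_add, initial=(0, 0))


walk = random_walk(random.Random(0))
first_ten = list(islice(walk, 10))
print(first_ten)
```

The first coordinate is always the origin, and each later one differs from its predecessor by exactly one unit step.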
Question: what is the farthest point that the walker reaches before first returning to the origin? (Note that he might never return at all!):
    vectorlen = lambda v: v >> map(lambda x: x**2) >> sum
    rw() >> drop(1) >> takewhile(lambda v: v != [0, 0]) >> maximum(key=vectorlen)
The first coordinate [0, 0], which is the origin, needs to be dropped; otherwise takewhile will truncate immediately.
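The drop, takewhile and maximum steps can be checked on a small deterministic path. The path below is hypothetical, chosen so that the walker returns to the origin after six steps, and the standard-library functions stand in for the package's filters.

```python
from itertools import islice, takewhile

# Hypothetical fixed path standing in for a random walk.
path = [(0, 0), (1, 0), (2, 0), (2, 1), (1, 1), (1, 0), (0, 0), (5, 5)]


def squared_length(v):
    return v[0] ** 2 + v[1] ** 2


after_start = islice(path, 1, None)                        # like drop(1)
excursion = takewhile(lambda v: v != (0, 0), after_start)  # stop at first return
farthest = max(excursion, key=squared_length)              # like maximum(key=...)
print(farthest)  # (2, 1)
```

The point (5, 5) is never examined because takewhile stops at the first return to the origin. On a truly random walk the excursion may never end, so a real run would need some cap on the number of steps.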
We can also probe into the walker’s chosen path:
    probe = Stream()
    rw() >> drop(1) >> takewhile(lambda v: v != [0, 0]) >> tee(probe) >> maximum(key=vectorlen)
Now you can see his exact coordinates, for example the first 10 are:
    probe >> item[:10]
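The probing idea, recording items as they pass through a pipeline stage, can be sketched with a small generator. This is only an illustration: the `tap` function is hypothetical, a plain list stands in for the probe stream, and the package's tee may behave differently in detail.

```python
def tap(iterable, sink):
    # Pass every item through unchanged, recording a copy in `sink`.
    for item in iterable:
        sink.append(item)
        yield item


seen = []
path = [(1, 0), (2, 0), (2, 1), (1, 1), (1, 0)]  # hypothetical excursion
farthest = max(tap(path, seen), key=lambda v: v[0] ** 2 + v[1] ** 2)
print(farthest)  # (2, 1)
print(seen[:3])  # [(1, 0), (2, 0), (2, 1)]
```

The accumulator still sees every item, while the sink ends up holding the full path for later inspection.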