How to use

Creating streams

  • stream from iterable

Stream([1, 2, 3])
  • from variadic arguments

Stream.of(1, 2, 3)
  • empty stream

Stream.empty()
  • infinite ordered stream

Stream.iterate(0, lambda x: x + 1)

NB: in a similar fashion you can create a finite ordered stream by providing a condition predicate

Stream.iterate(10, operation=lambda x: x + 1, condition=lambda x: x < 15).to_list()
# [10, 11, 12, 13, 14]
  • infinite unordered stream

import random

Stream.generate(lambda: random.random())
  • infinite stream with given value

Stream.constant(42)
  • stream from range
    (from start (inclusive) to stop (exclusive) by an incremental step (defaults to 1))

Stream.from_range(0, 10).to_list()
Stream.from_range(0, 10, 3).to_list()
Stream.from_range(10, -1, -2).to_list()
  • concat
    (concatenate new streams/iterables with the current one)

Stream.of(1, 2, 3).concat(Stream.of(4, 5)).to_list()
Stream([1, 2, 3]).concat([5, 6]).to_list()
  • prepend
    (prepend new stream/iterable to the current one)

Stream([2, 3, 4]).prepend(0, 1).to_list()
Stream.of(3, 4, 5).prepend(Stream.of([0, 1], 2)).to_list()

NB: creating a new stream from None raises an error.
In cases when the iterable could potentially be None use the of_nullable() method instead;
it returns an empty stream if None and a regular one otherwise


Intermediate operations

  • filter

Stream([1, 2, 3]).filter(lambda x: x % 2 == 0)
  • map

Stream([1, 2, 3]).map(str).to_list()
Stream([1, 2, 3]).map(lambda x: x + 5).to_list()
  • filter_map
    (filters out all None values (or all falsy values if discard_falsy=True) and applies the mapper function to the remaining elements of the stream)

Stream.of(None, "foo", "", "bar", 0, []).filter_map(str.upper, discard_falsy=True).to_list()
# ["FOO", "BAR"]
  • flat_map
    (maps each element of the stream and yields the elements of the produced iterators)

Stream([[1, 2], [3, 4], [5]]).flat_map(lambda x: Stream(x)).to_list()
# [1, 2, 3, 4, 5]
  • flatten

Stream([[1, 2], [3, 4], [5]]).flatten().to_list()
# [1, 2, 3, 4, 5]
  • reduce
    (returns Optional)

Stream([1, 2, 3]).reduce(lambda acc, val: acc + val, identity=3).get()
  • peek
    (perform the provided operation on each element of the stream without consuming it)

(Stream([1, 2, 3, 4])
    .filter(lambda x: x > 2)
    .peek(lambda x: print(f"{x} ", end=""))
    .map(lambda x: x * 20)
    .to_list())
  • enumerate
    (returns each element of the Stream preceded by its corresponding index (starting from 0 by default unless specified otherwise))

iterable = ["x", "y", "z"]
Stream(iterable).enumerate().to_list()
Stream(iterable).enumerate(start=1).to_list()
# [(0, "x"), (1, "y"), (2, "z")]
# [(1, "x"), (2, "y"), (3, "z")]
  • view
    (provides access to a selected part of the stream)

Stream([1, 2, 3, 4, 5, 6, 7, 8, 9]).view(start=1, stop=-3, step=2).to_list()
# [2, 4, 6]
  • distinct
    (returns a stream with the distinct elements of the current one)

Stream([1, 1, 2, 2, 2, 3]).distinct().to_list()
  • skip
    (discards the first n elements of the stream and returns a new stream with the remaining ones)

Stream.iterate(0, lambda x: x + 1).skip(5).limit(5).to_list()
  • limit / head
    (returns a stream with the first n elements, or fewer if the underlying iterator ends sooner)

Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).limit(3).to_tuple()
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).head(3).to_tuple()
  • tail
    (returns a stream with the last n elements, or fewer if the underlying iterator ends sooner)

Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).tail(3).to_tuple()
  • take_while
    (returns a stream that yields elements based on a predicate)

Stream.of(1, 2, 3, 4, 5, 6, 7, 2, 3).take_while(lambda x: x < 5).to_list()
# [1, 2, 3, 4]
  • drop_while
    (returns a stream that skips elements based on a predicate and yields the remaining ones)

Stream.of(1, 2, 3, 5, 6, 7, 2).drop_while(lambda x: x < 5).to_list()
# [5, 6, 7, 2]
  • sort
    (sorts the elements of the current stream according to natural order or based on the given comparator;
    if ‘reverse’ flag is True, the elements are sorted in descending order)

(Stream.of((3, 30), (2, 30), (2, 20), (1, 20), (1, 10))
    .sort(lambda x: (x[0], x[1]), reverse=True)
    .to_list())
# [(3, 30), (2, 30), (2, 20), (1, 20), (1, 10)]
  • reverse
    (sorts the elements of the current stream in reverse order;
    alias for ’sort(collector, reverse=True)’)

(Stream.of((3, 30), (2, 30), (2, 20), (1, 20), (1, 10))
    .reverse(lambda x: (x[0], x[1]))
    .to_list())
# [(3, 30), (2, 30), (2, 20), (1, 20), (1, 10)]


NB: in case of stream of dicts all key-value pairs are represented internally as DictItem objects
(including recursively for nested Mapping structures)
to provide more convenient intermediate operations syntax e.g.

first_dict = {"a": 1, "b": 2}
second_dict = {"x": 3, "y": 4}
(Stream(first_dict).concat(second_dict)
    .filter(lambda x: x.value % 2 == 0)
    .map(lambda x: x.key)
    .to_list())
  • on_close
    (returns an equivalent Stream with an additional close handler to be invoked automatically by the terminal operation)

(Stream([1, 2, 3, 4])
    .on_close(lambda: print("Sorry Montessori"))
    .peek(lambda x: print(f"{'$' * x} ", end=""))
    .map(lambda x: x * 2)
    .to_list())
# "$ $$ $$$ $$$$ Sorry Montessori"
# [2, 4, 6, 8]

Terminal operations

Collectors

  • collecting result into list, tuple, set

Stream([1, 2, 3]).to_list()
Stream([1, 2, 3]).to_tuple()
Stream([1, 2, 3]).to_set()
  • into dict

class Foo:
    def __init__(self, name, num):
        self.name = name
        self.num = num

Stream([Foo("fizz", 1), Foo("buzz", 2)]).to_dict(lambda x: (x.name, x.num))
# {"fizz": 1, "buzz": 2}

In the case of a collision (duplicate keys) the ‘merger’ function indicates which entry should be kept

collection = [Foo("fizz", 1), Foo("fizz", 2), Foo("buzz", 2)]
Stream(collection).to_dict(collector=lambda x: (x.name, x.num), merger=lambda old, new: old)
# {"fizz": 1, "buzz": 2}

The to_dict method also supports creating dictionaries from DictItem objects

first_dict = {"x": 1, "y": 2}
second_dict = {"p": 33, "q": 44, "r": None}
Stream(first_dict).concat(Stream(second_dict)).to_dict(lambda x: DictItem(x.key, x.value or 0))
# {"x": 1, "y": 2, "p": 33, "q": 44, "r": 0}

e.g. you could combine streams of dicts by writing:

Stream(first_dict).concat(Stream(second_dict)).to_dict()

(simplified from ’.to_dict(lambda x: x)’)

  • into string

Stream({"a": 1, "b": [2, 3]}).to_string()
# "Stream(DictItem(key=a, value=1), DictItem(key=b, value=[2, 3]))"
Stream({"a": 1, "b": [2, 3]}).map(lambda x: {x.key: x.value}).to_string(delimiter=" | ")
# "Stream({'a': 1} | {'b': [2, 3]})"
  • alternative for working with collectors is using the collect method

Stream([1, 2, 3]).collect(tuple)
Stream.of(1, 2, 3).collect(list)
Stream.of(1, 1, 2, 2, 2, 3).collect(set)
Stream.of(1, 2, 3, 4).collect(dict, lambda x: (str(x), x * 10))
Stream.of("x", "y", "z").collect(str, str_delimiter="->")
  • grouping

Stream("AAAABBBCCD").group_by(collector=lambda key, grouper: (key, len(grouper)))
# {"A": 4, "B": 3, "C": 2, "D": 1}
coll = [Foo("fizz", 1), Foo("fizz", 2), Foo("fizz", 3), Foo("buzz", 2), Foo("buzz", 3), Foo("buzz", 4), Foo("buzz", 5)]
Stream(coll).group_by(
    classifier=lambda obj: obj.name,
    collector=lambda key, grouper: (key, [(obj.name, obj.num) for obj in list(grouper)]))
# {"fizz": [("fizz", 1), ("fizz", 2), ("fizz", 3)],
#  "buzz": [("buzz", 2), ("buzz", 3), ("buzz", 4), ("buzz", 5)]}

Other terminal operations

  • for_each

Stream([1, 2, 3, 4]).for_each(lambda x: print(f"{'#' * x} ", end=""))
  • count
    (returns the count of elements in the stream)

Stream([1, 2, 3, 4]).filter(lambda x: x % 2 == 0).count()
  • sum

Stream.of(1, 2, 3, 4).sum()
  • min
    (returns Optional with the minimum element of the stream)

Stream.of(2, 1, 3, 4).min().get()
  • max
    (returns Optional with the maximum element of the stream)

Stream.of(2, 1, 3, 4).max().get()
  • average
    (returns the average value of elements in the stream)

Stream.of(1, 2, 3, 4, 5).average()
  • find_first
    (search for an element of the stream that satisfies a predicate, returns an Optional with the first found value, if any, or None)

Stream.of(1, 2, 3, 4).filter(lambda x: x % 2 == 0).find_first().get()
  • find_any
    (search for an element of the stream that satisfies a predicate, returns an Optional with some of the found values, if any, or None)

Stream.of(1, 2, 3, 4).filter(lambda x: x % 2 == 0).find_any().get()
  • any_match
    (returns whether any elements of the stream match the given predicate)

Stream.of(1, 2, 3, 4).any_match(lambda x: x > 2)
  • all_match
    (returns whether all elements of the stream match the given predicate)

Stream.of(1, 2, 3, 4).all_match(lambda x: x > 2)
  • none_match
    (returns whether no elements of the stream match the given predicate)

Stream.of(1, 2, 3, 4).none_match(lambda x: x < 0)
  • take_first
    (returns Optional with the first element of the stream or a default value)

Stream({"a": 1, "b": 2}).take_first().get()
Stream([]).take_first(default=33).get()
# DictItem(key="a", value=1)
# 33
  • take_last
    (returns Optional with the last element of the stream or a default value)

Stream({"a": 1, "b": 2}).take_last().get()
Stream([]).take_last(default=33).get()
  • compare_with
    (compares linearly the contents of two streams based on a given comparator)

fizz = Foo("fizz", 1)
buzz = Foo("buzz", 2)
Stream([buzz, fizz]).compare_with(Stream([fizz, buzz]), lambda x, y: x.num == y.num)
  • quantify
    (count how many of the elements are Truthy or evaluate to True based on a given predicate)

Stream([2, 3, 4, 5, 6]).quantify(predicate=lambda x: x % 2 == 0)

NB: although the Stream is closed automatically by the terminal operation
you can still close it by hand (if needed) invoking the close() method.
In turn that will trigger the close_handler (if such was provided)


Itertools integration

Invoke the use method by passing the itertools function and its arguments as **kwargs

import itertools
import operator

Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).use(itertools.islice, start=3, stop=8)
Stream.of(1, 2, 3, 4, 5).use(itertools.accumulate, func=operator.mul).to_list()
Stream(range(3)).use(itertools.permutations, r=3).to_list()

Itertools ‘recipes’

Invoke the ‘recipes’ described here as stream methods and pass the required keyword arguments

Stream([1, 2, 3]).ncycles(count=2).to_list()
Stream.of(2, 3, 4).take_nth(10, default=66).get()
Stream(["ABC", "D", "EF"]).round_robin().to_list()

FileStreams

NB: use ’pip install pyrio[fs]’ to install necessary extra dependencies

Querying files

  • working with json, toml, yaml, xml files
    NB: FileStream reads data as a series of DictItem objects from the underlying dict_items view

FileStream("path/to/file").map(lambda x: f"{x.key}=>{x.value}").to_tuple()
# ("abc=>xyz", "qwerty=>42")
from operator import attrgetter
from pyrio import DictItem

(FileStream("path/to/file")
 .filter(lambda x: "a" in x.key)
 .map(lambda x: DictItem(x.key, sum(x.value) * 10))
 .sort(attrgetter("value"), reverse=True)
 .map(lambda x: f"{str(x.value)}::{x.key}")
 .to_list())
# ["230::xza", "110::abba", "30::a"]
  • querying csv and tsv files
    (each row is read as a dict with keys taken from the header)

FileStream("path/to/file").map(lambda x: f"fizz: {x['fizz']}, buzz: {x['buzz']}").to_tuple()
# ("fizz: 42, buzz: 45", "fizz: aaa, buzz: bbb")
from operator import itemgetter

FileStream("path/to/file").map(itemgetter('fizz')).to_list()
# ['42', 'aaa']

You could query the nested dicts by creating streams out of them

(FileStream("path/to/file")
    .map(lambda x: (Stream(x).to_dict(lambda y: DictItem(y.key, y.value or "Unknown"))))
    .save())
  • reading plain text (if the file doesn’t have one of the aforementioned extensions)

(FileStream("path/to/lorem/ipsum")
    .map(lambda x: x.strip())
    .enumerate()
    .filter(lambda line: "id" in line[1])
    .to_dict()
)

# {1: "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
#  6: "Excepteur sint occaecat cupidatat non proident, sunt in culpa",
#  7: "qui officia deserunt mollit anim id est laborum."}
  • reading a file with process() method

    • use extra f_open_options (for the underlying open file function)

    • f_read_options (to be passed to the corresponding library function that is loading the file content e.g. tomllib, json)

from decimal import Decimal

(FileStream.process(
    file_path="path/to/file.json",
    f_open_options={"encoding": "utf-8"},
    f_read_options={"parse_float": Decimal})
 .map(lambda x:x.value).to_list())
# ['foo', True, Decimal('1.22'), Decimal('5.456367654')]

To include the root tag when loading an .xml file pass ’include_root=True’

FileStream.process("path/to/custom_root.xml", include_root=True).map(
    lambda x: f"root={x.key}: inner_records={str(x.value)}"
).to_list()
# ["root=custom-root: inner_records={'abc': 'xyz', 'qwerty': '42'}"]

Saving to a file

  • write the contents of a FileStream by passing a file_path to the save() method

in_memory_dict = Stream(json_dict).filter(lambda x: len(x.key) < 6).to_tuple()
FileStream("path/to/file.json").prepend(in_memory_dict).save("./tests/resources/updated.json")

If no path is given, the source file for the FileStream will be updated

FileStream("path/to/file.json").concat(in_memory_dict).save()

NB: if while updating the file something goes wrong, the original content will be restored/preserved

  • handle null values
    (pass null_handler function to replace null values)

FileStream("path/to/test.toml").save(null_handler=lambda x: DictItem(x.key, x.value or "N/A"))

NB: useful for writing .toml files which don’t allow None values

  • passing advanced file open and write options
    similarly to the process method you could provide

    • f_open_options (for the underlying open function)

    • f_write_options (passed to the corresponding library that will ‘dump’ the contents of the stream e.g. tomli-w, pyyaml)

FileStream("path/to/file.json").concat(in_memory_dict).save(
    file_path="merged.xml",
    f_open_options={"encoding": "utf-8"},
    f_write_options={"indent": 4},
)

E.g. to append to an existing file pass f_open_options={"mode": "a"} to the save() method.
By default saving plain text uses "\n" as the delimiter between items;
you can pass a custom delimiter using f_write_options

(FileStream("path/to/lorem/ipsum")
    .map(lambda line: line.strip())
    .enumerate()
    .filter(lambda line: "ad" in line[1])
    .map(lambda line: f"line:{line[0]}, text='{line[1]}'")
    .save(f_open_options={"mode": "a"}, f_write_options={"delimiter": " || "})
)

# Lorem ipsum...
# ...
# line:0, text='Lorem ipsum dolor sit amet, consectetur adipisicing elit,' || line:2, text='Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris'

When working with plain text you can pass ’header’ and ’footer’ as f_write_options
to be prepended or appended to the FileStream output

(FileStream("path/to/lorem/ipsum")
  .map(lambda line: line.strip())
  .enumerate()
  .filter(lambda line: line[0] == 3)
  .map(lambda line: f"{line[0]}: {line[1]}")
  .save(f_open_options={"mode": "a"}, f_write_options={"header": "\nHeader\n", "footer": "\nFooter\n"})
 )

# Lorem ipsum...
# ...
# qui officia deserunt mollit anim id est laborum.
#
# Header
# 3: nisi ut aliquip ex ea commodo consequat.
# Footer
#

To add a custom root tag when saving an .xml file pass 'xml_root="my-custom-root"'

FileStream("path/to/file.json").concat(in_memory_dict).save(
    file_path="path/to/custom.xml",
    f_open_options={"encoding": "utf-8"},
    f_write_options={"indent": 4},
    xml_root="my-custom-root",
)

How far can we actually push it?

(
    FileStream("path/to/file.csv")
    .concat(
        FileStream("path/to/other/file.json")
        .filter(
            lambda x: (
                Stream(x.value)
                .find_first(lambda y: y.key == "name" and y.value != "Snake")
                .or_else_get(lambda: None)
            )
            is not None
        )
        .map(lambda x: x.value)
    )
    .map(lambda x: (Stream(x).to_dict(lambda y: DictItem(y.key, y.value or "N/A"))))
    .save("path/to/third/file.tsv")
)
  • …some leetcode maybe?

#  check if given string is palindrome; string length is guaranteed to be > 0
def validate_str(string):
    stop = len(string) // 2 if len(string) > 1 else 1
    return Stream.from_range(0, stop).all_match(lambda x: string[x] == string[-x - 1])

validate_str("a1b2c3c2b1a")
validate_str("abc321")
validate_str("xyyx")
validate_str("aba")
validate_str("z")

# True
# False
# True
# True
# True
  • …and another one?

# count vowels and consonants in a given string
def process_str(string):
    ALL_VOWELS = "AEIOUaeiou"
    return (Stream(string)
        .filter(lambda ch: ch.isalpha())
        .partition(lambda ch: ch in ALL_VOWELS)  # Partitions entries into true and false ones
        .map(lambda p: tuple(p))
        .enumerate()
        .map(lambda x: ("Vowels" if x[0] == 0 else "Consonants", [len(x[1]), x[1]]))
        .to_dict()
    )

process_str("123Ab5oc-E6db#bCi9<>")

# {'Vowels': [4, ('A', 'o', 'E', 'i')], 'Consonants': [6, ('b', 'c', 'd', 'b', 'b', 'C')]}

How hideous can it get?

Chubby