## How to use
### Creating streams
- stream from iterable
```python
Stream([1, 2, 3])
```
- from variadic arguments
```python
Stream.of(1, 2, 3)
```
- empty stream
```python
Stream.empty()
```
- infinite ordered stream
```python
Stream.iterate(0, lambda x: x + 1)
```
NB: in a similar fashion you can create a finite ordered stream by providing a condition predicate
```python
Stream.iterate(10, operation=lambda x: x + 1, condition=lambda x: x < 15).to_list()
# [10, 11, 12, 13, 14]
```
- infinite unordered stream
```python
import random

Stream.generate(lambda: random.random())
```
- infinite stream with given value
```python
Stream.constant(42)
```
- stream from range
(from start (inclusive) to stop (exclusive) by an incremental step (defaults to 1))
```python
Stream.from_range(0, 10).to_list()
Stream.from_range(0, 10, 3).to_list()
Stream.from_range(10, -1, -2).to_list()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# [0, 3, 6, 9]
# [10, 8, 6, 4, 2, 0]
```
- concat
(concatenate new streams/iterables with the current one)
```python
Stream.of(1, 2, 3).concat(Stream.of(4, 5)).to_list()
Stream([1, 2, 3]).concat([5, 6]).to_list()
```
- prepend
(prepend new stream/iterable to the current one)
```python
Stream([2, 3, 4]).prepend(0, 1).to_list()
Stream.of(3, 4, 5).prepend(Stream.of([0, 1], 2)).to_list()
```
NB: creating a new stream from None raises an error.
In cases when the iterable could potentially be None, use the of_nullable() method instead;
it returns an empty stream if None and a regular one otherwise.

--------------------------------------------

### Intermediate operations
- filter
```python
Stream([1, 2, 3]).filter(lambda x: x % 2 == 0)
```
- map
```python
Stream([1, 2, 3]).map(str).to_list()
Stream([1, 2, 3]).map(lambda x: x + 5).to_list()
```
- filter_map
(filters out all None values, or all falsy values if discard_falsy=True, and applies the mapper function to the remaining elements of the stream)
```python
Stream.of(None, "foo", "", "bar", 0, []).filter_map(str.upper, discard_falsy=True).to_list()
# ["FOO", "BAR"]
```
- flat_map
(maps each element of the stream to an iterable and yields the elements of the produced iterables)
```python
Stream([[1, 2], [3, 4], [5]]).flat_map(lambda x: Stream(x)).to_list()
# [1, 2, 3, 4, 5]
```
- flatten
```python
Stream([[1, 2], [3, 4], [5]]).flatten().to_list()
# [1, 2, 3, 4, 5]
```
- reduce
(returns Optional)
```python
Stream([1, 2, 3]).reduce(lambda acc, val: acc + val, identity=3).get()
```
- peek
(perform the provided operation on each element of the stream without consuming it)
```python
(Stream([1, 2, 3, 4])
    .filter(lambda x: x > 2)
    .peek(lambda x: print(f"{x} ", end=""))
    .map(lambda x: x * 20)
    .to_list())
```
- enumerate
(returns each element of the Stream preceded by its corresponding index, starting from 0 by default unless specified otherwise)
```python
iterable = ["x", "y", "z"]
Stream(iterable).enumerate().to_list()
Stream(iterable).enumerate(start=1).to_list()
# [(0, "x"), (1, "y"), (2, "z")]
# [(1, "x"), (2, "y"), (3, "z")]
```
- view
(provides access to a selected part of the stream)
```python
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9]).view(start=1, stop=-3, step=2).to_list()
# [2, 4, 6]
```
- distinct
(returns a stream with the distinct elements of the current one)
```python
Stream([1, 1, 2, 2, 2, 3]).distinct().to_list()
```
- skip
(discards the first n elements of the stream and returns a new stream with the remaining ones)
```python
Stream.iterate(0, lambda x: x + 1).skip(5).limit(5).to_list()
```
- limit / head
(returns a stream with the first n elements, or fewer if the underlying iterator ends sooner)
```python
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).limit(3).to_tuple()
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).head(3).to_tuple()
```
- tail
(returns a stream with the last n elements, or fewer if the underlying iterator ends sooner)
```python
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).tail(3).to_tuple()
```
- take_while
(returns a stream that yields elements based on a predicate)
```python
Stream.of(1, 2, 3, 4, 5, 6, 7, 2, 3).take_while(lambda x: x < 5).to_list()
# [1, 2, 3, 4]
```
- drop_while
(returns a stream that skips elements based on a predicate and yields the remaining ones)
```python
Stream.of(1, 2, 3, 5, 6, 7, 2).drop_while(lambda x: x < 5).to_list()
# [5, 6, 7, 2]
```
- sort
(sorts the elements of the current stream according to natural order or based on the given comparator;
if the 'reverse' flag is True, the elements are sorted in descending order)
```python
(Stream.of((3, 30), (2, 30), (2, 20), (1, 20), (1, 10))
    .sort(lambda x: (x[0], x[1]), reverse=True)
    .to_list())
# [(3, 30), (2, 30), (2, 20), (1, 20), (1, 10)]
```
- reverse
(sorts the elements of the current stream in reverse order;
alias for 'sort(comparator, reverse=True)')
```python
(Stream.of((3, 30), (2, 30), (2, 20), (1, 20), (1, 10))
    .reverse(lambda x: (x[0], x[1]))
    .to_list())
# [(3, 30), (2, 30), (2, 20), (1, 20), (1, 10)]
```
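Assuming the comparator passed to sort and reverse behaves like a key function (as the tuple-returning lambdas above suggest), the same orderings can be reproduced with Python's built-in sorted(). This is a stdlib-only sketch for comparison, not pyrio's implementation.

```python
# Plain-Python sketch (assumption): sort(key, reverse=True) and reverse(key)
# are treated here as equivalent to sorted(iterable, key=key, reverse=True).
pairs = [(1, 10), (3, 30), (2, 20), (2, 30), (1, 20)]

# Ascending ("natural") order by the (first, second) tuple key
ascending = sorted(pairs, key=lambda x: (x[0], x[1]))

# Descending order, mirroring sort(..., reverse=True) and reverse(...)
descending = sorted(pairs, key=lambda x: (x[0], x[1]), reverse=True)

print(ascending)   # [(1, 10), (1, 20), (2, 20), (2, 30), (3, 30)]
print(descending)  # [(3, 30), (2, 30), (2, 20), (1, 20), (1, 10)]
```

Because tuples already compare lexicographically, the key is redundant here; it is kept only to mirror the lambdas in the examples above.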
NB: in the case of a stream of dicts, all key-value pairs are represented internally as DictItem objects
(including recursively for nested Mapping structures)
to provide a more convenient syntax for intermediate operations, e.g.
```python
first_dict = {"a": 1, "b": 2}
second_dict = {"x": 3, "y": 4}
(Stream(first_dict).concat(second_dict)
    .filter(lambda x: x.value % 2 == 0)
    .map(lambda x: x.key)
    .to_list())
```
- on_close
(returns an equivalent Stream with an additional close handler to be invoked automatically by the terminal operation)
```python
(Stream([1, 2, 3, 4])
    .on_close(lambda: print("Sorry Montessori"))
    .peek(lambda x: print(f"{'$' * x} ", end=""))
    .map(lambda x: x * 2)
    .to_list())
# "$ $$ $$$ $$$$ Sorry Montessori"
# [2, 4, 6, 8]
```

--------------------------------------------

### Terminal operations
#### Collectors
- collecting result into list, tuple, set
```python
Stream([1, 2, 3]).to_list()
Stream([1, 2, 3]).to_tuple()
Stream([1, 2, 3]).to_set()
```
- into dict
```python
class Foo:
    def __init__(self, name, num):
        self.name = name
        self.num = num


Stream([Foo("fizz", 1), Foo("buzz", 2)]).to_dict(lambda x: (x.name, x.num))
# {"fizz": 1, "buzz": 2}
```
In the case of a collision (duplicate keys), the 'merger' function indicates which entry should be kept
```python
collection = [Foo("fizz", 1), Foo("fizz", 2), Foo("buzz", 2)]
Stream(collection).to_dict(collector=lambda x: (x.name, x.num), merger=lambda old, new: old)
# {"fizz": 1, "buzz": 2}
```
The to_dict method also supports creating dictionaries from DictItem objects
```python
from pyrio import DictItem

first_dict = {"x": 1, "y": 2}
second_dict = {"p": 33, "q": 44, "r": None}
Stream(first_dict).concat(Stream(second_dict)).to_dict(lambda x: DictItem(x.key, x.value or 0))
# {"x": 1, "y": 2, "p": 33, "q": 44, "r": 0}
```
E.g.
you could combine streams of dicts by writing:
```python
Stream(first_dict).concat(Stream(second_dict)).to_dict()
```
(simplified from '.to_dict(lambda x: x)')
- into string
```python
Stream({"a": 1, "b": [2, 3]}).to_string()
# "Stream(DictItem(key=a, value=1), DictItem(key=b, value=[2, 3]))"
```
```python
Stream({"a": 1, "b": [2, 3]}).map(lambda x: {x.key: x.value}).to_string(delimiter=" | ")
# "Stream({'a': 1} | {'b': [2, 3]})"
```
- an alternative for working with collectors is using the collect method
```python
Stream([1, 2, 3]).collect(tuple)
Stream.of(1, 2, 3).collect(list)
Stream.of(1, 1, 2, 2, 2, 3).collect(set)
Stream.of(1, 2, 3, 4).collect(dict, lambda x: (str(x), x * 10))
Stream.of("x", "y", "z").collect(str, str_delimiter="->")
```
- grouping
```python
Stream("AAAABBBCCD").group_by(collector=lambda key, grouper: (key, len(grouper)))
# {"A": 4, "B": 3, "C": 2, "D": 1}
```
```python
coll = [Foo("fizz", 1), Foo("fizz", 2), Foo("fizz", 3), Foo("buzz", 2), Foo("buzz", 3), Foo("buzz", 4), Foo("buzz", 5)]
Stream(coll).group_by(
    classifier=lambda obj: obj.name,
    collector=lambda key, grouper: (key, [(obj.name, obj.num) for obj in list(grouper)]))
# {"fizz": [("fizz", 1), ("fizz", 2), ("fizz", 3)],
#  "buzz": [("buzz", 2), ("buzz", 3), ("buzz", 4), ("buzz", 5)]}
```
#### Other terminal operations
- for_each
```python
Stream([1, 2, 3, 4]).for_each(lambda x: print(f"{'#' * x} ", end=""))
```
- count
(returns the count of elements in the stream)
```python
Stream([1, 2, 3, 4]).filter(lambda x: x % 2 == 0).count()
```
- sum
```python
Stream.of(1, 2, 3, 4).sum()
```
- min
(returns Optional with the minimum element of the stream)
```python
Stream.of(2, 1, 3, 4).min().get()
```
- max
(returns Optional with the maximum element of the stream)
```python
Stream.of(2, 1, 3, 4).max().get()
```
- average
(returns the average value of elements in the stream)
```python
Stream.of(1, 2, 3, 4, 5).average()
```
- find_first
(searches for an element of the stream that satisfies a predicate and returns an Optional with the first found value, if any, or None)
```python
Stream.of(1, 2, 3, 4).filter(lambda x: x % 2 == 0).find_first().get()
```
- find_any
(searches for an element of the stream that satisfies a predicate and returns an Optional with one of the found values, if any, or None)
```python
Stream.of(1, 2, 3, 4).filter(lambda x: x % 2 == 0).find_any().get()
```
- any_match
(returns whether any elements of the stream match the given predicate)
```python
Stream.of(1, 2, 3, 4).any_match(lambda x: x > 2)
```
- all_match
(returns whether all elements of the stream match the given predicate)
```python
Stream.of(1, 2, 3, 4).all_match(lambda x: x > 2)
```
- none_match
(returns whether no elements of the stream match the given predicate)
```python
Stream.of(1, 2, 3, 4).none_match(lambda x: x < 0)
```
- take_first
(returns Optional with the first element of the stream or a default value)
```python
Stream({"a": 1, "b": 2}).take_first().get()
Stream([]).take_first(default=33).get()
# DictItem(key="a", value=1)
# 33
```
- take_last
(returns Optional with the last element of the stream or a default value)
```python
Stream({"a": 1, "b": 2}).take_last().get()
Stream([]).take_last(default=33).get()
```
- compare_with
(linearly compares the contents of two streams based on a given comparator)
```python
fizz = Foo("fizz", 1)
buzz = Foo("buzz", 2)
Stream([buzz, fizz]).compare_with(Stream([fizz, buzz]), lambda x, y: x.num == y.num)
```
- quantify
(counts how many of the elements are truthy or evaluate to True based on a given predicate)
```python
Stream([2, 3, 4, 5, 6]).quantify(predicate=lambda x: x % 2 == 0)
```
NB: although the Stream is closed automatically by the terminal operation,
you can still close it by hand (if needed) by invoking the close() method.
In turn, that will trigger the close_handler (if one was provided).

--------------------------------------------

### Itertools integration
Invoke the use method by passing the itertools function and its arguments as **kwargs
```python
import itertools
import operator

Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).use(itertools.islice, start=3, stop=8)
Stream.of(1, 2, 3, 4, 5).use(itertools.accumulate, func=operator.mul).to_list()
Stream(range(3)).use(itertools.permutations, r=3).to_list()
```
#### Itertools 'recipes'
Invoke the 'recipes' described [here](https://docs.python.org/3/library/itertools.html#itertools-recipes) as stream methods and pass the required keyword arguments
```python
Stream([1, 2, 3]).ncycles(count=2).to_list()
Stream.of(2, 3, 4).take_nth(10, default=66).get()
Stream(["ABC", "D", "EF"]).round_robin().to_list()
```

--------------------------------------------

### FileStreams
NB: use 'pip install pyrio[fs]' to install the necessary extra dependencies
#### Querying files
- working with json, toml, yaml, xml files
NB: FileStream reads data as a series of DictItem objects from the underlying dict_items view
```python
FileStream("path/to/file").map(lambda x: f"{x.key}=>{x.value}").to_tuple()
# ("abc=>xyz", "qwerty=>42")
```
```python
from operator import attrgetter
from pyrio import DictItem

(FileStream("path/to/file")
    .filter(lambda x: "a" in x.key)
    .map(lambda x: DictItem(x.key, sum(x.value) * 10))
    .sort(attrgetter("value"), reverse=True)
    .map(lambda x: f"{str(x.value)}::{x.key}")
    .to_list())
# ["230::xza", "110::abba", "30::a"]
```
- querying csv and tsv files
(each row is read as a dict with keys taken from the header)
```python
FileStream("path/to/file").map(lambda x: f"fizz: {x['fizz']}, buzz: {x['buzz']}").to_tuple()
# ("fizz: 42, buzz: 45", "fizz: aaa, buzz: bbb")
```
```python
from operator import itemgetter

FileStream("path/to/file").map(itemgetter('fizz')).to_list()
# ['42', 'aaa']
```
You could query the nested dicts by creating streams out of them
```python
(FileStream("path/to/file")
    .map(lambda x: (Stream(x).to_dict(lambda y: DictItem(y.key, y.value or "Unknown"))))
    .save())
```
- reading plain text (if the file doesn't have one of the aforementioned extensions)
```python
(FileStream("path/to/lorem/ipsum")
    .map(lambda x: x.strip())
    .enumerate()
    .filter(lambda line: "id" in line[1])
    .to_dict()
)
# {1: "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
#  6: "Excepteur sint occaecat cupidatat non proident, sunt in culpa",
#  7: "qui officia deserunt mollit anim id est laborum."}
```
- reading a file with the process() method
  - use extra f_open_options (for the underlying open file function)
  - f_read_options (to be passed to the corresponding library function that is loading the file content, e.g.
tomllib, json)
```python
from decimal import Decimal

(FileStream.process(
    file_path="path/to/file.json",
    f_open_options={"encoding": "utf-8"},
    f_read_options={"parse_float": Decimal})
    .map(lambda x: x.value)
    .to_list())
# ['foo', True, Decimal('1.22'), Decimal('5.456367654')]
```
To include the root tag when loading an .xml file, pass 'include_root=True'
```python
FileStream.process("path/to/custom_root.xml", include_root=True).map(
    lambda x: f"root={x.key}: inner_records={str(x.value)}"
).to_list()
# ["root=custom-root: inner_records={'abc': 'xyz', 'qwerty': '42'}"]
```

--------------------------------------------

#### Saving to a file
- write the contents of a FileStream by passing a file_path to the save() method
```python
in_memory_dict = Stream(json_dict).filter(lambda x: len(x.key) < 6).to_tuple()
FileStream("path/to/file.json").prepend(in_memory_dict).save("./tests/resources/updated.json")
```
If no path is given, the source file for the FileStream will be updated
```python
FileStream("path/to/file.json").concat(in_memory_dict).save()
```
NB: if something goes wrong while updating the file, the original content will be restored/preserved
- handle null values
(pass a null_handler function to replace null values)
```python
FileStream("path/to/test.toml").save(null_handler=lambda x: DictItem(x.key, x.value or "N/A"))
```
NB: useful for writing .toml files, which don't allow None values
- passing advanced file open and write options
similarly to the process method, you could provide
  - f_open_options (for the underlying open function)
  - f_write_options (passed to the corresponding library that will 'dump' the contents of the stream, e.g. tomli-w, pyyaml)
```python
FileStream("path/to/file.json").concat(in_memory_dict).save(
    file_path="merged.xml",
    f_open_options={"encoding": "utf-8"},
    f_write_options={"indent": 4},
)
```
E.g. to append to an existing file, pass f_open_options={"mode": "a"} to the save() method.
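The split between f_open_options and f_read_options/f_write_options can be pictured as two dicts forwarded to different calls: one to open(), the other to the library that loads or dumps the content. The helpers below (load_json and save_json are hypothetical names, not part of pyrio) sketch that idea for JSON using only the standard json module; pyrio's actual handling of each file type may differ.

```python
import json
import os
import tempfile
from decimal import Decimal


def save_json(data, file_path, f_open_options=None, f_write_options=None):
    """Hypothetical helper: open() gets f_open_options, json.dump gets f_write_options."""
    # Caller-supplied options override the default mode, e.g. {"mode": "a"}
    open_opts = {"mode": "w", **(f_open_options or {})}
    with open(file_path, **open_opts) as f:
        json.dump(data, f, **(f_write_options or {}))


def load_json(file_path, f_open_options=None, f_read_options=None):
    """Hypothetical helper: open() gets f_open_options, json.load gets f_read_options."""
    open_opts = {"mode": "r", **(f_open_options or {})}
    with open(file_path, **open_opts) as f:
        return json.load(f, **(f_read_options or {}))


path = os.path.join(tempfile.mkdtemp(), "example.json")
save_json({"name": "foo", "price": 1.22}, path,
          f_open_options={"encoding": "utf-8"},
          f_write_options={"indent": 4})
loaded = load_json(path,
                   f_open_options={"encoding": "utf-8"},
                   f_read_options={"parse_float": Decimal})
print(loaded)  # {'name': 'foo', 'price': Decimal('1.22')}
```

Because the open options are merged over a default mode, passing {"mode": "a"} in this sketch switches to appending, which is the same kind of override the append tip above describes.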
By default, saving plain text uses "\n" as the delimiter between items;
you can pass a custom delimiter using f_write_options
```python
(FileStream("path/to/lorem/ipsum")
    .map(lambda line: line.strip())
    .enumerate()
    .filter(lambda line: "ad" in line[1])
    .map(lambda line: f"line:{line[0]}, text='{line[1]}'")
    .save(f_open_options={"mode": "a"}, f_write_options={"delimiter": " || "})
)
# Lorem ipsum...
# ...
# line:0, text='Lorem ipsum dolor sit amet, consectetur adipisicing elit,' || line:2, text='Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris'
```
When working with plain text, you can pass 'header' and 'footer' as f_write_options
to be prepended or appended to the FileStream output
```python
(FileStream("path/to/lorem/ipsum")
    .map(lambda line: line.strip())
    .enumerate()
    .filter(lambda line: line[0] == 3)
    .map(lambda line: f"{line[0]}: {line[1]}")
    .save(f_open_options={"mode": "a"}, f_write_options={"header": "\nHeader\n", "footer": "\nFooter\n"})
)
# Lorem ipsum...
# ...
# qui officia deserunt mollit anim id est laborum.
#
# Header
# 3: nisi ut aliquip ex ea commodo consequat.
# Footer
#
```
To add a custom root tag when saving an .xml file, pass 'xml_root="my-custom-root"'
```python
FileStream("path/to/file.json").concat(in_memory_dict).save(
    file_path="path/to/custom.xml",
    f_open_options={"encoding": "utf-8"},
    f_write_options={"indent": 4},
    xml_root="my-custom-root",
)
```

--------------------------------------------

### How far can we actually push it?
```python
(
    FileStream("path/to/file.csv")
    .concat(
        FileStream("path/to/other/file.json")
        .filter(
            lambda x: (
                Stream(x.value)
                .find_first(lambda y: y.key == "name" and y.value != "Snake")
                .or_else_get(lambda: None)
            ) is not None
        )
        .map(lambda x: x.value)
    )
    .map(lambda x: (Stream(x).to_dict(lambda y: DictItem(y.key, y.value or "N/A"))))
    .save("path/to/third/file.tsv")
)
```
- ...some leetcode maybe?
```python
# check if the given string is a palindrome; string length is guaranteed to be > 0
def validate_str(string):
    stop = len(string) // 2 if len(string) > 1 else 1
    return Stream.from_range(0, stop).all_match(lambda x: string[x] == string[-x - 1])


validate_str("a1b2c3c2b1a")
validate_str("abc321")
validate_str("xyyx")
validate_str("aba")
validate_str("z")
# True
# False
# True
# True
# True
```
- ...and another one?
```python
# count vowels and consonants in the given string
def process_str(string):
    ALL_VOWELS = "AEIOUaeiou"
    return (Stream(string)
            .filter(lambda ch: ch.isalpha())
            .partition(lambda ch: ch in ALL_VOWELS)  # Partitions entries into true and false ones
            .map(lambda p: tuple(p))
            .enumerate()
            .map(lambda x: ("Vowels" if x[0] == 0 else "Consonants", [len(x[1]), x[1]]))
            .to_dict()
            )


process_str("123Ab5oc-E6db#bCi9<>")
# {'Vowels': [4, ('A', 'o', 'E', 'i')], 'Consonants': [6, ('b', 'c', 'd', 'b', 'b', 'C')]}
```
How hideous can it get?

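The partition step in the vowel-counting example splits the elements into two groups by a predicate; judging by the output shown, the matching group comes first. A stdlib sketch of that behaviour, adapted from the partition recipe in the itertools documentation (which returns the non-matching group first), might look like this. The name partition_true_first is hypothetical and this is not pyrio's implementation.

```python
from itertools import filterfalse, tee


def partition_true_first(predicate, iterable):
    """Split iterable into (matching, non-matching) lazy iterators.

    Hypothetical helper adapted from the itertools 'partition' recipe,
    with the output order swapped so the matching elements come first.
    """
    t1, t2 = tee(iterable)
    return filter(predicate, t1), filterfalse(predicate, t2)


ALL_VOWELS = "AEIOUaeiou"
letters = [ch for ch in "123Ab5oc-E6db#bCi9<>" if ch.isalpha()]
vowels, consonants = (tuple(p) for p in partition_true_first(lambda ch: ch in ALL_VOWELS, letters))
print({"Vowels": [len(vowels), vowels], "Consonants": [len(consonants), consonants]})
# {'Vowels': [4, ('A', 'o', 'E', 'i')], 'Consonants': [6, ('b', 'c', 'd', 'b', 'b', 'C')]}
```

tee() buffers the elements, so both groups can be consumed one after the other from a single pass over the input.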