## How to use
### Creating streams
- stream from iterable
```python
Stream([1, 2, 3])
```
- from variadic arguments
```python
Stream.of(1, 2, 3)
```
- empty stream
```python
Stream.empty()
```
- infinite ordered stream
```python
Stream.iterate(0, lambda x: x + 1)
```
NB: in similar fashion you can create finite ordered stream by providing a condition predicate
```python
Stream.iterate(10, operation=lambda x: x + 1, condition=lambda x: x < 15).to_list()
# [10, 11, 12, 13, 14]
```
- infinite unordered stream
```python
import random
Stream.generate(lambda: random.random())
```
- infinite stream with given value
```python
Stream.constant(42)
```
- stream from range
(from start (inclusive) to stop (exclusive) by an incremental step (defaults to 1))
```python
Stream.from_range(0, 10).to_list()
Stream.from_range(0, 10, 3).to_list()
Stream.from_range(10, -1, -2).to_list()
```
- concat
(concatenate new streams/iterables with the current one)
```python
Stream.of(1, 2, 3).concat(Stream.of(4, 5)).to_list()
Stream([1, 2, 3]).concat([5, 6]).to_list()
```
- prepend
(prepend new stream/iterable to the current one)
```python
Stream([2, 3, 4]).prepend(0, 1).to_list()
Stream.of(3, 4, 5).prepend(Stream.of([0, 1], 2)).to_list()
```
NB: creating new stream from None raises error.
In cases when the iterable could potentially be None use the of_nullable() method instead;
it returns an empty stream if None and a regular one otherwise
--------------------------------------------
### Intermediate operations
- filter
```python
Stream([1, 2, 3]).filter(lambda x: x % 2 == 0)
```
- map
```python
Stream([1, 2, 3]).map(str).to_list()
Stream([1, 2, 3]).map(lambda x: x + 5).to_list()
```
- filter_map
(filter out all None or discard_falsy values (if discard_falsy=True) and applies mapper function to the elements of the stream)
```python
Stream.of(None, "foo", "", "bar", 0, []).filter_map(str.upper, discard_falsy=True).to_list()
# ["FOO", "BAR"]
```
- flat_map
(map each element of the stream and yields the elements of the produced iterators)
```python
Stream([[1, 2], [3, 4], [5]]).flat_map(lambda x: Stream(x)).to_list()
# [1, 2, 3, 4, 5]
```
- flatten
```python
Stream([[1, 2], [3, 4], [5]]).flatten().to_list()
# [1, 2, 3, 4, 5]
```
- reduce
(returns Optional)
```python
Stream([1, 2, 3]).reduce(lambda acc, val: acc + val, identity=3).get()
```
- peek
(perform the provided operation on each element of the stream without consuming it)
```python
(Stream([1, 2, 3, 4])
.filter(lambda x: x > 2)
.peek(lambda x: print(f"{x} ", end=""))
.map(lambda x: x * 20)
.to_list())
```
- enumerate
(returns each element of the Stream preceded by his corresponding index
(by default starting from 0 if not specified otherwise))
```python
iterable = ["x", "y", "z"]
Stream(iterable).enumerate().to_list()
Stream(iterable).enumerate(start=1).to_list()
# [(0, "x"), (1, "y"), (2, "z")]
# [(1, "x"), (2, "y"), (3, "z")]
```
- view
(provides access to a selected part of the stream)
```python
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9]).view(start=1, stop=-3, step=2).to_list()
# [2, 4, 6]
```
- distinct
(returns a stream with the distinct elements of the current one)
```python
Stream([1, 1, 2, 2, 2, 3]).distinct().to_list()
```
- skip
(discards the first n elements of the stream and returns a new stream with the remaining ones)
```python
Stream.iterate(0, lambda x: x + 1).skip(5).limit(5).to_list()
```
- limit / head
(returns a stream with the first n elements, or fewer if the underlying iterator ends sooner)
```python
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).limit(3).to_tuple()
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).head(3).to_tuple()
```
- tail
(returns a stream with the last n elements, or fewer if the underlying iterator ends sooner)
```python
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).tail(3).to_tuple()
```
- take_while
(returns a stream that yields elements based on a predicate)
```python
Stream.of(1, 2, 3, 4, 5, 6, 7, 2, 3).take_while(lambda x: x < 5).to_list()
# [1, 2, 3, 4]
```
- drop_while
(returns a stream that skips elements based on a predicate and yields the remaining ones)
```python
Stream.of(1, 2, 3, 5, 6, 7, 2).drop_while(lambda x: x < 5).to_list()
# [5, 6, 7, 2]
```
- sort
(sorts the elements of the current stream according to natural order or based on the given comparator;
if 'reverse' flag is True, the elements are sorted in descending order)
```python
(Stream.of((3, 30), (2, 30), (2, 20), (1, 20), (1, 10))
.sort(lambda x: (x[0], x[1]), reverse=True)
.to_list())
# [(3, 30), (2, 30), (2, 20), (1, 20), (1, 10)]
```
- reverse
(sorts the elements of the current stream in reverse order;
alias for 'sort(collector, reverse=True)')
```python
(Stream.of((3, 30), (2, 30), (2, 20), (1, 20), (1, 10))
.reverse(lambda x: (x[0], x[1]))
.to_list())
# [(3, 30), (2, 30), (2, 20), (1, 20), (1, 10)]
```
NB: in case of stream of dicts all key-value pairs are represented internally as DictItem objects
(including recursively for nested Mapping structures)
to provide more convenient intermediate operations syntax e.g.
```python
first_dict = {"a": 1, "b": 2}
second_dict = {"x": 3, "y": 4}
(Stream(first_dict).concat(second_dict)
.filter(lambda x: x.value % 2 == 0)
.map(lambda x: x.key)
.to_list())
```
- on_close
(returns an equivalent Stream with an additional close handler to be invoked automatically by the terminal operation)
```python
(Stream([1, 2, 3, 4])
.on_close(lambda: print("Sorry Montessori"))
.peek(lambda x: print(f"{'$' * x} ", end=""))
.map(lambda x: x * 2)
.to_list())
# "$ $$ $$$ $$$$ Sorry Montessori"
# [2, 4, 6, 8]
```
--------------------------------------------
### Terminal operations
#### Collectors
- collecting result into list, tuple, set
```python
Stream([1, 2, 3]).to_list()
Stream([1, 2, 3]).to_tuple()
Stream([1, 2, 3]).to_set()
```
- into dict
```python
class Foo:
def __init__(self, name, num):
self.name = name
self.num = num
Stream([Foo("fizz", 1), Foo("buzz", 2)]).to_dict(lambda x: (x.name, x.num))
# {"fizz": 1, "buzz": 2}
```
In the case of a collision (duplicate keys) the 'merger' functions indicates which entry should be kept
```python
collection = [Foo("fizz", 1), Foo("fizz", 2), Foo("buzz", 2)]
Stream(collection).to_dict(collector=lambda x: (x.name, x.num), merger=lambda old, new: old)
# {"fizz": 1, "buzz": 2}
```
to_dict method also supports creating dictionaries from dict DictItem objects
```python
first_dict = {"x": 1, "y": 2}
second_dict = {"p": 33, "q": 44, "r": None}
Stream(first_dict).concat(Stream(second_dict)).to_dict(lambda x: DictItem(x.key, x.value or 0))
# {"x": 1, "y": 2, "p": 33, "q": 44, "r": 0}
```
e.g. you could combine streams of dicts by writing:
```python
Stream(first_dict).concat(Stream(second_dict)).to_dict()
```
(simplified from '.to_dict(lambda x: x)')
- into string
```python
Stream({"a": 1, "b": [2, 3]}).to_string()
# "Stream(DictItem(key=a, value=1), DictItem(key=b, value=[2, 3]))"
```
```python
Stream({"a": 1, "b": [2, 3]}).map(lambda x: {x.key: x.value}).to_string(delimiter=" | ")
# "Stream({'a': 1} | {'b': [2, 3]})"
```
- alternative for working with collectors is using the collect method
```python
Stream([1, 2, 3]).collect(tuple)
Stream.of(1, 2, 3).collect(list)
Stream.of(1, 1, 2, 2, 2, 3).collect(set)
Stream.of(1, 2, 3, 4).collect(dict, lambda x: (str(x), x * 10))
Stream.of("x", "y", "z").collect(str, str_delimiter="->")
```
- grouping
```python
Stream("AAAABBBCCD").group_by(collector=lambda key, grouper: (key, len(grouper)))
# {"A": 4, "B": 3, "C": 2, "D": 1}
```
```python
coll = [Foo("fizz", 1), Foo("fizz", 2), Foo("fizz", 3), Foo("buzz", 2), Foo("buzz", 3), Foo("buzz", 4), Foo("buzz", 5)]
Stream(coll).group_by(
classifier=lambda obj: obj.name,
collector=lambda key, grouper: (key, [(obj.name, obj.num) for obj in list(grouper)]))
# {"fizz": [("fizz", 1), ("fizz", 2), ("fizz", 3)],
# "buzz": [("buzz", 2), ("buzz", 3), ("buzz", 4), ("buzz", 5)]}
```
#### Other terminal operations
- for_each
```python
Stream([1, 2, 3, 4]).for_each(lambda x: print(f"{'#' * x} ", end=""))
```
- count
(returns the count of elements in the stream)
```python
Stream([1, 2, 3, 4]).filter(lambda x: x % 2 == 0).count()
```
- sum
```python
Stream.of(1, 2, 3, 4).sum()
```
- min
(returns Optional with the minimum element of the stream)
```python
Stream.of(2, 1, 3, 4).min().get()
```
- max
(returns Optional with the maximum element of the stream)
```python
Stream.of(2, 1, 3, 4).max().get()
```
- average
(returns the average value of elements in the stream)
```python
Stream.of(1, 2, 3, 4, 5).average()
```
- find_first
(search for an element of the stream that satisfies a predicate,
returns an Optional with the first found value, if any, or None)
```python
Stream.of(1, 2, 3, 4).filter(lambda x: x % 2 == 0).find_first().get()
```
- find_any
(search for an element of the stream that satisfies a predicate,
returns an Optional with some of the found values, if any, or None)
```python
Stream.of(1, 2, 3, 4).filter(lambda x: x % 2 == 0).find_any().get()
```
- any_match
(returns whether any elements of the stream match the given predicate)
```python
Stream.of(1, 2, 3, 4).any_match(lambda x: x > 2)
```
- all_match
(returns whether all elements of the stream match the given predicate)
```python
Stream.of(1, 2, 3, 4).all_match(lambda x: x > 2)
```
- none_match
(returns whether no elements of the stream match the given predicate)
```python
Stream.of(1, 2, 3, 4).none_match(lambda x: x < 0)
```
- take_first
(returns Optional with the first element of the stream or a default value)
```python
Stream({"a": 1, "b": 2}).take_first().get()
Stream([]).take_first(default=33).get()
# DictItem(key="a", value=1)
# 33
```
- take_last
(returns Optional with the last element of the stream or a default value)
```python
Stream({"a": 1, "b": 2}).take_last().get()
Stream([]).take_last(default=33).get()
```
- compare_with
(compares linearly the contents of two streams based on a given comparator)
```python
fizz = Foo("fizz", 1)
buzz = Foo("buzz", 2)
Stream([buzz, fizz]).compare_with(Stream([fizz, buzz]), lambda x, y: x.num == y.num)
```
- quantify
(count how many of the elements are Truthy or evaluate to True based on a given predicate)
```python
Stream([2, 3, 4, 5, 6]).quantify(predicate=lambda x: x % 2 == 0)
```
NB: although the Stream is closed automatically by the terminal operation
you can still close it by hand (if needed) invoking the close() method.
In turn that will trigger the close_handler (if such was provided)
--------------------------------------------
### Itertools integration
Invoke use method by passing the itertools function and it's arguments as **kwargs
```python
import itertools
import operator
Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).use(itertools.islice, start=3, stop=8)
Stream.of(1, 2, 3, 4, 5).use(itertools.accumulate, func=operator.mul).to_list()
Stream(range(3)).use(itertools.permutations, r=3).to_list()
```
#### Itertools 'recipes'
Invoke the 'recipes' described [here](https://docs.python.org/3/library/itertools.html#itertools-recipes) as stream methods and pass required key-word arguments
```python
Stream([1, 2, 3]).ncycles(count=2).to_list()
Stream.of(2, 3, 4).take_nth(10, default=66).get()
Stream(["ABC", "D", "EF"]).round_robin().to_list()
```
--------------------------------------------
### FileStreams
NB: use 'pip install pyrio[fs]' to install necessary extra dependencies
#### Querying files
- working with json, toml, yaml, xml files
NB: FileStream reads data as series of DictItem objects from underlying dict_items view
```python
FileStream("path/to/file").map(lambda x: f"{x.key}=>{x.value}").to_tuple()
# ("abc=>xyz", "qwerty=>42")
```
```python
from operator import attrgetter
from pyrio import DictItem
(FileStream("path/to/file")
.filter(lambda x: "a" in x.key)
.map(lambda x: DictItem(x.key, sum(x.value) * 10))
.sort(attrgetter("value"), reverse=True)
.map(lambda x: f"{str(x.value)}::{x.key}")
.to_list())
# ["230::xza", "110::abba", "30::a"]
```
- querying csv and tsv files
(each row is read as a dict with keys taken from the header)
```python
FileStream("path/to/file").map(lambda x: f"fizz: {x['fizz']}, buzz: {x['buzz']}").to_tuple()
# ("fizz: 42, buzz: 45", "fizz: aaa, buzz: bbb")
```
```python
from operator import itemgetter
FileStream("path/to/file").map(itemgetter('fizz')).to_list()
# ['42', 'aaa']
```
You could query the nested dicts by creating streams out of them
```python
(FileStream("path/to/file")
.map(lambda x: (Stream(x).to_dict(lambda y: DictItem(y.key, y.value or "Unknown"))))
.save())
```
- reading plain text (if the file doesn't have one of the aforementioned extensions)
```python
(FileStream("path/to/lorem/ipsum")
.map(lambda x: x.strip())
.enumerate()
.filter(lambda line: "id" in line[1])
.to_dict()
)
# {1: "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
# 6: "Excepteur sint occaecat cupidatat non proident, sunt in culpa",
# 7: "qui officia deserunt mollit anim id est laborum."}
```
- reading a file with process() method
- use extra f_open_options (for the underlying open file function)
- f_read_options (to be passed to the corresponding library function that is loading the file content e.g. tomllib, json)
```python
from decimal import Decimal
(FileStream.process(
file_path="path/to/file.json",
f_open_options={"encoding": "utf-8"},
f_read_options={"parse_float": Decimal})
.map(lambda x:x.value).to_list())
# ['foo', True, Decimal('1.22'), Decimal('5.456367654)]
```
To include the root tag when loading an .xml file pass 'include_root=True'
```python
FileStream.process("path/to/custom_root.xml", include_root=True).map(
lambda x: f"root={x.key}: inner_records={str(x.value)}"
).to_list()
# ["root=custom-root: inner_records={'abc': 'xyz', 'qwerty': '42'}"]
```
--------------------------------------------
#### Saving to a file
- write the contents of a FileStream by passing a file_path to the save() method
```python
in_memory_dict = Stream(json_dict).filter(lambda x: len(x.key) < 6).to_tuple()
FileStream("path/to/file.json").prepend(in_memory_dict).save("./tests/resources/updated.json")
```
If no path is given, the source file for the FileStream will be updated
```python
FileStream("path/to/file.json").concat(in_memory_dict).save()
```
NB: if while updating the file something goes wrong, the original content will be restored/preserved
- handle null values
(pass null_handler function to replace null values)
```python
FileStream("path/to/test.toml").save(null_handler=lambda x: DictItem(x.key, x.value or "N/A"))
```
NB: useful for writing .toml files which don't allow None values
- passing advanced file open and write options
similarly to the process method you could provide
- f_open_options (for the underlying open function)
- f_write_options (passed to the corresponding library that will 'dump' the contents of the stream e.g. tomli-w, pyyaml)
```python
FileStream("path/to/file.json").concat(in_memory_dict).save(
file_path="merged.xml",
f_open_options={"encoding": "utf-8"},
f_write_options={"indent": 4},
)
```
E.g. to append to existing file pass f_open_options={"mode": "a"} to the save() method.
By default saving plain text uses "\n" as delimiter between items,
you can pass custom delimiter using f_write_options
```python
(FileStream("path/to/lorem/ipsum")
.map(lambda line: line.strip())
.enumerate()
.filter(lambda line: "ad" in line[1])
.map(lambda line: f"line:{line[0]}, text='{line[1]}'")
.save(f_open_options={"mode": "a"}, f_write_options={"delimiter": " || "})
)
# Lorem ipsum...
# ...
# line:0, text='Lorem ipsum dolor sit amet, consectetur adipisicing elit,' || line:2, text='Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris'
```
When working with plain text you can pass 'header' and 'footer' as f_write_options
to be prepended or appended to the FileStream output
```python
(FileStream("path/to/lorem/ipsum")
.map(lambda line: line.strip())
.enumerate()
.filter(lambda line: line[0] == 3)
.map(lambda line: f"{line[0]}: {line[1]}")
.save(f_open_options={"mode": "a"}, f_write_options={"header": "\nHeader\n", "footer": "\nFooter\n"})
)
# Lorem ipsum...
# ...
# qui officia deserunt mollit anim id est laborum.
#
# Header
# 3: nisi ut aliquip ex ea commodo consequat.
# Footer
#
```
To add custom root tag when saving an .xml file pass 'xml_root="my-custom-root"'
```python
FileStream("path/to/file.json").concat(in_memory_dict).save(
file_path="path/to/custom.xml",
f_open_options={"encoding": "utf-8"},
f_write_options={"indent": 4},
xml_root="my-custom-root",
)
```
--------------------------------------------
### How far can we actually push it?
```python
(
FileStream("path/to/file.csv")
.concat(
FileStream("path/to/other/file.json")
.filter(
lambda x: (
Stream(x.value)
.find_first(lambda y: y.key == "name" and y.value != "Snake")
.or_else_get(lambda: None)
)
is not None
)
.map(lambda x: x.value)
)
.map(lambda x: (Stream(x).to_dict(lambda y: DictItem(y.key, y.value or "N/A"))))
.save("path/to/third/file.tsv")
)
```
- ...some leetcode maybe?
```python
# check if given string is palindrome; string length is guaranteed to be > 0
def validate_str(string):
stop = len(string) // 2 if len(string) > 1 else 1
return Stream.from_range(0, stop).all_match(lambda x: string[x] == string[-x - 1])
validate_str("a1b2c3c2b1a")
validate_str("abc321")
validate_str("xyyx")
validate_str("aba")
validate_str("z")
# True
# False
# True
# True
# True
```
- ...and another one?
```python
# count vowels and constants in given string
def process_str(string):
ALL_VOWELS = "AEIOUaeiou"
return (Stream(string)
.filter(lambda ch: ch.isalpha())
.partition(lambda ch: ch in ALL_VOWELS) # Partitions entries into true and false ones
.map(lambda p: tuple(p))
.enumerate()
.map(lambda x: ("Vowels" if x[0] == 0 else "Consonants", [len(x[1]), x[1]]))
.to_dict()
)
process_str("123Ab5oc-E6db#bCi9<>")
# {'Vowels': [4, ('A', 'o', 'E', 'i')], 'Consonants': [6, ('b', 'c', 'd', 'b', 'b', 'C')]}
```
How hideous can it get?