Sven Kreiss

Stream Processing in pysparkling

pysparkling is a native Python implementation of PySpark. Stream processing is considered one of Spark’s most important features. PySpark provides a Python interface to Spark’s StreamingContext: it can consume from updating HDFS folders and TCP sockets, and it provides interfaces to Kafka, Kinesis, Flume and MQTT. pysparkling 0.4.0 adds initial support for stream processing from folders and TCP sockets, and you can install it now with:

pip install --upgrade pysparkling
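Consuming from an updating folder means that each batch only processes the files that appeared since the previous batch. The plain-Python sketch below illustrates that idea with hypothetical helpers `new_files()` and `read_lines()`. It does not call pysparkling and is not its implementation.

```python
import os
import tempfile


def new_files(directory, seen):
    """Hypothetical helper: return files in `directory` not yet in `seen`.

    Newly found paths are added to `seen`, so the next call only reports
    files that arrived after this one.
    """
    current = {
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, name))
    }
    fresh = sorted(current - seen)
    seen.update(fresh)
    return fresh


def read_lines(paths):
    """Hypothetical helper: read all lines of the given files, in order."""
    lines = []
    for path in paths:
        with open(path) as f:
            lines.extend(line.rstrip("\n") for line in f)
    return lines


with tempfile.TemporaryDirectory() as folder:
    seen = set()

    # First batch interval: one file has arrived.
    with open(os.path.join(folder, "part-0.txt"), "w") as f:
        f.write("a\nb\n")
    batch1 = read_lines(new_files(folder, seen))

    # Second batch interval: a new file arrives; the old one is not re-read.
    with open(os.path.join(folder, "part-1.txt"), "w") as f:
        f.write("c\n")
    batch2 = read_lines(new_files(folder, seen))

print(batch1)  # ['a', 'b']
print(batch2)  # ['c']
```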

Counting Example

With regular batch processing, you can count elements with:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).count()
3

Stream processing works similarly. Incoming data is grouped into batches every 0.1 seconds (the batch interval, given as the second parameter of StreamingContext), and elements are counted over 0.2-second windows, i.e. two batch intervals. The output is therefore the count of the first batch, the count of the first and second batches, and the count of the second and third batches:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 1, 5], [5, 5, 2, 4], [1, 2]])
...     .countByWindow(0.2)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[3]
[7]
[6]
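The window arithmetic can be checked without pysparkling. This plain-Python sketch sums the batch sizes that fall into each window; it assumes, as the output above shows, that the window advances by one batch interval at a time.

```python
# Batches as passed to queueStream() in the example above.
batches = [[1, 1, 5], [5, 5, 2, 4], [1, 2]]

batch_interval = 0.1   # seconds, second parameter of StreamingContext
window_duration = 0.2  # seconds, argument of countByWindow()
# Number of batches covered by one window (rounded to an integer).
batches_per_window = round(window_duration / batch_interval)

window_counts = []
for i in range(len(batches)):
    # The window ending at batch i covers up to `batches_per_window`
    # batches; at the start of the stream fewer batches are available.
    start = max(0, i - batches_per_window + 1)
    window = batches[start:i + 1]
    window_counts.append(sum(len(batch) for batch in window))

print(window_counts)  # [3, 7, 6]
```

The three values match the printed output: 3 elements in the first batch, 3 + 4 = 7 in the first two batches, and 4 + 2 = 6 in the second and third batches.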

Besides the pysparkling.streaming module, this release brings an improved pysparkling.fileio module, methods for reading binary files (binaryFiles() and binaryRecords()), and more inline examples in the documentation.
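In PySpark, binaryRecords() reads a flat binary file as a sequence of fixed-length records. Assuming pysparkling follows the same model, the idea can be sketched in plain Python with a hypothetical `split_records()` helper and an assumed record format of one little-endian 32-bit integer (4 bytes) per record. This does not call pysparkling's API.

```python
import struct

# Hypothetical payload: four little-endian 32-bit integers, 4 bytes each.
data = struct.pack("<4i", 1, 2, 3, 4)

record_length = 4  # bytes per record (assumed fixed-length format)


def split_records(blob, length):
    """Hypothetical helper: split bytes into consecutive fixed-length records."""
    if len(blob) % length:
        raise ValueError("data length is not a multiple of the record length")
    return [blob[i:i + length] for i in range(0, len(blob), length)]


records = split_records(data, record_length)
# Decode each 4-byte record back into an integer.
values = [struct.unpack("<i", record)[0] for record in records]
print(values)  # [1, 2, 3, 4]
```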

Head over to the RDD (batch datasets) and DStream (discretized stream) documentation to learn more!

API documentation at pysparkling.trivial.io
