Streaming#
Streams are used to monitor listings for new items. This is beneficial for bots that need to discover and respond to new events in near real time.
#!/usr/bin/env python
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from redditwarp.models.submission_ASYNC import Submission
from redditwarp.models.comment_ASYNC import Comment
import sys
import asyncio
import traceback
import redditwarp.ASYNC
from redditwarp.streaming.makers.subreddit_ASYNC import (
create_submission_stream,
create_comment_stream,
)
from redditwarp.streaming.ASYNC import flow
from redditwarp.util.passthru import passthru
async def main() -> None:
client = redditwarp.ASYNC.Client()
async with client:
submission_stream = create_submission_stream(client, 'AskReddit')
@submission_stream.output.attach
async def _(subm: Submission) -> None:
print(f"{subm.id36}+", '*', repr(subm.title))
comment_stream = create_comment_stream(client, 'AskReddit')
@comment_stream.output.attach
async def _(comm: Comment) -> None:
print(f"+{comm.id36}", '-', repr(comm.body)[:20])
@passthru(comment_stream.error.attach)
@passthru(submission_stream.error.attach)
async def _(exc: Exception) -> None:
print('<>', file=sys.stderr)
traceback.print_exception(exc.__class__, exc, exc.__traceback__)
print('</>', file=sys.stderr)
await flow(
submission_stream,
comment_stream,
)
asyncio.run(main())
Because Reddit listings don’t support websockets or webhooks, the client must simulate streaming by polling continuously for new items. RedditWarp’s streams are simple to use, having little to no configuration options, and are designed to be automatically efficient and accurate. It will reduce polling frequency when it can’t connect to the server and speed up polling when the listing is more active.
The steaming logic is based on paginators.
Pagination source |
Stream creator function |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Stream objects have two events on them: output
and error
. You can attach
event handlers to these dispatchers by passing a single-argument function to
their attach()
method.
It is important to add thorough error handling logic to event handler code as
any exceptions thrown from the attached handlers will be propagated and crash
the program as per usual. Note that the error
event is for errors generated
by the streaming mechanism and not for errors caused by user code.
The .attach()
method returns None
, so be careful when stacking them as
decorators. If you prefer the look of decorators, you can use the passthru
utility to force functions to return their input.
from redditwarp.util.passthru import passthru
...
@passthru(comment_stream.error.attach)
@passthru(submission_stream.error.attach)
async def _(exc: Exception) -> None:
print('<>', file=sys.stderr)
traceback.print_exception(exc.__class__, exc, exc.__traceback__)
print('</>', file=sys.stderr)
The flow()
method is used to run the steams. Because async streams are
awaitable they can also be passed directly to asyncio.gather()
instead.
Although streams are useful for discovering new resources quickly, they do not guarantee that an uncovered item will actually be ‘new’. It is recommended to check the creation date of a resource if being new is important to your handler’s logic.
The streams provide the invariant that an item from the stream will never be repeated.