Streaming#

Streams are used to monitor listings for new items. This is beneficial for bots that need to discover and respond to new events in near real time.

#!/usr/bin/env python
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from redditwarp.models.submission_ASYNC import Submission
    from redditwarp.models.comment_ASYNC import Comment

import sys
import asyncio
import traceback

import redditwarp.ASYNC
from redditwarp.streaming.makers.subreddit_ASYNC import (
    create_submission_stream,
    create_comment_stream,
)
from redditwarp.streaming.ASYNC import flow
from redditwarp.util.passthru import passthru

async def main() -> None:
    client = redditwarp.ASYNC.Client()
    async with client:
        submission_stream = create_submission_stream(client, 'AskReddit')

        @submission_stream.output.attach
        async def _(subm: Submission) -> None:
            print(f"{subm.id36}+", '*', repr(subm.title))

        comment_stream = create_comment_stream(client, 'AskReddit')

        @comment_stream.output.attach
        async def _(comm: Comment) -> None:
            print(f"+{comm.id36}", '-', repr(comm.body)[:20])

        @passthru(comment_stream.error.attach)
        @passthru(submission_stream.error.attach)
        async def _(exc: Exception) -> None:
            print('<>', file=sys.stderr)
            traceback.print_exception(exc.__class__, exc, exc.__traceback__)
            print('</>', file=sys.stderr)

        await flow(
            submission_stream,
            comment_stream,
        )

asyncio.run(main())

Because Reddit listings don’t support websockets or webhooks, the client must simulate streaming by polling continuously for new items. RedditWarp’s streams are simple to use, having little to no configuration options, and are designed to be automatically efficient and accurate. It will reduce polling frequency when it can’t connect to the server and speed up polling when the listing is more active.

The steaming logic is based on paginators.

Streamable listings#

Pagination source

Stream creator function

client.p.message.pulls.inbox()

message_ASYNC.create_inbox_message_stream()

client.p.message.pulls.mentions()

message_ASYNC.create_mentions_message_stream()

client.p.moderation.pull_actions()

moderation_ASYNC.create_action_log_stream()

client.p.modmail.pull.inbox()

modmail_ASYNC.create_conversation_message_inbox_stream()

client.p.subreddit.pull.new()

subreddit_ASYNC.create_submission_stream()

client.p.subreddit.pull_new_comments()

subreddit_ASYNC.create_comment_stream()

client.p.subreddit.pulls.new()

subreddit_ASYNC.create_subreddit_stream()

client.p.user.pull_user_subreddits.new()

user_ASYNC.create_user_subreddit_stream()

Stream objects have two events on them: output and error. You can attach event handlers to these dispatchers by passing a single-argument function to their attach() method.

It is important to add thorough error handling logic to event handler code as any exceptions thrown from the attached handlers will be propagated and crash the program as per usual. Note that the error event is for errors generated by the streaming mechanism and not for errors caused by user code.

The .attach() method returns None, so be careful when stacking them as decorators. If you prefer the look of decorators, you can use the passthru utility to force functions to return their input.

from redditwarp.util.passthru import passthru

...

@passthru(comment_stream.error.attach)
@passthru(submission_stream.error.attach)
async def _(exc: Exception) -> None:
    print('<>', file=sys.stderr)
    traceback.print_exception(exc.__class__, exc, exc.__traceback__)
    print('</>', file=sys.stderr)

The flow() method is used to run the steams. Because async streams are awaitable they can also be passed directly to asyncio.gather() instead.

Although streams are useful for discovering new resources quickly, they do not guarantee that an uncovered item will actually be ‘new’. It is recommended to check the creation date of a resource if being new is important to your handler’s logic.

The streams provide the invariant that an item from the stream will never be repeated.