Skip to content
/ ssec Public

Python package for synchronous and asynchronous streaming of Server-Sent Events (SSE).

License

Notifications You must be signed in to change notification settings

ReMi-HSBI/ssec

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

28 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

drawing

ssec

ruff mypy gitmoji

Description

Python package for synchronous and asynchronous streaming of Server-Sent Events (SSE). This library works with httpx to support synchronous as well as asynchronous workflows but is also usable with other http frameworks (see below).

Installation

pip install ssec

Usage

sync

import logging
import ssec

def main() -> None:
    logging.basicConfig(level=logging.INFO)
    for event in ssec.sse(
        "https://stream.wikimedia.org/v2/stream/recentchange"
    ):
        print(event)

main()

async

import asyncio
import logging
import ssec

async def main() -> None:
    logging.basicConfig(level=logging.INFO)
    async for event in ssec.sse_async(
        "https://stream.wikimedia.org/v2/stream/recentchange"
    ):
        print(event)

asyncio.run(main())

Note

Although there are already some libraries on the subject (aiohttp-sse-client, aiosseclient), these are unfortunately not entirely correct. In example, both mentioned libraries asynchronously iterate over the stream content via async for line in response.content12. This internally calls aiohttp's readuntil method with the default seperator \n, but the official specification says:

Lines must be separated by either a U+000D CARRIAGE RETURN U+000A LINE FEED (CRLF) character pair, a single U+000A LINE FEED (LF) character, or a single +000D CARRIAGE RETURN (CR) character.

Another point is the error handling, which is often not sufficient to analyze the error or is entirely skipped.

aiohttp

Although this library works with httpx, it is also possible to use it with other http frameworks like aiohttp as long as they provide a method to iterate over a byte-stream. Unfortunately, it is not possible to handle reconnection then, so you will have to implement that by yourself. An example could look like this:

import asyncio
import logging

import aiohttp
import ssec

async def main() -> None:
    logging.basicConfig(level=logging.INFO)

    chunk_size = 1024
    connect_attempt = 1
    max_connect_attempts = 5
    config = ssec.SSEConfig(reconnect_timeout=3)
    async with aiohttp.ClientSession() as session:
        while True:
            headers = {
                "Accept": "text/event-stream",
                "Cache-Control": "no-store",
            }
            if config.last_event_id:
                headers["Last-Event-ID"] = config.last_event_id
            try:
                async with session.get(
                    "https://stream.wikimedia.org/v2/stream/recentchange",
                ) as response:
                    streamer = response.content.iter_chunked(chunk_size)
                    async for event in ssec.stream_async(streamer, config=config):
                        print(event)
            except aiohttp.ClientError:
                if connect_attempt >= max_connect_attempts:
                    logging.exception("Failed to connect!")
                    raise

                waiting_period = config.reconnect_timeout

                message = (
                    f"Failed to connect. "
                    f"Reconnect in {waiting_period} seconds "
                    f"[attempt {connect_attempt}/{max_connect_attempts}]."
                )
                logging.info(message)

                connect_attempt += 1
                await asyncio.sleep(waiting_period)

asyncio.run(main())

Miscellaneous

Installation (Developer)

pip install ssec[dev]

Documentation

Build the documentation by running the following command in the root directory of the project:

sphinx-build -b html docs/src docs/build

The command requires the developers edition of ssec.

The documentation is then accessible via docs/build/index.html.

Set up Visual Studio Code for Development

To edit the code base with Visual Studio Code, you should install the @recommended extensions. Other necessary settings are already included in the .vscode directory and should be enabled by default.

Contributing

Contributing to ssec is highly appreciated, but comes with some requirements:

  1. Type Hints

    Write modern python code using type annotations to enable static analysis and potential runtime type checking.

  2. Documentation

    Write quality documentation using numpydoc docstring conventions.

  3. Linting

    Lint your code with ruff and mypy.

  4. Style

    Format your code using ruff.

  5. Testing

    Write tests for your code using pytest.

Footnotes

  1. Code Reference

  2. Code Reference

About

Python package for synchronous and asynchronous streaming of Server-Sent Events (SSE).

Topics

Resources

License

Stars

Watchers

Forks

Languages