Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 258 additions & 0 deletions deltachat-rpc-client/build/lib/deltachat_rpc_client/rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
"""JSON-RPC client module."""

from __future__ import annotations

import itertools
import json
import logging
import os
import subprocess
import sys
import socket
from queue import Empty, Queue
from threading import Thread
from typing import TYPE_CHECKING, Any, Iterator, Optional

if TYPE_CHECKING:
import io


class JsonRpcError(Exception):
"""JSON-RPC error."""


class RpcShutdownError(JsonRpcError):
"""Raised in RPC methods if the connection to server is closing."""


class RpcMethod:
"""RPC method."""

def __init__(self, rpc: "BaseRpc", name: str):
self.rpc = rpc
self.name = name

def __call__(self, *args) -> Any:
"""Call JSON-RPC method synchronously."""
future = self.future(*args)
return future()

def future(self, *args) -> Any:
"""Call JSON-RPC method asynchronously."""
request_id = next(self.rpc.id_iterator)
request = {
"jsonrpc": "2.0",
"method": self.name,
"params": args,
"id": request_id,
}
self.rpc.request_results[request_id] = queue = Queue()
self.rpc.request_queue.put(request)

def rpc_future():
"""Wait for the request to receive a result."""
response = queue.get()
if response is None:
raise RpcShutdownError(f"no response for {request_id}/{self.name} while rpc is shutting down")
if "error" in response:
raise JsonRpcError(response["error"])
return response.get("result", None)

return rpc_future


class BaseRpc:
"""Base Rpc class which requires 'connect_to_server' and 'disconnect_from_server' methods
from subclasses to work concretely."""

def __init__(self):
self.id_iterator: Iterator[int]
self.event_queues: dict[int, Queue]
# Map from request ID to a Queue which provides a single result
self.request_results: dict[int, Queue]
self.request_queue: Queue[Any]
self.server_stdin: io.Writer[bytes]
self.server_stdout: io.Reader[bytes]
self.closing: bool
self.reader_thread: Thread
self.writer_thread: Thread
self.events_thread: Thread

def start(self) -> None:
"""Start RPC server subprocess."""
self.server_stdout, self.server_stdin = self.connect_to_server()
self.id_iterator = itertools.count(start=1)
self.event_queues = {}
self.request_results = {}
self.request_queue = Queue()
self.closing = False
self.reader_thread = Thread(target=self.reader_loop)
self.reader_thread.start()
self.writer_thread = Thread(target=self.writer_loop)
self.writer_thread.start()
self.events_thread = Thread(target=self.events_loop)
self.events_thread.start()

def close(self) -> None:
"""Terminate RPC server process and wait until the reader loop finishes."""
self.closing = True
self.disconnect_from_server()
self.reader_thread.join()
self.events_thread.join()
self.request_queue.put(None)
self.writer_thread.join()

def __enter__(self):
self.start()
return self

def __exit__(self, _exc_type, _exc, _tb):
self.close()

def reader_loop(self) -> None:
"""Process JSON-RPC responses from the RPC server process output."""
try:
while line := self.server_stdout.readline():
response = json.loads(line)
if "id" in response:
response_id = response["id"]
self.request_results.pop(response_id).put(response)
else:
logging.warning("Got a response without ID: %s", response)
except Exception:
# Log an exception if the reader loop dies.
logging.exception("Exception in the reader loop")

# terminate pending rpc requests because no responses can arrive anymore
for queue in self.request_results.values():
queue.put(None)

def writer_loop(self) -> None:
"""Writer loop ensuring only a single thread writes requests."""
try:
while request := self.request_queue.get():
data = (json.dumps(request) + "\n").encode()
self.server_stdin.write(data)
self.server_stdin.flush()

except Exception:
# Log an exception if the writer loop dies.
logging.exception("Exception in the writer loop")

def get_queue(self, account_id: int) -> Queue:
"""Get event queue corresponding to the given account ID."""
if account_id not in self.event_queues:
self.event_queues[account_id] = Queue()
return self.event_queues[account_id]

def events_loop(self) -> None:
"""Request new events and distributes them between queues."""
try:
while True:
if self.closing:
return
try:
event = self.get_next_event()
except RpcShutdownError:
return
account_id = event["contextId"]
queue = self.get_queue(account_id)
event = event["event"]
logging.debug("account_id=%d got an event %s", account_id, event)
queue.put(event)
except Exception:
# Log an exception if the event loop dies.
logging.exception("Exception in the event loop")

def wait_for_event(self, account_id: int) -> Optional[dict]:
"""Wait for the next event from the given account and returns it."""
queue = self.get_queue(account_id)
return queue.get()

def clear_all_events(self, account_id: int):
"""Remove all queued-up events for a given account. Useful for tests."""
queue = self.get_queue(account_id)
try:
while True:
queue.get_nowait()
except Empty:
pass

def __getattr__(self, attr: str):
return RpcMethod(self, attr)


class RpcSubprocess(BaseRpc):
"""RPC client that runs and connects to a deltachat-rpc-server in a subprocess."""

def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional[str] = "deltachat-rpc-server"):
"""Initialize RPC client.

The given arguments will be passed to subprocess.Popen().
"""
super(RpcSubprocess, self).__init__()
self._accounts_dir = accounts_dir
self.rpc_server_path: str = rpc_server_path

def connect_to_server(self):
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
if sys.version_info >= (3, 11):
# Prevent subprocess from capturing SIGINT.
popen_kwargs["process_group"] = 0
else:
# `process_group` is not supported before Python 3.11.
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509

if self._accounts_dir:
popen_kwargs["env"] = os.environ.copy()
popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir)

process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
return process.stdout, process.stdin

def disconnect_from_server(self):
self.stop_io_for_all_accounts()
self.server_stdin.close()


# backward compatibility
Rpc = RpcSubprocess


class RpcFIFO(BaseRpc):
"""RPC client that runs and connects to a deltachat-rpc-server through FIFO files."""

def __init__(self, fn_request_fifo: str, fn_response_fifo: str):
super(RpcFIFO, self).__init__()
self.fn_request_fifo = fn_request_fifo
self.fn_response_fifo = fn_response_fifo

def connect_to_server(self):
server_stdin = open(self.fn_request_fifo, "wb") # noqa
server_stdout = open(self.fn_response_fifo, "rb") # noqa
return server_stdout, server_stdin

def disconnect_from_server(self):
self.server_stdin.close()
self.server_stdout.close()

class RpcUnixSocket(BaseRpc):
"""RPC client that connects to a deltachat-rpc-server through FIFO files."""

def __init__(self, socket_path: str):
super(RpcUnixSocket, self).__init__()
self.socket_path = socket_path
self.client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

def connect_to_server(self):
print(str(self.socket_path))
self.client.connect(self.socket_path)
print("c1")
writer = self.client.makefile("wb")
reader = self.client.makefile("rb")
print("c2")
assert False
return reader, writer

def disconnect_from_server(self):
self.client.close()
3 changes: 2 additions & 1 deletion deltachat-rpc-client/src/deltachat_rpc_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .contact import Contact
from .deltachat import DeltaChat
from .message import Message
from .rpc import Rpc, RpcFIFO
from .rpc import Rpc, RpcFIFO, RpcUnixSocket

__all__ = [
"Account",
Expand All @@ -23,6 +23,7 @@
"SpecialContactId",
"Rpc",
"RpcFIFO",
"RpcUnixSocket",
"run_bot_cli",
"run_client_cli",
]
19 changes: 19 additions & 0 deletions deltachat-rpc-client/src/deltachat_rpc_client/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import subprocess
import sys
import socket
from queue import Empty, Queue
from threading import Thread
from typing import TYPE_CHECKING, Any, Iterator, Optional
Expand Down Expand Up @@ -234,3 +235,21 @@ def connect_to_server(self):
def disconnect_from_server(self):
self.server_stdin.close()
self.server_stdout.close()

class RpcUnixSocket(BaseRpc):
"""RPC client that connects to a deltachat-rpc-server through FIFO files."""

def __init__(self, socket_path: str):
super(RpcUnixSocket, self).__init__()
self.socket_path = socket_path
self.client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

def connect_to_server(self):
print(str(self.socket_path))
self.client.connect(self.socket_path)
writer = self.client.makefile("wb")
reader = self.client.makefile("rb")
return reader, writer

def disconnect_from_server(self):
self.client.close()
37 changes: 37 additions & 0 deletions deltachat-rpc-client/tests/test_rpc_unix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from os import environ
import platform # noqa
import signal
import subprocess
from sys import stderr
from time import sleep

import pytest

from deltachat_rpc_client import DeltaChat, RpcUnixSocket


@pytest.mark.skipif("platform.system() == 'Windows'")
def test_rpc_unix(tmp_path):
socket_file = "/tmp/chatmail.sock" # path needs to be relative or short

path = environ.get("PATH")
assert path is not None

popen = subprocess.Popen(
f"deltachat-rpc-server --unix {socket_file}",
shell=True,
env=dict(
DC_ACCOUNTS_PATH=f"{tmp_path}/accounts/test",
rust_log="info",
PATH=path
)
)

sleep(1) # wait until socket exists # TODO this should not be needed

rpc = RpcUnixSocket(socket_path=socket_file)
with rpc:
dc = DeltaChat(rpc)
assert dc.rpc.get_system_info()["deltachat_core_version"] is not None
popen.send_signal(signal.SIGINT)
popen.wait()
24 changes: 23 additions & 1 deletion deltachat-rpc-server/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Delta Chat RPC server

This program provides a [JSON-RPC 2.0](https://www.jsonrpc.org/specification) interface to DeltaChat
over standard I/O.
over standard I/O or UNIX sockets.

## Install

Expand Down Expand Up @@ -35,3 +35,25 @@ languages other than Rust, for example:

Run `deltachat-rpc-server --version` to check the version of the server.
Run `deltachat-rpc-server --openrpc` to get [OpenRPC](https://open-rpc.org/) specification of the provided JSON-RPC API.

### Usage over unix sockets

> At this time this does not work on windows because rust does not support unix sockets on windows, yet (<https://github.com/rust-lang/rust/issues/150487>).

Standard I/O is the default option, but you can also tell `deltachat-rpc-server` to use a unix socket instead.

```
RUST_LOG=info deltachat-rpc-server --unix ./chatmail-core.sock
```

While this technically allows multiple processes to communicate with the same rpc server instance,
please note that there is still only event queue, so only one of these processed should read the events at a time.

You can test it with socat:
```sh
socat - UNIX-CONNECT:./chatmail-core.sock
```
Then paste the following jsonrpc command and press enter:
```json
{"method": "get_system_info","id": 1,"params": []}
```
Loading
Loading