From 25cd09ea89ba2b6ce958d0017a476a098a49efc1 Mon Sep 17 00:00:00 2001 From: Simon Laux Date: Sat, 10 Jan 2026 23:38:18 +0100 Subject: [PATCH 1/7] move stdio implementation into own method --- deltachat-rpc-server/src/main.rs | 65 +++++++++++++++++++------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index bb3c83b84d..b533f1546f 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -81,9 +81,46 @@ async fn main_impl() -> Result<()> { let accounts = Arc::new(RwLock::new(accounts)); let state = CommandApi::from_arc(accounts.clone()).await; + let main_cancel = CancellationToken::new(); + + let cancel = main_cancel.clone(); + let sigterm_task: JoinHandle> = tokio::spawn(async move { + #[cfg(target_family = "unix")] + { + let _cancel_guard = cancel.clone().drop_guard(); + tokio::select! { + _ = cancel.cancelled() => (), + _ = sigterm.recv() => { + log::info!("got SIGTERM"); + } + } + } + let _ = cancel; + Ok(()) + }); + + let (send_task, recv_task) = stdio_impl(state, main_cancel.clone()).await?; + + main_cancel.cancelled().await; + accounts.read().await.stop_io().await; + drop(accounts); + send_task.await??; + sigterm_task.await??; + recv_task.await??; + + Ok(()) +} + +async fn stdio_impl( + state: CommandApi, + main_cancel: CancellationToken, +) -> Result<( + JoinHandle>, + JoinHandle>, +)> { let (client, mut out_receiver) = RpcClient::new(); + let session = RpcSession::new(client.clone(), state.clone()); - let main_cancel = CancellationToken::new(); // Send task prints JSON responses to stdout. let cancel = main_cancel.clone(); @@ -103,22 +140,6 @@ async fn main_impl() -> Result<()> { Ok(()) }); - let cancel = main_cancel.clone(); - let sigterm_task: JoinHandle> = tokio::spawn(async move { - #[cfg(target_family = "unix")] - { - let _cancel_guard = cancel.clone().drop_guard(); - tokio::select! { - _ = cancel.cancelled() => (), - _ = sigterm.recv() => { - log::info!("got SIGTERM"); - } - } - } - let _ = cancel; - Ok(()) - }); - // Receiver task reads JSON requests from stdin. let cancel = main_cancel.clone(); let recv_task: JoinHandle> = tokio::spawn(async move { @@ -150,13 +171,5 @@ async fn main_impl() -> Result<()> { Ok(()) }); - main_cancel.cancelled().await; - accounts.read().await.stop_io().await; - drop(accounts); - drop(state); - send_task.await??; - sigterm_task.await??; - recv_task.await??; - - Ok(()) + Ok((send_task, recv_task)) } From e4d2e5075df10fd77b003838232558fe4270b390 Mon Sep 17 00:00:00 2001 From: Simon Laux Date: Sun, 11 Jan 2026 03:58:44 +0100 Subject: [PATCH 2/7] fix error logging in deltachat-rpc-server --- deltachat-rpc-server/src/main.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index b533f1546f..25d9f45d47 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -24,6 +24,14 @@ use yerpc::{RpcClient, RpcSession}; #[tokio::main(flavor = "multi_thread")] async fn main() { + // Logs from `log` crate and traces from `tracing` crate + // are configurable with `RUST_LOG` environment variable + // and go to stderr to avoid interfering with JSON-RPC using stdout. + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .with_writer(std::io::stderr) + .init(); + let r = main_impl().await; // From tokio documentation: // "For technical reasons, stdin is implemented by using an ordinary blocking read on a separate @@ -64,14 +72,6 @@ async fn main_impl() -> Result<()> { #[cfg(target_family = "unix")] let mut sigterm = signal_unix::signal(signal_unix::SignalKind::terminate())?; - // Logs from `log` crate and traces from `tracing` crate - // are configurable with `RUST_LOG` environment variable - // and go to stderr to avoid interfering with JSON-RPC using stdout. - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .with_writer(std::io::stderr) - .init(); - let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string()); log::info!("Starting with accounts directory `{path}`."); let writable = true; From fe5f59a744916310a17dbca297b7fe09a328b53f Mon Sep 17 00:00:00 2001 From: Simon Laux Date: Sun, 11 Jan 2026 06:03:43 +0100 Subject: [PATCH 3/7] feat: add unix socket support to deltachat-rpc-server --- deltachat-rpc-server/README.md | 24 ++++++- deltachat-rpc-server/src/main.rs | 120 ++++++++++++++++++++++++++++++- 2 files changed, 141 insertions(+), 3 deletions(-) diff --git a/deltachat-rpc-server/README.md b/deltachat-rpc-server/README.md index a5108c4c8d..f6b0afa743 100644 --- a/deltachat-rpc-server/README.md +++ b/deltachat-rpc-server/README.md @@ -1,7 +1,7 @@ # Delta Chat RPC server This program provides a [JSON-RPC 2.0](https://www.jsonrpc.org/specification) interface to DeltaChat -over standard I/O. +over standard I/O or UNIX sockets. ## Install @@ -35,3 +35,25 @@ languages other than Rust, for example: Run `deltachat-rpc-server --version` to check the version of the server. Run `deltachat-rpc-server --openrpc` to get [OpenRPC](https://open-rpc.org/) specification of the provided JSON-RPC API. + +### Usage over unix sockets + +> At this time this does not work on windows because rust does not support unix sockets on windows, yet (). + +Standard I/O is the default option, but you can also tell `deltachat-rpc-server` to use a unix socket instead. + +``` +deltachat-rpc-server --unix ./chatmail-core.sock +``` + +While this technically allows multiple processes to communicate with the same rpc server instance, +please note that there is still only event queue, so only one of these processed should read the events at a time. + +You can test it with socat: +```sh +socat - UNIX-CONNECT:./chatmail-core.sock +``` +Then paste the following jsonrpc command and press enter: +```json +{"method": "get_system_info","id": 1,"params": []} +``` diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index 25d9f45d47..b8d0a5f8c0 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -2,9 +2,9 @@ //! Delta Chat core RPC server. //! //! It speaks JSON Lines over stdio. -use std::env; use std::path::PathBuf; use std::sync::Arc; +use std::{env, ffi::OsStr}; use anyhow::{anyhow, Context as _, Result}; use deltachat::constants::DC_VERSION_STR; @@ -14,6 +14,8 @@ use tokio::io::{self, AsyncBufReadExt, BufReader}; use tracing_subscriber::EnvFilter; use yerpc::RpcServer as _; +#[cfg(target_family = "unix")] +use tokio::net::UnixListener; #[cfg(target_family = "unix")] use tokio::signal::unix as signal_unix; @@ -46,6 +48,7 @@ async fn main() { async fn main_impl() -> Result<()> { let mut args = env::args_os(); let _program_name = args.next().context("no command line arguments found")?; + let mut unix_socket = None; if let Some(first_arg) = args.next() { if first_arg.to_str() == Some("--version") { if let Some(arg) = args.next() { @@ -59,6 +62,11 @@ async fn main_impl() -> Result<()> { } println!("{}", CommandApi::openrpc_specification()?); return Ok(()); + } else if first_arg.to_str() == Some("--unix") { + let Some(unix_socket_path) = args.next() else { + return Err(anyhow!("Unix Socket Path is missing")); + }; + unix_socket = Some(unix_socket_path) } else { return Err(anyhow!("Unrecognized option {first_arg:?}")); } @@ -99,7 +107,16 @@ async fn main_impl() -> Result<()> { Ok(()) }); - let (send_task, recv_task) = stdio_impl(state, main_cancel.clone()).await?; + let (send_task, recv_task) = if let Some(unix_socket_path) = unix_socket { + #[cfg(not(target_family = "unix"))] + { + bail!("unix sockets are only supported on unix based operating systems"); + } + #[cfg(target_family = "unix")] + unix_socket_impl(&unix_socket_path, state, main_cancel.clone()).await? + } else { + stdio_impl(state, main_cancel.clone()).await? + }; main_cancel.cancelled().await; accounts.read().await.stop_io().await; @@ -173,3 +190,102 @@ async fn stdio_impl( Ok((send_task, recv_task)) } + +#[cfg(target_family = "unix")] +async fn unix_socket_impl( + unix_socket_path: &OsStr, + state: CommandApi, + main_cancel: CancellationToken, +) -> Result<( + JoinHandle>, + JoinHandle>, +)> { + let cancel = main_cancel.clone(); + + let listener = UnixListener::bind(unix_socket_path)?; + + let recv_task: JoinHandle> = tokio::spawn(async move { + let _cancel_guard = cancel.clone().drop_guard(); + let cancel = main_cancel.clone(); + + loop { + let connection_result = tokio::select! { + _ = cancel.cancelled() => break, + _ = tokio::signal::ctrl_c() => { + log::info!("got ctrl-c event"); + break; + } + connection_result = listener.accept() => connection_result + }; + match connection_result { + Ok((stream, addr)) => { + log::info!("new client {addr:?}"); + + let (client, mut out_receiver) = RpcClient::new(); + let session = RpcSession::new(client.clone(), state.clone()); + + let (read, mut write) = stream.into_split(); + let mut read_lines = BufReader::new(read).lines(); + + let cancel = main_cancel.clone(); + let _receive_task: JoinHandle> = tokio::spawn(async move { + let _cancel_guard = cancel.clone().drop_guard(); + loop { + let message = tokio::select! { + _ = cancel.cancelled() => break, + _ = tokio::signal::ctrl_c() => { + log::info!("got ctrl-c event"); + break; + } + message = + read_lines.next_line() + => match message? { + None => { + log::info!("EOF reached on stdin"); + break; + } + Some(message) => message, + } + }; + log::trace!("RPC recv {message}"); + let session = session.clone(); + tokio::spawn(async move { + session.handle_incoming(&message).await; + }); + } + Ok(()) + }); + + let cancel = main_cancel.clone(); + let _send_task: JoinHandle> = tokio::spawn(async move { + let _cancel_guard = cancel.clone().drop_guard(); + loop { + use tokio::io::AsyncWriteExt; + + let message = tokio::select! { + _ = cancel.cancelled() => break, + message = out_receiver.next() => match message { + None => break, + Some(message) => serde_json::to_string(&message)?, + } + }; + log::trace!("RPC send {message}"); + write.write_all(format!("{message}\n").as_bytes()).await?; + } + Ok(()) + }); + + // todo handle shutdown of _send_task and _receive_task + } + Err(e) => { + log::info!("connection failed {e:#}"); + } + } + } + // todo shutdown all remaining unix streams via their shutdown method + + Ok(()) + }); + + Ok((tokio::spawn(async move { Ok(()) }), recv_task)) +} From ae3d7af1528e3cd6d5ed6d7e6d09cf4476a3bfae Mon Sep 17 00:00:00 2001 From: Simon Laux Date: Sun, 11 Jan 2026 06:17:26 +0100 Subject: [PATCH 4/7] enable logging in example usage to make it more interessting to look at --- deltachat-rpc-server/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deltachat-rpc-server/README.md b/deltachat-rpc-server/README.md index f6b0afa743..ac7d20a599 100644 --- a/deltachat-rpc-server/README.md +++ b/deltachat-rpc-server/README.md @@ -43,7 +43,7 @@ Run `deltachat-rpc-server --openrpc` to get [OpenRPC](https://open-rpc.org/) spe Standard I/O is the default option, but you can also tell `deltachat-rpc-server` to use a unix socket instead. ``` -deltachat-rpc-server --unix ./chatmail-core.sock +RUST_LOG=info deltachat-rpc-server --unix ./chatmail-core.sock ``` While this technically allows multiple processes to communicate with the same rpc server instance, From 5337c5d945375c284b752da885f08c86e919b0c2 Mon Sep 17 00:00:00 2001 From: Simon Laux Date: Sun, 11 Jan 2026 06:36:05 +0100 Subject: [PATCH 5/7] remove socket file after use --- deltachat-rpc-server/src/main.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index b8d0a5f8c0..bd0a78d68a 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -2,9 +2,9 @@ //! Delta Chat core RPC server. //! //! It speaks JSON Lines over stdio. +use std::env; use std::path::PathBuf; use std::sync::Arc; -use std::{env, ffi::OsStr}; use anyhow::{anyhow, Context as _, Result}; use deltachat::constants::DC_VERSION_STR; @@ -14,6 +14,8 @@ use tokio::io::{self, AsyncBufReadExt, BufReader}; use tracing_subscriber::EnvFilter; use yerpc::RpcServer as _; +#[cfg(target_family = "unix")] +use std::ffi::OsString; #[cfg(target_family = "unix")] use tokio::net::UnixListener; #[cfg(target_family = "unix")] @@ -113,7 +115,7 @@ async fn main_impl() -> Result<()> { bail!("unix sockets are only supported on unix based operating systems"); } #[cfg(target_family = "unix")] - unix_socket_impl(&unix_socket_path, state, main_cancel.clone()).await? + unix_socket_impl(unix_socket_path, state, main_cancel.clone()).await? } else { stdio_impl(state, main_cancel.clone()).await? }; @@ -193,7 +195,7 @@ async fn stdio_impl( #[cfg(target_family = "unix")] async fn unix_socket_impl( - unix_socket_path: &OsStr, + unix_socket_path: OsString, state: CommandApi, main_cancel: CancellationToken, ) -> Result<( @@ -202,7 +204,7 @@ async fn unix_socket_impl( )> { let cancel = main_cancel.clone(); - let listener = UnixListener::bind(unix_socket_path)?; + let listener = UnixListener::bind(&unix_socket_path)?; let recv_task: JoinHandle> = tokio::spawn(async move { let _cancel_guard = cancel.clone().drop_guard(); @@ -241,7 +243,7 @@ async fn unix_socket_impl( read_lines.next_line() => match message? { None => { - log::info!("EOF reached on stdin"); + log::info!("unix socket closed {addr:?}"); break; } Some(message) => message, @@ -274,15 +276,15 @@ async fn unix_socket_impl( } Ok(()) }); - - // todo handle shutdown of _send_task and _receive_task } Err(e) => { log::info!("connection failed {e:#}"); } } } - // todo shutdown all remaining unix streams via their shutdown method + + drop(listener); + std::fs::remove_file(unix_socket_path)?; Ok(()) }); From 3352b8d5f6dfcad05592717089b9284eb3eaa7b3 Mon Sep 17 00:00:00 2001 From: Simon Laux Date: Sun, 11 Jan 2026 07:10:27 +0100 Subject: [PATCH 6/7] fix windows build issue --- deltachat-rpc-server/src/main.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index bd0a78d68a..7f3b3df32f 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -112,7 +112,9 @@ async fn main_impl() -> Result<()> { let (send_task, recv_task) = if let Some(unix_socket_path) = unix_socket { #[cfg(not(target_family = "unix"))] { - bail!("unix sockets are only supported on unix based operating systems"); + return Err(anyhow!( + "unix sockets are only supported on unix based operating systems" + )); } #[cfg(target_family = "unix")] unix_socket_impl(unix_socket_path, state, main_cancel.clone()).await? From 998ab6e298e805a8b530e36b7b5e1ffab37a4f0b Mon Sep 17 00:00:00 2001 From: Simon Laux Date: Mon, 12 Jan 2026 16:06:19 +0100 Subject: [PATCH 7/7] add python class `RpcUnixSocket` (does not completely work yet) --- .../build/lib/deltachat_rpc_client/rpc.py | 258 ++++++++++++++++++ .../src/deltachat_rpc_client/__init__.py | 3 +- .../src/deltachat_rpc_client/rpc.py | 19 ++ deltachat-rpc-client/tests/test_rpc_unix.py | 37 +++ 4 files changed, 316 insertions(+), 1 deletion(-) create mode 100644 deltachat-rpc-client/build/lib/deltachat_rpc_client/rpc.py create mode 100644 deltachat-rpc-client/tests/test_rpc_unix.py diff --git a/deltachat-rpc-client/build/lib/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/build/lib/deltachat_rpc_client/rpc.py new file mode 100644 index 0000000000..6930e8cc4f --- /dev/null +++ b/deltachat-rpc-client/build/lib/deltachat_rpc_client/rpc.py @@ -0,0 +1,258 @@ +"""JSON-RPC client module.""" + +from __future__ import annotations + +import itertools +import json +import logging +import os +import subprocess +import sys +import socket +from queue import Empty, Queue +from threading import Thread +from typing import TYPE_CHECKING, Any, Iterator, Optional + +if TYPE_CHECKING: + import io + + +class JsonRpcError(Exception): + """JSON-RPC error.""" + + +class RpcShutdownError(JsonRpcError): + """Raised in RPC methods if the connection to server is closing.""" + + +class RpcMethod: + """RPC method.""" + + def __init__(self, rpc: "BaseRpc", name: str): + self.rpc = rpc + self.name = name + + def __call__(self, *args) -> Any: + """Call JSON-RPC method synchronously.""" + future = self.future(*args) + return future() + + def future(self, *args) -> Any: + """Call JSON-RPC method asynchronously.""" + request_id = next(self.rpc.id_iterator) + request = { + "jsonrpc": "2.0", + "method": self.name, + "params": args, + "id": request_id, + } + self.rpc.request_results[request_id] = queue = Queue() + self.rpc.request_queue.put(request) + + def rpc_future(): + """Wait for the request to receive a result.""" + response = queue.get() + if response is None: + raise RpcShutdownError(f"no response for {request_id}/{self.name} while rpc is shutting down") + if "error" in response: + raise JsonRpcError(response["error"]) + return response.get("result", None) + + return rpc_future + + +class BaseRpc: + """Base Rpc class which requires 'connect_to_server' and 'disconnect_from_server' methods + from subclasses to work concretely.""" + + def __init__(self): + self.id_iterator: Iterator[int] + self.event_queues: dict[int, Queue] + # Map from request ID to a Queue which provides a single result + self.request_results: dict[int, Queue] + self.request_queue: Queue[Any] + self.server_stdin: io.Writer[bytes] + self.server_stdout: io.Reader[bytes] + self.closing: bool + self.reader_thread: Thread + self.writer_thread: Thread + self.events_thread: Thread + + def start(self) -> None: + """Start RPC server subprocess.""" + self.server_stdout, self.server_stdin = self.connect_to_server() + self.id_iterator = itertools.count(start=1) + self.event_queues = {} + self.request_results = {} + self.request_queue = Queue() + self.closing = False + self.reader_thread = Thread(target=self.reader_loop) + self.reader_thread.start() + self.writer_thread = Thread(target=self.writer_loop) + self.writer_thread.start() + self.events_thread = Thread(target=self.events_loop) + self.events_thread.start() + + def close(self) -> None: + """Terminate RPC server process and wait until the reader loop finishes.""" + self.closing = True + self.disconnect_from_server() + self.reader_thread.join() + self.events_thread.join() + self.request_queue.put(None) + self.writer_thread.join() + + def __enter__(self): + self.start() + return self + + def __exit__(self, _exc_type, _exc, _tb): + self.close() + + def reader_loop(self) -> None: + """Process JSON-RPC responses from the RPC server process output.""" + try: + while line := self.server_stdout.readline(): + response = json.loads(line) + if "id" in response: + response_id = response["id"] + self.request_results.pop(response_id).put(response) + else: + logging.warning("Got a response without ID: %s", response) + except Exception: + # Log an exception if the reader loop dies. + logging.exception("Exception in the reader loop") + + # terminate pending rpc requests because no responses can arrive anymore + for queue in self.request_results.values(): + queue.put(None) + + def writer_loop(self) -> None: + """Writer loop ensuring only a single thread writes requests.""" + try: + while request := self.request_queue.get(): + data = (json.dumps(request) + "\n").encode() + self.server_stdin.write(data) + self.server_stdin.flush() + + except Exception: + # Log an exception if the writer loop dies. + logging.exception("Exception in the writer loop") + + def get_queue(self, account_id: int) -> Queue: + """Get event queue corresponding to the given account ID.""" + if account_id not in self.event_queues: + self.event_queues[account_id] = Queue() + return self.event_queues[account_id] + + def events_loop(self) -> None: + """Request new events and distributes them between queues.""" + try: + while True: + if self.closing: + return + try: + event = self.get_next_event() + except RpcShutdownError: + return + account_id = event["contextId"] + queue = self.get_queue(account_id) + event = event["event"] + logging.debug("account_id=%d got an event %s", account_id, event) + queue.put(event) + except Exception: + # Log an exception if the event loop dies. + logging.exception("Exception in the event loop") + + def wait_for_event(self, account_id: int) -> Optional[dict]: + """Wait for the next event from the given account and returns it.""" + queue = self.get_queue(account_id) + return queue.get() + + def clear_all_events(self, account_id: int): + """Remove all queued-up events for a given account. Useful for tests.""" + queue = self.get_queue(account_id) + try: + while True: + queue.get_nowait() + except Empty: + pass + + def __getattr__(self, attr: str): + return RpcMethod(self, attr) + + +class RpcSubprocess(BaseRpc): + """RPC client that runs and connects to a deltachat-rpc-server in a subprocess.""" + + def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional[str] = "deltachat-rpc-server"): + """Initialize RPC client. + + The given arguments will be passed to subprocess.Popen(). + """ + super(RpcSubprocess, self).__init__() + self._accounts_dir = accounts_dir + self.rpc_server_path: str = rpc_server_path + + def connect_to_server(self): + popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} + if sys.version_info >= (3, 11): + # Prevent subprocess from capturing SIGINT. + popen_kwargs["process_group"] = 0 + else: + # `process_group` is not supported before Python 3.11. + popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509 + + if self._accounts_dir: + popen_kwargs["env"] = os.environ.copy() + popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir) + + process = subprocess.Popen(self.rpc_server_path, **popen_kwargs) + return process.stdout, process.stdin + + def disconnect_from_server(self): + self.stop_io_for_all_accounts() + self.server_stdin.close() + + +# backward compatibility +Rpc = RpcSubprocess + + +class RpcFIFO(BaseRpc): + """RPC client that runs and connects to a deltachat-rpc-server through FIFO files.""" + + def __init__(self, fn_request_fifo: str, fn_response_fifo: str): + super(RpcFIFO, self).__init__() + self.fn_request_fifo = fn_request_fifo + self.fn_response_fifo = fn_response_fifo + + def connect_to_server(self): + server_stdin = open(self.fn_request_fifo, "wb") # noqa + server_stdout = open(self.fn_response_fifo, "rb") # noqa + return server_stdout, server_stdin + + def disconnect_from_server(self): + self.server_stdin.close() + self.server_stdout.close() + +class RpcUnixSocket(BaseRpc): + """RPC client that connects to a deltachat-rpc-server through FIFO files.""" + + def __init__(self, socket_path: str): + super(RpcUnixSocket, self).__init__() + self.socket_path = socket_path + self.client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + def connect_to_server(self): + print(str(self.socket_path)) + self.client.connect(self.socket_path) + print("c1") + writer = self.client.makefile("wb") + reader = self.client.makefile("rb") + print("c2") + assert False + return reader, writer + + def disconnect_from_server(self): + self.client.close() diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py index 7a22548161..514c311792 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py @@ -8,7 +8,7 @@ from .contact import Contact from .deltachat import DeltaChat from .message import Message -from .rpc import Rpc, RpcFIFO +from .rpc import Rpc, RpcFIFO, RpcUnixSocket __all__ = [ "Account", @@ -23,6 +23,7 @@ "SpecialContactId", "Rpc", "RpcFIFO", + "RpcUnixSocket", "run_bot_cli", "run_client_cli", ] diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index c588d971a1..bfd3add0d0 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -8,6 +8,7 @@ import os import subprocess import sys +import socket from queue import Empty, Queue from threading import Thread from typing import TYPE_CHECKING, Any, Iterator, Optional @@ -234,3 +235,21 @@ def connect_to_server(self): def disconnect_from_server(self): self.server_stdin.close() self.server_stdout.close() + +class RpcUnixSocket(BaseRpc): + """RPC client that connects to a deltachat-rpc-server through FIFO files.""" + + def __init__(self, socket_path: str): + super(RpcUnixSocket, self).__init__() + self.socket_path = socket_path + self.client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + def connect_to_server(self): + print(str(self.socket_path)) + self.client.connect(self.socket_path) + writer = self.client.makefile("wb") + reader = self.client.makefile("rb") + return reader, writer + + def disconnect_from_server(self): + self.client.close() diff --git a/deltachat-rpc-client/tests/test_rpc_unix.py b/deltachat-rpc-client/tests/test_rpc_unix.py new file mode 100644 index 0000000000..d076433e2e --- /dev/null +++ b/deltachat-rpc-client/tests/test_rpc_unix.py @@ -0,0 +1,37 @@ +from os import environ +import platform # noqa +import signal +import subprocess +from sys import stderr +from time import sleep + +import pytest + +from deltachat_rpc_client import DeltaChat, RpcUnixSocket + + +@pytest.mark.skipif("platform.system() == 'Windows'") +def test_rpc_unix(tmp_path): + socket_file = "/tmp/chatmail.sock" # path needs to be relative or short + + path = environ.get("PATH") + assert path is not None + + popen = subprocess.Popen( + f"deltachat-rpc-server --unix {socket_file}", + shell=True, + env=dict( + DC_ACCOUNTS_PATH=f"{tmp_path}/accounts/test", + rust_log="info", + PATH=path + ) + ) + + sleep(1) # wait until socket exists # TODO this should not be needed + + rpc = RpcUnixSocket(socket_path=socket_file) + with rpc: + dc = DeltaChat(rpc) + assert dc.rpc.get_system_info()["deltachat_core_version"] is not None + popen.send_signal(signal.SIGINT) + popen.wait()