Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6,557 changes: 0 additions & 6,557 deletions pixi.lock

This file was deleted.

12 changes: 11 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ dev = [
"redis",
"pytest-asyncio",
"pytest-mock",
"tiled[minimal-server]"]
"tiled[minimal-server]",
"typer"]

zmq = [
"pyzmq"
Expand All @@ -49,6 +50,15 @@ tiled = [
"tiled[client]"
]

file-watch = [
"watchfiles >= 0.21.0"
]


[project.scripts]
watcher = "arroyopy.app.redis_file_watcher:app"


[build-system]
build-backend = "hatchling.build"
requires = ["hatchling"]
Expand Down
62 changes: 62 additions & 0 deletions src/_test/test_file_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import asyncio
from pathlib import Path

import pytest

from arroyopy import Operator, Publisher
from arroyopy.files import FileWatcherListener


class MockPublisher(Publisher):
def __init__(self):
self.messages = []

async def publish(self, message):
self.messages.append(message)


class MockOperator(Operator):
def __init__(self, publisher: MockPublisher):
self.publisher = publisher

async def process(self, message):
await self.publisher.publish(message)


@pytest.mark.asyncio
async def test_filewatcher_detects_file_in_subdirectory(tmp_path: Path):
# Set up directory and mocks
publisher = MockPublisher()
operator = MockOperator(publisher)
listener = FileWatcherListener(str(tmp_path), operator)

# Run the listener in the background
task = asyncio.create_task(listener.start())

# Give watchgod a moment to initialize
await asyncio.sleep(0.5)

# Create subdirectory and a valid .gb file in it
subdir = tmp_path / "nested"
subdir.mkdir()
await asyncio.sleep(0.2) # Allow watcher to pick up subdir creation

gb_file = subdir / "test_file.gb"
gb_file.write_text("mock content")

# Allow time for the event to be picked up
await asyncio.sleep(1.5)

# Cancel the watcher task
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

# Validate publisher received the notification
assert len(publisher.messages) == 2
assert Path(publisher.messages[0].file_path) == subdir
assert publisher.messages[0].is_directory is True
assert Path(publisher.messages[1].file_path) == gb_file
assert publisher.messages[1].is_directory is False
8 changes: 8 additions & 0 deletions src/arroyopy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .listener import Listener
from .operator import Operator
from .publisher import Publisher

__all__ = ["Listener", "Operator", "Publisher"]

# Make flake8 happy by using the names
_ = Listener, Operator, Publisher
Empty file added src/arroyopy/app/__init__.py
Empty file.
79 changes: 79 additions & 0 deletions src/arroyopy/app/redis_file_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import asyncio
import logging
from pathlib import Path
from typing import Optional

import typer

from arroyopy import Operator, Publisher
from arroyopy.files import FileWatcherListener
from arroyopy.redis import RedisPublisher

# -----------------------------------------------------------------------------
# Logging Setup
# -----------------------------------------------------------------------------
logger = logging.getLogger("data_watcher")


def setup_logging(log_level: str = "INFO"):
level = getattr(logging, log_level.upper(), logging.INFO)
logger.setLevel(level)

if not logger.hasHandlers():
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)

logger.propagate = False # Prevent duplication through root logger


# -----------------------------------------------------------------------------
# CLI App
# -----------------------------------------------------------------------------
app = typer.Typer(help="Watch a directory and publish new .gb files to Redis.")


class FileWatcherOperator(Operator):
def __init__(self, publisher: Publisher):
self.publisher = publisher

async def process(self, message):
logger.info(f"Processing message: {message}")
await self.publisher.publish(message)


class NullPublisher(Publisher):
async def publish(self, message):
logger.debug(f"NullPublisher: {message} - No action taken.")


@app.command()
def main(
directory: Path = typer.Argument(..., help="Directory to watch for new files"),
redis_host: Optional[str] = typer.Option(None, help="Redis host"),
redis_port: Optional[int] = typer.Option(None, help="Redis port"),
log_level: str = typer.Option(
"INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR)"
),
):
setup_logging(log_level)

loop = asyncio.get_event_loop()

if redis_host:
publisher = RedisPublisher.from_client(redis_host, redis_port)
logger.info(
f"Using Redis publisher with host {redis_host} and port {redis_port}"
)
else:
publisher = NullPublisher()
logger.info("Using default null publisher")

operator = FileWatcherOperator(publisher)
listener = FileWatcherListener(str(directory), operator)
loop.run_until_complete(listener.start())


if __name__ == "__main__":
app()
37 changes: 37 additions & 0 deletions src/arroyopy/files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
from pathlib import Path

from pydantic import BaseModel
from watchfiles import awatch

from arroyopy import Listener, Operator
from arroyopy.schemas import Message

logger = logging.getLogger("data_watcher")
logger.setLevel("INFO")
logger.addHandler(logging.StreamHandler())


class FileWatcherMessage(Message, BaseModel):
file_path: str
is_directory: bool


class FileWatcherListener(Listener):
def __init__(self, directory: str, operator: Operator):
self.directory = directory
self.operator = operator

async def start(self):
logger.info(f"🔍 Watching directory recursively: {self.directory}")
async for changes in awatch(self.directory):
for change_type, path_str in changes:
path = Path(path_str)
logger.debug(f"📦 Detected: {change_type} on {path}")
message = FileWatcherMessage(
file_path=str(path), is_directory=path.is_dir()
)
await self.operator.process(message)

async def stop(self):
pass