Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 254 additions & 0 deletions modalapi/external_midi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
# This file is part of pi-stomp.
#
# pi-stomp is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pi-stomp is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pi-stomp. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

import fnmatch
import logging
import time
from typing import TypedDict

import rtmidi


MidiMessage = list[int]


class PortConfig(TypedDict, total=False):
auto_detect: list[str]
port_index: int


class ExternalMidiConfig(TypedDict, total=False):
enabled: bool
send_delay_ms: int
ports: dict[str, PortConfig] # configured devices
messages: dict[str, list[MidiMessage]] # port_name -> list of MIDI messages


class ExternalMidiManager:
"""
Manages external MIDI device synchronization.
Sends MIDI messages to external devices when pedalboards are loaded.
"""

def __init__(self):
self.midi_ports: dict[str, rtmidi.MidiOut | None] = {}
self.port_configs: dict[str, PortConfig] = {}
self.messages: dict[str, list[MidiMessage]] = {}
self.enabled: bool = False
self.send_delay_ms: int = 10

def update_config(self, cfg: ExternalMidiConfig | None) -> None:
"""
Update configuration incrementally (can be called multiple times).
Only updates fields that are present.
"""
if cfg is None:
return

if "enabled" in cfg:
self.enabled = cfg["enabled"]
if self.enabled:
logging.debug("External MIDI enabled")
else:
logging.debug("External MIDI disabled")

if "send_delay_ms" in cfg:
self.send_delay_ms = cfg["send_delay_ms"]

if "ports" in cfg:
# Merge ports (port-level granularity)
self.port_configs.update(cfg["ports"])

if "messages" in cfg:
# Merge messages at port level
# This allows pedalboard config to override specific ports while keeping others default
self.messages.update(cfg["messages"])

def _get_available_ports(self) -> list[str]:
try:
temp_out = rtmidi.MidiOut()
ports = temp_out.get_ports()
del temp_out
return ports
except Exception as e:
logging.error(f"Failed to enumerate MIDI ports: {e}")
return []

def _find_port_by_name(self, port_config: PortConfig) -> int | None:
"""
Find MIDI port index matching a given config, returning its index if found.
"""
if "port_index" in port_config:
return port_config["port_index"]

# Auto-detect by name patterns
auto_detect = port_config.get("auto_detect", [])
if not auto_detect:
return None

available_ports = self._get_available_ports()
if not available_ports:
return None

# Search for matching ports using glob patterns
matched_ports = []
for pattern in auto_detect:
for idx, port_name in enumerate(available_ports):
# Case-insensitive glob matching
if fnmatch.fnmatch(port_name.lower(), pattern.lower()):
matched_ports.append((idx, port_name))

if not matched_ports:
logging.warning(f"No MIDI ports matched patterns: {auto_detect}")
return None

# Warn if multiple matches
if len(matched_ports) > 1:
port_names = [name for _, name in matched_ports]
logging.warning(
f"Multiple MIDI ports matched {auto_detect}: {port_names}. Using first match: {matched_ports[0][1]}"
)

selected_idx, selected_name = matched_ports[0]
logging.info(f"Auto-detected MIDI port: {selected_name} (index {selected_idx})")
return selected_idx

def _init_port(self, port_name: str) -> rtmidi.MidiOut | None:
if port_name in self.midi_ports:
return self.midi_ports[port_name]

port_config = self.port_configs.get(port_name)
if not port_config:
logging.warning(f"No configuration found for MIDI port: {port_name}")
self.midi_ports[port_name] = None
return None

port_idx = self._find_port_by_name(port_config)
if port_idx is None:
logging.warning(f"Could not find MIDI port for: {port_name}")
self.midi_ports[port_name] = None
return None

try:
midi_out = rtmidi.MidiOut()
midi_out.open_port(port_idx)
self.midi_ports[port_name] = midi_out
logging.info(f"Opened MIDI port: {port_name}")
return midi_out
except Exception as e:
logging.error(
f"Failed to open MIDI port {port_name} (index {port_idx}): {e}"
)
self.midi_ports[port_name] = None
return None

def _validate_midi_message(self, message: MidiMessage) -> bool:
if not isinstance(message, list) or len(message) < 2:
logging.warning(
f"Invalid MIDI message format (must be list with 2+ bytes): {message}"
)
return False

# Check status byte (must be 0x80-0xFF)
status = message[0]
if not (0x80 <= status <= 0xFF):
logging.warning(
f"Invalid MIDI status byte (must be 0x80-0xFF): 0x{status:02X}"
)
return False

# Check data bytes (must be 0x00-0x7F)
for i, byte in enumerate(message[1:], start=1):
if not (0x00 <= byte <= 0x7F):
logging.warning(
f"Invalid MIDI data byte at position {i} (must be 0x00-0x7F): 0x{byte:02X}"
)
return False

return True

def _send_messages(
self, port_name: str, messages: list[MidiMessage], delay_ms: int = 10
):
"""
Send MIDI messages to a port.

Args:
port_name: Name of port configuration.
messages: List of MIDI messages to send.
delay_ms: Delay between messages in milliseconds.
"""
midi_out = self._init_port(port_name)
if midi_out is None:
logging.warning(f"Skipping messages for unavailable port: {port_name}")
return

for i, message in enumerate(messages):
if not self._validate_midi_message(message):
logging.warning(
f"Skipping invalid MIDI message {i + 1}/{len(messages)}: {message}"
)
continue

try:
midi_out.send_message(message)
logging.debug(
f"Sent MIDI message to {port_name}: {[f'0x{b:02X}' for b in message]}"
)

# Delay between messages (except after last one)
if i < len(messages) - 1 and delay_ms > 0:
time.sleep(delay_ms / 1000.0)

except Exception as e:
logging.error(f"Failed to send MIDI message to {port_name}: {e}")

def send_messages_for_pedalboard(self) -> bool:
"""
Send external MIDI messages for current pedalboard configuration.
Configuration should have been set via update_config() before calling this.

Returns:
True if messages were sent successfully, False otherwise.
"""
if not self.enabled:
return False

if not self.messages:
return False

for port_name, messages in self.messages.items():
if not messages:
continue

logging.debug(f"Sending MIDI message(s) to {port_name}: {messages.join(', ')}")
self._send_messages(port_name, messages, self.send_delay_ms)

return True

def close(self):
"""Close ports and clean up."""
for port_name, midi_out in self.midi_ports.items():
if midi_out is not None:
try:
midi_out.close_port()
logging.debug(f"Closed MIDI port: {port_name}")
except Exception as e:
logging.warning(f"Error closing MIDI port {port_name}: {e}")

self.midi_ports.clear()
logging.info("External MIDI manager closed")
28 changes: 26 additions & 2 deletions modalapi/mod.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@
import sys
import yaml

from enum import Enum
from rtmidi.midiconstants import CONTROL_CHANGE

import common.token as Token
import common.util as util
import pistomp.switchstate as switchstate
import modalapi.pedalboard as Pedalboard
import modalapi.parameter as Parameter
import modalapi.wifi as Wifi
import modalapi.external_midi as ExternalMidi

from pistomp.analogmidicontrol import AnalogMidiControl
from pistomp.footswitch import Footswitch
from pistomp.handler import Handler
from enum import Enum
from pathlib import Path

#sys.path.append('/usr/lib/python3.5/site-packages') # TODO possibly /usr/local/modep/mod-ui
Expand Down Expand Up @@ -136,12 +139,20 @@ def __init__(self, audiocard, homedir):
self.current_menu = MenuType.MENU_NONE

# This file is modified when the pedalboard is changed via MOD UI
self.pedalboard_modification_file = "/home/pistomp/data/last.json"
self.data_dir = "/home/pistomp/data"
self.pedalboard_modification_file = os.path.join(self.data_dir, "last.json")
self.pedalboard_change_timestamp = os.path.getmtime(self.pedalboard_modification_file)\
if Path(self.pedalboard_modification_file).exists() else 0

self.wifi_manager = Wifi.WifiManager()

# External MIDI device synchronization
self.external_midi = None
try:
self.external_midi = ExternalMidi.ExternalMidiManager()
except Exception as e:
logging.warning(f"Failed to initialize external MIDI manager: {e}")

# Callback function map. Key is the user specified name, value is function from this handler
# Used for calling handler callbacks pointed to by names which may be user set in the config file
self.callbacks = {"set_mod_tap_tempo": self.set_mod_tap_tempo,
Expand All @@ -153,10 +164,14 @@ def __del__(self):
logging.info("Handler cleanup")
if self.wifi_manager:
del self.wifi_manager
if self.external_midi is not None:
self.external_midi.close()

def cleanup(self):
if self.lcd is not None:
self.lcd.cleanup()
if self.external_midi is not None:
self.external_midi.close()

# Container for dynamic data which is unique to the "current" pedalboard
# The self.current pointed above will point to this object which gets
Expand All @@ -183,6 +198,7 @@ def __init__(self, plugin):

def add_hardware(self, hardware):
self.hardware = hardware
hardware.external_midi = self.external_midi

def add_lcd(self, lcd):
self.lcd = lcd
Expand Down Expand Up @@ -534,6 +550,14 @@ def set_current_pedalboard(self, pedalboard):
self.load_current_presets()
self.update_lcd()

# Send external MIDI messages for this pedalboard
# Config was already updated by hardware.reinit(cfg) above
if self.external_midi is not None:
try:
self.external_midi.send_messages_for_pedalboard()
except Exception as e:
logging.warning(f"Failed to send external MIDI messages: {e}")

# Selection info
self.selectable_items.clear()
self.selectable_items.append((SelectedType.PEDALBOARD, None))
Expand Down
Loading