diff --git a/14 - Databases/Momox/notify_by_new_ticket.py b/14 - Databases/Momox/notify_by_new_ticket.py new file mode 100644 index 000000000..b411f2323 --- /dev/null +++ b/14 - Databases/Momox/notify_by_new_ticket.py @@ -0,0 +1,218 @@ +""" +Jira Issue Monitor für Momox +Überwacht kontinuierlich neue Jira-Tickets und benachrichtigt per Sprachausgabe. +""" + +import os +import time +from typing import List, Optional +import requests +from requests.auth import HTTPBasicAuth +from gtts import gTTS + +# ===== KONFIGURATION ===== +# Jira API Endpoints +JIRA_BASE_URL = 'https://momox.atlassian.net/rest/api/3' +JIRA_SEARCH_URL = f'{JIRA_BASE_URL}/search' + +# Authentifizierung (aus Umgebungsvariablen laden) +JIRA_TOKEN = os.getenv('JIRA_API_TOKEN') +JIRA_EMAIL = os.getenv('JIRA_EMAIL', 'falk.liebezeit@momox.biz') + +if not JIRA_TOKEN: + raise ValueError("JIRA_API_TOKEN Umgebungsvariable nicht gesetzt! Bitte setzen Sie: export JIRA_API_TOKEN='your_token'") + +# Audio-Konfiguration +AUDIO_OUTPUT_PATH = "/home/momox/output.wav" +os.environ['AUDIODEV'] = 'default' +os.environ['LD_PRELOAD'] = '/usr/lib/x86_64-linux-gnu/libaoss.so' + +# Monitoring-Konfiguration +PROJECT_KEY = 'LEJSD' +TARGET_STATUS = "To Do" +POLL_INTERVAL = 5 # Sekunden zwischen Abfragen +MAX_RESULTS = 10 # Maximale Anzahl der abzurufenden Issues + +# HTTP-Header für Jira-API +HEADERS = {'Accept': 'application/json'} + +print(f"AUDIODEV: {os.environ.get('AUDIODEV')}") +print(f"LD_PRELOAD: {os.environ.get('LD_PRELOAD')}") + + +def fetch_recent_issues() -> Optional[List[dict]]: + """ + Ruft die neuesten Jira-Issues ab. + + Returns: + Liste von Issues oder None bei Fehler + """ + query = { + 'jql': 'ORDER BY created DESC', + 'maxResults': MAX_RESULTS, + 'fields': ['key', 'summary', 'status', 'created'] + } + + try: + response = requests.get( + url=JIRA_SEARCH_URL, + params=query, + headers=HEADERS, + auth=HTTPBasicAuth(username=JIRA_EMAIL, password=JIRA_TOKEN), + timeout=10 + ) + response.raise_for_status() + data = response.json() + return data.get('issues', []) + except requests.RequestException as e: + print(f"Fehler beim Abrufen der Issues: {e}") + return None + + +def check_issue_status(issue_number: int) -> Optional[str]: + """ + Überprüft den aktuellen Status eines Issues. + + Args: + issue_number: Die Issue-Nummer (nur Zahl, ohne Projekt-Präfix) + + Returns: + Status-String oder None bei Fehler + """ + issue_key = f'{PROJECT_KEY}-{issue_number}' + + try: + response = requests.get( + url=f'{JIRA_BASE_URL}/issue/{issue_key}', + headers=HEADERS, + auth=HTTPBasicAuth(username=JIRA_EMAIL, password=JIRA_TOKEN), + timeout=10 + ) + response.raise_for_status() + data = response.json() + return data.get('fields', {}).get('status', {}).get('name') + except requests.RequestException as e: + print(f"Fehler beim Abrufen von Issue {issue_key}: {e}") + return None + + +def filter_open_issues(issues: List[dict]) -> List[int]: + """ + Filtert Issues nach Projekt und Status "To Do". + + Args: + issues: Liste der zu filternden Issues + + Returns: + Liste der Issue-Nummern im Status "To Do" + """ + open_issues = [] + + for issue in issues: + issue_key = issue['key'] + issue_status = issue['fields']['status']['name'] + + # Issue-Key aufteilen (z.B. "LEJSD-123" -> ["LEJSD", "123"]) + parts = issue_key.split('-') + + if len(parts) == 2 and parts[0] == PROJECT_KEY and issue_status == TARGET_STATUS: + issue_number = int(parts[1]) + print(f'[{issue_number}] gefunden!') + open_issues.append(issue_number) + + return open_issues + + +def verify_issue_statuses(issue_numbers: List[int]) -> List[int]: + """ + Verifiziert den Status jedes Issues und entfernt nicht mehr offene Issues. + + Args: + issue_numbers: Liste der zu überprüfenden Issue-Nummern + + Returns: + Liste der bestätigten offenen Issue-Nummern + """ + verified_issues = [] + + for issue_number in issue_numbers: + status = check_issue_status(issue_number) + print(f'Issue [{issue_number}] Status: {status}') + + if status == TARGET_STATUS: + verified_issues.append(issue_number) + else: + print(f'[{issue_number}] wird entfernt ...') + + return verified_issues + + +def play_audio_notification(issue_count: int) -> None: + """ + Erstellt und spielt eine Sprachbenachrichtigung ab. + + Args: + issue_count: Anzahl der gefundenen Issues + """ + if issue_count == 0: + return + + # Grammatikalisch korrekte Nachricht erstellen + text = f"{issue_count} neue Störung gefunden" if issue_count == 1 else f"{issue_count} neue Störungen gefunden" + print(f"{issue_count} Issue(s) gefunden") + + try: + # TTS-Datei erstellen + tts = gTTS(text=text, lang='de') + tts.save(AUDIO_OUTPUT_PATH) + + if os.path.exists(AUDIO_OUTPUT_PATH): + print("Audio-Datei erfolgreich erstellt.") + # Audio abspielen + os.system(f'mpg321 {AUDIO_OUTPUT_PATH}') + else: + print("Fehler: Audio-Datei konnte nicht erstellt werden.") + except Exception as e: + print(f"Fehler bei der Audio-Wiedergabe: {e}") + + +def monitor_issues() -> None: + """ + Hauptschleife: Überwacht kontinuierlich Jira-Issues und benachrichtigt bei neuen Tickets. + """ + print(f"Starte Jira-Monitor für Projekt {PROJECT_KEY}...") + + while True: + try: + # Neueste Issues abrufen + issues = fetch_recent_issues() + + if issues: + # Nach offenen Issues filtern + open_issues = filter_open_issues(issues) + + # Status jedes Issues verifizieren + verified_open_issues = verify_issue_statuses(open_issues) + + # Bei gefundenen Issues Benachrichtigung abspielen + play_audio_notification(len(verified_open_issues)) + + print(f'Offene Tickets: {verified_open_issues}') + + # Wartezeit bis zur nächsten Abfrage + time.sleep(POLL_INTERVAL) + + except KeyboardInterrupt: + print("\nMonitoring beendet.") + break + except Exception as e: + print(f"Unerwarteter Fehler: {e}") + time.sleep(POLL_INTERVAL) + + +if __name__ == "__main__": + monitor_issues() + + + + diff --git a/14 - Databases/Momox/ring.py b/14 - Databases/Momox/ring.py new file mode 100644 index 000000000..b411f2323 --- /dev/null +++ b/14 - Databases/Momox/ring.py @@ -0,0 +1,218 @@ +""" +Jira Issue Monitor für Momox +Überwacht kontinuierlich neue Jira-Tickets und benachrichtigt per Sprachausgabe. +""" + +import os +import time +from typing import List, Optional +import requests +from requests.auth import HTTPBasicAuth +from gtts import gTTS + +# ===== KONFIGURATION ===== +# Jira API Endpoints +JIRA_BASE_URL = 'https://momox.atlassian.net/rest/api/3' +JIRA_SEARCH_URL = f'{JIRA_BASE_URL}/search' + +# Authentifizierung (aus Umgebungsvariablen laden) +JIRA_TOKEN = os.getenv('JIRA_API_TOKEN') +JIRA_EMAIL = os.getenv('JIRA_EMAIL', 'falk.liebezeit@momox.biz') + +if not JIRA_TOKEN: + raise ValueError("JIRA_API_TOKEN Umgebungsvariable nicht gesetzt! Bitte setzen Sie: export JIRA_API_TOKEN='your_token'") + +# Audio-Konfiguration +AUDIO_OUTPUT_PATH = "/home/momox/output.wav" +os.environ['AUDIODEV'] = 'default' +os.environ['LD_PRELOAD'] = '/usr/lib/x86_64-linux-gnu/libaoss.so' + +# Monitoring-Konfiguration +PROJECT_KEY = 'LEJSD' +TARGET_STATUS = "To Do" +POLL_INTERVAL = 5 # Sekunden zwischen Abfragen +MAX_RESULTS = 10 # Maximale Anzahl der abzurufenden Issues + +# HTTP-Header für Jira-API +HEADERS = {'Accept': 'application/json'} + +print(f"AUDIODEV: {os.environ.get('AUDIODEV')}") +print(f"LD_PRELOAD: {os.environ.get('LD_PRELOAD')}") + + +def fetch_recent_issues() -> Optional[List[dict]]: + """ + Ruft die neuesten Jira-Issues ab. + + Returns: + Liste von Issues oder None bei Fehler + """ + query = { + 'jql': 'ORDER BY created DESC', + 'maxResults': MAX_RESULTS, + 'fields': ['key', 'summary', 'status', 'created'] + } + + try: + response = requests.get( + url=JIRA_SEARCH_URL, + params=query, + headers=HEADERS, + auth=HTTPBasicAuth(username=JIRA_EMAIL, password=JIRA_TOKEN), + timeout=10 + ) + response.raise_for_status() + data = response.json() + return data.get('issues', []) + except requests.RequestException as e: + print(f"Fehler beim Abrufen der Issues: {e}") + return None + + +def check_issue_status(issue_number: int) -> Optional[str]: + """ + Überprüft den aktuellen Status eines Issues. + + Args: + issue_number: Die Issue-Nummer (nur Zahl, ohne Projekt-Präfix) + + Returns: + Status-String oder None bei Fehler + """ + issue_key = f'{PROJECT_KEY}-{issue_number}' + + try: + response = requests.get( + url=f'{JIRA_BASE_URL}/issue/{issue_key}', + headers=HEADERS, + auth=HTTPBasicAuth(username=JIRA_EMAIL, password=JIRA_TOKEN), + timeout=10 + ) + response.raise_for_status() + data = response.json() + return data.get('fields', {}).get('status', {}).get('name') + except requests.RequestException as e: + print(f"Fehler beim Abrufen von Issue {issue_key}: {e}") + return None + + +def filter_open_issues(issues: List[dict]) -> List[int]: + """ + Filtert Issues nach Projekt und Status "To Do". + + Args: + issues: Liste der zu filternden Issues + + Returns: + Liste der Issue-Nummern im Status "To Do" + """ + open_issues = [] + + for issue in issues: + issue_key = issue['key'] + issue_status = issue['fields']['status']['name'] + + # Issue-Key aufteilen (z.B. "LEJSD-123" -> ["LEJSD", "123"]) + parts = issue_key.split('-') + + if len(parts) == 2 and parts[0] == PROJECT_KEY and issue_status == TARGET_STATUS: + issue_number = int(parts[1]) + print(f'[{issue_number}] gefunden!') + open_issues.append(issue_number) + + return open_issues + + +def verify_issue_statuses(issue_numbers: List[int]) -> List[int]: + """ + Verifiziert den Status jedes Issues und entfernt nicht mehr offene Issues. + + Args: + issue_numbers: Liste der zu überprüfenden Issue-Nummern + + Returns: + Liste der bestätigten offenen Issue-Nummern + """ + verified_issues = [] + + for issue_number in issue_numbers: + status = check_issue_status(issue_number) + print(f'Issue [{issue_number}] Status: {status}') + + if status == TARGET_STATUS: + verified_issues.append(issue_number) + else: + print(f'[{issue_number}] wird entfernt ...') + + return verified_issues + + +def play_audio_notification(issue_count: int) -> None: + """ + Erstellt und spielt eine Sprachbenachrichtigung ab. + + Args: + issue_count: Anzahl der gefundenen Issues + """ + if issue_count == 0: + return + + # Grammatikalisch korrekte Nachricht erstellen + text = f"{issue_count} neue Störung gefunden" if issue_count == 1 else f"{issue_count} neue Störungen gefunden" + print(f"{issue_count} Issue(s) gefunden") + + try: + # TTS-Datei erstellen + tts = gTTS(text=text, lang='de') + tts.save(AUDIO_OUTPUT_PATH) + + if os.path.exists(AUDIO_OUTPUT_PATH): + print("Audio-Datei erfolgreich erstellt.") + # Audio abspielen + os.system(f'mpg321 {AUDIO_OUTPUT_PATH}') + else: + print("Fehler: Audio-Datei konnte nicht erstellt werden.") + except Exception as e: + print(f"Fehler bei der Audio-Wiedergabe: {e}") + + +def monitor_issues() -> None: + """ + Hauptschleife: Überwacht kontinuierlich Jira-Issues und benachrichtigt bei neuen Tickets. + """ + print(f"Starte Jira-Monitor für Projekt {PROJECT_KEY}...") + + while True: + try: + # Neueste Issues abrufen + issues = fetch_recent_issues() + + if issues: + # Nach offenen Issues filtern + open_issues = filter_open_issues(issues) + + # Status jedes Issues verifizieren + verified_open_issues = verify_issue_statuses(open_issues) + + # Bei gefundenen Issues Benachrichtigung abspielen + play_audio_notification(len(verified_open_issues)) + + print(f'Offene Tickets: {verified_open_issues}') + + # Wartezeit bis zur nächsten Abfrage + time.sleep(POLL_INTERVAL) + + except KeyboardInterrupt: + print("\nMonitoring beendet.") + break + except Exception as e: + print(f"Unerwarteter Fehler: {e}") + time.sleep(POLL_INTERVAL) + + +if __name__ == "__main__": + monitor_issues() + + + + diff --git a/14 - Databases/XML/XML Dictionary Parser using ElementTree.py b/14 - Databases/XML/XML Dictionary Parser using ElementTree.py new file mode 100644 index 000000000..3e45f68a1 --- /dev/null +++ b/14 - Databases/XML/XML Dictionary Parser using ElementTree.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python +""" +XML Dictionary Parser using ElementTree + +This module demonstrates how to parse XML files and convert them into Python dictionaries +with proper type casting based on XML attributes. +""" + +import xml.etree.ElementTree as ET + +# Type mapping for converting string values to their respective Python types +TYPE_MAPPING = { + "int": int, + "str": str, + "float": float, + "bool": lambda x: x.lower() in ("true", "1", "yes") +} + + +def read_element(element): + """ + Read an XML element and convert its text content to the appropriate Python type. + + Args: + element: An XML Element object with optional 'typ' attribute + + Returns: + The element's text content converted to the specified type (defaults to str) + """ + element_type = element.get("typ", "str") + + # Return None if element has no text content + if element.text is None: + return None + + # Convert to the specified type, fallback to string if type is unknown + converter = TYPE_MAPPING.get(element_type, str) + try: + return converter(element.text) + except (ValueError, AttributeError) as e: + print(f"Warning: Could not convert '{element.text}' to type '{element_type}': {e}") + return element.text + + +def load_dict_from_xml(filename): + """ + Load a dictionary from an XML file with the following structure: + + + key + value + + + + Args: + filename: Path to the XML file to parse + + Returns: + dict: A dictionary with keys and values extracted from the XML + + Raises: + FileNotFoundError: If the XML file doesn't exist + ET.ParseError: If the XML file is malformed + """ + # Parse the XML file + tree = ET.parse(filename) + root = tree.getroot() + + # Build dictionary from XML entries + result = {} + for entry in root: + key_element = entry.find("schluessel") + value_element = entry.find("wert") + + # Skip entries with missing key or value elements + if key_element is None or value_element is None: + print(f"Warning: Skipping entry with missing key or value element") + continue + + key = read_element(key_element) + value = read_element(value_element) + result[key] = value + + return result + + +if __name__ == "__main__": + try: + dictionary = load_dict_from_xml("dict.xml") + print("Loaded dictionary from XML:") + print(dictionary) + except FileNotFoundError: + print("Error: dict.xml file not found") + except ET.ParseError as e: + print(f"Error: Failed to parse XML file: {e}") diff --git a/14 - Databases/XML/XML Dictionary Parser using SAX.py b/14 - Databases/XML/XML Dictionary Parser using SAX.py new file mode 100644 index 000000000..25ad5a2e6 --- /dev/null +++ b/14 - Databases/XML/XML Dictionary Parser using SAX.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python +""" +XML Dictionary Parser using SAX + +This module demonstrates how to parse XML files using SAX (Simple API for XML), +which is more memory-efficient than DOM parsing for large files as it processes +the XML sequentially without loading the entire document into memory. +""" + +import os +import xml.sax as sax + + +class DictionaryHandler(sax.handler.ContentHandler): + """ + SAX Content Handler for parsing XML dictionary entries. + + Handles XML with the structure: + + + key + value + + + """ + + # Type mapping for converting string values to Python types + TYPE_MAPPING = { + "int": int, + "str": str, + "float": float, + "bool": lambda x: x.lower() in ("true", "1", "yes") + } + + def __init__(self): + """Initialize the handler with empty state.""" + super().__init__() + self.result = {} # Final dictionary result + self.current_key = "" # Current key being processed + self.current_value = "" # Current value being processed + self.active_element = None # Currently active element ('schluessel' or 'wert') + self.value_type = str # Type converter for current value + + def startElement(self, name, attrs): + """ + Called when an opening XML tag is encountered. + + Args: + name: Name of the XML element + attrs: Attributes of the XML element + """ + if name == "eintrag": + # Start of a new dictionary entry - reset current key and value + self.current_key = "" + self.current_value = "" + self.value_type = str + + elif name in ("schluessel", "wert"): + # Start reading key or value element + self.active_element = name + + # Determine type converter from 'typ' attribute + type_name = attrs.get("typ", "str") + self.value_type = self.TYPE_MAPPING.get(type_name, str) + + def endElement(self, name): + """ + Called when a closing XML tag is encountered. + + Args: + name: Name of the XML element being closed + """ + if name == "eintrag": + # End of dictionary entry - store key-value pair + try: + # Convert value to appropriate type and store in result + converted_value = self.value_type(self.current_value.strip()) + self.result[self.current_key.strip()] = converted_value + except (ValueError, TypeError) as e: + print(f"Warning: Could not convert value '{self.current_value}' - using string instead: {e}") + self.result[self.current_key.strip()] = self.current_value.strip() + + elif name in ("schluessel", "wert"): + # End of key or value element - stop accumulating content + self.active_element = None + + def characters(self, content): + """ + Called when character data is encountered between XML tags. + SAX may call this method multiple times for a single element's content. + + Args: + content: Character data from the XML + """ + if self.active_element == "schluessel": + # Accumulate key content + self.current_key += content + elif self.active_element == "wert": + # Accumulate value content + self.current_value += content + + +def load_dict_from_xml(filename): + """ + Load a dictionary from an XML file using SAX parsing. + + SAX parsing is more memory-efficient than DOM parsing (ElementTree) + for large XML files because it processes the file sequentially + without loading the entire document into memory. + + Args: + filename: Path to the XML file to parse + + Returns: + dict: A dictionary with keys and values extracted from the XML + + Raises: + FileNotFoundError: If the XML file doesn't exist + sax.SAXException: If the XML file is malformed + """ + # Resolve the file path to absolute path + if not os.path.isabs(filename): + # If relative path, make it relative to this script's directory + script_dir = os.path.dirname(os.path.abspath(__file__)) + filename = os.path.join(script_dir, filename) + + # Check if file exists + if not os.path.exists(filename): + raise FileNotFoundError(f"XML file not found: {filename}") + + # Create handler and parser + handler = DictionaryHandler() + parser = sax.make_parser() + parser.setContentHandler(handler) + + # Parse the XML file + parser.parse(filename) + + return handler.result + + +if __name__ == "__main__": + try: + dictionary = load_dict_from_xml("dict.xml") + print("Loaded dictionary from XML using SAX:") + print(dictionary) + except FileNotFoundError: + print("Error: dict.xml file not found") + except sax.SAXException as e: + print(f"Error: Failed to parse XML file: {e}") diff --git a/14 - Databases/XML/beispiel_01_elementtree.py b/14 - Databases/XML/beispiel_01_elementtree.py deleted file mode 100644 index b9a49276b..000000000 --- a/14 - Databases/XML/beispiel_01_elementtree.py +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env python - -import xml.etree.ElementTree as ElementTree - -typen = { - "int" : int, - "str" : str -} - -def lese_element(element): - typ = element.get("typ", "str") - try: - return typen[typ](element.text) - except KeyError: - return element.text - -def lade_dict(dateiname): - d = {} - baum = ElementTree.parse(dateiname) - tag_dict = baum.getroot() - for eintrag in tag_dict: - tag_schluessel = eintrag.find("schluessel") - tag_wert = eintrag.find("wert") - d[lese_element(tag_schluessel)] = lese_element(tag_wert) - return d - - -if __name__ == "__main__": - print(lade_dict("dict.xml")) diff --git a/14 - Databases/XML/beispiel_02_sax.py b/14 - Databases/XML/beispiel_02_sax.py deleted file mode 100644 index 661ceab4c..000000000 --- a/14 - Databases/XML/beispiel_02_sax.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python - -import xml.sax as sax - -class DictHandler(sax.handler.ContentHandler): - typen = { - "int" : int, - "str" : str - } - - def __init__(self): - self.ergebnis = {} - self.schluessel = "" - self.wert = "" - self.aktiv = None - self.typ = None - - def startElement(self, name, attrs): - if name == "eintrag": - self.schluessel = "" - self.wert = "" - elif name in ("schluessel", "wert"): - self.aktiv = name - try: - self.typ = self.typen[attrs["typ"]] - except KeyError: - self.typ = str - - def endElement(self, name): - if name == "eintrag": - self.ergebnis[self.schluessel] = self.typ(self.wert) - elif name in ("schluessel", "wert"): - self.aktiv = None - - def characters(self, content): - if self.aktiv == "schluessel": - self.schluessel += content - elif self.aktiv == "wert": - self.wert += content - - -def lade_dict(dateiname): - handler = DictHandler() - parser = sax.make_parser() - parser.setContentHandler(handler) - parser.parse(dateiname) - return handler.ergebnis - - -if __name__ == "__main__": - print(lade_dict("dict.xml"))