diff --git a/examples/create-data-object.py b/examples/create-data-object.py index 79813ef..cc389a2 100755 --- a/examples/create-data-object.py +++ b/examples/create-data-object.py @@ -6,7 +6,9 @@ from nuvla.api import Api as Nuvla from nuvla.api.resources.data import DataObjectS3 +from nuvla.api.resources.data import DataRecord from nuvla.api.resources.user import User +from nuvla.api.api import NuvlaError from utils import nuvla_conf_user_pass @@ -24,86 +26,244 @@ # Login to Nuvla. # user_api = User(nuvla) -user_api.login_password(username, password) +# user_api.login_password(username, password) +user_api.login(username, password) + +# REQUIRED: ID of S3 credential in Nuvla. +# either we already know this and it is hardcoded below +# s3_cred_id = 'credential/362e6088-394d-4de9-9667-8e1a40136aa6' +# s3_cred_id = 'credential/0f70f0fb-c226-4fc2-a98b-61f0646813d7' + +# OR we can get the credentials from a login if we want... +# as in below +creds = nuvla.get('credential') +s3_cred_id = None +for cred in creds.data['resources']: + try: + parent = nuvla.get(cred['parent']) + if "s3" in str(parent.data['subtype']) and "Exoscale" in str(parent.data['name']): + s3_cred_id = str(cred['id']) + print(f"Found Exoscale S3 credential: {s3_cred_id}") + except (NuvlaError, KeyError): + pass +# if we didn't find the credential, we should exit +if not s3_cred_id: + print("No Exoscale S3 credential found. Exiting.") + exit(1) # # Set correct values for variable below. # - -# REQUIRED: ID of S3 credential in Nuvla. -s3_cred_id = 'credential/362e6088-394d-4de9-9667-8e1a40136aa6' - # Bucket to store the test objects. -bucket = 'cloud.animals' - +# bucket will be created if it doesn't exist? +# +bucket = 'bucket-for-extract-project-demo-purposes-002' +# can we get the bucket name from the credential? # # data-object API object. # data_obj_api = DataObjectS3(nuvla) +def generate_event(local_event_record: dict) -> str: + ''' + Generate an event in Nuvla + :param local_event_record: dict - event record + :return: str - event id + ''' + try: + cimi_event = nuvla.add('event', local_event_record) + except NuvlaError as e: + print(f'Failed to create event: {e}') + return None + event_id = cimi_event.data['resource-id'] + print('created event:', event_id, "with data:") + pp(cimi_event.data) + return event_id + +# should generate an event here as a data record was create +# maybe this should be combined? + +def create_data_record(local_data_record: dict) -> str: + ''' + Create a data record in Nuvla + :param local_data_record: dict - data record + :return: str - data record id + if the data record already exists, it will be deleted and recreated + possibly should be a recursive function + ''' + try: + dr_id_created = dr_api.add(local_data_record) + except NuvlaError as e: + print(f'Failed to create data record: {e}') + for arg in e.args: + if 'data-record' in arg: + dr_id_created = arg.split(' ')[-1] + print(f'Data record already exists, data record id: {dr_id_created}. Deleting and recreating.') + deleted_dr_id = dr_api.delete(dr_id_created) + if deleted_dr_id: + print('Deleted data record id:', deleted_dr_id) + dr_id_created = create_data_record(local_data_record) + break + + return dr_id_created + + return dr_id_created + +def create_data_object(local_content, local_bucket: str, local_object_path: str, local_s3_cred_id: str, local_content_type: str, local_tags) -> str: + try: + object_id = data_obj_api.create(local_content, local_bucket, + local_object_path, local_s3_cred_id, + content_type=local_content_type, + tags=local_tags) + except NuvlaError as e: + print(f'Failed to create data object: {e}') + for arg in e.args: + if 'data-object' in arg: + object_id = arg.split(' ')[-1] + print(f'\n\nObject already exists, object id: {object_id}. Deleting and recreating.') + deleted_object_id = data_obj_api.delete(object_id) + if deleted_object_id: + print(f'Deleted object id: {deleted_object_id}') + object_id = create_data_object(local_content, local_bucket, local_object_path, local_s3_cred_id, local_content_type, local_tags) + print(f'Created object id: {object_id}\n\n') + break + return object_id + return object_id # # Create binary object on S3 and register it in Nuvla. # +print("\nCreating binary object\n") +# here is the file that actually gets uploaded to S3 content = open('data/african-lion.jpg', 'rb').read() +# here is the path where the file will be stored in S3 +object_path = 'africa/african-lion3.jpg' +# here is the content type of the file +content_type = 'image/scary-lions' -object_path = 'africa/african-lion.jpg' -content_type = 'image/jpg' - -# Add object. -# Bucket will be created if it doesn't exit. -bin_object_id = data_obj_api.create(content, bucket, object_path, s3_cred_id, - content_type=content_type, - tags=["zoo", "africa", "lion"]) -print('.png object id:', bin_object_id) +# Add binary object. +# Bucket will be created if it doesn't exist. +# tags can be added to the object +tags = ["zoo", "africa", "lion"] +bin_object_id = create_data_object(content, bucket, \ + object_path, s3_cred_id, \ + content_type, tags) # Get object document. +# print out the object document obj_doc = data_obj_api.get(bin_object_id) pp(obj_doc) +# create a data record for the object +dr_api = DataRecord(nuvla) +# find the S3 infrastructure service +# either it's known and hardcoded or we can search for it using the credential +# s3_infra_service_id = 'infrastructure-service/09b08b49-2408-4b80-a7cc-73f420903fd5' +cred_doc = nuvla.get(s3_cred_id) +s3_infra_service_id=cred_doc.data['parent'] +if not s3_infra_service_id: + print('No infrastructure service found for the credential. Exiting.') + exit(1) + +data_record = { + "infrastructure-service": s3_infra_service_id, + + "description": "Lions in Africa", + "name": "African Lion", + "object": object_path, + "bucket": bucket, + "content-type": "animals/lion", + "bytes": len(content), + "platform": "S3", + "tags": ["zoo", "africa", "lion", "whatevs"], + "another-field": "another-value", + "yet-another-field": "yet-another-value", + "and-another-field": "and-another-value", +} + +# dr_id = dr_api.add(data_record) +print('\n\ncreating data record\n\n') +dr_id = create_data_record(data_record) +print('created data record:', dr_id) +pp(dr_api.get(dr_id)) + +event_record = { + "category": "user", + "content": { + "resource": { + "href": dr_id, # this must be the data record id + "content": { + "content-type": content_type, + }, + }, + "state": "created", + }, + "severity": "medium", + "tags": data_record['tags'], +} + +event = generate_event(event_record) +event_structure = nuvla.get(event).data +pp(event_structure) + # Download object and store it locally to a file. -local_fn = './data/local-african-lion.jpg' -data_obj_api.get_to_file(bin_object_id, local_fn) +local_filename = './data/local-african-lion.jpg' +data_obj_api.get_to_file(bin_object_id, local_filename) # Verify checksum. assert hashlib.md5(content).hexdigest() \ - == hashlib.md5(open(local_fn, 'rb').read()).hexdigest() -os.unlink(local_fn) + == hashlib.md5(open(local_filename, 'rb').read()).hexdigest() +os.unlink(local_filename) +# get a list of data records +records = dr_api.id_by_name('African Lion', filter=f"tags='lion'") +# pp(records) +for record in records: + # pp(dr_api.get(record)) + try: + dr_api.get(record)['content']['resource']['href'] + print(f'New style data {record} record, probably should NOT delete it.') + except KeyError: + # print(f'Old style data {record} record, could delete it.') + print(f'Deleted old style data record:', dr_api.delete(record)) +# exit(1) -# -# Create text object on S3 and register it in Nuvla. +# Create a text object on S3 and register it in Nuvla. # content = open('./data/african-lion.txt', 'r').read() -object_path = 'africa/african-lion.txt' +object_path = 'africa/african-lion-zebra.txt' content_type = 'plain/text' -# Add object. -# Bucket will be created if doesn't exit. -str_object_id = data_obj_api.create(content, bucket, object_path, s3_cred_id, - content_type=content_type, - tags=["zoo", "africa", "lion"]) -print('.txt object id:', str_object_id) +# Add text object. +# Bucket will be created if doesn't exist. + +print("\nCreating text object\n") +str_object_id = create_data_object(content, bucket, \ + object_path, s3_cred_id, \ + content_type, tags) # Get object document. obj_doc = data_obj_api.get(str_object_id) pp(obj_doc) # Download object and store it locally to a file. -local_fn = './data/local-african-lion.txt' -data_obj_api.get_to_file(str_object_id, local_fn) +local_filename = './data/local-african-lion.txt' +data_obj_api.get_to_file(str_object_id, local_filename) # Verify checksum. assert hashlib.md5(content.encode()).hexdigest() \ - == hashlib.md5(open(local_fn, 'r').read().encode()).hexdigest() -os.unlink(local_fn) + == hashlib.md5(open(local_filename, 'r').read().encode()).hexdigest() +os.unlink(local_filename) +# here we could do the same data_record and event creation as above +# but we won't for now # # Delete object from Nuvla and S3. Bucket gets deleted as well if it's empty. +# these are commented out to avoid deleting the objects. # -assert bin_object_id == data_obj_api.delete(bin_object_id) -assert str_object_id == data_obj_api.delete(str_object_id) +# assert bin_object_id == data_obj_api.delete(bin_object_id) +# assert str_object_id == data_obj_api.delete(str_object_id) # Logs out the user. user_api.logout() diff --git a/examples/create-data-record.py b/examples/create-data-record.py index 786fa81..38994d1 100755 --- a/examples/create-data-record.py +++ b/examples/create-data-record.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +import random +from pprint import pprint as pp from nuvla.api import Api as Nuvla from nuvla.api.resources.data import DataRecord from nuvla.api.resources.user import User @@ -20,7 +22,7 @@ # Login to Nuvla. # user_api = User(nuvla) -user_api.login_password(username, password) +user_api.login(username, password) # # Data record API object. @@ -41,28 +43,38 @@ # # Add data record that describes an object on S3. # -s3_infra_service_id = 'infrastructure-service/771fa0f1-a38d-400b-bad4-3f7600f069af' +# s3_infra_service_id = 'infrastructure-service/09b08b49-2408-4b80-a7cc-73f420903fd5' +cred_doc = nuvla.get(s3_cred_id) +print('credential doc:', cred_doc.data['parent']) +s3_infra_service_id=cred_doc.data['parent'] +if not s3_infra_service_id: + print('No infrastructure service found for the credential. Exiting.') + exit(1) + +bin_object_id = 'data-object/1-2-3-4-5' +content_type = 'animals/african-lion' + data_record = { "infrastructure-service": s3_infra_service_id, "description": "Lions in Africa", "name": "African Lion", - "object": "african-lion.jpg", "bucket": "cloud.animals", - "content-type": "animals/lion", - - "bytes": 12499950, + "content-type": "animals/lion", # here is where the content type is defined and the "application/taska" can be added. + "bytes": random.randint(1000, 100000), "platform": "S3", - - "tags": ["zoo", "africa", "lion"] + "another-field": "another-value", # this field will be ignored + "tags": ["zoo", "africa", "lion", "whatevs"], } dr_id = dr_api.add(data_record) print('created data record:', dr_id) +pp(dr_api.get(dr_id)) # # Delete data record. +# OR this can be commented out to keep the data record. # assert dr_id == dr_api.delete(dr_id) print('deleted data record:', dr_id) diff --git a/examples/create-infra-service-cred.py b/examples/create-infra-service-cred.py index 673f569..f12ba95 100755 --- a/examples/create-infra-service-cred.py +++ b/examples/create-infra-service-cred.py @@ -1,5 +1,8 @@ #!/usr/bin/env python3 +import yaml +from pprint import pprint + from nuvla.api import Api as Nuvla from nuvla.api.resources.credential import (CredentialK8s, CredentialS3, @@ -13,7 +16,12 @@ nuvla = Nuvla() # nuvla = Nuvla(endpoint='https://nuvla.io', insecure=True) -nuvla.login_password(username, password) + +userpass = {"href" : "session-template/password", + "username" : username, + "password" : password + } +nuvla.login(userpass) # Fake infra service ID to which all credentials will be attached. infra_service_id = "infrastructure-service/1-2-3-4-5" @@ -34,8 +42,18 @@ # #k8s_cred_id = cred_k8s.create(ca, cert, key, infra_service_id, "My K8s creds") +def read_yaml_file(filepath): + with open(filepath, 'r') as file: + try: + return yaml.safe_load(file) + except yaml.YAMLError as exc: + print(exc) + +config = read_yaml_file('data/example-k8s-config.yaml') +print(config) + k8s_cred_id = cred_k8s \ - .create_from_config('data/example-k8s-config.yaml', infra_service_id, + .create_from_config(config, infra_service_id, context='first-k8s-cluster-user', name='My K8s creds') print('Kubernetes creds: ', k8s_cred_id) diff --git a/examples/data_catalogue/__init__.py b/examples/data_catalogue/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/data_catalogue/consumer.py b/examples/data_catalogue/consumer.py new file mode 100644 index 0000000..e577bb1 --- /dev/null +++ b/examples/data_catalogue/consumer.py @@ -0,0 +1,71 @@ +import json +import queue +from pprint import pprint + +import paho.mqtt.client as mqtt + +from nuvla.api import Api as Nuvla +from nuvla.api.models import CimiResource + + +class DataConsumer: + def __init__(self, nuvla: Nuvla, topic: str, host: str, port: int): + self.nuvla: Nuvla = nuvla + self.topic: str = topic + self.host = host + self.port = port + + self.mqtt_client: mqtt.Client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) + self.mqtt_client.on_connect = self.on_connect + self.mqtt_client.on_message = self.on_message + + self.link_queue: queue.Queue = queue.Queue() + + def on_connect(self, cli, userdata, flags, rc, properties=None): + + if rc == 0: + print("Connected to broker") + self.mqtt_client.subscribe(self.topic) + else: + print("Connection failed") + + def _get_dr_from_message(self, message) -> CimiResource | None: + t = json.loads(message.payload.decode()) + m_str = json.dumps(t, indent=4) + print(f"Received message: \n{m_str}\n on topic {message.topic}") + + dr_id: str = t.get("resource_uri", None) + if not dr_id or "api/data-record" not in dr_id: + print("No resource URI in message") + return None + + dr_id = dr_id.replace("api/", "") + pprint("Data record ID: " + dr_id) + return self.nuvla.get(dr_id) + + def on_message(self, cli, userdata, message): + try: + dr_obj = self._get_dr_from_message(message) + if not dr_obj.data: + return + + data_object = dr_obj.data.get("data-object", None) + + if not data_object: + print("No data object in data record") + return + + dr_obj = self.nuvla.get(data_object) + link = self.nuvla.operation(dr_obj, "download") + + self.link_queue.put(link) + + except Exception as e: + print(f"Error processing message: {e}") + + def listen(self): + """ + Listen to the data catalogue for new events + """ + self.mqtt_client.connect(self.host, self.port) + self.mqtt_client.loop_start() diff --git a/examples/data_catalogue/producer.py b/examples/data_catalogue/producer.py new file mode 100644 index 0000000..5ef1bb5 --- /dev/null +++ b/examples/data_catalogue/producer.py @@ -0,0 +1,97 @@ +""" +Producer takes an object and puts it into the Nuvla data catalogue. It will also trigger the necessary events to notify +""" +from datetime import datetime, timezone +from pprint import pprint +from typing import Union, Optional + +from nuvla.api.resources.data import DataObjectS3, DataRecord +from nuvla.api import Api as Nuvla + + +class DataProducer: + def __init__(self, nuvla: Nuvla, bucket: str, s3_credential: str, infra_service_id: str): + # Nuvla client should be logged in at this stage + self.nuvla: Nuvla = nuvla + self.data_client: DataObjectS3 = DataObjectS3(nuvla) + self.record_client: DataRecord = DataRecord(nuvla) + + self.bucket: str = bucket + self.s3_credential: str = s3_credential + self.infra_service_id: str = infra_service_id + + def produce(self, content: Union[str, bytes, None], object_path, + content_type='text/plain', name=None, description=None, + tags: Optional[list] = None, md5sum: Optional[str] = None): + """ + Produce a new object in the data catalogue + """ + # Create new Data object + object_id: str = self.data_client.create(content, self.bucket, object_path, self.s3_credential, + content_type, name, description, tags, md5sum) + pprint(f"Created data object: {object_id}") + + ts: str = datetime.utcnow().replace(tzinfo=timezone.utc) \ + .replace(microsecond=0).isoformat().replace('+00:00', 'Z') + + # Create new Data record + record_id: str = self.create_data_record(object_id, name, description, content_type, ts, tags) + pprint(f"Created data record: {record_id}") + + event_id: str = self.create_event(record_id, content_type, tags) + pprint(f"Created event: {event_id}") + + def create_event(self, record_id: str, content_type: str, tags: Optional[list] = None): + """ + Create a new event in the data catalogue + """ + ts = datetime.utcnow().replace(tzinfo=timezone.utc) \ + .replace(microsecond=0).isoformat().replace('+00:00', 'Z') + event_data = { + "category": "user", + "name": "Data-Record-Event", + "description": "Event for {}".format(record_id), + "content": { + "resource": { + "href": record_id, # this must be the data record id + "content": { + "content-type": content_type, + }, + }, + "state": "created", + }, + "severity": "medium", + "timestamp": ts + } + if tags: + event_data.update({'tags': tags}) + event = self.nuvla.add('event', event_data) + return event.data['resource-id'] + + def create_data_record(self, object_id: str, name: str, description: str, content_type: str, ts: str, + tags: Optional[list] = None): + """ + Create a new data record in the data catalogue + """ + + record_data = { + "name": name, + "data-object": object_id, + "content-type": content_type, + "timestamp": ts, + "resource:object": object_id, + "infrastructure-service": self.infra_service_id + } + if description: + record_data.update({'description': description}) + if tags: + record_data.update({'tags': tags}) + record_id = self.record_client.create(record_data, self.infra_service_id) + return record_id + + +def new_s3_object(s3_cred_id: str): + """ + New S3 object receives the information of a new object in the S3 bucket. + The object has to be updated before calling this function by the user. + """ diff --git a/examples/delete-data-record.py b/examples/delete-data-record.py index 3b8b390..8353852 100755 --- a/examples/delete-data-record.py +++ b/examples/delete-data-record.py @@ -74,11 +74,11 @@ "object": "african-lion.jpg", "bucket": "cloud.animals", - "content-type": "animals/lion", - + "content": { + "content-type": "animals/lion", # here is where the content type is defined and the "application/taska" can be added. + }, "bytes": random.randint(1000, 100000), "platform": "S3", - "another-field": "another-value", "yet-another-field": "yet-another-value", diff --git a/nuvla/api/resources/data.py b/nuvla/api/resources/data.py index 7366e29..588957d 100644 --- a/nuvla/api/resources/data.py +++ b/nuvla/api/resources/data.py @@ -21,9 +21,9 @@ class DataObjectS3(ResourceBase): def __init__(self, nuvla: Nuvla): super().__init__(nuvla) - def create(self, content: Union[str, bytes], bucket, object_path, s3_cred_id, + def create(self, content: Union[str, bytes, None], bucket, object_path, s3_cred_id, content_type='text/plain', name=None, description=None, - tags: Optional[list]=None, md5sum: Optional[str]=None) -> str: + tags: Optional[list] = None, md5sum: Optional[str] = None) -> str: """Stores `content` in S3 defined by `s3_cred_id` and registers the object as data-object in Nuvla. Returns data-object resource ID. `content` and `content_type` should match (e.g. str and plain/text, @@ -38,8 +38,8 @@ def create(self, content: Union[str, bytes], bucket, object_path, s3_cred_id, "resource-type": "data-object-template", "content-type": content_type, "object": object_path, + 'bytes': len(content) if content else 0, "bucket": bucket, - "bytes": len(content), "href": "data-object-template/generic" } } @@ -53,10 +53,12 @@ def create(self, content: Union[str, bytes], bucket, object_path, s3_cred_id, # Upload data. data_object = self.nuvla.get(data_object_id) response = self.nuvla.operation(data_object, "upload") - upload_url = response.data['uri'] - headers = {"content-type": content_type} - response = requests.put(upload_url, data=content, headers=headers) - response.raise_for_status() + + if content is not None: + upload_url = response.data['uri'] + headers = {"content-type": content_type} + response = requests.put(upload_url, data=content, headers=headers) + response.raise_for_status() # Set object is ready. data_object = self.nuvla.get(data_object_id)