|
| 1 | +import logging |
| 2 | +import httpx |
| 3 | +import uuid |
| 4 | +import json |
| 5 | +from datetime import datetime |
| 6 | +from .rpcclient import RpcClient |
| 7 | +from .constants import NEWLINE, GET, POST, MAINNET, LOCALHOST, MOJONODE, MOJONODE_RPC_ENDPOINTS |
| 8 | + |
| 9 | + |
| 10 | +logging.getLogger(__name__).addHandler(logging.NullHandler()) |
| 11 | + |
| 12 | + |
| 13 | +class MojoClient(RpcClient): |
| 14 | + |
| 15 | + def __init__(self, base_url=MOJONODE, network=MAINNET, mojo_timeout=10, rpc_timeout=5): # 5 second timeout is the httpx default |
| 16 | + """Initialize a MojoClient instance. |
| 17 | +
|
| 18 | + Keywork arguments: |
| 19 | + base_url -- the URL (exluding endpoint) for RPCs |
| 20 | + network -- the network to query |
| 21 | + mojo_timeout -- timeout in seconds for requests to Mojonode |
| 22 | + rpc_timeout -- timeout in seconds for RPCs |
| 23 | + """ |
| 24 | + |
| 25 | + if base_url == MOJONODE: rpc_timeout = mojo_timeout # Override RPC timeout if Mojonode used for RPC calls |
| 26 | + RpcClient.__init__(self, base_url=base_url, network=MAINNET, timeout=rpc_timeout) |
| 27 | + |
| 28 | + self.mojo_headers = {"accept": "application/json", "Content-Type": "application/json"} |
| 29 | + self.mojo_timeout = mojo_timeout |
| 30 | + |
| 31 | + self._streams = {} |
| 32 | + |
| 33 | + self.mojonode_nonrpc_endpoints = [ |
| 34 | + "/get_tx_by_name", |
| 35 | + "/get_uncurried_coin_spend", |
| 36 | + "/get_transactions_for_coin", |
| 37 | + "/get_query_schema", |
| 38 | + "/query", |
| 39 | + "/events", |
| 40 | + "/get_latest_singleton_spend" |
| 41 | + ] |
| 42 | + |
| 43 | + if self.base_url == MOJONODE: |
| 44 | + self.mojoclient = self.client |
| 45 | + else: |
| 46 | + self.mojoclient = httpx.AsyncClient(base_url=MOJONODE, http2=True, timeout=self.mojo_timeout) |
| 47 | + |
| 48 | + |
| 49 | + async def _mojo_request(self, method, endpoint, params, no_network=False): |
| 50 | + """Send a REST request to Mojonode. |
| 51 | +
|
| 52 | + Keyword arguments: |
| 53 | + method -- a REST method (GET, POST, etc) |
| 54 | + endpoint -- URI endpoint to send request to |
| 55 | + params -- dict of request parameters |
| 56 | + no_network -- boolean indicating whether to add a network field to params |
| 57 | + """ |
| 58 | + |
| 59 | + url = MOJONODE + endpoint |
| 60 | + data = json.dumps(self._add_network_param(params, no_network)) |
| 61 | + |
| 62 | + if method == POST: |
| 63 | + logging.info(f"Sending POST request{NEWLINE} URL: {url}{NEWLINE} data: {data}") |
| 64 | + response = await self.mojoclient.post(url, content=data, headers=self.mojo_headers) |
| 65 | + else: |
| 66 | + raise ValueError(f"Unsupported REST method {method}") |
| 67 | + |
| 68 | + return response |
| 69 | + |
| 70 | + |
| 71 | + async def _mojo_request_no_network(self, method, endpoint, params): |
| 72 | + """Send a REST request to Mojonode without specifying a network |
| 73 | +
|
| 74 | + Keyword arguments: |
| 75 | + method -- a REST method (GET, POST, etc) |
| 76 | + endpoint -- URI endpoint to send request to |
| 77 | + params -- dict of request parameters |
| 78 | + """ |
| 79 | + |
| 80 | + return await self._mojo_request(method, endpoint, params, no_network=True) |
| 81 | + |
| 82 | + |
| 83 | + async def get_tx_by_name(self, tx_id): |
| 84 | + """Transaction by transaction ID. |
| 85 | +
|
| 86 | + Arguments: |
| 87 | + tx_id -- a spend bundle name |
| 88 | +
|
| 89 | + Returns the transaction (spend bundle) corresponding to the transaction ID (spend bundle name). |
| 90 | + Since spend bundles are mempool objects, Mojonode may occasionally fail to record a spend, resulting in missing data. |
| 91 | + """ |
| 92 | + |
| 93 | + params = {"name": tx_id} |
| 94 | + return await self._mojo_request(POST, "get_tx_by_name", params) |
| 95 | + |
| 96 | + |
| 97 | + async def get_uncurried_coin_spend(self, coin_id): |
| 98 | + """Uncurried coin spend for given coin ID.""" |
| 99 | + |
| 100 | + params = {"name": coin_id} |
| 101 | + return await self._mojo_request(POST, "get_uncurried_coin_spend", params) |
| 102 | + |
| 103 | + |
| 104 | + async def get_transactions_for_coin(self, coin_id): |
| 105 | + """Transactions in which the specified coin was created and spent. |
| 106 | +
|
| 107 | + Arguments: |
| 108 | + coin_id -- coin name (coin ID) as a byte-32 hex encoded string |
| 109 | +
|
| 110 | + Returns the transaction IDs (spend bundle names) as 32-byte hex encoded stings of the spend bundles that created ('added_by') and spent ('removed_by') the coin. |
| 111 | + Since spend bundles are mempool objects, Mojonode may occasionally fail to record a spend, resulting in missing 'added_by' or 'removed_by' data. |
| 112 | + """ |
| 113 | + |
| 114 | + params = {"name": coin_id} |
| 115 | + return await self._mojo_request(POST, "get_transactions_for_coin", params) |
| 116 | + |
| 117 | + |
| 118 | + async def get_query_schema(self): |
| 119 | + """Mojonode SQL database schema.""" |
| 120 | + |
| 121 | + return await self._mojo_request_no_network(POST, "get_query_schema", {}) |
| 122 | + |
| 123 | + |
| 124 | + async def query(self, query): |
| 125 | + """Queries Mojonode SQL database for Chia blockchain data. |
| 126 | +
|
| 127 | + Arguments: |
| 128 | + query -- a valid SQL query as a string |
| 129 | + """ |
| 130 | + |
| 131 | + params = {"query": query} |
| 132 | + return await self._mojo_request_no_network(POST, "query", params) |
| 133 | + |
| 134 | + |
| 135 | + async def get_latest_singleton_spend(self, address): |
| 136 | + """Latest singleton spend for given address""" |
| 137 | + |
| 138 | + params = {"address": address} |
| 139 | + return await self._mojo_request(POST, "get_latest_singleton_spend", params) |
| 140 | + |
| 141 | + |
| 142 | + async def get_routes(self): |
| 143 | + """Available endpoints""" |
| 144 | + |
| 145 | + if self.base_url == LOCALHOST: |
| 146 | + response = await self._request(POST, "get_routes", {}) |
| 147 | + endpoints = response.json()["routes"] + self.mojonode_nonrpc_endpoints |
| 148 | + elif self.base_url == MOJONODE: |
| 149 | + endpoints = MOJONODE_RPC_ENDPOINTS + self.mojonode_nonrpc_endpoints |
| 150 | + |
| 151 | + # Return available endpoints as HTTP response |
| 152 | + response_data = { |
| 153 | + "routes": endpoints, |
| 154 | + "success": True |
| 155 | + } |
| 156 | + headers = {"Content-Type": "application/json"} |
| 157 | + return httpx.Response(200, content=json.dumps(response_data).encode("utf-8"), headers=headers) |
| 158 | + |
| 159 | + |
| 160 | + async def close_stream(self, stream_id): |
| 161 | + """Closes an event stream.""" |
| 162 | + |
| 163 | + if stream_id in self._streams.keys(): |
| 164 | + self._streams.pop(stream_id) |
| 165 | + else: |
| 166 | + raise ValueError(f"No stream with ID {stream_id} to close") |
| 167 | + |
| 168 | + |
| 169 | + async def events(self, for_object=None, from_ts="$", filters=""): |
| 170 | + """Stream events. |
| 171 | +
|
| 172 | + Mojonode disconnects event streams every 5 mins, so that the client needs to reconnect. |
| 173 | + |
| 174 | + Keyword arguments: |
| 175 | + for_object -- only stream events for specified object (coin, block, transaction). Streams all events if set to None |
| 176 | + from_ts -- only stream events from the given timestamp onwards. Note that timestamps are unique |
| 177 | + filters -- only stream events that pass the filter. See Mojonode documentation for details |
| 178 | + """ |
| 179 | + |
| 180 | + if for_object is not None: |
| 181 | + if not for_object in MOJONODE_EVENT_OBJECTS: raise ValueError(f"Unkown object specified ({object})") |
| 182 | + |
| 183 | + params = f"&from_ts={from_ts}" + f"&filters={filters}" |
| 184 | + if for_object is not None: endpoint = f"for_object={for_object}&" + params |
| 185 | + |
| 186 | + stream_id = str(uuid.uuid4()) |
| 187 | + self._streams[stream_id] = True |
| 188 | + yield stream_id |
| 189 | + |
| 190 | + while stream_id in self._streams.keys(): |
| 191 | + try: |
| 192 | + |
| 193 | + # Context manager for Mojonode event stream |
| 194 | + async with self.mojoclient.stream(GET, MOJONODE + "events?" + params, timeout=None) as response: |
| 195 | + |
| 196 | + logging.debug(f"Connected to stream ID {stream_id}") |
| 197 | + |
| 198 | + try: |
| 199 | + async for data in response.aiter_lines(): |
| 200 | + |
| 201 | + if stream_id in self._streams.keys(): |
| 202 | + |
| 203 | + if data.startswith('data: '): |
| 204 | + event = json.loads(data[6:]) |
| 205 | + yield event |
| 206 | + from_ts = event["ts"] |
| 207 | + else: |
| 208 | + # If stream no longer active, close it |
| 209 | + await response.aclose() |
| 210 | + logging.debug(f"Closed stream ID {stream_id}") |
| 211 | + break |
| 212 | + except Exception as e: |
| 213 | + logging.warning(f"Failed to read data from stream ID {stream_id}") |
| 214 | + |
| 215 | + except Exception as e: |
| 216 | + logging.warning(f"Failed to connect to stream ID {stream_id}") |
| 217 | + |
0 commit comments