1
1
import calendar
2
2
import json
3
3
import time
4
- from typing import Dict , List , Union
4
+ from enum import Enum
5
+ from typing import Any , Dict , List , Optional , Union , Tuple
5
6
6
7
import cbor2
7
8
import requests
32
33
__all__ = ["OgmiosChainContext" ]
33
34
34
35
36
+ JSON = Dict [str , Any ]
37
+
38
+
39
+ class OgmiosQueryType (str , Enum ):
40
+ Query = "Query"
41
+ SubmitTx = "SubmitTx"
42
+ EvaluateTx = "EvaluateTx"
43
+
44
+
35
45
class OgmiosChainContext (ChainContext ):
46
+ _ws_url : str
47
+ _network : Network
48
+ _service_name : str
49
+ _kupo_url : Optional [str ]
50
+ _last_known_block_slot : int
51
+ _genesis_param : Optional [GenesisParameters ]
52
+ _protocol_param : Optional [ProtocolParameters ]
53
+
36
54
def __init__ (
37
55
self ,
38
56
ws_url : str ,
@@ -48,15 +66,15 @@ def __init__(
48
66
self ._genesis_param = None
49
67
self ._protocol_param = None
50
68
51
- def _request (self , method : str , args : dict ) -> Union [ dict , int ] :
69
+ def _request (self , method : OgmiosQueryType , args : JSON ) -> Any :
52
70
ws = websocket .WebSocket ()
53
71
ws .connect (self ._ws_url )
54
72
request = json .dumps (
55
73
{
56
74
"type" : "jsonwsp/request" ,
57
75
"version" : "1.0" ,
58
76
"servicename" : self ._service_name ,
59
- "methodname" : method ,
77
+ "methodname" : method . value ,
60
78
"args" : args ,
61
79
},
62
80
separators = ("," , ":" ),
@@ -86,10 +104,9 @@ def _fraction_parser(fraction: str) -> float:
86
104
@property
87
105
def protocol_param (self ) -> ProtocolParameters :
88
106
"""Get current protocol parameters"""
89
- method = "Query"
90
107
args = {"query" : "currentProtocolParameters" }
91
108
if not self ._protocol_param or self ._check_chain_tip_and_update ():
92
- result = self ._request (method , args )
109
+ result = self ._request (OgmiosQueryType . Query , args )
93
110
param = ProtocolParameters (
94
111
min_fee_constant = result ["minFeeConstant" ],
95
112
min_fee_coefficient = result ["minFeeCoefficient" ],
@@ -130,7 +147,7 @@ def protocol_param(self) -> ProtocolParameters:
130
147
param .cost_models ["PlutusV2" ] = param .cost_models .pop ("plutus:v2" )
131
148
132
149
args = {"query" : "genesisConfig" }
133
- result = self ._request (method , args )
150
+ result = self ._request (OgmiosQueryType . Query , args )
134
151
param .min_utxo = result ["protocolParameters" ]["minUtxoValue" ]
135
152
136
153
self ._protocol_param = param
@@ -139,10 +156,9 @@ def protocol_param(self) -> ProtocolParameters:
139
156
@property
140
157
def genesis_param (self ) -> GenesisParameters :
141
158
"""Get chain genesis parameters"""
142
- method = "Query"
143
159
args = {"query" : "genesisConfig" }
144
160
if not self ._genesis_param or self ._check_chain_tip_and_update ():
145
- result = self ._request (method , args )
161
+ result = self ._request (OgmiosQueryType . Query , args )
146
162
system_start_unix = int (
147
163
calendar .timegm (
148
164
time .strptime (
@@ -174,23 +190,21 @@ def network(self) -> Network:
174
190
@property
175
191
def epoch (self ) -> int :
176
192
"""Current epoch number"""
177
- method = "Query"
178
193
args = {"query" : "currentEpoch" }
179
- return self ._request (method , args )
194
+ return self ._request (OgmiosQueryType . Query , args )
180
195
181
196
@property
182
197
def last_block_slot (self ) -> int :
183
198
"""Slot number of last block"""
184
- method = "Query"
185
199
args = {"query" : "chainTip" }
186
- return self ._request (method , args )["slot" ]
200
+ return self ._request (OgmiosQueryType . Query , args )["slot" ]
187
201
188
- def _extract_asset_info (self , asset_hash : str ):
202
+ def _extract_asset_info (self , asset_hash : str ) -> Tuple [ str , ScriptHash , AssetName ] :
189
203
policy_hex , asset_name_hex = asset_hash .split ("." )
190
204
policy = ScriptHash .from_primitive (policy_hex )
191
- asset_name_hex = AssetName .from_primitive (asset_name_hex )
205
+ asset_name = AssetName .from_primitive (asset_name_hex )
192
206
193
- return policy_hex , policy , asset_name_hex
207
+ return policy_hex , policy , asset_name
194
208
195
209
def _check_utxo_unspent (self , tx_id : str , index : int ) -> bool :
196
210
"""Check whether an UTxO is unspent with Ogmios.
@@ -200,9 +214,8 @@ def _check_utxo_unspent(self, tx_id: str, index: int) -> bool:
200
214
index (int): transaction index.
201
215
"""
202
216
203
- method = "Query"
204
217
args = {"query" : {"utxo" : [{"txId" : tx_id , "index" : index }]}}
205
- results = self ._request (method , args )
218
+ results = self ._request (OgmiosQueryType . Query , args )
206
219
207
220
if results :
208
221
return True
@@ -220,6 +233,9 @@ def _utxos_kupo(self, address: str) -> List[UTxO]:
220
233
Returns:
221
234
List[UTxO]: A list of UTxOs.
222
235
"""
236
+ if self ._kupo_url is None :
237
+ raise AssertionError ("kupo_url object attribute has not been assigned properly." )
238
+
223
239
address_url = self ._kupo_url + "/" + address
224
240
results = requests .get (address_url ).json ()
225
241
@@ -282,9 +298,8 @@ def _utxos_ogmios(self, address: str) -> List[UTxO]:
282
298
List[UTxO]: A list of UTxOs.
283
299
"""
284
300
285
- method = "Query"
286
301
args = {"query" : {"utxo" : [address ]}}
287
- results = self ._request (method , args )
302
+ results = self ._request (OgmiosQueryType . Query , args )
288
303
289
304
utxos = []
290
305
@@ -374,9 +389,8 @@ def submit_tx(self, cbor: Union[bytes, str]):
374
389
if isinstance (cbor , bytes ):
375
390
cbor = cbor .hex ()
376
391
377
- method = "SubmitTx"
378
392
args = {"submit" : cbor }
379
- result = self ._request (method , args )
393
+ result = self ._request (OgmiosQueryType . SubmitTx , args )
380
394
if "SubmitFail" in result :
381
395
raise TransactionFailedException (result ["SubmitFail" ])
382
396
@@ -395,9 +409,8 @@ def evaluate_tx(self, cbor: Union[bytes, str]) -> Dict[str, ExecutionUnits]:
395
409
if isinstance (cbor , bytes ):
396
410
cbor = cbor .hex ()
397
411
398
- method = "EvaluateTx"
399
412
args = {"evaluate" : cbor }
400
- result = self ._request (method , args )
413
+ result = self ._request (OgmiosQueryType . EvaluateTx , args )
401
414
if "EvaluationResult" not in result :
402
415
raise TransactionFailedException (result )
403
416
else :
0 commit comments