2424 AggregateMessage ,
2525 StoreMessage ,
2626 ProgramMessage ,
27- MessagesResponse ,
2827)
28+ from pydantic import ValidationError
2929
3030from aleph_client .types import Account , StorageEnum , GenericMessage
3131from .exceptions import MessageNotFoundError , MultipleMessagesError
32+ from .models import MessagesResponse
3233from .utils import get_message_type_value
3334
3435logger = logging .getLogger (__name__ )
@@ -308,6 +309,7 @@ async def create_program(
308309 memory : int = settings .DEFAULT_VM_MEMORY ,
309310 vcpus : int = settings .DEFAULT_VM_VCPUS ,
310311 timeout_seconds : float = settings .DEFAULT_VM_TIMEOUT ,
312+ persistent : bool = False ,
311313 encoding : Encoding = Encoding .zip ,
312314 volumes : List [Dict ] = None ,
313315 subscriptions : Optional [List [Dict ]] = None ,
@@ -321,10 +323,10 @@ async def create_program(
321323 ## Register the different ways to trigger a VM
322324 if subscriptions :
323325 # Trigger on HTTP calls and on Aleph message subscriptions.
324- triggers = {"http" : True , "message" : subscriptions }
326+ triggers = {"http" : True , "persistent" : persistent , " message" : subscriptions }
325327 else :
326328 # Trigger on HTTP calls.
327- triggers = {"http" : True }
329+ triggers = {"http" : True , "persistent" : persistent }
328330
329331 content = ProgramContent (
330332 ** {
@@ -373,6 +375,9 @@ async def create_program(
373375 }
374376 )
375377
378+ # Ensure that the version of aleph-message used supports the field.
379+ assert content .on .persistent == persistent
380+
376381 return await submit (
377382 account = account ,
378383 content = content .dict (exclude_none = True ),
@@ -572,6 +577,8 @@ async def get_messages(
572577 end_date : Optional [Union [datetime , float ]] = None ,
573578 session : Optional [ClientSession ] = None ,
574579 api_server : str = settings .API_HOST ,
580+ ignore_invalid_messages : bool = True ,
581+ invalid_messages_log_level : int = logging .NOTSET ,
575582) -> MessagesResponse :
576583 session = session or get_fallback_session ()
577584
@@ -607,8 +614,36 @@ async def get_messages(
607614
608615 async with session .get (f"{ api_server } /api/v0/messages.json" , params = params ) as resp :
609616 resp .raise_for_status ()
610- messages_json = await resp .json ()
611- return MessagesResponse (** messages_json )
617+ response_json = await resp .json ()
618+ messages_raw = response_json ["messages" ]
619+
620+ # All messages may not be valid according to the latest specification in
621+ # aleph-message. This allows the user to specify how errors should be handled.
622+ messages : List [AlephMessage ] = []
623+ for message_raw in messages_raw :
624+ try :
625+ message = Message (** message_raw )
626+ messages .append (message )
627+ except KeyError as e :
628+ if not ignore_invalid_messages :
629+ raise e
630+ logger .log (
631+ level = invalid_messages_log_level ,
632+ msg = f"KeyError: Field '{ e .args [0 ]} ' not found" ,
633+ )
634+ except ValidationError as e :
635+ if not ignore_invalid_messages :
636+ raise e
637+ if invalid_messages_log_level :
638+ logger .log (level = invalid_messages_log_level , msg = e )
639+
640+ return MessagesResponse (
641+ messages = messages ,
642+ pagination_page = response_json ["pagination_page" ],
643+ pagination_total = response_json ["pagination_total" ],
644+ pagination_per_page = response_json ["pagination_per_page" ],
645+ pagination_item = response_json ["pagination_item" ],
646+ )
612647
613648
614649async def get_message (
0 commit comments