5
5
6
6
from __future__ import annotations
7
7
8
- import sys
9
8
from asyncio import Future , Queue
10
9
from asyncio .tasks import FIRST_COMPLETED , ensure_future , gather , wait
10
+ from contextlib import asynccontextmanager
11
11
from logging import getLogger
12
- from typing import Any , AsyncIterator , Awaitable , Callable , List , Sequence , Tuple
12
+ from typing import (
13
+ Any ,
14
+ AsyncIterator ,
15
+ Awaitable ,
16
+ Callable ,
17
+ Dict ,
18
+ List ,
19
+ NamedTuple ,
20
+ Sequence ,
21
+ Tuple ,
22
+ cast ,
23
+ )
13
24
from weakref import WeakSet
14
25
15
26
from anyio import create_task_group
27
+ from jsonpatch import apply_patch , make_patch
16
28
29
+ from idom .core .vdom import VdomJson
17
30
from idom .utils import Ref
18
31
19
- from .layout import Layout , LayoutEvent , LayoutUpdate
20
-
21
-
22
- if sys .version_info >= (3 , 7 ): # pragma: no cover
23
- from contextlib import asynccontextmanager # noqa
24
- else : # pragma: no cover
25
- from async_generator import asynccontextmanager
32
+ from .layout import LayoutEvent , LayoutUpdate
33
+ from .proto import LayoutType
26
34
27
35
28
36
logger = getLogger (__name__ )
29
37
30
- SendCoroutine = Callable [[Any ], Awaitable [None ]]
38
+
39
+ SendCoroutine = Callable [["VdomJsonPatch" ], Awaitable [None ]]
31
40
RecvCoroutine = Callable [[], Awaitable [LayoutEvent ]]
32
41
33
42
34
43
async def dispatch_single_view (
35
- layout : Layout ,
44
+ layout : LayoutType [ LayoutUpdate , LayoutEvent ] ,
36
45
send : SendCoroutine ,
37
46
recv : RecvCoroutine ,
38
47
) -> None :
@@ -49,14 +58,14 @@ async def dispatch_single_view(
49
58
50
59
@asynccontextmanager
51
60
async def create_shared_view_dispatcher (
52
- layout : Layout , run_forever : bool = False
61
+ layout : LayoutType [ LayoutUpdate , LayoutEvent ],
53
62
) -> AsyncIterator [_SharedViewDispatcherFuture ]:
54
63
"""Enter a dispatch context where all subsequent view instances share the same state"""
55
64
with layout :
56
65
(
57
66
dispatch_shared_view ,
58
67
model_state ,
59
- all_update_queues ,
68
+ all_patch_queues ,
60
69
) = await _make_shared_view_dispatcher (layout )
61
70
62
71
dispatch_tasks : List [Future [None ]] = []
@@ -85,16 +94,16 @@ def dispatch_shared_view_soon(
85
94
update_future .cancel ()
86
95
break
87
96
else :
88
- update : LayoutUpdate = update_future .result ()
97
+ patch = VdomJsonPatch . create_from ( update_future .result () )
89
98
90
- model_state .current = update .apply_to (model_state .current )
99
+ model_state .current = patch .apply_to (model_state .current )
91
100
# push updates to all dispatcher callbacks
92
- for queue in all_update_queues :
93
- queue .put_nowait (update )
101
+ for queue in all_patch_queues :
102
+ queue .put_nowait (patch )
94
103
95
104
96
105
def ensure_shared_view_dispatcher_future (
97
- layout : Layout ,
106
+ layout : LayoutType [ LayoutUpdate , LayoutEvent ] ,
98
107
) -> Tuple [Future [None ], SharedViewDispatcher ]:
99
108
"""Ensure the future of a dispatcher created by :func:`create_shared_view_dispatcher`"""
100
109
dispatcher_future : Future [SharedViewDispatcher ] = Future ()
@@ -104,59 +113,93 @@ async def dispatch_shared_view_forever() -> None:
104
113
(
105
114
dispatch_shared_view ,
106
115
model_state ,
107
- all_update_queues ,
116
+ all_patch_queues ,
108
117
) = await _make_shared_view_dispatcher (layout )
109
118
110
119
dispatcher_future .set_result (dispatch_shared_view )
111
120
112
121
while True :
113
- update = await layout . render ( )
114
- model_state .current = update .apply_to (model_state .current )
122
+ patch = await render_json_patch ( layout )
123
+ model_state .current = patch .apply_to (model_state .current )
115
124
# push updates to all dispatcher callbacks
116
- for queue in all_update_queues :
117
- queue .put_nowait (update )
125
+ for queue in all_patch_queues :
126
+ queue .put_nowait (patch )
118
127
119
128
async def dispatch (send : SendCoroutine , recv : RecvCoroutine ) -> None :
120
129
await (await dispatcher_future )(send , recv )
121
130
122
131
return ensure_future (dispatch_shared_view_forever ()), dispatch
123
132
124
133
134
+ async def render_json_patch (layout : LayoutType [LayoutUpdate , Any ]) -> VdomJsonPatch :
135
+ """Render a class:`VdomJsonPatch` from a layout"""
136
+ return VdomJsonPatch .create_from (await layout .render ())
137
+
138
+
139
+ class VdomJsonPatch (NamedTuple ):
140
+ """An object describing an update to a :class:`Layout` in the form of a JSON patch"""
141
+
142
+ path : str
143
+ """The path where changes should be applied"""
144
+
145
+ changes : List [Dict [str , Any ]]
146
+ """A list of JSON patches to apply at the given path"""
147
+
148
+ def apply_to (self , model : VdomJson ) -> VdomJson :
149
+ """Return the model resulting from the changes in this update"""
150
+ return cast (
151
+ VdomJson ,
152
+ apply_patch (
153
+ model , [{** c , "path" : self .path + c ["path" ]} for c in self .changes ]
154
+ ),
155
+ )
156
+
157
+ @classmethod
158
+ def create_from (cls , update : LayoutUpdate ) -> VdomJsonPatch :
159
+ """Return a patch given an layout update"""
160
+ return cls (update .path , make_patch (update .old or {}, update .new ).patch )
161
+
162
+
125
163
async def _make_shared_view_dispatcher (
126
- layout : Layout ,
127
- ) -> Tuple [SharedViewDispatcher , Ref [Any ], WeakSet [Queue [LayoutUpdate ]]]:
128
- initial_update = await layout .render ()
129
- model_state = Ref (initial_update . apply_to ({}) )
164
+ layout : LayoutType [ LayoutUpdate , LayoutEvent ] ,
165
+ ) -> Tuple [SharedViewDispatcher , Ref [Any ], WeakSet [Queue [VdomJsonPatch ]]]:
166
+ update = await layout .render ()
167
+ model_state = Ref (update . new )
130
168
131
169
# We push updates to queues instead of pushing directly to send() callbacks in
132
170
# order to isolate the render loop from any errors dispatch callbacks might
133
171
# raise.
134
- all_update_queues : WeakSet [Queue [LayoutUpdate ]] = WeakSet ()
172
+ all_patch_queues : WeakSet [Queue [VdomJsonPatch ]] = WeakSet ()
135
173
136
174
async def dispatch_shared_view (send : SendCoroutine , recv : RecvCoroutine ) -> None :
137
- update_queue : Queue [LayoutUpdate ] = Queue ()
175
+ patch_queue : Queue [VdomJsonPatch ] = Queue ()
138
176
async with create_task_group () as inner_task_group :
139
- all_update_queues .add (update_queue )
140
- await send (LayoutUpdate .create_from ({}, model_state .current ))
177
+ all_patch_queues .add (patch_queue )
178
+ effective_update = LayoutUpdate ("" , None , model_state .current )
179
+ await send (VdomJsonPatch .create_from (effective_update ))
141
180
inner_task_group .start_soon (_single_incoming_loop , layout , recv )
142
- inner_task_group .start_soon (_shared_outgoing_loop , send , update_queue )
181
+ inner_task_group .start_soon (_shared_outgoing_loop , send , patch_queue )
143
182
return None
144
183
145
- return dispatch_shared_view , model_state , all_update_queues
184
+ return dispatch_shared_view , model_state , all_patch_queues
146
185
147
186
148
- async def _single_outgoing_loop (layout : Layout , send : SendCoroutine ) -> None :
187
+ async def _single_outgoing_loop (
188
+ layout : LayoutType [LayoutUpdate , LayoutEvent ], send : SendCoroutine
189
+ ) -> None :
149
190
while True :
150
- await send (await layout . render ( ))
191
+ await send (await render_json_patch ( layout ))
151
192
152
193
153
- async def _single_incoming_loop (layout : Layout , recv : RecvCoroutine ) -> None :
194
+ async def _single_incoming_loop (
195
+ layout : LayoutType [LayoutUpdate , LayoutEvent ], recv : RecvCoroutine
196
+ ) -> None :
154
197
while True :
155
- await layout .dispatch (await recv ())
198
+ await layout .deliver (await recv ())
156
199
157
200
158
201
async def _shared_outgoing_loop (
159
- send : SendCoroutine , queue : Queue [LayoutUpdate ]
202
+ send : SendCoroutine , queue : Queue [VdomJsonPatch ]
160
203
) -> None :
161
204
while True :
162
205
await send (await queue .get ())
0 commit comments