@@ -1507,8 +1507,9 @@ out into the `CopyEnd` class that is derived below:
15071507``` python
15081508class CopyState (Enum ):
15091509 IDLE = 1
1510- COPYING = 2
1511- DONE = 3
1510+ SYNC_COPYING = 2
1511+ ASYNC_COPYING = 3
1512+ DONE = 4
15121513
15131514class CopyEnd (Waitable ):
15141515 state: CopyState
@@ -1517,14 +1518,19 @@ class CopyEnd(Waitable):
15171518 Waitable.__init__ (self )
15181519 self .state = CopyState.IDLE
15191520
1521+ def copying (self ):
1522+ return self .state == CopyState.SYNC_COPYING or self .state == CopyState.ASYNC_COPYING
1523+
15201524 def drop (self ):
1521- trap_if(self .state == CopyState. COPYING )
1525+ trap_if(self .copying() )
15221526 Waitable.drop(self )
15231527```
15241528As shown in ` drop ` , attempting to drop a readable or writable end while a copy
15251529is in progress traps. This means that client code must take care to wait for
15261530these operations to finish (potentially cancelling them via
1527- ` stream.cancel-{read,write} ` ) before dropping.
1531+ ` stream.cancel-{read,write} ` ) before dropping. The ` SYNC_COPY ` vs. ` ASYNC_COPY `
1532+ distinction is tracked in the state to determine whether the copy operation can
1533+ be cancelled.
15281534
15291535Given the above, we can define the concrete ` {Readable,Writable}StreamEnd `
15301536classes which are almost entirely symmetric, with the only difference being
@@ -4083,7 +4089,6 @@ until this point into a single `i32` payload for core wasm.
40834089``` python
40844090 def stream_event (result , reclaim_buffer ):
40854091 reclaim_buffer()
4086- assert (e.state == CopyState.COPYING )
40874092 if result == CopyResult.DROPPED :
40884093 e.state = CopyState.DONE
40894094 else :
@@ -4099,7 +4104,6 @@ until this point into a single `i32` payload for core wasm.
40994104 def on_copy_done (result ):
41004105 e.set_pending_event(partial(stream_event, result, reclaim_buffer = lambda :()))
41014106
4102- e.state = CopyState.COPYING
41034107 e.copy(thread.task.inst, buffer, on_copy, on_copy_done)
41044108```
41054109
@@ -4111,8 +4115,10 @@ synchronously and return `BLOCKED` if not:
41114115``` python
41124116 if not e.has_pending_event():
41134117 if opts.sync:
4118+ e.state = CopyState.SYNC_COPYING
41144119 thread.suspend_until(e.has_pending_event)
41154120 else :
4121+ e.state = CopyState.ASYNC_COPYING
41164122 return [BLOCKED ]
41174123 code,index,payload = e.get_pending_event()
41184124 assert (code == event_code and index == i and payload != BLOCKED )
@@ -4177,7 +4183,6 @@ of elements copied is not packed in the high 28 bits; they're always zero.
41774183``` python
41784184 def future_event (result ):
41794185 assert ((buffer.remain() == 0 ) == (result == CopyResult.COMPLETED ))
4180- assert (e.state == CopyState.COPYING )
41814186 if result == CopyResult.DROPPED or result == CopyResult.COMPLETED :
41824187 e.state = CopyState.DONE
41834188 else :
@@ -4188,7 +4193,6 @@ of elements copied is not packed in the high 28 bits; they're always zero.
41884193 assert (result != CopyResult.DROPPED or event_code == EventCode.FUTURE_WRITE )
41894194 e.set_pending_event(partial(future_event, result))
41904195
4191- e.state = CopyState.COPYING
41924196 e.copy(thread.task.inst, buffer, on_copy_done)
41934197```
41944198
@@ -4197,8 +4201,10 @@ and returning either the progress made or `BLOCKED`.
41974201``` python
41984202 if not e.has_pending_event():
41994203 if opts.sync:
4204+ e.state = CopyState.SYNC_COPYING
42004205 thread.suspend_until(e.has_pending_event)
42014206 else :
4207+ e.state = CopyState.ASYNC_COPYING
42024208 return [BLOCKED ]
42034209 code,index,payload = e.get_pending_event()
42044210 assert (code == event_code and index == i)
@@ -4240,7 +4246,7 @@ def cancel_copy(EndT, event_code, stream_or_future_t, sync, thread, i):
42404246 e = thread.task.inst.table.get(i)
42414247 trap_if(not isinstance (e, EndT))
42424248 trap_if(e.shared.t != stream_or_future_t.t)
4243- trap_if(e.state != CopyState.COPYING )
4249+ trap_if(e.state != CopyState.ASYNC_COPYING )
42444250 if not e.has_pending_event():
42454251 e.shared.cancel()
42464252 if not e.has_pending_event():
@@ -4249,9 +4255,12 @@ def cancel_copy(EndT, event_code, stream_or_future_t, sync, thread, i):
42494255 else :
42504256 return [BLOCKED ]
42514257 code,index,payload = e.get_pending_event()
4252- assert (e.state != CopyState. COPYING and code == event_code and index == i)
4258+ assert (not e.copying() and code == event_code and index == i)
42534259 return [payload]
42544260```
4261+ Cancellation traps if there is not currently an async copy in progress (sync
4262+ copies do not expect or check for cancellation and thus cannot be cancelled).
4263+
42554264The * first* check for ` e.has_pending_event() ` catches the case where the copy has
42564265already racily finished, in which case we must * not* call ` cancel() ` . Calling
42574266` cancel() ` may, but is not required to, recursively call one of the ` on_* `
0 commit comments