|  | 
| 1 | 1 | import 'dart:async'; | 
| 2 | 2 | import 'dart:convert'; | 
| 3 | 3 | 
 | 
|  | 4 | +import 'package:sqlite3/common.dart'; | 
|  | 5 | + | 
| 4 | 6 | import '../sqlite_connection.dart'; | 
| 5 | 7 | 
 | 
| 6 | 8 | Future<T> internalReadTransaction<T>(SqliteReadContext ctx, | 
| @@ -75,3 +77,87 @@ Object? mapParameter(Object? parameter) { | 
| 75 | 77 | List<Object?> mapParameters(List<Object?> parameters) { | 
| 76 | 78 |   return [for (var p in parameters) mapParameter(p)]; | 
| 77 | 79 | } | 
|  | 80 | + | 
|  | 81 | +extension ThrottledUpdates on CommonDatabase { | 
|  | 82 | +  /// An unthrottled stream of updated tables that emits on every commit. | 
|  | 83 | +  /// | 
|  | 84 | +  /// A paused subscription on this stream will buffer changed tables into a | 
|  | 85 | +  /// growing set instead of losing events, so this stream is simple to throttle | 
|  | 86 | +  /// downstream. | 
|  | 87 | +  Stream<Set<String>> get updatedTables { | 
|  | 88 | +    final listeners = <_UpdateListener>[]; | 
|  | 89 | +    var uncommitedUpdates = <String>{}; | 
|  | 90 | +    var underlyingSubscriptions = <StreamSubscription<void>>[]; | 
|  | 91 | + | 
|  | 92 | +    void handleUpdate(SqliteUpdate update) { | 
|  | 93 | +      uncommitedUpdates.add(update.tableName); | 
|  | 94 | +    } | 
|  | 95 | + | 
|  | 96 | +    void afterCommit() { | 
|  | 97 | +      for (final listener in listeners) { | 
|  | 98 | +        listener.notify(uncommitedUpdates); | 
|  | 99 | +      } | 
|  | 100 | + | 
|  | 101 | +      uncommitedUpdates.clear(); | 
|  | 102 | +    } | 
|  | 103 | + | 
|  | 104 | +    void afterRollback() { | 
|  | 105 | +      uncommitedUpdates.clear(); | 
|  | 106 | +    } | 
|  | 107 | + | 
|  | 108 | +    void addListener(_UpdateListener listener) { | 
|  | 109 | +      listeners.add(listener); | 
|  | 110 | + | 
|  | 111 | +      if (listeners.length == 1) { | 
|  | 112 | +        // First listener, start listening for raw updates on underlying | 
|  | 113 | +        // database. | 
|  | 114 | +        underlyingSubscriptions = [ | 
|  | 115 | +          updatesSync.listen(handleUpdate), | 
|  | 116 | +          commits.listen((_) => afterCommit()), | 
|  | 117 | +          commits.listen((_) => afterRollback()) | 
|  | 118 | +        ]; | 
|  | 119 | +      } | 
|  | 120 | +    } | 
|  | 121 | + | 
|  | 122 | +    void removeListener(_UpdateListener listener) { | 
|  | 123 | +      listeners.remove(listener); | 
|  | 124 | +      if (listeners.isEmpty) { | 
|  | 125 | +        for (final sub in underlyingSubscriptions) { | 
|  | 126 | +          sub.cancel(); | 
|  | 127 | +        } | 
|  | 128 | +      } | 
|  | 129 | +    } | 
|  | 130 | + | 
|  | 131 | +    return Stream.multi( | 
|  | 132 | +      (listener) { | 
|  | 133 | +        final wrapped = _UpdateListener(listener); | 
|  | 134 | +        addListener(wrapped); | 
|  | 135 | + | 
|  | 136 | +        listener.onResume = wrapped.addPending; | 
|  | 137 | +        listener.onCancel = () => removeListener(wrapped); | 
|  | 138 | +      }, | 
|  | 139 | +      isBroadcast: true, | 
|  | 140 | +    ); | 
|  | 141 | +  } | 
|  | 142 | +} | 
|  | 143 | + | 
|  | 144 | +class _UpdateListener { | 
|  | 145 | +  final MultiStreamController<Set<String>> downstream; | 
|  | 146 | +  Set<String> buffered = {}; | 
|  | 147 | + | 
|  | 148 | +  _UpdateListener(this.downstream); | 
|  | 149 | + | 
|  | 150 | +  void notify(Set<String> pendingUpdates) { | 
|  | 151 | +    buffered.addAll(pendingUpdates); | 
|  | 152 | +    if (!downstream.isPaused) { | 
|  | 153 | +      addPending(); | 
|  | 154 | +    } | 
|  | 155 | +  } | 
|  | 156 | + | 
|  | 157 | +  void addPending() { | 
|  | 158 | +    if (buffered.isNotEmpty) { | 
|  | 159 | +      downstream.add(buffered); | 
|  | 160 | +      buffered = {}; | 
|  | 161 | +    } | 
|  | 162 | +  } | 
|  | 163 | +} | 
0 commit comments