startVirtualTurn method

Stream<String> startVirtualTurn({
  required Object conversationToken,
  required String messageJson,
  required List<({String role, String text})> history,
  String? systemMessage,
  String? toolsJson,
  double temperature = 0.8,
  int topK = 40,
  double? topP,
  int seed = 1,
  String? extraContext,
})

Implementation

/// Streams the response to [messageJson] on the shared "virtual" conversation,
/// serialized behind [_nativeMutex].
///
/// Work starts only when the returned stream is listened to. If
/// [conversationToken] differs from the currently active token, or no live
/// conversation exists, the old conversation is deleted and a new one is
/// created from [systemMessage], [toolsJson], the sampling parameters
/// ([temperature], [topK], [topP], [seed]) and [history] (passed as the
/// initial messages when non-empty). On a turn that reuses the live
/// conversation those arguments are not consulted.
///
/// [extraContext] is forwarded unchanged to the underlying send call.
///
/// Errors raised during setup are delivered as a stream error followed by
/// done; errors from the underlying stream are forwarded without closing.
/// The mutex is released when the turn completes, fails during setup, or the
/// consumer cancels the subscription.
Stream<String> startVirtualTurn({
  required Object conversationToken,
  required String messageJson,
  required List<({String role, String text})> history,
  String? systemMessage,
  String? toolsJson,
  double temperature = 0.8,
  int topK = 40,
  double? topP,
  int seed = 1,
  String? extraContext,
}) {
  // StreamController (not async*) so the mutex release is tied to the
  // controller lifecycle — it fires on done, error, AND consumer cancel /
  // abandon. An async* generator's finally only runs when the consumer
  // drains the stream, so an abandoned stream would hold the mutex forever
  // and deadlock every other session.
  final controller = StreamController<String>();
  var mutexHeld = false;
  // Set once the consumer has cancelled; checked after the (possibly long)
  // wait for the mutex so an abandoned turn never starts generating.
  var cancelled = false;
  StreamSubscription<String>? inner;

  // Releases the mutex and performs end-of-turn bookkeeping.
  //
  // Idempotent and ownership-gated: it is reached from both the inner
  // stream's onDone and the controller's onCancel (which also runs after a
  // normal close delivers done), and may run before the mutex was ever
  // acquired. Shared state is only touched while this turn owns the lock —
  // otherwise a late call could clear the in-flight flag of, or tear down
  // the conversation used by, another turn that has since taken the lock.
  Future<void> releaseAndCleanup() async {
    if (!mutexHeld) return;
    mutexHeld = false;
    try {
      _virtualTurnInFlight = false;
      // Honor a teardown that a closing session deferred while we held the
      // lock.
      final pending = _pendingReleaseToken;
      if (pending != null) {
        _pendingReleaseToken = null;
        if (_virtualActiveToken == pending) {
          final conv = _virtualConv;
          if (conv != null) {
            _deleteConversation(conv);
            _virtualConv = null;
            _virtualActiveToken = null;
          }
        }
      }
    } finally {
      // Always hand the lock back, even if the teardown above threw.
      _nativeMutex.release();
    }
  }

  controller.onListen = () async {
    try {
      await _nativeMutex.acquire();
      mutexHeld = true;
      if (cancelled) {
        // The consumer gave up while we were queued for the lock: release
        // it immediately instead of running a generation nobody will read.
        await releaseAndCleanup();
        return;
      }
      _virtualTurnInFlight = true;
      if (_virtualActiveToken != conversationToken || _virtualConv == null) {
        // Switching sessions (or first turn): drop the old live conversation
        // and rebuild one replaying this session's history as a preface.
        final old = _virtualConv;
        if (old != null) {
          _deleteConversation(old);
          _virtualConv = null;
        }
        final historyJson =
            history.isEmpty ? null : buildHistoryJson(history);
        _virtualConv = _createRawConversation(
          systemMessage: systemMessage,
          toolsJson: toolsJson,
          messagesJson: historyJson,
          temperature: temperature,
          topK: topK,
          topP: topP,
          seed: seed,
        );
        _virtualActiveToken = conversationToken;
      }
      inner = _doSendMessageStreamRawOn(_virtualConv!, messageJson,
              extraContext: extraContext)
          .listen(
        controller.add,
        onError: controller.addError,
        onDone: () async {
          // Release before closing so the next queued turn is not blocked
          // on this consumer processing the done event.
          await releaseAndCleanup();
          if (!controller.isClosed) await controller.close();
        },
        cancelOnError: false,
      );
    } catch (e, st) {
      // Setup failed (acquire, conversation creation, or starting the
      // stream): surface the error, then release and terminate the stream.
      if (!controller.isClosed) controller.addError(e, st);
      await releaseAndCleanup();
      if (!controller.isClosed) await controller.close();
    }
  };

  // Fires when the consumer cancels / abandons the stream — guarantees the
  // mutex is released even if generation never completed.
  controller.onCancel = () async {
    cancelled = true;
    try {
      await inner?.cancel();
    } finally {
      // Release even if cancelling the inner subscription failed.
      await releaseAndCleanup();
    }
  };

  return controller.stream;
}