startVirtualTurn method
Stream<String>
startVirtualTurn(
{ - required Object conversationToken,
- required String messageJson,
- required List<({String role, String text})> history,
- String? systemMessage,
- String? toolsJson,
- double temperature = 0.8,
- int topK = 40,
- double? topP,
- int seed = 1,
})
Implementation
Stream<String> startVirtualTurn({
required Object conversationToken,
required String messageJson,
required List<({String role, String text})> history,
String? systemMessage,
String? toolsJson,
double temperature = 0.8,
int topK = 40,
double? topP,
int seed = 1,
String? extraContext,
}) {
// StreamController (not async*) so the mutex release is tied to the
// controller lifecycle — it fires on done, error, AND consumer cancel /
// abandon. An async* generator's finally only runs when the consumer
// drains the stream, so an abandoned stream would hold the mutex forever
// and deadlock every other session.
final controller = StreamController<String>();
var mutexHeld = false;
StreamSubscription<String>? inner;
Future<void> releaseAndCleanup() async {
_virtualTurnInFlight = false;
// Honor a teardown that a closing session deferred while we held the lock.
if (_pendingReleaseToken != null) {
final pending = _pendingReleaseToken;
_pendingReleaseToken = null;
if (_virtualActiveToken == pending) {
final conv = _virtualConv;
if (conv != null) {
_deleteConversation(conv);
_virtualConv = null;
_virtualActiveToken = null;
}
}
}
if (mutexHeld) {
mutexHeld = false;
_nativeMutex.release();
}
}
controller.onListen = () async {
try {
await _nativeMutex.acquire();
mutexHeld = true;
_virtualTurnInFlight = true;
if (_virtualActiveToken != conversationToken || _virtualConv == null) {
// Switching sessions (or first turn): drop the old live conversation
// and rebuild one replaying this session's history as a preface.
final old = _virtualConv;
if (old != null) {
_deleteConversation(old);
_virtualConv = null;
}
final historyJson =
history.isEmpty ? null : buildHistoryJson(history);
_virtualConv = _createRawConversation(
systemMessage: systemMessage,
toolsJson: toolsJson,
messagesJson: historyJson,
temperature: temperature,
topK: topK,
topP: topP,
seed: seed,
);
_virtualActiveToken = conversationToken;
}
inner = _doSendMessageStreamRawOn(_virtualConv!, messageJson,
extraContext: extraContext)
.listen(
controller.add,
onError: controller.addError,
onDone: () async {
await releaseAndCleanup();
if (!controller.isClosed) await controller.close();
},
cancelOnError: false,
);
} catch (e, st) {
controller.addError(e, st);
await releaseAndCleanup();
if (!controller.isClosed) await controller.close();
}
};
// Fires when the consumer cancels / abandons the stream — guarantees the
// mutex is released even if generation never completed.
controller.onCancel = () async {
await inner?.cancel();
await releaseAndCleanup();
};
return controller.stream;
}