Skip to content

RealTimeConfig

autogen.beta.live.gemini.RealTimeConfig #

RealTimeConfig(model, *, output=None, input=None, max_output_tokens=None, temperature=None, config=None, client=None)

Bases: RealtimeConfig

Realtime config backed by Gemini's bidirectional Live API.

Implements the RealtimeConfig protocol — call session(...) to open a websocket connection that pumps captured audio into the API and emits transcription, audio, and tool-call events on the supplied context.

Source code in autogen/beta/live/gemini.py
def __init__(
    self,
    model: "ModelName | str",
    *,
    output: AudioOutput | TextOutput | None = None,
    input: InputConfig | None = None,
    max_output_tokens: int | None = None,
    temperature: float | None = None,
    config: gtypes.LiveConnectConfigDict | None = None,
    client: Client | None = None,
) -> None:
    self.model = model

    if output is None:
        output = AudioOutput()
    if input is None:
        input = InputConfig()

    base: gtypes.LiveConnectConfigDict = {}
    if temperature is not None:
        base["temperature"] = temperature
    if max_output_tokens is not None:
        base["max_output_tokens"] = max_output_tokens

    if isinstance(output, AudioOutput):
        base["response_modalities"] = [gtypes.Modality.AUDIO]
        speech: gtypes.SpeechConfigDict = {
            "voice_config": {"prebuilt_voice_config": {"voice_name": output.voice}},
        }
        if output.language_code is not None:
            speech["language_code"] = output.language_code
        base["speech_config"] = speech
        # Surface assistant text alongside the audio so observers can
        # consume `ModelMessageChunk` without parsing audio bytes.
        base["output_audio_transcription"] = {}
    else:
        base["response_modalities"] = [gtypes.Modality.TEXT]

    if input.transcribe:
        input_tx: gtypes.AudioTranscriptionConfigDict = {}
        if input.transcription_languages is not None:
            input_tx["language_codes"] = list(input.transcription_languages)
        base["input_audio_transcription"] = input_tx

    realtime: gtypes.RealtimeInputConfigDict = {}
    if input.automatic_activity_detection is not None:
        realtime["automatic_activity_detection"] = input.automatic_activity_detection
    if input.activity_handling is not None:
        realtime["activity_handling"] = input.activity_handling
    if input.turn_coverage is not None:
        realtime["turn_coverage"] = input.turn_coverage
    if realtime:
        base["realtime_input_config"] = realtime

    self._config: gtypes.LiveConnectConfigDict = base
    self._config_overrides: gtypes.LiveConnectConfigDict = config or {}

    self.client = client or Client()

model instance-attribute #

model = model

client instance-attribute #

client = client or Client()

session async #

session(context, *, instructions=(), tools=(), serializer)
Source code in autogen/beta/live/gemini.py
@asynccontextmanager
async def session(
    self,
    context: ConversationContext,
    *,
    instructions: Iterable[str] = (),
    tools: Iterable[ToolSchema] = (),
    serializer: SerializerProto,
) -> AsyncIterator[None]:
    final_config = self._build_session(instructions=instructions, tools=tools)

    async with self.client.aio.live.connect(model=self.model, config=final_config) as session:

        async def _pump_audio(event: RecordedAudioEvent) -> None:
            await session.send_realtime_input(
                audio=gtypes.Blob(data=event.content, mime_type=INPUT_MIME_TYPE),
            )

        async def _forward_tool_result(event: ToolResultEvent) -> None:
            await _send_tool_result(session, event, serializer)

        with (
            context.stream.where(RecordedAudioEvent).sub_scope(_pump_audio),
            context.stream.where(ToolResultEvent).sub_scope(_forward_tool_result),
        ):
            recv_task = asyncio.create_task(_pump_events(session, context))

            try:
                yield

            finally:
                recv_task.cancel()
                with suppress(asyncio.CancelledError):
                    await recv_task