autogen_ext.models.cache#

class ChatCompletionCache(client: ChatCompletionClient, store: CacheStore[CreateResult | List[str | CreateResult]] | None = None)[Quelle]#

Bases: ChatCompletionClient, Component[ChatCompletionCacheConfig]

Ein Wrapper um einen ChatCompletionClient, der Erstellungsergebnisse von einem zugrunde liegenden Client zwischenspeichert. Cache-Treffer tragen nicht zur Token-Nutzung des ursprünglichen Clients bei.

Typische Verwendung

Lassen Sie uns das Caching auf der Festplatte mit dem openai-Client als Beispiel verwenden. Installieren Sie zuerst autogen-ext mit den erforderlichen Paketen

pip install -U "autogen-ext[openai, diskcache]"

Und verwenden Sie es so

import asyncio
import tempfile

from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.diskcache import DiskCacheStore
from diskcache import Cache


async def main():
    with tempfile.TemporaryDirectory() as tmpdirname:
        # Initialize the original client
        openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

        # Then initialize the CacheStore, in this case with diskcache.Cache.
        # You can also use redis like:
        # from autogen_ext.cache_store.redis import RedisStore
        # import redis
        # redis_instance = redis.Redis()
        # cache_store = RedisCacheStore[CHAT_CACHE_VALUE_TYPE](redis_instance)
        cache_store = DiskCacheStore[CHAT_CACHE_VALUE_TYPE](Cache(tmpdirname))
        cache_client = ChatCompletionCache(openai_model_client, cache_store)

        response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
        print(response)  # Should print response from OpenAI
        response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
        print(response)  # Should print cached response


asyncio.run(main())

Für Redis-Caching

import asyncio

from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.redis import RedisStore
import redis


async def main():
    # Initialize the original client
    openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Initialize Redis cache store
    redis_instance = redis.Redis()
    cache_store = RedisStore[CHAT_CACHE_VALUE_TYPE](redis_instance)
    cache_client = ChatCompletionCache(openai_model_client, cache_store)

    response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
    print(response)  # Should print response from OpenAI
    response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
    print(response)  # Should print cached response


asyncio.run(main())

Für Streaming mit Redis-Caching

import asyncio

from autogen_core.models import UserMessage, CreateResult
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.redis import RedisStore
import redis


async def main():
    # Initialize the original client
    openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Initialize Redis cache store
    redis_instance = redis.Redis()
    cache_store = RedisStore[CHAT_CACHE_VALUE_TYPE](redis_instance)
    cache_client = ChatCompletionCache(openai_model_client, cache_store)

    # First streaming call
    async for chunk in cache_client.create_stream(
        [UserMessage(content="List all countries in Africa", source="user")]
    ):
        if isinstance(chunk, CreateResult):
            print("\n")
            print("Cached: ", chunk.cached)  # Should print False
        else:
            print(chunk, end="")

    # Second streaming call (cached)
    async for chunk in cache_client.create_stream(
        [UserMessage(content="List all countries in Africa", source="user")]
    ):
        if isinstance(chunk, CreateResult):
            print("\n")
            print("Cached: ", chunk.cached)  # Should print True
        else:
            print(chunk, end="")


asyncio.run(main())

Sie können nun den cached_client wie den ursprünglichen Client verwenden, jedoch mit aktiviertem Caching.

Parameter:
  • client (ChatCompletionClient) – Der ursprüngliche ChatCompletionClient, der umschlossen werden soll.

  • store (CacheStore) – Ein Store-Objekt, das get- und set-Methoden implementiert. Der Benutzer ist für die Verwaltung des Lebenszyklus des Stores und dessen Löschung (falls erforderlich) verantwortlich. Standardmäßig wird der In-Memory-Cache verwendet.

component_type: ClassVar[ComponentType] = 'chat_completion_cache'#

Der logische Typ der Komponente.

component_provider_override: ClassVar[str | None] = 'autogen_ext.models.cache.ChatCompletionCache'#

Überschreibe den Anbieter-String für die Komponente. Dies sollte verwendet werden, um zu verhindern, dass interne Modulnamen Teil des Modulnamens werden.

component_config_schema#

Alias von ChatCompletionCacheConfig

async create(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = [], tool_choice: Tool | Literal['auto', 'required', 'none'] = 'auto', json_output: bool | type[BaseModel] | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) CreateResult[Quelle]#

Zwischengespeicherte Version von ChatCompletionClient.create. Wenn das Ergebnis eines Aufrufs von create zwischengespeichert wurde, wird es sofort zurückgegeben, ohne den zugrunde liegenden Client aufzurufen.

HINWEIS: cancellation_token wird für zwischengespeicherte Ergebnisse ignoriert.

create_stream(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = [], tool_choice: Tool | Literal['auto', 'required', 'none'] = 'auto', json_output: bool | type[BaseModel] | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) AsyncGenerator[str | CreateResult, None][Quelle]#

Zwischengespeicherte Version von ChatCompletionClient.create_stream. Wenn das Ergebnis eines Aufrufs von create_stream zwischengespeichert wurde, wird es zurückgegeben, ohne vom zugrunde liegenden Client zu streamen.

HINWEIS: cancellation_token wird für zwischengespeicherte Ergebnisse ignoriert.

async close() None[Quelle]#
actual_usage() RequestUsage[Quelle]#
count_tokens(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = []) int[Quelle]#
property capabilities: ModelCapabilities#
property model_info: ModelInfo#
remaining_tokens(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = []) int[Quelle]#
total_usage() RequestUsage[Quelle]#
_to_config() ChatCompletionCacheConfig[Quelle]#

Gib die Konfiguration aus, die erforderlich wäre, um eine neue Instanz einer Komponente zu erstellen, die der Konfiguration dieser Instanz entspricht.

Gibt zurück:

T – Die Konfiguration der Komponente.

classmethod _from_config(config: ChatCompletionCacheConfig) Self[Quelle]#

Erstelle eine neue Instanz der Komponente aus einem Konfigurationsobjekt.

Parameter:

config (T) – Das Konfigurationsobjekt.

Gibt zurück:

Self – Die neue Instanz der Komponente.