[docs]classNacosAgentCardResolver(AgentCardResolverBase):"""Nacos-based A2A Agent Card resolver. Nacos is a dynamic service discovery, configuration and service management platform for building cloud native applications. This resolver fetches the agent card from a Nacos server and subscribes to updates. """
[docs]def__init__(self,remote_agent_name:str,nacos_client_config:ClientConfig,version:str|None=None,)->None:"""Initialize the nacos agent card resolver. Args: remote_agent_name (`str`): Name of the remote agent in Nacos. nacos_client_config (`ClientConfig | None`, optional): Nacos client configuration, where a `server_addresses` parameter is required. version (`str | None`, optional): Version of the agent card to fetch. If None, fetches the latest version. This version is also used when subscribing to agent card updates. Defaults to None (latest version). """ifnotremote_agent_name:raiseValueError("The remote_agent_name cannot be empty.",)ifnotnacos_client_config:raiseValueError("The nacos_client_config cannot be None.",)self._nacos_client_config=nacos_client_configself._remote_agent_name=remote_agent_nameself._version=version
[docs]asyncdefget_agent_card(self)->AgentCard:"""Get agent card from Nacos with lazy initialization. Returns: `AgentCard`: The resolved agent card from Nacos. """try:fromv2.nacos.ai.model.ai_paramimportGetAgentCardParamfromv2.nacos.ai.nacos_ai_serviceimportNacosAIServiceexceptImportErrorase:raiseImportError("Please install the nacos sdk by running `pip install ""nacos-sdk-python>=3.0.0` first.",)fromeclient=Nonetry:client=awaitNacosAIService.create_ai_service(self._nacos_client_config,)awaitclient.start()returnawaitclient.get_agent_card(GetAgentCardParam(agent_name=self._remote_agent_name,version=self._version,),)finally:ifclient:# Close the Nacos client to free resourcestry:awaitclient.shutdown()exceptExceptionase:logger.warning("Failed to shutdown Nacos client: %s",str(e),)