Source code for agentscope.service.sql_query.mongodb
# -*- coding: utf-8 -*-"""query in MongoDB """fromtypingimportOptional,Anyfrom..service_responseimportServiceResponsefrom...service.service_statusimportServiceExecStatustry:importpymongo.errorsexceptImportError:pymongo=None
def query_mongodb(
    database: str,
    collection: str,
    query: dict,
    host: str,
    port: int,
    maxcount_results: Optional[int] = None,
    **kwargs: Any,
) -> ServiceResponse:
    """Execute a `find` query within a MongoDB database.

    Args:
        database (`str`):
            The name of the database to use.
        collection (`str`):
            The name of the collection to use in mongodb.
        query (`dict`):
            The mongodb query (filter document) passed to `find`.
        host (`str`):
            The hostname or IP address of the MongoDB server.
        port (`int`):
            The port number of MongoDB server.
        maxcount_results (`int`, defaults to `None`):
            The maximum number of results to return. When `None`, no
            limit is applied and every matching document is returned.
            Note that PyMongo documents a limit of `0` as equivalent to
            no limit.
        **kwargs:
            Extra keyword arguments forwarded unchanged to
            `pymongo.MongoClient`.

    Returns:
        `ServiceResponse`: A `ServiceResponse` object whose status is
        `ServiceExecStatus.SUCCESS` with the list of matching documents
        as content, or `ServiceExecStatus.ERROR` with the error message
        as content. Exceptions are not propagated to the caller.

    Note:
        MongoDB is a little different from mysql and sqlite, for its
        operations corresponds to different functions. Now we only support
        `find` query and leave other operations in the future.
    """
    # The module-level import falls back to `pymongo = None` when the
    # package is missing. Report that explicitly instead of surfacing an
    # opaque "'NoneType' object has no attribute 'MongoClient'" message.
    if pymongo is None:
        return ServiceResponse(
            status=ServiceExecStatus.ERROR,
            content=(
                "The `pymongo` package is required to query MongoDB but "
                "is not installed. Install it with `pip install pymongo`."
            ),
        )

    try:
        # The context manager closes the client on exit, including when
        # the query raises.
        with pymongo.MongoClient(
            host=host,
            port=port,
            **kwargs,
        ) as mongo_client:
            coll = mongo_client[database][collection]

            # Perform the query, capping the result size only on request.
            cursor = coll.find(query)
            if maxcount_results is not None:
                cursor = cursor.limit(maxcount_results)

            # Cursors are lazy: materialise the documents while the
            # connection is still open.
            documents = list(cursor)

            return ServiceResponse(
                status=ServiceExecStatus.SUCCESS,
                content=documents,
            )
    except Exception as e:
        # Service boundary: convert any failure into an error response.
        return ServiceResponse(
            status=ServiceExecStatus.ERROR,
            # TODO: more specific error message
            content=str(e),
        )