Source code for agentscope.service.sql_query.mysql
# -*- coding: utf-8 -*-"""query in Mysql """fromtypingimportOptionalfromtypingimportAnyfrom..service_responseimportServiceResponsefrom...utils.commonimport_if_change_databasefrom...service.service_statusimportServiceExecStatustry:importpymysqlexceptImportError:pymysql=None
def query_mysql(
    database: str,
    query: str,
    host: str,
    user: str,
    password: str,
    port: int,
    allow_change_data: bool = False,
    maxcount_results: Optional[int] = None,
    **kwargs: Any,
) -> ServiceResponse:
    """Execute query within MySQL database.

    Args:
        database (`str`):
            The name of the database to use.
        query (`str`):
            SQL query to execute.
        host (`str`):
            The host name or IP address of the MySQL server, e.g.
            "localhost".
        user (`str`):
            The username of the MySQL account to use.
        password (`str`):
            The password of the MySQL account to use.
        port (`int`):
            The port number of the MySQL server, e.g. 3306.
        allow_change_data (`bool`, defaults to `False`):
            Whether to allow changing data in the database. Defaults to
            `False` to avoid accidental changes to the database.
        maxcount_results (`int`, defaults to `None`):
            The maximum number of results to return. When given and the
            query has no `LIMIT` keyword of its own, a `LIMIT` clause is
            appended. `None` leaves the query unchanged.
        **kwargs (`Any`):
            Extra keyword arguments forwarded to `pymysql.connect`.

    Returns:
        `ServiceResponse`: A `ServiceResponse` object that contains
        execution results or error message. Any exception raised while
        connecting or executing is reported as an `ERROR` response whose
        content is the exception text.

    Raises:
        `ValueError`: If `allow_change_data` is `False` and the query does
        not pass the `_if_change_database` safety check.
    """
    # Function-scope import keeps the module's import block untouched.
    import re

    # Check if the query is safe
    if not allow_change_data and not _if_change_database(query):
        raise ValueError(
            "Unsafe SQL query detected. Only SELECT statements are allowed. "
            "If you want to allow changing data in the database, "
            "set `allow_change_data` to `True`.",
        )

    # Limit the number of results by adding LIMIT keywords if necessary.
    # Match LIMIT as a whole word so identifiers such as `rate_limit` do not
    # suppress the clause, and drop a trailing ";" so the appended clause
    # stays part of the statement.
    if maxcount_results is not None:
        if re.search(r"\blimit\b", query, flags=re.IGNORECASE) is None:
            query = query.rstrip(" \t\r\n;")
            query = f"{query} LIMIT {maxcount_results}"

    # Execute the query
    try:
        if pymysql is None:
            # The optional driver failed to import; report it explicitly
            # instead of failing on `None.connect`.
            raise ImportError(
                "The `pymysql` package is required to query MySQL. "
                "Install it with `pip install pymysql`.",
            )

        # Establish a connection to the database
        conn = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            **kwargs,
        )
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(query)
                # Fetch the results while the cursor and connection are
                # still open.
                results = cursor.fetchall()
            finally:
                cursor.close()

            # NOTE(review): the guard above treats a falsy
            # `_if_change_database(query)` as "unsafe", so gating the commit
            # on a truthy result skipped it for data-changing statements.
            # Committing whenever changes are allowed covers that case and
            # is harmless for read-only queries.
            if allow_change_data:
                conn.commit()
        finally:
            # Always release the connection, even when execution fails.
            conn.close()

        return ServiceResponse(
            status=ServiceExecStatus.SUCCESS,
            content=results,
        )
    except Exception as e:
        return ServiceResponse(
            status=ServiceExecStatus.ERROR,
            # TODO: more specific error message
            content=str(e),
        )