# -*- coding: utf-8 -*-"""Download file from URL."""importosimportrequestsfromtqdmimporttqdmfromagentscope.serviceimportServiceResponse,ServiceExecStatus
def download_from_url(
    url: str,
    filepath: str,
    timeout: int = 120,
    retries: int = 3,
) -> ServiceResponse:
    """Download file from the given url to the specified location.

    The response is streamed to disk in 32 KB chunks while a `tqdm`
    progress bar is updated. If a request-level error occurs, any partially
    written file is removed and the download is attempted again, up to
    `retries` additional times.

    Errors other than `requests.exceptions.RequestException` (for example
    an `OSError` raised while opening `filepath`) are not caught here and
    propagate to the caller.

    Args:
        url (`str`):
            The URL of the file to download.
        filepath (`str`):
            The path to save the downloaded file. The download is refused
            if this path already exists.
        timeout (`int`, defaults to `120`):
            The timeout for the download request, passed through to the
            `timeout` argument of the `requests` call.
        retries (`int`, defaults to `3`):
            The number of retries for the download request. Values less
            than or equal to zero result in a single attempt.

    Returns:
        `ServiceResponse`: A `ServiceResponse` object that contains
        execution results or error message. On success, `content` is a
        dict with the keys `"url"` and `"saved_file_path"`.
    """
    # Check if the target file exists already; never overwrite it.
    if os.path.exists(filepath):
        return ServiceResponse(
            status=ServiceExecStatus.ERROR,
            content=f"The file {filepath} already exists.",
        )

    chunk_size = 1024 * 32  # 32 KB
    last_error = None

    # One initial attempt plus `retries` retries. A loop replaces the former
    # recursive call so each attempt releases its resources before the next.
    for _ in range(max(retries, 0) + 1):
        try:
            # Context managers guarantee the session, the streamed response,
            # the progress bar and the file handle are all released, even
            # when the transfer fails half-way through.
            with requests.Session() as session:
                with session.get(
                    url,
                    stream=True,
                    timeout=timeout,
                ) as response:
                    response.raise_for_status()

                    file_size = int(
                        response.headers.get("content-length", 0),
                    )
                    # A missing content-length yields 0; hand tqdm `None`
                    # so it shows a plain counter instead of a 0/0 bar.
                    with tqdm(
                        total=file_size or None,
                        unit="iB",
                        unit_scale=True,
                    ) as progress_bar:
                        with open(filepath, "wb") as file:
                            for chunk in response.iter_content(
                                chunk_size=chunk_size,
                            ):
                                progress_bar.update(len(chunk))
                                file.write(chunk)
        except requests.exceptions.RequestException as e:
            # `e` is unbound once the except block ends, so keep a reference
            # for the final error message.
            last_error = e
            # Remove the incomplete file after *every* failed attempt,
            # including the last one, so a truncated download is never left
            # behind (it would also block later calls via the check above).
            if os.path.exists(filepath):
                os.remove(filepath)
        else:
            return ServiceResponse(
                status=ServiceExecStatus.SUCCESS,
                content={
                    "url": url,
                    "saved_file_path": filepath,
                },
            )

    # All attempts failed; report the most recent request error.
    return ServiceResponse(
        status=ServiceExecStatus.ERROR,
        content=f"Failed to download file from {url}: {str(last_error)}",
    )