# -*- coding: utf-8 -*-
# pylint: disable=C0301
"""Service for executing Jupyter notebook code cells interactively.

Partially adapted from the MetaGPT implementation at
https://github.com/geekan/MetaGPT/blob/main/metagpt/actions/di/execute_nb_code.py
"""
import base64
import asyncio
from loguru import logger
# nbclient / nbformat are optional dependencies. If either one is missing,
# both names are set to None so that NoteBookExecutor.__init__ can raise an
# ImportError with installation instructions instead of failing at import
# time of this module.
try:
    import nbclient
    import nbformat
except ImportError:
    nbclient = None
    nbformat = None
from ...manager import FileManager
from ..service_status import ServiceExecStatus
from ..service_response import ServiceResponse
class NoteBookExecutor:
    """
    Class for executing jupyter notebook code blocks interactively.

    Every submitted code snippet is appended as a new code cell to one
    in-memory notebook (`self.nb`) and executed through the same notebook
    client (`self.nb_client`) until the notebook is reset.

    To use the service function, you should first init the class, then call
    the run_code_on_notebook function.

    Example:

        ```ipython
        from agentscope.service.service_toolkit import *
        from agentscope.service.execute_code.exec_notebook import *
        nbe = NoteBookExecutor()
        code = "print('helloworld')"
        # calling directly
        nbe.run_code_on_notebook(code)
        >>> Executing function run_code_on_notebook with arguments:
        >>> code: print('helloworld')
        >>> END
        # calling with service toolkit
        service_toolkit = ServiceToolkit()
        service_toolkit.add(nbe.run_code_on_notebook)
        input_obs = [{"name": "run_code_on_notebook", "arguments":{"code": code}}]
        res_of_string_input = service_toolkit.parse_and_call_func(input_obs)
        "1. Execute function run_code_on_notebook\n [ARGUMENTS]:\n code: print('helloworld')\n [STATUS]: SUCCESS\n [RESULT]: ['helloworld\\n']\n"
        ```
    """  # noqa

    def __init__(
        self,
        timeout: int = 300,
    ) -> None:
        """
        Create an empty notebook, a notebook client for it, and start the
        kernel.

        Args:
            timeout (`int`, defaults to `300`):
                The timeout for each cell execution, forwarded to the
                notebook client.

        Raises:
            `ImportError`: If nbclient or nbformat is not installed.
        """
        if nbclient is None or nbformat is None:
            raise ImportError(
                "The package nbclient or nbformat is not found. Please "
                "install it by `pip install notebook nbclient nbformat`",
            )

        self.timeout = timeout
        self.nb = nbformat.v4.new_notebook()
        # Forward the timeout to the very first client as well; otherwise the
        # configured value would only take effect after a restart, because
        # only `_restart_client` passed it along.
        self.nb_client = nbclient.NotebookClient(
            nb=self.nb,
            timeout=self.timeout,
        )

        asyncio.run(self._start_client())

    def _output_parser(self, output: dict) -> str:
        """Parse one output entry of a notebook cell and return it as str.

        Args:
            output (`dict`):
                A cell output mapping with at least an "output_type" key.

        Returns:
            `str`: The stream text, the "text/plain" representation of an
            execute result, a message with the path of a saved PNG image,
            the traceback of an error, or a fixed "unsupported" message.
        """
        output_type = output["output_type"]

        if output_type == "stream":
            return output["text"]

        if output_type == "execute_result":
            return output["data"]["text/plain"]

        if output_type == "display_data":
            if "image/png" in output["data"]:
                file_path = self._save_image(output["data"]["image/png"])
                return f"Displayed image saved to {file_path}"
            return "Unsupported display type"

        if output_type == "error":
            # The traceback of an error output is a list of lines; join it so
            # that this method really returns a str as annotated.
            traceback = output["traceback"]
            if isinstance(traceback, str):
                return traceback
            return "\n".join(traceback)

        logger.info(f"Unsupported output encountered: {output}")
        return "Unsupported output encountered"

    async def _start_client(self) -> None:
        """Start the kernel and kernel client unless one is already alive."""
        kernel_client = self.nb_client.kc
        if kernel_client is None or not await kernel_client.is_alive():
            self.nb_client.create_kernel_manager()
            self.nb_client.start_new_kernel()
            self.nb_client.start_new_kernel_client()

    async def _kill_client(self) -> None:
        """Shut down the running kernel (if any) and drop its handles."""
        kernel_manager = self.nb_client.km
        if kernel_manager is not None and await kernel_manager.is_alive():
            await kernel_manager.shutdown_kernel(now=True)
            await kernel_manager.cleanup_resources()

            # Guard against a manager without an attached kernel client.
            if self.nb_client.kc is not None:
                self.nb_client.kc.stop_channels()
            self.nb_client.kc = None
            self.nb_client.km = None

    async def _restart_client(self) -> None:
        """Restart the notebook client.

        The current kernel is killed and a fresh client is created for the
        same notebook object, so the existing cells stay in `self.nb`.
        """
        await self._kill_client()
        self.nb_client = nbclient.NotebookClient(self.nb, timeout=self.timeout)
        await self._start_client()

    async def _run_cell(self, cell_index: int) -> "ServiceResponse":
        """Run a cell in the notebook by its index.

        Args:
            cell_index (`int`):
                Index of the cell in `self.nb.cells` to execute.

        Returns:
            `ServiceResponse`: SUCCESS with the list of parsed cell outputs,
            or ERROR with a message describing the failure.
        """
        try:
            self.nb_client.execute_cell(self.nb.cells[cell_index], cell_index)
            return ServiceResponse(
                status=ServiceExecStatus.SUCCESS,
                content=[
                    self._output_parser(output)
                    for output in self.nb.cells[cell_index].outputs
                ],
            )
        except nbclient.exceptions.DeadKernelError:
            # The kernel died: bring up a fresh one so later calls can work.
            # (The class has no `reset` coroutine; `_restart_client` is the
            # async restart routine.)
            await self._restart_client()
            return ServiceResponse(
                status=ServiceExecStatus.ERROR,
                content="DeadKernelError when executing cell, reset kernel",
            )
        except nbclient.exceptions.CellTimeoutError:
            # Interrupt the still-running cell so the kernel can be reused.
            if self.nb_client.km is not None:
                await self.nb_client.km.interrupt_kernel()
            return ServiceResponse(
                status=ServiceExecStatus.ERROR,
                content=(
                    "CellTimeoutError when executing cell"
                    ", code execution timeout"
                ),
            )
        except Exception as e:
            # Any other failure is reported back to the caller as an ERROR
            # response rather than raised.
            return ServiceResponse(
                status=ServiceExecStatus.ERROR,
                content=str(e),
            )

    @property
    def cells_length(self) -> int:
        """Return the number of cells currently in the notebook."""
        return len(self.nb.cells)

    async def async_run_code_on_notebook(self, code: str) -> "ServiceResponse":
        """
        Run the code on interactive notebook.

        Args:
            code (`str`):
                The code to append as a new cell and execute.

        Returns:
            `ServiceResponse`: The result of executing the new cell.
        """
        self.nb.cells.append(nbformat.v4.new_code_cell(code))
        cell_index = self.cells_length - 1
        return await self._run_cell(cell_index)

    def run_code_on_notebook(self, code: str) -> "ServiceResponse":
        """
        Run the code on interactive jupyter notebook.

        Args:
            code (`str`):
                The Python code to be executed in the interactive notebook.

        Returns:
            `ServiceResponse`: whether the code execution was successful,
            and the output of the code execution.
        """
        return asyncio.run(self.async_run_code_on_notebook(code))

    def reset_notebook(self) -> "ServiceResponse":
        """
        Reset the notebook by restarting its client and kernel.

        Returns:
            `ServiceResponse`: A SUCCESS response with the content
            "Reset notebook".
        """
        asyncio.run(self._restart_client())
        return ServiceResponse(
            status=ServiceExecStatus.SUCCESS,
            content="Reset notebook",
        )

    def _save_image(self, image_base64: str) -> str:
        """Decode base64 image data and save it to a file.

        The file is written through the FileManager singleton, which chooses
        the file name.

        Args:
            image_base64 (`str`):
                The base64-encoded image data.

        Returns:
            `str`: The value returned by `FileManager.save_image`
            (presumably the path of the saved file).
        """
        image_data = base64.b64decode(image_base64)
        file_manager = FileManager.get_instance()
        return file_manager.save_image(image_data)