Source code for agentscope.evaluate._evaluator._general_evaluator

# -*- coding: utf-8 -*-
"""General evaluator implementation in AgentScope, which is easy to debug
compared to the RayEvaluator."""
from typing import Callable, Awaitable, Coroutine, Any

from ._evaluator_base import EvaluatorBase
from .._evaluator_storage import EvaluatorStorageBase
from .._task import Task
from .._solution import SolutionOutput
from .._benchmark_base import BenchmarkBase


class GeneralEvaluator(EvaluatorBase):
    """The general evaluator that supports users in debugging their
    evaluation."""

    def __init__(
        self,
        name: str,
        benchmark: BenchmarkBase,
        n_repeat: int,
        storage: EvaluatorStorageBase,
        n_workers: int,
    ) -> None:
        """Initialize the evaluator."""
        super().__init__(
            name=name,
            benchmark=benchmark,
            n_repeat=n_repeat,
            storage=storage,
        )
        assert isinstance(benchmark, BenchmarkBase)
        assert n_repeat >= 1, "n_repeat must be at least 1"
        assert n_workers >= 1, "n_workers must be at least 1"

        self.benchmark = benchmark
        self.n_repeat = n_repeat
        self.n_workers = n_workers

    def run_evaluation(
        self,
        task: Task,
        repeat_id: str,
        solution_output: SolutionOutput,
    ) -> None:
        """Run the evaluation for a task and solution result."""
        evaluation_results = task.evaluate(solution_output)

        # Store the evaluation results
        for result in evaluation_results:
            self.storage.save_evaluation_result(
                task_id=task.id,
                repeat_id=repeat_id,
                evaluation=result,
            )

    async def run_solution(
        self,
        repeat_id: str,
        task: Task,
        solution: Callable[[Task, Callable], Awaitable[SolutionOutput]],
    ) -> None:
        """Generate a solution to a task and evaluate it."""
        if self.storage.solution_result_exists(task.id, repeat_id):
            # Obtain the cached solution result from storage
            solution_result = self.storage.get_solution_result(
                task.id,
                repeat_id,
            )
        else:
            # Run the solution
            solution_result = await solution(
                task,
                self.storage.get_agent_pre_print_hook(
                    task.id,
                    repeat_id,
                ),
            )
            self.storage.save_solution_result(
                task.id,
                repeat_id,
                solution_result,
            )

        # Evaluate the solution with the task's metrics
        for metric in task.metrics:
            if not self.storage.evaluation_result_exists(
                task.id,
                repeat_id,
                metric.name,
            ):
                self.run_evaluation(
                    task,
                    repeat_id,
                    solution_result,
                )

    async def run(
        self,
        solution: Callable[
            [Task, Callable],
            Coroutine[Any, Any, SolutionOutput],
        ],
    ) -> None:
        """Run the evaluation and get the results.

        Args:
            solution (`Callable[[Task, Callable], Coroutine[Any, Any, \
            SolutionOutput]]`):
                An async function that takes a `Task` instance and a
                pre-print hook function as input, and returns a
                `SolutionOutput` instance.
        """
        await self._save_evaluation_meta()

        for repeat_id in range(self.n_repeat):
            for task in self.benchmark:
                await self.run_solution(
                    str(repeat_id),
                    task,
                    solution,
                )

        await self.aggregate()
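
A minimal usage sketch of GeneralEvaluator follows, assuming the names defined above are in scope. Here my_benchmark and my_storage stand for user-provided instances of BenchmarkBase and EvaluatorStorageBase subclasses, and my_solution is a hypothetical solution coroutine; all three are illustrative placeholders, not AgentScope APIs.

import asyncio


async def my_solution(task: Task, pre_print_hook: Callable) -> SolutionOutput:
    """Hypothetical solution function: solve the task and return its output."""
    ...  # e.g. run an agent on ``task`` and wrap the answer in a SolutionOutput


evaluator = GeneralEvaluator(
    name="demo_evaluation",
    benchmark=my_benchmark,  # hypothetical BenchmarkBase subclass instance
    n_repeat=1,
    storage=my_storage,      # hypothetical EvaluatorStorageBase subclass instance
    n_workers=1,
)

asyncio.run(evaluator.run(my_solution))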