agentscope.web.workstation.workflow_node module

Workflow node opt.

class BingSearchServiceNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

Bing Search Node

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 3
class CopyNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node that duplicates the output of another node in the workflow.

CopyNode is used to replicate the results of a parent node and can be useful in workflows where the same output is needed for multiple subsequent operations.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 5
class DialogAgentNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node representing a DialogAgent within a workflow.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 1
class DictDialogAgentNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node representing a DictDialogAgent within a workflow.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 1
class ForLoopPipelineNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node representing a for-loop structure in a workflow.

ForLoopPipelineNode allows the execution of a pipeline node multiple times, iterating over a given set of inputs or a specified range.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 2
class GoogleSearchServiceNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

Google Search Node

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 3
class IfElsePipelineNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node representing an if-else conditional structure in a workflow.

IfElsePipelineNode directs the flow of execution to different node nodes based on a specified condition.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 2
class ModelNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node that represents a model in a workflow.

The ModelNode can be used to load and execute a model as part of the workflow node. It initializes model configurations and performs model-related operations when called.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 0
class MsgHubNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node that serves as a messaging hub within a workflow.

MsgHubNode is responsible for broadcasting announcements to participants and managing the flow of messages within a workflow’s node.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 2
class MsgNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node that manages messaging within a workflow.

MsgNode is responsible for handling messages, creating message objects, and performing message-related operations when the node is invoked.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 4
class PlaceHolderNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A placeholder node within a workflow.

This node acts as a placeholder and can be used to pass through information or data without performing any significant operation.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 2
class PythonServiceNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

Execute python Node

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 3
class ReActAgentNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node representing a ReActAgent within a workflow.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 1
class ReadTextServiceNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

Read Text Service Node

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 3
class SequentialPipelineNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node representing a sequential node within a workflow.

SequentialPipelineNode executes a series of operators or nodes in a sequence, where the output of one node is the input to the next.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 2
class SwitchPipelineNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node representing a switch-case structure within a workflow.

SwitchPipelineNode routes the execution to different node nodes based on the evaluation of a specified key or condition.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 2
class UserAgentNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node representing a UserAgent within a workflow.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 1
class WhileLoopPipelineNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

A node representing a while-loop structure in a workflow.

WhileLoopPipelineNode enables conditional repeated execution of a node node based on a specified condition.

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 2
class WorkflowNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:ABC

Abstract base class representing a generic node in a workflow.

WorkflowNode is designed to be subclassed with specific logic implemented in the subclass methods. It provides an interface for initialization and execution of operations when the node is called.

abstract compile() dict[源代码]

Compile Node to python executable code dict

node_type = None
class WorkflowNodeType(value)[源代码]

基类:IntEnum

Enum for workflow node.

AGENT = 1
COPY = 5
MESSAGE = 4
MODEL = 0
PIPELINE = 2
SERVICE = 3
class WriteTextServiceNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)[源代码]

基类:WorkflowNode

Write Text Service Node

compile() dict[源代码]

Compile Node to python executable code dict

node_type = 3
get_all_agents(node: WorkflowNode, seen_agents: set | None = None, return_var: bool = False) List[源代码]

Retrieve all unique agent objects from a pipeline.

Recursively traverses the pipeline to collect all distinct agent-based participants. Prevents duplication by tracking already seen agents.

参数:
  • node (WorkflowNode) – The WorkflowNode from which to extract agents.

  • seen_agents (set, optional) – A set of agents that have already been seen to avoid duplication. Defaults to None.

返回:

A list of unique agent objects found in the pipeline.

返回类型:

list