agentscope.web.workstation.workflow_node module

Workflow node operators.

class agentscope.web.workstation.workflow_node.WorkflowNodeType(value)

Bases: IntEnum

Enum of workflow node types.

MODEL = 0
AGENT = 1
PIPELINE = 2
SERVICE = 3
MESSAGE = 4
COPY = 5
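
To illustrate how these values behave, the sketch below mirrors the enum locally instead of importing it from agentscope, so it runs without the library installed. The member names and values are copied from the listing above; the class name `NodeTypeSketch` is made up for this example.

```python
from enum import IntEnum


class NodeTypeSketch(IntEnum):
    """Local stand-in mirroring the documented WorkflowNodeType values."""

    MODEL = 0
    AGENT = 1
    PIPELINE = 2
    SERVICE = 3
    MESSAGE = 4
    COPY = 5


# IntEnum members compare equal to plain integers, which is why the
# node classes can be documented with node_type = 0, 1, 2, ...
agent_equals_one = NodeTypeSketch.AGENT == 1

# A member can be looked up from its integer value or from its name.
by_value = NodeTypeSketch(2)
by_name = NodeTypeSketch["SERVICE"]
print(by_value.name, int(by_name))  # PIPELINE 3
```
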
class agentscope.web.workstation.workflow_node.WorkflowNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: ABC

Abstract base class representing a generic node in a workflow.

WorkflowNode is designed to be subclassed, with the node-specific logic implemented in the subclass methods. It provides an interface for initializing the node and for executing its operations when the node is called.

node_type = None
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

abstract compile() → dict

Compile the node into a dict of executable Python code.
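
The subclassing pattern described here can be sketched with a local stand-in. The code below does not import agentscope: `SketchNode` and `EchoNode` are hypothetical classes, and the `"code"` key in the returned dict is an assumption, since this page does not say which keys the real `compile()` returns.

```python
from abc import ABC, abstractmethod


class SketchNode(ABC):
    """Hypothetical stand-in for WorkflowNode (not the library class)."""

    node_type = None

    def __init__(self, node_id: str, opt_kwargs: dict,
                 source_kwargs: dict, dep_opts: list) -> None:
        self.node_id = node_id
        self.opt_kwargs = opt_kwargs
        self.source_kwargs = source_kwargs
        self.dep_opts = dep_opts

    @abstractmethod
    def compile(self) -> dict:
        """Return a dict describing the code generated for this node."""


class EchoNode(SketchNode):
    """Hypothetical subclass; the 'code' key is an assumption."""

    node_type = 4  # MESSAGE in the WorkflowNodeType listing

    def compile(self) -> dict:
        # Emit one line of Python that binds the node id to its kwargs.
        return {"code": f"{self.node_id} = {self.opt_kwargs!r}"}


node = EchoNode("msg_1", {"content": "hi"}, {}, [])
compiled = node.compile()
print(compiled["code"])

# The abstract base cannot be instantiated directly.
try:
    SketchNode("x", {}, {}, [])
    instantiated = True
except TypeError:
    instantiated = False
```

Because `compile()` is abstract, a subclass that forgets to override it fails at instantiation time rather than later when the workflow is compiled.
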

class agentscope.web.workstation.workflow_node.ModelNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node that represents a model in a workflow.

The ModelNode can be used to load and execute a model as part of the workflow. It initializes model configurations and performs model-related operations when called.

node_type = 0
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.MsgNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node that manages messaging within a workflow.

MsgNode is responsible for handling messages, creating message objects, and performing message-related operations when the node is invoked.

node_type = 4
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.DialogAgentNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node representing a DialogAgent within a workflow.

node_type = 1
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.UserAgentNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node representing a UserAgent within a workflow.

node_type = 1
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.DictDialogAgentNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node representing a DictDialogAgent within a workflow.

node_type = 1
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.ReActAgentNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node representing a ReActAgent within a workflow.

node_type = 1
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.MsgHubNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node that serves as a messaging hub within a workflow.

MsgHubNode is responsible for broadcasting announcements to participants and managing the flow of messages within a workflow.

node_type = 2
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.PlaceHolderNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A placeholder node within a workflow.

This node acts as a placeholder and can be used to pass information or data through without performing any significant operation.

node_type = 2
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.SequentialPipelineNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node representing a sequential pipeline within a workflow.

SequentialPipelineNode executes a series of operators or nodes in sequence, where the output of one node is the input to the next.

node_type = 2
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.
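
The chaining behaviour of a sequential pipeline, where each output becomes the next input, can be sketched in a few lines. This is a conceptual illustration only: `run_sequential` is a hypothetical helper, not part of agentscope, and plain callables stand in for operators or nodes.

```python
from typing import Any, Callable, Sequence


def run_sequential(steps: Sequence[Callable[[Any], Any]], x: Any) -> Any:
    """Feed x through each step in order, chaining outputs to inputs."""
    for step in steps:
        x = step(x)
    return x


# Three toy steps standing in for operators or nodes.
steps = [str.strip, str.upper, lambda s: s + "!"]
result = run_sequential(steps, "  hello ")
print(result)  # HELLO!
```
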

class agentscope.web.workstation.workflow_node.ForLoopPipelineNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node representing a for-loop structure in a workflow.

ForLoopPipelineNode allows a pipeline node to be executed multiple times, iterating over a given set of inputs or a specified range.

node_type = 2
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.WhileLoopPipelineNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node representing a while-loop structure in a workflow.

WhileLoopPipelineNode repeatedly executes a node for as long as a specified condition holds.

node_type = 2
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.
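
As a rough picture of condition-driven repetition, the sketch below applies a body function until a condition fails. `run_while` and the `(iteration, value)` signature of the condition are assumptions made for this example; the page does not document how the real node receives its condition.

```python
from typing import Any, Callable


def run_while(cond: Callable[[int, Any], bool],
              body: Callable[[Any], Any], x: Any) -> Any:
    """Apply body to x for as long as cond(iteration, x) holds."""
    i = 0
    while cond(i, x):
        x = body(x)
        i += 1
    return x


# Keep doubling while the value is below 100: 3, 6, 12, 24, 48, 96, 192.
doubled = run_while(lambda i, v: v < 100, lambda v: v * 2, 3)
print(doubled)  # 192
```
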

class agentscope.web.workstation.workflow_node.IfElsePipelineNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node representing an if-else conditional structure in a workflow.

IfElsePipelineNode directs the flow of execution to different nodes based on a specified condition.

node_type = 2
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.SwitchPipelineNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node representing a switch-case structure within a workflow.

SwitchPipelineNode routes execution to different nodes based on the evaluation of a specified key or condition.

node_type = 2
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.
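
Switch-case routing of this kind amounts to evaluating a key and dispatching to the matching branch. The sketch below shows the idea with a dict of callables; `run_switch`, its parameters, and the fallback branch are hypothetical and do not reflect the actual agentscope signature.

```python
from typing import Any, Callable, Dict


def run_switch(
    key_fn: Callable[[Any], Any],
    cases: Dict[Any, Callable[[Any], Any]],
    default: Callable[[Any], Any],
    x: Any,
) -> Any:
    """Evaluate key_fn on x and dispatch to the matching case."""
    branch = cases.get(key_fn(x), default)
    return branch(x)


def type_name(value: Any) -> str:
    """Key function: route on the name of the input's type."""
    return type(value).__name__


cases = {"int": lambda v: v + 1, "str": lambda v: v.upper()}

routed_int = run_switch(type_name, cases, lambda v: v, 41)
routed_str = run_switch(type_name, cases, lambda v: v, "ab")
routed_other = run_switch(type_name, cases, lambda v: v, 1.5)
print(routed_int, routed_str, routed_other)  # 42 AB 1.5
```
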

class agentscope.web.workstation.workflow_node.CopyNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

A node that duplicates the output of another node in the workflow.

CopyNode replicates the results of a parent node, which is useful in workflows where the same output is needed for multiple subsequent operations.

node_type = 5
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.BingSearchServiceNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

Bing search service node.

node_type = 3
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.GoogleSearchServiceNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

Google search service node.

node_type = 3
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.PythonServiceNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

Python execution service node.

node_type = 3
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.ReadTextServiceNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

Read-text service node.

node_type = 3
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

class agentscope.web.workstation.workflow_node.WriteTextServiceNode(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list)

Bases: WorkflowNode

Write-text service node.

node_type = 3
__init__(node_id: str, opt_kwargs: dict, source_kwargs: dict, dep_opts: list) → None

Initialize the node. Subclasses implement their specific initialization logic.

compile() → dict

Compile the node into a dict of executable Python code.

agentscope.web.workstation.workflow_node.get_all_agents(node: WorkflowNode, seen_agents: set | None = None, return_var: bool = False) → List

Retrieve all unique agent objects from a pipeline.

Recursively traverses the pipeline to collect all distinct agent-based participants, tracking the agents already seen to prevent duplicates.

Parameters:
  • node (WorkflowNode) – The WorkflowNode from which to extract agents.

  • seen_agents (set, optional) – A set of agents that have already been seen to avoid duplication. Defaults to None.

Returns:

A list of unique agent objects found in the pipeline.

Return type:

list
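
The recursive, duplicate-free traversal described for get_all_agents can be sketched on a toy tree. Everything below is a stand-in: `ToyNode`, its `children` attribute, and `collect_agents` are hypothetical and only mimic the documented behaviour; the real function operates on WorkflowNode objects, and its `return_var` option is not modelled here.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set

AGENT, PIPELINE = 1, 2  # values from the WorkflowNodeType listing


@dataclass(eq=False)
class ToyNode:
    """Hypothetical node: a name, a type, and optional child nodes."""

    name: str
    node_type: int
    children: List["ToyNode"] = field(default_factory=list)


def collect_agents(node: ToyNode,
                   seen: Optional[Set[int]] = None) -> List[ToyNode]:
    """Depth-first walk that returns each agent node exactly once."""
    if seen is None:
        seen = set()
    found: List[ToyNode] = []
    for child in node.children:
        if child.node_type == AGENT:
            if id(child) not in seen:
                seen.add(id(child))
                found.append(child)
        elif child.node_type == PIPELINE:
            # Share the seen set so nested pipelines do not re-add agents.
            found.extend(collect_agents(child, seen))
    return found


alice = ToyNode("alice", AGENT)
bob = ToyNode("bob", AGENT)
inner = ToyNode("inner", PIPELINE, [alice, bob])
outer = ToyNode("outer", PIPELINE, [alice, inner])

names = [n.name for n in collect_agents(outer)]
print(names)  # ['alice', 'bob']
```

Passing the same `seen` set down into each nested call is what keeps `alice` from being reported twice, even though she appears in both the outer and the inner pipeline.
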