Eino: 概述

简介

Eino[‘aino] (近似音: i know,希望应用程序达到 “i know” 的愿景) 旨在提供基于 Golang 语言的终极大模型应用开发框架。 它从开源社区中的诸多优秀 LLM 应用开发框架,如 LangChain 和 LlamaIndex 等获取灵感,同时借鉴前沿研究成果与实际应用,提供了一个强调简洁性、可扩展性、可靠性与有效性,且更符合 Go 语言编程惯例的 LLM 应用开发框架。

Eino 提供的价值如下:

  • 精心整理的一系列 组件(component) 抽象与实现,可轻松复用与组合,用于构建 LLM 应用。
  • 强大的 编排(orchestration) 框架,为用户承担繁重的类型检查、流式处理、并发管理、切面注入、选项赋值等工作。
  • 一套精心设计、注重简洁明了的 API
  • 以集成 流程(flow)示例(example) 形式不断扩充的最佳实践集合。
  • 一套实用 工具(DevOps tools),涵盖从可视化开发与调试到在线追踪与评估的整个开发生命周期。

Eino 可在 AI 应用开发周期中的不同阶段,规范、简化和提效:

  • Development: 开箱即用的 AI 相关组件;常见的 Flow 范式;对并发、异步、流式友好的图编排;完善的流处理能力等。这些均可对 AI 应用的开发提供很大助力。
  • Debugging: 可对图编排的应用,进行可视化的开发调试
  • Deployment: 提供丰富的对 AI 应用的评测能力
  • Maintenance: 提供丰富的切面对 AI 应用进行观测、监控

完整 API Reference:https://pkg.go.dev/github.com/cloudwego/eino

快速上手

直接使用组件:

model, _ := openai.NewChatModel(ctx, config) // create an invokable LLM instance
message, _ := model.Generate(ctx, []*Message{
    SystemMessage("you are a helpful assistant."),
    UserMessage("what does the future AI App look like?")}

当然,你可以这样用,Eino 提供了许多开箱即用的有用组件。但通过使用编排功能,你能实现更多,原因有三:

  • 编排封装了大语言模型(LLM)应用的常见模式。
  • 编排解决了处理大语言模型流式响应这一难题。
  • 编排为你处理类型安全、并发管理、切面注入以及选项赋值等问题。

Eino 提供了三组用于编排的 API:

API
特性和使用场景
Chain
简单的链式有向图,只能向前推进。
Graph
有向有环或无环图。功能强大且灵活。
Workflow
有向无环图,支持在结构体字段级别进行数据映射。

我们来创建一个简单的 chain: 一个模版(ChatTemplate)接一个大模型(ChatModel)。

chain, _ := NewChain[map[string]any, *Message]().
           AppendChatTemplate(prompt).
           AppendChatModel(model).
           Compile(ctx)
chain.Invoke(ctx, map[string]any{"query": "what's your name?"})

现在,我们来创建一个 Graph,先用一个 ChatModel 生成 Tool 调用指令,接着用一个 ToolsNode 执行这些 Tool,然后将 Tool 的响应反馈给 ChatModel。

graph := NewGraph[[]*Message, *Message]()
graph.AddChatModelNode("node_model", model)
graph.AddToolsNode("node_tools", toolsNode)
graph.AddEdge(START, "node_model")
graph.AddEdge("node_tools", "node_model")
graph.AddBranch("node_model", branch)
runnable, _ := graph.Compile(ctx)
runnable.Stream(ctx, []*Message{UserMessage("help me plan my weekend")})

现在,我们来创建一个 Workflow,它能在字段级别灵活映射输入与输出:

wf := NewWorkflow[[]*Message, *Message]()
wf.AddChatModelNode("model", model).AddInput(NewMapping(START))
wf.AddLambdaNode("l1", lambda1).AddInput(NewMapping("model").From("Content").To("Input"))
wf.AddLambdaNode("l2", lambda2).AddInput(NewMapping("model").From("Role").To("Role"))
wf.AddLambdaNode("l3", lambda3).AddInput(
    NewMapping("l1").From("Output").To("Query"),
    NewMapping("l2").From("Output").To("MetaData"),
)
wf.AddEnd([]*Mapping{NewMapping("node_l3")}
runnable, _ := wf.Compile(ctx)
runnable.Invoke(ctx, []*Message{UserMessage("kick start this workflow!")})

现在,咱们来创建一个 “ReAct” 智能体:一个 ChatModel 绑定了一些 Tool。它接收输入的消息,自主判断是调用 Tool 还是输出最终结果。Tool 的执行结果会再次成为聊天模型的输入消息,并作为下一轮自主判断的上下文。

我们用几十行代码就能实现这个:

// build a ReAct agent that accepts []*Message as input and outputs *Message as output
func (r *Agent) build(ctx context.Context, config *AgentConfig) (
    _ Runnable[[]*Message, *Message], err error) {
    var (
       // the LLM responsible for reasoning and generating output within the ReAct Agent
       chatModel = config.Model
       // the actual executor of tools
       toolsNode *ToolsNode
       // the meta info of tools
       toolInfos []*schema.ToolInfo
       // the graph consist of the ChatModel and ToolsNode
       graph *Graph[[]*Message, *Message]
       // read and write contextual messages before ChatModel execution
       modelPreHandle StatePreHandler[[]*Message, *state]
       // after ChatModel execution, routes to END if output does not contain tool call info, otherwise routes to ToolsNode
       modelPostBranch *GraphBranch
    )

    if toolInfos, err = genToolInfos(ctx, config); err != nil {
       return nil, err
    }

    if err = chatModel.BindTools(toolInfos); err != nil {
       return nil, err
    }

    if toolsNode, err = NewToolNode(ctx, &config.ToolsConfig); err != nil {
       return nil, err
    }

    // creates a graph with state that stores messages across multiple rounds of ReAct loop
    graph = NewGraph[[]*Message, *Message](
       WithGenLocalState(func(ctx context.Context) *state {
          return &state{Messages: make([]*Message, 0, config.MaxStep+1)}
       }))

    modelPreHandle = func(ctx context.Context, input []*Message, state *state) (
       []*Message, error) {
       state.Messages = append(state.Messages, input...)

       modifiedInput := make([]*Message, 0, len(state.Messages))
       copy(modifiedInput, state.Messages)
       return config.MessageModifier(ctx, modifiedInput), nil // add system prompt
    }

    err = graph.AddChatModelNode(nodeKeyModel, chatModel, WithStatePreHandler(modelPreHandle))
    if err != nil {
       return nil, err
    }

    if err = graph.AddEdge(START, nodeKeyModel); err != nil { // chatModel connects to START because it accepts initial input
       return nil, err
    }

    if err = graph.AddToolsNode(nodeKeyTools, toolsNode); err != nil {
       return nil, err
    }

    // chatModel's output can be a stream with multiple chunks of messages
    // we use StreamGraphBranch here to make the routing decision based only on the first chunk 
    modelPostBranch = NewStreamGraphBranch(
       func(_ context.Context, sr *schema.StreamReader[*Message]) (endNode string, err error) {
          defer sr.Close()

          if msg, err := sr.Recv(); err != nil {
             return "", err
          } else if len(msg.ToolCalls) == 0 {
             return END, nil
          }

          return nodeKeyTools, nil
       }, map[string]bool{nodeKeyTools: true, END: true})
    if err = graph.AddBranch(nodeKeyModel, modelPostBranch); err != nil {
       return nil, err
    }

    if err = graph.AddEdge(nodeKeyTools, nodeKeyModel); err != nil { // toolsNode's output are fed back to chatModel
       return nil, err
    }

    // compile Graph to Runnable:do type check、inject callback aspects、automatic stream boxing and unboxing、generate graph runner, etc.
    return graph.Compile(ctx, WithMaxRunSteps(config.MaxStep))
}

Eino 会在上述代码背后自动完成一些重要工作:

  • 类型检查:在编译时确保两个节点的输入和输出类型匹配。
  • 流处理:如有需要,在将消息流传递给 ChatModel 和 ToolsNode 节点之前进行拼接,以及将该流复制到 callback handler 中。
  • 并发管理:由于 StatePreHandler 是线程安全的,共享的 state 可以被安全地读写。
  • 切面注入:如果指定的 ChatModel 实现未自行注入,会在 ChatModel 执行之前和之后注入回调切面。
  • 选项赋值:调用 Option 可以全局设置,也可以针对特定组件类型或特定节点进行设置。

例如,你可以轻松地通过回调扩展已编译的图:

handler := NewHandlerBuilder().
  OnStartFn(
    func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context) {
        log.Infof("onStart, runInfo: %v, input: %v", info, input)
    }).
  OnEndFn(
    func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context) {
        log.Infof("onEnd, runInfo: %v, out: %v", info, output)
    }).
  Build()
  
compiledGraph.Invoke(ctx, input, WithCallbacks(handler))

或者你可以轻松地为不同节点分配选项:

// assign to All nodes
compiledGraph.Invoke(ctx, input, WithCallbacks(handler))

// assign only to ChatModel nodes
compiledGraph.Invoke(ctx, input, WithChatModelOption(WithTemperature(0.5))

// assign only to node_1
compiledGraph.Invoke(ctx, input, WithCallbacks(handler).DesignateNode("node_1"))

关键特性

丰富的组件(Component)

  • 将常见的构建模块抽象为组件,每个组件抽象都有多个可开箱即用的组件实现
    • 诸如聊天模型(ChatModel)、工具(Tool)、提示模板(PromptTemplate)、检索器(Retriever)、文档加载器(Document Loader)、Lambda 等组件抽象。
    • 每种组件类型都有其自身的接口:定义了输入和输出类型、定义了选项类型,以及合理的流处理范式。
    • 实现细节是透明的。在编排组件时,你只需关注抽象层面。
  • 实现可以嵌套,并包含复杂的业务逻辑。
    • ReAct 智能体(React Agent)、多查询检索器(MultiQueryRetriever)、主机多智能体(Host MultiAgent)等。它们由多个组件和复杂的业务逻辑构成。
    • 从外部看,它们的实现细节依然透明。例如在任何接受 Retriever 的地方,都可以使用 MultiQueryRetriever。

强大的编排 (Graph/Chain/Workflow)

  • 数据从 Retriever / Document Loader / ChatTemplate 流向 ChatModel,接着流向 Tool ,并被解析为最终答案。这种通过多个组件的有向、可控的数据流,可以通过图编排来实现。
  • 组件实例是图的节点(Node),而**边(Edge)**则是数据流通道。
  • 图编排功能强大且足够灵活,能够实现复杂的业务逻辑:
    • 类型检查、流处理、并发管理、切面注入和选项分配都由框架处理。
    • 在运行时进行分支(Branch)执行、读写全局状态(State),或者使用工作流进行字段级别的数据映射。

完善的流处理(Streaming)

  • 流式处理(Stream Processing)很重要,因为 ChatModel 在生成消息时会实时输出消息块。
  • 对于只接受非流式输入的下游节点(如 ToolsNode),Eino 会自动将流 拼接(Concatenate) 起来。
  • 在 Graph 执行过程中,当需要流时,Eino 会自动将非流式转换为流式。
  • 当多个流汇聚到一个下游节点时,Eino 会自动 合并(Merge) 这些流。
  • 当流分散到不同的下游节点或传递给回调处理器时,Eino 会自动 复制(Copy) 这些流。
  • 最重要的是,当将一个组件添加到图中时,Eino 会自动补充缺失的流处理能力:你可以提供一个仅可 Invoke 的函数,Eino 会创建其他三种范式。

高扩展性的切面(Callbacks)

  • 切面用于处理诸如日志记录、追踪、指标统计等横切面关注点,同时也用于暴露组件实现的内部细节。
  • 支持五种切面:OnStart、OnEnd、OnError、OnStartWithStreamInput、OnEndWithStreamOutput
  • 开发者可以轻松创建自定义回调处理程序,在图运行期间通过 Option 添加它们,这些处理程序会在图运行时被调用。
  • 图还能将切面注入到那些自身不支持回调的组件实现中。

Eino 框架结构

Eino 框架整体由两部分构成:

  • Eino:存放 Eino 的组件抽象,Graph、Chain 等编排能力,切面机制等
  • EinoExt:Eino 的组件实现、通用切面实现、组件使用示例等,以及 Eino 相关的开发、调试、评测等可视化、管理能力。可放置各种各样的 Eino 扩展能力

Eino Core 中的六大概念:

  • Components 抽象
    • 每一种 Component 均有对应的接口抽象及其对应的多种实现。可直接使用、也可被编排
      • 编排时,对应节点的输入输出与接口抽象保持一致
    • 类似于 ChatModel、PromptTemplate、Retriever、Indexer 等开箱即用的原子组件
    • Eino 中 Component 概念较为宽松,任意满足如下职责中的一个,即可被称为 Component
      • 可加入 Graph Node,作为编排对象被编排
      • 作为其他编排对象的依赖注入组件
  • Flow 集成组件
    • 基于框架中的 Component、Graph ,针对常见的应用场景,提供开箱即用的预先编排好的集成组件能力。
    • 可能提供再次被编排的能力
    • 例如:Agent、MapReduce 长文本总结、MultiAgent 等
  • Runnable – 用户弱感知
    • 编排框架中的编排对象和编排产物。
    • 所有的 Component 在被编排时,均需转换成 Runnable 对象,一般用户不可见此过程。
    • 一张图编译成可执行对象时,本质是一个 Runnable 对象
  • Compose 编排
    • 将各种 Component 实例,作为 Node 节点,以图的点线关系连接起来,数据流按照有向边的方向进行传输,并在不同节点中执行。
    • 支持 Graph、Chain、Workflow 等多种编排形式,本质均是通过有向图表达数据流的传递和节点的执行顺序
  • 切面能力
    • Graph 中每个节点执行前后提供的切面能力。
    • 例如:Trace、埋点、日志等
  • Stream
    • 添加到 Node 中的组件实例,其输入、输出既有可能是 流、也有可能是 非流。 Compose 编排可以将这些不同形式的输入输出进行衔接,传递数据流并执行节点。 这个能力可称为流式编排能力
    • 例如,ChatModel 的输出、ASR 的输入输出 都是流式的

Component

具体每种 Component 的职责,可具体看对应的接口定义

下文是示例性说明,不完整,以代码仓库为准

eino/components // 组件根目录
├── document
│   ├── interface.go 
│   └── option.go
├── embedding
│   ├── callback_extra.go
│   ├── interface.go // 一个组件的抽象
│   ├── ark          // 与抽象同级的一个文件夹代表一种具体实现
│   ├── openai
│   └── option.go
├── indexer
│   ├── callback_extra.go
│   ├── interface.go
│   ├── option.go
│   └── volc_vikingdb
├── model
│   ├── callback_extra.go
│   ├── interface.go
│   ├── ark
│   ├── openai
│   └── option.go
├── prompt
│   ├── callback_extra.go
│   ├── chat_template.go
│   ├── chat_template_test.go
│   └── interface.go
├── retriever
│   ├── callback_extra.go
│   ├── interface.go
│   ├── option.go
│   └── volc_vikingdb
├── tool
│   ├── duckduckgo
│   ├── interface.go
│   └── option.go
├── types.go

Runnable

type Runnable[I, O any] interface {
    Invoke(ctx context.Context, input I, opts ...Option) (output O, err error)
    Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error)
    Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error)
    Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error)
}
  • Runnable 抽象根据输入、输出是否为流式,划分成 4 个 Lamba 算子
  • Compose 编排中,添加到 Node 中的组件实例,会被统一转换成上述的 Runnable 抽象
  • 当一个 Component 转换为 Runnable 时,根据其提供的任意 Lambda 算子,结合着 流化(Streaming)、合并(Concat) 能力,补全剩余的未提供的 Lambda 算子
    • 流与非流间的转换: (用 StreamReader[T] 和 T 分别指代 流 和 非流)
      • 合并(Concat)
        • 将 StreamReader[T] 中的 T-Frame 接收完整,并合并成一个完整的 T
      • 流化(Streaming)
        • 将 T 转换成仅有一个 T-Frame 的 StreamReader[T],进行流式传输
  • 基于上述两种转换关系,Eino 便可根据用户提供的具有任意 N(N<=4) 种交互模式的接口,封装转换成一个完整的 Runnable[I, O]
源\目标
Invoke[I, O any]()
Stream[I, O any]()
Collect[I, O any]()
Transform[I, O any]()
Invoke[I, O any]()
-
- Invoke输入直接透传
- Invoke响应转成单帧流
- Invoke输入转成单帧流
- Invoke响应直接透传
- Invoke输入转成单帧流
- Invoke响应转成单帧流
Stream[I, O any]()
- Stream输入直接透传
- Stream输出Concat后透传
-
- Stream输入转成单帧流
- Stream输出Concat后透传
- Stream输入转成单帧流
- Stream输出直接透传
Collect[I, O any]()
- Collect输入Concat后透传
- Collect输出直接透传
- Collect输入Concat后透传
- Collect输出转成单帧流
-
- Collect输入直接透传
- Collect输出转成单帧流

Transform[I, O any]()
- Transform输入Concat后透传
- Transform输出Concat后透传
- Transform输入Concat后透传
- Transform输出直接透传
- Transform输入直接透传
- Transform输出Concat后透传
-
  • 编程产物中具有的真正的流式能力是什么,取决于如下的编排范式

Stream 流

Notice:Stream 流在 生产消费复制合并转换等场景下,处理逻辑均较为复杂。 实现时稍有考虑不周的地方,便可能导致 生产/消费者互相等待而夯死、Goroutine 泄露或溢出、内存泄露或溢出、CPU 负载高 等问题。 为了减少稳定性问题的产生,Eino 强要求使用 Eino 提供的 Stream 流,因此将 Stream 实现成了 Struct、而非定义成接口。

复杂的流操作的场景:

  • 流式接口和非流式接口直接的转换
    • 流转成非流时,需要将流中的所有数据帧,Concat 成一个完整的数据结构
    • 非流转成流时,需要将一个数据结构转换成仅有一个数据帧的流
  • 同一个数据流可能需要被多次读取和消费,比如多个切面。 因一个流仅能被完成读取一次,所以需要根据消费次数,将流进行 Copy
    • 流 Copy 时,需要考虑多个流的 消费协同、Close 协同。 任意一个流没有正常 Close,均可能导致资源无法正常释放
  • 多个流合并成一个流

为了 Stream 的 API 接口更加清晰和易用,遂与 Golang 内置的 io.Pipe() 方法的定义对齐。

  • API 接口定义为:schema.Pipe[T any](cap int) (*StreamReader[T], *StreamWriter[T])
    • 其中 cap 表示 Stream 的缓存有多大,即在无任何消费的情况下,Sender 可以无阻塞地发送 Chunk 的数量
    • StreamWriter 类似于 io.Pipe 中的 PipeWriter
    • StreamReader 类似于 io.Pipe 中的 PipeReader,只是多了一个 Copy(n int) []*StreamReader[T] 方法
  • WARN:在任何地方见到 *StreamReader[T]*StreamWriter[T] 都不要忘记 Close(),否则可能导致流无法正常释放。一般流的生产和消费都是单独 Goroutine,从而导致 Goroutine 的泄露。

Stream 流 的 API 设计,源码链接:eino/schema/stream.go

Compose 编排

Graph

点(Node)
  • 把一个 Component 实例加入到 Graph 中,便形成一个 Node 节点
  • Component 即可被独立使用,又可被 Graph 编排
  • Add{Component}Node() 接口列举。 此处仅列举几个接口,更详细接口列表可查看最新的 Eino SDK
    • 对于通用的组件类型,均会抽象出一个标准行为语义,并给出不同的实现
    • 业务可通过使用 AddLambdaNode,添加任意定制函数作为节点
// AddChatModelNode add node that implements model.ChatModel.
func (g *graph) AddChatModelNode(key string, node model.ChatModel, opts ...GraphAddNodeOpt) error {
    return g.addNode(key, toChatModelNode(key, node, opts...))
}

// AddChatTemplateNode add node that implements prompt.ChatTemplate.
func (g *graph) AddChatTemplateNode(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) error {
    return g.addNode(key, toChatTemplateNode(key, node, opts...))
}

func (g *graph) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) error {
    return g.addNode(key, toToolsNode(key, node, opts...))
}

// AddLambdaNode add node that implements at least one of Invoke[I, O], Stream[I, O], Collect[I, O], Transform[I, O].
// due to the lack of supporting method generics, we need to use function generics to generate Lambda run as Runnable[I, O].
// for Invoke[I, O], use compose.InvokableLambda()
// for Stream[I, O], use compose.StreamableLambda()
// for Collect[I, O], use compose.CollectableLambda()
// for Transform[I, O], use compose.TransformableLambda()
// for arbitrary combinations of 4 kinds of lambda, use compose.AnyLambda()
func (g *graph) AddLambdaNode(key string, node *Lambda, opts ...GraphAddNodeOpt) error {
    return g.addNode(key, toLambdaNode(key, node, opts...))
}

// AddGraphNode add one kind of Graph[I, O]、Chain[I, O]、StateChain[I, O, S] as a node.
// for Graph[I, O], comes from NewGraph[I, O]()
// for Chain[I, O], comes from NewChain[I, O]()
// for StateGraph[I, O, S], comes from NewStateGraph[I, O, S]()
func (g *graph) AddGraphNode(key string, node AnyGraph, opts ...GraphAddNodeOpt) error {
    return g.addNode(key, toAnyGraphNode(key, node, opts...))
}

func (g *graph) AddRetrieverNode(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) error {
    return g.addNode(key, toRetrieverNode(key, node, opts...))
}
线(Edge)

Eino 提供了多种添加线的方式

AddEdge
// AddEdge adds an edge to the graph, edge means a data flow from startNode to endNode.
// the previous node's output type must be set to the next node's input type.
// NOTE: startNode and endNode must have been added to the graph before adding edge.
// e.g.
//
//  graph.AddNode("start_node_key", compose.NewPassthroughNode())
//  graph.AddNode("end_node_key", compose.NewPassthroughNode())
//
//  err := graph.AddEdge("start_node_key", "end_node_key")
func (g *graph) AddEdge(startNode, endNode string) (err error) {}
  • 在两个节点间添加一条有向的数据传输链路,以控制数据的流动方向和节点的执行顺序

AddBranch
// AddBranch adds a branch to the graph.
// e.g.
//
//  condition := func(ctx context.Context, in string) (string, error) {
//     return "next_node_key", nil
//  }
//  endNodes := map[string]bool{"path01": true, "path02": true}
//  branch := compose.NewGraphBranch(condition, endNodes)
//
//  graph.AddBranch("start_node_key", branch)
func (g *graph) AddBranch(startNode string, branch *GraphBranch) (err error) {}
  • 根据传入的自定义选择函数,运行时根据经运算条件从多个 Node 中选出命中 Node 执行

Parallel
  • 将多个 Node 平行并联, 形成多个节点并发执行的节点
  • 无 AddParallel 方法,通过 AddEdge 构建并联的多条拓扑路径,以次形成 **Parallel **

面(Graph)
  • 通过 NewGraph 创建 graph 实例,并通过 graph.AddXXXNode、graph.AddEdge、graph.AddBranch 绘制点和线,最终形成一张可编译执行的图
// 无状态的 Graph 编排
g := NewGraph[map[string]any, *schema.Message]()

type testState struct {
    ms []string
}

genFn := func(ctx context.Context) *testState {
    return &testState{}
}

// 有状态的 Graph 编排
sg := NewGraph[string, string](WithGenLocalState(genFn))

// 基于 Graph 编排简化 的 Chain
chain := NewChain[map[string]any, string]()

Chain

Chain - 简化的 Graph,将不同类型的 Node 按照先后顺序,进行连接,形成从头到尾的数据流传递和顺序执行。

AppendXXX

XXX 可是 ChatMode、Prompt、Indexer、Retriever、Graph 等多种组件类型

Chain 是简化的 Graph,因此可通过 AppendGraph 实现 Chain 和 Graph 的相互嵌套

  • 将多个 Node 按照传入顺序首尾串联,串联的 Node 依次进行数据传递和执行

AppendParallel

添加一个节点,这个节点具有多个并发执行的多个子节点

// Parallel run multiple nodes in parallel
//
// use `NewParallel()` to create a new parallel type
// Example:

parallel := NewParallel()
parallel.AddChatModel("output_key01", chat01)
parallel.AddChatModel("output_key01", chat02)

chain := NewChain[any,any]()
chain.AppendParallel(parallel)
  • 创建一个 Parallel,容纳并发执行的多个子节点

AppendBranch

添加一个节点,这个节点通过 condition 计算方法,从多个子节点中,选择一个执行

// NewChainBranch creates a new ChainBranch instance based on a given condition.
// It takes a generic type T and a GraphBranchCondition function for that type.
// The returned ChainBranch will have an empty key2BranchNode map and a condition function
// that wraps the provided cond to handle type assertions and error checking.
// eg.

condition := func(ctx context.Context, in string, opts ...any) (endNode string, err error) {
    // logic to determine the next node
    return "some_next_node_key", nil
}

cb := NewChainBranch[string](condition)
cb.AddPassthrough("next_node_key_01", xxx) // node in branch, represent one path of branch
cb.AddPassthrough("next_node_key_02", xxx) // node in branch

chain := NewChain[string, string]()
chain.AppendBranch(cb)

Workflow

允许字段级别做上下游数据映射的有向无环图。

切面(Callbacks)

Component(包括 Lambda)、Graph 编排共同解决“把业务逻辑定义出来”的问题。而 logging, tracing, metrics, 上屏展示等横切面性质的功能,需要有机制把功能注入到 Component(包括 Lambda)、Graph 中。

另一方面,用户可能想拿到某个具体 Component 实现的执行过程中的中间信息,比如 VikingDBRetriever 额外给出查询的 DB Name,ArkChatModel 额外给出请求的 temperature 等参数。需要有机制把中间状态透出。

Callbacks 支持“横切面功能注入”和“中间状态透出”,具体是:用户提供、注册“function”(Callback Handler),Component 和 Graph 在固定的“时机”(或者说切面、位点)回调这些 function,给出对应的信息。

Eino 中的 Component 和 Graph 等实体,在固定的时机 (Callback Timing),回调用户提供的 function (Callback Handler),并把自己是谁 (RunInfo),以及当时发生了什么 (Callback Input & Output) 传出去。

详见:Eino: Callback 用户手册


最后修改 January 16, 2025 : feat: update quick start doc (#1209) (420e3e0)