Apache Flink Agents 0.3 学习教程

Cosolar 6 阅读 AI技术架构

本教程基于 Apache Flink Agents 0.3 官方文档编写,系统讲解 Flink Agents 的设计原理、核心架构、关键概念与实战开发。

Apache Flink Agents 是 Apache Flink 社区的一个子项目,定位为面向企业级生产场景的流式 Agent 操作系统(streaming Agent OS)。它将 AI Agent 引入 Flink 的流处理管道中,使 Agent 成为实时数据流中的一等公民——Agent 在 live event 的流动过程中做出 AI 决策,而非被动等待人类提示。

与传统的交互式 Agent(如聊天机器人、Copilot)不同,Flink Agents 中的 Agent 是事件驱动的:每条到达的数据记录都会触发 Agent 的执行,Agent 在流上自主完成推理、工具调用、记忆检索和输出生成。这得益于 Flink 经过大规模验证的流式引擎所提供的分布式处理、容错机制和状态管理能力。

核心特性

特性 说明
海量规模与毫秒级延迟 借助 Flink 分布式引擎,实时处理大规模事件流
数据与 AI 无缝集成 Agent 直接与 Flink DataStream 和 Table API 交互,结构化数据处理与语义 AI 能力融合
可靠的精确一次执行 通过 Flink checkpoint 机制与外部预写日志(WAL)集成,保证 Agent 动作及其副作用的 exactly-once 一致性
熟悉的 Agent 抽象 复用主流 AI Agent 概念——技能、短期/长期记忆、提示词、工具、动态编排
多语言支持 提供 Python、Java 原生 API 和声明式 YAML API,甚至支持跨语言混用
丰富生态 原生集成主流 LLM(OpenAI、Anthropic、Ollama、通义千问等)、向量存储和 MCP 服务器
可观测性 采用事件驱动的编排方式,所有 Agent 动作通过事件连接和控制,可通过事件日志观察和理解 Agent 行为

核心架构与设计理念

事件驱动的编排模型

Flink Agents 的核心设计理念是事件驱动的编排。一个 Agent 的逻辑被分解为若干个模块化的步骤,称为 Action(动作)。每个 Action 监听特定类型的事件,当事件发生时被触发执行。Action 在执行过程中可以产生新的事件,这些事件又会触发下游的 Action。

这种设计形成了一个有向图的计算结构:

  • 节点 是 Action
  • 是事件类型

这种事件驱动的有向图可以是循环的(directed cyclic graph),支持复杂的推理循环,如 LLM 调用 → 工具执行 → 再次 LLM 调用 的迭代过程。

分层架构

Flink Agents 的架构可以划分为以下几个层次:

┌─────────────────────────────────────────────────────────┐
│                    Agent 层                              │
│  Workflow Agent  |  ReAct Agent  |  YAML Agent          │
├─────────────────────────────────────────────────────────┤
│                  编排与资源层                             │
│  Action  |  Event  |  Prompt  |  Memory  |  Skills     │
├─────────────────────────────────────────────────────────┤
│                  模型与工具层                             │
│  Chat Model  |  Tool  |  MCP  |  Vector Store           │
├─────────────────────────────────────────────────────────┤
│                  Flink 运行时层                           │
│  DataStream API  |  Table API  |  State  |  Checkpoint │
└─────────────────────────────────────────────────────────┘

最上层是 Agent 类型,定义了不同的编排范式;中间层是核心构建块,包括动作、事件、提示词、记忆和技能;模型与工具层提供 LLM 交互和外部能力调用;底层是 Flink 的流处理运行时,提供分布式执行、状态管理和容错能力。

Agent 执行环境

Flink Agents 引入了 AgentsExecutionEnvironment 的概念,它包装了 Flink 的 StreamExecutionEnvironment,负责管理 Agent 运行所需的资源(如 LLM 连接、工具、技能等),并将 Agent 与 Flink 的 DataStream/Table 连接起来。

from pyflink.datastream import StreamExecutionEnvironment
from flink_agents.api.execution_environment import AgentsExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
agents_env = AgentsExecutionEnvironment.get_execution_environment(env)

AgentsExecutionEnvironment 提供了以下核心能力:

  • 注册共享资源(LLM 连接、工具等)
  • 从 DataStream 或 Table 创建 Agent 输入
  • 将 Agent 输出转换回 DataStream 或 Table
  • 提交并执行 Flink 作业

环境安装与配置

前置条件

依赖 版本要求 说明
Java 11+(推荐 21+) Java 21+ 无功能限制;Java 11-20 不支持异步执行
Python 3.10、3.11 或 3.12 仅使用 Python API 或 PyFlink 时需要
Flink 1.20.3 或更高 推荐使用所选 Flink 小版本的最新稳定版

Python 3.12 需要 Flink 2.1+ 和 Flink Agents 0.3+。

快速安装(推荐)

官方提供了一键安装脚本,会自动下载 Apache Flink、创建 Python 虚拟环境、安装 flink-agents 和 apache-flink,并将必要的 JAR 包复制到 $FLINK_HOME/lib

curl -fsSL https://raw.githubusercontent.com/apache/flink-agents/main/tools/install.sh | bash

手动安装

# 下载并解压 Flink
export FLINK_VERSION=<version>
tar -xzf flink-${FLINK_VERSION}-bin-scala_2.12.tgz
export FLINK_HOME=$(pwd)/flink-${FLINK_VERSION}

# 将 flink-python JAR 从 opt 复制到 lib(PyFlink 需要)
cp $FLINK_HOME/opt/flink-python-${FLINK_VERSION}.jar $FLINK_HOME/lib/

第二步:创建 Python 虚拟环境

python3 -m venv venv
source venv/bin/activate
pip install flink-agents apache-flink==${FLINK_VERSION}

第四步:配置 PYTHONPATH

Flink 运行在自己的 JVM 进程中,需要通过 PYTHONPATH 环境变量定位 flink-agents Python 包:

export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')

建议将此行添加到 shell 配置文件(~/.bashrc~/.zshrc)中,使其在所有终端会话中自动生效。

Java Maven 依赖(Java 开发者)

pom.xml 中添加以下依赖:

<properties>
    <flink.version>2.2.1</flink.version>
    <flink-agents.version>0.3.0</flink-agents.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-agents-api</artifactId>
        <version>${flink-agents.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- IDE 本地运行时需要 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-agents-ide-support</artifactId>
        <version>${flink-agents.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

所有依赖应使用 provided scope,以避免与 Flink 集群上的依赖冲突。

核心概念详解

Action(动作)

Action 是 Agent 中可执行代码的最小单元。每个 Action 监听至少一种事件类型,当对应类型的事件发生时,Action 被触发执行。Action 可以在执行过程中发送新的事件来触发其他 Action,也可以输出数据。

在 Python 中,使用 @action(EventType) 装饰器声明 Action;在 Java 中,使用 @Action(listenEventTypes = {EventType.class}) 注解。

Action 的函数签名为 (Event, RunnerContext) -> None

from flink_agents.api.agent import Agent
from flink_agents.api.decorators import action
from flink_agents.api.event import Event, InputEvent, OutputEvent
from flink_agents.api.context import RunnerContext

class MyAgent(Agent):
    @action(InputEvent.EVENT_TYPE)
    @staticmethod
    def process_input(event: Event, ctx: RunnerContext) -> None:
        # 从事件中提取输入数据
        input_event = InputEvent.from_event(event)
        # 通过 RunnerContext 发送新事件,触发下游 Action
        ctx.send_event(SomeEvent(...))

RunnerContext 是 Action 执行时的上下文对象,提供以下核心能力:

  • send_event() — 发送事件触发其他 Action 或输出结果
  • short_term_memory — 访问短期记忆
  • durable_execute() — 执行持久化代码块

Action 间的事件传递

一个 Action 可以发送不同类型的事件来实现不同的效果:

# 触发内置的聊天模型 Action
ctx.send_event(ChatRequestEvent(model="my_model", messages=messages))

# 输出结果到下游(OutputEvent 会被框架直接收集并输出)
ctx.send_event(OutputEvent(output=result))

OutputEvent 会被框架立即收集并输出到 Agent 的下游,绕过 Action 路由。其他事件(如 ChatRequestEvent)则会被路由到监听该事件类型的 Action。

Event(事件)

事件是 Action 之间传递的 JSON 可序列化消息。每个事件都有一个类型字符串(用于路由)和一个属性映射(用于承载负载数据)。一个事件可以触发多个监听它的 Action。

内置特殊事件

事件类型 说明
InputEvent 由框架生成,携带到达 Agent 的输入数据记录。监听 InputEvent 的 Action 是 Agent 的入口点
OutputEvent 框架监听此事件,将其 output 属性转化为 Agent 的输出
ChatRequestEvent Action 发送此事件请求 LLM 聊天补全
ChatResponseEvent 框架返回此事件,包含 LLM 的响应
ToolRequestEvent 请求执行工具调用
ToolResponseEvent 工具执行完成后的响应

统一事件(Unified Event)

对于简单场景,可以直接使用 Event 类传递自定义数据,无需定义子类:

# 发送自定义事件
@action(InputEvent.EVENT_TYPE)
@staticmethod
def create_my_event(event: Event, ctx: RunnerContext) -> None:
    ctx.send_event(
        Event(type="my_event", attributes={"field1": "test", "field2": 42})
    )

# 消费自定义事件
@action("my_event")
@staticmethod
def handle_my_event(event: Event, ctx: RunnerContext) -> None:
    field1: str = event.get_attr("field1")
    field2: int = event.get_attr("field2")

自定义事件子类

对于结构化、可复用的事件,可以定义自定义事件子类。数据存储在 attributes 映射中,子类需要实现 from_event 工厂方法来重建类型化对象:

from typing import ClassVar
from typing_extensions import override

class MyEvent(Event):
    EVENT_TYPE: ClassVar[str] = "my_event"

    def __init__(self, value: str) -> None:
        super().__init__(type=MyEvent.EVENT_TYPE, attributes={"value": value})

    @classmethod
    @override
    def from_event(cls, event: Event) -> "MyEvent":
        assert "value" in event.attributes
        result = MyEvent(value=event.attributes["value"])
        result.id = event.id  # 保留基础事件 ID
        return result

    @property
    def value(self) -> str:
        return self.get_attr("value")

事件在 Python Action 之间或 Java-Python 边界传递时会序列化为 JSON。非平凡类型(如 Pydantic 模型)的属性值会丢失类型信息,以 dict 形式到达,需要手动重建:

input_event = InputEvent.from_event(event)
input_data = MyModel.model_validate(input_event.input)

Chat Model(聊天模型)

聊天模型是 Agent 的"大脑",使 Agent 能够与大型语言模型(LLM)通信,进行自然语言理解、推理和生成。

资源声明模型

Flink Agents 采用连接 + 配置的两层模型来管理聊天模型:

连接(Connection) — 定义与 LLM 服务的连接,通常只定义一次并可被多个模型配置共享。使用 @chat_model_connection 装饰器(Java 中为 @ChatModelConnection 注解):

from flink_agents.api.decorators import chat_model_connection
from flink_agents.api.resource import ResourceDescriptor, ResourceName

class MyAgent(Agent):
    @chat_model_connection
    @staticmethod
    def ollama_connection() -> ResourceDescriptor:
        return ResourceDescriptor(
            clazz=ResourceName.ChatModel.OLLAMA_CONNECTION,
            base_url="http://localhost:11434",
            request_timeout=120.0
        )

配置(Setup) — 引用一个连接,并添加模型特定的配置(如模型名称、温度、提示词、工具等)。使用 @chat_model_setup 装饰器:

@chat_model_setup
@staticmethod
def my_chat_model() -> ResourceDescriptor:
    return ResourceDescriptor(
        clazz=ResourceName.ChatModel.OLLAMA_SETUP,
        connection="ollama_connection",
        model="qwen3:8b",
        temperature=0.7,
        num_ctx=4096,
    )

聊天事件流

聊天模型通过内置事件进行交互:

Action 发送 ChatRequestEvent
        ↓
内置 chat_model_action 处理请求
        ↓
   ┌─────────────────────┐
   │ LLM 返回最终回答?    │
   └─────────────────────┘
     ↓ 是              ↓ 否(请求调用工具)
ChatResponseEvent    ToolRequestEvent
     ↓                    ↓
 Action 处理响应      tool_call_action 执行工具
                         ↓
                    ToolResponseEvent
                         ↓
                    chat_model_action 继续聊天

当 LLM 请求调用工具时,chat_model_action 发送 ToolRequestEvent 而非最终的 ChatResponseEvent。工具执行完成后,ToolResponseEventchat_model_action 消费,工具结果被追加到聊天历史中,然后再次调用模型。这个循环持续到模型返回最终响应。

内置 LLM 提供商

Flink Agents 原生支持多种主流 LLM 提供商:

提供商 Python 支持 Java 支持 典型模型
Ollama qwen3:8b, llama3:8b, deepseek-r1
OpenAI GPT-5 系列, GPT-4.1, gpt-oss
Anthropic Claude Sonnet 4.5, Claude Opus 4.1
Azure OpenAI GPT-4o, GPT-4 Turbo
Azure AI Llama, Mistral, Phi
Amazon Bedrock Claude, Llama, Mistral, Amazon Nova
通义千问 (DashScope) qwen-plus, qwen-max, qwen-turbo

对于仅在一种语言中可用的提供商,可以通过跨语言包装器在另一种语言中使用。例如,在 Python Agent 中使用 Java-only 的 Bedrock:

@chat_model_connection
@staticmethod
def java_chat_model_connection() -> ResourceDescriptor:
    return ResourceDescriptor(
        clazz=ResourceName.ChatModel.JAVA_WRAPPER_CONNECTION,
        java_clazz=ResourceName.ChatModel.Java.OLLAMA_CONNECTION,
        endpoint="http://localhost:11434",
        requestTimeout=120,
    )

自定义提供商

如果内置提供商不能满足需求,可以继承 BaseChatModelConnectionBaseChatModelSetup 实现自定义 LLM 集成:

from flink_agents.api.chat_model import BaseChatModelConnection, BaseChatModelSetup

class MyChatModelConnection(BaseChatModelConnection):
    def chat(self, messages, tools=None, **kwargs) -> ChatMessage:
        # 核心方法:发送消息到 LLM 并返回响应
        pass

class MyChatModelSetup(BaseChatModelSetup):
    @property
    def model_kwargs(self) -> dict:
        return {"model": self.model, "temperature": 0.7}

Tool(工具)

工具是 Agent 调用外部能力的方式。Flink Agents 提供了灵活可扩展的工具使用机制,开发者可以将本地 Python/Java 函数定义为工具,也可以集成远程 MCP 服务器提供的工具。

定义工具的两种方式

方式一:在 Agent 类中定义为静态方法

使用 @tool 装饰器(Java 中为 @Tool 注解)标记一个函数为工具,然后在 ResourceDescriptortools 列表中通过名称引用:

from flink_agents.api.decorators import tool

class ReviewAnalysisAgent(Agent):
    @tool
    @staticmethod
    def notify_shipping_manager(id: str, review: str) -> None:
        """Notify the shipping manager when product received a negative review due to
        shipping damage.

        Parameters
        ----------
        id : str
            The id of the product that received a negative review
        review: str
            The negative review content
        """
        notify_shipping_manager(id=id, review=review)

    @chat_model_setup
    @staticmethod
    def review_analysis_model() -> ResourceDescriptor:
        return ResourceDescriptor(
            clazz=ResourceName.ChatModel.OLLAMA_SETUP,
            connection="ollama_server",
            model="qwen3:8b",
            tools=["notify_shipping_manager"],  # 通过名称引用工具
        )

Python 中工具的 docstring 非常重要——框架使用它来生成工具元数据,LLM 依赖这些元数据来理解和决定何时使用该工具。

方式二:注册到执行环境

将工具注册到 AgentsExecutionEnvironment,使其可被多个 Agent 复用:

agents_env.add_resource(
    "notify_shipping_manager",
    ResourceType.TOOL,
    Tool.from_callable(notify_shipping_manager)
)

工具执行机制

内置的 tool_call_action 监听 ToolRequestEvent,对每个工具调用,通过函数名查找工具资源,通过持久化执行(durable execution)执行它,并记录成功或失败。批处理中的所有工具调用处理完成后,发送 ToolResponseEvent

当工具请求来自 chat_model_action 时,发出的 ToolResponseEvent 会被 chat_model_action 自动消费以继续聊天。用户也可以直接发送 ToolRequestEvent 来编程式地调用工具。

Memory(记忆系统)

记忆是 Agent 记住先前交互信息的能力,是基础模型 Agent 的核心能力之一,支撑着长程推理、持续适应和与复杂环境的有效交互。

Flink Agents 设计了一个模拟人类认知过程的分布式记忆系统,包含三种记忆类型,按照可见性(visibility)、保留时间(retention)和派生性(derivation)进行分类:

记忆类型对比

特性 感知记忆 短期记忆 长期记忆
保留时间 单次运行 跨多次运行 跨多次运行
可见性 单 Key 单 Key 单 Key
派生性 原始数据 原始数据 派生数据
存储后端 Flink State Flink State 外部向量存储
访问模式 键值/嵌套对象 键值/嵌套对象 语义搜索
典型用途 单次运行的临时执行状态 跨运行共享的精确信息 大量、可压缩、可搜索的信息
内容类型 任意原始类型/对象 任意原始类型/对象 字符串、ChatMessage

感知记忆(Sensory Memory)

存储 Agent 执行期间生成的临时数据,仅在单次运行中需要。例如工具调用上下文,或用户想在 Agent 的不同 Action 之间传递的数据。当一次 Agent 运行完成时,框架自动清理感知记忆。

在代码中通过 ctx.short_term_memory 访问:

@action(InputEvent.EVENT_TYPE)
@staticmethod
def process_input(event: Event, ctx: RunnerContext) -> None:
    input = ProductReview.model_validate(InputEvent.from_event(event).input)
    # 存储到短期记忆(包含感知记忆功能)
    ctx.short_term_memory.set("id", input.id)

@action(ChatResponseEvent.EVENT_TYPE)
@staticmethod
def process_response(event: Event, ctx: RunnerContext) -> None:
    # 从短期记忆中读取
    product_id = ctx.short_term_memory.get("id")

短期记忆(Short-Term Memory)

存储 Agent 执行期间生成的数据,但其生命周期可以跨越多次运行。与长期记忆相比,用户检索到的是写入时的完整原始数据。支持可选的自动过期和清理。

长期记忆(Long-Term Memory)

存储 Agent 执行期间生成的数据,但数据可能随 Agent 执行快速膨胀,需要压缩以提供简洁且高度相关的上下文。长期记忆支持相似条目的压缩和语义搜索,底层依赖外部向量存储。

Skills(技能)

技能(Skill)是一个自包含的指令包,包含指令文件以及可选的脚本和参考文件,教会 Agent 如何执行特定任务。技能本质上是一个包含 SKILL.md 文件的目录。

渐进式披露模型

技能采用渐进式披露(progressive disclosure)机制,确保提供大量能力不会使每个请求变得臃肿:

  1. 发现(Discovery) — 启动时,仅将每个技能的名称和描述注入系统提示词(每个技能约几十个 token),Agent 知道有哪些能力可用
  2. 激活(Activation) — 当 Agent 判断某个技能相关时,调用内置的 load_skill 工具读取完整的 SKILL.md 指令到上下文中
  3. 执行(Execution) — Agent 遵循加载的指令,通过内置的 bash 工具运行捆绑的脚本或命令,并按需读取参考文件

技能格式

一个技能是一个目录,包含一个带有 YAML frontmatter 和 Markdown 正文的 SKILL.md 文件:

---
name: math-calculator
description: Calculate mathematical expressions using shell commands. Use when the user asks to perform arithmetic.
license: Apache-2.0
compatibility: Requires bash with bc
---

# Math Calculator Skill

## When to Use
Use this skill whenever the user asks to evaluate a numeric expression.

## Method
Evaluate expressions with the `bc` calculator:

\`\`\`bash
echo "(2 + 3) * 4" | bc
# Output: 20
\`\`\`

Frontmatter 字段说明:

字段 必填 说明
name 技能标识符,1-64 字符,仅小写字母、数字和连字符
description 1-1024 字符,在发现时加载——应清晰描述技能做什么及何时使用
license 技能许可证
compatibility 运行时需求的自由文本说明

声明与启用技能

在 Agent 中使用 @skills 装饰器声明技能来源:

from flink_agents.api.decorators import skills
from flink_agents.api.skills import Skills

class MathAgent(Agent):
    @skills
    @staticmethod
    def my_skills() -> Skills:
        return Skills.from_local_dir("/path/to/skills")

在聊天模型配置中通过 skills 参数启用特定技能:

@chat_model_setup
@staticmethod
def math_model() -> ResourceDescriptor:
    return ResourceDescriptor(
        clazz=ResourceName.ChatModel.OLLAMA_SETUP,
        connection="ollama_server",
        model="qwen3.5:9b",
        prompt="system_prompt",
        skills=["math-calculator"],          # 按名称启用技能
        allowed_commands=["echo", "bc"],      # 白名单允许的 shell 命令
    )

启用技能后,框架自动注入发现提示词,并添加两个内置工具:load_skill(读取技能完整指令)和 bash(运行命令和脚本)。allowed_commands 是一个白名单,未列出的命令会被拒绝。

技能适合用过程(runbook)描述的能力,而非单个函数调用。对于单一的、类型明确的操作,优先使用工具。

两种 Agent 范式

Flink Agents 提供两种主要的 Agent 编排范式:Workflow Agent 和 ReAct Agent。

Workflow Agent

Workflow Agent 是一种推理和行为被组织为模块化步骤的有向工作流的 Agent。每个步骤是一个 Action,通过事件连接。这种设计适合需要显式编排、分支或多步推理的场景,如数据富化、多工具管道或复杂业务逻辑。

核心特点

  • Agent 定义为继承 Agent 基类的类
  • 逻辑表达为一组用 @action 装饰的函数
  • Action 消费事件、执行推理或工具调用、发出新事件
  • 事件驱动的工作流形成有向循环图
  • 开发者完全控制编排流程

适用场景

  • 需要明确控制每一步的执行顺序
  • 需要条件分支和数据处理管道
  • 需要在特定步骤插入自定义逻辑
  • 多阶段、多 Agent 协作的任务

ReAct Agent

ReAct Agent 采用 ReAct(Reasoning + Acting)范式,将推理和行动能力结合来解决复杂任务。用户只需指定目标(通过 prompt)和可用工具,LLM 自主决定如何达成目标并采取行动。

核心特点

  • 无需手动定义编排流程
  • LLM 自主决策何时推理、何时调用工具
  • 初始化参数简洁:聊天模型 + 提示词 + 输出模式
from flink_agents.api.agents.react_agent import ReActAgent

my_react_agent = ReActAgent(
    chat_model=chat_model_descriptor,
    prompt=my_prompt,
    output_schema=MyBaseModelDataType,
)

提示词设计

ReAct Agent 的提示词通常包含两条消息:

  • SYSTEM 消息:告诉 Agent 要做什么,给出输入输出示例
  • USER 消息:描述如何将输入元素转换为文本字符串,使用 {placeholder} 语法
from flink_agents.api.prompts.prompt import Prompt
from flink_agents.api.chat_message import ChatMessage, MessageRole

my_prompt = Prompt.from_messages(
    messages=[
        ChatMessage(
            role=MessageRole.SYSTEM,
            content="分析产品评论并生成满意度评分..."
        ),
        ChatMessage(
            role=MessageRole.USER,
            content='{"id": {id}, "review": {review}}'
        ),
    ],
)

占位符名称来源于输入元素的类型:

  • 基本类型(int, str 等):单个 {input} 占位符
  • Row:每个字段名一个占位符
  • dict/Map:直接使用键名
  • BaseModel(Python)/ Pojo(Java):对象的字段名

输出模式

设置 output_schema 可让 ReAct Agent 自动将 LLM 响应反序列化为期望的类型。输出模式可以是 BaseModel 子类(Python)或 Pojo 类(Java),或 RowTypeInfo

from pydantic import BaseModel

class ReviewAnalysisResult(BaseModel):
    id: str
    score: int
    reasons: list[str]

适用场景

  • 目标明确但路径不确定的复杂任务
  • LLM 需要自主决定使用哪些工具及顺序
  • 快速原型开发和实验

两种范式对比

维度 Workflow Agent ReAct Agent
编排控制 开发者完全控制,显式定义每一步 LLM 自主决策,动态编排
开发复杂度 较高,需定义所有 Action 较低,只需 prompt + 工具
可预测性 高,流程确定 较低,依赖 LLM 推理
灵活性 需要修改代码才能调整流程 通过修改 prompt 即可调整
适用场景 固定的多步管道、数据处理 探索性任务、复杂推理

Flink Agents 的核心价值在于将 Agent 无缝集成到 Flink 的流处理管道中。Agent 直接与 Flink 的 DataStream 和 Table API 交互,使结构化数据处理与 AI 能力融合。

DataStream API 集成

from pyflink.common import WatermarkStrategy

# 1. 创建输入 DataStream
input_stream = env.from_source(
    source=your_source,
    watermark_strategy=WatermarkStrategy.no_watermarks(),
    source_name="your_source_name",
)

# 2. 将 Agent 应用于 DataStream
output_stream = (
    agents_env.from_datastream(
        input=input_stream,
        key_selector=lambda x: x.id  # 必须提供 KeySelector
    )
    .apply(your_agent)
    .to_datastream()
)

# 3. 消费输出
output_stream.print()

输入 DataStream 必须是 KeyedStream,或者用户需要提供 KeySelector 来将普通 DataStream 转换为 KeyedStream。Key 用于数据分区,对应 Flink KeyedStream 中的 key 概念——相同 key 的数据会被分配到同一个分区,这使得基于 key 的记忆系统成为可能。

Table API 集成

from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo
from pyflink.datastream import KeySelector
from pyflink.table import DataTypes, Schema

# 1. 创建执行环境(包含 Table 环境)
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env, t_env=t_env)

# 2. 定义 KeySelector
class MyKeySelector(KeySelector):
    def get_key(self, value):
        return value.id

# 3. 定义输出 Schema
output_type = ExternalTypeInfo(RowTypeInfo(
    [BasicTypeInfo.INT_TYPE_INFO()],
    ["result"],
))
schema = Schema.new_builder().column("result", DataTypes.INT()).build()

# 4. 将 Agent 应用于 Table
output_table = (
    agents_env.from_table(input=input_table, key_selector=MyKeySelector())
    .apply(your_agent)
    .to_table(schema=schema, output_type=output_type)
)

Table API 集成时需要注意 Python 和 Java 的差异:

  • Pythonto_table() 需要同时提供 SchemaTypeInformation
  • JavatoTable(Schema) 只需提供 Schema,输出列名为 f0

实战教程:Workflow Agent 快速上手

本节通过两个渐进式示例演示如何使用 Flink Agents 构建 LLM 驱动的工作流。

示例概述

  1. 评论分析:处理产品评论流,使用单个 Agent 从每条评论中提取评分(1-5)和不满意原因
  2. 产品改进建议:在第一个示例基础上,通过窗口聚合每条评论的分析结果,生成产品级摘要(评分分布和常见投诉),然后应用第二个 Agent 生成具体的改进建议

第一步:准备执行环境

from pyflink.datastream import StreamExecutionEnvironment
from flink_agents.api.execution_environment import AgentsExecutionEnvironment
from flink_agents.api.resource import ResourceType

env = StreamExecutionEnvironment.get_execution_environment()
agents_env = AgentsExecutionEnvironment.get_execution_environment(env)

# 注册 Ollama 聊天模型连接
agents_env.add_resource(
    "ollama_server",
    ResourceType.CHAT_MODEL_CONNECTION,
    ollama_server_descriptor,
)

第二步:定义 ReviewAnalysisAgent

这个 Agent 接收产品评论,使用 LLM 生成满意度评分和潜在的不满意原因。它展示了如何定义提示词、工具、聊天模型和 Action:

import json
import logging
from pydantic import BaseModel
from typing import ClassVar

from flink_agents.api.agent import Agent
from flink_agents.api.decorators import action, chat_model_setup, prompt, tool
from flink_agents.api.event import Event, InputEvent, OutputEvent, ChatResponseEvent
from flink_agents.api.context import RunnerContext
from flink_agents.api.chat_message import ChatMessage, MessageRole
from flink_agents.api.chat_event import ChatRequestEvent
from flink_agents.api.resource import ResourceDescriptor, ResourceName
from flink_agents.api.prompts.prompt import Prompt

# 数据模型
class ProductReview(BaseModel):
    id: str
    review: str

class ProductReviewAnalysisRes(BaseModel):
    id: str
    score: int
    reasons: list[str]

# 提示词
review_analysis_prompt = Prompt.from_messages(
    messages=[
        ChatMessage(
            role=MessageRole.SYSTEM,
            content="""你是一个产品评论分析专家。分析给定的产品评论,
            返回JSON格式的结果,包含:
            - score: 满意度评分(1-5)
            - reasons: 不满意原因列表

            示例输出:
            {"score": 2, "reasons": ["物流损坏", "质量不佳"]}
            """
        ),
        ChatMessage(
            role=MessageRole.USER,
            content="{input}"
        ),
    ]
)

class ReviewAnalysisAgent(Agent):
    """使用 LLM 分析产品评论,生成满意度评分和不满意原因。"""

    @prompt
    @staticmethod
    def review_analysis_prompt() -> Prompt:
        return review_analysis_prompt

    @tool
    @staticmethod
    def notify_shipping_manager(id: str, review: str) -> None:
        """当产品因运输损坏收到差评时通知运输经理。

        Parameters
        ----------
        id : str
            收到差评的产品ID
        review : str
            差评内容
        """
        notify_shipping_manager(id=id, review=review)

    @chat_model_setup
    @staticmethod
    def review_analysis_model() -> ResourceDescriptor:
        return ResourceDescriptor(
            clazz=ResourceName.ChatModel.OLLAMA_SETUP,
            connection="ollama_server",
            model="qwen3:8b",
            prompt="review_analysis_prompt",
            tools=["notify_shipping_manager"],
            extract_reasoning=True,
        )

    @action(InputEvent.EVENT_TYPE)
    @staticmethod
    def process_input(event: Event, ctx: RunnerContext) -> None:
        """处理输入事件,发送聊天请求。"""
        input = ProductReview.model_validate(InputEvent.from_event(event).input)
        # 将 id 存入短期记忆,供后续 Action 使用
        ctx.short_term_memory.set("id", input.id)

        content = f"""
            "id": {input.id},
            "review": {input.review}
        """
        msg = ChatMessage(role=MessageRole.USER)
        ctx.send_event(
            ChatRequestEvent(
                model="review_analysis_model",
                messages=[msg],
                prompt_args={"input": content},
            )
        )

    @action(ChatResponseEvent.EVENT_TYPE)
    @staticmethod
    def process_chat_response(event: Event, ctx: RunnerContext) -> None:
        """处理聊天响应,发送输出事件。"""
        chat_response = ChatResponseEvent.from_event(event)
        try:
            json_content = json.loads(chat_response.response.content)
            ctx.send_event(
                OutputEvent(
                    output=ProductReviewAnalysisRes(
                        id=ctx.short_term_memory.get("id"),
                        score=json_content["score"],
                        reasons=json_content["reasons"],
                    )
                )
            )
        except Exception:
            logging.exception(
                f"Error processing chat response {chat_response.response.content}"
            )
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSource
from pyflink.common import WatermarkStrategy, Duration
from pyflink.datastream.formats import StreamFormat

# 读取产品评论文件作为流式数据源
product_review_stream = env.from_source(
    source=FileSource.for_record_stream_format(
        StreamFormat.text_line_format(),
        f"file:///{current_dir}/resources/product_review.txt",
    )
    .monitor_continuously(Duration.of_minutes(1))
    .build(),
    watermark_strategy=WatermarkStrategy.no_watermarks(),
    source_name="streaming_agent_example",
).map(
    lambda x: ProductReview.model_validate_json(x)  # 反序列化 JSON
)

# 使用 ReviewAnalysisAgent 分析每条评论
review_analysis_res_stream = (
    agents_env.from_datastream(
        input=product_review_stream,
        key_selector=lambda x: x.id
    )
    .apply(ReviewAnalysisAgent())
    .to_datastream()
)

# 打印分析结果
review_analysis_res_stream.print()

# 执行 Flink 管道
agents_env.execute("Workflow Agent Example Job")

第四步:多 Agent 工作流

第二个示例展示了如何构建多 Agent 工作流。ProductSuggestionAgent 接收聚合后的分析结果,生成产品改进建议。通过 Table API 读取输入,使用窗口聚合:

from pyflink.datastream import KeySelector

class TableKeySelector(KeySelector):
    """从 Table 行(dict)中提取分区键。"""
    def get_key(self, value):
        return str(value["id"])

class TableReviewAnalysisAgent(Agent):
    # prompt / tool / chat_model_setup 与 ReviewAnalysisAgent 相同

    @action(InputEvent.EVENT_TYPE)
    @staticmethod
    def process_input(event: Event, ctx: RunnerContext) -> None:
        # Table 输入以 dict 形式到达,而非 POJO
        input_dict = InputEvent.from_event(event).input
        product_id = str(input_dict["id"])
        review_text = str(input_dict["review"])
        # ... 后续逻辑与 ReviewAnalysisAgent 相同

# 通过 Table API 集成
input_table = t_env.from_path("product_reviews")

review_analysis_res_stream = (
    agents_env.from_table(input=input_table, key_selector=TableKeySelector())
    .apply(TableReviewAnalysisAgent())
    .to_datastream()
)

高级特性

持久化执行(Durable Execution)

持久化执行用于包装耗时操作或带有副作用的操作。框架会持久化执行结果,在故障恢复时重放——当遇到相同的调用时,函数不会被再次执行,从而避免副作用重复。

@action(InputEvent.EVENT_TYPE)
@staticmethod
def process_input(event: Event, ctx: RunnerContext) -> None:
    input_event = InputEvent.from_event(event)

    def slow_external_call(data: str) -> str:
        time.sleep(2)
        return f"Processed: {data}"

    # 持久化执行:结果被保存,恢复时重放
    result = ctx.durable_execute(slow_external_call, input_event.input)
    ctx.send_event(OutputEvent(output=result))

约束条件

  • 函数必须是确定性的,且在恢复时以相同顺序被调用
  • 函数内部禁止访问 Memory 和调用 send_event
  • 参数和结果必须可序列化
  • 需要外部 Action 状态存储

带调和器的持久化执行

当原始调用可能已完成但结果或失败尚未持久化时,使用调和器(reconciler)提供自定义恢复逻辑:

def submit_payment(order_id: str) -> str:
    return payment_client.submit(order_id)

def payment_reconciler() -> str:
    """恢复时调用,查询支付状态而非重新提交。"""
    status = payment_client.get_status(order_id)
    if status == "SUCCEEDED":
        return payment_client.lookup_completed_payment(order_id)
    raise payment_client.get_failure(order_id)

result = ctx.durable_execute(
    submit_payment,
    order_id,
    reconciler=payment_reconciler,
)

异步执行(Async Execution)

异步执行使用与持久化执行相同的语义,但在等待线程池任务时让出执行权。适用于高延迟 I/O 操作。需要 Java 21+。

@action(InputEvent.EVENT_TYPE)
@staticmethod
async def process_with_async(event: Event, ctx: RunnerContext) -> None:
    input_event = InputEvent.from_event(event)

    def slow_external_call(data: str) -> str:
        time.sleep(2)
        return f"Processed: {data}"

    result = await ctx.durable_execute_async(slow_external_call, input_event.input)
    ctx.send_event(OutputEvent(output=result))

Python 异步 Action 仅支持 await ctx.durable_execute_async(...)。标准的 asyncio 函数(如 asyncio.gatherasyncio.waitasyncio.create_taskasyncio.sleep)不受支持,因为没有 asyncio 事件循环。

跨语言 Actions

在一种语言中声明的 Action 可以将其函数体分派到另一种语言执行。装饰的函数作为存根(stub),应抛出异常以防框架外的直接调用:

from flink_agents.api.function import JavaFunction

class MyAgent(Agent):
    @action(
        InputEvent.EVENT_TYPE,
        target=JavaFunction.for_action("com.example.MyHandlers", "handleInput"),
    )
    @staticmethod
    def handle_input(event: Event, ctx: RunnerContext) -> None:
        raise NotImplementedError("cross-language stub")

跨语言 Action 目前仅在 Flink 集群中运行时支持,本地开发模式不支持。

运行与部署

准备 Ollama

快速入门示例使用 Ollama 作为本地 LLM 服务:

# 下载并安装 Ollama(需要 0.9.0 或更高版本)
# 从 https://ollama.com 下载

# 拉取 qwen3:8b 模型
ollama pull qwen3:8b
# 配置 PYTHONPATH
export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')

# 启动本地独立 Flink 集群
$FLINK_HOME/bin/start-cluster.sh

启动后,访问 localhost:8081 查看 Flink Web UI。如果无法访问,检查 $FLINK_HOME/log 中的日志。端口冲突时,修改 $FLINK_HOME/conf/config.yaml 中的端口配置。

提交 Agent 作业

export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')

# 运行评论分析示例
$FLINK_HOME/bin/flink run -py ./flink-agents/python/flink_agents/examples/quickstart/workflow_single_agent_example.py

# 运行产品建议示例(多 Agent)
$FLINK_HOME/bin/flink run -py ./flink-agents/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py

作业提交后,在 Flink Web UI (localhost:8081) 中可以看到已提交的 Flink 作业。几分钟后,可以在 TaskManager 的输出日志中查看结果。

总结

Apache Flink Agents 将 AI Agent 的能力引入了 Flink 的流处理生态系统,创造了一种新范式:Agent 不再是被动等待人类提示的交互式工具,而是嵌入在实时数据流中的主动决策者。

本教程覆盖了以下核心内容:

  • 架构理念:事件驱动的编排模型,通过 Action 和事件构成有向图计算
  • 核心概念:Action、Event、Chat Model、Tool、Memory、Skills 六大构建块
  • 两种范式:Workflow Agent(开发者控制编排)和 ReAct Agent(LLM 自主编排)
  • Flink 集成:通过 DataStream API 和 Table API 将 Agent 嵌入流处理管道
  • 高级特性:持久化执行保证 exactly-once 语义,异步执行支持高延迟 I/O,跨语言能力打破语言壁垒
  • 实战开发:从环境搭建到 Agent 定义、集成、部署的完整流程

Flink Agents 的价值在于它不是重新发明 Agent 框架,而是将成熟的 Agent 抽象(提示词、工具、记忆、技能)嫁接到 Flink 经过大规模验证的流处理引擎上,使 Agent 天然具备分布式、容错、状态管理和精确一次执行的工程能力。对于需要在实时数据流上构建 AI 决策能力的场景,Flink Agents 提供了一个兼具开发效率和工程可靠性的方案。