kafka客户端mavenkafka官网

太平洋在线下载 13 0
文档kafka3.2 文档1. 入门1.1 简介什么是事件流kafka客户端maven

事件流是人体中枢神经系统的数字等效物。它是“永远在线”世界的技术基础kafka客户端maven,在这个世界中,企业越来越多地由软件定义和自动化,并且软件的用户更多地是软件。

从技术上讲,事件流是从事件源(如数据库、传感器、移动设备、云服务和软件应用程序)以事件流的形式实时捕获数据的实践kafka客户端maven;持久存储这些事件流以供以后检索;实时和回顾性地操作、处理和响应事件流;并根据需要将事件流路由到不同的目标技术。因此,事件流确保了数据的连续流动和解释,以便正确的信息在正确的时间出现在正确的位置。

我可以将事件流用于什么?

事件流应用于 众多行业和组织的各种用例。它的许多例子包括:

实时处理支付和金融交易,例如在证券交易所、银行和保险中。实时跟踪和监控汽车、卡车、车队和货运,例如在物流和汽车行业。持续捕获和分析来自物联网设备或其他设备的传感器数据,例如工厂和风电场。收集并立即响应客户互动和订单,例如零售、酒店和旅游行业以及移动应用程序。监测住院病人,预测病情变化,确保在紧急情况下及时治疗。连接、存储和提供公司不同部门产生的数据。作为数据平台、事件驱动架构和微服务的基础。Apache Kafka® 是一个事件流平台。这意味着什么?

Kafka 结合了三个关键功能,因此您可以 通过一个经过实战考验的解决方案实现端到端的事件流 用例:

发布(写入)和订阅(读取)事件流,包括从其他系统持续导入/导出数据 。根据需要持久可靠地 存储事件流。在事件发生时或回顾性 地处理事件流。

所有这些功能都以分布式、高度可扩展、弹性、容错和安全的方式提供。Kafka 可以部署在裸机硬件、虚拟机和容器上,也可以部署在本地和云端。您可以在自行管理 Kafka 环境和使用各种供应商提供的完全托管服务之间进行选择。

简而言之,kafka是如何工作的?

Kafka 是一个分布式系统,由通过高性能TCP 网络协议进行通信的服务器和客户端组成。它可以部署在本地和云环境中的裸机硬件、虚拟机和容器上。

服务器:Kafka 作为一个或多个服务器的集群运行,可以跨越多个数据中心或云区域。其中一些服务器形成存储层,称为代理。其他服务器运行 Kafka Connect以将数据作为事件流持续导入和导出,以将 Kafka 与您现有的系统(例如关系数据库以及其他 Kafka 集群)集成。为了让您实现关键任务用例,Kafka 集群具有高度可扩展性和容错性:如果其中任何一个服务器出现故障,其他服务器将接管它们的工作,以确保持续运行而不会丢失任何数据。

客户端:它们允许您编写分布式应用程序和微服务,以并行、大规模和容错方式读取、写入和处理事件流,即使在网络问题或机器故障的情况下也是如此。Kafka 附带了一些这样的客户端,这些客户端由 Kafka 社区提供的 数十个客户端进行了扩充:客户端可用于 Java 和 Scala,包括更高级别的 Kafka Streams库,用于 Go、Python、C/C++ 和许多其他编程语言以及 REST API。

主要概念和术语

事件记录了世界或您的业务中“发生了某事” 的事实。在文档中也称为记录或消息。当您向 Kafka 读取或写入数据时,您以事件的形式执行此操作。从概念上讲,事件具有键、值、时间戳和可选的元数据标头。这是一个示例事件:

事件键:“爱丽丝”事件价值:“向 Bob 支付了 200 美元”事件时间戳:“2020 年 6 月 25 日下午 2:06”

生产者是那些向 Kafka 发布(写入)事件的客户端应用程序,而消费者是订阅(读取和处理)这些事件的那些客户端应用程序。在 Kafka 中,生产者和消费者完全解耦并且彼此不可知,这是实现 Kafka 众所周知的高可扩展性的关键设计元素。例如,生产者永远不需要等待消费者。Kafka 提供了各种保证,例如一次性处理事件的能力。

事件被组织并持久地存储在主题中。非常简化,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。示例主题名称可以是“付款”。Kafka 中的主题始终是多生产者和多订阅者:一个主题可以有零个、一个或多个向其写入事件的生产者,以及零个、一个或多个订阅这些事件的消费者。主题中的事件可以根据需要随时读取——与传统的消息传递系统不同,事件在消费后不会被删除。相反,您可以通过每个主题的配置设置来定义 Kafka 应该将您的事件保留多长时间,之后旧事件将被丢弃。Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据是非常好的。

主题是分区的,这意味着一个主题分布在位于不同 Kafka 代理上的多个“桶”中。数据的这种分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从多个代理读取和写入数据。当一个新事件发布到一个主题时,它实际上是附加到主题的分区之一。具有相同事件键(例如,客户或车辆 ID)的事件被写入同一个分区,并且 Kafka保证给定主题分区的任何消费者将始终以与写入事件完全相同的顺序读取该分区的事件。

kafka客户端mavenkafka官网-第1张图片-太平洋在线下载

为了使您的数据具有容错性和高可用性,可以复制每个主题,甚至跨地理区域或数据中心,以便始终有多个代理拥有数据副本,以防万一出现问题,您想要对经纪人进行维护,等等。一个常见的生产设置是复制因子为 3,即始终存在三个数据副本。此复制在主题分区级别执行。

这本入门书应该足以进行介绍。如果您有兴趣,文档的设计部分会详细解释 Kafka 的各种概念。

kafka API

除了用于管理和行政任务的命令行工具外,Kafka 还为 Java 和 Scala 提供了五个核心 API:

用于管理和检查主题、代理和其他 Kafka 对象 的管理 API 。将事件流发布(写入)到一个或多个 Kafka 主题 的Producer API 。Consumer API订阅(读取)一个或多个主题并处理向它们生成的事件流 。用于实现流处理应用程序和微服务 的Kafka Streams API 。它提供了更高级别的函数来处理事件流,包括转换、聚合和连接等有状态操作、窗口化、基于事件时间的处理等等。从一个或多个主题读取输入以生成一个或多个主题的输出,有效地将输入流转换为输出流。Kafka Connect API用于构建和运行可重用 的数据导入/导出连接器,这些连接器从外部系统和应用程序消费(读取)或产生(写入)事件流,以便它们可以与 Kafka 集成。例如,与 PostgreSQL 等关系数据库的连接器可能会捕获对一组表的每次更改。但是,在实践中,您通常不需要实现自己的连接器,因为 Kafka 社区已经提供了数百个即用型连接器。从这往哪儿走要获得 Kafka 的实践经验,请遵循快速入门。要更详细地了解 Kafka,请阅读文档。您还可以选择Kafka 书籍和学术论文。浏览用例,了解我们全球社区中的其他用户如何从 Kafka 中获得价值。加入当地的 Kafka 聚会小组, 观看 Kafka 峰会(Kafka 社区的主要会议)的演讲。1.2 用例

以下是 Apache Kafka® 的一些流行用例的描述。有关其中一些实际应用领域的概述,请参阅此博客文章。

消息传递

Kafka 可以很好地替代更传统的消息代理。消息代理的使用有多种原因(将处理与数据生产者分离,缓冲未处理的消息等)。与大多数消息传递系统相比,Kafka 具有更好的吞吐量、内置的分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。

根据我们的经验,消息传递的使用通常吞吐量相对较低,但可能需要较低的端到端延迟,并且通常依赖于 Kafka 提供的强大的持久性保证。

在这个领域,Kafka 可与ActiveMQ或 RabbitMQ 等传统消息传递系统相媲美。

网站活动跟踪

Kafka 的原始用例是能够将用户活动跟踪管道重建为一组实时发布-订阅源。这意味着站点活动(页面查看、搜索或用户可能采取的其他操作)将发布到中心主题,每种活动类型都有一个主题。这些订阅源可用于订阅一系列用例,包括实时处理、实时监控以及加载到 Hadoop 或离线数据仓库系统以进行离线处理和报告。

活动跟踪的数量通常非常高,因为每个用户页面查看都会生成许多活动消息。

指标

Kafka 常用于运营监控数据。这涉及聚合来自分布式应用程序的统计数据以生成操作数据的集中提要。

日志聚合

许多人使用 Kafka 作为日志聚合解决方案的替代品。日志聚合通常从服务器收集物理日志文件,并将它们放在一个中心位置(可能是文件服务器或 HDFS)进行处理。Kafka 抽象出文件的细节,并将日志或事件数据更清晰地抽象为消息流。这允许更低延迟的处理和更容易支持多个数据源和分布式数据消费。与 Scribe 或 Flume 等以日志为中心的系统相比,Kafka 提供同样出色的性能、由于复制而产生的更强大的持久性保证以及更低的端到端延迟。

流处理

许多 Kafka 用户在由多个阶段组成的处理管道中处理数据,其中原始输入数据从 Kafka 主题中消费,然后聚合、丰富或以其他方式转换为新主题以供进一步消费或后续处理。例如,用于推荐新闻文章的处理管道可能会从 RSS 提要中抓取文章内容并将其发布到“文章”主题;进一步的处理可能会对该内容进行规范化或去重,并将清理后的文章内容发布到新主题;最终处理阶段可能会尝试向用户推荐此内容。此类处理管道基于各个主题创建实时数据流图。从 0.10.0.0 开始,一个轻量级但功能强大的流处理库,称为Kafka Streams 可以在 Apache Kafka 中执行上述数据处理。除了 Kafka Streams,替代的开源流处理工具包括Apache Storm和 Apache Samza。

事件溯源

事件溯源是一种应用程序设计风格,其中状态更改被记录为按时间排序的记录序列。Kafka 对非常大的存储日志数据的支持使其成为以这种风格构建的应用程序的出色后端。

提交日志

Kafka 可以作为分布式系统的一种外部提交日志。该日志有助于在节点之间复制数据,并充当故障节点恢复其数据的重新同步机制。Kafka 中的日志压缩功能有助于支持这种用法。在这种用法中,Kafka 类似于Apache BookKeeper项目。

1.3 快速入门第 1 步:获取 KAFKA

下载 最新的 Kafka 版本并解压:

$ tar -xzf kafka_2.13-3.2.0.tgz$ cd kafka_2.13-3.2.0第二步:启动KAFKA环境

注意:您的本地环境必须安装 Java 8+。

运行以下命令以按正确顺序启动所有服务:

# Start the ZooKeeper service# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.$ bin/zookeeper-server-start.sh config/zookeeper.properties

打开另一个终端会话并运行:

# Start the Kafka broker service$ bin/kafka-server-start.sh config/server.properties

成功启动所有服务后,您将拥有一个基本的 Kafka 环境运行并可以使用。

第 3 步:创建一个主题来存储您的事件

Kafka 是一个分布式事件流平台,可让您跨多台机器 读取、写入、存储和处理 事件(在文档中也称为记录或 消息)。

示例事件包括支付交易、来自手机的地理位置更新、运输订单、来自物联网设备或医疗设备的传感器测量等等。这些事件被组织并存储在 主题中。非常简化,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。

因此,在您编写第一个事件之前,您必须创建一个主题。打开另一个终端会话并运行:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Kafka 的所有命令行工具都有其他选项:运行kafka-topics.sh不带任何参数的命令以显示使用信息。例如,它还可以显示 新主题 的分区数等详细信息:

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs: Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0第 4 步:将一些事件写入主题

Kafka 客户端通过网络与 Kafka 代理通信以写入(或读取)事件。一旦收到,代理将以持久和容错的方式存储事件,只要您需要 - 甚至永远。

运行控制台生产者客户端将一些事件写入您的主题。默认情况下,您输入的每一行都会导致将一个单独的事件写入主题。

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092This is my first eventThis is my second event

您可以随时停止生产者客户端Ctrl-C。

第 5 步:阅读事件

打开另一个终端会话并运行控制台使用者客户端以读取您刚刚创建的事件:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092This is my first eventThis is my second event

您可以随时停止消费者客户端Ctrl-C。

随意尝试:例如,切换回您的生产者终端(上一步)以编写其他事件,并查看事件如何立即显示在您的消费者终端中。

因为事件被持久地存储在 Kafka 中,所以它们可以被尽可能多的消费者多次读取。您可以通过打开另一个终端会话并再次重新运行上一个命令来轻松验证这一点。

第 6 步:使用 KAFKA CONNECT 将数据作为事件流导入/导出

您可能在现有系统(如关系数据库或传统消息传递系统)中拥有大量数据,以及许多已经使用这些系统的应用程序。Kafka Connect允许您不断地将来自外部系统的数据摄取到 Kafka 中,反之亦然。它是一个运行 连接器的可扩展工具,它实现了与外部系统交互的自定义逻辑。因此很容易将现有系统与 Kafka 集成。为了使这个过程更容易,有数百个这样的连接器随时可用。

在本快速入门中,我们将了解如何使用简单的连接器运行 Kafka Connect,这些连接器将数据从文件导入 Kafka 主题并将数据从 Kafka 主题导出到文件。

首先,确保添加connect-file-3.2.0.jar到plugin.pathConnect worker 配置中的属性。出于本快速入门的目的,我们将使用相对路径并将连接器的包视为 uber jar,当从安装目录运行快速入门命令时,它可以工作。但是,值得注意的是,对于生产部署,使用绝对路径总是更可取的。有关如何设置此配置的详细说明, 请参阅plugin.path 。

编辑config/connect-standalone.properties文件,添加或更改plugin.path配置属性匹配以下,并保存文件:

> 回声“plugin.path=lib/connect-file-3.2.0.jar”

然后,首先创建一些种子数据进行测试:

> echo -e "foonbar" > test.txt

或在 Windows 上:

> 回声 foo> test.txt> 回声栏>> test.txt

接下来,我们将启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。我们提供三个配置文件作为参数。第一个始终是 Kafka Connect 进程的配置,包含常见的配置,例如要连接的 Kafka 代理和数据的序列化格式。其余的配置文件每个都指定一个要创建的连接器。这些文件包括唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。

> bin/connect-standalone.sh 配置/connect-standalone.properties 配置/connect-file-source.properties 配置/connect-file-sink.properties

这些示例配置文件包含在 Kafka 中,使用您之前启动的默认本地集群配置并创建两个连接器:第一个是源连接器,它从输入文件读取行并将每个行生成到 Kafka 主题,第二个是接收器连接器它从 Kafka 主题读取消息,并将每个消息作为输出文件中的一行生成。

在启动期间,您会看到许多日志消息,包括一些指示正在实例化连接器的消息。一旦 Kafka Connect 进程开始,源连接器应该开始从主题读取行test.txt并将它们生成到主题connect-test,而接收器连接器应该开始从主题读取消息connect-test 并将它们写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传递:

> 更多 test.sink.txt富酒吧

请注意,数据存储在 Kafka 主题connect-test中,因此我们还可以运行控制台消费者来查看主题中的数据(或使用自定义消费者代码来处理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning{"schema":{"type":"string","optional":false},"payload":"foo"}{"schema":{"type":"string","optional":false},"payload":"bar"}...

连接器继续处理数据,因此我们可以将数据添加到文件并查看它在管道中的移动:

> echo 另一行>> test.txt

您应该看到该行出现在控制台使用者输出和接收器文件中。

第 7 步:使用 KAFKA STREAMS 处理您的事件

一旦您的数据作为事件存储在 Kafka 中,您就可以使用 Java/Scala 的 Kafka Streams客户端库处理数据。它允许您实现关键任务的实时应用程序和微服务,其中输入和/或输出数据存储在 Kafka 主题中。Kafka Streams 将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与 Kafka 的服务器端集群技术的优势相结合,使这些应用程序具有高度可扩展性、弹性、容错性和分布式性。该库支持一次性处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。

为了让您初步了解,以下是实现流行WordCount算法的方法:

KStream<String, String> textLines = builder.stream("quickstart-events");KTable<String, Long> wordCounts = textLines .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" "))) .groupBy((keyIgnored, word) -> word) .count();wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

Kafka Streams 演示 和应用程序开发教程 演示了如何从头到尾编写和运行这样的流应用程序 。

第 8 步:终止 KAFKA 环境

现在您已经完成了快速入门,请随意拆除 Kafka 环境,或者继续玩。

Ctrl-C如果您还没有这样做,请 使用 停止生产者和消费者客户端。使用 停止 Kafka 代理Ctrl-C。最后,使用 . 停止 ZooKeeper 服务器Ctrl-C。

如果您还想删除本地 Kafka 环境的任何数据,包括您在此过程中创建的任何事件,请运行以下命令:

$ rm -rf /tmp/kafka-logs /tmp/zookeeper恭喜!

您已成功完成 Apache Kafka 快速入门。

要了解更多信息,我们建议执行以下后续步骤:

通读简介 ,了解 Kafka 在高层次上的工作原理、主要概念以及与其他技术的比较。要更详细地了解 Kafka,请参阅 文档。浏览用例,了解我们全球社区中的其他用户如何从 Kafka 中获得价值。加入当地的 Kafka 聚会小组, 观看 Kafka 峰会(Kafka 社区的主要会议)的演讲。1.4 生态系统

在主要发行版之外,有许多工具可以与 Kafka 集成。生态系统页面列出了其中许多,包括流处理系统、Hadoop 集成、监控和部署工具。

1.4 生态系统

在主要发行版之外,有许多工具可以与 Kafka 集成。生态系统页面列出了其中许多,包括流处理系统、Hadoop 集成、监控和部署工具。

1.5 从旧版本升级从 0.8.x 到 3.1.x 的任何版本升级到 3.2.0

如果您是从 2.1.x 之前的版本升级,请参阅下面关于用于存储消费者偏移量的架构更改的说明。将 inter.broker.protocol.version 更改为最新版本后,将无法降级到 2.1 之前的版本。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您是从 0.11.0.x 或更高版本升级,并且您没有覆盖消息格式,那么您只需要覆盖中间代理协议版本。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,3.1等3.0)inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,3.1等3.0)log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍然可以降级。验证集群的行为和性能后,通过将协议版本编辑 inter.broker.protocol.version并设置为3.2.一个个重启broker,让新的协议版本生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 3.2 并一一重启。请注意,不再维护的旧 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。3.2.0 的显着变化如果没有设置冲突的配置,默认情况下会启用生产者的幂等性。在 3.0.0 和 3.1.0 中,一个错误阻止了此默认值的应用,这意味着除非用户明确设置enable.idempotence为 true,否则幂等性保持禁用(有关更多详细信息,请参阅KAFKA-13598)。此问题已修复,默认值已在 3.0.1、3.1.1 和 3.2.0 中正确应用。一个值得注意的例外是 Connect,它默认禁用其所有生产者的幂等行为,以便统一支持使用各种 Kafka 代理版本。用户可以通过 Connect worker 和/或连接器配置更改此行为以启用部分或所有生产者的幂等性。Connect 可能会在未来的主要版本中默认启用幂等生产者。出于安全考虑,Kafka 已将 log4j 和 slf4j-log4j12 替换为 reload4j 和 slf4j-reload4j。这仅影响指定日志记录后端的模块(connect-runtime并且kafka-tools是两个这样的示例)。许多模块,包括kafka-clients,将其留给应用程序来指定日志记录后端。更多信息可以在reload4j找到。依赖 Kafka 项目中受影响模块的项目应使用 slf4j-log4j12 版本 1.7.35 或更高版本或 slf4j-reload4j 以避免 源自日志框架的可能的兼容性问题。示例连接器FileStreamSourceConnector和FileStreamSinkConnector已从默认类路径中删除。要在 Kafka Connect 独立或分布式模式下使用它们,需要显式添加它们,例如CLASSPATH=./lib/connect-file-3.2.0.jar ./bin/connect-distributed.sh.从 0.8.x 到 3.0.x 的任何版本升级到 3.1.0

如果您是从 2.1.x 之前的版本升级,请参阅下面关于用于存储消费者偏移量的架构更改的说明。将 inter.broker.protocol.version 更改为最新版本后,将无法降级到 2.1 之前的版本。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您是从 0.11.0.x 或更高版本升级,并且您没有覆盖消息格式,那么您只需要覆盖中间代理协议版本。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,3.0等2.8)inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,3.0等2.8)log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍然可以降级。验证集群的行为和性能后,通过将协议版本编辑 inter.broker.protocol.version并设置为3.1.一个个重启broker,让新的协议版本生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 3.1 并一一重启。请注意,不再维护的旧 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。3.1.1 中的显着变化如果没有设置冲突的配置,默认情况下会启用生产者的幂等性。一个错误阻止了生产者幂等默认值的应用,这意味着它保持禁用状态,除非用户明确设置 enable.idempotence为 true。有关详细信息,请参阅KAFKA-13598。此问题已修复,默认值已正确应用。一个值得注意的例外是 Connect,它默认禁用其所有生产者的幂等行为,以便统一支持使用各种 Kafka 代理版本。用户可以通过 Connect worker 和/或连接器配置更改此行为以启用部分或所有生产者的幂等性。Connect 可能会在未来的主要版本中默认启用幂等生产者。3.1.0 的显着变化Apache Kafka 支持 Java 17。以下指标已被弃用:bufferpool-wait-time-total、io-waittime-total和iotime-total。请使用bufferpool-wait-time-ns-total, io-wait-time-ns-total, 和io-time-ns-total代替。有关详细信息,请参阅KIP-773 。从 2.4 开始,合作再平衡协议一直是默认协议,但我们继续支持 Eager 再平衡协议,为用户提供升级路径。此支持将在未来的版本中删除,因此任何仍在使用 Eager 协议的用户都应准备完成将其应用程序升级到版本 3.1 中的协作协议。这仅影响仍在使用旧于 2.4 版本的用户,以及已升级但尚未删除upgrade.from从低于 2.4 的版本升级时设置的配置。适合后一种情况的用户在升级到 3.1 以上时只需取消设置此配置,而前一种情况的用户如果尝试从 2.3 或更低版本升级到 3.1 以上版本,则需要遵循稍微不同的升级路径。这些应用程序需要通过桥接版本,首先升级到 2.4 - 3.1 之间的版本并设置upgrade.from配置,然后删除该配置并升级到 3.1 以上的最终版本。有关详细信息,请参阅KAFKA-8575 。IBP 3.1 将主题 ID 作为 KIP-516的一部分引入 FetchRequest 。从 0.8.x 到 2.8.x 的任何版本升级到 3.0.0

如果您是从 2.1.x 之前的版本升级,请参阅下面关于用于存储消费者偏移量的架构更改的说明。将 inter.broker.protocol.version 更改为最新版本后,将无法降级到 2.1 之前的版本。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您是从 0.11.0.x 或更高版本升级,并且您没有覆盖消息格式,那么您只需要覆盖中间代理协议版本。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,2.8等2.7)inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,2.8等2.7)log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍然可以降级。验证集群的行为和性能后,通过将协议版本编辑 inter.broker.protocol.version并设置为3.0.一个个重启broker,让新的协议版本生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 3.0 并一一重启。请注意,不再维护的旧 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。3.0.1 中的显着变化如果没有设置冲突的配置,默认情况下会启用生产者的幂等性。一个错误阻止了生产者幂等默认值的应用,这意味着它保持禁用状态,除非用户明确设置 enable.idempotence为 true。有关详细信息,请参阅KAFKA-13598。此问题已修复,默认值已正确应用。3.0.0 的显着变化默认情况下,生产者具有更强的交付保证:idempotence已启用并acks设置为all而不是1. 有关详细信息,请参阅KIP-679。在 3.0.0 和 3.1.0 中,一个错误阻止了幂等默认值的应用,这意味着它保持禁用状态,除非用户明确设置 enable.idempotence为 true。请注意,该错误不会影响acks=all更改。有关详细信息,请参阅KAFKA-13598。此问题已修复,默认值已在 3.0.1、3.1.1 和 3.2.0 中正确应用。ZooKeeper 已升级到版本 3.6.3。提供了 KRaft 模式的预览版,但无法从 2.8 早期访问版本升级到它。有关详细信息,请参阅config/kraft/README.md文件。发布 tarball 不再包含测试、源、javadoc 和测试源 jar。这些仍然发布到 Maven 中央存储库。现在,运行时类路径中提供了许多实现依赖 jar,而不是编译和运行时类路径。升级后的编译错误可以通过显式添加缺少的依赖 jar 或更新应用程序以不使用内部类来修复。消费者配置的默认值session.timeout.ms从 10 秒增加到 45 秒。有关详细信息,请参阅 KIP-735 。代理配置log.message.format.version和主题配置message.format.version已被弃用。两种配置的值始终假定为3.0if inter.broker.protocol.versionis3.0或更高。如果设置了log.message.format.version或< ,我们建议在升级到3.0message.format.version的同时清除它们 。如果降级,inter.broker.protocol.version这将避免潜在的兼容性问题。有关详细信息,inter.broker.protocol.version请参阅KIP-724 。Streams API 删除了在 2.5.0 或更早版本中已弃用的所有弃用 API。有关已删除 API 的完整列表,请比较详细的 Kafka Streams 升级说明。Kafka Streams 不再对“connect:json”模块(KAFKA-5146)有编译时依赖。依赖这种传递依赖的项目必须明确声明它。通过指定的自定义主体构建器实现principal.builder.class现在必须实现 KafkaPrincipalSerde接口以允许在代理之间转发。有关KafkaPrincipalSerde 使用的更多详细信息,请参阅KIP-590 。已从 、 和 模块中删除了许多已弃用的类、clients方法connect和工具core:toolsScalaAuthorizer和SimpleAclAuthorizer相关类已被删除。请使用 JavaAuthorizer 和AclAuthorizer代替。该Metric#value()方法被删除(KAFKA-12573)。和Sum类Total已被删除 ( KAFKA-12584 )。请使用WindowedSumandCumulativeSum代替。和Count类SampledTotal已被删除。请分别使用WindowedCount和WindowedSum 代替。,PrincipalBuilder和DefaultPrincipalBuilder类ResourceFilter已被删除。从SslConfigs、和 SaslConfigs中删除了各种常量和构造函数。AclBinding``AclBindingFilterAdmin.electedPreferredLeaders()方法被删除。请Admin.electLeaders改用。kafka-preferred-replica-election命令行工具已被删除。请kafka-leader-election改用。该--zookeeper选项已从kafka-topics和kafka-reassign-partitions命令行工具中删除。请--bootstrap-server改用。在kafka-configs命令行工具中,该--zookeeper选项仅支持在代理未运行时更新SCRAM 凭据配置 和描述/更新动态代理配置。请 用于其他配置操作。--bootstrap-server构造ConfigEntry函数已被移除 ( KAFKA-12577 )。请改用剩余的公共构造函数。default客户端配置的配置值client.dns.lookup已被删除。万一您明确设置此配置,我们建议您不要设置此配置(use_all_dns_ips默认使用)。和ExtendedDeserializer类ExtendedSerializer已被删除。请使用Deserializer andSerializer代替。该close(long, TimeUnit)方法已从生产者、消费者和管理客户端中删除。请使用 close(Duration).和方法被删除ConsumerConfig.addDeserializerToConfig。ProducerConfig.addSerializerToConfig这些方法不打算成为公共 API,也没有替代品。该NoOffsetForPartitionException.partition()方法已被删除。请partitions() 改用。默认partition.assignment.strategy更改为“[RangeAssignor, CooperativeStickyAssignor]”,默认情况下将使用 RangeAssignor,但允许升级到 CooperativeStickyAssignor,只需一个滚动反弹即可从列表中删除 RangeAssignor。请在此处查看客户端升级路径指南以获取更多详细信息。斯卡拉kafka.common.MessageFormatter被删除。请使用 Java org.apache.kafka.common.MessageFormatter.该MessageFormatter.init(Properties)方法已被删除。请configure(Map)改用。该checksum()方法已从ConsumerRecord和中删除RecordMetadata。自 0.11 以来一直默认的消息格式 v2 将校验和从记录移动到记录批次。因此,这些方法没有意义,也不存在替代品。该ChecksumMessageFormatter课程已被删除。它不是公共 API 的一部分,但它可能已与kafka-console-consumer.sh. 它报告了每条记录的校验和,自消息格式 v2 以来一直不支持。该类org.apache.kafka.clients.consumer.internals.PartitionAssignor已被删除。请 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor改用。和配置已被删除 ( quota.producer.defaultKAFKA -12591 )。必须改为使用动态配额默认值。quota.consumer.default和port配置host.name已删除。请listeners改用。和advertised.port配置advertised.host.name已删除。请advertised.listeners改用。已弃用的工作人员配置已从Kafka Connect 工作人员配置rest.host.name中rest.port删除 ( KAFKA-12482 )。请listeners改用。该Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)方法已被弃用。请 Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata)改用,在哪里ConsumerGroupMetadata 可以通过检索KafkaConsumer#groupMetadata()更强的语义。请注意,完整的消费者组元数据集只有代理或 2.5 或更高版本才能理解,因此您必须升级您的 kafka 集群以获得更强的语义。否则,您可以直接new ConsumerGroupMetadata(consumerGroupId)与老经纪人合作。有关详细信息, 请参阅KIP-732 。Connectinternal.key.converter和internal.value.converter属性已被完全删除。自版本 2.0.0 起,这些 Connect 工作程序属性的使用已被弃用。工人现在被硬编码以使用 JSON 转换器并schemas.enable设置为false. 如果您的集群一直在使用不同的内部键或值转换器,您可以按照KIP-738中概述的迁移步骤 将您的 Connect 集群安全地升级到 3.0。基于 Connect 的 MirrorMaker (MM2) 包括对支持的更改IdentityReplicationPolicy,无需重命名主题即可实现复制。默认情况下仍使用现有DefaultReplicationPolicy的,但可以通过 replication.policy配置属性启用身份复制。这对于从旧版 MirrorMaker (MM1) 迁移的用户,或者对于不希望重命名主题的简单单向复制拓扑的用例特别有用。请注意IdentityReplicationPolicy,与 不同 DefaultReplicationPolicy,它不能防止基于主题名称的复制循环,因此在构建复制拓扑时请注意避免循环。原始 MirrorMaker (MM1) 和相关类已被弃用。请使用基于连接的 MirrorMaker (MM2),如异地 复制部分所述。从 0.8.x 到 2.7.x 的任何版本升级到 2.8.1

如果您是从 2.1.x 之前的版本升级,请参阅下面关于用于存储消费者偏移量的架构更改的说明。将 inter.broker.protocol.version 更改为最新版本后,将无法降级到 2.1 之前的版本。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您是从 0.11.0.x 或更高版本升级,并且您没有覆盖消息格式,那么您只需要覆盖中间代理协议版本。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,2.7等2.6)inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,2.7等2.6)log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍然可以降级。验证集群的行为和性能后,通过将协议版本编辑 inter.broker.protocol.version并设置为2.8.一个个重启broker,让新的协议版本生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 2.8 并一一重启。请注意,不再维护的旧 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。2.8.0 的显着变化2.8.0 版本为KIP-679 中引入的授权者接口添加了一个新方法 。动机是解除对我们未来计划的阻碍,以默认启用最强的消息传递保证。自定义授权方应考虑提供更有效的实现,以支持审计日志记录和任何自定义配置或访问规则。IBP 2.8 将主题 ID 作为 KIP-516的一部分引入主题。使用 ZooKeeper 时,此信息存储在 TopicZNode 中。如果集群降级到以前的 IBP 或版本,未来的主题将不会获得主题 ID,并且不能保证主题将在 ZooKeeper 中保留其主题 ID。这意味着在再次升级时,一些主题或所有主题将被分配新的 ID。Kafka Streams 引入了一个类型安全的split()运算符来替代已弃用的KStream#branch()方法(参见KIP-418)。从 0.8.x 到 2.6.x 的任何版本升级到 2.7.0

如果您是从 2.1.x 之前的版本升级,请参阅下面关于用于存储消费者偏移量的架构更改的说明。将 inter.broker.protocol.version 更改为最新版本后,将无法降级到 2.1 之前的版本。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您是从 0.11.0.x 或更高版本升级,并且您没有覆盖消息格式,那么您只需要覆盖中间代理协议版本。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,2.6等2.5)inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,2.6等2.5)log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍然可以降级。验证集群的行为和性能后,通过将协议版本编辑 inter.broker.protocol.version并设置为2.7.一个个重启broker,让新的协议版本生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 2.7 并一一重启。请注意,不再维护的旧 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。2.7.0 的显着变化2.7.0 版本包括 KIP-595中指定的核心 Raft 实现。有一个单独的“筏”模块包含大部分逻辑。在与控制器的集成完成之前,用户可以使用一个独立的服务器来测试 Raft 实现的性能。有关详细信息,请参阅 raft 模块中的 README.mdKIP-651添加了 对使用 PEM 文件进行密钥和信任存储的支持。KIP-612增加了 对强制执行代理范围和每个侦听器连接创建速率的支持。2.7.0 版本包含 KIP-612 的第一部分,动态配置将在 2.8.0 版本中出现。限制主题和分区创建或主题删除的能力,以防止集群受到 KIP-599的损害当 Kafka 中的新功能可用时,有两个主要问题:KIP-584为客户端发现、功能选通和滚动升级提供了一个灵活且易于操作的解决方案,只需一次重启即可。Kafka 客户端如何了解代理功能?代理如何决定启用哪些功能?ConsoleConsumer现在可以通过KIP-431 打印记录偏移量和标题添加KIP-554 继续朝着从 Kafka 中移除 Zookeeper 的目标取得进展。添加 KIP-554 意味着您不必再直接连接到 ZooKeeper 来管理 SCRAM 凭据。更改现有侦听器的不可重新配置配置会导致InvalidRequestException. 相比之下,先前的(意外)行为会导致更新的配置被持久化,但直到重新启动代理才会生效。有关更多讨论,请参阅KAFKA-10479。请参阅DynamicBrokerConfig.DynamicSecurityConfigs和SocketServer.ListenerReconfigurableConfigs 了解现有侦听器支持的可重新配置配置。Kafka Streams 在 KStreams DSL 中 添加了对滑动窗口聚合的支持。对状态存储进行反向迭代,使用KIP-617 实现更高效的最新更新搜索Kafka Steams 中的端到端延迟指标请参阅 KIP-613 了解更多详细信息Kafka Streams 添加了使用KIP-607 报告默认 RocksDB 属性的指标来自KIP-616 的 更好的 Scala 隐式 Serdes 支持从 0.8.x 到 2.5.x 的任何版本升级到 2.6.0

如果您是从 2.1.x 之前的版本升级,请参阅下面关于用于存储消费者偏移量的架构更改的说明。将 inter.broker.protocol.version 更改为最新版本后,将无法降级到 2.1 之前的版本。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您是从 0.11.0.x 或更高版本升级,并且您没有覆盖消息格式,那么您只需要覆盖中间代理协议版本。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,2.5等2.4)inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,2.5等2.4)log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍然可以降级。验证集群的行为和性能后,通过将协议版本编辑 inter.broker.protocol.version并设置为2.6.一个个重启broker,让新的协议版本生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 2.6 并一一重启。请注意,不再维护的旧 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。2.6.0 的显着变化Kafka Streams 添加了一种新的处理模式(需要 broker 2.5 或更高版本),该模式使用完全一次保证来提高应用程序的可扩展性(参见KIP-447)Java 11 或更高版本默认启用 TLSv1.3。如果双方都支持,客户端和服务器将协商 TLSv1.3,否则回退到 TLSv1.2。有关详细信息, 请参阅 KIP-573 。配置的默认值client.dns.lookup已从 更改default 为use_all_dns_ips。如果主机名解析为多个 IP 地址,客户端和代理现在将尝试按顺序连接到每个 IP,直到连接成功建立。有关详细信息, 请参阅 KIP-602 。NotLeaderForPartitionException已弃用并替换为NotLeaderOrFollowerException. 如果代理不是副本,则仅针对领导者或跟随者的获取请求和其他请求返回 NOT_LEADER_OR_FOLLOWER(6) 而不是 REPLICA_NOT_AVAILABLE(9),确保所有客户端将重新分配期间的此临时错误作为可重试异常处理。从 0.8.x 到 2.4.x 的任何版本升级到 2.5.0

如果您是从 2.1.x 之前的版本升级,请参阅下面关于用于存储消费者偏移量的架构更改的说明。将 inter.broker.protocol.version 更改为最新版本后,将无法降级到 2.1 之前的版本。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您是从 0.11.0.x 或更高版本升级,并且您没有覆盖消息格式,那么您只需要覆盖中间代理协议版本。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,2.4等2.3)inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如,,,2.4等2.3)log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍然可以降级。验证集群的行为和性能后,通过将协议版本编辑 inter.broker.protocol.version并设置为2.5.一个个重启broker,让新的协议版本生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 2.5 并一一重启。请注意,不再维护的旧 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。kafka-reassign-partitions.sh 完成 KIP-455后,重新分配工具有几个显着的变化。此工具现在需要--additional在更改活动重新分配的限制时提供标志。现在可以使用该 --cancel命令取消重新分配。最后,重新分配 with--zookeeper 已被弃用,取而代之的是--bootstrap-server. 有关详细信息,请参阅 KIP。2.5.0 的显着变化使用时RebalanceProtocol#COOPERATIVE,Consumer#poll仍然可以在对仍归消费者所有的分区进行重新平衡期间返回数据;此外, Consumer#commitSync现在可能会抛出一个非致命RebalanceInProgressException事件来通知用户此类事件,以便与致命事件区分开来CommitFailedException并允许用户完成正在进行的重新平衡,然后重新尝试为那些仍然拥有的分区提交偏移量。为了提高典型网络环境中的弹性,默认值 zookeeper.session.timeout.ms已从 6 秒增加到 18 秒, replica.lag.time.max.ms从 10 秒增加到 30 秒。添加了新的 DSL 运算符cogroup(),用于一次将多个流聚合在一起。添加了一个新的KStream.toTable()API 来将输入事件流转换为 KTable。添加了一个新的 Serde 类型Void来表示来自输入主题的空键或空值。已弃用UsePreviousTimeOnInvalidTimestamp并将其替换为UsePartitionTimeOnInvalidTimeStamp.通过添加挂起的偏移防护机制和更强的事务提交一致性检查来改进精确一次语义,这极大地简化了可扩展的精确一次应用程序的实现。我们还在 示例文件夹下添加了一个新的完全一次语义代码示例。查看 KIP-447 了解完整详情。添加了一个新的公共 apiKafkaStreams.queryMetadataForKey(String, K, Serializer) to get detailed information on the key being queried. It provides information about the partition number where the key resides in addition to hosts containing the active and standby partitions for the key.通过弃用KafkaStreams.store(String, QueryableStoreType)和替换它来支持查询陈旧的存储(以实现高可用性)和属于特定分区的存储KafkaStreams.store(StoreQueryParameters)。添加了一个新的公共 api 来访问具有KafkaStreams.allLocalStorePartitionLags().不再支持 Scala 2.11。有关详细信息,请参阅 KIP-531 。包中的所有 Scala 类kafka.security.auth都已弃用。有关 2.4.0 中添加的新 Java 授权 API 的详细信息,请参阅 KIP-504 。请注意,kafka.security.auth.Authorizer andkafka.security.auth.SimpleAclAuthorizer在 2.4.0 中已弃用。默认情况下禁用 TLSv1 和 TLSv1.1,因为它们存在已知的安全漏洞。现在默认情况下仅启用 TLSv1.2。您可以通过在配置选项 ssl.protocol和ssl.enabled.protocols.ZooKeeper 已升级到 3.5.7,如果 3.4 数据目录中没有快照文件,ZooKeeper 从 3.4.X 升级到 3.5.7 可能会失败。这通常发生在 ZooKeeper 3.5.7 尝试加载未创建快照文件的现有 3.4 数据目录的测试升级中。有关该问题的更多详细信息,请参阅ZOOKEEPER-3056。ZOOKEEPER-3056中给出了一个修复程序,即在升级之前 设置snapshot.trust.empty=true 配置。zookeeper.propertiesZooKeeper 版本 3.5.7 支持与 ZooKeeper 的 TLS 加密连接,无论是否有客户端证书,并且可以使用额外的 Kafka 配置来利用这一点。有关详细信息,请参阅KIP-515。从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x、1.0.x、1.1.x、2.0.x 或 2.1.x 升级或2.2.x 或 2.3.x 到 2.4.0

如果您是从 2.1.x 之前的版本升级,请参阅下面关于用于存储消费者偏移量的架构更改的说明。将 inter.broker.protocol.version 更改为最新版本后,将无法降级到 2.1 之前的版本。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您是从 0.11.0.x 或更高版本升级,并且您没有覆盖消息格式,那么您只需要覆盖中间代理协议版本。inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3)。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如 0.10.0、0.11.0、1.0、2.0、2.2)。log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍然可以降级。验证集群的行为和性能后,通过编辑 inter.broker.protocol.version并将其设置为 2.4 来提升协议版本。一个个重启broker,让新的协议版本生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 2.4 并一一重启。请注意,不再维护的旧 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

附加升级说明:

ZooKeeper 已升级到 3.5.6。如果 3.4 数据目录中没有快照文件,ZooKeeper 从 3.4.X 升级到 3.5.6 可能会失败。这通常发生在 ZooKeeper 3.5.6 尝试加载未创建快照文件的现有 3.4 数据目录的测试升级中。有关该问题的更多详细信息,请参阅ZOOKEEPER-3056。ZOOKEEPER-3056中给出了一个修复程序,即在升级之前设置snapshot.trust.empty=true 配置。zookeeper.properties但是我们观察到在使用 snapshot.trust.empty=trueconfig. 有关该问题的更多详细信息,请参阅ZOOKEEPER-3644。所以我们推荐复制空快照的安全解决方法如果 3.4 数据目录中没有快照文件,则文件到 3.4 数据目录。有关解决方法的更多详细信息,请参阅ZooKeeper 升级常见问题解答。ZooKeeper 3.5 中添加了 基于 Jetty 的嵌入式AdminServer 。zookeeper.propertiesAdminServer 在 ZooKeeper 中默认启用,并在端口 8080 上启动。在 Apache Kafka 发行版提供的 ZooKeeper 配置 () 中默认禁用 AdminServer 。如果您希望禁用 AdminServer,请确保更新您的本地zookeeper.properties文件。admin.enableServer=false请参考AdminServer config来配置 AdminServer。2.4.0 的显着变化为分区重新分配添加了一个新的管理 API。由于改变了 Kafka 传播重新分配信息的方式,在升级到新版本时可能会在失败的边缘情况下丢失重新分配状态。不建议在升级时开始重新分配。ZooKeeper 已从 3.4.14 升级到 3.5.6。新版本支持 TLS 和动态重新配置。命令行bin/kafka-preferred-replica-election.sh工具已被弃用。它已被bin/kafka-leader-election.sh.electPreferredLeadersJava类中的方法AdminClient已被弃用,取而代之的是方法electLeaders。Scala 代码利用NewTopic(String, int, short)带有文字值的构造函数将需要显式调用toShort第二个文字。构造函数中的参数GroupAuthorizationException(String)现在用于指定异常消息。以前它指的是授权失败的组。这样做是为了与其他异常类型保持一致并避免潜在的误用。TopicAuthorizationException(String)以前用于单个未经授权的主题 的构造函数也进行了类似的更改。内部PartitionAssignor接口已被弃用,取而代之的ConsumerPartitionAssignor是公共 API 中的新接口。两个接口之间的某些方法/签名略有不同。实现自定义 PartitionAssignor 的用户应尽快迁移到新界面。现在DefaultPartitioner使用粘性分区策略。这意味着具有空键且未分配分区的特定主题的记录将被发送到同一分区,直到准备好发送批处理。创建新批次时,将选择一个新分区。这会减少生成的延迟,但可能会导致边缘情况下跨分区的记录分布不均匀。通常用户不会受到影响,但这种差异在测试和其他在很短的时间内产生记录的情况下可能会很明显。阻塞KafkaConsumer#committed方法已扩展为允许将分区列表而不是单个分区作为输入参数。它可以减少客户端和代理之间的请求/响应迭代,以获取消费者组的已提交偏移量。旧的重载函数已被弃用,我们建议用户更改代码以利用新方法(详细信息可在KIP-520中找到)。我们INVALID_RECORD在生产响应中引入了一个新错误,以区别于CORRUPT_MESSAGE错误。更具体地说,以前当一批记录作为单个请求的一部分发送到代理时,一个或多个记录由于各种原因(不匹配魔术字节、crc 校验和错误、压缩日志的空键)而未能通过验证主题等),整个批次将被拒绝并具有相同的误导性,并且生产者客户端的调用者将从调用返回CORRUPT_MESSAGE的未来对象以及在RecordMetadata``send``Callback#onCompletion(RecordMetadata metadata, Exception exception) 现在有了新的错误代码和改进的异常错误消息,生产者调用者将更好地了解他们发送记录失败的根本原因。我们正在向客户端的组协议引入增量协作重新平衡,它允许消费者在重新平衡期间保留所有分配的分区,最后只撤销必须迁移到另一个消费者以实现整体集群平衡的分区。将ConsumerCoordinator选择RebalanceProtocol 所有消费者支持的转让人通常支持的最新版本。您可以使用新的内置CooperativeStickyAssignor或插入您自己的自定义合作分配器。为此,您必须实现ConsumerPartitionAssignor接口并包含RebalanceProtocol.COOPERATIVE在ConsumerPartitionAssignor#supportedProtocols. 然后,您的自定义分配者可以利用ownedPartitions每个消费者的字段Subscription尽可能将分区还给以前的所有者。请注意,当一个分区要重新分配给另一个消费者时,它必须从新的分配中删除,直到它被其原始所有者撤销。任何必须撤销分区的消费者都将触发后续重新平衡,以允许将撤销的分区安全地分配给其新所有者。有关更多信息,请参阅 ConsumerPartitionAssignor RebalanceProtocol javadocs。要从旧的(急切的)协议(总是在重新平衡之前撤销所有分区)升级到协作重新平衡,您必须遵循特定的升级路径以使所有客户端处于相同状态ConsumerPartitionAssignor 支持协作协议。这可以通过两次滚动反弹来完成,CooperativeStickyAssignor例如:在第一次反弹期间,将“cooperative-sticky”添加到每个成员的支持分配者列表中(不删除前一个分配者 - 请注意,如果以前使用默认值,您也必须明确地包含它)。然后你反弹和/或升级它。一旦整个组都在 2.4+ 并且所有成员在其支持的分配者中都具有“合作粘性”,则移除其他分配者并执行第二次滚动反弹,以便最终所有成员仅支持合作协议。有关协作再平衡协议和升级路径的更多详细信息,请参阅KIP-429。有一些行为变化ConsumerRebalanceListener,以及一个新的 API。在侦听器的三个回调中的任何一个期间抛出的异常都将不再被吞没,而是在Consumer.poll()调用之前一直被重新抛出。添加了该onPartitionsLost方法以允许用户对异常情况做出反应,其中消费者可能失去了其分区的所有权(例如错过了重新平衡)并且无法提交偏移量。默认情况下,这将简单地调用现有onPartitionsRevokedAPI 以与之前的行为保持一致。但是请注意,onPartitionsLost当丢失的分区集为空时,不会调用它。这意味着在加入组的新消费者的第一次重新平衡开始时不会调用回调。的语义ConsumerRebalanceListener's遵循上述合作再平衡协议时,回调将进一步更改。此外onPartitionsLost,onPartitionsRevoked 当撤销的分区集为空时,也永远不会调用。回调通常仅在重新平衡结束时调用,并且仅在移动到另一个消费者的分区集上调用。然而, onPartitionsAssigned回调将始终被调用,即使有一组空的分区,作为通知用户重新平衡事件的一种方式(这对于合作和渴望都是如此)。有关新回调语义的详细信息,请参阅ConsumerRebalanceListener javadocs。Scala traitkafka.security.auth.Authorizer已被弃用,取而代之的是新的 Java API org.apache.kafka.server.authorizer.Authorizer。授权者实现类 kafka.security.auth.SimpleAclAuthorizer也已被弃用,并被新的实现所取代kafka.security.authorizer.AclAuthorizer。AclAuthorizer使用新 API 支持的功能来改进授权日志记录,并且与SimpleAclAuthorizer. 有关更多详细信息,请参阅KIP-504。从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x、1.0.x、1.1.x、2.0.x 或 2.1.x 升级或2.2.x 到 2.3.0

如果您是从 2.1.x 之前的版本升级,请参阅下面关于用于存储消费者偏移量的架构更改的说明。将 inter.broker.protocol.version 更改为最新版本后,将无法降级到 2.1 之前的版本。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您是从 0.11.0.x、1.0.x、1.1.x、2.0.x 或 2.1.x 升级,并且您没有覆盖消息格式,那么您只需要覆盖代理间协议版本。inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2)。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如 0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1)。log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍然可以降级。验证集群的行为和性能后,通过编辑 inter.broker.protocol.version并将其设置为 2.3 来提升协议版本。一个个重启broker,让新的协议版本生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 2.3 并一一重启。请注意,不再维护的旧 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。2.3.0 的显着变化我们正在为 Kafka Connect 引入基于 增量协作重新平衡的新重新平衡协议。新协议不需要在 Connect 工作人员之间的重新平衡阶段停止所有任务。相反,只有需要在工作人员之间交换的任务才会停止,并在后续的重新平衡中启动。从 2.3.0 开始,默认启用新的 Connect 协议。有关其工作原理以及如何启用急切重新平衡的旧行为的更多详细信息,请查看 增量协作重新平衡设计。我们正在向消费者用户引入静态成员资格。此功能减少了正常应用程序升级或滚动反弹期间不必要的重新平衡。有关如何使用它的更多详细信息,请查看静态会员设计。Kafka Streams DSL 切换其使用的存储类型。尽管此更改主要对用户透明,但在某些极端情况下可能需要更改代码。有关更多详细信息,请参阅Kafka Streams 升级部分。Kafka Streams 2.3.0 需要 0.11 或更高版本的消息格式,并且不适用于较旧的消息格式。从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x、1.0.x、1.1.x、2.0.x 或 2.1.x 升级到2.2.0

如果您是从 2.1.x 之前的版本升级,请参阅下面关于用于存储消费者偏移量的架构更改的说明。将 inter.broker.protocol.version 更改为最新版本后,将无法降级到 2.1 之前的版本。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您从 0.11.0.x、1.0.x、1.1.x 或 2.0.x 升级并且您没有覆盖消息格式,那么您只需要覆盖代理间协议版本。inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0)。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如 0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1)。log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍然可以降级。验证集群的行为和性能后,通过编辑 inter.broker.protocol.version并将其设置为 2.2 来提升协议版本。一个个重启broker,让新的协议版本生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 2.2 并一一重启。请注意,不再维护的旧 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。2.2.1 中的显着变化Kafka Streams 2.2.1 需要 0.11 或更高版本的消息格式,并且不适用于较旧的消息格式。2.2.0 的显着变化默认消费者组 ID 已从空字符串 ( "") 更改为null. 使用新的默认组 ID 的消费者将无法订阅主题,也无法获取或提交偏移量。作为消费者组 ID 的空字符串已被弃用,但在未来的主要版本之前将受到支持。依赖空字符串组 id 的旧客户端现在必须将其作为其消费者配置的一部分显式提供。有关详细信息,请参阅 KIP-289。命令行工具现在可以使用而不是 zookeeperbin/kafka-topics.sh直接连接到代理。--bootstrap-server旧--zookeeper 选项现在仍然可用。请阅读KIP-377了解更多信息。Kafka Streams 依赖于更新版本的 RocksDBs,需要 MacOS 10.13 或更高版本。从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x、1.0.x、1.1.x 或 2.0.0 升级到 2.1.0

请注意,2.1.x 包含对用于存储消费者偏移量的内部模式的更改。升级完成后,将无法降级到以前的版本。有关更多详细信息,请参阅下面的滚动升级说明。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您从 0.11.0.x、1.0.x、1.1.x 或 2.0.x 升级并且您没有覆盖消息格式,那么您只需要覆盖代理间协议版本。inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0)。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如 0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1)。log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍然可以降级。验证集群的行为和性能后,通过编辑 inter.broker.protocol.version并将其设置为 2.1 来提升协议版本。一个个重启broker,让新的协议版本生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 2.1 并一一重启。请注意,不再维护的旧 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

附加升级说明:

此版本中的偏移过期语义略有变化。根据新语义,当组订阅了相应的主题并且仍然处于活动状态(有活动的消费者)时,不会删除组中的分区偏移量。如果组变为空,则在默认偏移保留期(或经纪人设置的保留期)过去后,其所有偏移将被删除(除非该组再次变为活动状态)。与不使用 Kafka 组管理的独立(简单)消费者关联的偏移量将在自上次提交后经过默认偏移量保留期(或代理设置的保留期)后被删除。提供 no 时,控制台使用者enable.auto.commit属性的默认值group.id现在设置为false。这是为了避免污染消费者协调器缓存,因为自动生成的组不太可能被其他消费者使用。正如我们 在KIP-91中介绍的那样,生产者配置的默认值retries已更改为,它设置了发送记录和接收代理确认之间的总时间的上限。默认情况下,传递超时设置为 2 分钟。Integer.MAX_VALUE``delivery.timeout.ms默认情况下,MirrorMaker 现在在配置生产者时覆盖delivery.timeout.ms到。Integer.MAX_VALUE如果您retries为了更快地失败而覆盖了 的值,那么您将需要覆盖delivery.timeout.ms.ListGroup作为推荐的替代方案,API 现在期望访问Describe Group用户应该能够列出的组。尽管Describe Cluster仍支持旧访问以实现向后兼容性,但不建议将其用于此 API。KIP-336弃用了 ExtendedSerializer 和 ExtendedDeserializer 接口,并传播了 Serializer 和 Deserializer 的使用。KIP-82引入了 ExtendedSerializer 和 ExtendedDeserializer,以 兼容 Java 7 的方式为序列化器和反序列化器提供记录头。现在我们合并了这些接口,因为 Java 7 支持已经被删除了。2.1.0 的显着变化Jetty 已升级到 9.4.12,默认情况下不包括 TLS_RSA_* 密码,因为它们不支持前向保密,有关更多信息,请参阅 https://github.com/eclipse/jetty.project/issues/2807。unclean.leader.election.enable当使用每个主题的配置覆盖动态更新配置时,控制器会自动启用不干净的领导者选举。AdminClient增加了一个方法AdminClient#metrics()。AdminClient现在,任何使用AdminClient. 有关更多信息,请参阅KIP-324Kafka 现在支持来自KIP-110 的Zstandard 压缩。您必须升级代理和客户端才能使用它。2.1.0 之前的消费者将无法读取使用 Zstandard 压缩的主题,因此在升级所有下游消费者之前,您不应为主题启用它。有关详细信息,请参阅 KIP。从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x、1.0.x 或 1.1.x 升级到 2.0.0

Kafka 2.0.0 引入了有线协议更改。通过遵循以下推荐的滚动升级计划,您可以保证升级期间不会出现停机。但是,请在升级之前 查看2.0.0 中的显着变化。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您从 0.11.0.x、1.0.x 或 1.1.x 升级并且您没有覆盖消息格式,那么您只需要覆盖代理间协议格式。inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1)。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如 0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1)。log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。整个集群升级后,通过编辑inter.broker.protocol.version并将其设置为 2.0 来提升协议版本。一个个重启broker,让新的协议版本生效。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 2.0 并一一重启。请注意,旧的 Scala 消费者不支持 0.11 中引入的新消息格式,因此为了避免下转换的性能成本(或利用恰好一次语义),必须使用较新的 Java 消费者。

附加升级说明:

如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,它们将从新协议开始。升级代理后,可以随时更新协议版本并重新启动。它不必紧随其后。对于消息格式版本也是如此。如果您在 Kafka Streams 代码中使用 Java8 方法引用,则可能需要更新代码以解决方法歧义。仅热交换 jar 文件可能不起作用。在集群中的所有代理都已更新之前 ,不应将 ACL 添加到前缀资源(在KIP-290中添加)。**注意:**添加到集群的任何前缀 ACL,即使在集群完全升级后,如果集群再次降级,都将被忽略。2.0.0 的显着变化KIP-186将默认偏移保留时间从 1 天增加到 7 天。这使得它不太可能在不经常提交的应用程序中“丢失”偏移量。它还增加了活动的偏移量集,因此可以增加代理的内存使用量。请注意,控制台使用者当前默认启用偏移提交,并且可能是大量偏移的来源,此更改现在将保留 7 天而不是 1 天。您可以通过将代理配置设置offsets.retention.minutes为 1440 来保留现有行为。已放弃对 Java 7 的支持,Java 8 现在是所需的最低版本。的默认值ssl.endpoint.identification.algorithm已更改为https,它执行主机名验证(否则可能发生中间人攻击)。设置ssl.endpoint.identification.algorithm为空字符串以恢复以前的行为。KAFKA-5674将下限max.connections.per.ip最小间隔扩展到零,因此允许对入站连接进行基于 IP 的过滤。KIP-272 在 metric 中添加了 API 版本标签kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}。该指标现在变为kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...},version={0|1|2|3|...}。这将影响不会自动聚合的 JMX 监控工具。要获取特定请求类型的总数,需要更新该工具以跨不同版本进行聚合。KIP-225更改了指标“records.lag”以使用主题和分区的标签。名称格式为“{topic}-{partition}.records-lag”的原始版本已被删除。自 0.11.0.0 起已弃用的 Scala 消费者已被删除。自 0.10.0.0 以来,Java 使用者一直是推荐的选项。请注意,即使代理升级到 2.0.0,1.1.0(和更早版本)中的 Scala 消费者仍将继续工作。自 0.10.0.0 起已弃用的 Scala 生产者已被删除。自 0.9.0.0 以来,Java 生产者一直是推荐的选项。请注意,Java 生产者中默认分区器的行为与 Scala 生产者中的默认分区器不同。迁移的用户应该考虑配置一个保留以前行为的自定义分区器。请注意,即使代理升级到 2.0.0,1.1.0(及更早版本)中的 Scala 生产者仍将继续工作。MirrorMaker 和 ConsoleConsumer 不再支持 Scala 消费者,它们始终使用 Java 消费者。ConsoleProducer 不再支持 Scala 生产者,它始终使用 Java 生产者。许多依赖 Scala 客户端的已弃用工具已被删除:ReplayLogProducer、SimpleConsumerPerformance、SimpleConsumerShell、ExportZkOffsets、ImportZkOffsets、UpdateOffsetsInZK、VerifyConsumerRebalance。已弃用的 kafka.tools.ProducerPerformance 已被删除,请使用 org.apache.kafka.tools.ProducerPerformance。添加了新的 Kafka Streams 配置参数upgrade.from,允许从旧版本滚动反弹升级。KIP-284更改了 Kafka Streams 重新分区主题的保留时间,将其默认值设置为Long.MAX_VALUE.更新ProcessorStateManager了 Kafka Streams 中的 API,用于将状态存储注册到处理器拓扑。有关更多详细信息,请阅读 Streams升级指南。在早期版本中,Connect 的工作器配置需要internal.key.converter和internal.value.converter属性。在 2.0 中,这些不再需要并且默认为 JSON 转换器。您可以从 Connect 独立和分布式工作器配置中安全地删除这些属性:internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter.schemas.enable=falseKIP-266添加了一个新的消费者配置default.api.timeout.ms 来指定用于KafkaConsumer可能阻塞的 API 的默认超时。KIP 还为此类阻塞 API 添加了重载,以支持为每个 API 指定特定的超时时间,而不是使用default.api.timeout.ms. 特别是,poll(Duration)添加了一个新的 API,它不会阻止动态分区分配。旧poll(long)API 已被弃用,将在未来版本中删除。还为其他KafkaConsumer方法添加了重载,例如partitionsFor, listTopics, offsetsForTimes, beginningOffsets,endOffsets和close采用Duration.同样作为 KIP-266 的一部分,默认值request.timeout.ms已更改为 30 秒。之前的值略高于 5 分钟,以说明重新平衡所需的最长时间。现在我们将重新平衡中的 JoinGroup 请求视为一种特殊情况,并使用派生自 max.poll.interval.ms请求超时的值。所有其他请求类型使用定义的超时request.timeout.ms内部方法kafka.admin.AdminClient.deleteRecordsBefore已被删除。鼓励用户迁移到org.apache.kafka.clients.admin.AdminClient.deleteRecords.AclCommand 工具--producer便利选项在给定主题上使用KIP-277更细粒度的 ACL。KIP-176删除了--new-consumer所有基于消费者的工具的选项。此选项是多余的,因为如果定义了 --bootstrap-server,则会自动使用新的使用者。KIP-290增加了在前缀资源上定义 ACL 的能力,例如任何以“foo”开头的主题。KIP-283改进了 Kafka 代理上的消息下转换处理,这通常是一个内存密集型操作。KIP 添加了一种机制,该机制通过一次向下转换分区数据块来降低操作的内存密集度,这有助于设置内存消耗的上限。通过这种改进,FetchResponse协议行为发生了变化,代理可以在响应结束时发送带有无效偏移量的超大消息批次。消费者客户端必须忽略此类超大消息,就像KafkaConsumer.KIP-283 还添加了新的主题和代理配置message.downconversion.enable,并log.message.downconversion.enable分别控制是否启用下转换。禁用时,代理不执行任何下转换,而是向UNSUPPORTED_VERSION 客户端发送错误。动态代理配置选项可以在代理启动之前使用 kafka-configs.sh 存储在 ZooKeeper 中。此选项可用于避免在 server.properties 中存储明确的密码,因为所有密码配置都可以加密存储在 ZooKeeper 中。如果连接尝试失败,ZooKeeper 主机现在会重新解析。但是如果您的 ZooKeeper 主机名解析为多个地址并且其中一些地址不可访问,那么您可能需要增加连接超时 zookeeper.connection.timeout.ms。新协议版本KIP-279:OffsetsForLeaderEpochResponse v1 引入了分区级leader_epoch字段。KIP-219:提高因违反配额而受到限制的非集群操作请求和响应的协议版本。KIP-290:提高 ACL 创建、描述和删除请求和响应的协议版本。升级 1.1 Kafka Streams 应用程序将您的 Streams 应用程序从 1.1 升级到 2.0 不需要代理升级。Kafka Streams 2.0 应用程序可以连接到 2.0、1.1、1.0、0.11.0、0.10.2 和 0.10.1 代理(虽然不能连接到 0.10.0 代理)。请注意,在 2.0 中,我们删除了在 1.0 之前已弃用的公共 API;利用那些已弃用的 API 的用户需要相应地进行代码更改。有关更多详细信息,请参阅2.0.0 中的 Streams API 更改。从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x 或 1.0.x 升级到 1.1.x

Kafka 1.1.0 引入了有线协议更改。通过遵循以下推荐的滚动升级计划,您可以保证升级期间不会出现停机。但是,请在升级前 查看1.1.0 中的显着变化。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果您从 0.11.0.x 或 1.0.x 升级并且您没有覆盖消息格式,那么您只需要覆盖代理间协议格式。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(0.11.0 或 1.0)。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如 0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0、1.0)。log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。整个集群升级后,通过编辑inter.broker.protocol.version并将其设置为 1.1 来提升协议版本。一个个重启broker,让新的协议版本生效。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 1.1 并一一重启。请注意,旧的 Scala 消费者不支持 0.11 中引入的新消息格式,因此为了避免下转换的性能成本(或利用恰好一次语义),必须使用较新的 Java 消费者。

附加升级说明:

如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,它们将从新协议开始。升级代理后,可以随时更新协议版本并重新启动。它不必紧随其后。对于消息格式版本也是如此。如果您在 Kafka Streams 代码中使用 Java8 方法引用,则可能需要更新代码以解决方法歧义。仅热交换 jar 文件可能不起作用。1.1.1 的显着变化添加了新的 Kafka Streams 配置参数upgrade.from,允许从版本 0.10.0.x 滚动反弹升级有关此新配置的详细信息,请参阅Kafka Streams 升级指南。1.1.0 的显着变化Maven 中的 kafka 神器不再依赖于 log4j 或 slf4j-log4j12。与 kafka-clients 工件类似,用户现在可以通过包含适当的 slf4j 模块(slf4j-log4j12、logback 等)来选择日志记录后端。发布的 tarball 仍然包含 log4j 和 slf4j-log4j12。KIP-225更改了指标“records.lag”以使用主题和分区的标签。名称格式为“{topic}-{partition}.records-lag”的原始版本已弃用,并将在 2.0.0 中删除。Kafka Streams 对代理通信错误更加健壮。Kafka Streams 不会因致命异常而停止 Kafka Streams 客户端,而是尝试自我修复并重新连接到集群。使用新版本AdminClient,您可以更好地控制 Kafka Streams 重试的频率,并且可以配置 细粒度的超时(而不是像旧版本那样硬编码重试)。Kafka Streams 重新平衡时间进一步减少,使 Kafka Streams 响应更快。Kafka Connect 现在支持接收器和源连接器中的消息头,并通过简单的消息转换来操作它们。必须更改连接器才能显式使用它们。引入了一个新HeaderConverter的来控制标头如何(反)序列化,并且默认使用新的“SimpleHeaderConverter”来使用值的字符串表示形式。kafka.tools.DumpLogSegments 现在会自动设置深度迭代选项,如果 print-data-log 由于任何其他选项(如解码器)而显式或隐式启用。新协议版本KIP-226引入了 DescribeConfigs 请求/响应 v1。KIP-227引入了 Fetch Request/Response v7。升级 1.0 Kafka Streams 应用程序将您的 Streams 应用程序从 1.0 升级到 1.1 不需要代理升级。Kafka Streams 1.1 应用程序可以连接到 1.0、0.11.0、0.10.2 和 0.10.1 代理(尽管无法连接到 0.10.0 代理)。有关更多详细信息,请参阅1.1.0 中的 Streams API 更改。从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x 或 0.11.0.x 升级到 1.0.0

Kafka 1.0.0 引入了有线协议更改。通过遵循以下推荐的滚动升级计划,您可以保证升级期间不会出现停机。但是,请在升级之前 查看1.0.0 中的显着变化。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 是指当前使用的消息格式版本。如果您之前覆盖了消息格式版本,则应保留其当前值。或者,如果您从 0.11.0.x 之前的版本升级,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。如果从 0.11.0.x 升级并且没有覆盖消息格式,则必须将消息格式版本和代理间协议版本都设置为 0.11.0。inter.broker.protocol.version=0.11.0log.message.format.version=0.11.0inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如 0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0)。log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。升级整个集群后,通过编辑inter.broker.protocol.version并将其设置为 1.0 来提升协议版本。一个个重启broker,让新的协议版本生效。如果您已按照上述说明覆盖了消息格式版本,则需要再进行一次滚动重启以将其升级到最新版本。将所有(或大多数)消费者升级到 0.11.0 或更高版本后,将每个代理上的 log.message.format.version 更改为 1.0 并一一重启。如果您从 0.11.0 升级并且 log.message.format.version 设置为 0.11.0,您可以更新配置并跳过滚动重启。请注意,旧的 Scala 消费者不支持 0.11 中引入的新消息格式,因此为了避免下转换的性能成本(或利用恰好一次语义),必须使用较新的 Java 消费者。

附加升级说明:

如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,它们将从新协议开始。升级代理后,可以随时更新协议版本并重新启动。它不必紧随其后。对于消息格式版本也是如此。1.0.2 中的显着变化添加了新的 Kafka Streams 配置参数upgrade.from,允许从版本 0.10.0.x 滚动反弹升级有关此新配置的详细信息,请参阅Kafka Streams 升级指南。1.0.1 中的显着变化恢复了 AdminClient 的 Options 类(例如 CreateTopicsOptions、DeleteTopicsOptions 等)与 0.11.0.x 的二进制兼容性。二进制(但不是源代码)兼容性在 1.0.0 中被无意中破坏。1.0.0 的显着变化现在默认启用主题删除,因为该功能现在很稳定。希望保留以前行为的用户应将代理配置设置delete.topic.enable为false. 请记住,主题删除删除数据并且操作不可逆(即没有“取消删除”操作)对于支持时间戳搜索的主题(如果无法找到某个分区的偏移量),该分区现在包含在搜索结果中,且偏移量值为空。以前,分区不包含在映射中。进行此更改是为了使搜索行为与不支持时间戳搜索的主题的情况一致。如果inter.broker.protocol.version是 1.0 或更高版本,即使存在脱机日志目录,代理现在也将保持在线以提供实时日志目录上的副本。由于硬件故障导致的IOException,日志目录可能会脱机。用户需要监控per-broker metricofflineLogDirectoryCount来检查是否有离线日志目录。添加了 KafkaStorageException,这是一个可重试的异常。如果客户端的 FetchRequest 或 ProducerRequest 的版本不支持 KafkaStorageException,KafkaStorageException 将在响应中转换为 NotLeaderForPartitionException。-XX:+DisableExplicitGC 在默认 JVM 设置中被 -XX:+ExplicitGCInvokesConcurrent 取代。在某些情况下,这有助于避免在直接缓冲区分配本机内存期间出现内存不足异常。handleError已从包中以下已弃用的类中删除了覆盖的方法实现kafka.api:FetchRequest、GroupCoordinatorRequest、OffsetCommitRequest、 OffsetFetchRequest、OffsetRequest、ProducerRequest和TopicMetadataRequest。这仅适用于代理,但已不再使用,并且尚未维护实现。为了二进制兼容性,保留了存根实现。Java 客户端和工具现在接受任何字符串作为客户端 ID。已弃用的工具kafka-consumer-offset-checker.sh已被删除。用于kafka-consumer-groups.sh获取消费者组详细信息。SimpleAclAuthorizer 现在默认将拒绝访问记录到授权日志。身份验证失败现在作为AuthenticationException. 如果客户端连接身份验证失败,则不会重试。自定义SaslServer实现可能会抛出SaslAuthenticationException以提供错误消息以返回给客户端,指示身份验证失败的原因。实施者应注意不要在异常消息中包含任何不应泄露给未经身份验证的客户端的安全关键信息。app-info向 JMX 注册以提供版本和提交 id的mbean 将被弃用并替换为提供这些属性的指标。Kafka 指标现在可能包含非数字值。org.apache.kafka.common.Metric#value()已被弃用,并将0.0在这种情况下返回以最小化破坏读取每个客户端指标值的用户的可能性(通过MetricsReporter实现或通过调用metrics()方法)。org.apache.kafka.common.Metric#metricValue()可用于检索数字和非数字度量值。现在,每个 Kafka 速率指标都有一个相应的累积计数指标,并带有后缀-total 以简化下游处理。例如,records-consumed-rate有一个名为records-consumed-total.kafka_mx4jenable仅当系统属性设置为时才会启用 Mx4j true。由于逻辑反转错误,它以前默认启用,如果kafka_mx4jenable设置为true.客户端 jar 中的包org.apache.kafka.common.security.auth已公开并添加到 javadocs。以前位于此包中的内部类已移至其他地方。当使用授权者并且用户对主题没有所需的权限时,无论代理上是否存在主题,代理都会向请求返回 TOPIC_AUTHORIZATION_FAILED 错误。如果用户拥有所需权限且主题不存在,则返回 UNKNOWN_TOPIC_OR_PARTITION 错误码。config/consumer.properties 文件更新为使用新的消费者配置属性。新协议版本KIP-112:LeaderAndIsrRequest v1 引入了分区级is_new字段。KIP-112:UpdateMetadataRequest v4 引入了一个分区级offline_replicas字段。KIP-112:MetadataResponse v5 引入了分区级offline_replicas字段。KIP-112:ProduceResponse v4 引入了 KafkaStorageException 的错误代码。KIP-112:FetchResponse v6 引入了 KafkaStorageException 的错误代码。KIP-152:已添加 SaslAuthenticate 请求以启用身份验证失败报告。如果 SaslHandshake 请求版本大于 0,将使用此请求。升级 0.11.0 Kafka Streams 应用程序将您的 Streams 应用程序从 0.11.0 升级到 1.0 不需要代理升级。Kafka Streams 1.0 应用程序可以连接到 0.11.0、0.10.2 和 0.10.1 代理(虽然不能连接到 0.10.0 代理)。但是,Kafka Streams 1.0 需要 0.10 或更新的消息格式,并且不适用于旧的消息格式。如果您正在监控流指标,则需要对报告和监控代码中的指标名称进行一些更改,因为指标传感器层次结构已更改。有一些公共 API,包括ProcessorContext#schedule()、Processor#punctuate()和KStreamBuilder,TopologyBuilder正在被新的 API 弃用。我们建议您在升级时进行相应的代码更改,因为新的 API 看起来非常相似,所以这应该非常小。有关更多详细信息,请参阅1.0.0 中的 Streams API 更改。升级 0.10.2 Kafka Streams 应用程序将您的 Streams 应用程序从 0.10.2 升级到 1.0 不需要代理升级。Kafka Streams 1.0 应用程序可以连接到 1.0、0.11.0、0.10.2 和 0.10.1 代理(尽管无法连接到 0.10.0 代理)。如果您正在监控流指标,则需要对报告和监控代码中的指标名称进行一些更改,因为指标传感器层次结构已更改。有一些公共 API,包括ProcessorContext#schedule()、Processor#punctuate()和KStreamBuilder,TopologyBuilder正在被新的 API 弃用。我们建议您在升级时进行相应的代码更改,因为新的 API 看起来非常相似,所以这应该非常小。如果您指定自定义key.serde,value.serde并且timestamp.extractor在配置中,建议使用其替换的配置参数,因为这些配置已被弃用。有关更多详细信息,请参阅0.11.0 中的 Streams API 更改。升级 0.10.1 Kafka Streams 应用程序将您的 Streams 应用程序从 0.10.1 升级到 1.0 不需要代理升级。Kafka Streams 1.0 应用程序可以连接到 1.0、0.11.0、0.10.2 和 0.10.1 代理(尽管无法连接到 0.10.0 代理)。你需要重新编译你的代码。仅交换 Kafka Streams 库 jar 文件将不起作用,并且会破坏您的应用程序。如果您正在监控流指标,则需要对报告和监控代码中的指标名称进行一些更改,因为指标传感器层次结构已更改。有一些公共 API,包括ProcessorContext#schedule()、Processor#punctuate()和KStreamBuilder,TopologyBuilder正在被新的 API 弃用。我们建议您在升级时进行相应的代码更改,因为新的 API 看起来非常相似,所以这应该非常小。如果您指定自定义key.serde,value.serde并且timestamp.extractor在配置中,建议使用其替换的配置参数,因为这些配置已被弃用。如果您使用自定义(即用户实现的)时间戳提取器,则需要更新此代码,因为TimestampExtractor接口已更改。如果您注册自定义指标,则需要更新此代码,因为StreamsMetric界面已更改。有关详细信息,请参阅1.0.0中的 Streams API 更改、0.11.0 中的 Streams API 更改和 0.10.2 中的Streams API 更改。升级 0.10.0 Kafka Streams 应用程序将 Streams 应用程序从 0.10.0 升级到 1.0 确实需要代理升级,因为 Kafka Streams 1.0 应用程序只能连接到 0.1、0.11.0、0.10.2 或 0.10.1 代理。有几个 API 更改不向后兼容(参见Streams API 在 1.0.0中的更改、 Streams API 在 0.11.0 中的更改、 Streams API 在 0.10.2 中的更改以及 Streams API 在 0.10.1中的更改以了解更多详细信息)。因此,您需要更新并重新编译您的代码。仅交换 Kafka Streams 库 jar 文件将不起作用,并且会破坏您的应用程序。从 0.10.0.x 升级到 1.0.2 需要两次滚动反弹,并upgrade.from="0.10.0"为第一个升级阶段设置了配置(参见KIP-268)。作为替代方案,也可以进行离线升级。为滚动反弹准备您的应用程序实例,并确保将配置upgrade.from设置"0.10.0"为新版本 0.11.0.3将应用程序的每个实例反弹一次为第二轮滚动反弹准备新部署的 1.0.2 应用程序实例;确保删除 config 的值upgrade.from再次反弹应用程序的每个实例以完成升级从 0.10.0.x 升级到 1.0.0 或 1.0.1 需要离线升级(不支持滚动反弹升级)停止所有旧 (0.10.0.x) 应用程序实例更新您的代码并用新代码和新 jar 文件交换旧代码和 jar 文件重新启动所有新的(1.0.0 或 1.0.1)应用程序实例从 0.8.x、0.9.x、0.10.0.x、0.10.1.x 或 0.10.2.x 升级到 0.11.0.0

Kafka 0.11.0.0 引入了新的消息格式版本以及有线协议更改。通过遵循以下推荐的滚动升级计划,您可以保证升级期间不会出现停机。但是,请在升级前查看0.11.0.0 中的显着变化。

从版本 0.10.2 开始,Java 客户端(生产者和消费者)已经获得了与旧代理进行通信的能力。0.11.0 版客户端可以与 0.10.0 版或更高版本的代理通信。但是,如果您的代理早于 0.10.0,则必须先升级 Kafka 集群中的所有代理,然后再升级您的客户端。0.11.0 版代理支持 0.8.x 和更新的客户端。

对于滚动升级:

更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的当前消息格式版本。如果您之前没有覆盖消息格式,则应设置 CURRENT_MESSAGE_FORMAT_VERSION 以匹配 CURRENT_KAFKA_VERSION。inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如 0.8.2、0.9.0、0.10.0、0.10.1 或 0.10.2)。log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。整个集群升级后,通过编辑inter.broker.protocol.version并将其设置为 0.11.0 来提升协议版本,但不要更改log.message.format.version。一个个重启broker,让新的协议版本生效。一旦所有(或大多数)消费者都升级到 0.11.0 或更高版本,然后在每个代理上将 log.message.format.version 更改为 0.11.0 并一一重启。请注意,旧的 Scala 消费者不支持新的消息格式,因此为了避免下转换的性能成本(或利用恰好一次语义),必须使用新的 Java 消费者。

附加升级说明:

如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,它们将从新协议开始。升级代理后,可以随时更新协议版本并重新启动。它不必紧随其后。对于消息格式版本也是如此。也可以在bin/kafka-topics.sh更新全局设置之前使用主题管理工具 ()对单个主题启用 0.11.0 消息格式log.message.format.version。如果您是从 0.10.0 之前的版本升级,则无需在切换到 0.11.0 之前先将消息格式更新为 0.10.0。升级 0.10.2 Kafka Streams 应用程序将您的 Streams 应用程序从 0.10.2 升级到 0.11.0 不需要代理升级。Kafka Streams 0.11.0 应用程序可以连接到 0.11.0、0.10.2 和 0.10.1 代理(尽管无法连接到 0.10.0 代理)。如果您指定自定义key.serde,value.serde并且timestamp.extractor在配置中,建议使用其替换的配置参数,因为这些配置已被弃用。有关更多详细信息,请参阅0.11.0 中的 Streams API 更改。升级 0.10.1 Kafka Streams 应用程序将您的 Streams 应用程序从 0.10.1 升级到 0.11.0 不需要代理升级。Kafka Streams 0.11.0 应用程序可以连接到 0.11.0、0.10.2 和 0.10.1 代理(尽管无法连接到 0.10.0 代理)。你需要重新编译你的代码。仅交换 Kafka Streams 库 jar 文件将不起作用,并且会破坏您的应用程序。如果您指定自定义key.serde,value.serde并且timestamp.extractor在配置中,建议使用其替换的配置参数,因为这些配置已被弃用。如果您使用自定义(即用户实现的)时间戳提取器,则需要更新此代码,因为TimestampExtractor接口已更改。如果您注册自定义指标,则需要更新此代码,因为StreamsMetric界面已更改。有关详细信息,请参阅0.11.0 中的 Streams API 更改和 0.10.2 中的Streams API 更改。升级 0.10.0 Kafka Streams 应用程序将 Streams 应用程序从 0.10.0 升级到 0.11.0 确实需要代理升级,因为 Kafka Streams 0.11.0 应用程序只能连接到 0.11.0、0.10.2 或 0.10.1 代理。有几个 API 更改不向后兼容(请参阅0.11.0中的 Streams API 更改、0.10.2中的 Streams API 更改和 0.10.1中的 Streams API 更改以了解更多详细信息)。因此,您需要更新并重新编译您的代码。仅交换 Kafka Streams 库 jar 文件将不起作用,并且会破坏您的应用程序。从 0.10.0.x 升级到 0.11.0.3 需要两次滚动反弹,并upgrade.from="0.10.0"为第一个升级阶段设置了配置(参见KIP-268)。作为替代方案,也可以进行离线升级。为滚动反弹准备您的应用程序实例,并确保将配置upgrade.from设置"0.10.0"为新版本 0.11.0.3将应用程序的每个实例反弹一次为第二轮滚动反弹准备新部署的 0.11.0.3 应用程序实例;确保删除 config 的值upgrade.from再次反弹应用程序的每个实例以完成升级从 0.10.0.x 升级到 0.11.0.0、0.11.0.1 或 0.11.0.2 需要离线升级(不支持滚动反弹升级)停止所有旧 (0.10.0.x) 应用程序实例更新您的代码并用新代码和新 jar 文件交换旧代码和 jar 文件重新启动所有新的(0.11.0.0、0.11.0.1 或 0.11.0.2)应用程序实例0.11.0.3 中的显着变化添加了新的 Kafka Streams 配置参数upgrade.from,允许从版本 0.10.0.x 滚动反弹升级有关此新配置的详细信息,请参阅Kafka Streams 升级指南。0.11.0.0 的显着变化现在默认禁用不干净的领导者选举。新的默认设置有利于持久性而不是可用性。希望保留以前行为的用户应将代理配置设置unclean.leader.election.enable为true.Producer configs block.on.buffer.full,metadata.fetch.timeout.ms并timeout.ms已被删除。它们最初在 Kafka 0.9.0.0 中被弃用。offsets.topic.replication.factor代理配置现在在自动主题创建时强制执行。在集群大小满足此复制因子要求之前,内部自动主题创建将失败并出现 GROUP_COORDINATOR_NOT_AVAILABLE 错误。使用 snappy 压缩数据时,生产者和代理将使用压缩方案的默认块大小(2 x 32 KB)而不是 1 KB,以提高压缩率。有报道称,使用较小块大小压缩的数据比使用较大块大小压缩时大 50%。对于 snappy 案例,具有 5000 个分区的生产者将需要额外的 315 MB JVM 堆。同样,当使用 gzip 压缩数据时,生产者和代理将使用 8 KB 而不是 1 KB 作为缓冲区大小。gzip 的默认值过低(512 字节)。代理配置max.message.bytes现在适用于一批消息的总大小。以前,该设置应用于成批的压缩消息,或单独应用于非压缩消息。一个消息批处理可能仅包含一条消息,因此在大多数情况下,对单个消息大小的限制仅通过批处理格式的开销来减少。但是,对于消息格式转换有一些微妙的影响(有关详细信息,请参见下文)。另请注意,虽然以前代理将确保在每个获取请求中至少返回一条消息(无论总和分区级别的获取大小如何),但现在相同的行为适用于一个消息批次。默认情况下启用 GC 日志轮换,有关详细信息,请参阅 KAFKA-3754。RecordMetadata、MetricName 和 Cluster 类的弃用构造函数已被删除。通过提供用户标头读写访问的新标头接口添加了用户标头支持。ProducerRecord 和 ConsumerRecord 通过Headers headers()方法调用公开新的 Headers API。引入 ExtendedSerializer 和 ExtendedDeserializer 接口以支持标头的序列化和反序列化。如果配置的序列化器和反序列化器不是上述类,标头将被忽略。引入了一个新的配置group.initial.rebalance.delay.ms。此配置指定GroupCoordinator延迟初始消费者重新平衡的时间(以毫秒为单位)。随着新成员加入组,重新平衡将进一步延迟group.initial.rebalance.delay.ms,最多延迟max.poll.interval.ms. 其默认值为 3 秒。在开发和测试期间,可能需要将此设置为 0,以免延迟测试执行时间。org.apache.kafka.common.Cluster#partitionsForTopic,partitionsForNode并且如果所需主题的元数据不存在 ,availablePartitionsForTopic方法将返回一个空列表而不是(这被认为是一种不好的做法)。nullStreams API 配置参数timestamp.extractor、key.serde和value.serde已弃用并分别替换为default.timestamp.extractor、default.key.serde和default.value.serde。对于 Java 消费者 API 中的偏移提交失败,当 的实例被传递给提交回调commitAsync时,我们不再公开根本原因。RetriableCommitFailedException有关详细信息,请参阅 KAFKA-5052 。新协议版本KIP-107:FetchRequest v5 引入了一个分区级log_start_offset字段。KIP-107:FetchResponse v5 引入了一个分区级log_start_offset字段。KIP-82header : ProduceRequest v3在消息协议中引入了一个数组,包含key字段和value字段。KIP-82header : FetchResponse v5在消息协议中引入了一个数组,包含key字段和value字段。关于 Exactly Once 语义的注释

Kafka 0.11.0 包括对生产者中的幂等和事务功能的支持。幂等传递确保消息在单个生产者的生命周期内仅传递一次到特定主题分区。事务传递允许生产者将数据发送到多个分区,以便所有消息都成功传递,或者一个都没有传递。总之,这些功能在 Kafka 中实现了“exactly once semantics”。用户指南中提供了有关这些功能的更多详细信息,但下面我们添加了一些关于在升级的集群中启用它们的具体说明。请注意,不需要启用 EoS,如果未使用,对代理的行为没有影响。

只有新的 Java 生产者和消费者支持完全一次语义。这些功能主要取决于0.11.0 消息格式。尝试在较旧的格式上使用它们将导致不支持的版本错误。事务状态存储在一个新的内部主题__transaction_state中。在第一次尝试使用事务请求 API 之前,不会创建此主题。与消费者偏移主题类似,有几个设置可以控制主题的配置。例如,transaction.state.log.min.isr控制此主题的最低 ISR。有关选项的完整列表,请参阅用户指南中的配置部分。对于安全集群,事务 API 需要新的 ACL,可以使用bin/kafka-acls.sh. 工具。Kafka 中的 EoS 引入了新的请求 API 并修改了几个现有的。详情请参阅 KIP-98关于 0.11.0 中新消息格式的说明

0.11.0 消息格式包括几个主要增强功能,以支持生产者更好的交付语义(参见KIP-98)和改进的复制容错(参见KIP-101)。尽管新格式包含使这些改进成为可能的更多信息,但我们使批处理格式更加高效。只要每批消息的数量超过 2,您就可以预期较低的总体开销。但是,对于较小的批次,可能会对性能产生很小的影响。有关我们对新消息格式的初始性能分析的结果,请参见此处。您还可以在 KIP-98提案中找到有关消息格式的更多详细信息。

新消息格式的显着差异之一是即使未压缩的消息也作为单个批次存储在一起。这对代理配置有一些影响max.message.bytes,它限制了单个批次的大小。首先,如果旧客户端使用旧格式向主题分区生成消息,并且消息单独小于 max.message.bytes,则在上转换过程中将它们合并为单个批次后,代理可能仍会拒绝它们。通常,当单个消息的总大小大于max.message.bytes. 对于阅读从新格式下转换的消息的年长消费者来说,也有类似的效果:如果提取大小没有设置为至少与 max.message.bytes,即使单个未压缩消息小于配置的提取大小,消费者也可能无法取得进展。此行为不会影响 0.10.1.0 及更高版本的 Java 客户端,因为它使用更新的 fetch 协议,确保即使超过 fetch 大小也可以返回至少一条消息。要解决这些问题,您应该确保 1) 生产者的批量大小设置不大于max.message.bytes,以及 2) 消费者的获取大小设置至少与max.message.bytes.

大多数关于升级到 0.10.0 消息格式的性能影响的讨论 仍然与 0.11.0 升级相关。这主要影响未使用 TLS 保护的集群,因为在这种情况下已经不可能进行“零拷贝”传输。为了避免下转换的成本,您应该确保消费者应用程序升级到最新的 0.11.0 客户端。值得注意的是,由于旧的消费者在 0.11.0.0 中已被弃用,它不支持新的消息格式。您必须升级以使用新的消费者来使用新的消息格式,而无需下转换成本。请注意,0.11.0 消费者支持向后兼容 0.10.0 及更高版本的代理,因此可以在代理之前先升级客户端。

从 0.8.x、0.9.x、0.10.0.x 或 0.10.1.x 升级到 0.10.2.0

0.10.2.0 有有线协议更改。通过遵循以下推荐的滚动升级计划,您可以保证升级期间不会出现停机。但是,请在升级前查看0.10.2.0 中的显着变化。

从版本 0.10.2 开始,Java 客户端(生产者和消费者)已经获得了与旧代理进行通信的能力。版本 0.10.2 的客户端可以与版本 0.10.0 或更新的代理通信。但是,如果您的代理早于 0.10.0,则必须先升级 Kafka 集群中的所有代理,然后再升级您的客户端。0.10.2 版代理支持 0.8.x 和更新的客户端。

对于滚动升级:

更新所有代理上的 server.properties 文件并添加以下属性:inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如 0.8.2、0.9.0、0.10.0 或 0.10.1)。log.message.format.version=CURRENT_KAFKA_VERSION(有关此配置的详细信息, 请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。整个集群升级后,通过编辑 inter.broker.protocol.version 并将其设置为 0.10.2 来提升协议版本。如果您之前的消息格式是 0.10.0,请将 log.message.format.version 更改为 0.10.2(这是一个无操作,因为 0.10.0、0.10.1 和 0.10.2 的消息格式相同)。如果您之前的消息格式版本低于 0.10.0,请不要更改 log.message.format.version - 此参数仅应在所有消费者升级到 0.10.0.0 或更高版本后更改。一个个重启broker,让新的协议版本生效。如果此时 log.message.format.version 仍然低于 0.10.0,请等到所有消费者都升级到 0.10.0 或更高版本,然后在每个 broker 上将 log.message.format.version 更改为 0.10.2,然后一个一个地重新启动它们。

**注意:**如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并启动所有代理。默认情况下,它们将从新协议开始。

**注意:**升级代理后,可以随时更新协议版本并重新启动。它不必紧随其后。

升级 0.10.1 Kafka Streams 应用程序将您的 Streams 应用程序从 0.10.1 升级到 0.10.2 不需要代理升级。Kafka Streams 0.10.2 应用程序可以连接到 0.10.2 和 0.10.1 代理(尽管无法连接到 0.10.0 代理)。你需要重新编译你的代码。仅交换 Kafka Streams 库 jar 文件将不起作用,并且会破坏您的应用程序。如果您使用自定义(即用户实现的)时间戳提取器,则需要更新此代码,因为TimestampExtractor接口已更改。如果您注册自定义指标,则需要更新此代码,因为StreamsMetric界面已更改。有关更多详细信息,请参阅0.10.2 中的 Streams API 更改。升级 0.10.0 Kafka Streams 应用程序将 Streams 应用程序从 0.10.0 升级到 0.10.2 确实需要代理升级,因为 Kafka Streams 0.10.2 应用程序只能连接到 0.10.2 或 0.10.1 代理。有几个 API 更改不向后兼容(请参阅0.10.2 中的 Streams API 更改以了解更多详细信息)。因此,您需要更新并重新编译您的代码。仅交换 Kafka Streams 库 jar 文件将不起作用,并且会破坏您的应用程序。从 0.10.0.x 升级到 0.10.2.2 需要两次滚动反弹,并upgrade.from="0.10.0"为第一个升级阶段设置了配置(参见KIP-268)。作为替代方案,也可以进行离线升级。为滚动反弹准备您的应用程序实例,并确保将配置upgrade.from设置"0.10.0"为新版本 0.10.2.2将应用程序的每个实例反弹一次为第二轮滚动反弹准备新部署的 0.10.2.2 应用程序实例;确保删除 config 的值upgrade.from再次反弹应用程序的每个实例以完成升级从 0.10.0.x 升级到 0.10.2.0 或 0.10.2.1 需要离线升级(不支持滚动反弹升级)停止所有旧 (0.10.0.x) 应用程序实例更新您的代码并用新代码和新 jar 文件交换旧代码和 jar 文件重新启动所有新的(0.10.2.0 或 0.10.2.1)应用程序实例0.10.2.2 中的显着变化添加了新的配置参数upgrade.from,允许从版本 0.10.0.x 升级滚动反弹0.10.2.1 的显着变化StreamsConfig 类的两个配置的默认值已更改,以提高 Kafka Streams 应用程序的弹性。内部 Kafka Streams 生产者retries默认值从 0 更改为 10。内部 Kafka Streams 消费者max.poll.interval.ms 默认值从 300000 更改为Integer.MAX_VALUE。0.10.2.0 的显着变化Java 客户端(生产者和消费者)已经获得了与旧代理进行通信的能力。版本 0.10.2 的客户端可以与版本 0.10.0 或更新的代理通信。请注意,当使用较旧的代理时,某些功能不可用或受到限制。InterruptException如果调用线程被中断,Java 使用者上的几个方法现在可能会抛出。KafkaConsumer有关此更改的更深入说明,请参阅Javadoc。Java 使用者现在可以正常关闭。默认情况下,消费者最多等待 30 秒来完成挂起的请求。添加了一个新的带超时关闭 APIKafkaConsumer来控制最长等待时间。多个用逗号分隔的正则表达式可以通过 --whitelist 选项与新的 Java 使用者一起传递给 MirrorMaker。这使得使用旧 Scala 消费者时的行为与 MirrorMaker 一致。将您的 Streams 应用程序从 0.10.1 升级到 0.10.2 不需要代理升级。Kafka Streams 0.10.2 应用程序可以连接到 0.10.2 和 0.10.1 代理(尽管无法连接到 0.10.0 代理)。从 Streams API 中删除了 Zookeeper 依赖项。Streams API 现在使用 Kafka 协议来管理内部主题,而不是直接修改 Zookeeper。这消除了直接访问 Zookeeper 的权限需求,并且不应再在 Streams 应用程序中设置“StreamsConfig.ZOOKEEPER_CONFIG”。如果 Kafka 集群是安全的,Streams 应用程序必须具有创建新主题所需的安全权限。StreamsConfig 类中添加了几个新字段,包括“security.protocol”、“connections.max.idle.ms”、“retry.backoff.ms”、“reconnect.backoff.ms”和“request.timeout.ms”。用户应注意默认值并在需要时设置这些值。更多细节请参考3.5 Kafka Streams 配置。新协议版本KIP-88topics :如果数组设置为,OffsetFetchRequest v2 支持检索所有主题的偏移量null。KIP-88:OffsetFetchResponse v2 引入了一个顶级error_code字段。KIP-103:UpdateMetadataRequest v3为数组listener_name的元素引入了一个字段。end_pointsKIP-108:CreateTopicsRequest v1 引入了一个validate_only字段。KIP-108:CreateTopicsResponse v1为数组error_message元素引入了一个字段。topic_errors从 0.8.x、0.9.x 或 0.10.0.X 升级到 0.10.1.0

0.10.1.0 有有线协议更改。通过遵循以下推荐的滚动升级计划,您可以保证升级期间不会出现停机。但是,请在升级前注意0.10.1.0 中的潜在重大更改。注意:由于引入了新协议,因此在升级客户端之前升级 Kafka 集群很重要(即 0.10.1.x 客户端仅支持 0.10.1.x 或更高版本的代理,而 0.10.1.x 代理也支持旧客户端) .

对于滚动升级:

更新所有代理上的 server.properties 文件并添加以下属性:inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如 0.8.2.0、0.9.0.0 或 0.10.0.0)。log.message.format.version=CURRENT_KAFKA_VERSION(有关此配置的详细信息, 请参阅升级后的潜在性能影响。)一次升级一个代理:关闭代理,更新代码,然后重新启动。整个集群升级后,通过编辑 inter.broker.protocol.version 并将其设置为 0.10.1.0 来提升协议版本。如果您之前的消息格式是 0.10.0,请将 log.message.format.version 更改为 0.10.1(这是一个无操作,因为 0.10.0 和 0.10.1 的消息格式相同)。如果您之前的消息格式版本低于 0.10.0,请不要更改 log.message.format.version - 此参数仅应在所有消费者升级到 0.10.0.0 或更高版本后更改。一个个重启broker,让新的协议版本生效。如果此时 log.message.format.version 仍然低于 0.10.0,请等到所有消费者都升级到 0.10.0 或更高版本,然后在每个 broker 上将 log.message.format.version 更改为 0.10.1,然后一个一个地重新启动它们。

**注意:**如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并启动所有代理。默认情况下,它们将从新协议开始。

**注意:**升级代理后,可以随时更新协议版本并重新启动。它不必紧随其后。

0.10.1.0 中的潜在重大变化日志保留时间不再基于日志段的最后修改时间。相反,它将基于日志段中消息的最大时间戳。日志滚动时间不再取决于日志段创建时间。相反,它现在基于消息中的时间戳。进一步来说。如果segment中第一条消息的时间戳为T,则当新消息的时间戳大于等于T+log.roll.ms时,日志会被滚出由于为每个段添加了时间索引文件,0.10.0 的打开文件处理程序将增加约 33%。时间索引和偏移索引共享相同的索引大小配置。由于每个时间索引条目是偏移索引条目大小的 1.5 倍。用户可能需要增加 log.index.size.max.bytes 以避免潜在的频繁日志滚动。由于索引文件数量的增加,在一些日志段数较大(例如>15K)的broker上,broker启动过程中的日志加载过程可能会更长。根据我们的实验,将 num.recovery.threads.per.data.dir 设置为 1 可以减少日志加载时间。升级 0.10.0 Kafka Streams 应用程序将 Streams 应用程序从 0.10.0 升级到 0.10.1 确实需要代理升级,因为 Kafka Streams 0.10.1 应用程序只能连接到 0.10.1 代理。有几个 API 更改不向后兼容(请参阅0.10.1 中的 Streams API 更改以了解更多详细信息)。因此,您需要更新并重新编译您的代码。仅交换 Kafka Streams 库 jar 文件将不起作用,并且会破坏您的应用程序。从 0.10.0.x 升级到 0.10.1.2 需要两次滚动反弹,并upgrade.from="0.10.0"为第一个升级阶段设置了配置(参见KIP-268)。作为替代方案,也可以进行离线升级。为滚动反弹准备您的应用程序实例,并确保将配置upgrade.from设置"0.10.0"为新版本 0.10.1.2将应用程序的每个实例反弹一次为第二轮滚动反弹准备新部署的 0.10.1.2 应用程序实例;确保删除 config 的值upgrade.from再次反弹应用程序的每个实例以完成升级从 0.10.0.x 升级到 0.10.1.0 或 0.10.1.1 需要离线升级(不支持滚动反弹升级)停止所有旧 (0.10.0.x) 应用程序实例更新您的代码并用新代码和新 jar 文件交换旧代码和 jar 文件重新启动所有新的(0.10.1.0 或 0.10.1.1)应用程序实例0.10.1.0 的显着变化新的 Java 使用者不再处于测试阶段,我们建议将其用于所有新开发。旧的 Scala 消费者仍受支持,但它们将在下一个版本中被弃用,并将在未来的主要版本中删除。/开关不再需要与新消费者一起使用 MirrorMaker 和 Console Consumer 等工具--new-consumer;--new.consumer只需传递一个 Kafka 代理即可连接,而不是 ZooKeeper 集成。此外,控制台消费者与旧消费者的使用已被弃用,并将在未来的主要版本中删除。Kafka 集群现在可以由集群 ID 唯一标识。代理升级到 0.10.1.0 时会自动生成。集群 ID 可通过 kafka.server:type=KafkaServer,name=ClusterId 指标获得,它是元数据响应的一部分。序列化器、客户端拦截器和度量报告器可以通过实现 ClusterResourceListener 接口来接收集群 ID。BrokerState“RunningAsController”(值 4)已被删除。由于存在错误,代理在退出之前只会短暂处于此状态,因此删除的影响应该是最小的。检测给定代理是否是控制器的推荐方法是通过 kafka.controller:type=KafkaController,name=ActiveControllerCount 指标。新的 Java Consumer 现在允许用户按分区上的时间戳搜索偏移量。新的 Java Consumer 现在支持来自后台线程的心跳。有一个新配置 max.poll.interval.ms可以控制在消费者主动离开组之前轮询调用之间的最长时间(默认为 5 分钟)。配置的值 request.timeout.ms(默认为 30 秒)必须始终小于max.poll.interval.ms(默认为 5 分钟),因为这是消费者重新平衡时 JoinGroup 请求可以在服务器上阻塞的最长时间。最后,将 的默认值session.timeout.ms调低为 10 秒,将 的默认值max.poll.records改为 500。当使用授权者并且用户没有对主题的描述授权时,代理将不再向请求返回 TOPIC_AUTHORIZATION_FAILED 错误,因为这会泄漏主题名称。相反,将返回 UNKNOWN_TOPIC_OR_PARTITION 错误代码。这可能会在使用生产者和消费者时导致意外超时或延迟,因为 Kafka 客户端通常会自动重试未知主题错误。如果您怀疑这可能发生,您应该查阅客户端日志。默认情况下,获取响应具有大小限制(消费者为 50 MB,复制为 10 MB)。现有的每个分区限制也适用(消费者和复制为 1 MB)。请注意,这些限制都不是绝对最大值,如下一点所述。如果发现大于响应/分区大小限制的消息,消费者和副本可以取得进展。更具体地说,如果 fetch 的第一个非空分区中的第一条消息大于其中一个或两个限制,则仍将返回该消息。重载的构造函数被添加到kafka.api.FetchRequest并kafka.javaapi.FetchRequest允许调用者指定分区的顺序(因为顺序在 v3 中很重要)。先前存在的构造函数已被弃用,并且在发送请求之前对分区进行了洗牌以避免饥饿问题。新协议版本ListOffsetRequest v1 支持基于时间戳的精确偏移搜索。MetadataResponse v2 引入了一个新字段:“cluster_id”。FetchRequest v3 支持限制响应大小(除了现有的每个分区限制),如果需要取得进展,它会返回大于限制的消息,并且请求中的分区顺序现在很重要。JoinGroup v1 引入了一个新字段:“rebalance_timeout”。从 0.8.x 或 0.9.x 升级到 0.10.0.0

0.10.0.0 具有潜在的重大更改(请在升级前查看)和升级 后可能的性能影响。通过遵循下面推荐的滚动升级计划,您可以保证在升级期间和升级之后不会出现停机和性能影响。注意:由于引入了新协议,因此在升级客户端之前升级 Kafka 集群非常重要。

0.9.0.0 版本客户端注意事项:由于 0.9.0.0 中引入的错误,依赖 ZooKeeper(旧 Scala 高级消费者和 MirrorMaker,如果与旧消费者一起使用)的客户端将无法与 0.10.0.x 代理一起使用. 因此,0.9.0.0 客户端应该在代理升级到 0.10.0.x之前升级到 0.9.0.1。对于 0.8.X 或 0.9.0.1 客户端,此步骤不是必需的。

对于滚动升级:

更新所有代理上的 server.properties 文件并添加以下属性:inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如 0.8.2 或 0.9.0.0)。log.message.format.version=CURRENT_KAFKA_VERSION(有关此配置的详细信息, 请参阅升级后的潜在性能影响。)升级经纪人。这可以通过简单地关闭代理、更新代码并重新启动来一次完成。整个集群升级后,通过编辑 inter.broker.protocol.version 并将其设置为 0.10.0.0 来提升协议版本。注意:您不应该触摸 log.message.format.version - 此参数仅应在所有消费者升级到 0.10.0.0 后更改一个个重启broker,让新的协议版本生效。一旦所有消费者都升级到 0.10.0,在每个代理上将 log.message.format.version 更改为 0.10.0 并一一重启。

**注意:**如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并启动所有代理。默认情况下,它们将从新协议开始。

**注意:**升级代理后,可以随时更新协议版本并重新启动。它不必紧随其后。

升级到 0.10.0.0 后的潜在性能影响

0.10.0 中的消息格式包括一个新的时间戳字段,并使用压缩消息的相对偏移量。可以通过 server.properties 文件中的 log.message.format.version 配置磁盘消息格式。默认的磁盘消息格式为 0.10.0。如果消费者客户端在 0.10.0.0 之前的版本上,它只能理解 0.10.0 之前的消息格式。在这种情况下,代理能够将消息从 0.10.0 格式转换为更早的格式,然后再将响应发送给旧版本的消费者。但是,在这种情况下,经纪人不能使用零拷贝传输。Kafka 社区关于性能影响的报告显示,升级后 CPU 利用率从之前的 20% 变为 100%,这迫使所有客户端立即升级以使性能恢复正常。为了避免在消费者升级到 0.10.0.0 之前发生这种消息转换,可以在将代理升级到 0.10.0.0 时将 log.message.format.version 设置为 0.8.2 或 0.9.0。这样,broker 仍然可以使用零拷贝传输将数据发送给旧的消费者。一旦消费者升级,可以在代理上将消息格式更改为 0.10.0,并享受包含新时间戳和改进压缩的新消息格式。支持转换以确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已经升级但大多数客户端还没有升级时,尽可能避免消息转换至关重要。将代理升级到 0.10.0.0 时,message.format.version 到 0.8.2 或 0.9.0。这样,broker 仍然可以使用零拷贝传输将数据发送给旧的消费者。一旦消费者升级,可以在代理上将消息格式更改为 0.10.0,并享受包含新时间戳和改进压缩的新消息格式。支持转换以确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已经升级但大多数客户端还没有升级时,尽可能避免消息转换至关重要。将代理升级到 0.10.0.0 时,message.format.version 到 0.8.2 或 0.9.0。这样,broker 仍然可以使用零拷贝传输将数据发送给旧的消费者。一旦消费者升级,可以在代理上将消息格式更改为 0.10.0,并享受包含新时间戳和改进压缩的新消息格式。支持转换以确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已经升级但大多数客户端还没有升级时,尽可能避免消息转换至关重要。代理仍然可以使用零拷贝传输将数据发送给旧消费者。一旦消费者升级,可以在代理上将消息格式更改为 0.10.0,并享受包含新时间戳和改进压缩的新消息格式。支持转换以确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已经升级但大多数客户端还没有升级时,尽可能避免消息转换至关重要。代理仍然可以使用零拷贝传输将数据发送给旧消费者。一旦消费者升级,可以在代理上将消息格式更改为 0.10.0,并享受包含新时间戳和改进压缩的新消息格式。支持转换以确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已经升级但大多数客户端还没有升级时,尽可能避免消息转换至关重要。支持转换以确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已经升级但大多数客户端还没有升级时,尽可能避免消息转换至关重要。支持转换以确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已经升级但大多数客户端还没有升级时,尽可能避免消息转换至关重要。

对于升级到 0.10.0.0 的客户端,没有性能影响。

**注意:**通过设置消息格式版本,可以证明所有现有消息都在该消息格式版本之上或之下。否则 0.10.0.0 之前的消费者可能会中断。特别是,在消息格式设置为 0.10.0 之后,不应将其更改回早期格式,因为它可能会破坏 0.10.0.0 之前版本的消费者。

**注意:**由于在每条消息中引入了额外的时间戳,发送小消息的生产者可能会因为开销增加而看到消息吞吐量下降。同样,复制现在每条消息额外传输 8 个字节。如果您的运行接近集群的网络容量,您可能会不堪重负网卡并看到由于过载而导致的故障和性能问题。

**注意:**如果您在生产者上启用了压缩,您可能会注意到在某些情况下生产者吞吐量降低和/或代理上的压缩率降低。在接收压缩消息时,0.10.0 代理会避免重新压缩消息,这通常会减少延迟并提高吞吐量。但是,在某些情况下,这可能会减少生产者的批处理大小,从而导致吞吐量下降。如果发生这种情况,用户可以调整生产者的 linger.ms 和 batch.size 以获得更好的吞吐量。另外,使用 snappy 压缩消息的生产者缓冲区小于代理使用的缓冲区,这可能会对磁盘上消息的压缩率产生负面影响。我们打算在未来的 Kafka 版本中使其可配置。

0.10.0.0 中的潜在重大变化从 Kafka 0.10.0.0 开始,Kafka 中的消息格式版本表示为 Kafka 版本。例如,消息格式 0.9.0 是指 Kafka 0.9.0 支持的最高消息版本。消息格式 0.10.0 已引入,默认使用。它在消息中包含一个时间戳字段,并且相对偏移量用于压缩消息。ProduceRequest/Response v2 已引入,默认使用支持消息格式 0.10.0引入了 FetchRequest/Response v2,默认支持消息格式 0.10.0MessageFormatter 接口从更改def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)为 def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)MessageReader 接口从更改def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]为 def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]MessageFormatter 的包从更改kafka.tools为kafka.commonMessageReader 的包从 更改kafka.tools为kafka.commonMirrorMakerMessageHandler 不再公开该handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])方法,因为它从未被调用过。0.7 KafkaMigrationTool 不再与 Kafka 打包。如果您需要从 0.7 迁移到 0.10.0,请先迁移到 0.8,然后按照文档中的升级过程从 0.8 升级到 0.10.0。新的消费者已将其 API 标准化java.util.Collection为方法参数的序列类型。现有代码可能需要更新才能与 0.10.0 客户端库一起使用。LZ4 压缩消息处理已更改为使用可互操作的帧规范 (LZ4f v1.5.1)。为了保持与旧客户端的兼容性,此更改仅适用于消息格式 0.10.0 及更高版本。使用 v0/v1(消息格式 0.9.0)生成/获取 LZ4 压缩消息的客户端应继续使用 0.9.0 框架实现。使用 Produce/Fetch 协议 v2 或更高版本的客户端应使用可互操作的 LZ4f 框架。可在 http://www.lz4.org/ 上找到可互操作的 LZ4 库列表0.10.0.0 中的显着变化从 Kafka 0.10.0.0 开始,一个名为Kafka Streams的新客户端库可用于对存储在 Kafka 主题中的数据进行流处理。由于上面提到的消息格式更改,这个新的客户端库仅适用于 0.10.x 和更高版本的代理。有关更多信息,请阅读Streams 文档。receive.buffer.bytes对于新的消费者,配置参数的默认值现在是 64K。新消费者现在公开配置参数exclude.internal.topics以限制内部主题(例如消费者偏移主题)意外包含在正则表达式订阅中。默认情况下,它已启用。旧的 Scala 生产者已被弃用。用户应尽快将其代码迁移到 kafka-clients JAR 中包含的 Java 生产者。新的消费者 API 已被标记为稳定。从 0.8.0、0.8.1.X 或 0.8.2.X 升级到 0.9.0.0

0.9.0.0 具有潜在的重大更改(请在升级前查看)以及与以前版本相比的代理间协议更改。这意味着升级后的代理和客户端可能与旧版本不兼容。在升级客户端之前升级 Kafka 集群非常重要。如果您使用 MirrorMaker 下游集群也应该先升级。

对于滚动升级:

更新所有代理上的 server.properties 文件并添加以下属性:inter.broker.protocol.version=0.8.2.X升级经纪人。这可以通过简单地关闭代理、更新代码并重新启动来一次完成。升级整个集群后,通过编辑 inter.broker.protocol.version 并将其设置为 0.9.0.0 来提升协议版本。一个个重启broker,新协议版本生效

**注意:**如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并启动所有代理。默认情况下,它们将从新协议开始。

**注意:**升级代理后,可以随时更新协议版本并重新启动。它不必紧随其后。

0.9.0.0 中的潜在重大变化不再支持 Java 1.6。不再支持 Scala 2.9。超过 1000 的代理 ID 现在默认保留为自动分配的代理 ID。如果您的集群的现有代理 ID 高于该阈值,请确保相应地增加 reserved.broker.max.id 代理配置属性。配置参数 replica.lag.max.messages 已删除。分区领导者在决定哪些副本同步时将不再考虑滞后消息的数量。配置参数replica.lag.time.max.ms 现在不仅指自上次从副本获取请求以来经过的时间,还指自上次从副本赶上以来的时间。仍在从领导者那里获取消息但没有赶上replica.lag.time.max.ms 中的最新消息的副本将被视为不同步。压缩主题不再接受没有密钥的消息,如果尝试这样做,生产者会抛出异常。在 0.8.x 中,没有 key 的消息会导致日志压缩线程随后抱怨并退出(并停止压缩所有压缩主题)。MirrorMaker 不再支持多个目标集群。因此,它只接受一个 --consumer.config 参数。要镜像多个源集群,每个源集群至少需要一个 MirrorMaker 实例,每个实例都有自己的使用者配置。org.apache.kafka.clients.tools.**下打包的工具已移至org.apache.kafka.tools.**。所有包含的脚本仍将照常运行,只有直接导入这些类的自定义代码会受到影响。kafka-run-class.sh 中的默认 Kafka JVM 性能选项 (KAFKA_JVM_PERFORMANCE_OPTS) 已更改。kafka-topics.sh 脚本 (kafka.admin.TopicCommand) 现在在失败时以非零退出代码退出。kafka-topics.sh 脚本 (kafka.admin.TopicCommand) 现在将在主题名称因使用“.”而导致度量冲突时打印警告。或主题名称中的“_”,如果发生实际冲突,则会出错。kafka-console-producer.sh 脚本 (kafka.tools.ConsoleProducer) 将默认使用 Java 生产者而不是旧的 Scala 生产者,用户必须指定“旧生产者”才能使用旧生产者。默认情况下,所有命令行工具都会将所有日志消息打印到 stderr 而不是 stdout。0.9.0.1 中的显着变化可以通过将 broker.id.generation.enable 设置为 false 来禁用新的代理 ID 生成功能。配置参数 log.cleaner.enable 现在默认为 true。这意味着具有 cleanup.policy=compact 的主题现在将默认压缩,并且 128 MB 的堆将通过 log.cleaner.dedupe.buffer.size 分配给清理进程。您可能需要根据您对压缩主题的使用情况查看 log.cleaner.dedupe.buffer.size 和其他 log.cleaner 配置值。新消费者的配置参数 fetch.min.bytes 的默认值现在默认为 1。0.9.0.0 中的弃用从 kafka-topics.sh 脚本 (kafka.admin.TopicCommand) 更改主题配置已被弃用。今后,请使用 kafka-configs.sh 脚本 (kafka.admin.ConfigCommand) 来实现此功能。kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) 已被弃用。今后,请使用 kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) 来实现此功能。kafka.tools.ProducerPerformance 类已被弃用。今后,请使用 org.apache.kafka.tools.ProducerPerformance 来实现此功能(kafka-producer-perf-test.sh 也将更改为使用新类)。生产者配置 block.on.buffer.full 已被弃用,并将在未来版本中删除。目前其默认值已更改为 false。KafkaProducer 将不再抛出 BufferExhaustedException,而是使用 max.block.ms 值进行阻塞,之后它将抛出 TimeoutException。如果 block.on.buffer.full 属性显式设置为 true,它会将 max.block.ms 设置为 Long.MAX_VALUE 并且 metadata.fetch.timeout.ms 将不被遵守从 0.8.1 升级到 0.8.2

0.8.2 与 0.8.1 完全兼容。只需将其关闭、更新代码并重新启动它,即可一次完成一个代理的升级。

从 0.8.0 升级到 0.8.1

0.8.1 与 0.8 完全兼容。只需将其关闭、更新代码并重新启动它,即可一次完成一个代理的升级。

从 0.7 升级

0.7 版与较新的版本不兼容。对 API、ZooKeeper 数据结构、协议和配置进行了重大更改,以添加复制(0.7 中缺少这些)。从 0.7 升级到更高版本需要特殊的迁移工具。无需停机即可完成此迁移。

标签: kafka客户端maven

抱歉,评论功能暂时关闭!