• 0
  • 0
分享

相信大家对Kafka不会陌生,但首先还是要简单介绍一下。

Kafka是一种高性能的分布式消息系统,由LinkedIn公司开发,用于处理海量的实时数据流。它采用了发布/订阅模式,可以将数据流分发到多个消费者端,同时提供了高可靠性、高吞吐量和低延迟的特性。

Kafka的应用场景非常广泛,例如日志收集、事件流处理、实时监控等。在这些场景中,Kafka可以提供高可靠性和低延迟的数据传输,确保数据的稳定性和实时性。与此同时,Kafka还提供了丰富的API和管理工具,使得用户可以方便地配置和管理Kafka集群。

很多高性能方案都会用到Kafka,今天我来分享如何使用Kafka Client API进行Kafka生产者和消费者压测。

依赖

我用了Gradle创建的项目,依赖配置如下:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '3.4.0'

kafka服务端

我本地用了Kafka最新版本:kafka_2.12-3.4.0,这个版本可以不依赖zookeeper,非常方便,用来本地功能验证和测试我是十分推荐的。基本做到了开箱即用。

具体的流程可以自行搜索。

生产者压测Demo

在创建生产者时,会有不少的参数需要配置,这里建议使用默认的。或者使用待测试参数组合。下面是我自己的配置,常用的参数我都列了出来。具体参数含义,可以自行搜索,这方面资料还是很多的,下面直接进入压测用例环节。

package com.funtest.kafka


import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import com.funtester.utils.StringUtil
import groovy.util.logging.Log4j2
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer

@Log4j2
class Produce extends SourceCode {

    static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); //所有分区副本都收到确认信息,才能确认写入
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name);
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        def topic = "testkafka"
        def test = {
            producer.send(new ProducerRecord<>(topic, StringUtil.getString(10)))
        }
        new FunQpsConcurrent(test,"Kafka测试").start()

        producer.close();
    }

}

这里用到了动态QPS模型,最后的close()也可以不使用,毕竟main方法的代码结束了就真的结束了。

消费者

呼应生产者,消费者也有一堆需要配置的参数。这里先按下不表,有兴趣的可以自行学习。

Kafka消费者有两种订阅消息的方式,分别是订阅模式和分配模式。

订阅模式是指消费者订阅一个或多个主题,然后自动分配分区进行消费。这种模式下,Kafka会自动管理消费者与分区之间的关系,当有新的消费者加入或者退出消费组时,Kafka会自动重新分配分区,保证每个消费者都能够获取到消息。

而分配模式则是由消费者主动向Kafka请求分配指定的分区进行消费。这种模式下,消费者需要手动管理分区与消费者之间的关系,需要注意的是,当有新的消费者加入或者退出消费组时,需要手动重新分配分区。

订阅模式相对于分配模式来说更加简单易用,但是分配模式可以更加灵活地控制消费者与分区之间的关系。所以我选择了订阅模式。

package com.funtest.kafka

import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer

import java.time.Duration

class Cunsumer extends SourceCode {

    static void main(String[] args) {
        KafkaConsumer<String, String> consumer;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FunTester32");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");
        properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"10000");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
                , "earliest");
        consumer = new KafkaConsumer<>(properties);

        String topic = "testkafka";
//        TopicPartition topicPartition = new TopicPartition(topic, 0);
//        List<TopicPartition> topics = Arrays.asList(topicPartition);
//        consumer.assign(topics);
//        consumer.seekToEnd(topics);
//        long current = consumer.position(topicPartition);
//        consumer.seek(topicPartition, current - 10);//手动设置偏移量
        consumer.subscribe([topic])//订阅模式,不能与assign混用
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            sleep(1.0)
        }

        def test = {
            consumer.poll(Duration.ofMillis(1000));
        }
        new FunQpsConcurrent(test,"Kafka消费").start()
        consumer.close()

    }
}

由于本地机器原因,需要在服务器上启动一个Kafka服务,用来测试不同参数组合情况下Kafka的性能表现。后续有机会再来分享。


  • 【留下美好印记】
    赞赏支持
登录 后发表评论
+ 关注

热门文章

    最新讲堂

      • 推荐阅读
      • 换一换
          •   微软笑拉了!昨晚谷歌展示了新AI加持后的搜索引擎,毫无亮点,效果拉跨,甚至回答还被爆出存在事实性错误。发布会后,股价直接一泻千里。  谷歌和微软的搜索引擎大战,已经打到了第三天。  这几天,全世界的目光都聚焦于此。毕竟,上一次两家巨头发生如此激烈的酣战,还是在十多年前。  北京时间昨晚九点半,谷歌CEO抢先公布的“ChatGPT同款”Bard在巴黎首次亮相,同时还有一众基于AI的产品更新。  此前,微软已经先下一城,率先召开发布会,展示了“ChatGPT搜索引擎”必应。而这次,轮到谷歌大显身手了。  面对微软的重重暴击,谷歌会怎么打回去?带着这个悬念摩拳擦掌期待了一天的“瓜友”们,看完直播...
            0 0 698
            分享
          •   今日,百度、字节、商汤、中科院旗下紫东太初、百川智能、智谱华章等 8 家企业 / 机构的大模型产品已经首批通过《生成式人工智能服务管理暂行办法》备案,可正式上线面向公众提供服务。外界注意到,阿里通义千问、360 智脑、科大讯飞星火大模型不在首批获批名单中。  据《科创板板日报》报道,针对大模型应用开放问题,记者今日以投资者身份致电科大讯飞,接电人员表示,科大讯飞已首批顺利完成备案。首批通过备案的企业名单,预计将在 1 周内陆续由各地方相关管理部门通知大家。  而早些时候贝壳财经也报道称,从多位独立信源处获悉,国内将有 11 家大模型陆续通过《生成式人工智能服务管理暂行办法》备案,首批将在 ...
            0 0 786
            分享
          • 扎实的基础是成功的一半,学号好基础,才能更好的进步!常见的测试用例设计方法主要会涉及以下几种:1、等价类2、边界值3、场景法4、判定表5、因果图6、错误推断法7、正交测试法(正交表)(今天主要解释前三种最为常用)选择合适的测试用例方法,有助于你去更好的梳理出逻辑关联关系,让你的测试覆盖率更高,更高效率的覆盖到所有测试点。一、等价类划分法1)定义依据需求输入划分为若干等价类,从等价类中选定一个测试用例,如果该测试用例通过,则表明整个等价类通过测试如:微信发红包0.01--2002)适用场景一般适用于无限多种输入,我们不可能完成穷举测试,等价类可以使我们用较少的测试用例尽可能多的将功能覆盖。3)有...
            0 0 2064
            分享
          • 1、手工测试与自动化测试其实并不是对立的并不是所有的功能自动化测试都可以实现,它的效率也不高,但是可以完成一部分场景的功能回归。自动化测试发展了这么多年,也没有把手工测试给取代。2、手工测试的特点手工测试能通过人为的逻辑判断效验当前的步骤是否正确,同时用例的执行具有一定步骤跳跃性,能够清楚知道逻辑,细致定位问题。如果修改bug所需时间稍长,那么想将手工测试应用于回归测试将变得异常困难。这是因为需要测试的测试用例太多,所以需要引入自动化测试。3、自动化测试的特点执行的对象是脚本,能通过人为的逻辑判断效验当前的步骤是否正确实现,用例步骤之间关联性强,不像手工测试用例那么跳跃。另外也是用来保证产品主...
            0 0 858
            分享
          • 为了恢复作为给定事务的一部分所做的所有更改,执行 ROLLBACK 命令。这将导致还原与事务相关的更改。ROLLBACKS 通常在事务执行期间观察到/发生错误时应用。让我们看一个使用 ROLLBACK 命令的示例。我们将使用相同的交易借记 ACC1,贷记 ACC2,资金为 100 美元START TRANSACTION; --statement1UPDATE bankaccounts SET funds=funds-100 WHERE accountno='ACC1'; --statement2UPDATE bankaccounts SET funds=funds+100 WH...
            0 0 1963
            分享
      • 51testing软件测试圈微信