• 0
  • 0
分享

相信大家对Kafka不会陌生,但首先还是要简单介绍一下。

Kafka是一种高性能的分布式消息系统,由LinkedIn公司开发,用于处理海量的实时数据流。它采用了发布/订阅模式,可以将数据流分发到多个消费者端,同时提供了高可靠性、高吞吐量和低延迟的特性。

Kafka的应用场景非常广泛,例如日志收集、事件流处理、实时监控等。在这些场景中,Kafka可以提供高可靠性和低延迟的数据传输,确保数据的稳定性和实时性。与此同时,Kafka还提供了丰富的API和管理工具,使得用户可以方便地配置和管理Kafka集群。

很多高性能方案都会用到Kafka,今天我来分享如何使用Kafka Client API进行Kafka生产者和消费者压测。

依赖

我用了Gradle创建的项目,依赖配置如下:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '3.4.0'

kafka服务端

我本地用了Kafka最新版本:kafka_2.12-3.4.0,这个版本可以不依赖zookeeper,非常方便,用来本地功能验证和测试我是十分推荐的。基本做到了开箱即用。

具体的流程可以自行搜索。

生产者压测Demo

在创建生产者时,会有不少的参数需要配置,这里建议使用默认的。或者使用待测试参数组合。下面是我自己的配置,常用的参数我都列了出来。具体参数含义,可以自行搜索,这方面资料还是很多的,下面直接进入压测用例环节。

package com.funtest.kafka


import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import com.funtester.utils.StringUtil
import groovy.util.logging.Log4j2
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer

@Log4j2
class Produce extends SourceCode {

    static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); //所有分区副本都收到确认信息,才能确认写入
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name);
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        def topic = "testkafka"
        def test = {
            producer.send(new ProducerRecord<>(topic, StringUtil.getString(10)))
        }
        new FunQpsConcurrent(test,"Kafka测试").start()

        producer.close();
    }

}

这里用到了动态QPS模型,最后的close()也可以不使用,毕竟main方法的代码结束了就真的结束了。

消费者

呼应生产者,消费者也有一堆需要配置的参数。这里先按下不表,有兴趣的可以自行学习。

Kafka消费者有两种订阅消息的方式,分别是订阅模式和分配模式。

订阅模式是指消费者订阅一个或多个主题,然后自动分配分区进行消费。这种模式下,Kafka会自动管理消费者与分区之间的关系,当有新的消费者加入或者退出消费组时,Kafka会自动重新分配分区,保证每个消费者都能够获取到消息。

而分配模式则是由消费者主动向Kafka请求分配指定的分区进行消费。这种模式下,消费者需要手动管理分区与消费者之间的关系,需要注意的是,当有新的消费者加入或者退出消费组时,需要手动重新分配分区。

订阅模式相对于分配模式来说更加简单易用,但是分配模式可以更加灵活地控制消费者与分区之间的关系。所以我选择了订阅模式。

package com.funtest.kafka

import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer

import java.time.Duration

class Cunsumer extends SourceCode {

    static void main(String[] args) {
        KafkaConsumer<String, String> consumer;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FunTester32");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");
        properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"10000");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
                , "earliest");
        consumer = new KafkaConsumer<>(properties);

        String topic = "testkafka";
//        TopicPartition topicPartition = new TopicPartition(topic, 0);
//        List<TopicPartition> topics = Arrays.asList(topicPartition);
//        consumer.assign(topics);
//        consumer.seekToEnd(topics);
//        long current = consumer.position(topicPartition);
//        consumer.seek(topicPartition, current - 10);//手动设置偏移量
        consumer.subscribe([topic])//订阅模式,不能与assign混用
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            sleep(1.0)
        }

        def test = {
            consumer.poll(Duration.ofMillis(1000));
        }
        new FunQpsConcurrent(test,"Kafka消费").start()
        consumer.close()

    }
}

由于本地机器原因,需要在服务器上启动一个Kafka服务,用来测试不同参数组合情况下Kafka的性能表现。后续有机会再来分享。


  • 【留下美好印记】
    赞赏支持
登录 后发表评论
+ 关注

热门文章

    最新讲堂

      • 推荐阅读
      • 换一换
          • 一、XSS的基本概念XSS又叫CSS (Cross Site Script) ,为了和css(层叠样式表)区分,我们通常称它为(xss)跨站脚本攻击。它指的是恶意攻击者往Web页面里插入恶意html代码,当用户浏览该页之时,嵌入其中Web里面的html代码会被执行,从而达到恶意的特殊目的。xss是一种发生在web前端的漏洞,所以其危害的对象也主要是前端用户.在OWASP Web Application Top 10排行榜中长期霸榜,从未掉出过前三名。XSS这类安全问题发生的本质原因在于,浏览器错误的将攻击者提供的用户输入数据当做JavaScript脚本给执行了。XSS有几种不同的分类办法,例如...
            0 0 1778
            分享
          •   一、背景  在我们日常的测试工作中,无论在数据迁移还是系统测试过程中,测试数据准确性和可用性是测试工作的重中之重,测试人员经常需要查询数据库中的数据或手工造测试数据,而登到oracle数据库后有时会发现显示中文乱码的情况,如下图所示:  或者是:  好好的怎么就乱码了呢?明明“之前”访问其他数据库是没问题。在学习和查询相关资料后我解决了这个问题,但由此引发了我对出现汉字乱码问题的学习兴趣,都是什么原因导致的汉字显示乱码呢?  首先,字符集设置不当是影响Oracle数据库汉字显示的关键问题。下文则详细介绍了oracle关于字符集的分类、构成及设定方法,分析了ORACLE数据库汉字显示乱码的常...
            0 0 534
            分享
          • 利用空闲之余,写了第一个接口自动化测试demo, 通过读取execl中的接口测试用例,接口自动执行。(这里跟很多网上的接口自动化有点不同的是:无需再写代码,只需要从execl中增加用例,就可执行)。这是execl的模板:这个模板可以很好的管理项目的各个模块,看起来也是简洁,也是颇为喜欢的~~主要就是写了这几个类,完成了接口自动化的第一步:在尝试写这个demo时,最大的问题困扰的我是:每个请求的参数方式(请求参数和body参数)不一致,个数不一致。怎样能用简洁的方式实现? 最终解决的办法就是:在execl中增加参数类型判断:paramType, 如果是params,封装一个将json格式的字符串...
            0 1 2744
            分享
          •   关于软件测试行业的职业发展方向,在网络上总能看到各种各样的问题。  · 有关注零基础能不能入行的  · 有关注25岁入行晚不晚的  · 还有关注35岁后的职业发展方向的  ······  在此过程中,看到很多行业大佬分享了自己的工作经验,也给出了很多自己的建议——要想在测试行业有更长远的发展,一定要关注自己的职业发展道路,也就是你的晋升之路。为此我做了以下三点总结:  一、软件测试职业发展方向情况  业内人士表示,“由于我国的软件行业已经突破了作为一种工业化产品的阶段,软件测试已经成为软件开发企业不可或缺的质量监控部门。目前,我国软件测试人才的数量相对滞后于产业升级,从而形成了软...
            1 1 688
            分享
          •   关于新人如何做好功能测试,以下是我个人的一些思考。  测试基础的重要性  作为一名测试新人,测试基础非常非常重要。这里说的基础,不仅仅是什么是软件测试、软件测试的目的,而是测试用例的设计能力。  因工作的原因,近来接触不少毕业3、4年,甚至7、8年的测试同学,对用例设计还是停留在理论阶段,这让人不免有些无力吐槽。  Q:软件测试用例的测试方法有哪些?  回答:等价类、边界值、因果图等等。  Q:结合实际的业务场景,来说说常用到的测试用例设计的方法。  回答:不少回复都是以登录来做说明的。  其实日常工作中,常用到的用例设计也就那么几种,如果我们能把理论好好应用到实际工作中,那么涨薪其实也很...
            0 0 1133
            分享
      • 51testing软件测试圈微信