• 0
  • 0
分享
  • 使用Flink对Strom任务进行迁移——软件测试圈
  • 曼倩诙谐 2022-08-05 11:53:12 字数 1625 阅读 1719 收藏 0

Flink和Strom都是时下较为流行的数据流平台,考虑以下一种应用场景:已经使用Strom完成了对于某一逻辑功能的开发,如果现在期望使用Flink实现相同的逻辑,那么就需要考虑如何使用Flink来对Strom任务的逻辑功能进行最简单的复现测试。

使用Flink来测试Strom任务的逻辑主要存在两个最基本的问题:第一,Storm通过自定义的Bolt类实现自定义的逻辑,在Flink中如何实现?第二,Storm按照自定义标准实现数据分发的逻辑,在Flink中如何实现?

本文主要通过两个最基本的Flink程序实例对上述两个使用Flink测试Strom任务逻辑存在的基本问题进行解答。

第一个问题,我们可以通过Flink的ProcessFuction类进行实现,通过继承该类,在该类的processElement方法中实现自定义逻辑。ProcessFuction类如下图所示,我们可以通过var1这个参数直接获取当前流中的数据,然后进行自定义的逻辑加工,再通过Collector类var3的collect方法将处理后的数据发送到下一个流中。

-1.png

  假设某一Strom任务的功能逻辑是:

① 对初始数据源(一个字符串)末尾添加一个字符串。

② 然后再次添加另一个字符串。

  我们以上述对字符串加工的Strom任务为例,说明Flink程序如何通过ProcessFuction类对该任务实现复现测试。

(1)Flink主程序,假设初始数据源为“abc”。

-2.png

(2)第一个业务加工类,给数据流末尾添加“def”。

1-3.png


(3)第二个业务加工类,给数据流末尾添加“ghi”。

1-4.png


(4)执行Flink程序,观察输出结果,“abc”被二次加工为“abcdefghi”。

1-5.png

第二个分发数据的问题,我们假设某一Strom任务的功能逻辑是对数据源(股票对象)进行分类,将股价高于X的分为一类,将股价小于等于X的分为另一类。

我们以上述对股票数据对象分类处理的Strom任务为例,说明Flink程序如何通过旁路输出特性实现对数据流按照自定义标准分类,输出到不同的子数据流中处理。

-6.png

Flink 的旁路输出依然涉及ProcessFunction类的processElement方法,该方法的Context类型的var2参数的主要作用是利用其output方法进行旁路输出(我们用于进行数据分流)。

Flink的旁路输出特性可以用来对数据进行分流,通过创建一个流的标签(OutputTag),再利用这个OutputTag标签对象作为参数,调用初始/父级数据流的getSideOutput(OutputTag)方法获取子数据流。

每个流标签都有一个id,也可以不创建对象,只要流标签的id相同,其中的数据就相同。因此,可以通过匿名内部类的形式来获取子数据流。第一个参数是id,第二个参数是数据类型(不能省略)。

(1)创建股票类Stock,属性包括名称和价格。

-7.png


(2)创建消费消息的Flink程序。

1-8.png


(3)创建生产消息的Flink程序。

1-9.png

我们用“STOCK_LOW_PRICE”和“STOCK_HIGH_PRICE”这两个ID作为两个旁路输出标签的ID。

在processElement方法中,我们通过判断股票的价格是否大于50区分出低价股和高价股,利用Context对象的output方法进行旁路输出,把price小于50的Stock对象输出到ID为“STOCK_LOW_PRICE”的低价股标签旁路中,而把price大于等于50的Stock对象输出到ID为“STOCK_HIGH_PRICE”的高价股标签旁路中。

-10.png

(4)依次启动消费者程序、生产者程序,观察消费者程序控制台中的输出:

1-11.png

此时,桌面生成了两个文件夹,当中记录了股票数据,result1记录了小于50的低价股,result2中记录了股价大于等于50的高价股。

1-12.png

1-13.png



作者:孙俊辉    

来源:http://www.51testing.com/html/77/n-7792377.html

  • 【留下美好印记】
    赞赏支持
登录 后发表评论
+ 关注

热门文章

    最新讲堂

      • 推荐阅读
      • 换一换
          •   高速创新的能力一直是现代数字景观的基石。领先的行业巨头每天发布数百万行代码并实施数百次更新,确保无可挑剔的软件质量。由于数量庞大,仅靠手动测试很难获得这样的结果。  这就是企业认为自动化工作流程对项目成功至关重要的原因。在 QA 方面,测试自动化是一种行之有效的方法,可以使公司更加敏捷、创新和更具竞争力,并帮助他们更快、更频繁地以更低的成本推出高质量的数字产品和新 功能. 难怪其全球市场规模到本十年末将翻一番,达到570 亿美元。  但是企业如何才能从测试自动化方法中获得最终价值呢?在这篇文章中,我建议结合左移 和持续测试 来释放软件开发潜力并提高业务能力。  连续左移测试自动化概念探索 ...
            0 0 701
            分享
          • 一、接口自动化测试流程1、需求分析;请求(url、方法、数据)响应(响应数据、状态码)2、挑选需要做自动化接口(时间、人员、接口复杂度);3、设计自动化测试用例(如果功能阶段设计过用例,直接拿过来使用即可);4、搭建自动化测试环境(实现自动化使用的语言 如:(python、pycharm));5、设计自动化执行框架(报告、参数化、用例执行框架);6、编写代码;7、执行用例(unittest、pytest);8、生成测试报告(htmltextrunn er\allure)。二、接口清单整理登录接口请求登陆接口请求请求url:http://ttapi.research.itcast.cn/app/...
            0 0 1411
            分享
          • 前言Spring一直是很火的一个开源框架,在过去的一段时间里,Spring Boot在社区中热度一直很高,所以决定花时间来了解和学习,为自己做技术储备。正文首先声明,Spring Boot不是一门新技术,所以不用紧张。从本质上来说,Spring Boot就是Spring,它做了那些没有它你也会去做的Spring Bean配置。它使用“习惯优于配置”(项目中存在大量的配置,此外还内置了一个习惯性的配置,让你无需手动进行配置)的理念让你的项目快速运行起来。使用Spring Boot很容易创建一个独立运行(运行jar,内嵌Servlet容器)、准生产级别的基于Spring框架的项目,使用Spring...
            0 0 733
            分享
          •   一、性能指标在性能测试的作用?  性能指标在性能测试中起着非常重要的作用,它们帮助我们评估和了解系统的性能表现。下面用通俗易懂的话来解释性能指标的作用和意义:  1.帮助我们了解系统的处理能力:性能指标可以告诉我们系统在给定负载下能够处理多少请求。就像一个快递小哥,他每天能够送多少个包裹,这个数字就是他的处理能力。对于系统来说,性能指标可以告诉我们它能够处理多少个请求,这样我们就可以知道系统的处理能力是否满足我们的需求。  2.帮助我们评估系统的稳定性:性能指标还可以帮助我们评估系统在高负载情况下的稳定性。就像一辆车,我们要知道它在高速行驶时是否稳定,需要了解它的最高速度和操控性能。对于系...
            0 0 962
            分享
          • 新浪科技讯北京时间11月14日上午消息,据报道,多年来科技巨头们一直支撑着美国股市,它们的股价连连创下新高,但是2022年美国股市突然不行了,于是科技巨头也就没有那么风光了。尽管上周美国股市出现反弹,今年苹果、微软、亚马逊、谷歌母公司Alphabet、Meta的市值总和还是损失了超3万亿美元。这主要是因为营收增速放缓,利率上升。2020年9月5家公司的市值在标普500指数中占比达到24%,创历史新高,现在降到了19%。美联储退出宽松政策,股市已经发生了很大的变化。就在科技企业影响力退潮时,传统行业却在前进,能源和银行等行业在标普500指数中占据了更大的份额,埃克森美孚和富国银行都因为高油价或者...
            0 0 964
            分享
      • 51testing软件测试圈微信