• 0
  • 0
分享

对于性能测试来讲,使用编程语言实现性能测试用例的核心就是并发编程,也就是同时执行多个测试用例,以模拟真实的负载情况。并发编程可以有效地提高测试效率,可以更快地发现系统中的瓶颈和性能问题。在实现并发编程时,需要考虑线程的同步和互斥,以确保测试结果的正确性和可靠性。此外,还需要考虑如何分配和管理资源,以避免资源竞争和浪费。

之前已经使用了Java实现,最近在计划使用Go语言实现一些新的压测功能的开发,这其中肯定也少不了使用到线程池(Go中协程池)。虽然Go语言协程已经非常强大了,很多情况下,我们可以直接使用go关键字直接创建协程去执行任务。但是在任务调度和负载保护的场景中,还是有所欠缺。所以在参考了Java线程池实现类java.util.concurrent.ThreadPoolExecutor自己实现了一个包含等待队列、调度以及等待队列任务完成的协程池。

PS:文中若在Go语言语境中出现线程,均指协程。

ThreadPoolExecutor分析

首先我们看看java.util.concurrent.ThreadPoolExecutor的实现中几个比较重要的功能点,然后简单介绍实现逻辑。下面是构造方法:

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    }

这里我省略了具体实现,我们看到参数:

  • 核心线程数、最大线程数,这两个用来管理线程池的数量。
  • 最大空闲时间,时间单位,这俩组合起来回收空闲线程。
  • workQueue,用例暂存任务
  • 线程工厂和拒绝策略,这俩用处少,忽略。(Go协程池也没有设计这俩) 下面就要祭出个人原创画作:

这里我借鉴了 动态修改coreThread线程池拓展的思路,不再依靠任务队列是否已满来作为增加线程池线程数的依据。除了依赖等待队列的数量以外,还提供单独的API(这一点跟java.util.concurrent.ThreadPoolExecutor是一样的)。

协程池属性设计

我从Java抄来两个属性:核心数,最大数。其中核心数在协程池自己管理中收到最大值的限制,在使用API时不受限制。

同样的,我抄来一个等待队列的概念,使用chan func() taskType实现,taskType用来区分是普通任务还是具有管理效果的任务(目前只有减少协程数管理事件,自增事件通过单独的协程实现)

超时时间,这个必不可少,庆幸的是Go在这方面比较灵活,我抄了一个简单Demo实现。

我增加了活跃协程数(这个在java.util.concurrent.ThreadPoolExecutor也有,但未显式展示),协程池状态(防止main结束导致进程直接结束)。

计数类,收到任务数,执行任务数,用来统计任务执行数量。这个同java.util.concurrent.ThreadPoolExecutor。

协程池实现

struct展示

type GorotinesPool struct {
 Max          int
 Min          int
 tasks        chan func() taskType
 status       bool
 active       int32
 ReceiveTotal int32
 ExecuteTotal int32
 addTimeout   time.Duration
}

事件类型枚举

type taskType int

const (
 normal taskType = 0
 reduce taskType = 1
)

构造方法

这里我选择了直接创建所有核心线程数。

  • 如果复用java.util.concurrent.ThreadPoolExecutor后创建,会功能变得复杂
  • Go语言创建协程资源消耗较低
  • 测试下来,耗时非常低,简单粗暴但是可靠
  • // GetPool

  • //  @Description: 创建线程池

  • //  @param max 最大协程数

  • //  @param min 最小协程数

  • //  @param maxWaitTask 最大任务等待长度

  • //  @param timeout 添加任务超时时间,单位s

  • //  @return *GorotinesPool

  • //

  • func GetPool(max, min, maxWaitTask, timeout int) *GorotinesPool {

  •  p := &GorotinesPool{

  •   Max:          max,

  •   Min:          min,

  •   tasks:        make(chan func() taskType, maxWaitTask),

  •   status:       true,

  •   active:       0,

  •   ReceiveTotal: 0,

  •   ExecuteTotal: 0,

  •   addTimeout:   time.Duration(timeout) * time.Second,

  •  }

  •  for i := 0; i < min; i++ {

  •   atomic.AddInt32(&p.active, 1)

  •   go p.worker()

  •  }

  •  go func() {

  •   for {

  •    if !p.status {

  •     break

  •    }

  •    ftool.Sleep(1000)

  •    p.balance()

  •   }

  •  }()

  •  return p

  • }


管理协程数

主要分成2个:增加和减少,增加比较简单,减少的话,我通过管理事件(taskType)实现,如果需要减少线程数,我就往队列里面添加一个reduce的事件,然后任意一个协程收到之后就终止。后面会分享worker实现。

// AddWorker
//  @Description: 添加worker,协程数加1
//  @receiver pool
//
func (pool *GorotinesPool) AddWorker() {
 atomic.AddInt32(&pool.active, 1)
 go pool.worker()
}

// ReduceWorker
//  @Description: 减少worker,协程数减1
//  @receiver pool
//
func (pool *GorotinesPool) ReduceWorker() {
 atomic.AddInt32(&pool.active, -1)
 pool.tasks <- func() taskType {
  return reduce
 }
}

// balance
//  @Description: 平衡活跃协程数
//  @receiver pool
//
func (pool *GorotinesPool) balance() {
 if pool.status {
  if len(pool.tasks) > 0 && pool.active < int32(pool.Max) {
   pool.AddWorker()
  }
  if len(pool.tasks) == 0 && pool.active > int32(pool.Min) {
   pool.ReduceWorker()
  }
 }
}

worker

// worker
//  @Description: 开始执行协程
//  @receiver pool
//
func (pool *GorotinesPool) worker() {
 defer func() {
  if p := recover(); p != nil {
   log.Printf("execute task fail: %v", p)
  }
 }()
Fun:
 for t := range pool.tasks {
  atomic.AddInt32(&pool.ExecuteTotal, 1)
  switch t() {
  case normal:
   atomic.AddInt32(&pool.active, -1)
  case reduce:
   if pool.active > int32(pool.Min) {
    break Fun
   }
  }
 }
}

保障任务完成

为了防止进程终止而任务没有完成,我增加了线程池的状态state和等待方法(此方法需要显式调用)。

// Wait
//  @Description: 结束等待任务完成
//  @receiver pool
//
func (pool *GorotinesPool) Wait() {
 pool.status = false
Fun:
 for {
  if len(pool.tasks) == 0 || pool.active == 0 {
   break Fun
  }
  ftool.Sleep(1000)
 }
 defer close(pool.tasks)
 log.Printf("recieve: %d,execute: %d", pool.ReceiveTotal, pool.ExecuteTotal)
}

执行任务

有了以上的基础,执行就比较简单了。

// Execute
//  @Description: 执行任务
//  @receiver pool
//  @param t
//  @return error
//
func (pool *GorotinesPool) Execute(t func()) error {
 if pool.status {
  select {
  case pool.tasks <- func() taskType {
   t()
   return normal
  }:
   atomic.AddInt32(&pool.ReceiveTotal, 1)
   return nil
  case <-time.After(pool.addTimeout):
   return errors.New("add tasks timeout")
  }
 } else {
  return errors.New("pools is down")
 }
}

自测

自测用例

func TestPool(t *testing.T) {
 pool := execute.GetPool(1000, 1, 200, 1)
 for i := 0; i < 3; i++ {
  pool.Execute(func() {
   log.Println(i)
   ftool.Sleep(1000)
  })
 }
 ftool.Sleep(3000)
 pool.Wait()
 log.Printf("T : %d", pool.ExecuteTotal)
 log.Printf("R : %d", pool.ReceiveTotal)
 log.Printf("max : %d", pool.Max)
 log.Printf("min : %d", pool.Min)
}

下面是自测结果,从39s两个输出可以看出当时实际运行的协程数已经超过1了,协程池自增策略生效了。

2023/06/23 17:21:38 3
2023/06/23 17:21:39 3
2023/06/23 17:21:39 3
2023/06/23 17:21:41 recieve: 3,execute: 3
2023/06/23 17:21:41 T : 3
2023/06/23 17:21:41 R : 3
2023/06/23 17:21:41 max : 1000
2023/06/23 17:21:41 min : 1
--- PASS: TestPool (3.00s)

本次分享结束,协程池自测之后效果很不错,后面会依据这个协程池的设计进行其他性能测试功能开发。

FunTester原创专题推荐~

  • 900原创合集
  • 2021年原创合集
  • 2022年原创合集
  • 接口功能测试专题
  • 性能测试专题,
  • Groovy专题
  • Java、Groovy、Go、Python
  • 单测&白盒
  • FunTester社群风采
  • 测试理论鸡汤
  • FunTester视频专题
  • 案例分享:方案、BUG、爬虫
  • UI自动化专题
  • 测试工具专题
  • -- By FunTester


  • 【留下美好印记】
    赞赏支持
登录 后发表评论
+ 关注

热门文章

    最新讲堂

      • 推荐阅读
      • 换一换
          • 测试报告是测试人员在测试过程中用于反映测试状况的文档,其重要性通过网上哀求、跪求、旋转360度冰天雪地各种求测试报告模块的帖子中就可见一斑。其实测试报告的内容基本都是模板的那些,只是在实际测试过程中,如何去整理内容结构,使得报告的通常阅读者:开发人员、测试经理、产品经理、项目负责人能够一目了然地查看想要了解的内容才是测试报告最值得注意的地方。产品要想有广阔的市场,得需要切实了解用户的需求及感受,同理测试报告要想能够让阅读者能够满意,也需要能将质量情况条理性地列出。通常来说,开发人员往往希望能从报告中了解缺陷的情况,而测试经理还关心用例的执行情况及覆盖率、项目责任人则最关心还有多少问题,此次版本...
            14 14 1664
            分享
          • 背景       阿里云RDS FOR MySQL(MySQL5.7版本)数据库业务表每月新增数据量超过千万,随着数据量持续增加,我们业务出现大表慢查询,在业务高峰期主业务表的慢查询需要几十秒严重影响业务方案概述一、数据库设计及索引优化       MySQL数据库本身高度灵活,造成性能不足,严重依赖开发人员的表设计能力以及索引优化能力,在这里给几点优化建议时间类型转化为时间戳格式,用int类型储存,建索引增加查询效率建议字段定义not null,null值很难查询优化且占用额外的索引空间使用TINYINT类型代替...
            11 11 5826
            分享
          •   据报道,从欧美到亚洲,从微软、谷歌,到亚马逊、SAP,全球科技巨头今年以来的裁员人数已达数以万计。  但出人意料的是,这些公司绝大多数依然在盈利。  根据金融服务公司Jefferies的一份调查:“裁员是因为在疫情期间招聘过度,而且增长预期低于之前的预测。”  随着美国的利率上升和通胀高企,消费者也在不确定的全球经济环境中削减开支。  因此,Jefferies分析师总结道,企业“需要减少人员,通过与目前需求趋势匹配的员工人数重获运营效率”。  由于利率上升抬高了资本成本,也迫使企业纷纷减少人员支出。  美国银行全球研究部门在研报中写道:“创业公司尤其如此,他们之前因为低成本的资本而大幅增加...
            0 0 940
            分享
          • 作为一个电商项目,势必会包含优惠券这一模块,今天就来分析一下它,这篇的测试分析主要是针对后台,移动端的稍后我会再单独整理一篇分析优惠券的测试点我主要从三方面入手:发布前、发布中、发布后;发布前:首先我们先分析一下这个页面,得到以下测试点:整体的优惠券分为三部分:(1) 优惠券管理—优惠券领取记录—优惠券使用记录:(2) 测试点:当鼠标滑过时 按钮是否发生状态改变(按钮常亮),(3) 测试点:当鼠标点击时 右侧是否联动相关页面优惠券管理:(按钮的滑过/悬停、点击时、点击后状态根据产品原型进行测试)(1) 测试点:点击控制台是否能够正确跳转至控制台页面(2) 测试点:刷新按钮 点击时下方优惠券列表...
            1 1 4450
            分享
          • 接口测试是测试系统组件间接口的一种测试。接口测试主要用于检测外部系统与系统之间以及内部各个子系统之间的交互点。测试的重点是要检查数据的交换,传递和控制管理过程,以及系统间的相互逻辑依赖关系等。测试的策略:接口测试也是属于功能测试,所以跟我们以往的功能测试流程并没有太大区别,测试流程依旧是:评审测试接口文档(需求文档);根据接口文档编写测试用例(用例编写完全可以按照以往规则来编写,例如等价类划分,边界值等设计方法);执行测试,查看不同的参数请求,接口的返回的数据是否达到预期。那么设计测试用例时我们主要考虑如下几个方面:功能测试:接口的功能是否正确实现了;接口是否按照设计文档中来实现(比如user...
            12 12 2975
            分享
      • 51testing软件测试圈微信