Back

C++线程池

此文介绍了C++线程池,包含线程管理,线程数根据任务数动态增减.

C++线程池

任务类

#pragma once
#include <mutex>
#include <queue>

// 定义任务结构体
using callback = void (*)(void *);
struct Task {
    Task() {
        this->function = nullptr;
        this->arg = nullptr;
    }
    Task(callback f, void *arg) {
        this->function = f;
        this->arg = arg;
    }
    callback function;
    void *arg;
};

class TaskQueue {
public:
    TaskQueue();

    ~TaskQueue();

    // 添加任务
    void addTask(Task task);
    void addTask(callback f, void *arg);

    // 取出一个任务
    Task getTask();

    // 判断队列任务是否为空
    inline bool empty() { return m_taskQ.empty(); }

    // 当前任务个数
    inline int gettaskNumber() { return m_taskQ.size(); }

private:
    std::queue<Task> m_taskQ;
    std::mutex m_mutex;
};
#include "TaskQueue.h"
#include <mutex>

void TaskQueue::addTask(Task task) {
    std::unique_lock<std::mutex> lk(m_mutex);
    m_taskQ.push(task);
}

void TaskQueue::addTask(callback f, void *arg) {
    std::unique_lock<std::mutex> lk(m_mutex);
    m_taskQ.push(Task(f, arg));
}

Task TaskQueue::getTask() {
    Task task;
    if (!m_taskQ.empty()) {
        std::unique_lock<std::mutex> lk(m_mutex);
        task = m_taskQ.front();
        m_taskQ.pop();
    }
    return task;
}

TaskQueue::TaskQueue() {}

TaskQueue::~TaskQueue() {}

线程池类

#pragma once
#include "TaskQueue.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

class ThreadPool {
public:
    // 创建线程池并初始化
    ThreadPool(int min, int max);

    // 给线程池添加任务
    void addTask(Task task);

    // 获取线程池中工作的线程的个数
    int getBusyNumber();

    // 获取线程池中活着的线程的个数
    int getAliveNumber();

    // static ThreadPool *instance();
    // static ThreadPool &Instance();

    // 销毁线程池
    ~ThreadPool();

    ThreadPool();
private:
    //////////////////////
    // 工作的线程(消费者线程)任务函数
    static void *worker(void *arg);
    // 管理者线程任务函数
    static void *manager(void *arg);

private:
    // 任务队列
    TaskQueue *taskQ;

    int minNum;              // 最小线程数量
    int maxNum;              // 最大线程数量
    int busyNum;             // 忙的线程的个数
    int liveNum;             // 存活的线程的个数
    std::atomic_int exitNum; // 要销毁的线程个数
    std::condition_variable m_notEmpty;
    std::mutex m_mutex;
    static const int NUMBER = 2;

    int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
};
#include "ThreadPool.h"
#include <algorithm>
#include <chrono>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

// 创建线程池并初始化
ThreadPool::ThreadPool(int min, int max) {
    do {
        taskQ = new TaskQueue;
        if (nullptr == taskQ) {
            std::cout << "malloc taskQ fail..." << std::endl;
            break;
        }

        minNum = min;
        maxNum = max;
        busyNum = 0;
        liveNum = min; // 和最小个数相等
        exitNum = 0;
        shutdown = false;

        // 管理者线程
        std::thread manager_thread(manager, this);
        std::cout << "manager_thread created id: " << manager_thread.get_id()
            << '\n';
        manager_thread.detach();

        // 工作者线程
        for (int i = 0; i < min; ++i) {
            std::thread worker_thread(worker, this);
            std::cout << "worker_thread created " << worker_thread.get_id() << '\n';
            worker_thread.detach();
        }

        return;
    } while (0);

    // 释放资源
    if (taskQ) {
        delete taskQ;
        taskQ = nullptr;
    }
}

ThreadPool::~ThreadPool() {

    // 关闭线程池
    shutdown = true;
    // 唤醒阻塞的线程
    for (int i = 0; i < liveNum; ++i) {
        m_notEmpty.notify_all();
    }
    // 释放堆内存
    if (taskQ) {
        delete taskQ;
        taskQ = nullptr;
    }
}

void ThreadPool::addTask(Task task) {
    if (shutdown) {
        return;
    }
    // 添加任务
    taskQ->addTask(task);

    // 唤醒一个工作者线程
    m_notEmpty.notify_one();
}

int ThreadPool::getBusyNumber() {
    std::unique_lock<std::mutex> lk(m_mutex);
    int busyNum_1 = this->busyNum;
    return busyNum_1;
}

int ThreadPool::getAliveNumber() {
    std::unique_lock<std::mutex> lk(m_mutex);

    int aliveNum = this->liveNum;
    return aliveNum;
}

void *ThreadPool::worker(void *arg) {
    ThreadPool *pool = static_cast<ThreadPool *>(arg);
    while (true) {
        std::unique_lock<std::mutex> lk(pool->m_mutex);
        // 当前任务队列是否为空
        if (pool->taskQ->gettaskNumber() == 0 && !pool->shutdown) {
            // 阻塞工作线程
            pool->m_notEmpty.wait(lk, [&] {
                return pool->exitNum || pool->shutdown || !pool->taskQ->empty();
            });
        }

        lk.unlock();

        // 判断线程池是否被关闭了
        if (pool->shutdown || pool->exitNum > 0) {
            pool->exitNum--;
            std::cout << "thread " << std::this_thread::get_id() << " exiting..."
                << '\n';
            break;
        }

        // 从任务队列中取出一个任务
        auto task = std::move(pool->taskQ->getTask());
        // 解锁
        lk.lock();
        pool->busyNum++;
        lk.unlock();
        std::cout << "thread " << std::this_thread::get_id() << " start working..."
            << '\n';

        // 消费任务
        task.function(task.arg);
        delete task.arg;
        task.arg = nullptr;

        lk.lock();
        pool->busyNum--;
        lk.unlock();
        std::cout << "thread " << std::this_thread::get_id() << " end working..."
            << '\n';
    }
    return nullptr;
}

void *ThreadPool::manager(void *arg) {
    ThreadPool *pool = static_cast<ThreadPool *>(arg);
    while (!pool->shutdown) {
        // 每隔3s检测一次
        std::this_thread::sleep_for(std::chrono::seconds(1));

        std::unique_lock<std::mutex> lk(pool->m_mutex);
        // 取出线程池中任务的数量和当前线程的数量
        int queueSize = pool->taskQ->gettaskNumber();

        // 取出忙的线程的数量
        int busyNum = pool->busyNum;
        int liveNum = pool->liveNum;
        lk.unlock();
        // 添加线程
        // 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
        if (queueSize > liveNum && liveNum < pool->maxNum) {
            lk.lock();
            int counter = 0;
            for (int i = 0;
                 i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum;
                 ++i) {
                std::thread worker_thread(worker, pool);
                std::cout << "worker_thread created " << worker_thread.get_id() << '\n';
                worker_thread.detach();
                pool->liveNum++;
                counter++;
            }
            lk.unlock();
        }

        // 销毁线程
        // 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
        if (busyNum * 2 < liveNum && liveNum > pool->minNum) {
            pool->exitNum = NUMBER;
            // 让工作的线程自杀
            for (int i = 0; i < NUMBER; ++i) {
                pool->m_notEmpty.notify_one();
            }
        }
    }
    std::cout << "manager_thread exiting..." << '\n';
    return nullptr;
}

测试

/*************************************************************************
  > File Name: main.cpp
  > Author: txt1994
  > Mail: txt1994s@163.com
  > Created Time: Mon 08 Nov 2021 05:58:32 PM CST
 ************************************************************************/
#include "ThreadPool.h"
#include <chrono>
#include <iostream>
#include <memory>
#include <pthread.h>
#include <stdio.h>
#include <thread>
#include <unistd.h>
#include <utility>

void taskFunc(void *arg) {
    int num = *(int *)arg;
    std::cout << "thread id = " << std::this_thread::get_id()
        << " number = " << num << '\n';
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

int main(void) {
    // 初始化线程池
    ThreadPool *pool = new ThreadPool(4, 10);

    for (int i = 0; i < 100; ++i) {

        int *num = new int;
        *num = i + 100;

        Task task;
        task.arg = num;
        task.function = taskFunc;
        pool->addTask(task);
    }

    std::this_thread::sleep_for(std::chrono::seconds(30));
    delete pool;
    return 0;
}
Licensed under CC BY-NC-SA 4.0