GDAL
cpl_worker_thread_pool.h
Go to the documentation of this file.
1/**********************************************************************
2 * $Id: cpl_worker_thread_pool.h 1df63aa4d30bc389133c38822002d5ecfb91c314 2020-05-21 11:30:55 +0200 Even Rouault $
3 *
4 * Project: CPL - Common Portability Library
5 * Purpose: CPL worker thread pool
6 * Author: Even Rouault, <even dot rouault at spatialys dot com>
7 *
8 **********************************************************************
9 * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a
12 * copy of this software and associated documentation files (the "Software"),
13 * to deal in the Software without restriction, including without limitation
14 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
15 * and/or sell copies of the Software, and to permit persons to whom the
16 * Software is furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included
19 * in all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
26 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
27 * DEALINGS IN THE SOFTWARE.
28 ****************************************************************************/
29
30#ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_
31#define CPL_WORKER_THREAD_POOL_H_INCLUDED_
32
33#include "cpl_multiproc.h"
34#include "cpl_list.h"
35
36#include <condition_variable>
37#include <memory>
38#include <mutex>
39#include <vector>
40
47
48#ifndef DOXYGEN_SKIP
49struct CPLWorkerThreadJob;
51
52struct CPLWorkerThread
53{
54 CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread)
55 CPLWorkerThread() = default;
56
57 CPLThreadFunc pfnInitFunc = nullptr;
58 void *pInitData = nullptr;
59 CPLWorkerThreadPool *poTP = nullptr;
60 CPLJoinableThread *hThread = nullptr;
61 bool bMarkedAsWaiting = false;
62
63 std::mutex m_mutex{};
64 std::condition_variable m_cv{};
65};
66
67typedef enum
68{
69 CPLWTS_OK,
70 CPLWTS_STOP,
71 CPLWTS_ERROR
72} CPLWorkerThreadState;
73#endif // ndef DOXYGEN_SKIP
74
75class CPLJobQueue;
76
79{
81
82 std::vector<std::unique_ptr<CPLWorkerThread>> aWT{};
83 std::mutex m_mutex{};
84 std::condition_variable m_cv{};
85 volatile CPLWorkerThreadState eState = CPLWTS_OK;
86 CPLList* psJobQueue = nullptr;
87 volatile int nPendingJobs = 0;
88
89 CPLList* psWaitingWorkerThreadsList = nullptr;
90 int nWaitingWorkerThreads = 0;
91
92 static void WorkerThreadFunction(void* user_data);
93
94 void DeclareJobFinished();
95 CPLWorkerThreadJob* GetNextJob(CPLWorkerThread* psWorkerThread);
96
97 public:
100
101
102 bool Setup(int nThreads,
103 CPLThreadFunc pfnInitFunc,
104 void** pasInitData);
105 bool Setup(int nThreads,
106 CPLThreadFunc pfnInitFunc,
107 void** pasInitData,
108 bool bWaitallStarted);
109
110 std::unique_ptr<CPLJobQueue> CreateJobQueue();
111
112 bool SubmitJob(CPLThreadFunc pfnFunc, void* pData);
113 bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void*>& apData);
114 void WaitCompletion(int nMaxRemainingJobs = 0);
115 void WaitEvent();
116
118 int GetThreadCount() const { return static_cast<int>(aWT.size()); }
119};
120
122class CPL_DLL CPLJobQueue
123{
125 CPLWorkerThreadPool* m_poPool = nullptr;
126 std::mutex m_mutex{};
127 std::condition_variable m_cv{};
128 int m_nPendingJobs = 0;
129
130 static void JobQueueFunction(void*);
131 void DeclareJobFinished();
132
134protected:
135 friend class CPLWorkerThreadPool;
136 explicit CPLJobQueue(CPLWorkerThreadPool* poPool);
138
139public:
140 ~CPLJobQueue();
141
143 CPLWorkerThreadPool* GetPool() { return m_poPool; }
144
145 bool SubmitJob(CPLThreadFunc pfnFunc, void* pData);
146 void WaitCompletion(int nMaxRemainingJobs = 0);
147};
148
149#endif // CPL_WORKER_THREAD_POOL_H_INCLUDED_
Job queue.
Definition cpl_worker_thread_pool.h:123
CPLWorkerThreadPool * GetPool()
Return the owning worker thread pool.
Definition cpl_worker_thread_pool.h:143
Pool of worker threads.
Definition cpl_worker_thread_pool.h:79
CPLWorkerThreadPool()
Instantiate a new pool of worker threads.
Definition cpl_worker_thread_pool.cpp:57
bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData)
Setup the pool.
Definition cpl_worker_thread_pool.cpp:346
int GetThreadCount() const
Return the number of threads setup.
Definition cpl_worker_thread_pool.h:118
std::unique_ptr< CPLJobQueue > CreateJobQueue()
Create a new job queue based on this worker thread pool.
Definition cpl_worker_thread_pool.cpp:491
bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector< void * > &apData)
Queue several jobs.
Definition cpl_worker_thread_pool.cpp:198
void WaitEvent()
Wait for completion of at least one job, if there are any remaining.
Definition cpl_worker_thread_pool.cpp:316
bool SubmitJob(CPLThreadFunc pfnFunc, void *pData)
Queue a new job.
Definition cpl_worker_thread_pool.cpp:130
void WaitCompletion(int nMaxRemainingJobs=0)
Wait for completion of part or whole jobs.
Definition cpl_worker_thread_pool.cpp:299
Simplest list implementation.
struct _CPLList CPLList
List element structure.
Definition cpl_list.h:48
#define CPL_DISALLOW_COPY_ASSIGN(ClassName)
Helper to remove the copy and assignment constructors so that the compiler will not generate the defa...
Definition cpl_port.h:955