-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathThreadPool.h
More file actions
123 lines (101 loc) · 2.83 KB
/
ThreadPool.h
File metadata and controls
123 lines (101 loc) · 2.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#ifndef _ThreadPool_H
#define _ThreadPool_H
//
// Copyright (c) 2002 by Ted T. Yuan.
//
// Permission is granted to use this code without restriction as long as this copyright notice appears in all source files.
//
/*
// usage:
class DerivedRunnable
{
void operator()()
{
// do what you want to do
// then delete self, which is on heap
delete this;
}
};
example
{
RunPool<DerivedRunnable> pool(poolSize);
ThreadPool<DerivedRunnable> thpool(pool,
poolSize > 256 ? 256 : poolSize);
boost::thread thrd(thpool);
while(true)
{
// create an object of the derived class (DerivedRunnable) from Runnable that implements
// virtual void operator()()
// for real application logic
DerivedRunnable drun = new DerivedRunnable(...);
thpool.execute(drun);
}
thrd.join();
}
*/
#define HAS_PUTTABLETAKABLE
#include <ProCon.h>
#ifndef SPACE_YIN
#define SPACE_YIN yin
#endif
namespace SPACE_YIN {
// _Runnable is boost::function0<void> or its derivation, or any class that implements void operator()()
#define RunnablePtr _Runnable *
template < typename _Runnable >
struct RunPool : SPACE_YIN::Pool<RunnablePtr>
{
RunPool(size_t limit = (size_t)-1)
: SPACE_YIN::Pool<RunnablePtr>(limit) {}
};
template < typename _Runnable >
struct Runner;
#define RunnerBase SPACE_YIN::Consumer<RunnablePtr, RunPool<_Runnable> >
template < typename _Runnable >
struct Runner : public RunnerBase
{
typedef SPACE_YIN::Takable<RunnablePtr, RunPool<_Runnable> > TakableRunPool;
Runner(TakableRunPool& runners,
SPACE_YIN::Latch& lh)
: RunnerBase(runners, lh) {}
Runner(RunPool<_Runnable>& runners,
SPACE_YIN::Latch& lh)
: RunnerBase(runners, lh) {}
protected:
void consume(RunnablePtr runner)
{
try {
runner->operator()();
} catch (...) {
Logger::log("ThreadPool::consume exception");
}
}
bool cancel() {
return !channel_.channel_.size();
}
void starting() {
}
void started() {
}
void done() {
}
};
#define ThreadPoolBase SPACE_YIN::Consuming<RunnablePtr, RunPool<_Runnable>, Runner<_Runnable> >
template < typename _Runnable >
struct ThreadPool : public ThreadPoolBase
{
bool bStoppableThreadPool; // in most server pool cases bStoppableThreadPool is default to false, meaning it runs forever...
// however, in a finite pre-defined task queue case, one would set the flag to make sure proper consumer stop
ThreadPool(RunPool<_Runnable>& channel, size_t nThreads = 1, bool bCanStop = false)
: ThreadPoolBase (channel, nThreads, true, true),
bStoppableThreadPool(bCanStop) {}
void consumerModelCreated(Runner<_Runnable>& consumer)
{
consumer.mayStop(bStoppableThreadPool); //false); // see above comment
}
void execute(_Runnable*& runObj)
{
channel_.offer(runObj);
}
};
}
#endif