-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmessage_queue.cpp.save
More file actions
101 lines (80 loc) · 1.88 KB
/
message_queue.cpp.save
File metadata and controls
101 lines (80 loc) · 1.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#include <pthread.h>
#include <semaphore.h>

#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
/// Single-worker job queue: callables submitted with add_message() are run one
/// at a time, in submission order, on a background thread owned by the queue.
/// The outcome of each job is stored under the caller-supplied key until it is
/// collected with get().
///
/// @tparam ReturnType  value type produced by every job (must not be void).
/// @tparam Ident       key type; must be hashable and equality-comparable so it
///                     can be used as an std::unordered_map key.
/// @tparam Arguments   parameter types of the job callables. Arguments are
///                     copied at submission time and handed to the job as
///                     lvalues when it runs.
///
/// Thread-safety: every public member function locks the internal mutex, so
/// they may be called concurrently from several threads. The destructor must
/// not run while another thread is still inside a member function.
template<class ReturnType, class Ident, class... Arguments>
class message_queue
{
private:
    // A queued job: its key plus the callable with its arguments already bound.
    using job_type = std::pair<Ident, std::packaged_task<ReturnType()>>;

    std::queue<job_type> job_queue;                              // jobs not yet started
    std::unordered_map<Ident, std::future<ReturnType>> finished; // outcomes awaiting get()
    mutable std::mutex state_lock;                               // guards the three members above/below
    std::condition_variable job_ready;                           // signalled on new job or shutdown
    std::condition_variable result_ready;                        // signalled when an outcome is stored
    bool is_done = false;                                        // set by the destructor to stop the worker
    std::thread executing;                                       // the worker thread

    /// Worker loop: sleep until a job or shutdown arrives, run the job with
    /// the lock released, then publish its outcome under the job's key.
    void run()
    {
        std::unique_lock<std::mutex> guard(state_lock);
        for (;;)
        {
            job_ready.wait(guard, [this] { return is_done || !job_queue.empty(); });
            if (is_done)
                return;

            job_type job = std::move(job_queue.front());
            job_queue.pop();

            // Run the job without holding the lock so producers and readers
            // are never blocked by user code. packaged_task captures any
            // exception the job throws instead of letting it escape the thread.
            guard.unlock();
            job.second();
            std::future<ReturnType> outcome = job.second.get_future();
            guard.lock();

            // A later job with the same key replaces an uncollected outcome.
            finished[job.first] = std::move(outcome);
            result_ready.notify_all();
        }
    }

public:
    /// Starts the worker thread.
    message_queue()
    {
        executing = std::thread([this] { run(); });
    }

    // The worker thread holds a pointer to this object, so it must stay put.
    message_queue(const message_queue&) = delete;
    message_queue& operator=(const message_queue&) = delete;

    /// Blocks until the job submitted under @p key has finished, removes its
    /// outcome from the queue and returns it.
    ///
    /// If the job threw, the same exception is rethrown here. If no job with
    /// this key is ever submitted (or another caller already collected it),
    /// the call keeps waiting.
    ReturnType get(Ident key)
    {
        std::unique_lock<std::mutex> guard(state_lock);
        result_ready.wait(guard, [this, &key] { return finished.count(key) > 0; });

        auto entry = finished.find(key);
        std::future<ReturnType> outcome = std::move(entry->second);
        finished.erase(entry);
        guard.unlock();

        // The future is already ready; get() returns the value or rethrows.
        return outcome.get();
    }

    /// Returns true when an outcome for @p key is stored and has not yet been
    /// collected with get().
    bool hasFinished(Ident key) const
    {
        std::lock_guard<std::mutex> guard(state_lock);
        return finished.count(key) > 0;
    }

    /// Queues @p ftn to be called with @p args on the worker thread; its
    /// outcome becomes available under @p key.
    void add_message(std::function<ReturnType(Arguments...)> ftn, Ident key, Arguments... args)
    {
        // Bind by copy so the job owns everything it needs when it runs later.
        std::packaged_task<ReturnType()> task(
            [ftn, args...]() mutable -> ReturnType { return ftn(args...); });
        {
            std::lock_guard<std::mutex> guard(state_lock);
            job_queue.emplace(std::move(key), std::move(task));
        }
        job_ready.notify_one();
    }

    /// Returns the number of jobs that are queued but not yet started (the job
    /// currently running, if any, is not counted).
    int jobsLeft() const
    {
        std::lock_guard<std::mutex> guard(state_lock);
        return static_cast<int>(job_queue.size());
    }

    /// Stops the worker and joins it. A job that is already running is allowed
    /// to finish; jobs still waiting in the queue are discarded, as are any
    /// uncollected outcomes.
    ~message_queue()
    {
        {
            std::lock_guard<std::mutex> guard(state_lock);
            is_done = true;
        }
        job_ready.notify_all();
        if (executing.joinable())
            executing.join();
    }
};