24 #ifndef LIBTHREADAR_RATELIER_GATHER_HPP
25 #define LIBTHREADAR_RATELIER_GATHER_HPP
/// Gives a worker thread a way to hand over one object together with its index.
///
/// \param[in] slot index associated with the object being handed over
/// \param[in,out] one the object to hand over, received by non-const reference
/// \param[in] flag signed value passed along with the object (0 when omitted)
116 void worker_push_one(
unsigned int slot, std::unique_ptr<T> & one,
signed int flag = 0);
/// Obtains the lowest run of filled slots and frees them.
///
/// \param ones container the gathered objects are pushed at the back of
/// \param flag container the flag of each gathered object is pushed at the back of
/// \note NOTE(review): whether both containers are cleared beforehand is not visible here -- confirm
124 void gather(std::deque<std::unique_ptr<T> > & ones, std::deque<signed int> & flag);
131 static const unsigned int cond_pending_data = 0;
132 static const unsigned int cond_full = 1;
136 std::unique_ptr<T> obj;
141 slot(
signed int val) { empty =
true; flag = val; };
142 slot(
const slot & ref) { obj.reset(); empty = ref.empty; index = ref.index; flag = ref.flag; };
145 unsigned int next_index;
146 std::vector<slot> table;
147 std::map<unsigned int, unsigned int> corres;
148 std::deque<unsigned int> empty_slot;
153 table(size, slot(flag)),
158 for(
unsigned int i = 0; i < size; ++i)
159 empty_slot.push_back(i);
168 while(empty_slot.empty()
169 || ((empty_slot.size() == 1 && slot != next_index)
170 && corres.begin() != corres.end() && (corres.begin())->first != next_index))
171 verrou.wait(cond_full);
173 std::map<unsigned int, unsigned int>::iterator it = corres.find(slot);
176 if(it != corres.end())
177 throw exception_range(
"the ratelier_gather index to fill is already used");
179 index = empty_slot.back();
183 if(index >= table.size())
185 if( ! table[index].empty)
190 corres[slot] = index;
191 table[index].obj = std::move(one);
192 table[index].empty =
false;
193 table[index].index = slot;
194 table[index].flag = flag;
196 empty_slot.pop_back();
198 if(verrou.get_waiting_thread_count(cond_pending_data) > 0)
199 if(corres.find(next_index) != corres.end())
200 verrou.signal(cond_pending_data);
205 verrou.broadcast(cond_pending_data);
206 verrou.broadcast(cond_full);
220 std::map<unsigned int, unsigned int>::iterator it;
221 std::map<unsigned int, unsigned int>::iterator tmp;
227 while(it != corres.end())
229 if(it->first > next_index)
232 if(it->first == next_index)
237 if(it->second >= table.size())
239 if(table[it->second].index != next_index)
241 if(table[it->second].empty)
243 if( ! table[it->second].obj)
248 ones.push_back(std::move(table[it->second].obj));
249 flag.push_back(table[it->second].flag);
251 table[it->second].empty =
true;
252 empty_slot.push_back(it->second);
263 verrou.wait(cond_pending_data);
267 if(verrou.get_waiting_thread_count(cond_full) > 0)
268 verrou.broadcast(cond_full);
273 verrou.broadcast(cond_pending_data);
274 verrou.broadcast(cond_full);
279 if(ones.size() != flag.size())
285 unsigned int size = table.size();
290 for(
unsigned int i = 0; i < size; ++i)
292 table[i].obj.reset();
293 table[i].empty =
true;
294 empty_slot.push_back(i);
298 verrou.broadcast(cond_pending_data);
299 verrou.broadcast(cond_full);
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reaches that statement.
defines the mutex C++ class
void worker_push_one(unsigned int slot, std::unique_ptr< T > &one, signed int flag=0)
provides to a worker thread a means to give data with its associated index to a gathering thread ...
Wrapper around the Posix pthread_cond_t object and its associated mutex.
void gather(std::deque< std::unique_ptr< T > > &ones, std::deque< signed int > &flag)
obtain the lowest continuous filled slots of the ratelier_gather and free them
void reset()
reset the object to its pristine state
the class ratelier_gather has a fixed length range of slots of arbitrary defined object type ...
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Exception used to report out of range value or argument.