Marine systems simulation
AsynchronousTask.h
1#pragma once
2
3#include <condition_variable>
4#include <functional>
5#include <mutex>
6#include <queue>
7#include <thread>
8
9#ifdef __linux__
10 #include <sched.h>
11#endif
12namespace CoRiBoDynamics
13{
14 // Create a pool of threads, each bound to one logical or physical core
16 public:
17 /*
18 To execute a task under the CoreBoundThreadPool, create a subclass of Task that implements the Execute function and add it to the thread pool.
19 */
// Abstract unit of work for the thread pool: derive from Task, implement
// Execute(), and hand the instance to the pool.
class Task {
public:
    // Virtual so that a concrete task can be destroyed through a Task pointer.
    // Defaulted instead of an empty user-provided body (which also carried a
    // stray ';' after the definition).
    virtual ~Task() = default;

    // The work to perform; every concrete task must override this.
    virtual void Execute() = 0;
};
25
26 /*
27 pool_size: number of threads in the pool
28 start_ix: core index of the first core that should bind to the first thread
29 hyperthreading_core_merge: if false, each thread binds to consecutive logical cores. If true, it overrides hyperthreading, puts one thread per physical core and allows the thread to bind to either logical core. If the CPU is not hyperthreaded, this option should probably not be used.
30 */
31 CoreBoundThreadPool(int pool_size, int start_ix, bool hyperthreading_core_merge);
32 void ExecuteTask(Task* task, int thread_id); // add task to the private queue of thread_id
33 void ExecuteTask(Task* task); // add task to the shared queue
// NOTE(review): presumably task_set(i) is invoked for i in [0, num) to create the tasks that are queued - confirm against the implementation.
34 void ExecuteTaskSet(std::function<Task*(int)> task_set, int num);
// NOTE(review): presumably thread_id(i) selects the thread that receives task i - confirm against the implementation.
35 void ExecuteTaskSet(std::function<Task*(int)> task_set, std::function<int(int)> thread_id, int num);
36 void BlockUntilCompletion(int thread_id); // wait until thread_id is idle.
37 void BlockUntilCompletion(); // wait until all currently added tasks have completed. May also wait for tasks added later, but there are no guarantees for this.
38 virtual ~CoreBoundThreadPool();
// NOTE(review): presumably returns the number of threads in the pool (pool_size) - confirm.
39 int PoolSize();
40
// If tasks are ordered roughly according to decreasing size, smart_index returns a 'good' core index for task ix (the original description is truncated in this listing).
41 int smart_index(int ix);
42
43 protected:
52 public:
54 m_sub_queue = nullptr;
55 }
56
57 explicit mutex_queue(mutex_queue* sub_queue){
58 m_sub_queue = sub_queue;
59 }
60
62 void push(Task* t);
63
66
69
71 bool isEmpty();
72
73 private:
74 std::mutex m_mutex;
75 //std::queue<Task*> m_queue;
76 std::vector<Task*> m_queue;
77 mutex_queue* m_sub_queue;
78 };
79
80
// Core bound thread unit with a task queue.
// NOTE(review): the number-only lines below (90, 93, 99, 111) appear to be declarations lost when this
// listing was extracted; the member reference after the listing names TerminateUnit(), WaitUntilIdle(),
// NudgeThread() and ThreadLoop(), but which line held which declaration cannot be told from here.
82 class WorkUnit {
83 public:
84
// Initialize the thread and bind it to the cores specified by cpu_mask.
// NOTE(review): sub_queue is presumably the pool's shared queue, chained behind this unit's own queue - confirm.
86 WorkUnit(size_t cpu_mask, mutex_queue* sub_queue);
87
90
93
// Add this task to the task queue and return. Tasks are executed asynchronously in a strict FIFO order.
95 void AddToQueue(Task* t);
96
99
100 protected:
// NOTE(review): presumably the flag that keeps the thread's work loop running - confirm.
101 bool m_stay_in_loop;
102 std::thread m_thread;
// NOTE(review): the two condition variables presumably wake the idle worker and release callers waiting for idle, respectively - confirm.
103 std::condition_variable m_internal_barrier;
104 std::condition_variable m_external_barrier;
105 std::mutex m_mutex;
// Task queue of this unit.
106 mutex_queue m_queue;
// Whether the unit is currently idle or executing a task.
107 enum thread_state {IDLE, EXECUTING} m_state;
108
111
// Bind the thread to the cpu-cores specified by cpu_mask.
113 void setCpuMask(size_t cpu_mask);
114 };
115 std::vector<WorkUnit*> m_WorkUnit;
116
117 bool m_DestructorCalled;
118 std::mutex m_mutex;
119
120 mutex_queue m_shared_queue;
121
122 bool m_hyperthreading_core_merge;
123 };
124
125}
Definition: AsynchronousTask.h:20
Core bound thread unit with a task queue.
Definition: AsynchronousTask.h:82
void setCpuMask(size_t cpu_mask)
bind thread to the cpu-cores specified by cpu_mask
void ThreadLoop()
"semi-infinite" loop. pop task from queue and execute. if no task, go idle until new task is added.
void TerminateUnit()
terminates thread work loop, empties queue, and joins thread.
void AddToQueue(Task *t)
add this task to the task queue and return. Tasks are executed asynchronously in a strict FIFO order.
WorkUnit(size_t cpu_mask, mutex_queue *sub_queue)
initialize thread and bind to the cores specified by cpu_mask
void NudgeThread()
wakes thread from idle
void WaitUntilIdle()
blocks until thread is idle
If tasks are ordered roughly according to decreasing size, smart_index will return a 'good' core alloc...
Definition: AsynchronousTask.h:51
bool isEmpty()
threadsafe: is the queue (including sub_queues) empty?
void KillAllTasks()
threadsafe: remove all tasks from queue
Task * pop()
threadsafe: pop task from queue
void push(Task *t)
threadsafe: push task to queue
Definition: AsynchronousTask.h:15
Definition: CollisionManager.h:6