Comment créer un planificateur de tâches multithread efficace en C ++?

Je voudrais créer un système de planificateur de tâches très efficace en C ++.

L’idée de base est la suivante:

class Task { public: virtual void run() = 0; }; class Scheduler { public: void add(Task &task, double delayToRun); }; 

Derrière Scheduler , il devrait y avoir un pool de threads de taille fixe, qui exécutent les tâches (je ne veux pas créer de thread pour chaque tâche). delayToRun signifie que la task n’est pas exécutée immédiatement, mais qu’elle delayToRun quelques secondes plus tard (à partir du point où elle a été ajoutée dans le Scheduler ).

( delayToRun signifie bien delayToRun une valeur “au moins”. Si le système est chargé, ou si nous demandons l’impossible au Planificateur, il ne pourra pas traiter notre requête. Mais il devrait faire de son mieux)

Et voici mon problème. Comment implémenter delayToRun fonctionnalité delayToRun ? J’essaie de résoudre ce problème avec l’utilisation de mutex et de variables de condition.

Je vois deux façons:

Avec fil de gestion

Le planificateur contient deux files d’attente: allTasksQueue et tasksReadyToRunQueue . Une tâche est ajoutée à allTasksQueue à Scheduler::add . Il existe un thread de gestion qui attend le moins de temps possible pour pouvoir placer une tâche de allTasksQueue à tasksReadyToRunQueue . Les threads de travail attendent une tâche disponible dans tasksReadyToRunQueue .

Si Scheduler::add ajoute une tâche devant allTasksQueue (une tâche dont la valeur est delayToRun pour qu’elle soit delayToRun avant la tâche d’exécution la plus delayToRun ), alors la tâche du gestionnaire doit être réveillée. mettre à jour l’heure d’attente.

Cette méthode peut être considérée comme inefficace car elle nécessite deux files d’attente et nécessite deux signaux condvar.signal pour exécuter une tâche (un pour allTasksQueue -> tasksReadyToRunQueue et un pour signaler à un thread de travail d’exécuter réellement la tâche)

Sans fil de gestion

Il y a une queue dans le planificateur. Une tâche est ajoutée dans cette queue à Scheduler::add . Un thread de travail vérifie la queue. S’il est vide, il attend sans contrainte de temps. S’il n’est pas vide, il attend la tâche la plus rapide.

  1. S’il n’y a qu’une seule variable de condition pour laquelle les threads de travail attendent: cette méthode peut être considérée comme inefficace, car si une tâche ajoutée devant la queue (front signifie, s’il y a N threads de travail, alors l’index de tâche <N) alors tous les threads de travail doivent être réveillés pour mettre à jour l’heure qu’ils attendent.

  2. S’il y a une variable de condition distincte pour chaque thread, alors nous pouvons contrôler quel thread doit se réveiller, donc dans ce cas nous n’avons pas besoin de réveiller tous les threads (il suffit de réveiller le thread qui a le plus de temps d’attente) , nous devons donc gérer cette valeur). Je pense actuellement à la mise en œuvre, mais la mise au sharepoints détails exacts est complexe. Existe-t-il des recommandations / reflections / documents sur cette méthode?


Y a-t-il une meilleure solution pour ce problème? J’essaie d’utiliser des fonctionnalités C ++ standard, mais je suis prêt à utiliser les outils dépendants de la plate-forme (ma plate-forme principale est Linux) (comme pthreads), ou même des outils spécifiques à Linux (comme les futex), s’ils offrent une meilleure solution.

Vous pouvez éviter à la fois d’avoir un thread “manager” séparé et de devoir réveiller un grand nombre de tâches lorsque la tâche suivante s’exécute, en utilisant une conception dans laquelle un seul thread de pool attend la tâche “next to run” (s’il en existe une) sur une variable de condition et les threads de pool restants attendent indéfiniment une seconde variable de condition.

Les threads de pool exécuteraient un pseudocode selon ces lignes:

 pthread_mutex_lock(&queue_lock); while (running) { if (head task is ready to run) { dequeue head task; if (task_thread == 1) pthread_cond_signal(&task_cv); else pthread_cond_signal(&queue_cv); pthread_mutex_unlock(&queue_lock); run dequeued task; pthread_mutex_lock(&queue_lock); } else if (!queue_empty && task_thread == 0) { task_thread = 1; pthread_cond_timedwait(&task_cv, &queue_lock, time head task is ready to run); task_thread = 0; } else { pthread_cond_wait(&queue_cv, &queue_lock); } } pthread_mutex_unlock(&queue_lock); 

Si vous modifiez la prochaine tâche à exécuter, vous exécutez alors:

 if (task_thread == 1) pthread_cond_signal(&task_cv); else pthread_cond_signal(&queue_cv); 

avec le queue_lock tenu.

Dans ce schéma, tous les wakeups se trouvent directement sur un seul thread, il n’y a qu’une seule queue de tâches prioritaire et aucun thread de gestionnaire n’est requirejs.

Votre spécification est un peu trop forte:

delayToRun signifie que la tâche n’est pas exécutée immédiatement, mais delayToRun secondes plus tard

Vous avez oublié d’append “au moins”:

  • La tâche n’est pas exécutée maintenant, mais au moins delayToRun quelques secondes plus tard

Le fait est que si dix mille tâches sont toutes programmées avec un délai de retard de 0.1 , elles ne pourront sûrement pas fonctionner en même temps.

Avec une telle correction, il vous suffit de maintenir une queue (ou un agenda) de (heure de début programmée, fermeture à exécuter), de conserver cette queue et de lancer N (un nombre fixe) de threads l’ordre du jour et le lancer.

alors tous les threads de travail doivent être réveillés pour mettre à jour l’heure qu’ils attendent.

Non, certains threads de travail seraient réveillés.

Lisez à propos des variables de condition et de diffusion.

Vous pouvez également utiliser les temporisateurs POSIX, voir timer_create (2) , ou fd timer spécifique à Linux, voir timerfd_create (2)

Vous éviterez probablement de lancer des appels système bloquants dans vos threads et de les gérer via une boucle d’événement (voir poll (2) …); sinon, si une centaine de tâches exécutent sleep(100) et qu’une tâche est programmée pour s’exécuter en une demi-seconde, elle ne s’exécutera pas avant cent secondes.

Vous voudrez peut-être en savoir plus sur la programmation de style de transmission continue (elle est très pertinente). Lisez l’ article sur Continuation Passing C de Juliusz Chroboczek.

Regardez aussi dans les threads Qt .

Vous pourriez également envisager de coder dans Go (avec ses Goroutines).

Ceci est un exemple d’implémentation de l’interface que vous avez fournie qui se rapproche le plus de la description de votre thread de gestion .

Il utilise un seul thread ( timer_thread ) pour gérer une queue ( allTasksQueue ) sortingée en fonction de l’heure réelle à laquelle une tâche doit être démarrée ( std::chrono::time_point ).
La queue est un std::priority_queue (qui conserve ses éléments clés time_point sortingés).

timer_thread est normalement suspendu jusqu’à ce que la tâche suivante soit démarrée ou lorsqu’une nouvelle tâche est ajoutée.
Lorsqu’une tâche est sur le point d’être exécutée, elle est placée dans tasksReadyToRunQueue , l’un des threads de travail est signalé, se réveille, le supprime de la queue et commence à traiter la tâche.

Notez que le pool de threads a une limite supérieure à la compilation pour le nombre de threads (40). Si vous planifiez plus de tâches que celles pouvant être envoyées aux travailleurs, la nouvelle tâche sera bloquée jusqu’à ce que les threads soient à nouveau disponibles.

Vous avez dit que cette approche n’est pas efficace, mais dans l’ensemble, cela me semble raisonnablement efficace. Tout est piloté par les événements et vous ne gaspillez pas les cycles du processeur en tournant inutilement. Bien sûr, ce n’est qu’un exemple, les optimisations sont possibles (note: std::multimap a été remplacé par std::priority_queue ).

L’implémentation est conforme à C ++ 11

 #include  #include  #include  #include  #include  #include  #include  #include  #include  class Task { public: virtual void run() = 0; virtual ~Task() { } }; class Scheduler { public: Scheduler(); ~Scheduler(); void add(Task &task, double delayToRun); private: using timepoint = std::chrono::time_point; struct key { timepoint tp; Task *taskp; }; struct TScomp { bool operator()(const key &a, const key &b) const { return a.tp > b.tp; } }; const int ThreadPoolSize = 40; std::vector ThreadPool; std::vector tasksReadyToRunQueue; std::priority_queue, TScomp> allTasksQueue; std::thread TimerThr; std::mutex TimerMtx, WorkerMtx; std::condition_variable TimerCV, WorkerCV; bool WorkerIsRunning = true; bool TimerIsRunning = true; void worker_thread(); void timer_thread(); }; Scheduler::Scheduler() { for (int i = 0; i  lck{TimerMtx}; TimerIsRunning = false; TimerCV.notify_one(); } TimerThr.join(); { std::lock_guard lck{WorkerMtx}; WorkerIsRunning = false; WorkerCV.notify_all(); } for (auto &t : ThreadPool) t.join(); } void Scheduler::add(Task &task, double delayToRun) { auto now = std::chrono::steady_clock::now(); long delay_ms = delayToRun * 1000; std::chrono::milliseconds duration (delay_ms); timepoint tp = now + duration; if (now >= tp) { /* * This is a short-cut * When time is due, the task is directly dispatched to the workers */ std::lock_guard lck{WorkerMtx}; tasksReadyToRunQueue.push_back(&task); WorkerCV.notify_one(); } else { std::lock_guard lck{TimerMtx}; allTasksQueue.push({tp, &task}); TimerCV.notify_one(); } } void Scheduler::worker_thread() { for (;;) { std::unique_lock lck{WorkerMtx}; WorkerCV.wait(lck, [this] { return tasksReadyToRunQueue.size() != 0 || !WorkerIsRunning; } ); if (!WorkerIsRunning) break; Task *p = tasksReadyToRunQueue.back(); tasksReadyToRunQueue.pop_back(); lck.unlock(); p->run(); delete p; // delete Task } } void Scheduler::timer_thread() { for (;;) { std::unique_lock lck{TimerMtx}; if (!TimerIsRunning) break; auto duration = std::chrono::nanoseconds(1000000000); if (allTasksQueue.size() != 0) { auto now = std::chrono::steady_clock::now(); auto head = allTasksQueue.top(); Task *p = head.taskp; duration = head.tp - now; if (now >= head.tp) { /* * A Task is due, pass to worker threads */ std::unique_lock ulck{WorkerMtx}; tasksReadyToRunQueue.push_back(p); WorkerCV.notify_one(); ulck.unlock(); allTasksQueue.pop(); } } TimerCV.wait_for(lck, duration); } } /* * End sample implementation */ class DemoTask : public Task { int n; public: DemoTask(int n=0) : n{n} { } void run() override { std::cout << "Start task " << n << std::endl;; std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << " Stop task " << n << std::endl;; } }; int main() { Scheduler sched; Task *t0 = new DemoTask{0}; Task *t1 = new DemoTask{1}; Task *t2 = new DemoTask{2}; Task *t3 = new DemoTask{3}; Task *t4 = new DemoTask{4}; Task *t5 = new DemoTask{5}; sched.add(*t0, 7.313); sched.add(*t1, 2.213); sched.add(*t2, 0.713); sched.add(*t3, 1.243); sched.add(*t4, 0.913); sched.add(*t5, 3.313); std::this_thread::sleep_for(std::chrono::seconds(10)); } 

Cela signifie que vous souhaitez exécuter toutes les tâches en continu en utilisant un certain ordre.

Vous pouvez créer un type de sorting par une stack de retard (ou même une liste liée) de tâches. Lorsqu’une nouvelle tâche arrive, vous devez l’insérer dans la position en fonction d’un délai (calculez simplement cette position et insérez efficacement la nouvelle tâche).

Exécutez toutes les tâches en commençant par la tête de la stack de tâches (ou la liste).

Code de base pour C ++ 11:

 #include  #include  #include  #include  #include  using namespace std::chrono; using namespace std; class Task { public: virtual void run() = 0; }; template::value>> class SchedulerItem { public: T task; time_point startTime; int delay; SchedulerItem(T t, time_point s, int d) : task(t), startTime(s), delay(d){} }; template::value>> class Scheduler { public: queue> pool; mutex mtx; atomic running; Scheduler() : running(false){} void add(T task, double delayMsToRun) { lock_guard lock(mtx); pool.push(SchedulerItem(task, high_resolution_clock::now(), delayMsToRun)); if (running == false) runNext(); } void runNext(void) { running = true; auto th = [this]() { mtx.lock(); auto item = pool.front(); pool.pop(); mtx.unlock(); auto remaining = (item.startTime + milliseconds(item.delay)) - high_resolution_clock::now(); if(remaining.count() > 0) this_thread::sleep_for(remaining); item.task.run(); if(pool.size() > 0) runNext(); else running = false; }; thread t(th); t.detach(); } }; 

Code de test:

 class MyTask : Task { public: virtual void run() override { printf("mytask \n"); }; }; int main() { Scheduler s; s.add(MyTask(), 0); s.add(MyTask(), 2000); s.add(MyTask(), 2500); s.add(MyTask(), 6000); std::this_thread::sleep_for(std::chrono::seconds(10)); }