+#include "Scheduler.h"
+
+typedef struct {
+ Scheduler *scheduler;
+ ProcessWrapper *process;
+} ProcessArgs;
+
+
+void* runProcessWrapperWrapper(void* arg) {
+ ProcessArgs *args = (ProcessArgs*)arg;
+ (args->scheduler)->runProcessWrapper(args->process);
+ delete args;
+ return 0;
+}
+
+
+void *scheduleWrapper(void *arg) {
+ ((Scheduler*)arg)->schedule();
+ return 0;
+}
+
+
+Scheduler::Scheduler() {
+ //std::cout << "Create Scheduler." << std::endl;
+ _listsMutex = new Mutex();
+ _listsCondition = new Condition(_listsMutex);
+ _mapsMutex = new Mutex();
+ _mapsCondition = new Condition(_mapsMutex);
+ _processMap = new std::map<ProcessWrapper*, pthread_t* >();
+ _notificationEventMap = new std::map<ProcessWrapper*, Event* >();
+ _scheduleList = new std::list<ProcessWrapper* >();
+ _detachList = new std::list<ProcessWrapper* >();
+ _stopScheduler = false;
+ _allStarted = false;
+}
+
+
+Scheduler::~Scheduler() {
+ delete _listsCondition;
+ delete _listsMutex;
+ delete _mapsCondition;
+ delete _mapsMutex;
+ delete _processMap;
+ delete _notificationEventMap;
+ delete _scheduleList;
+ delete _detachList;
+}
+
+
+void Scheduler::registerProcess(ProcessWrapper *process) {
+ pthread_t *newThread = new pthread_t;
+ _processMap->insert(std::pair<ProcessWrapper*, pthread_t* >
+ (process, newThread));
+ Event *newNotificationEvent =
+ new Event("notificationEvent for " + std::string(process->getName()));
+ _notificationEventMap->insert(std::pair<ProcessWrapper*, Event* >
+ (process, newNotificationEvent));
+ process->initialize();
+}
+
+
+void Scheduler::run() {
+ _mapsMutex->lock();
+
+ pthread_t scheduleThread;
+ if (pthread_create(&scheduleThread, NULL, scheduleWrapper, this)) {
+ std::cout << "Error: Could not start scheduler thread."
+ << std::endl;
+ std::cout << "Exit." << std::endl;
+ exit(1);
+ }
+ std::map<ProcessWrapper*, pthread_t* >::iterator iterator;
+ for (iterator = _processMap->begin();
+ iterator != _processMap->end();
+ iterator++) {
+
+ pthread_t *thread = (*iterator).second;
+
+ pthread_attr_t attributes;
+ pthread_attr_init(&attributes);
+ pthread_attr_setstacksize(&attributes, 131072);
+
+ ProcessArgs *args = new ProcessArgs;
+ args->scheduler = this;
+ args->process = (*iterator).first;
+ //std::cout << "Starting process "
+ // << ((ProcessWrapper*)args->process)->getName()
+ // << "." << std::endl;
+ if (pthread_create(thread, &attributes, runProcessWrapperWrapper, args)) {
+ std::cout << "Error: Could not start thread for process "
+ << ((ProcessWrapper*)(args->process))->getName()
+ << "." << std::endl;
+ std::cout << "Exit." << std::endl;
+ exit(1);
+ }
+ pthread_attr_destroy(&attributes);
+
+ //std::cout << "Started process "
+ // << ((ProcessWrapper*)(args->process))->getName()
+ // << "." << std::endl;
+ }
+ _mapsMutex->unlock();
+
+ //std::cout << "Started all registered processes." << std::endl;
+ _allStarted = true;
+ _listsCondition->notify();
+ //std::cout << "Wait until scheduler has stopped." << std::endl;
+
+ //wait until all processes have been detached
+ pthread_join(scheduleThread, 0);
+
+ //std::cout << "Scheduler has stopped." << std::endl;
+}
+
+
+void Scheduler::runProcessWrapper(ProcessWrapper *process) {
+ Event *_notificationEvent = (*_notificationEventMap)[process];
+
+ while (!process->isDetached()) {
+ _listsMutex->lock();
+ _scheduleList->push_back(process);
+ _listsCondition->notify();
+ _listsMutex->unlock();
+ _notificationEvent->wait();
+ process->fire();
+ }
+
+ _listsMutex->lock();
+ _detachList->push_back(process);
+ _listsMutex->unlock();
+ _listsCondition->notify();
+}
+
+
+void Scheduler::detachProcess(ProcessWrapper *process) {
+ _mapsMutex->lock();
+ pthread_t *threadToKill = (*_processMap)[process];
+ pthread_join(*threadToKill, 0);
+ delete threadToKill;
+ _processMap->erase(process);
+
+ Event *notificationEventToKill = (*_notificationEventMap)[process];
+ delete notificationEventToKill;
+ _notificationEventMap->erase(process);
+
+ if (_processMap->empty()) {
+ //std::cout << "No processes left in process map. Terminate."
+ // << std::endl;
+ _stopScheduler = true;
+ }
+ _mapsMutex->unlock();
+}
+
+
+void Scheduler::schedule() {
+ _listsMutex->lock();
+
+ while(!_stopScheduler) {
+ _listsCondition->wait();
+
+ if (_allStarted) {
+ std::list<ProcessWrapper* >::iterator listIterator;
+ for (listIterator = _detachList->begin();
+ listIterator != _detachList->end();
+ listIterator++) {
+ ProcessWrapper *process = (*listIterator);
+ detachProcess(process);
+ //std::cout << "Scheduler detached process "
+ // << process->getName() << "." << std::endl;
+ }
+ _detachList->clear();
+
+ for (listIterator = _scheduleList->begin();
+ listIterator != _scheduleList->end();
+ listIterator++) {
+ ProcessWrapper *process = (*listIterator);
+ ((Event*)(*_notificationEventMap)[process])->notifyAfterWait();
+ }
+ _scheduleList->clear();
+ }
+ }
+
+ _listsMutex->unlock();
+ //std::cout << "Stopped scheduler." << std::endl;
+}