dol: initial dol commit
[jump.git] / dol / src / dol / visitor / PipeAndFilter / lib / Scheduler.cpp
diff --git a/dol/src/dol/visitor/PipeAndFilter/lib/Scheduler.cpp b/dol/src/dol/visitor/PipeAndFilter/lib/Scheduler.cpp
new file mode 100644 (file)
index 0000000..260729a
--- /dev/null
@@ -0,0 +1,185 @@
+#include "Scheduler.h"
+
+typedef struct {
+    Scheduler *scheduler;
+    ProcessWrapper *process;
+} ProcessArgs;
+
+
+void* runProcessWrapperWrapper(void* arg) {
+    ProcessArgs *args = (ProcessArgs*)arg;
+    (args->scheduler)->runProcessWrapper(args->process);
+    delete args;
+    return 0;
+}
+
+
+void *scheduleWrapper(void *arg) {
+    ((Scheduler*)arg)->schedule();
+    return 0;
+}
+
+
+Scheduler::Scheduler() {
+    //std::cout << "Create Scheduler." << std::endl;
+    _listsMutex = new Mutex();
+    _listsCondition = new Condition(_listsMutex);
+    _mapsMutex = new Mutex();
+    _mapsCondition = new Condition(_mapsMutex);
+    _processMap = new std::map<ProcessWrapper*, pthread_t* >();
+    _notificationEventMap = new std::map<ProcessWrapper*, Event* >();
+    _scheduleList = new std::list<ProcessWrapper* >();
+    _detachList = new std::list<ProcessWrapper* >();
+    _stopScheduler = false;
+    _allStarted = false;
+}
+
+
+Scheduler::~Scheduler() {
+    delete _listsCondition;
+    delete _listsMutex;
+    delete _mapsCondition;
+    delete _mapsMutex;
+    delete _processMap;
+    delete _notificationEventMap;
+    delete _scheduleList;
+    delete _detachList;
+}
+
+
+void Scheduler::registerProcess(ProcessWrapper *process) {
+    pthread_t *newThread = new pthread_t;
+    _processMap->insert(std::pair<ProcessWrapper*, pthread_t* >
+            (process, newThread));
+    Event *newNotificationEvent =
+            new Event("notificationEvent for " + std::string(process->getName()));
+    _notificationEventMap->insert(std::pair<ProcessWrapper*, Event* >
+            (process, newNotificationEvent));
+    process->initialize();
+}
+
+
+void Scheduler::run() {
+    _mapsMutex->lock();
+
+    pthread_t scheduleThread;
+    if (pthread_create(&scheduleThread, NULL, scheduleWrapper, this)) {
+        std::cout << "Error: Could not start scheduler thread."
+                << std::endl;
+        std::cout << "Exit." << std::endl;
+        exit(1);
+    }
+    std::map<ProcessWrapper*, pthread_t* >::iterator iterator;
+    for (iterator = _processMap->begin();
+         iterator != _processMap->end();
+         iterator++) {
+
+        pthread_t *thread = (*iterator).second;
+
+        pthread_attr_t attributes;
+        pthread_attr_init(&attributes);
+        pthread_attr_setstacksize(&attributes, 131072);
+
+        ProcessArgs *args = new ProcessArgs;
+        args->scheduler = this;
+        args->process = (*iterator).first;
+        //std::cout << "Starting process "
+        //        << ((ProcessWrapper*)args->process)->getName()
+        //        << "." << std::endl;
+        if (pthread_create(thread, &attributes, runProcessWrapperWrapper, args)) {
+            std::cout << "Error: Could not start thread for process "
+                    << ((ProcessWrapper*)(args->process))->getName()
+                    << "." << std::endl;
+            std::cout << "Exit." << std::endl;
+            exit(1);
+        }
+        pthread_attr_destroy(&attributes);
+
+        //std::cout << "Started process "
+        //        << ((ProcessWrapper*)(args->process))->getName()
+        //        << "." << std::endl;
+    }
+    _mapsMutex->unlock();
+
+    //std::cout << "Started all registered processes." << std::endl;
+    _allStarted = true;
+    _listsCondition->notify();
+    //std::cout << "Wait until scheduler has stopped." << std::endl;
+
+    //wait until all processes have been detached
+    pthread_join(scheduleThread, 0);
+
+    //std::cout << "Scheduler has stopped." << std::endl;
+}
+
+
+void Scheduler::runProcessWrapper(ProcessWrapper *process) {
+    Event *_notificationEvent = (*_notificationEventMap)[process];
+
+    while (!process->isDetached()) {
+        _listsMutex->lock();
+        _scheduleList->push_back(process);
+        _listsCondition->notify();
+        _listsMutex->unlock();
+        _notificationEvent->wait();
+        process->fire();
+    }
+
+    _listsMutex->lock();
+    _detachList->push_back(process);
+    _listsMutex->unlock();
+    _listsCondition->notify();
+}
+
+
+void Scheduler::detachProcess(ProcessWrapper *process) {
+    _mapsMutex->lock();
+    pthread_t *threadToKill = (*_processMap)[process];
+    pthread_join(*threadToKill, 0);
+    delete threadToKill;
+    _processMap->erase(process);
+
+    Event *notificationEventToKill = (*_notificationEventMap)[process];
+    delete notificationEventToKill;
+    _notificationEventMap->erase(process);
+
+    if (_processMap->empty()) {
+        //std::cout << "No processes left in process map. Terminate."
+        //    << std::endl;
+        _stopScheduler = true;
+    }
+    _mapsMutex->unlock();
+}
+
+
+void Scheduler::schedule() {
+    _listsMutex->lock();
+
+    while(!_stopScheduler) {
+        _listsCondition->wait();
+
+        if (_allStarted) {
+            std::list<ProcessWrapper* >::iterator listIterator;
+            for (listIterator = _detachList->begin();
+                 listIterator != _detachList->end();
+                 listIterator++) {
+                ProcessWrapper *process = (*listIterator);
+                detachProcess(process);
+                //std::cout << "Scheduler detached process "
+                //        << process->getName() << "." << std::endl;
+            }
+            _detachList->clear();
+
+            for (listIterator = _scheduleList->begin();
+                 listIterator != _scheduleList->end();
+                 listIterator++) {
+                ProcessWrapper *process = (*listIterator);
+                ((Event*)(*_notificationEventMap)[process])->notifyAfterWait();
+            }
+            _scheduleList->clear();
+        }
+    }
+
+    _listsMutex->unlock();
+    //std::cout << "Stopped scheduler." << std::endl;
+}