X-Git-Url: http://sraa.de/git/?a=blobdiff_plain;f=dol%2Fsrc%2Fdol%2Fvisitor%2FPipeAndFilter%2Flib%2FScheduler.cpp;fp=dol%2Fsrc%2Fdol%2Fvisitor%2FPipeAndFilter%2Flib%2FScheduler.cpp;h=260729a2f1fe8bdb06a25d1fc5af1aac226a161d;hb=8c411cf24ed0eb889191aaeafd8fa1e69081df42;hp=0000000000000000000000000000000000000000;hpb=dea7a4fb1ed110d3ce6e6d9255103d724bd66c0e;p=jump.git diff --git a/dol/src/dol/visitor/PipeAndFilter/lib/Scheduler.cpp b/dol/src/dol/visitor/PipeAndFilter/lib/Scheduler.cpp new file mode 100644 index 0000000..260729a --- /dev/null +++ b/dol/src/dol/visitor/PipeAndFilter/lib/Scheduler.cpp @@ -0,0 +1,185 @@ +#include "Scheduler.h" + +typedef struct { + Scheduler *scheduler; + ProcessWrapper *process; +} ProcessArgs; + + +void* runProcessWrapperWrapper(void* arg) { + ProcessArgs *args = (ProcessArgs*)arg; + (args->scheduler)->runProcessWrapper(args->process); + delete args; + return 0; +} + + +void *scheduleWrapper(void *arg) { + ((Scheduler*)arg)->schedule(); + return 0; +} + + +Scheduler::Scheduler() { + //std::cout << "Create Scheduler." << std::endl; + _listsMutex = new Mutex(); + _listsCondition = new Condition(_listsMutex); + _mapsMutex = new Mutex(); + _mapsCondition = new Condition(_mapsMutex); + _processMap = new std::map(); + _notificationEventMap = new std::map(); + _scheduleList = new std::list(); + _detachList = new std::list(); + _stopScheduler = false; + _allStarted = false; +} + + +Scheduler::~Scheduler() { + delete _listsCondition; + delete _listsMutex; + delete _mapsCondition; + delete _mapsMutex; + delete _processMap; + delete _notificationEventMap; + delete _scheduleList; + delete _detachList; +} + + +void Scheduler::registerProcess(ProcessWrapper *process) { + pthread_t *newThread = new pthread_t; + _processMap->insert(std::pair + (process, newThread)); + Event *newNotificationEvent = + new Event("notificationEvent for " + std::string(process->getName())); + _notificationEventMap->insert(std::pair + (process, newNotificationEvent)); + process->initialize(); +} + + +void Scheduler::run() { + _mapsMutex->lock(); + + pthread_t scheduleThread; + if (pthread_create(&scheduleThread, NULL, scheduleWrapper, this)) { + std::cout << "Error: Could not start scheduler thread." + << std::endl; + std::cout << "Exit." << std::endl; + exit(1); + } + std::map::iterator iterator; + for (iterator = _processMap->begin(); + iterator != _processMap->end(); + iterator++) { + + pthread_t *thread = (*iterator).second; + + pthread_attr_t attributes; + pthread_attr_init(&attributes); + pthread_attr_setstacksize(&attributes, 131072); + + ProcessArgs *args = new ProcessArgs; + args->scheduler = this; + args->process = (*iterator).first; + //std::cout << "Starting process " + // << ((ProcessWrapper*)args->process)->getName() + // << "." << std::endl; + if (pthread_create(thread, &attributes, runProcessWrapperWrapper, args)) { + std::cout << "Error: Could not start thread for process " + << ((ProcessWrapper*)(args->process))->getName() + << "." << std::endl; + std::cout << "Exit." << std::endl; + exit(1); + } + pthread_attr_destroy(&attributes); + + //std::cout << "Started process " + // << ((ProcessWrapper*)(args->process))->getName() + // << "." << std::endl; + } + _mapsMutex->unlock(); + + //std::cout << "Started all registered processes." << std::endl; + _allStarted = true; + _listsCondition->notify(); + //std::cout << "Wait until scheduler has stopped." << std::endl; + + //wait until all processes have been detached + pthread_join(scheduleThread, 0); + + //std::cout << "Scheduler has stopped." << std::endl; +} + + +void Scheduler::runProcessWrapper(ProcessWrapper *process) { + Event *_notificationEvent = (*_notificationEventMap)[process]; + + while (!process->isDetached()) { + _listsMutex->lock(); + _scheduleList->push_back(process); + _listsCondition->notify(); + _listsMutex->unlock(); + _notificationEvent->wait(); + process->fire(); + } + + _listsMutex->lock(); + _detachList->push_back(process); + _listsMutex->unlock(); + _listsCondition->notify(); +} + + +void Scheduler::detachProcess(ProcessWrapper *process) { + _mapsMutex->lock(); + pthread_t *threadToKill = (*_processMap)[process]; + pthread_join(*threadToKill, 0); + delete threadToKill; + _processMap->erase(process); + + Event *notificationEventToKill = (*_notificationEventMap)[process]; + delete notificationEventToKill; + _notificationEventMap->erase(process); + + if (_processMap->empty()) { + //std::cout << "No processes left in process map. Terminate." + // << std::endl; + _stopScheduler = true; + } + _mapsMutex->unlock(); +} + + +void Scheduler::schedule() { + _listsMutex->lock(); + + while(!_stopScheduler) { + _listsCondition->wait(); + + if (_allStarted) { + std::list::iterator listIterator; + for (listIterator = _detachList->begin(); + listIterator != _detachList->end(); + listIterator++) { + ProcessWrapper *process = (*listIterator); + detachProcess(process); + //std::cout << "Scheduler detached process " + // << process->getName() << "." << std::endl; + } + _detachList->clear(); + + for (listIterator = _scheduleList->begin(); + listIterator != _scheduleList->end(); + listIterator++) { + ProcessWrapper *process = (*listIterator); + ((Event*)(*_notificationEventMap)[process])->notifyAfterWait(); + } + _scheduleList->clear(); + } + } + + _listsMutex->unlock(); + //std::cout << "Stopped scheduler." << std::endl; +}