dol: initial dol commit
[jump.git] / dol / src / dol / visitor / PipeAndFilter / lib / Scheduler.cpp
1 #include "Scheduler.h"
2
3 typedef struct {
4     Scheduler *scheduler;
5     ProcessWrapper *process;
6 } ProcessArgs;
7
8
9 void* runProcessWrapperWrapper(void* arg) {
10     ProcessArgs *args = (ProcessArgs*)arg;
11     (args->scheduler)->runProcessWrapper(args->process);
12     delete args;
13     return 0;
14 }
15
16
17 void *scheduleWrapper(void *arg) {
18     ((Scheduler*)arg)->schedule();
19     return 0;
20 }
21
22
23 Scheduler::Scheduler() {
24     //std::cout << "Create Scheduler." << std::endl;
25     _listsMutex = new Mutex();
26     _listsCondition = new Condition(_listsMutex);
27     _mapsMutex = new Mutex();
28     _mapsCondition = new Condition(_mapsMutex);
29     _processMap = new std::map<ProcessWrapper*, pthread_t* >();
30     _notificationEventMap = new std::map<ProcessWrapper*, Event* >();
31     _scheduleList = new std::list<ProcessWrapper* >();
32     _detachList = new std::list<ProcessWrapper* >();
33     _stopScheduler = false;
34     _allStarted = false;
35 }
36
37
38 Scheduler::~Scheduler() {
39     delete _listsCondition;
40     delete _listsMutex;
41     delete _mapsCondition;
42     delete _mapsMutex;
43     delete _processMap;
44     delete _notificationEventMap;
45     delete _scheduleList;
46     delete _detachList;
47 }
48
49
50 void Scheduler::registerProcess(ProcessWrapper *process) {
51     pthread_t *newThread = new pthread_t;
52     _processMap->insert(std::pair<ProcessWrapper*, pthread_t* >
53             (process, newThread));
54     Event *newNotificationEvent =
55             new Event("notificationEvent for " + std::string(process->getName()));
56     _notificationEventMap->insert(std::pair<ProcessWrapper*, Event* >
57             (process, newNotificationEvent));
58     process->initialize();
59 }
60
61
62 void Scheduler::run() {
63     _mapsMutex->lock();
64
65     pthread_t scheduleThread;
66     if (pthread_create(&scheduleThread, NULL, scheduleWrapper, this)) {
67         std::cout << "Error: Could not start scheduler thread."
68                 << std::endl;
69         std::cout << "Exit." << std::endl;
70         exit(1);
71     }
72     std::map<ProcessWrapper*, pthread_t* >::iterator iterator;
73     for (iterator = _processMap->begin();
74          iterator != _processMap->end();
75          iterator++) {
76
77         pthread_t *thread = (*iterator).second;
78
79         pthread_attr_t attributes;
80         pthread_attr_init(&attributes);
81         pthread_attr_setstacksize(&attributes, 131072);
82
83         ProcessArgs *args = new ProcessArgs;
84         args->scheduler = this;
85         args->process = (*iterator).first;
86         //std::cout << "Starting process "
87         //        << ((ProcessWrapper*)args->process)->getName()
88         //        << "." << std::endl;
89         if (pthread_create(thread, &attributes, runProcessWrapperWrapper, args)) {
90             std::cout << "Error: Could not start thread for process "
91                     << ((ProcessWrapper*)(args->process))->getName()
92                     << "." << std::endl;
93             std::cout << "Exit." << std::endl;
94             exit(1);
95         }
96         pthread_attr_destroy(&attributes);
97
98         //std::cout << "Started process "
99         //        << ((ProcessWrapper*)(args->process))->getName()
100         //        << "." << std::endl;
101     }
102     _mapsMutex->unlock();
103
104     //std::cout << "Started all registered processes." << std::endl;
105     _allStarted = true;
106     _listsCondition->notify();
107     //std::cout << "Wait until scheduler has stopped." << std::endl;
108
109     //wait until all processes have been detached
110     pthread_join(scheduleThread, 0);
111
112     //std::cout << "Scheduler has stopped." << std::endl;
113 }
114
115
116 void Scheduler::runProcessWrapper(ProcessWrapper *process) {
117     Event *_notificationEvent = (*_notificationEventMap)[process];
118
119     while (!process->isDetached()) {
120         _listsMutex->lock();
121         _scheduleList->push_back(process);
122         _listsCondition->notify();
123         _listsMutex->unlock();
124         _notificationEvent->wait();
125         process->fire();
126     }
127
128     _listsMutex->lock();
129     _detachList->push_back(process);
130     _listsMutex->unlock();
131     _listsCondition->notify();
132 }
133
134
135 void Scheduler::detachProcess(ProcessWrapper *process) {
136     _mapsMutex->lock();
137     pthread_t *threadToKill = (*_processMap)[process];
138     pthread_join(*threadToKill, 0);
139     delete threadToKill;
140     _processMap->erase(process);
141
142     Event *notificationEventToKill = (*_notificationEventMap)[process];
143     delete notificationEventToKill;
144     _notificationEventMap->erase(process);
145
146     if (_processMap->empty()) {
147         //std::cout << "No processes left in process map. Terminate."
148         //    << std::endl;
149         _stopScheduler = true;
150     }
151     _mapsMutex->unlock();
152 }
153
154
155 void Scheduler::schedule() {
156     _listsMutex->lock();
157
158     while(!_stopScheduler) {
159         _listsCondition->wait();
160
161         if (_allStarted) {
162             std::list<ProcessWrapper* >::iterator listIterator;
163             for (listIterator = _detachList->begin();
164                  listIterator != _detachList->end();
165                  listIterator++) {
166                 ProcessWrapper *process = (*listIterator);
167                 detachProcess(process);
168                 //std::cout << "Scheduler detached process "
169                 //        << process->getName() << "." << std::endl;
170             }
171             _detachList->clear();
172
173             for (listIterator = _scheduleList->begin();
174                  listIterator != _scheduleList->end();
175                  listIterator++) {
176                 ProcessWrapper *process = (*listIterator);
177                 ((Event*)(*_notificationEventMap)[process])->notifyAfterWait();
178             }
179             _scheduleList->clear();
180         }
181     }
182
183     _listsMutex->unlock();
184     //std::cout << "Stopped scheduler." << std::endl;
185 }