dol: initial dol commit
[jump.git] / dol / src / dol / visitor / PipeAndFilter / lib / WindowedFifo.cpp
1 #include "WindowedFifo.h"\r
2 #include <string.h>\r
3 \r
4 /**\r
5  *\r
6  */\r
7 WindowedFifo::WindowedFifo(char* name, unsigned size = 20) {\r
8     //std::cout << "Create WindowedFifo." << std::endl;\r
9     _size = size;\r
10     _buffer = new char[_size];\r
11     _head = 0;\r
12     _tail = 0;\r
13     _headRoom = 0;\r
14     _tailRoom = 0;\r
15     _use = 0;\r
16     //indicates whether Fifo is empty or full if _head == _tail\r
17     //_isFull = false;\r
18     _isHeadReserved = false;\r
19     _isTailReserved = false;\r
20     _name = new char[strlen(name) + 1];\r
21     strcpy(_name, name);\r
22     _mutex = new Mutex();\r
23     _readCondition = new Condition(_mutex);\r
24     _writeCondition = new Condition(_mutex);\r
25 }\r
26 \r
27 /**\r
28  *\r
29  */\r
30 WindowedFifo::~WindowedFifo() {\r
31     //std::cout << "Delete WindowedFifo." << std::endl;\r
32     if (_buffer) {\r
33         delete _buffer;\r
34     }\r
35     if (_name) {\r
36         delete _name;\r
37     }\r
38     if (_readCondition) {\r
39         delete _readCondition;\r
40     }\r
41     if (_writeCondition) {\r
42         delete _writeCondition;\r
43     }\r
44     if (_mutex) {\r
45         delete _mutex;\r
46     }\r
47     _buffer = 0;\r
48     _head = 0;\r
49     _tail = 0;\r
50     _name = 0;\r
51     _use = 0;\r
52     _readCondition = 0;\r
53     _writeCondition = 0;\r
54     _mutex = 0;\r
55     //std::cout << "Deleted WindowedFifo." << std::endl;\r
56 }\r
57 \r
58 /**\r
59  *\r
60  */\r
61 unsigned WindowedFifo::reserve(void** dest, unsigned len) {\r
62     char** destination = (char**)dest;\r
63     //std::cout << "Attempt to reserve " << len << " bytes." << std::endl;\r
64 \r
65     //can only reserve once piece at a time\r
66     if (_isHeadReserved) {\r
67         *destination = 0;\r
68         return 0;\r
69     }\r
70 \r
71     _mutex->lock();\r
72     while (unused() == 0) {\r
73       _readCondition->wait();\r
74     }\r
75 \r
76     //reserve at most as much memory as still available in the buffer\r
77     unsigned write = (len <= _size - _use ? len : _size - _use);\r
78 \r
79     if ( write > 0 ) {\r
80         //if wrap-around in buffer: return only buffer for the\r
81         //contiguous buffer space\r
82         if (_head + write > _size) {\r
83             write = _size - _head;\r
84         }\r
85 \r
86         _headRoom = (_head + write) == _size? 0 : _head + write;\r
87         *destination = &(_buffer[_head]);\r
88         _isHeadReserved = true;\r
89 \r
90         //the following comparison is unsafe in a multi-threaded\r
91         //environment and potentially leads to race-conditions\r
92         /*if (_headRoom == _tail) {\r
93             _isFull = true;\r
94         } else {\r
95             _isFull = false;\r
96         }*/\r
97     }\r
98     _writeReserve = write; \r
99     _mutex->unlock();\r
100 \r
101     //std::cout << "Reserved " << write << " bytes." << std::endl;\r
102     return write;\r
103 }\r
104 \r
105 /**\r
106  *\r
107  */\r
108 void WindowedFifo::release() {\r
109     if (_isHeadReserved) {\r
110         //std::cout << "Released " << _headRoom - _head << " bytes." << std::endl;\r
111         _head = _headRoom;\r
112         _use += _writeReserve;\r
113         _isHeadReserved = false;\r
114         _writeCondition->notify();\r
115     }\r
116 }\r
117 \r
118 /**\r
119  *\r
120  */\r
121 unsigned WindowedFifo::capture(void **dest, unsigned len) {\r
122     char** destination = (char**)dest;\r
123     //std::cout << "Attempt to capture " << len << " bytes." << std::endl;\r
124 \r
125     if (_isTailReserved) {\r
126         //std::cout << "Only one attempt to capture allowed." << std::endl;\r
127         *destination = 0;\r
128         return 0;\r
129     }\r
130 \r
131     _mutex->lock();\r
132     while (used() == 0) {\r
133       _writeCondition->wait();\r
134     }\r
135 \r
136     //capture at most as much data as available in the buffer\r
137     unsigned read = (len <= _use ? len : _use);\r
138 \r
139     if (read > 0) {\r
140         //if wrap-around in buffer: return only buffer for the\r
141         //contiguous buffer space\r
142         if (_tail + read> _size) {\r
143             read = _size - _tail;\r
144         }\r
145 \r
146         _tailRoom = (_tail + read) == _size ? 0 : _tailRoom = _tail + read;\r
147         *destination = &(_buffer[_tail]);\r
148         _isTailReserved = true;\r
149     }\r
150     _readReserve = read; \r
151     _mutex->unlock();\r
152     //std::cout << "Captured " << read << " bytes." << std::endl;\r
153 \r
154     return read;\r
155 }\r
156 \r
157 /**\r
158  *\r
159  */\r
160 void WindowedFifo::consume() {\r
161     _mutex->lock();\r
162     if (_isTailReserved) {\r
163         //std::cout << "Consumed " << _tailRoom - _tail << " bytes." << std::endl;\r
164         _tail = _tailRoom;\r
165         //_isFull = false;\r
166         _use -= _readReserve;\r
167         _isTailReserved = false;\r
168         _readCondition->notify();\r
169     }\r
170     _mutex->unlock();\r
171 }\r
172 \r
173 /**\r
174  *\r
175  */\r
176 unsigned WindowedFifo::size() const {\r
177     return _size;\r
178 }\r
179 \r
180 /**\r
181  *\r
182  */\r
183 unsigned WindowedFifo::unused() const {\r
184     return _size - _use;\r
185 }\r
186 \r
187 /**\r
188  *\r
189  */\r
190 unsigned WindowedFifo::used() const {\r
191         return _use;\r
192     /*if (_headRoom > _tail) {\r
193         return _headRoom - _tail;\r
194     } else if (_headRoom == _tail) {\r
195         if (_isFull == true) {\r
196             return _size;\r
197         } else {\r
198             return 0;\r
199         }\r
200     }\r
201     return _headRoom + _size - _tail;*/\r
202 }\r
203 \r
204 /**\r
205  *\r
206  */\r
207 char* WindowedFifo::getName() const {\r
208     return _name;\r
209 }\r
210 \r
211 /**\r
212  * Test the implementation\r
213  */\r
214 /*\r
215 #include <iostream>\r
216 #include <iomanip>\r
217 #define LENGTH 10\r
218 \r
219 void* producer(void *fifo)\r
220 {\r
221     WindowedFifo* wfifo = (WindowedFifo*)fifo;\r
222     for (int j = 0; j < LENGTH; j++) {\r
223         //std::cout << "write " << i << " to Fifo.    ";\r
224         int *buf1;\r
225         int write = wfifo->reserve((void**)&buf1, sizeof(int));\r
226 \r
227         if (write == sizeof(int)) {\r
228             *buf1 = j;\r
229             wfifo->release();\r
230             //std::cout << "used: " << std::setw(2) << wfifo->used()\r
231             //        << ", unused: " << std::setw(2) << wfifo->unused()\r
232             //        << ", size: "  << std::setw(2) << wfifo->size()\r
233             //        << std::endl;\r
234         } else {\r
235             std::cout << "Not successful: " << write << std::endl;\r
236         }\r
237     }\r
238     printf("producer returns.\n");\r
239     return 0;\r
240 }\r
241 \r
242 void* consumer(void *fifo)\r
243 {\r
244     WindowedFifo* wfifo = (WindowedFifo*)fifo;\r
245     for (int j = 0; j < LENGTH; j++) {\r
246         int* buf3;\r
247         int read = wfifo->capture((void**)&buf3, sizeof(int));\r
248         if (read == sizeof(int)) {\r
249             std::cout << "read " << (unsigned)*buf3 << "  from WindowedFifo   ";\r
250             std::cout << "used: " << std::setw(2) << wfifo->used()\r
251                     << ", unused: " << std::setw(2) << wfifo->unused()\r
252                     << ", size: "  << std::setw(2) << wfifo->size()\r
253                     << std::endl;\r
254             wfifo->consume();\r
255         } else {\r
256             std::cout << "Read nothing from WindowedFifo." << std::endl;\r
257         }\r
258     }\r
259     printf("consumer returns.\n");\r
260     return 0;\r
261 }\r
262 \r
263 int main() {\r
264     WindowedFifo *wfifo = new WindowedFifo("fifo", 12);\r
265 \r
266     int* buf1;\r
267     int* buf2;\r
268     wfifo->reserve((void**)&buf1, 8);\r
269     *buf1 = 10;\r
270     *(buf1 + 1) = 20;\r
271     wfifo->release();\r
272     wfifo->capture((void**)&buf2, 8);\r
273     std::cout << "read " << *buf2 << " " << *(buf2 + 1) << std::endl;\r
274     wfifo->consume();\r
275 \r
276     pthread_t *producer_thread = new pthread_t;\r
277     pthread_t *consumer_thread = new pthread_t;\r
278 \r
279     pthread_attr_t attributes;\r
280     pthread_attr_init(&attributes);\r
281     pthread_attr_setstacksize(&attributes, 131072);\r
282 \r
283     if (pthread_create(consumer_thread, &attributes, consumer, wfifo)) {\r
284         std::cout << "Error: Could not start consumer." << std::endl;\r
285         std::cout << "Exit." << std::endl;\r
286         exit(1);\r
287     }\r
288     pthread_attr_destroy(&attributes);\r
289 \r
290     pthread_attr_init(&attributes);\r
291     pthread_attr_setstacksize(&attributes, 131072);\r
292     if (pthread_create(producer_thread, &attributes, producer, wfifo)) {\r
293         std::cout << "Error: Could not start producer." << std::endl;\r
294         std::cout << "Exit." << std::endl;\r
295         exit(1);\r
296     }\r
297     pthread_attr_destroy(&attributes);\r
298 \r
299 \r
300     pthread_join(*consumer_thread, 0);\r
301     delete wfifo;\r
302     return 0;\r
303 }\r
304 */\r