dol: initial dol commit
[jump.git] / dol / src / dol / visitor / PipeAndFilter / lib / WindowedFifo.cpp
diff --git a/dol/src/dol/visitor/PipeAndFilter/lib/WindowedFifo.cpp b/dol/src/dol/visitor/PipeAndFilter/lib/WindowedFifo.cpp
new file mode 100644 (file)
index 0000000..ddeb970
--- /dev/null
@@ -0,0 +1,304 @@
+#include "WindowedFifo.h"\r
+#include <string.h>\r
+\r
+/**\r
+ *\r
+ */\r
+WindowedFifo::WindowedFifo(char* name, unsigned size = 20) {\r
+    //std::cout << "Create WindowedFifo." << std::endl;\r
+    _size = size;\r
+    _buffer = new char[_size];\r
+    _head = 0;\r
+    _tail = 0;\r
+    _headRoom = 0;\r
+    _tailRoom = 0;\r
+    _use = 0;\r
+    //indicates whether Fifo is empty or full if _head == _tail\r
+    //_isFull = false;\r
+    _isHeadReserved = false;\r
+    _isTailReserved = false;\r
+    _name = new char[strlen(name) + 1];\r
+    strcpy(_name, name);\r
+    _mutex = new Mutex();\r
+    _readCondition = new Condition(_mutex);\r
+    _writeCondition = new Condition(_mutex);\r
+}\r
+\r
+/**\r
+ *\r
+ */\r
+WindowedFifo::~WindowedFifo() {\r
+    //std::cout << "Delete WindowedFifo." << std::endl;\r
+    if (_buffer) {\r
+        delete _buffer;\r
+    }\r
+    if (_name) {\r
+        delete _name;\r
+    }\r
+    if (_readCondition) {\r
+        delete _readCondition;\r
+    }\r
+    if (_writeCondition) {\r
+        delete _writeCondition;\r
+    }\r
+    if (_mutex) {\r
+        delete _mutex;\r
+    }\r
+    _buffer = 0;\r
+    _head = 0;\r
+    _tail = 0;\r
+    _name = 0;\r
+    _use = 0;\r
+    _readCondition = 0;\r
+    _writeCondition = 0;\r
+    _mutex = 0;\r
+    //std::cout << "Deleted WindowedFifo." << std::endl;\r
+}\r
+\r
+/**\r
+ *\r
+ */\r
+unsigned WindowedFifo::reserve(void** dest, unsigned len) {\r
+    char** destination = (char**)dest;\r
+    //std::cout << "Attempt to reserve " << len << " bytes." << std::endl;\r
+\r
+    //can only reserve once piece at a time\r
+    if (_isHeadReserved) {\r
+        *destination = 0;\r
+        return 0;\r
+    }\r
+\r
+    _mutex->lock();\r
+    while (unused() == 0) {\r
+      _readCondition->wait();\r
+    }\r
+\r
+    //reserve at most as much memory as still available in the buffer\r
+    unsigned write = (len <= _size - _use ? len : _size - _use);\r
+\r
+    if ( write > 0 ) {\r
+        //if wrap-around in buffer: return only buffer for the\r
+        //contiguous buffer space\r
+        if (_head + write > _size) {\r
+            write = _size - _head;\r
+        }\r
+\r
+        _headRoom = (_head + write) == _size? 0 : _head + write;\r
+        *destination = &(_buffer[_head]);\r
+        _isHeadReserved = true;\r
+\r
+        //the following comparison is unsafe in a multi-threaded\r
+        //environment and potentially leads to race-conditions\r
+        /*if (_headRoom == _tail) {\r
+            _isFull = true;\r
+        } else {\r
+            _isFull = false;\r
+        }*/\r
+    }\r
+    _writeReserve = write; \r
+    _mutex->unlock();\r
+\r
+    //std::cout << "Reserved " << write << " bytes." << std::endl;\r
+    return write;\r
+}\r
+\r
+/**\r
+ *\r
+ */\r
+void WindowedFifo::release() {\r
+    if (_isHeadReserved) {\r
+        //std::cout << "Released " << _headRoom - _head << " bytes." << std::endl;\r
+        _head = _headRoom;\r
+        _use += _writeReserve;\r
+        _isHeadReserved = false;\r
+        _writeCondition->notify();\r
+    }\r
+}\r
+\r
+/**\r
+ *\r
+ */\r
+unsigned WindowedFifo::capture(void **dest, unsigned len) {\r
+    char** destination = (char**)dest;\r
+    //std::cout << "Attempt to capture " << len << " bytes." << std::endl;\r
+\r
+    if (_isTailReserved) {\r
+        //std::cout << "Only one attempt to capture allowed." << std::endl;\r
+        *destination = 0;\r
+        return 0;\r
+    }\r
+\r
+    _mutex->lock();\r
+    while (used() == 0) {\r
+      _writeCondition->wait();\r
+    }\r
+\r
+    //capture at most as much data as available in the buffer\r
+    unsigned read = (len <= _use ? len : _use);\r
+\r
+    if (read > 0) {\r
+        //if wrap-around in buffer: return only buffer for the\r
+        //contiguous buffer space\r
+        if (_tail + read> _size) {\r
+            read = _size - _tail;\r
+        }\r
+\r
+        _tailRoom = (_tail + read) == _size ? 0 : _tailRoom = _tail + read;\r
+        *destination = &(_buffer[_tail]);\r
+        _isTailReserved = true;\r
+    }\r
+    _readReserve = read; \r
+    _mutex->unlock();\r
+    //std::cout << "Captured " << read << " bytes." << std::endl;\r
+\r
+    return read;\r
+}\r
+\r
+/**\r
+ *\r
+ */\r
+void WindowedFifo::consume() {\r
+    _mutex->lock();\r
+    if (_isTailReserved) {\r
+        //std::cout << "Consumed " << _tailRoom - _tail << " bytes." << std::endl;\r
+        _tail = _tailRoom;\r
+        //_isFull = false;\r
+        _use -= _readReserve;\r
+        _isTailReserved = false;\r
+        _readCondition->notify();\r
+    }\r
+    _mutex->unlock();\r
+}\r
+\r
+/**\r
+ *\r
+ */\r
+unsigned WindowedFifo::size() const {\r
+    return _size;\r
+}\r
+\r
+/**\r
+ *\r
+ */\r
+unsigned WindowedFifo::unused() const {\r
+    return _size - _use;\r
+}\r
+\r
+/**\r
+ *\r
+ */\r
+unsigned WindowedFifo::used() const {\r
+       return _use;\r
+    /*if (_headRoom > _tail) {\r
+        return _headRoom - _tail;\r
+    } else if (_headRoom == _tail) {\r
+        if (_isFull == true) {\r
+            return _size;\r
+        } else {\r
+            return 0;\r
+        }\r
+    }\r
+    return _headRoom + _size - _tail;*/\r
+}\r
+\r
+/**\r
+ *\r
+ */\r
+char* WindowedFifo::getName() const {\r
+    return _name;\r
+}\r
+\r
+/**\r
+ * Test the implementation\r
+ */\r
+/*\r
+#include <iostream>\r
+#include <iomanip>\r
+#define LENGTH 10\r
+\r
+void* producer(void *fifo)\r
+{\r
+    WindowedFifo* wfifo = (WindowedFifo*)fifo;\r
+    for (int j = 0; j < LENGTH; j++) {\r
+        //std::cout << "write " << i << " to Fifo.    ";\r
+        int *buf1;\r
+        int write = wfifo->reserve((void**)&buf1, sizeof(int));\r
+\r
+        if (write == sizeof(int)) {\r
+            *buf1 = j;\r
+            wfifo->release();\r
+            //std::cout << "used: " << std::setw(2) << wfifo->used()\r
+            //        << ", unused: " << std::setw(2) << wfifo->unused()\r
+            //        << ", size: "  << std::setw(2) << wfifo->size()\r
+            //        << std::endl;\r
+        } else {\r
+            std::cout << "Not successful: " << write << std::endl;\r
+        }\r
+    }\r
+    printf("producer returns.\n");\r
+    return 0;\r
+}\r
+\r
+void* consumer(void *fifo)\r
+{\r
+    WindowedFifo* wfifo = (WindowedFifo*)fifo;\r
+    for (int j = 0; j < LENGTH; j++) {\r
+        int* buf3;\r
+        int read = wfifo->capture((void**)&buf3, sizeof(int));\r
+        if (read == sizeof(int)) {\r
+            std::cout << "read " << (unsigned)*buf3 << "  from WindowedFifo   ";\r
+            std::cout << "used: " << std::setw(2) << wfifo->used()\r
+                    << ", unused: " << std::setw(2) << wfifo->unused()\r
+                    << ", size: "  << std::setw(2) << wfifo->size()\r
+                    << std::endl;\r
+            wfifo->consume();\r
+        } else {\r
+            std::cout << "Read nothing from WindowedFifo." << std::endl;\r
+        }\r
+    }\r
+    printf("consumer returns.\n");\r
+    return 0;\r
+}\r
+\r
+int main() {\r
+    WindowedFifo *wfifo = new WindowedFifo("fifo", 12);\r
+\r
+    int* buf1;\r
+    int* buf2;\r
+    wfifo->reserve((void**)&buf1, 8);\r
+    *buf1 = 10;\r
+    *(buf1 + 1) = 20;\r
+    wfifo->release();\r
+    wfifo->capture((void**)&buf2, 8);\r
+    std::cout << "read " << *buf2 << " " << *(buf2 + 1) << std::endl;\r
+    wfifo->consume();\r
+\r
+    pthread_t *producer_thread = new pthread_t;\r
+    pthread_t *consumer_thread = new pthread_t;\r
+\r
+    pthread_attr_t attributes;\r
+    pthread_attr_init(&attributes);\r
+    pthread_attr_setstacksize(&attributes, 131072);\r
+\r
+    if (pthread_create(consumer_thread, &attributes, consumer, wfifo)) {\r
+        std::cout << "Error: Could not start consumer." << std::endl;\r
+        std::cout << "Exit." << std::endl;\r
+        exit(1);\r
+    }\r
+    pthread_attr_destroy(&attributes);\r
+\r
+    pthread_attr_init(&attributes);\r
+    pthread_attr_setstacksize(&attributes, 131072);\r
+    if (pthread_create(producer_thread, &attributes, producer, wfifo)) {\r
+        std::cout << "Error: Could not start producer." << std::endl;\r
+        std::cout << "Exit." << std::endl;\r
+        exit(1);\r
+    }\r
+    pthread_attr_destroy(&attributes);\r
+\r
+\r
+    pthread_join(*consumer_thread, 0);\r
+    delete wfifo;\r
+    return 0;\r
+}\r
+*/\r