X-Git-Url: http://sraa.de/git/?a=blobdiff_plain;f=dol%2Fsrc%2Fdol%2Fvisitor%2FPipeAndFilter%2Flib%2FWindowedFifo.cpp;fp=dol%2Fsrc%2Fdol%2Fvisitor%2FPipeAndFilter%2Flib%2FWindowedFifo.cpp;h=ddeb9706d174392cb1ee07683c1927ca9ed30034;hb=8c411cf24ed0eb889191aaeafd8fa1e69081df42;hp=0000000000000000000000000000000000000000;hpb=dea7a4fb1ed110d3ce6e6d9255103d724bd66c0e;p=jump.git diff --git a/dol/src/dol/visitor/PipeAndFilter/lib/WindowedFifo.cpp b/dol/src/dol/visitor/PipeAndFilter/lib/WindowedFifo.cpp new file mode 100644 index 0000000..ddeb970 --- /dev/null +++ b/dol/src/dol/visitor/PipeAndFilter/lib/WindowedFifo.cpp @@ -0,0 +1,304 @@ +#include "WindowedFifo.h" +#include + +/** + * + */ +WindowedFifo::WindowedFifo(char* name, unsigned size = 20) { + //std::cout << "Create WindowedFifo." << std::endl; + _size = size; + _buffer = new char[_size]; + _head = 0; + _tail = 0; + _headRoom = 0; + _tailRoom = 0; + _use = 0; + //indicates whether Fifo is empty or full if _head == _tail + //_isFull = false; + _isHeadReserved = false; + _isTailReserved = false; + _name = new char[strlen(name) + 1]; + strcpy(_name, name); + _mutex = new Mutex(); + _readCondition = new Condition(_mutex); + _writeCondition = new Condition(_mutex); +} + +/** + * + */ +WindowedFifo::~WindowedFifo() { + //std::cout << "Delete WindowedFifo." << std::endl; + if (_buffer) { + delete _buffer; + } + if (_name) { + delete _name; + } + if (_readCondition) { + delete _readCondition; + } + if (_writeCondition) { + delete _writeCondition; + } + if (_mutex) { + delete _mutex; + } + _buffer = 0; + _head = 0; + _tail = 0; + _name = 0; + _use = 0; + _readCondition = 0; + _writeCondition = 0; + _mutex = 0; + //std::cout << "Deleted WindowedFifo." << std::endl; +} + +/** + * + */ +unsigned WindowedFifo::reserve(void** dest, unsigned len) { + char** destination = (char**)dest; + //std::cout << "Attempt to reserve " << len << " bytes." << std::endl; + + //can only reserve once piece at a time + if (_isHeadReserved) { + *destination = 0; + return 0; + } + + _mutex->lock(); + while (unused() == 0) { + _readCondition->wait(); + } + + //reserve at most as much memory as still available in the buffer + unsigned write = (len <= _size - _use ? len : _size - _use); + + if ( write > 0 ) { + //if wrap-around in buffer: return only buffer for the + //contiguous buffer space + if (_head + write > _size) { + write = _size - _head; + } + + _headRoom = (_head + write) == _size? 0 : _head + write; + *destination = &(_buffer[_head]); + _isHeadReserved = true; + + //the following comparison is unsafe in a multi-threaded + //environment and potentially leads to race-conditions + /*if (_headRoom == _tail) { + _isFull = true; + } else { + _isFull = false; + }*/ + } + _writeReserve = write; + _mutex->unlock(); + + //std::cout << "Reserved " << write << " bytes." << std::endl; + return write; +} + +/** + * + */ +void WindowedFifo::release() { + if (_isHeadReserved) { + //std::cout << "Released " << _headRoom - _head << " bytes." << std::endl; + _head = _headRoom; + _use += _writeReserve; + _isHeadReserved = false; + _writeCondition->notify(); + } +} + +/** + * + */ +unsigned WindowedFifo::capture(void **dest, unsigned len) { + char** destination = (char**)dest; + //std::cout << "Attempt to capture " << len << " bytes." << std::endl; + + if (_isTailReserved) { + //std::cout << "Only one attempt to capture allowed." << std::endl; + *destination = 0; + return 0; + } + + _mutex->lock(); + while (used() == 0) { + _writeCondition->wait(); + } + + //capture at most as much data as available in the buffer + unsigned read = (len <= _use ? len : _use); + + if (read > 0) { + //if wrap-around in buffer: return only buffer for the + //contiguous buffer space + if (_tail + read> _size) { + read = _size - _tail; + } + + _tailRoom = (_tail + read) == _size ? 0 : _tailRoom = _tail + read; + *destination = &(_buffer[_tail]); + _isTailReserved = true; + } + _readReserve = read; + _mutex->unlock(); + //std::cout << "Captured " << read << " bytes." << std::endl; + + return read; +} + +/** + * + */ +void WindowedFifo::consume() { + _mutex->lock(); + if (_isTailReserved) { + //std::cout << "Consumed " << _tailRoom - _tail << " bytes." << std::endl; + _tail = _tailRoom; + //_isFull = false; + _use -= _readReserve; + _isTailReserved = false; + _readCondition->notify(); + } + _mutex->unlock(); +} + +/** + * + */ +unsigned WindowedFifo::size() const { + return _size; +} + +/** + * + */ +unsigned WindowedFifo::unused() const { + return _size - _use; +} + +/** + * + */ +unsigned WindowedFifo::used() const { + return _use; + /*if (_headRoom > _tail) { + return _headRoom - _tail; + } else if (_headRoom == _tail) { + if (_isFull == true) { + return _size; + } else { + return 0; + } + } + return _headRoom + _size - _tail;*/ +} + +/** + * + */ +char* WindowedFifo::getName() const { + return _name; +} + +/** + * Test the implementation + */ +/* +#include +#include +#define LENGTH 10 + +void* producer(void *fifo) +{ + WindowedFifo* wfifo = (WindowedFifo*)fifo; + for (int j = 0; j < LENGTH; j++) { + //std::cout << "write " << i << " to Fifo. "; + int *buf1; + int write = wfifo->reserve((void**)&buf1, sizeof(int)); + + if (write == sizeof(int)) { + *buf1 = j; + wfifo->release(); + //std::cout << "used: " << std::setw(2) << wfifo->used() + // << ", unused: " << std::setw(2) << wfifo->unused() + // << ", size: " << std::setw(2) << wfifo->size() + // << std::endl; + } else { + std::cout << "Not successful: " << write << std::endl; + } + } + printf("producer returns.\n"); + return 0; +} + +void* consumer(void *fifo) +{ + WindowedFifo* wfifo = (WindowedFifo*)fifo; + for (int j = 0; j < LENGTH; j++) { + int* buf3; + int read = wfifo->capture((void**)&buf3, sizeof(int)); + if (read == sizeof(int)) { + std::cout << "read " << (unsigned)*buf3 << " from WindowedFifo "; + std::cout << "used: " << std::setw(2) << wfifo->used() + << ", unused: " << std::setw(2) << wfifo->unused() + << ", size: " << std::setw(2) << wfifo->size() + << std::endl; + wfifo->consume(); + } else { + std::cout << "Read nothing from WindowedFifo." << std::endl; + } + } + printf("consumer returns.\n"); + return 0; +} + +int main() { + WindowedFifo *wfifo = new WindowedFifo("fifo", 12); + + int* buf1; + int* buf2; + wfifo->reserve((void**)&buf1, 8); + *buf1 = 10; + *(buf1 + 1) = 20; + wfifo->release(); + wfifo->capture((void**)&buf2, 8); + std::cout << "read " << *buf2 << " " << *(buf2 + 1) << std::endl; + wfifo->consume(); + + pthread_t *producer_thread = new pthread_t; + pthread_t *consumer_thread = new pthread_t; + + pthread_attr_t attributes; + pthread_attr_init(&attributes); + pthread_attr_setstacksize(&attributes, 131072); + + if (pthread_create(consumer_thread, &attributes, consumer, wfifo)) { + std::cout << "Error: Could not start consumer." << std::endl; + std::cout << "Exit." << std::endl; + exit(1); + } + pthread_attr_destroy(&attributes); + + pthread_attr_init(&attributes); + pthread_attr_setstacksize(&attributes, 131072); + if (pthread_create(producer_thread, &attributes, producer, wfifo)) { + std::cout << "Error: Could not start producer." << std::endl; + std::cout << "Exit." << std::endl; + exit(1); + } + pthread_attr_destroy(&attributes); + + + pthread_join(*consumer_thread, 0); + delete wfifo; + return 0; +} +*/