X-Git-Url: http://sraa.de/git/?a=blobdiff_plain;f=dol%2Fsrc%2Fdol%2Fvisitor%2FPipeAndFilter%2Flib%2FFifo.cpp;fp=dol%2Fsrc%2Fdol%2Fvisitor%2FPipeAndFilter%2Flib%2FFifo.cpp;h=a88fd98b6c0ba4ede43930285ddb4f1270ae4a90;hb=8c411cf24ed0eb889191aaeafd8fa1e69081df42;hp=0000000000000000000000000000000000000000;hpb=dea7a4fb1ed110d3ce6e6d9255103d724bd66c0e;p=jump.git diff --git a/dol/src/dol/visitor/PipeAndFilter/lib/Fifo.cpp b/dol/src/dol/visitor/PipeAndFilter/lib/Fifo.cpp new file mode 100644 index 0000000..a88fd98 --- /dev/null +++ b/dol/src/dol/visitor/PipeAndFilter/lib/Fifo.cpp @@ -0,0 +1,229 @@ +#include "Fifo.h" + +/** + * + */ +Fifo::Fifo(char* name, unsigned size = 18) { + //std::cout << "Create Fifo." << std::endl; + //except at the beginning, _head and _tail must never overlap, + //otherwise one does not know whether the buffer is full or + //empty. to have nevertheless a buffer with the given capacity, + //a buffer with one more element is allocated. + _size = size + 1; + _buffer = new char[_size]; + _head = 0; + _tail = 0; + _name = new char[strlen(name) + 1]; + strcpy(_name, name); + _mutex = new Mutex(); + _readCondition = new Condition(_mutex); + _writeCondition = new Condition(_mutex); +} + +/** + * + */ +Fifo::~Fifo() { + //std::cout << "Delete Fifo." << std::endl; + if (_buffer) { + delete _buffer; + } + if (_name) { + delete _name; + } + if (_readCondition) { + delete _readCondition; + } + if (_writeCondition) { + delete _writeCondition; + } + if (_mutex) { + delete _mutex; + } + + _buffer = 0; + _head = 0; + _tail = 0; + _name = 0; + _readCondition = 0; + _writeCondition = 0; + _mutex = 0; + //std::cout << "Deleted Fifo." << std::endl; +} + +/** + * + */ +unsigned Fifo::read(void *destination, unsigned len) { + char* buffer = (char*)destination; + unsigned read = 0; + //std::cout << "Try to read " << len << " bytes from Fifo " << _name << "." << std::endl; + + while (read < len) { + _mutex->lock(); + while (used() == 0) { + _writeCondition->wait(); + } + _mutex->unlock(); + + if ((len - read) < used()) { + while (read < len) { + _mutex->lock(); + unsigned tocopy = (len - read + _tail >= _size) ? _size - _tail : len - read; + memcpy(buffer, _buffer + _tail, tocopy); + _tail = (_tail + tocopy) % _size; + read += tocopy; + buffer += tocopy; + _mutex->unlock(); + } + _readCondition->notify(); + } else { + _mutex->lock(); + *buffer++ = *(_buffer + _tail % _size); + _tail = (_tail + 1) % _size; + read++; + _mutex->unlock(); + _readCondition->notify(); + } + } + + //std::cout << "Read " << read << " bytes from Fifo " << _name << "." << std::endl; + return read; +} + +/** + * + */ +unsigned Fifo::write(const void *source, unsigned len) { + char* buffer = (char*)source; + unsigned write = 0; + //std::cout << "Try to write " << len << " bytes to Fifo " << _name << std::endl; + + while (write < len) { + _mutex->lock(); + while (unused() == 0) { + _readCondition->wait(); + } + _mutex->unlock(); + + if ((len - write) < unused()) { + while (write < len) { + unsigned tocopy = (len - write + _head >= _size) ? _size - _head : len - write; + _mutex->lock(); + memcpy(_buffer + _head, buffer, tocopy); + _head = (_head + tocopy) % _size; + write += tocopy; + buffer += tocopy; + _mutex->unlock(); + } + _writeCondition->notify(); + } else { + _mutex->lock(); + *(_buffer + (unsigned)(_head) % _size) = *buffer++; + _head = (_head + 1) % _size; + write++; + _mutex->unlock(); + _writeCondition->notify(); + } + } + //std::cout << "Wrote " << write << " bytes to Fifo " << _name << "." << std::endl; + return write; +} + +/** + * + */ +unsigned Fifo::size() const { + return (_size - 1); +} + +/** + * + */ +unsigned Fifo::unused() const { + return (_size - 1) - used(); +} + +/** + * + */ +unsigned Fifo::used() const { + if (_head >= _tail) { + return _head - _tail; + } + return _head + _size - _tail; +} + +/** + * + */ +char* Fifo::getName() const { + return _name; +} + +/** + * Test the implementation + */ +/* +#include +void* producer(void *fifo) +{ + const char *str = + "Visit www.systemc.org and see what SystemC can do for you today!\n"; + + while (*str) { + //printf("%c", *str); + ((Fifo*)fifo)->write((void*)str++, 4); + } + printf("\nproducer returns.\n"); + return 0; +} + +void* consumer(void *fifo) +{ + char c; + while (c != '\n') { + ((Fifo*)fifo)->read(&c, 4); + std::cout << c << std::flush; + + if (((Fifo*)fifo)->used() == 1) + std::cout << "<1>" << std::flush; + if (((Fifo*)fifo)->used() == 9) + std::cout << "<9>" << std::flush; + } + printf("\nconsumer returns.\n"); + return 0; +} + + +int main() { + Fifo *fifo = new Fifo("fifo", 3); + pthread_t *producer_thread = new pthread_t; + pthread_t *consumer_thread = new pthread_t; + + pthread_attr_t attributes; + pthread_attr_init(&attributes); + pthread_attr_setstacksize(&attributes, 131072); + + if (pthread_create(consumer_thread, &attributes, consumer, fifo)) { + std::cout << "Error: Could not start consumer." << std::endl; + std::cout << "Exit." << std::endl; + exit(1); + } + pthread_attr_destroy(&attributes); + + pthread_attr_init(&attributes); + pthread_attr_setstacksize(&attributes, 131072); + if (pthread_create(producer_thread, &attributes, producer, fifo)) { + std::cout << "Error: Could not start producer." << std::endl; + std::cout << "Exit." << std::endl; + exit(1); + } + pthread_attr_destroy(&attributes); + + + pthread_join(*consumer_thread, 0); + delete fifo; + return 0; +} +*/