X-Git-Url: http://sraa.de/git/?a=blobdiff_plain;f=dol%2Fsrc%2Fdol%2Fvisitor%2Fcell%2Flib%2Fspu%2FFastCommunication.cpp;fp=dol%2Fsrc%2Fdol%2Fvisitor%2Fcell%2Flib%2Fspu%2FFastCommunication.cpp;h=2f2c3deaebb8e080f9a5ee218ed8c8b96634a018;hb=8c411cf24ed0eb889191aaeafd8fa1e69081df42;hp=0000000000000000000000000000000000000000;hpb=dea7a4fb1ed110d3ce6e6d9255103d724bd66c0e;p=jump.git diff --git a/dol/src/dol/visitor/cell/lib/spu/FastCommunication.cpp b/dol/src/dol/visitor/cell/lib/spu/FastCommunication.cpp new file mode 100644 index 0000000..2f2c3de --- /dev/null +++ b/dol/src/dol/visitor/cell/lib/spu/FastCommunication.cpp @@ -0,0 +1,707 @@ +/* + * FastCommunication.cpp + * + * Created on: Mar 3, 2009 + * Author: lschor + * + * Provides the communication between various processors. + * + * + * + * Description: + * + * Sender: The processor now has some data that need to be forwarded + * to another processor. + * Receiver: The processor that needs to get the data. + * + * Sender ---- * + * -----> Have some data * + * (Len, queue) * + * -----------> Receiver * + * * + * Check its len * + * * + * Sets up a DMA * + * transfer to * + * read from the * + * <-------------- queue to a * + * MFC performs the local buffer * + * transfer (Alignment) * + * <--------- * + * * + * * + * (Pools if the * + * transfer is * + * completed) * + * * + * * + * Copy the data * + * from its temp. * + * buffer to the * + * queue und * + * Have completed <------------ informs the * + * <---------- (queue, len) sender * + * Can increase the * + * pointers in the FIFO + * + * + * Has two ways to handling request, which are not possible + * to work out currently: + * 1) Send back "len = 0" + * --> Needs to send more messages, but may bigger lens + * 2) Store them until you have enough space to read + * --> Smaller lens and less messages + */ + +#include "FastCommunication.h" + +/* + * Constructor + */ +FastCommunication::FastCommunication(int nrOfQueues, uint64_t ea_base, + uint64_t *ea_base_all, int32_t * queueFromSPEIn, + int32_t * queueOnSPEIn, uint64_t *fifoTails) { + + // Set the base address + _ea_base = ea_base; + + _ea_base_all = ea_base_all; + queueFromSPE = queueFromSPEIn; + queueOnSPE = queueOnSPEIn; + _fifoTails = fifoTails; + + _nrOfQueues = nrOfQueues; + try { _fifos = new fifoCollection[nrOfQueues]; } + catch(std::bad_alloc &e) { + fprintf(stderr, "[FastCommunication] Memory allocation failure\n"); + exit(1); + } + _nrOfRequest = 0; + + for (int i = 0; i < MAXNOREQ; i++) { + _request[i].valid = false; + } +} + +/* + * Deconstructor + */ +FastCommunication::~FastCommunication() { + delete _fifos; +} + +/* + * Register an additional FIFO in the Communication + */ +bool FastCommunication::addFifo(int fifoNr, Fifo* fifo, int type, + int queue) { + _fifos[fifoNr].fifo = fifo; + _fifos[fifoNr].queue = queue; + _fifos[fifoNr].type = type; + _fifos[fifoNr].iswfifo = false; + + // Per FIFO queue, we need two requests, but we store at max MAXNOREQ of them + if (_nrOfRequest < MAXNOREQ) { + _nrOfRequest += 2; + } + + return true; +} + +/* + * Register an additional WindowedFIFO in the Communication + */ +bool FastCommunication::addWFifo(int fifoNr, WindowedFifo* wfifo, + int type, int queue) { + _fifos[fifoNr].wfifo = wfifo; + _fifos[fifoNr].queue = queue; + _fifos[fifoNr].type = type; + _fifos[fifoNr].iswfifo = true; + + // Per FIFO queue, we need two requests, but we store at max MAXNOREQ of them + if (_nrOfRequest < MAXNOREQ) { + _nrOfRequest += 2; + } + + return true; +} + +/* + * Communication: Main update procedure to update + */ +bool FastCommunication::update() { + // Check if you have received a new message + while (spu_stat_in_mbox() > 0) { + uint32_t messageIn = spu_read_in_mbox(); + + uint32_t code = GETFASTCODE(messageIn); + uint32_t queue = GETFASTQUEUE(messageIn); + uint32_t len = GETFASTLEN(messageIn); + + /***************************************************************************************************************/ + if (code == SPE_READ_DEMAND) // One should start a read process + { + // Find out for which fifo the request is + fifoCollection *fifocol = NULL; + int i = 0; + for (i = 0; i < _nrOfQueues; i++) { + if (_fifos[i].queue == queue) { + fifocol = &_fifos[i]; + break; + } + } + + // The queue was not found + if (fifocol == NULL) { + printf("SPU COM> ERROR, this queue does not exists!\n"); + return false; + } + + // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + // Windowed FIFO + if (_fifos[i].iswfifo) { + + WindowedFifo* wfifo = NULL; + wfifo = _fifos[i].wfifo; + + // Find the current tail of the queue from where to read + uint32_t inTail = wfifo->_inTail; + + // This is the len we like to read + len = len > (wfifo->unused()) ? wfifo->unused() : len; + + // We can read something + if (len > 0) { + // Write all information we used to the request-memory + comRequest* request = newRequest(); + + // Has no memory to store a new request + if (request == NULL) { + // not possible to start a request for this SPE --> Send len = 0 + uint32_t message = CREATEFASTMESSAGE( + SPE_READ_COMPLETE, queue, 0); + sendMessage(message, queueFromSPE[queue]); + return false; + } + + // reserve DMA tag ID + uint32_t tag_id; + if ((tag_id = mfc_tag_reserve()) == MFC_TAG_INVALID) { + printf("SPE: ERROR - can't reserve a tag ID\n"); + return false; + } + + // Generate the base address of the input FIFO + uint64_t baseAddress = _fifoTails[queue] + inTail; + + // Align the address to the correct factor (in general 128 or 32) + while (baseAddress % ALIGNMENT_FACTOR_POWER2 != 0) + baseAddress--; + + // Offset --> How much we had to align + uint32_t offset = _fifoTails[queue] + inTail + - baseAddress; + + request->data = (char *) _malloc_align(roundDMA(len + + offset), ALIGNMENT_FACTOR); + + // Store all data in the request buffer + request->len = len; + + request->wfifo = wfifo; + request->iswfifo = true; + + request->status = read_started; + request->queue = queue; + request->offset = offset; + request->tag_id = tag_id; + + // Set up the request in the MFC + mfc_get((void *) &(request->data[0]), baseAddress, + roundDMA(len + offset), tag_id, 0, 0); + } + + else { +#ifndef STORE_REQUESTS // Send len = 0 back to the sender, this one should try it in a later phase + uint32_t message = CREATEFASTMESSAGE( + SPE_READ_COMPLETE, queue, 0); + sendMessage(message, queueFromSPE[queue]); +#else // I store the request and may try in a later time to start the transfer + comRequest* request = newRequest(); + + // Has no memory to store a new request + if (request == NULL) + { + // not possible to start a request for this SPE --> Send len = 0 + uint32_t message = CREATEFASTMESSAGE(SPE_READ_COMPLETE, queue, 0); + sendMessage(message, queueFromSPE[queue]); + return false; + } + + request->len = len; + request->wfifo = wfifo; + request->iswfifo = true; + request->status = read_pending; + request->queue = queue; +#endif + } + } + + // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + // Classic FIFO + else { + Fifo* fifo = NULL; + fifo = _fifos[i].fifo; + + uint32_t inTail = fifo->_inTail; + + // This is the len we like to read + len = len > (fifo->unused()) ? fifo->unused() : len; + + // We can read something + if (len > 0) { + // Write all information we used to the request-memory + comRequest* request = newRequest(); + + // Has no memory to store a new request + if (request == NULL) { + // not possible to start a request for this SPE --> Send len = 0 + uint32_t message = CREATEFASTMESSAGE( + SPE_READ_COMPLETE, queue, 0); + sendMessage(message, queueFromSPE[queue]); + return false; + } + + uint32_t tag_id; + + // reserve DMA tag ID + if ((tag_id = mfc_tag_reserve()) == MFC_TAG_INVALID) { + printf("SPE: ERROR - can't reserve a tag ID\n"); + return false; + } + + uint64_t baseAddress = _fifoTails[queue] + inTail; + + while (baseAddress % ALIGNMENT_FACTOR_POWER2 != 0) + baseAddress--; + + // Offset + uint32_t offset = _fifoTails[queue] + inTail + - baseAddress; + + // Store all data in the request buffer + request->data = (char *) _malloc_align(roundDMA(len + + offset), ALIGNMENT_FACTOR); + request->len = len; + request->fifo = fifo; + request->iswfifo = false; + request->status = read_started; + request->queue = queue; + request->offset = offset; + request->tag_id = tag_id; + + // Tell the fifo that one has reserved some data + + mfc_get((void *) &(request->data[0]), baseAddress, + roundDMA(len + offset), tag_id, 0, 0); + } + + // len == 0 + else { +#ifndef STORE_REQUESTS // Send len = 0 back to the sender, this one should try it in a later phase + uint32_t message = CREATEFASTMESSAGE( + SPE_READ_COMPLETE, queue, 0); + sendMessage(message, queueFromSPE[queue]); +#else // I store the request and may try in a later time to start the transfer + comRequest* request = newRequest(); + + // Has no memory to store a new request + if (request == NULL) + { + // not possible to start a request for this SPE --> Send len = 0 + uint32_t message = CREATEFASTMESSAGE(SPE_READ_COMPLETE, queue, 0); + sendMessage(message, queueFromSPE[queue]); + return false; + } + + request->len = len; + request->fifo = fifo; + request->iswfifo = false; + request->status = read_pending; + request->queue = queue; +#endif + } + } + } + + /***************************************************************************************************************/ + else if (code == SPE_READ_COMPLETE) // A read has finished + { + // Get the stored request + comRequest* request = getRequest(read_request_sent, queue); + if (request == NULL) { + printf( + ">>>>>>>>>>>>>>>>> Communicate SPU> Couldn't find the request\n"); + return 0; + } + + // Inform my FIFO that the request is completed + if (request->iswfifo) + request->wfifo->dmaRead(len); + else + request->fifo->dmaRead(len); + + // Request free + deleteRequest(request); + } + } + + /***************************************************************************************************************/ + // Check if some active write processes have finished + uint8_t req = _currentRequest; + + for (int i = 0; i < _nrOfRequest; i++) // Check all possible requests + { + if (_request[req].valid) // Only to check if the request is valid + { + if (_request[req].status == read_started) { // We have setup the request, now check if it is complete + + // If I cannot send a message to the corresponding processor --> Do not have to check it + if (!(testMessage(queueOnSPE[_request[req].queue]) > 0)) + continue; + + // Check if the specific Tag-ID has completed + uint32_t tag_id = _request[req].tag_id; + mfc_write_tag_mask(1 << tag_id); + mfc_write_tag_update(MFC_TAG_UPDATE_IMMEDIATE); + uint32_t ret = mfc_read_tag_status(); + + if (!((ret & (1 << tag_id)) == 0)) { + + // This update is finished + mfc_tag_release(tag_id); + + // Have to write the data into the fifo + if (_request[req].iswfifo) { // WFIFO + + _request[req].wfifo->dmaWrite( + (char *) _request[req].data + + _request[req].offset, + _request[req].len); + _free_align(_request[req].data); + + // Increase the inTail value (used to know where the last request was started) + _request[req].wfifo->_inTail + = (_request[req].wfifo->_inTail + + _request[req].len) % (FIFO_SIZE[_request[req].queue]); + } else { // FIFO + _request[req].fifo->write( + (char *) _request[req].data + + _request[req].offset, + _request[req].len); + _free_align(_request[req].data); + + _request[req].fifo->_inTail + = (_request[req].fifo->_inTail + + _request[req].len) % (FIFO_SIZE[_request[req].queue]); + + } + + // Inform the sender about the event + uint32_t message = CREATEFASTMESSAGE( + SPE_READ_COMPLETE, _request[req].queue, + _request[req].len); + sendMessage(message, queueFromSPE[_request[req].queue]); + + // Delete the request + deleteRequest(&_request[req]); + _currentRequest = (req + 1) % _nrOfRequest; + + break; // continue is also working + } + } else if (_request[req].status == read_pending) { // Has an open request for sending + + // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + // Windowed FIFO + if (_request[req].iswfifo) { + + if (_request[req].wfifo->unused() > 0) { + comRequest *request = &_request[req]; + + // This is the len we like to read + uint32_t len = request->len > (request->wfifo->unused()) ? request->wfifo->unused() + : request->len; + + // reserve DMA tag ID + uint32_t tag_id; + if ((tag_id = mfc_tag_reserve()) + == MFC_TAG_INVALID) { + printf("SPE: ERROR - can't reserve a tag ID\n"); + return false; + } + + uint32_t inTail = request->wfifo->_inTail; + uint64_t baseAddress = _fifoTails[request->queue] + + inTail; + + while (baseAddress % ALIGNMENT_FACTOR_POWER2 != 0) + baseAddress--; + + // Offset --> How much we had to align + uint32_t offset = _fifoTails[request->queue] + + inTail - baseAddress; + + request->data = (char *) _malloc_align(roundDMA( + len + offset), ALIGNMENT_FACTOR); + request->len = len; + request->status = read_started; + request->offset = offset; + request->tag_id = tag_id; + + // Set up the request in the MFC + mfc_get((void *) &(request->data[0]), baseAddress, + roundDMA(len + offset), tag_id, 0, 0); + } + } + + // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + // Classic FIFO + else { + + if (_request[req].fifo->unused() > 0) { + comRequest *request = &_request[req]; + + // This is the len we like to read + uint32_t len = request->len > (request->fifo->unused()) ? request->fifo->unused() + : request->len; + + uint32_t tag_id; + // reserve DMA tag ID + if ((tag_id = mfc_tag_reserve()) + == MFC_TAG_INVALID) { + printf("SPE: ERROR - can't reserve a tag ID\n"); + return false; + } + + uint32_t inTail = request->fifo->_inTail; + uint64_t baseAddress = _fifoTails[request->queue] + inTail; + + while (baseAddress % ALIGNMENT_FACTOR_POWER2 != 0) + baseAddress--; + + // Offset + uint32_t offset = _fifoTails[request->queue] + + inTail - baseAddress; + + // Store all data in the request buffer + request->data = (char *) _malloc_align(roundDMA( + len + offset), ALIGNMENT_FACTOR); + request->len = len; + request->status = read_started; + request->offset = offset; + request->tag_id = tag_id; + + // Start the DMA transfer + mfc_get((void *) &(request->data[0]), baseAddress, + roundDMA(len + offset), tag_id, 0, 0); + } + } + } + } + // Go to the next Request + req = (req + 1) % _nrOfRequest; + } + + /***************************************************************************************************************/ + // Start a new request to the receiver of a FIFO + for (int i = 0; i < _nrOfQueues; i++) { + if (_fifos[i].type == this->out) // Only out-FIFO have to be quecked + { + if (_fifos[i].iswfifo) { // WFIFO + + // Start only a write process if there is really enough place in the outbound mailbox + if (_fifos[i].wfifo->dmaAllowed() && _fifos[i].wfifo->used() > 0) { + uint32_t queue = _fifos[i].queue; + + // Can we send a message to this processor or is it blocked? + if (testMessage(queueOnSPE[queue]) <= 0) { + continue; + } + + // Write all information we used to the request-memory + comRequest* request = newRequest(); + + // Has no memory to store a new request + if (request == NULL) { + } else { + uint32_t len = _fifos[i].wfifo->dmaStart(); + // Create a write-demand message + uint32_t message = CREATEFASTMESSAGE( + SPE_READ_DEMAND, queue, len); + + request->len = len; + request->queue = queue; + request->status = read_request_sent; + request->wfifo = _fifos[i].wfifo; + request->iswfifo = true; + + sendMessage(message, queueOnSPE[queue]); + } + break; + } + } else { // FIFO + + // Start only a write process if there is really enough place in the outbound mailbox + if (_fifos[i].fifo->dmaAllowed() && _fifos[i].fifo->used() > 0) { + uint32_t queue = _fifos[i].queue; + + // Check if we can send a message to the processor + if (testMessage(queueOnSPE[queue]) <= 0) { + continue; + } + + // Write all information we used to the request-memory + comRequest* request = newRequest(); + + // Has no memory to store a new request + if (request == NULL) { + } else { + uint32_t len = _fifos[i].fifo->dmaStart(); + // Create a write-demand message + uint32_t message = CREATEFASTMESSAGE( + SPE_READ_DEMAND, queue, len); + + request->len = len; + request->queue = queue; + request->status = read_request_sent; + request->fifo = _fifos[i].fifo; + request->iswfifo = false; + + sendMessage(message, queueOnSPE[queue]); + } + break; + } + } + } + } + return true; +} + +/* + * Create a new request to store in the cache + * + */ +comRequest* FastCommunication::newRequest() { + for (int i = 0; i < _nrOfRequest; i++) { + if (!_request[i].valid) { + _request[i].valid = true; + return &(_request[i]); + } + } + + return NULL; +} + +/* + * Delete the request + * + */ +void FastCommunication::deleteRequest(comRequest* request) { + request->valid = false; +} + +/* + * Returns the request one like to get + */ +comRequest* FastCommunication::getRequest(uint8_t status, uint32_t queue) { + uint8_t req = _currentRequest; + + for (int i = 0; i < _nrOfRequest; i++) { + if (_request[req].valid && _request[req].status == status + && _request[req].queue == queue) { + _currentRequest = (req + 1) % _nrOfRequest; + return &(_request[req]); + } + req = (req + 1) % _nrOfRequest; + } + + return NULL; +} + +/** + * True if no communication is necessary, false if there is active communication + */ +bool FastCommunication::empty() { + // check if one fifo would like to send something to another SPE + for (int i = 0; i < _nrOfQueues; i++) { + if (_fifos[i].type == this->out) { + if (_fifos[i].iswfifo) { + if (_fifos[i].wfifo->used() > 0) + { + return false; + } + } else { + if (_fifos[i].fifo->used() > 0) + { + return false; + } + } + } + } + + // are open sendings? + for (int i = 0; i < _nrOfRequest; i++) { + if (_request[i].valid) { + return false; + } + } + + return true; +} + +void FastCommunication::sendMessage(uint32_t message, int32_t process) { + // Simple forward the message to the PPE + if (process <= -1) { + spu_write_out_mbox(message); + } + + // Forward message to the corresponding SPE + else { + uint32_t tag_id; + + // reserve DMA tag ID + if ((tag_id = mfc_tag_reserve()) == MFC_TAG_INVALID) { + printf("SPE: ERROR - can't reserve a tag ID\n"); + return; + } + + //printf("SPE > OUT Message to process = %d, addr = %llx\n", process, _ea_base_all[process]); + write_in_mbox(message, _ea_base_all[process], tag_id); + mfc_tag_release(tag_id); + } +} + +int FastCommunication::testMessage(int32_t process) { + //int32_t process = queueOnSPE[queue]; + + // Simple forward the message to the PPE + if (process <= -1) { + return spu_stat_out_mbox(); + } + + // Forward message to the corresponding SPE + else { + uint32_t tag_id; + + // reserve DMA tag ID + if ((tag_id = mfc_tag_reserve()) == MFC_TAG_INVALID) { + printf("SPE: ERROR - can't reserve a tag ID\n"); + return 0; + } + + // IMPORTANT: THIS IS NOT RACE CONDITION FREE!!!! + int test = status_in_mbox(_ea_base_all[process], tag_id); + mfc_tag_release(tag_id); + return test; + } +}