2 * FastCommunication.cpp
4 * Created on: Mar 3, 2009
7 * Provides the communication between various processors.
13 * Sender: The processor now has some data that need to be forwarded
14 * to another processor.
15 * Receiver: The processor that needs to get the data.
18 * -----> Have some data *
20 * -----------> Receiver *
27 * <-------------- queue to a *
28 * MFC performs the local buffer *
29 * transfer (Alignment) *
42 * Have completed <------------ informs the *
43 * <---------- (queue, len) sender *
45 * pointers in the FIFO
48 * Has two ways to handling request, which are not possible
49 * to work out currently:
50 * 1) Send back "len = 0"
51 * --> Needs to send more messages, but may bigger lens
52 * 2) Store them until you have enough space to read
53 * --> Smaller lens and less messages
56 #include "FastCommunication.h"
61 FastCommunication::FastCommunication(int nrOfQueues, uint64_t ea_base,
62 uint64_t *ea_base_all, int32_t * queueFromSPEIn,
63 int32_t * queueOnSPEIn, uint64_t *fifoTails) {
65 // Set the base address
68 _ea_base_all = ea_base_all;
69 queueFromSPE = queueFromSPEIn;
70 queueOnSPE = queueOnSPEIn;
71 _fifoTails = fifoTails;
73 _nrOfQueues = nrOfQueues;
74 try { _fifos = new fifoCollection[nrOfQueues]; }
75 catch(std::bad_alloc &e) {
76 fprintf(stderr, "[FastCommunication] Memory allocation failure\n");
81 for (int i = 0; i < MAXNOREQ; i++) {
82 _request[i].valid = false;
89 FastCommunication::~FastCommunication() {
94 * Register an additional FIFO in the Communication
96 bool FastCommunication::addFifo(int fifoNr, Fifo* fifo, int type,
98 _fifos[fifoNr].fifo = fifo;
99 _fifos[fifoNr].queue = queue;
100 _fifos[fifoNr].type = type;
101 _fifos[fifoNr].iswfifo = false;
103 // Per FIFO queue, we need two requests, but we store at max MAXNOREQ of them
104 if (_nrOfRequest < MAXNOREQ) {
112 * Register an additional WindowedFIFO in the Communication
114 bool FastCommunication::addWFifo(int fifoNr, WindowedFifo* wfifo,
115 int type, int queue) {
116 _fifos[fifoNr].wfifo = wfifo;
117 _fifos[fifoNr].queue = queue;
118 _fifos[fifoNr].type = type;
119 _fifos[fifoNr].iswfifo = true;
121 // Per FIFO queue, we need two requests, but we store at max MAXNOREQ of them
122 if (_nrOfRequest < MAXNOREQ) {
130 * Communication: Main update procedure to update
132 bool FastCommunication::update() {
133 // Check if you have received a new message
134 while (spu_stat_in_mbox() > 0) {
135 uint32_t messageIn = spu_read_in_mbox();
137 uint32_t code = GETFASTCODE(messageIn);
138 uint32_t queue = GETFASTQUEUE(messageIn);
139 uint32_t len = GETFASTLEN(messageIn);
141 /***************************************************************************************************************/
142 if (code == SPE_READ_DEMAND) // One should start a read process
144 // Find out for which fifo the request is
145 fifoCollection *fifocol = NULL;
147 for (i = 0; i < _nrOfQueues; i++) {
148 if (_fifos[i].queue == queue) {
149 fifocol = &_fifos[i];
154 // The queue was not found
155 if (fifocol == NULL) {
156 printf("SPU COM> ERROR, this queue does not exists!\n");
160 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
162 if (_fifos[i].iswfifo) {
164 WindowedFifo* wfifo = NULL;
165 wfifo = _fifos[i].wfifo;
167 // Find the current tail of the queue from where to read
168 uint32_t inTail = wfifo->_inTail;
170 // This is the len we like to read
171 len = len > (wfifo->unused()) ? wfifo->unused() : len;
173 // We can read something
175 // Write all information we used to the request-memory
176 comRequest* request = newRequest();
178 // Has no memory to store a new request
179 if (request == NULL) {
180 // not possible to start a request for this SPE --> Send len = 0
181 uint32_t message = CREATEFASTMESSAGE(
182 SPE_READ_COMPLETE, queue, 0);
183 sendMessage(message, queueFromSPE[queue]);
187 // reserve DMA tag ID
189 if ((tag_id = mfc_tag_reserve()) == MFC_TAG_INVALID) {
190 printf("SPE: ERROR - can't reserve a tag ID\n");
194 // Generate the base address of the input FIFO
195 uint64_t baseAddress = _fifoTails[queue] + inTail;
197 // Align the address to the correct factor (in general 128 or 32)
198 while (baseAddress % ALIGNMENT_FACTOR_POWER2 != 0)
201 // Offset --> How much we had to align
202 uint32_t offset = _fifoTails[queue] + inTail
205 request->data = (char *) _malloc_align(roundDMA(len
206 + offset), ALIGNMENT_FACTOR);
208 // Store all data in the request buffer
211 request->wfifo = wfifo;
212 request->iswfifo = true;
214 request->status = read_started;
215 request->queue = queue;
216 request->offset = offset;
217 request->tag_id = tag_id;
219 // Set up the request in the MFC
220 mfc_get((void *) &(request->data[0]), baseAddress,
221 roundDMA(len + offset), tag_id, 0, 0);
225 #ifndef STORE_REQUESTS // Send len = 0 back to the sender, this one should try it in a later phase
226 uint32_t message = CREATEFASTMESSAGE(
227 SPE_READ_COMPLETE, queue, 0);
228 sendMessage(message, queueFromSPE[queue]);
229 #else // I store the request and may try in a later time to start the transfer
230 comRequest* request = newRequest();
232 // Has no memory to store a new request
235 // not possible to start a request for this SPE --> Send len = 0
236 uint32_t message = CREATEFASTMESSAGE(SPE_READ_COMPLETE, queue, 0);
237 sendMessage(message, queueFromSPE[queue]);
242 request->wfifo = wfifo;
243 request->iswfifo = true;
244 request->status = read_pending;
245 request->queue = queue;
250 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
254 fifo = _fifos[i].fifo;
256 uint32_t inTail = fifo->_inTail;
258 // This is the len we like to read
259 len = len > (fifo->unused()) ? fifo->unused() : len;
261 // We can read something
263 // Write all information we used to the request-memory
264 comRequest* request = newRequest();
266 // Has no memory to store a new request
267 if (request == NULL) {
268 // not possible to start a request for this SPE --> Send len = 0
269 uint32_t message = CREATEFASTMESSAGE(
270 SPE_READ_COMPLETE, queue, 0);
271 sendMessage(message, queueFromSPE[queue]);
277 // reserve DMA tag ID
278 if ((tag_id = mfc_tag_reserve()) == MFC_TAG_INVALID) {
279 printf("SPE: ERROR - can't reserve a tag ID\n");
283 uint64_t baseAddress = _fifoTails[queue] + inTail;
285 while (baseAddress % ALIGNMENT_FACTOR_POWER2 != 0)
289 uint32_t offset = _fifoTails[queue] + inTail
292 // Store all data in the request buffer
293 request->data = (char *) _malloc_align(roundDMA(len
294 + offset), ALIGNMENT_FACTOR);
296 request->fifo = fifo;
297 request->iswfifo = false;
298 request->status = read_started;
299 request->queue = queue;
300 request->offset = offset;
301 request->tag_id = tag_id;
303 // Tell the fifo that one has reserved some data
305 mfc_get((void *) &(request->data[0]), baseAddress,
306 roundDMA(len + offset), tag_id, 0, 0);
311 #ifndef STORE_REQUESTS // Send len = 0 back to the sender, this one should try it in a later phase
312 uint32_t message = CREATEFASTMESSAGE(
313 SPE_READ_COMPLETE, queue, 0);
314 sendMessage(message, queueFromSPE[queue]);
315 #else // I store the request and may try in a later time to start the transfer
316 comRequest* request = newRequest();
318 // Has no memory to store a new request
321 // not possible to start a request for this SPE --> Send len = 0
322 uint32_t message = CREATEFASTMESSAGE(SPE_READ_COMPLETE, queue, 0);
323 sendMessage(message, queueFromSPE[queue]);
328 request->fifo = fifo;
329 request->iswfifo = false;
330 request->status = read_pending;
331 request->queue = queue;
337 /***************************************************************************************************************/
338 else if (code == SPE_READ_COMPLETE) // A read has finished
340 // Get the stored request
341 comRequest* request = getRequest(read_request_sent, queue);
342 if (request == NULL) {
344 ">>>>>>>>>>>>>>>>> Communicate SPU> Couldn't find the request\n");
348 // Inform my FIFO that the request is completed
349 if (request->iswfifo)
350 request->wfifo->dmaRead(len);
352 request->fifo->dmaRead(len);
355 deleteRequest(request);
359 /***************************************************************************************************************/
360 // Check if some active write processes have finished
361 uint8_t req = _currentRequest;
363 for (int i = 0; i < _nrOfRequest; i++) // Check all possible requests
365 if (_request[req].valid) // Only to check if the request is valid
367 if (_request[req].status == read_started) { // We have setup the request, now check if it is complete
369 // If I cannot send a message to the corresponding processor --> Do not have to check it
370 if (!(testMessage(queueOnSPE[_request[req].queue]) > 0))
373 // Check if the specific Tag-ID has completed
374 uint32_t tag_id = _request[req].tag_id;
375 mfc_write_tag_mask(1 << tag_id);
376 mfc_write_tag_update(MFC_TAG_UPDATE_IMMEDIATE);
377 uint32_t ret = mfc_read_tag_status();
379 if (!((ret & (1 << tag_id)) == 0)) {
381 // This update is finished
382 mfc_tag_release(tag_id);
384 // Have to write the data into the fifo
385 if (_request[req].iswfifo) { // WFIFO
387 _request[req].wfifo->dmaWrite(
388 (char *) _request[req].data
389 + _request[req].offset,
391 _free_align(_request[req].data);
393 // Increase the inTail value (used to know where the last request was started)
394 _request[req].wfifo->_inTail
395 = (_request[req].wfifo->_inTail
396 + _request[req].len) % (FIFO_SIZE[_request[req].queue]);
398 _request[req].fifo->write(
399 (char *) _request[req].data
400 + _request[req].offset,
402 _free_align(_request[req].data);
404 _request[req].fifo->_inTail
405 = (_request[req].fifo->_inTail
406 + _request[req].len) % (FIFO_SIZE[_request[req].queue]);
410 // Inform the sender about the event
411 uint32_t message = CREATEFASTMESSAGE(
412 SPE_READ_COMPLETE, _request[req].queue,
414 sendMessage(message, queueFromSPE[_request[req].queue]);
416 // Delete the request
417 deleteRequest(&_request[req]);
418 _currentRequest = (req + 1) % _nrOfRequest;
420 break; // continue is also working
422 } else if (_request[req].status == read_pending) { // Has an open request for sending
424 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
426 if (_request[req].iswfifo) {
428 if (_request[req].wfifo->unused() > 0) {
429 comRequest *request = &_request[req];
431 // This is the len we like to read
432 uint32_t len = request->len > (request->wfifo->unused()) ? request->wfifo->unused()
435 // reserve DMA tag ID
437 if ((tag_id = mfc_tag_reserve())
438 == MFC_TAG_INVALID) {
439 printf("SPE: ERROR - can't reserve a tag ID\n");
443 uint32_t inTail = request->wfifo->_inTail;
444 uint64_t baseAddress = _fifoTails[request->queue]
447 while (baseAddress % ALIGNMENT_FACTOR_POWER2 != 0)
450 // Offset --> How much we had to align
451 uint32_t offset = _fifoTails[request->queue]
452 + inTail - baseAddress;
454 request->data = (char *) _malloc_align(roundDMA(
455 len + offset), ALIGNMENT_FACTOR);
457 request->status = read_started;
458 request->offset = offset;
459 request->tag_id = tag_id;
461 // Set up the request in the MFC
462 mfc_get((void *) &(request->data[0]), baseAddress,
463 roundDMA(len + offset), tag_id, 0, 0);
467 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
471 if (_request[req].fifo->unused() > 0) {
472 comRequest *request = &_request[req];
474 // This is the len we like to read
475 uint32_t len = request->len > (request->fifo->unused()) ? request->fifo->unused()
479 // reserve DMA tag ID
480 if ((tag_id = mfc_tag_reserve())
481 == MFC_TAG_INVALID) {
482 printf("SPE: ERROR - can't reserve a tag ID\n");
486 uint32_t inTail = request->fifo->_inTail;
487 uint64_t baseAddress = _fifoTails[request->queue] + inTail;
489 while (baseAddress % ALIGNMENT_FACTOR_POWER2 != 0)
493 uint32_t offset = _fifoTails[request->queue]
494 + inTail - baseAddress;
496 // Store all data in the request buffer
497 request->data = (char *) _malloc_align(roundDMA(
498 len + offset), ALIGNMENT_FACTOR);
500 request->status = read_started;
501 request->offset = offset;
502 request->tag_id = tag_id;
504 // Start the DMA transfer
505 mfc_get((void *) &(request->data[0]), baseAddress,
506 roundDMA(len + offset), tag_id, 0, 0);
511 // Go to the next Request
512 req = (req + 1) % _nrOfRequest;
515 /***************************************************************************************************************/
516 // Start a new request to the receiver of a FIFO
517 for (int i = 0; i < _nrOfQueues; i++) {
518 if (_fifos[i].type == this->out) // Only out-FIFO have to be quecked
520 if (_fifos[i].iswfifo) { // WFIFO
522 // Start only a write process if there is really enough place in the outbound mailbox
523 if (_fifos[i].wfifo->dmaAllowed() && _fifos[i].wfifo->used() > 0) {
524 uint32_t queue = _fifos[i].queue;
526 // Can we send a message to this processor or is it blocked?
527 if (testMessage(queueOnSPE[queue]) <= 0) {
531 // Write all information we used to the request-memory
532 comRequest* request = newRequest();
534 // Has no memory to store a new request
535 if (request == NULL) {
537 uint32_t len = _fifos[i].wfifo->dmaStart();
538 // Create a write-demand message
539 uint32_t message = CREATEFASTMESSAGE(
540 SPE_READ_DEMAND, queue, len);
543 request->queue = queue;
544 request->status = read_request_sent;
545 request->wfifo = _fifos[i].wfifo;
546 request->iswfifo = true;
548 sendMessage(message, queueOnSPE[queue]);
554 // Start only a write process if there is really enough place in the outbound mailbox
555 if (_fifos[i].fifo->dmaAllowed() && _fifos[i].fifo->used() > 0) {
556 uint32_t queue = _fifos[i].queue;
558 // Check if we can send a message to the processor
559 if (testMessage(queueOnSPE[queue]) <= 0) {
563 // Write all information we used to the request-memory
564 comRequest* request = newRequest();
566 // Has no memory to store a new request
567 if (request == NULL) {
569 uint32_t len = _fifos[i].fifo->dmaStart();
570 // Create a write-demand message
571 uint32_t message = CREATEFASTMESSAGE(
572 SPE_READ_DEMAND, queue, len);
575 request->queue = queue;
576 request->status = read_request_sent;
577 request->fifo = _fifos[i].fifo;
578 request->iswfifo = false;
580 sendMessage(message, queueOnSPE[queue]);
591 * Create a new request to store in the cache
594 comRequest* FastCommunication::newRequest() {
595 for (int i = 0; i < _nrOfRequest; i++) {
596 if (!_request[i].valid) {
597 _request[i].valid = true;
598 return &(_request[i]);
609 void FastCommunication::deleteRequest(comRequest* request) {
610 request->valid = false;
614 * Returns the request one like to get
616 comRequest* FastCommunication::getRequest(uint8_t status, uint32_t queue) {
617 uint8_t req = _currentRequest;
619 for (int i = 0; i < _nrOfRequest; i++) {
620 if (_request[req].valid && _request[req].status == status
621 && _request[req].queue == queue) {
622 _currentRequest = (req + 1) % _nrOfRequest;
623 return &(_request[req]);
625 req = (req + 1) % _nrOfRequest;
632 * True if no communication is necessary, false if there is active communication
634 bool FastCommunication::empty() {
635 // check if one fifo would like to send something to another SPE
636 for (int i = 0; i < _nrOfQueues; i++) {
637 if (_fifos[i].type == this->out) {
638 if (_fifos[i].iswfifo) {
639 if (_fifos[i].wfifo->used() > 0)
644 if (_fifos[i].fifo->used() > 0)
652 // are open sendings?
653 for (int i = 0; i < _nrOfRequest; i++) {
654 if (_request[i].valid) {
662 void FastCommunication::sendMessage(uint32_t message, int32_t process) {
663 // Simple forward the message to the PPE
665 spu_write_out_mbox(message);
668 // Forward message to the corresponding SPE
672 // reserve DMA tag ID
673 if ((tag_id = mfc_tag_reserve()) == MFC_TAG_INVALID) {
674 printf("SPE: ERROR - can't reserve a tag ID\n");
678 //printf("SPE > OUT Message to process = %d, addr = %llx\n", process, _ea_base_all[process]);
679 write_in_mbox(message, _ea_base_all[process], tag_id);
680 mfc_tag_release(tag_id);
684 int FastCommunication::testMessage(int32_t process) {
685 //int32_t process = queueOnSPE[queue];
687 // Simple forward the message to the PPE
689 return spu_stat_out_mbox();
692 // Forward message to the corresponding SPE
696 // reserve DMA tag ID
697 if ((tag_id = mfc_tag_reserve()) == MFC_TAG_INVALID) {
698 printf("SPE: ERROR - can't reserve a tag ID\n");
702 // IMPORTANT: THIS IS NOT RACE CONDITION FREE!!!!
703 int test = status_in_mbox(_ea_base_all[process], tag_id);
704 mfc_tag_release(tag_id);