X-Git-Url: http://sraa.de/git/?a=blobdiff_plain;f=dol%2Fsrc%2Fdol%2Fvisitor%2Fcbe%2Flib%2Fppu_main_workloop.h;fp=dol%2Fsrc%2Fdol%2Fvisitor%2Fcbe%2Flib%2Fppu_main_workloop.h;h=874bf326c9ece7ac4ea1fb9ff23c06f36391fa89;hb=8c411cf24ed0eb889191aaeafd8fa1e69081df42;hp=0000000000000000000000000000000000000000;hpb=dea7a4fb1ed110d3ce6e6d9255103d724bd66c0e;p=jump.git diff --git a/dol/src/dol/visitor/cbe/lib/ppu_main_workloop.h b/dol/src/dol/visitor/cbe/lib/ppu_main_workloop.h new file mode 100644 index 0000000..874bf32 --- /dev/null +++ b/dol/src/dol/visitor/cbe/lib/ppu_main_workloop.h @@ -0,0 +1,235 @@ +/**************************************************************** + * Main Loop function + * Creator: lschor, 2008-11-21 + * Description: Includes the main loop function for the ppu_main function + * + * Revision: + * - 2008-11-21: Created + */ + +void workloop() +{ + // Buffer to an address for sending to to the PPE + volatile char *dataToWriteP __attribute__ ((aligned(128))); + + // Number of corrent working process + int processNr; + + // General variables that have been used in the main workloop + uint32_t message; + uint32_t request; + uint32_t len; + uint32_t queue; + + struct timespec t; + t.tv_sec = 0; + t.tv_nsec = 5; + + while (1) + { + // Goes through all nodes and check if they like to send some data + // - Which node has some information + // - Run through all nodes and if one has some work, do that + for (processNr = 0; processNr < NUM_SPES; processNr++) + { + if (!spe_out_mbox_status(data[processNr].spe_ctx)) continue; + + // Get the data + spe_out_mbox_read(data[processNr].spe_ctx, &message, 1); + + request = message >> 28; + len = (0xffff & message); + queue = (message >> 16) & 0xfff; + + // What type of request are the data? + if (request == SPE_WRITE_DEMAND) // Want an address to write + { + + // If there is not enough space in the queue + if (MAXELEMENT - locBufCount[queue] < len) + { + int message = (0 << 28) | (queue << 16) | locBufCount[queue]; + spe_in_mbox_write(data[processNr].spe_ctx, (uint32_t*)&message, 1, SPE_MBOX_ANY_NONBLOCKING); + } + + // There is enough space in the queue + else + { + #ifdef MEASURE_SPE_WRITE_DEMAND + struct timeval t_ppe_write_demand_start, t_ppe_write_demand_end; + gettimeofday(&t_ppe_write_demand_start,NULL); + #endif + int message = (1 << 28) | (queue << 16) | len; + spe_in_mbox_write(data[processNr].spe_ctx, (uint32_t*)&message, 1, SPE_MBOX_ANY_NONBLOCKING); + + // Create the memory for the element + dataToWriteP = (char *)malloc(roundDMA(len)); + + // Create the address to send + uint64_t ea; + ea = (uint64_t)dataToWriteP; + + // Enter the information into the temporary Buffer + tmpFifoEntryWrite[processNr].ea = ea; + tmpFifoEntryWrite[processNr].length=len; + tmpFifoEntryWrite[processNr].queue = queue; + + // Send the address + spe_in_mbox_write(data[processNr].spe_ctx, (uint32_t*)&ea, 2,SPE_MBOX_ANY_NONBLOCKING); + #ifdef MEASURE_SPE_WRITE_DEMAND + gettimeofday(&t_ppe_write_demand_end, NULL); + printf("PPE_WRITE_DEMAND;%f\n",(t_ppe_write_demand_end.tv_sec - t_ppe_write_demand_start.tv_sec) + 0.000001 * (t_ppe_write_demand_end.tv_usec - t_ppe_write_demand_start.tv_usec)); + #endif + } + } + + /***************************************************************************************************************/ + else if (request == SPE_READ_DEMAND) // Want an address to read + { + // If no element is in the fifo queue + if (locBufCount[queue] <= 0) + { + int message = (0 << 28) | (queue << 16) | locBufCount[queue]; + spe_in_mbox_write(data[processNr].spe_ctx, (uint32_t*)&message, 1, SPE_MBOX_ANY_NONBLOCKING); + } + + // Has some elements in the fifo queue + else + { + #ifdef MEASURE_SPE_READ_DEMAND + struct timeval t_ppe_read_demand_start, t_ppe_read_demand_end; + gettimeofday(&t_ppe_read_demand_start,NULL); + #endif + + pthread_mutex_lock( &(mutex[queue]) ); + + len = len > locBufCount[queue] ? locBufCount[queue] : len; + + // Can only send special sizes over the DMA + // 1, 2, 4, 8, 16, ..., 16*x + + if (len >= 16) + len = len - (len % 16); + else if (len >= 8) + len = 8; + else if (len >= 4) + len = 4; + else if (len >= 2) + len = 2; + else + len = 1; + + int message = (1 << 28) | (queue << 16) | len; + spe_in_mbox_write(data[processNr].spe_ctx, (uint32_t*)&message, 1, SPE_MBOX_ANY_NONBLOCKING); + + char *buf = (char *)_malloc_align(sizeof(char) * len, 4); + + // Can directly read all elements from the end of the buffer + if (locBufStart[queue] + len < MAXELEMENT) + { + memcpy(buf, &(locBuf[queue][locBufStart[queue]]), len); + } + + // Has to write from the end and from the beginning of the buffer + else + { + memcpy(&(buf[0]), &(locBuf[queue][locBufStart[queue]]), MAXELEMENT - locBufStart[queue]); + memcpy(&(buf[MAXELEMENT - locBufStart[queue]]), locBuf[queue], len - MAXELEMENT + locBufStart[queue]); + } + + locBufCount[queue] -= len; + locBufStart[queue] = (locBufStart[queue] + len) % MAXELEMENT; + + pthread_mutex_unlock( & (mutex[queue]) ); + + // Create the address to send + uint64_t ea; + ea = (uint64_t)buf; + + // Enter the information into the temporary Buffer + tmpFifoEntryRead[processNr].ea = ea; + tmpFifoEntryRead[processNr].length = len; + tmpFifoEntryRead[processNr].queue = queue; + + spe_in_mbox_write(data[processNr].spe_ctx, (uint32_t*)&(tmpFifoEntryRead[processNr].ea), 2,SPE_MBOX_ANY_NONBLOCKING); + #ifdef MEASURE_SPE_READ_DEMAND + gettimeofday(&t_ppe_read_demand_end, NULL); + printf("PPE;%f\n",(t_ppe_read_demand_end.tv_sec - t_ppe_read_demand_start.tv_sec) + 0.000001 * (t_ppe_read_demand_end.tv_usec - t_ppe_read_demand_start.tv_usec)); + #endif + } + } + + /***************************************************************************************************************/ + else if (request == SPE_WRITE_SUC) // Has finished writing + { + #ifdef MEASURE_SPE_WRITE_SUC + struct timeval t_ppe_write_suc_start, t_ppe_write_suc_end; + gettimeofday(&t_ppe_write_suc_start,NULL); + #endif + // Enter the data into the FIFO + int queueW = tmpFifoEntryWrite[processNr].queue; + int lenW = tmpFifoEntryWrite[processNr].length; + char *bufferW = (char *)(tmpFifoEntryWrite[processNr].ea); + + pthread_mutex_lock( &(mutex[queueW]) ); + + // Has to write all elements at the beginning or write all elements at the end of the buffer + if (locBufStart[queueW] + locBufCount[queueW] > MAXELEMENT || locBufStart[queueW] + locBufCount[queueW] + lenW < MAXELEMENT) + { + memcpy(&(locBuf[queueW][(locBufStart[queueW] + locBufCount[queueW]) % MAXELEMENT]), bufferW, lenW); + } + + // Otherwise something at the end and something at the beginning + else + { + // At the end of the buffer + memcpy(&(locBuf[queueW][(locBufStart[queueW] + locBufCount[queueW])]), bufferW, MAXELEMENT - locBufStart[queueW] - locBufCount[queueW]); + + // At the beginning + memcpy(locBuf[queueW], &(bufferW[MAXELEMENT - locBufStart[queueW] - locBufCount[queueW]]), lenW - (MAXELEMENT - locBufStart[queueW] - locBufCount[queueW])); + } + + locBufCount[queueW]+=lenW; + + pthread_mutex_unlock( & (mutex[queueW]) ); + + // Set the temporary entry to null + #ifdef MEASURE_SPE_WRITE_SUC + gettimeofday(&t_ppe_write_suc_end, NULL); + printf("PPE;%f\n",(t_ppe_write_suc_end.tv_sec - t_ppe_write_suc_start.tv_sec) + 0.000001 * (t_ppe_write_suc_end.tv_usec - t_ppe_write_suc_start.tv_usec)); + #endif + } + + /***************************************************************************************************************/ + else if (request == SPE_READ_SUC) // Has finished reading + { + #ifdef MEASURE_SPE_READ_SUC + struct timeval t_ppe_read_suc_start, t_ppe_read_suc_end; + gettimeofday(&t_ppe_read_suc_start,NULL); + #endif + #ifdef MEASURE_SPE_READ_SUC + gettimeofday(&t_ppe_read_suc_end, NULL); + printf("PPE;%f\n",(t_ppe_read_suc_end.tv_sec - t_ppe_read_suc_start.tv_sec) + 0.000001 * (t_ppe_read_suc_end.tv_usec - t_ppe_read_suc_start.tv_usec)); + #endif + } + + /***************************************************************************************************************/ + else if (request == SPE_DETACH) // A process has finished + { + pthread_mutex_lock( &(mutexProcessNr) ); + processFinished++; + pthread_mutex_unlock( &(mutexProcessNr) ); + } + + /***************************************************************************************************************/ + else // Did nothing + { + nanosleep(&t, NULL); + } + } + + // Check if we can stop the execution + if (processFinished >= NUM_PROCS) break; + } +} +