+#include <bsp.h>
+#include <stdio.h>
+
+#include "appsupport.h"
+#include "rtems_process_wrapper.h"
+#include "traffic_shaping.h"
+
+unsigned int sink_processes;
+void sink_end_process(rtems_task_argument argument);
+void queue_status_process(rtems_task_argument argument);
+void shaping_process(rtems_task_argument argument);
+rtems_task shaping_init(rtems_task_argument argument) {
+ rtems_id task_id;
+ rtems_status_code status;
+ QUEUE_WRAPPER *queue_wrap, *sink_wrap;
+ rtems_name name;
+ int i;
+
+ // instatiate daemons for data transfer notices
+ queue_wrap = (QUEUE_WRAPPER *)malloc(NUMBER_OF_QUEUES *
+ sizeof(QUEUE_WRAPPER));
+ for (i=0; i < NUMBER_OF_QUEUES; i++) {
+ queue_wrap[i].q = scratch_queue_autoinit_shaper(i+1, 0, 0, 0);
+ queue_wrap[i].hasData = 0;
+ }
+
+ for (i=0; i< NUMBER_OF_QUEUES; i++) {
+ name = rtems_build_name('q', 's', i+1, 'c');
+ status = rtems_task_create(name,
+ 127,
+ RTEMS_MINIMUM_STACK_SIZE,
+ RTEMS_DEFAULT_MODES,
+ RTEMS_LOCAL,
+ &task_id, -1);
+ if (status != RTEMS_SUCCESSFUL) {
+ printf("[init ] Create queue process failed (status %d).\n",
+ status);
+ rtems_shutdown_executive(0);
+ }
+ status = rtems_task_start(task_id, queue_status_process,
+ (rtems_task_argument)&queue_wrap[i]);
+ if (status != RTEMS_SUCCESSFUL) {
+ printf("[init ] Start queue process failed (status %d).\n",
+ status);
+ rtems_shutdown_executive(0);
+ }
+ }
+
+ // instatiate daemons to wait for sinks finishing
+ sink_processes = NUMBER_OF_SINKS;
+ sink_wrap = (QUEUE_WRAPPER *)malloc(NUMBER_OF_SINKS *
+ sizeof(QUEUE_WRAPPER));
+ for (i=0; i<NUMBER_OF_SINKS; i++) {
+ sink_wrap[i].sem_terminate = get_sem_terminate(i+1);
+ }
+ for (i=0; i<NUMBER_OF_SINKS; i++) {
+ name = rtems_build_name('s', 't', i+1, 'c');
+ status = rtems_task_create(name,
+ 127,
+ RTEMS_MINIMUM_STACK_SIZE,
+ RTEMS_DEFAULT_MODES,
+ RTEMS_LOCAL,
+ &task_id, -1);
+ if (status != RTEMS_SUCCESSFUL) {
+ printf("[init ] Create sink process failed (status %d).\n",
+ status);
+ rtems_shutdown_executive(0);
+ }
+ status = rtems_task_start(task_id, sink_end_process,
+ (rtems_task_argument)&sink_wrap[i]);
+ if (status != RTEMS_SUCCESSFUL) {
+ printf("[init ] Start sink process failed (status %d).\n",
+ status);
+ rtems_shutdown_executive(0);
+ }
+ }
+
+
+ // intatiate daemon for data transfer
+ name = rtems_build_name('t', 's', NUMBER_OF_QUEUES, 'c');
+ status = rtems_task_create(name,
+ 128,
+ RTEMS_MINIMUM_STACK_SIZE,
+ RTEMS_DEFAULT_MODES,
+ RTEMS_LOCAL,
+ &task_id, -1);
+ if (status != RTEMS_SUCCESSFUL) {
+ printf("[init ] Create shaping process failed (status %d).\n",
+ status);
+ rtems_shutdown_executive(0);
+ }
+ status = rtems_task_start(task_id, shaping_process,
+ (rtems_task_argument)queue_wrap);
+ if (status != RTEMS_SUCCESSFUL) {
+ printf("[init ] Start shaping process failed (status %d).\n",
+ status);
+ rtems_shutdown_executive(0);
+ }
+
+ rtems_task_delete(RTEMS_SELF);
+}
+
+
+void queue_status_process(rtems_task_argument argument)
+{
+ QUEUE_WRAPPER *wrap = (QUEUE_WRAPPER *) argument;
+ SCRATCH_QUEUE_SHAPER *queue = wrap->q;
+
+/* printf("queue:%x, traffice_shaping: "
+ "\n\t sem_shaper_used:%x, "
+ "\n\t sem_shaper_left:%x, "
+ "\n\t semaphore_left:%x, "
+ "\n\t semaphore_used:%x",
+ wrap->q,
+ queue->sem_shaper_used,
+ queue->sem_shaper_left,
+ queue->semaphore_left,
+ queue->semaphore_used
+ ); */
+
+ while (1) {
+ // always USEINTERRUPT
+ ss_sem_wait(queue->sem_shaper_used);
+ ss_sem_wait(queue->sem_shaper_left);
+
+ wrap->hasData ++;
+ }
+}
+
+void sink_end_process(rtems_task_argument argument)
+{
+ QUEUE_WRAPPER *queue_wrap = (QUEUE_WRAPPER *) argument;
+
+ // wait for terminating signal from sink
+ ss_sem_wait(queue_wrap->sem_terminate);
+
+ sink_processes--;
+ if (sink_processes == 0) {
+ SHOW_DEBUG("END!!!");
+ exit(0);
+ }
+}
+
+static int cur_queue; // current queue
+QUEUE_WRAPPER * get_next_queue(QUEUE_WRAPPER * queue_wrap);
+int canSend(QUEUE_WRAPPER * queue);
+void shaping_process(rtems_task_argument argument)
+{
+ QUEUE_WRAPPER *queue_wrap = (QUEUE_WRAPPER *) argument;
+ cur_queue = 0;
+
+ while (1) {
+ QUEUE_WRAPPER *q_w;
+ q_w = get_next_queue(queue_wrap);
+
+ if (canSend(q_w)) {
+ if (q_w->hasData) {
+ scratch_queue_shaperRW(q_w->q);
+ q_w->hasData--;
+ }
+ }
+ }
+
+}
+
+#define PULL_MODE
+// queue scheduling
+QUEUE_WRAPPER * get_next_queue(QUEUE_WRAPPER * queue_wrap)
+{
+ // round robin
+#ifdef PULL_MODE
+ cur_queue = (cur_queue == 0) ? NUMBER_OF_QUEUES : cur_queue;
+ cur_queue --;
+#else // PUSH MODE
+ cur_queue++;
+ cur_queue = (cur_queue == NUMBER_OF_QUEUES) ? 0 : cur_queue;
+#endif
+ return &queue_wrap[cur_queue];
+}
+
+
+// shaping
+int canSend(QUEUE_WRAPPER * queue)
+{
+ return 1;
+}
+
+
+