dol: initial dol commit
[jump.git] / dol / src / dol / visitor / rtems / lib / traffic_shaping.c
diff --git a/dol/src/dol/visitor/rtems/lib/traffic_shaping.c b/dol/src/dol/visitor/rtems/lib/traffic_shaping.c
new file mode 100644 (file)
index 0000000..d47d2f5
--- /dev/null
@@ -0,0 +1,189 @@
+#include <bsp.h>
+#include <stdio.h>
+
+#include "appsupport.h"
+#include "rtems_process_wrapper.h"
+#include "traffic_shaping.h"
+
+unsigned int sink_processes;
+void sink_end_process(rtems_task_argument argument);
+void queue_status_process(rtems_task_argument argument);
+void shaping_process(rtems_task_argument argument);
+rtems_task shaping_init(rtems_task_argument argument) {
+    rtems_id task_id;
+    rtems_status_code status;
+    QUEUE_WRAPPER *queue_wrap, *sink_wrap;
+    rtems_name name;
+    int i;
+
+    // instatiate daemons for data transfer notices
+    queue_wrap = (QUEUE_WRAPPER *)malloc(NUMBER_OF_QUEUES *
+                                          sizeof(QUEUE_WRAPPER));
+    for (i=0; i < NUMBER_OF_QUEUES; i++) {
+        queue_wrap[i].q = scratch_queue_autoinit_shaper(i+1, 0, 0, 0);
+        queue_wrap[i].hasData = 0;
+    }
+
+    for (i=0; i< NUMBER_OF_QUEUES; i++) {
+        name = rtems_build_name('q', 's', i+1, 'c');
+        status = rtems_task_create(name,
+                                   127,
+                                   RTEMS_MINIMUM_STACK_SIZE,
+                                   RTEMS_DEFAULT_MODES,
+                                   RTEMS_LOCAL,
+                                   &task_id, -1);
+        if (status != RTEMS_SUCCESSFUL) {
+            printf("[init    ] Create queue process failed (status %d).\n",
+                   status);
+            rtems_shutdown_executive(0);
+        }
+        status = rtems_task_start(task_id, queue_status_process,
+                                  (rtems_task_argument)&queue_wrap[i]);
+        if (status != RTEMS_SUCCESSFUL) {
+            printf("[init    ] Start queue process failed (status %d).\n",
+                   status);
+            rtems_shutdown_executive(0);
+        }
+    }
+
+    // instatiate daemons to wait for sinks finishing
+    sink_processes = NUMBER_OF_SINKS;
+    sink_wrap = (QUEUE_WRAPPER *)malloc(NUMBER_OF_SINKS *
+                                        sizeof(QUEUE_WRAPPER));
+    for (i=0; i<NUMBER_OF_SINKS; i++) {
+        sink_wrap[i].sem_terminate = get_sem_terminate(i+1);
+    }
+    for (i=0; i<NUMBER_OF_SINKS; i++) {
+        name = rtems_build_name('s', 't', i+1, 'c');
+        status = rtems_task_create(name,
+                                   127,
+                                   RTEMS_MINIMUM_STACK_SIZE,
+                                   RTEMS_DEFAULT_MODES,
+                                   RTEMS_LOCAL,
+                                   &task_id, -1);
+        if (status != RTEMS_SUCCESSFUL) {
+            printf("[init    ] Create sink process failed (status %d).\n",
+                   status);
+            rtems_shutdown_executive(0);
+        }
+        status = rtems_task_start(task_id, sink_end_process,
+                                  (rtems_task_argument)&sink_wrap[i]);
+        if (status != RTEMS_SUCCESSFUL) {
+            printf("[init    ] Start sink process failed (status %d).\n",
+                   status);
+            rtems_shutdown_executive(0);
+        }
+    }
+
+    
+    // intatiate daemon for data transfer
+    name = rtems_build_name('t', 's', NUMBER_OF_QUEUES, 'c');
+    status = rtems_task_create(name,
+                               128,
+                               RTEMS_MINIMUM_STACK_SIZE,
+                               RTEMS_DEFAULT_MODES,
+                               RTEMS_LOCAL,
+                               &task_id, -1);
+    if (status != RTEMS_SUCCESSFUL) {
+        printf("[init    ] Create shaping process failed (status %d).\n",
+               status);
+        rtems_shutdown_executive(0);
+    }
+    status = rtems_task_start(task_id, shaping_process,
+                              (rtems_task_argument)queue_wrap);
+    if (status != RTEMS_SUCCESSFUL) {
+        printf("[init    ] Start shaping process failed (status %d).\n",
+               status);
+        rtems_shutdown_executive(0);
+    }
+
+    rtems_task_delete(RTEMS_SELF);
+}
+
+
+void queue_status_process(rtems_task_argument argument) 
+{
+    QUEUE_WRAPPER *wrap = (QUEUE_WRAPPER *) argument;
+    SCRATCH_QUEUE_SHAPER *queue = wrap->q;
+
+/*    printf("queue:%x, traffice_shaping: "
+           "\n\t sem_shaper_used:%x, "
+           "\n\t sem_shaper_left:%x, "
+           "\n\t semaphore_left:%x, "
+           "\n\t semaphore_used:%x",
+           wrap->q,
+           queue->sem_shaper_used,
+           queue->sem_shaper_left,
+           queue->semaphore_left,
+           queue->semaphore_used
+           ); */
+    
+    while (1) {
+        // always USEINTERRUPT
+        ss_sem_wait(queue->sem_shaper_used);
+        ss_sem_wait(queue->sem_shaper_left);
+
+        wrap->hasData ++;
+    }
+}
+
+void sink_end_process(rtems_task_argument argument)
+{
+    QUEUE_WRAPPER *queue_wrap = (QUEUE_WRAPPER *) argument;
+
+    // wait for terminating signal from sink
+    ss_sem_wait(queue_wrap->sem_terminate);
+
+    sink_processes--;
+    if (sink_processes == 0) {
+        SHOW_DEBUG("END!!!");
+        exit(0);
+    }
+}
+
+static int cur_queue; // current queue
+QUEUE_WRAPPER * get_next_queue(QUEUE_WRAPPER * queue_wrap);
+int canSend(QUEUE_WRAPPER * queue);
+void shaping_process(rtems_task_argument argument) 
+{
+    QUEUE_WRAPPER *queue_wrap = (QUEUE_WRAPPER *) argument;
+    cur_queue = 0;
+
+    while (1) {
+        QUEUE_WRAPPER *q_w;
+        q_w = get_next_queue(queue_wrap);
+
+        if (canSend(q_w)) {
+            if (q_w->hasData) {
+                scratch_queue_shaperRW(q_w->q);
+                q_w->hasData--;
+            }
+        }
+    }
+    
+}
+
+#define PULL_MODE
+// queue scheduling
+QUEUE_WRAPPER * get_next_queue(QUEUE_WRAPPER * queue_wrap) 
+{
+    // round robin
+#ifdef PULL_MODE
+    cur_queue = (cur_queue == 0) ? NUMBER_OF_QUEUES : cur_queue;
+    cur_queue --;
+#else // PUSH MODE
+    cur_queue++;
+    cur_queue = (cur_queue == NUMBER_OF_QUEUES) ? 0 : cur_queue;
+#endif
+    return &queue_wrap[cur_queue];
+}
+
+
+// shaping
+int canSend(QUEUE_WRAPPER * queue)
+{
+    return 1;
+}
+
+
+