dol: initial dol commit
[jump.git] / dol / src / dol / visitor / rtems / lib / traffic_shaping.c
1 #include <bsp.h>
2 #include <stdio.h>
3
4 #include "appsupport.h"
5 #include "rtems_process_wrapper.h"
6 #include "traffic_shaping.h"
7
8 unsigned int sink_processes;
9 void sink_end_process(rtems_task_argument argument);
10 void queue_status_process(rtems_task_argument argument);
11 void shaping_process(rtems_task_argument argument);
12 rtems_task shaping_init(rtems_task_argument argument) {
13     rtems_id task_id;
14     rtems_status_code status;
15     QUEUE_WRAPPER *queue_wrap, *sink_wrap;
16     rtems_name name;
17     int i;
18
19     // instatiate daemons for data transfer notices
20     queue_wrap = (QUEUE_WRAPPER *)malloc(NUMBER_OF_QUEUES *
21                                           sizeof(QUEUE_WRAPPER));
22     for (i=0; i < NUMBER_OF_QUEUES; i++) {
23         queue_wrap[i].q = scratch_queue_autoinit_shaper(i+1, 0, 0, 0);
24         queue_wrap[i].hasData = 0;
25     }
26
27     for (i=0; i< NUMBER_OF_QUEUES; i++) {
28         name = rtems_build_name('q', 's', i+1, 'c');
29         status = rtems_task_create(name,
30                                    127,
31                                    RTEMS_MINIMUM_STACK_SIZE,
32                                    RTEMS_DEFAULT_MODES,
33                                    RTEMS_LOCAL,
34                                    &task_id, -1);
35         if (status != RTEMS_SUCCESSFUL) {
36             printf("[init    ] Create queue process failed (status %d).\n",
37                    status);
38             rtems_shutdown_executive(0);
39         }
40         status = rtems_task_start(task_id, queue_status_process,
41                                   (rtems_task_argument)&queue_wrap[i]);
42         if (status != RTEMS_SUCCESSFUL) {
43             printf("[init    ] Start queue process failed (status %d).\n",
44                    status);
45             rtems_shutdown_executive(0);
46         }
47     }
48
49     // instatiate daemons to wait for sinks finishing
50     sink_processes = NUMBER_OF_SINKS;
51     sink_wrap = (QUEUE_WRAPPER *)malloc(NUMBER_OF_SINKS *
52                                         sizeof(QUEUE_WRAPPER));
53     for (i=0; i<NUMBER_OF_SINKS; i++) {
54         sink_wrap[i].sem_terminate = get_sem_terminate(i+1);
55     }
56     for (i=0; i<NUMBER_OF_SINKS; i++) {
57         name = rtems_build_name('s', 't', i+1, 'c');
58         status = rtems_task_create(name,
59                                    127,
60                                    RTEMS_MINIMUM_STACK_SIZE,
61                                    RTEMS_DEFAULT_MODES,
62                                    RTEMS_LOCAL,
63                                    &task_id, -1);
64         if (status != RTEMS_SUCCESSFUL) {
65             printf("[init    ] Create sink process failed (status %d).\n",
66                    status);
67             rtems_shutdown_executive(0);
68         }
69         status = rtems_task_start(task_id, sink_end_process,
70                                   (rtems_task_argument)&sink_wrap[i]);
71         if (status != RTEMS_SUCCESSFUL) {
72             printf("[init    ] Start sink process failed (status %d).\n",
73                    status);
74             rtems_shutdown_executive(0);
75         }
76     }
77
78     
79     // intatiate daemon for data transfer
80     name = rtems_build_name('t', 's', NUMBER_OF_QUEUES, 'c');
81     status = rtems_task_create(name,
82                                128,
83                                RTEMS_MINIMUM_STACK_SIZE,
84                                RTEMS_DEFAULT_MODES,
85                                RTEMS_LOCAL,
86                                &task_id, -1);
87     if (status != RTEMS_SUCCESSFUL) {
88         printf("[init    ] Create shaping process failed (status %d).\n",
89                status);
90         rtems_shutdown_executive(0);
91     }
92     status = rtems_task_start(task_id, shaping_process,
93                               (rtems_task_argument)queue_wrap);
94     if (status != RTEMS_SUCCESSFUL) {
95         printf("[init    ] Start shaping process failed (status %d).\n",
96                status);
97         rtems_shutdown_executive(0);
98     }
99
100     rtems_task_delete(RTEMS_SELF);
101 }
102
103
104 void queue_status_process(rtems_task_argument argument) 
105 {
106     QUEUE_WRAPPER *wrap = (QUEUE_WRAPPER *) argument;
107     SCRATCH_QUEUE_SHAPER *queue = wrap->q;
108
109 /*    printf("queue:%x, traffice_shaping: "
110            "\n\t sem_shaper_used:%x, "
111            "\n\t sem_shaper_left:%x, "
112            "\n\t semaphore_left:%x, "
113            "\n\t semaphore_used:%x",
114            wrap->q,
115            queue->sem_shaper_used,
116            queue->sem_shaper_left,
117            queue->semaphore_left,
118            queue->semaphore_used
119            ); */
120     
121     while (1) {
122         // always USEINTERRUPT
123         ss_sem_wait(queue->sem_shaper_used);
124         ss_sem_wait(queue->sem_shaper_left);
125
126         wrap->hasData ++;
127     }
128 }
129
130 void sink_end_process(rtems_task_argument argument)
131 {
132     QUEUE_WRAPPER *queue_wrap = (QUEUE_WRAPPER *) argument;
133
134     // wait for terminating signal from sink
135     ss_sem_wait(queue_wrap->sem_terminate);
136
137     sink_processes--;
138     if (sink_processes == 0) {
139         SHOW_DEBUG("END!!!");
140         exit(0);
141     }
142 }
143
144 static int cur_queue; // current queue
145 QUEUE_WRAPPER * get_next_queue(QUEUE_WRAPPER * queue_wrap);
146 int canSend(QUEUE_WRAPPER * queue);
147 void shaping_process(rtems_task_argument argument) 
148 {
149     QUEUE_WRAPPER *queue_wrap = (QUEUE_WRAPPER *) argument;
150     cur_queue = 0;
151
152     while (1) {
153         QUEUE_WRAPPER *q_w;
154         q_w = get_next_queue(queue_wrap);
155
156         if (canSend(q_w)) {
157             if (q_w->hasData) {
158                 scratch_queue_shaperRW(q_w->q);
159                 q_w->hasData--;
160             }
161         }
162     }
163     
164 }
165
166 #define PULL_MODE
167 // queue scheduling
168 QUEUE_WRAPPER * get_next_queue(QUEUE_WRAPPER * queue_wrap) 
169 {
170     // round robin
171 #ifdef PULL_MODE
172     cur_queue = (cur_queue == 0) ? NUMBER_OF_QUEUES : cur_queue;
173     cur_queue --;
174 #else // PUSH MODE
175     cur_queue++;
176     cur_queue = (cur_queue == NUMBER_OF_QUEUES) ? 0 : cur_queue;
177 #endif
178     return &queue_wrap[cur_queue];
179 }
180
181
182 // shaping
183 int canSend(QUEUE_WRAPPER * queue)
184 {
185     return 1;
186 }
187
188
189