@@ -39,6 +39,22 @@ typedef struct task_queue {
39
39
int tail ;
40
40
} task_queue ;
41
41
42
+ static int task_queue_init (task_queue * tasks , pthread_t thread ) {
43
+ task * task_buffer = malloc (sizeof (task ) * TASK_QUEUE_INITIAL_CAPACITY );
44
+ if (task_buffer == NULL ) {
45
+ return 0 ;
46
+ }
47
+ * tasks = (task_queue ){.thread = thread ,
48
+ .processing = 0 ,
49
+ .tasks = task_buffer ,
50
+ .capacity = TASK_QUEUE_INITIAL_CAPACITY ,
51
+ .head = 0 ,
52
+ .tail = 0 };
53
+ return 1 ;
54
+ }
55
+
56
+ static void task_queue_deinit (task_queue * tasks ) { free (tasks -> tasks ); }
57
+
42
58
// Not thread safe.
43
59
static int task_queue_empty (task_queue * tasks ) {
44
60
return tasks -> head == tasks -> tail ;
@@ -137,7 +153,7 @@ void em_proxying_queue_destroy(em_proxying_queue* q) {
137
153
// of the queue.
138
154
pthread_mutex_destroy (& q -> mutex );
139
155
for (int i = 0 ; i < q -> size ; i ++ ) {
140
- free ( q -> task_queues [i ]. tasks );
156
+ task_queue_deinit ( & q -> task_queues [i ]);
141
157
}
142
158
free (q -> task_queues );
143
159
free (q );
@@ -178,14 +194,9 @@ static task_queue* get_or_add_tasks_for_thread(em_proxying_queue* q,
178
194
}
179
195
// Initialize the next available task queue.
180
196
tasks = & q -> task_queues [q -> size ];
181
- tasks -> thread = thread ;
182
- tasks -> processing = 0 ;
183
- tasks -> tasks = malloc (sizeof (task ) * TASK_QUEUE_INITIAL_CAPACITY );
184
- if (tasks -> tasks == NULL ) {
197
+ if (!task_queue_init (tasks , thread )) {
185
198
return NULL ;
186
199
}
187
- tasks -> head = 0 ;
188
- tasks -> tail = 0 ;
189
200
q -> size ++ ;
190
201
return tasks ;
191
202
}
0 commit comments