GCC Code Coverage Report


Directory: src/
File: src/server.c
Date: 2024-04-25 03:45:42
Exec Total Coverage
Lines: 195 272 71.7%
Branches: 61 146 41.8%

Line Branch Exec Source
1 /*
2 * Copyright (c) 2022 Egor Tensin <egor@tensin.name>
3 * This file is part of the "cimple" project.
4 * For details, see https://github.com/egor-tensin/cimple.
5 * Distributed under the MIT License.
6 */
7
8 #include "server.h"
9 #include "command.h"
10 #include "compiler.h"
11 #include "const.h"
12 #include "event_loop.h"
13 #include "file.h"
14 #include "json_rpc.h"
15 #include "log.h"
16 #include "net.h"
17 #include "process.h"
18 #include "protocol.h"
19 #include "run_queue.h"
20 #include "signal.h"
21 #include "storage.h"
22 #include "storage_sqlite.h"
23 #include "tcp_server.h"
24 #include "worker_queue.h"
25
26 #include <poll.h>
27 #include <pthread.h>
28 #include <stdlib.h>
29
30 struct server {
31 pthread_mutex_t server_mtx;
32 pthread_cond_t server_cv;
33
34 int stopping;
35
36 struct cmd_dispatcher *cmd_dispatcher;
37
38 struct event_loop *event_loop;
39 int signalfd;
40
41 struct worker_queue worker_queue;
42 struct run_queue run_queue;
43
44 struct storage storage;
45
46 pthread_t main_thread;
47
48 struct tcp_server *tcp_server;
49 };
50
51 18472 static int server_lock(struct server *server)
52 {
53 18472 int ret = pthread_mutex_lock(&server->server_mtx);
54
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18472 times.
18472 if (ret) {
55 pthread_errno(ret, "pthread_mutex_lock");
56 return ret;
57 }
58 18472 return ret;
59 }
60
61 18472 static void server_unlock(struct server *server)
62 {
63
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 18472 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
18472 pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
64 18472 }
65
66 18292 static int server_wait(struct server *server)
67 {
68 18292 int ret = pthread_cond_wait(&server->server_cv, &server->server_mtx);
69
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18292 times.
18292 if (ret) {
70 pthread_errno(ret, "pthread_cond_wait");
71 return ret;
72 }
73 18292 return ret;
74 }
75
76 18443 static void server_notify(struct server *server)
77 {
78
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 18443 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
18443 pthread_errno_if(pthread_cond_signal(&server->server_cv), "pthread_cond_signal");
79 18443 }
80
81 29 static int server_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents,
82 void *_server)
83 {
84 29 struct server *server = (struct server *)_server;
85 29 int ret = 0;
86
87 29 ret = server_lock(server);
88
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
89 return ret;
90
91 29 server->stopping = 1;
92
93 29 server_notify(server);
94 29 server_unlock(server);
95 29 return ret;
96 }
97
98 27255 static int server_has_workers(const struct server *server)
99 {
100 27255 return !worker_queue_is_empty(&server->worker_queue);
101 }
102
103 9234 static int server_enqueue_worker(struct server *server, struct worker *worker)
104 {
105 9234 int ret = 0;
106
107 9234 ret = server_lock(server);
108
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9234 times.
9234 if (ret < 0)
109 return ret;
110
111 9234 worker_queue_add_last(&server->worker_queue, worker);
112
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9234 times.
9234 log("Added a new worker %d to the queue\n", worker_get_fd(worker));
113
114 9234 server_notify(server);
115 9234 server_unlock(server);
116 9234 return ret;
117 }
118
119 27472 static int server_has_runs(const struct server *server)
120 {
121 27472 return !run_queue_is_empty(&server->run_queue);
122 }
123
124 9180 static int server_enqueue_run(struct server *server, struct run *run)
125 {
126 9180 int ret = 0;
127
128 9180 ret = storage_run_create(&server->storage, run_get_repo_url(run), run_get_repo_rev(run));
129
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
130 return ret;
131 9180 run_set_id(run, ret);
132
133 9180 ret = server_lock(server);
134
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
135 return ret;
136
137 9180 run_queue_add_last(&server->run_queue, run);
138
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9180 times.
9180 log("Added a new run %d for repository %s to the queue\n", run_get_id(run),
139 run_get_repo_url(run));
140
141 9180 server_notify(server);
142 9180 server_unlock(server);
143 9180 return ret;
144 }
145
146 27501 static int server_ready_for_action(const struct server *server)
147 {
148
6/6
✓ Branch 0 taken 27472 times.
✓ Branch 1 taken 29 times.
✓ Branch 3 taken 27255 times.
✓ Branch 4 taken 217 times.
✓ Branch 6 taken 9180 times.
✓ Branch 7 taken 18075 times.
27501 return server->stopping || (server_has_runs(server) && server_has_workers(server));
149 }
150
151 9209 static int server_wait_for_action(struct server *server)
152 {
153 9209 int ret = 0;
154
155
2/2
✓ Branch 1 taken 18292 times.
✓ Branch 2 taken 9209 times.
27501 while (!server_ready_for_action(server)) {
156 18292 ret = server_wait(server);
157
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18292 times.
18292 if (ret < 0)
158 return ret;
159 }
160
161 9209 return ret;
162 }
163
164 9180 static void server_assign_run(struct server *server)
165 {
166 9180 struct run *run = run_queue_remove_first(&server->run_queue);
167
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9180 times.
9180 log("Removed run %d for repository %s from the queue\n", run_get_id(run),
168 run_get_repo_url(run));
169
170 9180 struct worker *worker = worker_queue_remove_first(&server->worker_queue);
171
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9180 times.
9180 log("Removed worker %d from the queue\n", worker_get_fd(worker));
172
173 9180 struct jsonrpc_request *start_request = NULL;
174 9180 int ret = 0;
175
176 9180 ret = request_create_start_run(&start_request, run);
177
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
178 goto exit;
179
180 9180 ret = jsonrpc_request_send(start_request, worker_get_fd(worker));
181 9180 jsonrpc_request_destroy(start_request);
182
1/2
✓ Branch 0 taken 9180 times.
✗ Branch 1 not taken.
9180 if (ret < 0)
183 goto exit;
184
185 9180 exit:
186
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0) {
187 log("Failed to assign run for repository %s to worker %d, requeueing\n",
188 run_get_repo_url(run), worker_get_fd(worker));
189 run_queue_add_first(&server->run_queue, run);
190 } else {
191
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9180 times.
9180 log("Assigned run %d for repository %s to worker %d\n", run_get_id(run),
192 run_get_repo_url(run), worker_get_fd(worker));
193 9180 run_destroy(run);
194 }
195
196 9180 worker_destroy(worker);
197 9180 }
198
199 29 static void *server_main_thread(void *_server)
200 {
201 29 struct server *server = (struct server *)_server;
202 29 int ret = 0;
203
204 29 ret = server_lock(server);
205
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
206 goto exit;
207
208 while (1) {
209 9209 ret = server_wait_for_action(server);
210
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9209 times.
9209 if (ret < 0)
211 goto unlock;
212
213
2/2
✓ Branch 0 taken 29 times.
✓ Branch 1 taken 9180 times.
9209 if (server->stopping)
214 29 goto unlock;
215
216 9180 server_assign_run(server);
217 }
218
219 29 unlock:
220 29 server_unlock(server);
221
222 29 exit:
223 29 return NULL;
224 }
225
226 9234 static int server_handle_cmd_new_worker(UNUSED const struct jsonrpc_request *request,
227 UNUSED struct jsonrpc_response **response, void *_ctx)
228 {
229 9234 struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
230 9234 struct server *server = (struct server *)ctx->arg;
231 9234 int ret = 0;
232
233 9234 ret = file_dup(ctx->fd);
234
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9234 times.
9234 if (ret < 0)
235 return ret;
236
237 9234 const int fd = ret;
238 9234 struct worker *worker = NULL;
239
240 9234 ret = worker_create(&worker, fd);
241
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9234 times.
9234 if (ret < 0)
242 goto close;
243
244 9234 ret = server_enqueue_worker(server, worker);
245
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9234 times.
9234 if (ret < 0)
246 goto destroy_worker;
247
248 9234 return ret;
249
250 destroy_worker:
251 worker_destroy(worker);
252
253 close:
254 net_close(fd);
255
256 return ret;
257 }
258
259 9180 static int server_handle_cmd_queue_run(const struct jsonrpc_request *request,
260 struct jsonrpc_response **response, void *_ctx)
261 {
262 9180 struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
263 9180 struct server *server = (struct server *)ctx->arg;
264 9180 int ret = 0;
265
266 9180 struct run *run = NULL;
267
268 9180 ret = request_parse_queue_run(request, &run);
269
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
270 return ret;
271
272 9180 ret = jsonrpc_response_create(response, request, NULL);
273
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
274 goto destroy_run;
275
276 9180 ret = server_enqueue_run(server, run);
277
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
278 goto free_response;
279
280 9180 return ret;
281
282 free_response:
283 jsonrpc_response_destroy(*response);
284 *response = NULL;
285
286 destroy_run:
287 run_destroy(run);
288
289 return ret;
290 }
291
292 9180 static int server_handle_cmd_finished_run(const struct jsonrpc_request *request,
293 UNUSED struct jsonrpc_response **response, void *_ctx)
294 {
295 9180 struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
296 9180 struct server *server = (struct server *)ctx->arg;
297 9180 int ret = 0;
298
299 9180 int run_id = 0;
300 struct proc_output *output;
301
302 9180 ret = request_parse_finished_run(request, &run_id, &output);
303
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
304 return ret;
305
306 9180 ret = storage_run_finished(&server->storage, run_id, output);
307
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0) {
308 log_err("Failed to mark run %d as finished\n", run_id);
309 goto free_output;
310 }
311
312
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9180 times.
9180 log("Marked run %d as finished\n", run_id);
313
314 9180 free_output:
315 9180 proc_output_destroy(output);
316
317 9180 return ret;
318 }
319
320 26 static int server_handle_cmd_get_runs(const struct jsonrpc_request *request,
321 struct jsonrpc_response **response, void *_ctx)
322 {
323 26 struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
324 26 struct server *server = (struct server *)ctx->arg;
325 26 int ret = 0;
326
327 26 ret = request_parse_get_runs(request);
328
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26 times.
26 if (ret < 0)
329 return ret;
330
331 struct run_queue runs;
332
333 26 ret = storage_get_runs(&server->storage, &runs);
334
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26 times.
26 if (ret < 0) {
335 log_err("Failed to fetch runs\n");
336 return ret;
337 }
338
339 26 ret = response_create_get_runs(response, request, &runs);
340
1/2
✓ Branch 0 taken 26 times.
✗ Branch 1 not taken.
26 if (ret < 0)
341 goto destroy_runs;
342
343 26 destroy_runs:
344 26 run_queue_destroy(&runs);
345
346 26 return ret;
347 }
348
349 static struct cmd_desc commands[] = {
350 {CMD_NEW_WORKER, server_handle_cmd_new_worker},
351 {CMD_QUEUE_RUN, server_handle_cmd_queue_run},
352 {CMD_FINISHED_RUN, server_handle_cmd_finished_run},
353 {CMD_GET_RUNS, server_handle_cmd_get_runs},
354 };
355
356 static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
357
358 29 int server_create(struct server **_server, const struct settings *settings)
359 {
360 struct storage_settings storage_settings;
361 29 int ret = 0;
362
363 29 struct server *server = malloc(sizeof(struct server));
364
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (!server) {
365 log_errno("malloc");
366 return -1;
367 }
368
369 29 ret = pthread_mutex_init(&server->server_mtx, NULL);
370
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret) {
371 pthread_errno(ret, "pthread_mutex_init");
372 goto free;
373 }
374
375 29 ret = pthread_cond_init(&server->server_cv, NULL);
376
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret) {
377 pthread_errno(ret, "pthread_cond_init");
378 goto destroy_mtx;
379 }
380
381 29 server->stopping = 0;
382
383 29 ret = cmd_dispatcher_create(&server->cmd_dispatcher, commands, numof_commands, server);
384
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
385 goto destroy_cv;
386
387 29 ret = event_loop_create(&server->event_loop);
388
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
389 goto destroy_cmd_dispatcher;
390
391 29 ret = signalfd_create_sigterms();
392
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
393 goto destroy_event_loop;
394 29 server->signalfd = ret;
395
396 29 ret = event_loop_add(server->event_loop, server->signalfd, POLLIN, server_set_stopping,
397 server);
398
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
399 goto close_signalfd;
400
401 29 worker_queue_create(&server->worker_queue);
402
403 29 ret = storage_sqlite_settings_create(&storage_settings, settings->sqlite_path);
404
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
405 goto destroy_worker_queue;
406
407 29 ret = storage_create(&server->storage, &storage_settings);
408 29 storage_settings_destroy(&storage_settings);
409
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
410 goto destroy_worker_queue;
411
412 29 ret = storage_get_run_queue(&server->storage, &server->run_queue);
413
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
414 goto destroy_storage;
415
416 29 ret = tcp_server_create(&server->tcp_server, server->event_loop, settings->port,
417 29 cmd_dispatcher_handle_conn, server->cmd_dispatcher);
418
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
419 goto destroy_run_queue;
420
421 29 ret = pthread_create(&server->main_thread, NULL, server_main_thread, server);
422
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret) {
423 pthread_errno(ret, "pthread_create");
424 goto destroy_tcp_server;
425 }
426
427 29 *_server = server;
428 29 return ret;
429
430 destroy_tcp_server:
431 tcp_server_destroy(server->tcp_server);
432
433 destroy_run_queue:
434 run_queue_destroy(&server->run_queue);
435
436 destroy_storage:
437 storage_destroy(&server->storage);
438
439 destroy_worker_queue:
440 worker_queue_destroy(&server->worker_queue);
441
442 close_signalfd:
443 signalfd_destroy(server->signalfd);
444
445 destroy_event_loop:
446 event_loop_destroy(server->event_loop);
447
448 destroy_cmd_dispatcher:
449 cmd_dispatcher_destroy(server->cmd_dispatcher);
450
451 destroy_cv:
452 pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
453
454 destroy_mtx:
455 pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
456
457 free:
458 free(server);
459
460 return ret;
461 }
462
463 29 void server_destroy(struct server *server)
464 {
465
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
29 log("Shutting down\n");
466
467
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
29 pthread_errno_if(pthread_join(server->main_thread, NULL), "pthread_join");
468 29 tcp_server_destroy(server->tcp_server);
469 29 storage_destroy(&server->storage);
470 29 run_queue_destroy(&server->run_queue);
471 29 worker_queue_destroy(&server->worker_queue);
472 29 signalfd_destroy(server->signalfd);
473 29 event_loop_destroy(server->event_loop);
474 29 cmd_dispatcher_destroy(server->cmd_dispatcher);
475
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
29 pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
476
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
29 pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
477 29 free(server);
478 29 }
479
480 29 static int server_listen_thread(struct server *server)
481 {
482 29 int ret = 0;
483
484
2/2
✓ Branch 0 taken 54034 times.
✓ Branch 1 taken 29 times.
54063 while (!server->stopping) {
485
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 54034 times.
54034 log("Waiting for new connections\n");
486
487 54034 ret = event_loop_run(server->event_loop);
488
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 54034 times.
54034 if (ret < 0)
489 return ret;
490 }
491
492 29 return 0;
493 }
494
495 29 int server_main(struct server *server)
496 {
497 29 return server_listen_thread(server);
498 }
499