Branch data Line data Source code
1 : : /*
2 : : * Copyright (c) 2020, Citrix Systems, Inc.
3 : : *
4 : : * All rights reserved.
5 : : *
6 : : * Redistribution and use in source and binary forms, with or without
7 : : * modification, are permitted provided that the following conditions are met:
8 : : *
9 : : * 1. Redistributions of source code must retain the above copyright
10 : : * notice, this list of conditions and the following disclaimer.
11 : : * 2. Redistributions in binary form must reproduce the above copyright
12 : : * notice, this list of conditions and the following disclaimer in the
13 : : * documentation and/or other materials provided with the distribution.
14 : : * 3. Neither the name of the copyright holder nor the names of its
15 : : * contributors may be used to endorse or promote products derived from
16 : : * this software without specific prior written permission.
17 : : *
18 : : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 : : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 : : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 : : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
22 : : * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 : : * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 : : * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 : : * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 : : * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 : : * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 : : * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 : : */
30 : :
31 : : #ifdef HAVE_CONFIG_H
32 : : #include "config.h"
33 : : #endif
34 : :
35 : : #include <errno.h>
36 : : #include <stdlib.h>
37 : : #include <unistd.h>
38 : : #include <libaio.h>
39 : : #include <sys/eventfd.h>
40 : : #ifdef __linux__
41 : : #include <linux/version.h>
42 : : #endif
43 : :
44 : : #include "tapdisk.h"
45 : : #include "tapdisk-log.h"
46 : : #include "libaio-backend.h"
47 : : #include "tapdisk-server.h"
48 : : #include "tapdisk-utils.h"
49 : : #include "timeout-math.h"
50 : :
51 : : #include "atomicio.h"
52 : : #include "aio_getevents.h"
53 : :
54 : : #define WARN(_f, _a...) tlog_write(TLOG_WARN, _f, ##_a)
55 : : #define DBG(_f, _a...) tlog_write(TLOG_DBG, _f, ##_a)
56 : : #define ERR(_err, _f, _a...) tlog_error(_err, _f, ##_a)
57 : :
58 : : #define libaio_backend_queue_count(q) ((q)->queued)
59 : : #define libaio_backend_queue_empty(q) ((q)->queued == 0)
60 : : #define libaio_backend_queue_full(q) \
61 : : (((q)->tiocbs_pending + (q)->queued) >= (q)->size)
62 : : typedef struct _libaio_queue {
63 : : int size;
64 : :
65 : : const struct tio *tio;
66 : : void *tio_data;
67 : :
68 : : struct opioctx opioctx;
69 : :
70 : : int queued;
71 : : struct iocb **iocbs;
72 : :
73 : : /* number of iocbs pending in the aio layer */
74 : : int iocbs_pending;
75 : :
76 : : /* number of tiocbs pending in the queue --
77 : : * this is likely to be larger than iocbs_pending
78 : : * due to request coalescing */
79 : : int tiocbs_pending;
80 : :
81 : : /* iocbs may be deferred if the aio ring is full.
82 : : * libaio_backend_queue_complete will ensure deferred
83 : : * iocbs are queued as slots become available. */
84 : : struct tlist deferred;
85 : : int tiocbs_deferred;
86 : :
87 : : /* optional tapdisk filter */
88 : : struct tfilter *filter;
89 : :
90 : : uint64_t deferrals;
91 : : } libaio_queue;
92 : :
93 : : struct tio {
94 : : const char *name;
95 : : size_t data_size;
96 : :
97 : : int (*tio_setup) (libaio_queue *queue, int qlen);
98 : : void (*tio_destroy) (libaio_queue *queue);
99 : : int (*tio_submit) (libaio_queue *queue);
100 : : };
101 : :
102 : :
103 : : /*
104 : : * We used a kernel patch to return an fd associated with the AIO context
105 : : * so that we can concurrently poll on synchronous and async descriptors.
106 : : * This is signalled by passing 1 as the io context to io_setup.
107 : : */
108 : : #define REQUEST_ASYNC_FD ((io_context_t)1)
109 : :
110 : : static inline void
111 : 0 : queue_tiocb(libaio_queue *queue, struct tiocb *tiocb)
112 : : {
113 : 0 : struct iocb *iocb = &(tiocb->uiocb.io);
114 : :
115 [ # # ][ # # ]: 0 : if (queue->queued) {
116 : 0 : struct tiocb *prev = (struct tiocb *)
117 : 0 : queue->iocbs[queue->queued - 1]->data;
118 : 0 : prev->next = tiocb;
119 : : }
120 : :
121 : 0 : queue->iocbs[queue->queued++] = iocb;
122 : 0 : }
123 : :
124 : : static inline int
125 : 0 : deferred_tiocbs(libaio_queue *queue)
126 : : {
127 : 0 : return (queue->deferred.head != NULL);
128 : : }
129 : :
130 : : static inline void
131 : : defer_tiocb(libaio_queue *queue, struct tiocb *tiocb)
132 : : {
133 : 0 : struct tlist *list = &queue->deferred;
134 : :
135 [ # # ]: 0 : if (!list->head)
136 : 0 : list->head = list->tail = tiocb;
137 : : else
138 : 0 : list->tail = list->tail->next = tiocb;
139 : :
140 : 0 : queue->tiocbs_deferred++;
141 : 0 : queue->deferrals++;
142 : : }
143 : :
144 : : static inline void
145 : 0 : queue_deferred_tiocb(libaio_queue *queue)
146 : : {
147 : 0 : struct tlist *list = &queue->deferred;
148 : :
149 [ # # ]: 0 : if (list->head) {
150 : 0 : struct tiocb *tiocb = list->head;
151 : :
152 : 0 : list->head = tiocb->next;
153 [ # # ]: 0 : if (!list->head)
154 : 0 : list->tail = NULL;
155 : :
156 : 0 : queue_tiocb(queue, tiocb);
157 : 0 : queue->tiocbs_deferred--;
158 : : }
159 : 0 : }
160 : :
161 : : static inline void
162 : 0 : queue_deferred_tiocbs(libaio_queue *queue)
163 : : {
164 [ # # ][ # # ]: 0 : while (!libaio_backend_queue_full(queue) && deferred_tiocbs(queue))
165 : 0 : queue_deferred_tiocb(queue);
166 : 0 : }
167 : :
168 : : /*
169 : : * td_complete may queue more tiocbs
170 : : */
171 : : static void
172 : 0 : complete_tiocb(libaio_queue *queue, struct tiocb *tiocb, unsigned long res)
173 : : {
174 : : int err;
175 : 0 : struct iocb *iocb = &(tiocb->uiocb.io);
176 : :
177 [ # # ]: 0 : if (res == iocb_nbytes(iocb))
178 : : err = 0;
179 [ # # ]: 0 : else if ((int)res < 0)
180 : 0 : err = (int)res;
181 : : else
182 : : err = -EIO;
183 : :
184 : 0 : tiocb->cb(tiocb->arg, tiocb, err);
185 : 0 : }
186 : :
187 : : static int
188 : 0 : cancel_tiocbs(libaio_queue *queue, int err)
189 : : {
190 : : int queued;
191 : : struct tiocb *tiocb;
192 : :
193 [ # # ]: 0 : if (!queue->queued)
194 : : return 0;
195 : :
196 : : /*
197 : : * td_complete may queue more tiocbs, which
198 : : * will overwrite the contents of queue->iocbs.
199 : : * use a private linked list to keep track
200 : : * of the tiocbs we're cancelling.
201 : : */
202 : 0 : tiocb = queue->iocbs[0]->data;
203 : 0 : queued = queue->queued;
204 : 0 : queue->queued = 0;
205 : :
206 [ # # ]: 0 : for (; tiocb != NULL; tiocb = tiocb->next)
207 : 0 : complete_tiocb(queue, tiocb, err);
208 : :
209 : : return queued;
210 : : }
211 : :
212 : : static int
213 : 0 : fail_tiocbs(libaio_queue *queue, int succeeded, int total, int err)
214 : : {
215 : 0 : ERR(err, "io_submit error: %d of %d failed",
216 : : total - succeeded, total);
217 : :
218 : : /* take any non-submitted, merged iocbs
219 : : * off of the queue, split them, and fail them */
220 : 0 : queue->queued = io_expand_iocbs(&queue->opioctx,
221 : : queue->iocbs, succeeded, total);
222 : :
223 : 0 : return cancel_tiocbs(queue, err);
224 : : }
225 : :
226 : : /*
227 : : * libaio
228 : : */
229 : :
230 : : struct lio {
231 : : io_context_t aio_ctx;
232 : : struct io_event *aio_events;
233 : :
234 : : int event_fd;
235 : : int event_id;
236 : :
237 : : int flags;
238 : : };
239 : :
240 : : #define LIO_FLAG_EVENTFD (1<<0)
241 : :
242 : : static int
243 : : libaio_backend_lio_check_resfd(void)
244 : : {
245 : 0 : return tapdisk_linux_version() >= KERNEL_VERSION(2, 6, 22);
246 : : }
247 : :
248 : : static void
249 : 0 : libaio_backend_lio_destroy_aio(libaio_queue *queue)
250 : : {
251 : 0 : struct lio *lio = queue->tio_data;
252 : :
253 [ # # ]: 0 : if (lio->event_fd >= 0) {
254 : 0 : close(lio->event_fd);
255 : 0 : lio->event_fd = -1;
256 : : }
257 : :
258 [ # # ]: 0 : if (lio->aio_ctx) {
259 : 0 : io_destroy(lio->aio_ctx);
260 : 0 : lio->aio_ctx = 0;
261 : : }
262 : 0 : }
263 : :
264 : : static int
265 : 0 : __lio_setup_aio_poll(libaio_queue *queue, int qlen)
266 : : {
267 : 0 : struct lio *lio = queue->tio_data;
268 : : int err, fd;
269 : :
270 : 0 : lio->aio_ctx = REQUEST_ASYNC_FD;
271 : :
272 : 0 : fd = io_setup(qlen, &lio->aio_ctx);
273 [ # # ]: 0 : if (fd < 0) {
274 : 0 : lio->aio_ctx = 0;
275 : 0 : err = -errno;
276 : :
277 [ # # ]: 0 : if (err == -EINVAL)
278 : : goto fail_fd;
279 : :
280 : : goto fail;
281 : : }
282 : :
283 : 0 : lio->event_fd = fd;
284 : :
285 : 0 : return 0;
286 : :
287 : : fail_fd:
288 : : DPRINTF("Couldn't get fd for AIO poll support. This is probably "
289 : : "because your kernel does not have the aio-poll patch "
290 : : "applied.\n");
291 : : fail:
292 : 0 : return err;
293 : : }
294 : :
295 : : static int
296 : 0 : __lio_setup_aio_eventfd(libaio_queue *queue, int qlen)
297 : : {
298 : 0 : struct lio *lio = queue->tio_data;
299 : : int err;
300 : :
301 : 0 : err = io_setup(qlen, &lio->aio_ctx);
302 [ # # ]: 0 : if (err < 0) {
303 : 0 : lio->aio_ctx = 0;
304 : 0 : return err;
305 : : }
306 : :
307 : 0 : lio->event_fd = eventfd(0, 0);
308 [ # # ]: 0 : if (lio->event_fd < 0)
309 : 0 : return -errno;
310 : :
311 : 0 : lio->flags |= LIO_FLAG_EVENTFD;
312 : :
313 : 0 : return 0;
314 : : }
315 : :
316 : : static int
317 : 0 : libaio_backend_lio_setup_aio(libaio_queue *queue, int qlen)
318 : : {
319 : 0 : struct lio *lio = queue->tio_data;
320 : 0 : int err, old_err = 0;
321 : :
322 : 0 : lio->aio_ctx = 0;
323 : 0 : lio->event_fd = -1;
324 : :
325 : : /*
326 : : * prefer the mainline eventfd(2) api, if available.
327 : : * if not, fall back to the poll fd patch.
328 : : */
329 : :
330 : 0 : err = !libaio_backend_lio_check_resfd();
331 [ # # ]: 0 : if (!err)
332 : 0 : err = old_err = __lio_setup_aio_eventfd(queue, qlen);
333 [ # # ]: 0 : if (err)
334 : 0 : err = __lio_setup_aio_poll(queue, qlen);
335 : :
336 : : /* __lio_setup_aio_poll seems to always fail with EINVAL on newer systems,
337 : : * probably because it initializes the output parameter of io_setup to a
338 : : * non-zero value and the kernel patch that understands this is missing */
339 [ # # ][ # # ]: 0 : if (err == -EAGAIN || (err && old_err == -EAGAIN))
340 : : goto fail_rsv;
341 : : fail:
342 : 0 : return err;
343 : :
344 : : fail_rsv:
345 : : EPRINTF("Couldn't setup AIO context. If you are trying to "
346 : : "concurrently use a large number of blktap-based disks, you may "
347 : : "need to increase the system-wide aio request limit. "
348 : : "(e.g. 'echo 1048576 > /proc/sys/fs/aio-max-nr')\n");
349 : : goto fail;
350 : : }
351 : :
352 : :
353 : : static void
354 : 0 : libaio_backend_lio_destroy(libaio_queue *queue)
355 : : {
356 : 0 : struct lio *lio = queue->tio_data;
357 : :
358 [ # # ]: 0 : if (!lio)
359 : 0 : return;
360 : :
361 [ # # ]: 0 : if (lio->event_id >= 0) {
362 : 0 : tapdisk_server_unregister_event(lio->event_id);
363 : 0 : lio->event_id = -1;
364 : : }
365 : :
366 : 0 : libaio_backend_lio_destroy_aio(queue);
367 : :
368 [ # # ]: 0 : if (lio->aio_events) {
369 : 0 : free(lio->aio_events);
370 : 0 : lio->aio_events = NULL;
371 : : }
372 : : }
373 : :
374 : : static void
375 : 0 : libaio_backend_lio_set_eventfd(libaio_queue *queue, int n, struct iocb **iocbs)
376 : : {
377 : 0 : struct lio *lio = queue->tio_data;
378 : : int i;
379 : :
380 [ # # ]: 0 : if (lio->flags & LIO_FLAG_EVENTFD)
381 [ # # ]: 0 : for (i = 0; i < n; ++i)
382 : 0 : io_set_eventfd(iocbs[i], lio->event_fd);
383 : 0 : }
384 : :
385 : : static void
386 : 0 : libaio_backend_lio_ack_event(libaio_queue *queue)
387 : : {
388 : 0 : struct lio *lio = queue->tio_data;
389 : : uint64_t val;
390 : :
391 [ # # ]: 0 : if (lio->flags & LIO_FLAG_EVENTFD) {
392 : 0 : int gcc = read(lio->event_fd, &val, sizeof(val));
393 : : if (gcc) {};
394 : : }
395 : 0 : }
396 : :
397 : : static void
398 : 0 : libaio_backend_lio_event(event_id_t id, char mode, void *private)
399 : : {
400 : 0 : libaio_queue *queue = private;
401 : : struct lio *lio;
402 : : int i, ret, split;
403 : : struct iocb *iocb;
404 : : struct tiocb *tiocb;
405 : : struct io_event *ep;
406 : :
407 : 0 : libaio_backend_lio_ack_event(queue);
408 : :
409 : 0 : lio = queue->tio_data;
410 : : /* io_getevents() invoked via the libaio wrapper does not set errno but
411 : : * instead returns -errno on error */
412 [ # # ]: 0 : while ((ret = user_io_getevents(lio->aio_ctx, queue->size, lio->aio_events)) < 0) {
413 : : /* Permit some errors to retry */
414 [ # # ]: 0 : if (ret == -EINTR) continue;
415 : 0 : ERR(ret, "io_getevents() non-retryable error");
416 : 0 : return;
417 : : }
418 : 0 : split = io_split(&queue->opioctx, lio->aio_events, ret);
419 : :
420 : 0 : DBG("events: %d, tiocbs: %d\n", ret, split);
421 : :
422 : 0 : queue->iocbs_pending -= ret;
423 : 0 : queue->tiocbs_pending -= split;
424 : :
425 [ # # ]: 0 : for (i = split, ep = lio->aio_events; i-- > 0; ep++) {
426 : 0 : iocb = ep->obj;
427 : 0 : tiocb = iocb->data;
428 [ # # ]: 0 : if (tiocb)
429 : 0 : complete_tiocb(queue, tiocb, ep->res);
430 : : }
431 : :
432 : 0 : queue_deferred_tiocbs(queue);
433 : : }
434 : :
435 : : static int
436 : 0 : libaio_backend_lio_setup(libaio_queue *queue, int qlen)
437 : : {
438 : 0 : struct lio *lio = queue->tio_data;
439 : : int err;
440 : :
441 : 0 : lio->event_id = -1;
442 : :
443 : 0 : err = libaio_backend_lio_setup_aio(queue, qlen);
444 [ # # ]: 0 : if (err)
445 : : goto fail;
446 : :
447 : 0 : lio->event_id =
448 : 0 : tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
449 : 0 : lio->event_fd, TV_ZERO,
450 : : libaio_backend_lio_event,
451 : : queue);
452 : 0 : err = lio->event_id;
453 [ # # ]: 0 : if (err < 0)
454 : : goto fail;
455 : :
456 : 0 : lio->aio_events = calloc(qlen, sizeof(struct io_event));
457 [ # # ]: 0 : if (!lio->aio_events) {
458 : 0 : err = -errno;
459 : 0 : goto fail;
460 : : }
461 : :
462 : : return 0;
463 : :
464 : : fail:
465 : 0 : libaio_backend_lio_destroy(queue);
466 : 0 : return err;
467 : : }
468 : :
469 : : static int
470 : 0 : libaio_backend_lio_submit(libaio_queue *queue)
471 : : {
472 : 0 : struct lio *lio = queue->tio_data;
473 : 0 : int merged, submitted, err = 0;
474 : :
475 [ # # ]: 0 : if (!queue->queued)
476 : : return 0;
477 : :
478 : 0 : merged = io_merge(&queue->opioctx, queue->iocbs, queue->queued);
479 : 0 : libaio_backend_lio_set_eventfd(queue, merged, queue->iocbs);
480 : 0 : submitted = io_submit(lio->aio_ctx, merged, queue->iocbs);
481 : :
482 : 0 : DBG("queued: %d, merged: %d, submitted: %d\n",
483 : : queue->queued, merged, submitted);
484 : :
485 [ # # ]: 0 : if (submitted < 0) {
486 : : err = submitted;
487 : : submitted = 0;
488 [ # # ]: 0 : } else if (submitted < merged)
489 : 0 : err = -EIO;
490 : :
491 : 0 : queue->iocbs_pending += submitted;
492 : 0 : queue->tiocbs_pending += queue->queued;
493 : 0 : queue->queued = 0;
494 : :
495 [ # # ]: 0 : if (err)
496 : 0 : queue->tiocbs_pending -=
497 : 0 : fail_tiocbs(queue, submitted, merged, err);
498 : :
499 : 0 : return submitted;
500 : : }
501 : :
502 : : static const struct tio td_tio_lio = {
503 : : .name = "lio",
504 : : .data_size = sizeof(struct lio),
505 : : .tio_setup = libaio_backend_lio_setup,
506 : : .tio_destroy = libaio_backend_lio_destroy,
507 : : .tio_submit = libaio_backend_lio_submit,
508 : : };
509 : :
510 : : static void
511 : 0 : libaio_backend_queue_free_io(libaio_queue *queue)
512 : : {
513 [ # # ]: 0 : if (queue->tio) {
514 [ # # ]: 0 : if (queue->tio->tio_destroy)
515 : 0 : queue->tio->tio_destroy(queue);
516 : 0 : queue->tio = NULL;
517 : : }
518 : :
519 [ # # ]: 0 : if (queue->tio_data) {
520 : 0 : free(queue->tio_data);
521 : 0 : queue->tio_data = NULL;
522 : : }
523 : 0 : }
524 : :
525 : : static int
526 : 0 : libaio_backend_queue_init_io(libaio_queue *queue, int drv)
527 : : {
528 : : const struct tio *tio;
529 : : int err;
530 : :
531 [ # # ]: 0 : switch (drv) {
532 : : case TIO_DRV_LIO:
533 : 0 : tio = &td_tio_lio;
534 : : break;
535 : : default:
536 : : err = -EINVAL;
537 : : goto fail;
538 : : }
539 : :
540 : 0 : queue->tio_data = calloc(1, tio->data_size);
541 [ # # ]: 0 : if (!queue->tio_data) {
542 : 0 : PERROR("malloc(%zu)", tio->data_size);
543 : 0 : err = -errno;
544 : 0 : goto fail;
545 : : }
546 : :
547 : 0 : queue->tio = tio;
548 : :
549 : : if (tio->tio_setup) {
550 : 0 : err = tio->tio_setup(queue, queue->size);
551 [ # # ]: 0 : if (err)
552 : : goto fail;
553 : : }
554 : :
555 : 0 : DPRINTF("I/O queue driver: %s\n", tio->name);
556 : :
557 : 0 : return 0;
558 : :
559 : : fail:
560 : 0 : libaio_backend_queue_free_io(queue);
561 : 0 : return err;
562 : : }
563 : :
564 : : static void
565 : 0 : libaio_backend_free_queue(tqueue *q)
566 : : {
567 : 0 : libaio_queue *queue = (libaio_queue*)*q;
568 : 0 : libaio_backend_queue_free_io(queue);
569 : :
570 : 0 : free(queue->iocbs);
571 : 0 : queue->iocbs = NULL;
572 : 0 : opio_free(&queue->opioctx);
573 : 0 : free(queue);
574 : 0 : *q = NULL;
575 : 0 : }
576 : :
577 : :
578 : :
579 : :
580 : : static int
581 : 0 : libaio_backend_init_queue(tqueue *q, int size,
582 : : int drv, struct tfilter *filter)
583 : : {
584 : :
585 : : int err;
586 : 0 : libaio_queue *queue = malloc(sizeof(libaio_queue));
587 [ # # ]: 0 : if(queue == NULL)
588 : : return ENOMEM;
589 : :
590 : 0 : *q = queue;
591 : :
592 : : memset(queue, 0, sizeof(libaio_queue));
593 : :
594 : 0 : queue->size = size;
595 : 0 : queue->filter = filter;
596 : :
597 [ # # ]: 0 : if (!size)
598 : : return 0;
599 : :
600 : 0 : err = libaio_backend_queue_init_io(queue, drv);
601 [ # # ]: 0 : if (err)
602 : : goto fail;
603 : :
604 : 0 : queue->iocbs = calloc(size, sizeof(struct iocb *));
605 [ # # ]: 0 : if (!queue->iocbs) {
606 : 0 : err = -errno;
607 : 0 : goto fail;
608 : : }
609 : :
610 : 0 : err = opio_init(&queue->opioctx, size);
611 [ # # ]: 0 : if (err)
612 : : goto fail;
613 : :
614 : : return 0;
615 : :
616 : : fail:
617 : 0 : libaio_backend_free_queue(q);
618 : 0 : return err;
619 : : }
620 : :
621 : :
622 : : static void
623 : 0 : libaio_backend_debug_queue(tqueue q)
624 : : {
625 : 0 : libaio_queue* queue = (libaio_queue*)q;
626 : 0 : struct tiocb *tiocb = queue->deferred.head;
627 : :
628 : 0 : WARN("LIBAIO QUEUE:\n");
629 : 0 : WARN("size: %d, tio: %s, queued: %d, iocbs_pending: %d, "
630 : : "tiocbs_pending: %d, tiocbs_deferred: %d, deferrals: %"PRIx64"\n",
631 : : queue->size, queue->tio->name, queue->queued, queue->iocbs_pending,
632 : : queue->tiocbs_pending, queue->tiocbs_deferred, queue->deferrals);
633 : :
634 [ # # ]: 0 : if (tiocb) {
635 : 0 : WARN("deferred:\n");
636 [ # # ]: 0 : for (; tiocb != NULL; tiocb = tiocb->next) {
637 : 0 : struct iocb *io = &(tiocb->uiocb.io);
638 : 0 : WARN("%s of %lu bytes at %lld\n",
639 : : iocb_opcode(io),
640 : : iocb_nbytes(io), iocb_offset(io));
641 : : }
642 : : }
643 : 0 : }
644 : :
645 : : static void
646 : 0 : libaio_backend_prep_tiocb(struct tiocb *tiocb, int fd, int rw, char *buf, size_t size,
647 : : long long offset, td_queue_callback_t cb, void *arg)
648 : : {
649 : 0 : struct iocb *iocb = &(tiocb->uiocb.io);
650 : :
651 [ # # ]: 0 : if (rw)
652 : : io_prep_pwrite(iocb, fd, buf, size, offset);
653 : : else
654 : : io_prep_pread(iocb, fd, buf, size, offset);
655 : :
656 : 0 : iocb->data = tiocb;
657 : 0 : tiocb->cb = cb;
658 : 0 : tiocb->arg = arg;
659 : 0 : tiocb->next = NULL;
660 : 0 : }
661 : :
662 : : static void
663 : 0 : libaio_backend_queue_tiocb(tqueue q, struct tiocb *tiocb)
664 : : {
665 : 0 : libaio_queue* queue = (libaio_queue*)q;
666 : :
667 [ # # ]: 0 : if (!libaio_backend_queue_full(queue))
668 : 0 : queue_tiocb(queue, tiocb);
669 : : else
670 : : defer_tiocb(queue, tiocb);
671 : 0 : }
672 : :
673 : :
674 : : /*
675 : : * fail_tiocbs may queue more tiocbs
676 : : */
677 : : static int
678 : 0 : libaio_backend_submit_tiocbs(tqueue q)
679 : : {
680 : 0 : libaio_queue* queue = (libaio_queue*)q;
681 : 0 : return queue->tio->tio_submit(queue);
682 : : }
683 : :
684 : : static int
685 : 0 : libaio_backend_submit_all_tiocbs(tqueue q)
686 : : {
687 : 0 : int submitted = 0;
688 : :
689 : 0 : libaio_queue* queue = (libaio_queue*)q;
690 : : do {
691 : 0 : submitted += libaio_backend_submit_tiocbs(queue);
692 [ # # ]: 0 : } while (!libaio_backend_queue_empty(queue));
693 : :
694 : 0 : return submitted;
695 : : }
696 : :
697 : : /*
698 : : * cancel_tiocbs may queue more tiocbs
699 : : */
700 : : int
701 : 0 : libaio_backend_cancel_tiocbs(tqueue q)
702 : : {
703 : 0 : libaio_queue* queue = (libaio_queue*)q;
704 : 0 : return cancel_tiocbs(queue, -EIO);
705 : : }
706 : :
707 : : int
708 : 0 : libaio_backend_cancel_all_tiocbs(tqueue q)
709 : : {
710 : 0 : int cancelled = 0;
711 : 0 : libaio_queue* queue = (libaio_queue*)q;
712 : :
713 : : do {
714 : 0 : cancelled += libaio_backend_cancel_tiocbs(queue);
715 [ # # ]: 0 : } while (!libaio_backend_queue_empty(queue));
716 : :
717 : 0 : return cancelled;
718 : : }
719 : :
720 : 0 : struct backend* get_libaio_backend()
721 : : {
722 : : static struct backend lib_aio_backend = {
723 : : .debug=libaio_backend_debug_queue,
724 : : .init=libaio_backend_init_queue,
725 : : .free_queue=libaio_backend_free_queue,
726 : : .queue=libaio_backend_queue_tiocb,
727 : : .submit_all=libaio_backend_submit_all_tiocbs,
728 : : .submit_tiocbs=libaio_backend_submit_tiocbs,
729 : : .prep=libaio_backend_prep_tiocb
730 : : };
731 : 0 : return &lib_aio_backend;
732 : : }
733 : :
|