Ruby 4.0.5p0 (2026-05-20 revision 64336ffd0ee9e1f4c05891695a3d7b49cb709721)
scheduler.c
1/**********************************************************************
2
3 scheduler.c
4
5 $Author$
6
7 Copyright (C) 2020 Samuel Grant Dawson Williams
8
9**********************************************************************/
10
11#include "vm_core.h"
12#include "eval_intern.h"
14#include "ruby/io.h"
15#include "ruby/io/buffer.h"
16
17#include "ruby/thread.h"
18
19// For `ruby_thread_has_gvl_p`:
20#include "internal/thread.h"
21
22// For atomic operations:
23#include "ruby_atomic.h"
24
25static ID id_close;
26static ID id_scheduler_close;
27
28static ID id_block;
29static ID id_unblock;
30
31static ID id_yield;
32
33static ID id_timeout_after;
34static ID id_kernel_sleep;
35static ID id_process_wait;
36
37static ID id_io_read, id_io_pread;
38static ID id_io_write, id_io_pwrite;
39static ID id_io_wait;
40static ID id_io_select;
41static ID id_io_close;
42
43static ID id_address_resolve;
44
45static ID id_blocking_operation_wait;
46static ID id_fiber_interrupt;
47
48static ID id_fiber_schedule;
49
50// Our custom blocking operation class
51static VALUE rb_cFiberSchedulerBlockingOperation;
52
53/*
54 * Custom blocking operation structure for blocking operations
55 * This replaces the use of Ruby procs to avoid use-after-free issues
56 * and provides a cleaner C API for native work pools.
57 */
58
59typedef enum {
60 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED, // Submitted but not started
61 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING, // Currently running
62 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED, // Finished (success/error)
63 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED // Cancelled
64} rb_fiber_blocking_operation_status_t;
65
67 void *(*function)(void *);
68 void *data;
69
70 rb_unblock_function_t *unblock_function;
71 void *data2;
72
73 int flags;
75
76 // Execution status
77 volatile rb_atomic_t status;
78};
79
80static void
81blocking_operation_mark(void *ptr)
82{
83 // No Ruby objects to mark in our struct
84}
85
86static void
87blocking_operation_free(void *ptr)
88{
89 rb_fiber_scheduler_blocking_operation_t *blocking_operation = (rb_fiber_scheduler_blocking_operation_t *)ptr;
90 ruby_xfree(blocking_operation);
91}
92
93static size_t
94blocking_operation_memsize(const void *ptr)
95{
96 return sizeof(rb_fiber_scheduler_blocking_operation_t);
97}
98
99static const rb_data_type_t blocking_operation_data_type = {
100 "Fiber::Scheduler::BlockingOperation",
101 {
102 blocking_operation_mark,
103 blocking_operation_free,
104 blocking_operation_memsize,
105 },
106 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
107};
108
109/*
110 * Allocate a new blocking operation
111 */
112static VALUE
113blocking_operation_alloc(VALUE klass)
114{
115 rb_fiber_scheduler_blocking_operation_t *blocking_operation;
116 VALUE obj = TypedData_Make_Struct(klass, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
117
118 blocking_operation->function = NULL;
119 blocking_operation->data = NULL;
120 blocking_operation->unblock_function = NULL;
121 blocking_operation->data2 = NULL;
122 blocking_operation->flags = 0;
123 blocking_operation->state = NULL;
124 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
125
126 return obj;
127}
128
129/*
130 * Get the blocking operation struct from a Ruby object
131 */
132static rb_fiber_scheduler_blocking_operation_t *
133get_blocking_operation(VALUE obj)
134{
135 rb_fiber_scheduler_blocking_operation_t *blocking_operation;
136 TypedData_Get_Struct(obj, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
137 return blocking_operation;
138}
139
140/*
141 * Document-method: Fiber::Scheduler::BlockingOperation#call
142 *
143 * Execute the blocking operation. This method releases the GVL and calls
144 * the blocking function, then restores the errno value.
145 *
146 * Returns nil. The actual result is stored in the associated state object.
147 */
148static VALUE
149blocking_operation_call(VALUE self)
150{
151 rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
152
153 if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
154 rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!");
155 }
156
157 if (blocking_operation->function == NULL) {
158 rb_raise(rb_eRuntimeError, "Blocking operation has no function to execute!");
159 }
160
161 if (blocking_operation->state == NULL) {
162 rb_raise(rb_eRuntimeError, "Blocking operation has no result object!");
163 }
164
165 // Mark as executing
166 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
167
168 // Execute the blocking operation without GVL
169 blocking_operation->state->result = rb_nogvl(blocking_operation->function, blocking_operation->data,
170 blocking_operation->unblock_function, blocking_operation->data2,
171 blocking_operation->flags);
172 blocking_operation->state->saved_errno = rb_errno();
173
174 // Mark as completed
175 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED;
176
177 return Qnil;
178}
179
180/*
181 * C API: Extract blocking operation struct from Ruby object (GVL required)
182 *
183 * This function safely extracts the opaque struct from a BlockingOperation VALUE
184 * while holding the GVL. The returned pointer can be passed to worker threads
185 * and used with rb_fiber_scheduler_blocking_operation_execute_opaque_nogvl.
186 *
187 * Returns the opaque struct pointer on success, NULL on error.
188 * Must be called while holding the GVL.
189 */
190rb_fiber_scheduler_blocking_operation_t *
192{
193 return get_blocking_operation(self);
194}
195
196/*
197 * C API: Execute blocking operation from opaque struct (GVL not required)
198 *
199 * This function executes a blocking operation using the opaque struct pointer
200 * obtained from rb_fiber_scheduler_blocking_operation_extract.
201 * It can be called from native threads without holding the GVL.
202 *
203 * Returns 0 on success, -1 on error.
204 */
205int
206rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
207{
208 if (blocking_operation == NULL) {
209 return -1;
210 }
211
212 if (blocking_operation->function == NULL || blocking_operation->state == NULL) {
213 return -1; // Invalid blocking operation
214 }
215
216 // Resolve sentinel values for unblock_function and data2:
217 rb_thread_resolve_unblock_function(&blocking_operation->unblock_function, &blocking_operation->data2, GET_THREAD());
218
219 // Atomically check if we can transition from QUEUED to EXECUTING
220 rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
221 if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
222 // Already cancelled or in wrong state
223 return -1;
224 }
225
226 // Now we're executing - call the function
227 blocking_operation->state->result = blocking_operation->function(blocking_operation->data);
228 blocking_operation->state->saved_errno = errno;
229
230 // Atomically transition to completed (unless cancelled during execution)
231 expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
232 if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) {
233 // Successfully completed
234 return 0;
235 } else {
236 // Was cancelled during execution
237 blocking_operation->state->saved_errno = EINTR;
238 return -1;
239 }
240}
241
242/*
243 * C API: Create a new blocking operation
244 *
245 * This creates a blocking operation that can be executed by native work pools.
246 * The blocking operation holds references to the function and data safely.
247 */
248VALUE
249rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data,
250 rb_unblock_function_t *unblock_function, void *data2,
251 int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
252{
253 VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation);
254 rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
255
256 blocking_operation->function = function;
257 blocking_operation->data = data;
258 blocking_operation->unblock_function = unblock_function;
259 blocking_operation->data2 = data2;
260 blocking_operation->flags = flags;
261 blocking_operation->state = state;
262
263 return self;
264}
265
266/*
267 *
268 * Document-class: Fiber::Scheduler
269 *
270 * This is not an existing class, but documentation of the interface that Scheduler
271 * object should comply to in order to be used as argument to Fiber.scheduler and handle non-blocking
272 * fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations
273 * of some concepts.
274 *
275 * Scheduler's behavior and usage are expected to be as follows:
276 *
277 * * When the execution in the non-blocking Fiber reaches some blocking operation (like
278 * sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's
279 * hook methods, listed below.
280 * * Scheduler somehow registers what the current fiber is waiting on, and yields control
281 * to other fibers with Fiber.yield (so the fiber would be suspended while expecting its
282 * wait to end, and other fibers in the same thread can perform)
283 * * At the end of the current thread execution, the scheduler's method #scheduler_close is called
284 * * The scheduler runs into a wait loop, checking all the blocked fibers (which it has
285 * registered on hook calls) and resuming them when the awaited resource is ready
286 * (e.g. I/O ready or sleep time elapsed).
287 *
288 * This way concurrent execution will be achieved transparently for every
289 * individual Fiber's code.
290 *
291 * Scheduler implementations are provided by gems, like
292 * Async[https://github.com/socketry/async].
293 *
294 * Hook methods are:
295 *
296 * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite #io_select, and #io_close
297 * * #process_wait
298 * * #kernel_sleep
299 * * #timeout_after
300 * * #address_resolve
301 * * #block and #unblock
302 * * #blocking_operation_wait
303 * * #fiber_interrupt
304 * * #yield
305 * * (the list is expanded as Ruby developers make more methods having non-blocking calls)
306 *
307 * When not specified otherwise, the hook implementations are mandatory: if they are not
308 * implemented, the methods trying to call hook will fail. To provide backward compatibility,
309 * in the future hooks will be optional (if they are not implemented, due to the scheduler
310 * being created for the older Ruby version, the code which needs this hook will not fail,
311 * and will just behave in a blocking fashion).
312 *
313 * It is also strongly recommended that the scheduler implements the #fiber method, which is
314 * delegated to by Fiber.schedule.
315 *
316 * Sample _toy_ implementation of the scheduler can be found in Ruby's code, in
317 * <tt>test/fiber/scheduler.rb</tt>
318 *
319 */
320void
321Init_Fiber_Scheduler(void)
322{
323 id_close = rb_intern_const("close");
324 id_scheduler_close = rb_intern_const("scheduler_close");
325
326 id_block = rb_intern_const("block");
327 id_unblock = rb_intern_const("unblock");
328 id_yield = rb_intern_const("yield");
329
330 id_timeout_after = rb_intern_const("timeout_after");
331 id_kernel_sleep = rb_intern_const("kernel_sleep");
332 id_process_wait = rb_intern_const("process_wait");
333
334 id_io_read = rb_intern_const("io_read");
335 id_io_pread = rb_intern_const("io_pread");
336 id_io_write = rb_intern_const("io_write");
337 id_io_pwrite = rb_intern_const("io_pwrite");
338
339 id_io_wait = rb_intern_const("io_wait");
340 id_io_select = rb_intern_const("io_select");
341 id_io_close = rb_intern_const("io_close");
342
343 id_address_resolve = rb_intern_const("address_resolve");
344
345 id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
346 id_fiber_interrupt = rb_intern_const("fiber_interrupt");
347
348 id_fiber_schedule = rb_intern_const("fiber");
349
350 // Define an anonymous BlockingOperation class for internal use only
351 // This is completely hidden from Ruby code and cannot be instantiated directly
352 rb_cFiberSchedulerBlockingOperation = rb_class_new(rb_cObject);
353 rb_define_alloc_func(rb_cFiberSchedulerBlockingOperation, blocking_operation_alloc);
354 rb_define_method(rb_cFiberSchedulerBlockingOperation, "call", blocking_operation_call, 0);
355
356 // Register the anonymous class as a GC root so it doesn't get collected
357 rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation);
358
359#if 0 /* for RDoc */
360 rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
361 rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
362 rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_process_wait, 2);
363 rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_io_wait, 3);
364 rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_io_read, 4);
365 rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_io_write, 4);
366 rb_define_method(rb_cFiberScheduler, "io_pread", rb_fiber_scheduler_io_pread, 5);
367 rb_define_method(rb_cFiberScheduler, "io_pwrite", rb_fiber_scheduler_io_pwrite, 5);
368 rb_define_method(rb_cFiberScheduler, "io_select", rb_fiber_scheduler_io_select, 4);
369 rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1);
370 rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_address_resolve, 1);
371 rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
372 rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
373 rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
374 rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
375 rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
376 rb_define_method(rb_cFiberScheduler, "yield", rb_fiber_scheduler_yield, 0);
377 rb_define_method(rb_cFiberScheduler, "fiber_interrupt", rb_fiber_scheduler_fiber_interrupt, 2);
378 rb_define_method(rb_cFiberScheduler, "io_close", rb_fiber_scheduler_io_close, 1);
379#endif
380}
381
382VALUE
384{
386
387 rb_thread_t *thread = GET_THREAD();
388 RUBY_ASSERT(thread);
389
390 return thread->scheduler;
391}
392
393static void
394verify_interface(VALUE scheduler)
395{
396 if (!rb_respond_to(scheduler, id_block)) {
397 rb_raise(rb_eArgError, "Scheduler must implement #block");
398 }
399
400 if (!rb_respond_to(scheduler, id_unblock)) {
401 rb_raise(rb_eArgError, "Scheduler must implement #unblock");
402 }
403
404 if (!rb_respond_to(scheduler, id_kernel_sleep)) {
405 rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep");
406 }
407
408 if (!rb_respond_to(scheduler, id_io_wait)) {
409 rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
410 }
411
412 if (!rb_respond_to(scheduler, id_fiber_interrupt)) {
413 rb_warn("Scheduler should implement #fiber_interrupt");
414 }
415}
416
417static VALUE
418fiber_scheduler_close(VALUE scheduler)
419{
420 return rb_fiber_scheduler_close(scheduler);
421}
422
423static VALUE
424fiber_scheduler_close_ensure(VALUE _thread)
425{
426 rb_thread_t *thread = (rb_thread_t*)_thread;
427 thread->scheduler = Qnil;
428
429 return Qnil;
430}
431
432VALUE
434{
436
437 rb_thread_t *thread = GET_THREAD();
438 RUBY_ASSERT(thread);
439
440 if (scheduler != Qnil) {
441 verify_interface(scheduler);
442 }
443
444 // We invoke Scheduler#close when setting it to something else, to ensure
445 // the previous scheduler runs to completion before changing the scheduler.
446 // That way, we do not need to consider interactions, e.g., of a Fiber from
447 // the previous scheduler with the new scheduler.
448 if (thread->scheduler != Qnil) {
449 // rb_fiber_scheduler_close(thread->scheduler);
450 rb_ensure(fiber_scheduler_close, thread->scheduler, fiber_scheduler_close_ensure, (VALUE)thread);
451 }
452
453 thread->scheduler = scheduler;
454
455 return thread->scheduler;
456}
457
458static VALUE
459fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
460{
461 RUBY_ASSERT(thread);
462
463 if (thread->blocking == 0) {
464 return thread->scheduler;
465 }
466 else {
467 return Qnil;
468 }
469}
470
472{
474
475 return fiber_scheduler_current_for_threadptr(GET_THREAD());
476}
477
478// This function is allowed to be called without holding the GVL.
480{
481 return fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
482}
483
485{
486 return fiber_scheduler_current_for_threadptr(thread);
487}
488
489/*
490 *
491 * Document-method: Fiber::Scheduler#close
492 *
493 * Called when the current thread exits. The scheduler is expected to implement this
494 * method in order to allow all waiting fibers to finalize their execution.
495 *
496 * The suggested pattern is to implement the main event loop in the #close method.
497 *
498 */
499VALUE
501{
503
504 VALUE result;
505
506 // The reason for calling `scheduler_close` before calling `close` is for
507 // legacy schedulers which implement `close` and expect the user to call
508 // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
509 // which should call `scheduler_close`. If it were to call `close`, it
510 // would create an infinite loop.
511
512 result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
513 if (!UNDEF_P(result)) return result;
514
515 result = rb_check_funcall(scheduler, id_close, 0, NULL);
516 if (!UNDEF_P(result)) return result;
517
518 return Qnil;
519}
520
521VALUE
523{
524 if (timeout) {
525 return rb_float_new((double)timeout->tv_sec + (0.000001 * timeout->tv_usec));
526 }
527
528 return Qnil;
529}
530
531/*
532 * Document-method: Fiber::Scheduler#kernel_sleep
533 * call-seq: kernel_sleep(duration = nil)
534 *
535 * Invoked by Kernel#sleep and Thread::Mutex#sleep and is expected to provide
536 * an implementation of sleeping in a non-blocking way. Implementation might
537 * register the current fiber in some list of "which fiber wait until what
538 * moment", call Fiber.yield to pass control, and then in #close resume
539 * the fibers whose wait period has elapsed.
540 *
541 */
542VALUE
544{
545 return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
546}
547
548VALUE
549rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
550{
551 return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
552}
553
560VALUE
562{
563 // First try to call the scheduler's yield method, if it exists:
564 VALUE result = rb_check_funcall(scheduler, id_yield, 0, NULL);
565 if (!UNDEF_P(result)) return result;
566
567 // Otherwise, we can emulate yield by sleeping for 0 seconds:
568 return rb_fiber_scheduler_kernel_sleep(scheduler, RB_INT2NUM(0));
569}
570
571#if 0
572/*
573 * Document-method: Fiber::Scheduler#timeout_after
574 * call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block
575 *
576 * Invoked by Timeout.timeout to execute the given +block+ within the given
577 * +duration+. It can also be invoked directly by the scheduler or user code.
578 *
579 * Attempt to limit the execution time of a given +block+ to the given
580 * +duration+ if possible. When a non-blocking operation causes the +block+'s
581 * execution time to exceed the specified +duration+, that non-blocking
582 * operation should be interrupted by raising the specified +exception_class+
583 * constructed with the given +exception_arguments+.
584 *
585 * General execution timeouts are often considered risky. This implementation
586 * will only interrupt non-blocking operations. This is by design because it's
587 * expected that non-blocking operations can fail for a variety of
588 * unpredictable reasons, so applications should already be robust in handling
589 * these conditions and by implication timeouts.
590 *
591 * However, as a result of this design, if the +block+ does not invoke any
592 * non-blocking operations, it will be impossible to interrupt it. If you
593 * desire to provide predictable points for timeouts, consider adding
594 * <tt>sleep(0)</tt>.
595 *
596 * If the block is executed successfully, its result will be returned.
597 *
598 * The exception will typically be raised using Fiber#raise.
599 */
600VALUE
601rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
602{
603 VALUE arguments[] = {
604 timeout, exception, message
605 };
606
607 return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
608}
609
610VALUE
611rb_fiber_scheduler_timeout_afterv(VALUE scheduler, int argc, VALUE * argv)
612{
613 return rb_check_funcall(scheduler, id_timeout_after, argc, argv);
614}
615#endif
616
617/*
618 * Document-method: Fiber::Scheduler#process_wait
619 * call-seq: process_wait(pid, flags)
620 *
621 * Invoked by Process::Status.wait in order to wait for a specified process.
622 * See that method description for arguments description.
623 *
624 * Suggested minimal implementation:
625 *
626 * Thread.new do
627 * Process::Status.wait(pid, flags)
628 * end.value
629 *
630 * This hook is optional: if it is not present in the current scheduler,
631 * Process::Status.wait will behave as a blocking method.
632 *
633 * Expected to return a Process::Status instance.
634 */
635VALUE
636rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
637{
638 VALUE arguments[] = {
639 PIDT2NUM(pid), RB_INT2NUM(flags)
640 };
641
642 return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
643}
644
645/*
646 * Document-method: Fiber::Scheduler#block
647 * call-seq: block(blocker, timeout = nil)
648 *
649 * Invoked by methods like Thread.join, and by Thread::Mutex, to signify that current
650 * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has
651 * elapsed.
652 *
653 * +blocker+ is what we are waiting on, informational only (for debugging and
654 * logging). There are no guarantee about its value.
655 *
656 * Expected to return boolean, specifying whether the blocking operation was
657 * successful or not.
658 */
659VALUE
660rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
661{
662 return rb_funcall(scheduler, id_block, 2, blocker, timeout);
663}
664
665/*
666 * Document-method: Fiber::Scheduler#unblock
667 * call-seq: unblock(blocker, fiber)
668 *
669 * Invoked to wake up Fiber previously blocked with #block (for example, Thread::Mutex#lock
670 * calls #block and Thread::Mutex#unlock calls #unblock). The scheduler should use
671 * the +fiber+ parameter to understand which fiber is unblocked.
672 *
673 * +blocker+ is what was awaited for, but it is informational only (for debugging
674 * and logging), and it is not guaranteed to be the same value as the +blocker+ for
675 * #block.
676 *
677 */
678VALUE
679rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
680{
681 RUBY_ASSERT(rb_obj_is_fiber(fiber));
682
683 VALUE result;
684 enum ruby_tag_type state;
685
686 // `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
687 //
688 // If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
689 int saved_errno = errno;
690
691 // We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`.
692 rb_execution_context_t *ec = GET_EC();
693 int saved_interrupt_mask = ec->interrupt_mask;
694 ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
695
696 EC_PUSH_TAG(ec);
697 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
698 result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
699 }
700 EC_POP_TAG();
701
702 ec->interrupt_mask = saved_interrupt_mask;
703
704 if (state) {
705 EC_JUMP_TAG(ec, state);
706 }
707
708 RUBY_VM_CHECK_INTS(ec);
709
710 errno = saved_errno;
711
712 return result;
713}
714
715/*
716 * Document-method: Fiber::Scheduler#io_wait
717 * call-seq: io_wait(io, events, timeout)
718 *
719 * Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the
720 * specified descriptor is ready for specified events within
721 * the specified +timeout+.
722 *
723 * +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and
724 * <tt>IO::PRIORITY</tt>.
725 *
726 * Suggested implementation should register which Fiber is waiting for which
727 * resources and immediately calling Fiber.yield to pass control to other
728 * fibers. Then, in the #close method, the scheduler might dispatch all the
729 * I/O resources to fibers waiting for it.
730 *
731 * Expected to return the subset of events that are ready immediately.
732 *
733 */
734static VALUE
735fiber_scheduler_io_wait(VALUE _argument) {
736 VALUE *arguments = (VALUE*)_argument;
737
738 return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1);
739}
740
741VALUE
742rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
743{
744 VALUE arguments[] = {
745 scheduler, io, events, timeout
746 };
747
748 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
749 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
750 } else {
751 return fiber_scheduler_io_wait((VALUE)&arguments);
752 }
753}
754
755VALUE
760
761VALUE
766
767/*
768 * Document-method: Fiber::Scheduler#io_select
769 * call-seq: io_select(readables, writables, exceptables, timeout)
770 *
771 * Invoked by IO.select to ask whether the specified descriptors are ready for
772 * specified events within the specified +timeout+.
773 *
774 * Expected to return the 3-tuple of Array of IOs that are ready.
775 *
776 */
777VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
778{
779 VALUE arguments[] = {
780 readables, writables, exceptables, timeout
781 };
782
783 return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
784}
785
787{
788 // I wondered about extracting argv, and checking if there is only a single
789 // IO instance, and instead calling `io_wait`. However, it would require a
790 // decent amount of work and it would be hard to preserve the exact
791 // semantics of IO.select.
792
793 return rb_check_funcall(scheduler, id_io_select, argc, argv);
794}
795
796/*
797 * Document-method: Fiber::Scheduler#io_read
798 * call-seq: io_read(io, buffer, length, offset) -> read length or -errno
799 *
800 * Invoked by IO#read or IO#Buffer.read to read +length+ bytes from +io+ into a
801 * specified +buffer+ (see IO::Buffer) at the given +offset+.
802 *
803 * The +length+ argument is the "minimum length to be read". If the IO buffer
804 * size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read,
805 * but at least 1KiB will be. Generally, the only case where less data than
806 * +length+ will be read is if there is an error reading the data.
807 *
808 * Specifying a +length+ of 0 is valid and means try reading at least once and
809 * return any available data.
810 *
811 * Suggested implementation should try to read from +io+ in a non-blocking
812 * manner and call #io_wait if the +io+ is not ready (which will yield control
813 * to other fibers).
814 *
815 * See IO::Buffer for an interface available to return data.
816 *
817 * Expected to return number of bytes read, or, in case of an error,
818 * <tt>-errno</tt> (negated number corresponding to system's error code).
819 *
820 * The method should be considered _experimental_.
821 */
822static VALUE
823fiber_scheduler_io_read(VALUE _argument) {
824 VALUE *arguments = (VALUE*)_argument;
825
826 return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1);
827}
828
829VALUE
830rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
831{
832 if (!rb_respond_to(scheduler, id_io_read)) {
833 return RUBY_Qundef;
834 }
835
836 VALUE arguments[] = {
837 scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
838 };
839
840 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
841 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
842 } else {
843 return fiber_scheduler_io_read((VALUE)&arguments);
844 }
845}
846
847/*
848 * Document-method: Fiber::Scheduler#io_pread
849 * call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno
850 *
851 * Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+
852 * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
853 * +offset+.
854 *
855 * This method is semantically the same as #io_read, but it allows to specify
856 * the offset to read from and is often better for asynchronous IO on the same
857 * file.
858 *
859 * The method should be considered _experimental_.
860 */
861static VALUE
862fiber_scheduler_io_pread(VALUE _argument) {
863 VALUE *arguments = (VALUE*)_argument;
864
865 return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1);
866}
867
868VALUE
869rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
870{
871 if (!rb_respond_to(scheduler, id_io_pread)) {
872 return RUBY_Qundef;
873 }
874
875 VALUE arguments[] = {
876 scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
877 };
878
879 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
880 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
881 } else {
882 return fiber_scheduler_io_pread((VALUE)&arguments);
883 }
884}
885
886/*
887 * Document-method: Fiber::Scheduler#io_write
888 * call-seq: io_write(io, buffer, length, offset) -> written length or -errno
889 *
890 * Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+ from
891 * from a specified +buffer+ (see IO::Buffer) at the given +offset+.
892 *
893 * The +length+ argument is the "minimum length to be written". If the IO
894 * buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB
895 * will be written, but at least 1KiB will be. Generally, the only case where
896 * less data than +length+ will be written is if there is an error writing the
897 * data.
898 *
899 * Specifying a +length+ of 0 is valid and means try writing at least once, as
900 * much data as possible.
901 *
902 * Suggested implementation should try to write to +io+ in a non-blocking
903 * manner and call #io_wait if the +io+ is not ready (which will yield control
904 * to other fibers).
905 *
906 * See IO::Buffer for an interface available to get data from buffer
907 * efficiently.
908 *
909 * Expected to return number of bytes written, or, in case of an error,
910 * <tt>-errno</tt> (negated number corresponding to system's error code).
911 *
912 * The method should be considered _experimental_.
913 */
914static VALUE
915fiber_scheduler_io_write(VALUE _argument) {
916 VALUE *arguments = (VALUE*)_argument;
917
918 return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1);
919}
920
921VALUE
922rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
923{
924 if (!rb_respond_to(scheduler, id_io_write)) {
925 return RUBY_Qundef;
926 }
927
928 VALUE arguments[] = {
929 scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
930 };
931
932 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
933 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
934 } else {
935 return fiber_scheduler_io_write((VALUE)&arguments);
936 }
937}
938
939/*
940 * Document-method: Fiber::Scheduler#io_pwrite
941 * call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno
942 *
943 * Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+
944 * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
945 * +offset+.
946 *
947 * This method is semantically the same as #io_write, but it allows to specify
948 * the offset to write to and is often better for asynchronous IO on the same
949 * file.
950 *
951 * The method should be considered _experimental_.
952 *
953 */
954static VALUE
955fiber_scheduler_io_pwrite(VALUE _argument) {
956 VALUE *arguments = (VALUE*)_argument;
957
958 return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1);
959}
960
961VALUE
962rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
963{
964
965
966 if (!rb_respond_to(scheduler, id_io_pwrite)) {
967 return RUBY_Qundef;
968 }
969
970 VALUE arguments[] = {
971 scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
972 };
973
974 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
975 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
976 } else {
977 return fiber_scheduler_io_pwrite((VALUE)&arguments);
978 }
979}
980
981VALUE
982rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
983{
984 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
985
986 VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0);
987
988 rb_io_buffer_free_locked(buffer);
989
990 return result;
991}
992
993VALUE
994rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
995{
996 VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
997
998 VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0);
999
1000 rb_io_buffer_free_locked(buffer);
1001
1002 return result;
1003}
1004
1005VALUE
1006rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
1007{
1008 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
1009
1010 VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, buffer, length, 0);
1011
1012 rb_io_buffer_free_locked(buffer);
1013
1014 return result;
1015}
1016
1017VALUE
1018rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
1019{
1020 VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
1021
1022 VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, buffer, length, 0);
1023
1024 rb_io_buffer_free_locked(buffer);
1025
1026 return result;
1027}
1028
1029/*
1030 * Document-method: Fiber::Scheduler#io_close
1031 * call-seq: io_close(fd)
1032 *
1033 * Invoked by Ruby's core methods to notify scheduler that the IO object is closed. Note that
1034 * the method will receive an integer file descriptor of the closed object, not an object
1035 * itself.
1036 */
1037VALUE
1039{
1040 VALUE arguments[] = {io};
1041
1042 return rb_check_funcall(scheduler, id_io_close, 1, arguments);
1043}
1044
1045/*
1046 * Document-method: Fiber::Scheduler#address_resolve
1047 * call-seq: address_resolve(hostname) -> array_of_strings or nil
1048 *
1049 * Invoked by any method that performs a non-reverse DNS lookup. The most
1050 * notable method is Addrinfo.getaddrinfo, but there are many other.
1051 *
1052 * The method is expected to return an array of strings corresponding to ip
1053 * addresses the +hostname+ is resolved to, or +nil+ if it can not be resolved.
1054 *
1055 * Fairly exhaustive list of all possible call-sites:
1056 *
1057 * - Addrinfo.getaddrinfo
1058 * - Addrinfo.tcp
1059 * - Addrinfo.udp
1060 * - Addrinfo.ip
1061 * - Addrinfo.new
1062 * - Addrinfo.marshal_load
1063 * - SOCKSSocket.new
1064 * - TCPServer.new
1065 * - TCPSocket.new
1066 * - IPSocket.getaddress
1067 * - TCPSocket.gethostbyname
1068 * - UDPSocket#connect
1069 * - UDPSocket#bind
1070 * - UDPSocket#send
1071 * - Socket.getaddrinfo
1072 * - Socket.gethostbyname
1073 * - Socket.pack_sockaddr_in
1074 * - Socket.sockaddr_in
1075 * - Socket.unpack_sockaddr_in
1076 */
1077VALUE
1079{
1080 VALUE arguments[] = {
1081 hostname
1082 };
1083
1084 return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
1085}
1086
1087/*
1088 * Document-method: Fiber::Scheduler#blocking_operation_wait
1089 * call-seq: blocking_operation_wait(blocking_operation)
1090 *
1091 * Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
1092 * The blocking_operation is an opaque object that encapsulates the blocking operation
1093 * and responds to a <tt>#call</tt> method without any arguments.
1094 *
1095 * If the scheduler doesn't implement this method, or if the scheduler doesn't execute
1096 * the blocking operation, Ruby will fall back to the non-scheduler implementation.
1097 *
1098 * Minimal suggested implementation is:
1099 *
1100 * def blocking_operation_wait(blocking_operation)
1101 * Thread.new { blocking_operation.call }.join
1102 * end
1103 */
1104VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
1105{
1106 // Check if scheduler supports blocking_operation_wait before creating the object
1107 if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
1108 return Qundef;
1109 }
1110
1111 // Create a new BlockingOperation with the blocking operation
1112 VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);
1113
1114 VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);
1115
1116 // Get the operation data to check if it was executed
1117 rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
1118 rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status);
1119
1120 // Invalidate the operation now that we're done with it
1121 operation->function = NULL;
1122 operation->state = NULL;
1123 operation->data = NULL;
1124 operation->data2 = NULL;
1125 operation->unblock_function = NULL;
1126
1127 // If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead
1128 if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
1129 return Qundef;
1130 }
1131
1132 return result;
1133}
1134
1135/*
1136 * Document-method: Fiber::Scheduler#fiber_interrupt
1137 * call-seq: fiber_interrupt(fiber, exception)
1138 *
1139 * Invoked by Ruby's core methods to notify the scheduler that the blocked fiber should be interrupted
1140 * with an exception. For example, IO#close uses this method to interrupt fibers that are performing
1141 * blocking IO operations.
1142 *
1143 */
1145{
1146 VALUE arguments[] = {
1147 fiber, exception
1148 };
1149
1150 VALUE result;
1151 enum ruby_tag_type state;
1152
1153 // We must prevent interrupts while invoking the fiber_interrupt method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_unblock`.
1154 rb_execution_context_t *ec = GET_EC();
1155 int saved_interrupt_mask = ec->interrupt_mask;
1156 ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
1157
1158 EC_PUSH_TAG(ec);
1159 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
1160 result = rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
1161 }
1162 EC_POP_TAG();
1163
1164 ec->interrupt_mask = saved_interrupt_mask;
1165
1166 if (state) {
1167 EC_JUMP_TAG(ec, state);
1168 }
1169
1170 RUBY_VM_CHECK_INTS(ec);
1171
1172 return result;
1173}
1174
1175/*
1176 * Document-method: Fiber::Scheduler#fiber
1177 * call-seq: fiber(&block)
1178 *
1179 * Implementation of the Fiber.schedule. The method is <em>expected</em> to immediately
1180 * run the given block of code in a separate non-blocking fiber, and to return that Fiber.
1181 *
1182 * Minimal suggested implementation is:
1183 *
1184 * def fiber(&block)
1185 * fiber = Fiber.new(blocking: false, &block)
1186 * fiber.resume
1187 * fiber
1188 * end
1189 */
1190VALUE
1191rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
1192{
1193 return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
1194}
1195
1196/*
1197 * C API: Cancel a blocking operation
1198 *
1199 * This function cancels a blocking operation. If the operation is queued,
1200 * it just marks it as cancelled. If it's executing, it marks it as cancelled
1201 * and calls the unblock function to interrupt the operation.
1202 *
1203 * Returns 1 if unblock function was called, 0 if just marked cancelled, -1 on error.
1204 */
1205int
1206rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
1207{
1208 if (blocking_operation == NULL) {
1209 return -1;
1210 }
1211
1212 rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status);
1213
1214 switch (current_state) {
1215 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
1216 // Work hasn't started - just mark as cancelled:
1217 if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
1218 // Successfully cancelled before execution:
1219 return 0;
1220 }
1221 // Fall through if state changed between load and CAS
1222
1223 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
1224 // Work is running - mark cancelled AND call unblock function
1225 if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) != current_state) {
1226 // State changed between load and CAS - operation may have completed:
1227 return 0;
1228 }
1229 // Otherwise, we successfully marked it as cancelled, so we can call the unblock function:
1230 rb_unblock_function_t *unblock_function = blocking_operation->unblock_function;
1231 if (unblock_function) {
1232 RUBY_ASSERT(unblock_function != (rb_unblock_function_t *)-1 && "unblock_function is still sentinel value -1, should have been resolved earlier");
1233 blocking_operation->unblock_function(blocking_operation->data2);
1234 }
1235 // Cancelled during execution (unblock function called):
1236 return 1;
1237
1238 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
1239 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
1240 // Already finished or cancelled:
1241 return 0;
1242 }
1243
1244 return 0;
1245}
#define RUBY_ASSERT(...)
Asserts that the given expression is truthy if and only if RUBY_DEBUG is truthy.
Definition assert.h:219
#define RUBY_ATOMIC_CAS(var, oldval, newval)
Atomic compare-and-swap.
Definition atomic.h:165
std::atomic< unsigned > rb_atomic_t
Type that is eligible for atomic operations.
Definition atomic.h:69
#define RUBY_ATOMIC_LOAD(var)
Atomic load.
Definition atomic.h:175
#define rb_define_method(klass, mid, func, arity)
Defines klass#mid.
VALUE rb_class_new(VALUE super)
Creates a new, anonymous class.
Definition class.c:853
VALUE rb_define_class_under(VALUE outer, const char *name, VALUE super)
Defines a class under the namespace of outer.
Definition class.c:1509
#define Qundef
Old name of RUBY_Qundef.
#define SIZET2NUM
Old name of RB_SIZE2NUM.
Definition size_t.h:62
#define Qnil
Old name of RUBY_Qnil.
VALUE rb_eRuntimeError
RuntimeError exception.
Definition error.c:1429
void rb_warn(const char *fmt,...)
Identical to rb_warning(), except it reports unless $VERBOSE is nil.
Definition error.c:466
VALUE rb_funcall(VALUE recv, ID mid, int n,...)
Calls a method.
Definition vm_eval.c:1117
VALUE rb_funcall_passing_block_kw(VALUE recv, ID mid, int argc, const VALUE *argv, int kw_splat)
Identical to rb_funcallv_passing_block(), except you can specify how to handle the last element of th...
Definition vm_eval.c:1187
void rb_unblock_function_t(void *)
This is the type of UBFs.
Definition thread.h:336
int rb_respond_to(VALUE obj, ID mid)
Queries if the object responds to the method.
Definition vm_method.c:3416
VALUE rb_check_funcall(VALUE recv, ID mid, int argc, const VALUE *argv)
Identical to rb_funcallv(), except it returns RUBY_Qundef instead of raising rb_eNoMethodError.
Definition vm_eval.c:686
void rb_define_alloc_func(VALUE klass, rb_alloc_func_t func)
Sets the allocator function of a class.
static ID rb_intern_const(const char *str)
This is a "tiny optimisation" over rb_intern().
Definition symbol.h:285
VALUE rb_io_timeout(VALUE io)
Get the timeout associated with the specified io object.
Definition io.c:857
@ RUBY_IO_READABLE
IO::READABLE
Definition io.h:97
@ RUBY_IO_WRITABLE
IO::WRITABLE
Definition io.h:98
int ruby_thread_has_gvl_p(void)
Whether the current thread is holding the GVL.
Definition thread.c:2103
void * rb_nogvl(void *(*func)(void *), void *data1, rb_unblock_function_t *ubf, void *data2, int flags)
Identical to rb_thread_call_without_gvl(), except it additionally takes "flags" that change the behav...
Definition thread.c:1593
#define RB_UINT2NUM
Just another name of rb_uint2num_inline.
Definition int.h:39
#define RB_INT2NUM
Just another name of rb_int2num_inline.
Definition int.h:37
VALUE rb_ensure(type *q, VALUE w, type *e, VALUE r)
An equivalent of ensure clause.
#define OFFT2NUM
Converts a C's off_t into an instance of rb_cInteger.
Definition off_t.h:33
#define PIDT2NUM
Converts a C's pid_t into an instance of rb_cInteger.
Definition pid_t.h:28
#define TypedData_Get_Struct(obj, type, data_type, sval)
Obtains a C struct from inside of a wrapper Ruby object.
Definition rtypeddata.h:649
struct rb_data_type_struct rb_data_type_t
This is the struct that holds necessary info for a struct.
Definition rtypeddata.h:205
#define TypedData_Make_Struct(klass, type, data_type, sval)
Identical to TypedData_Wrap_Struct, except it allocates a new data region internally instead of takin...
Definition rtypeddata.h:508
int rb_errno(void)
Identical to system errno.
Definition eval.c:2274
#define errno
Ractor-aware version of errno.
Definition ruby.h:388
Scheduler APIs.
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void *(*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
Defer the execution of the passed function to the scheduler.
Definition scheduler.c:1104
VALUE rb_fiber_scheduler_current(void)
Identical to rb_fiber_scheduler_get(), except it also returns RUBY_Qnil in case of a blocking fiber.
Definition scheduler.c:471
VALUE rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
Non-blocking pread from the passed IO using a native buffer.
Definition scheduler.c:1006
VALUE rb_fiber_scheduler_make_timeout(struct timeval *timeout)
Converts the passed timeout to an expression that rb_fiber_scheduler_block() etc.
Definition scheduler.c:522
VALUE rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
Non-blocking wait until the passed IO is ready for reading.
Definition scheduler.c:756
VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
Non-blocking read from the passed IO using a native buffer.
Definition scheduler.c:982
VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
Non-blocking write to the passed IO at the specified offset.
Definition scheduler.c:962
VALUE rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE *argv)
Identical to rb_fiber_scheduler_kernel_sleep(), except it can pass multiple arguments.
Definition scheduler.c:549
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
Interrupt a fiber by raising an exception.
Definition scheduler.c:1144
VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
Non-blocking version of rb_io_wait().
Definition scheduler.c:742
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
Non-blocking version of IO.select.
Definition scheduler.c:777
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
Non-blocking read from the passed IO.
Definition scheduler.c:830
int rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
Cancel a blocking operation.
Definition scheduler.c:1206
VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
Non-blocking version of IO.select, argv variant.
Definition scheduler.c:786
VALUE rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
Non-blocking waitpid.
Definition scheduler.c:636
VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
Non-blocking wait for the passed "blocker", which is for instance Thread.join or Mutex....
Definition scheduler.c:660
int rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
Execute blocking operation from handle (GVL not required).
Definition scheduler.c:206
VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
Non-blocking read from the passed IO at the specified offset.
Definition scheduler.c:869
VALUE rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
Non-blocking pwrite to the passed IO using a native buffer.
Definition scheduler.c:1018
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
Non-blocking write to the passed IO.
Definition scheduler.c:922
VALUE rb_fiber_scheduler_close(VALUE scheduler)
Closes the passed scheduler object.
Definition scheduler.c:500
rb_fiber_scheduler_blocking_operation_t * rb_fiber_scheduler_blocking_operation_extract(VALUE self)
Extract the blocking operation handle from a BlockingOperationRuby object.
Definition scheduler.c:191
VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
Identical to rb_fiber_scheduler_current(), except it queries for that of the passed thread value inst...
Definition scheduler.c:479
VALUE rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE duration)
Non-blocking sleep.
Definition scheduler.c:543
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
Non-blocking DNS lookup.
Definition scheduler.c:1078
VALUE rb_fiber_scheduler_yield(VALUE scheduler)
Yield to the scheduler, to be resumed on the next scheduling cycle.
Definition scheduler.c:561
VALUE rb_fiber_scheduler_set(VALUE scheduler)
Destructively assigns the passed scheduler to that of the current thread that is calling this functio...
Definition scheduler.c:433
VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
Non-blocking write to the passed IO using a native buffer.
Definition scheduler.c:994
VALUE rb_fiber_scheduler_current_for_threadptr(struct rb_thread_struct *thread)
Identical to rb_fiber_scheduler_current_for_thread(), except it expects a threadptr instead of a thre...
Definition scheduler.c:484
VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
Non-blocking wait until the passed IO is ready for writing.
Definition scheduler.c:762
VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
Non-blocking close the given IO.
Definition scheduler.c:1038
VALUE rb_fiber_scheduler_get(void)
Queries the current scheduler of the current thread that is calling this function.
Definition scheduler.c:383
VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
Wakes up a fiber previously blocked using rb_fiber_scheduler_block().
Definition scheduler.c:679
VALUE rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
Create and schedule a non-blocking fiber.
Definition scheduler.c:1191
@ RUBY_Qundef
Represents so-called undef.
uintptr_t ID
Type that represents a Ruby identifier such as a variable name.
Definition value.h:52
uintptr_t VALUE
Type that represents a Ruby object.
Definition value.h:40