#ifndef RTCPROTO_H #define RTCPROTO_H #ifndef POSIXVER_H #define POSIXVER_H /* ** Include this file before including system headers. By default, with ** C99 support from the compiler, it requests POSIX 2008 support. With ** C89 support only, it requests POSIX 1997 support. Override the ** default behaviour by setting either _XOPEN_SOURCE or _POSIX_C_SOURCE. */ /* _XOPEN_SOURCE 700 is loosely equivalent to _POSIX_C_SOURCE 200809L */ /* _XOPEN_SOURCE 600 is loosely equivalent to _POSIX_C_SOURCE 200112L */ /* _XOPEN_SOURCE 500 is loosely equivalent to _POSIX_C_SOURCE 199506L */ #if !defined(_XOPEN_SOURCE) && !defined(_POSIX_C_SOURCE) #if defined(__cplusplus) #define _XOPEN_SOURCE 700 /* SUS v4, POSIX 1003.1 2008/13 (POSIX 2008/13) */ #elif __STDC_VERSION__ >= 199901L #define _XOPEN_SOURCE 700 /* SUS v4, POSIX 1003.1 2008/13 (POSIX 2008/13) */ #else #define _XOPEN_SOURCE 500 /* SUS v2, POSIX 1003.1 1997 */ #endif /* __STDC_VERSION__ */ #endif /* !_XOPEN_SOURCE && !_POSIX_C_SOURCE */ #endif /* POSIXVER_H */ #ifndef RTCP_PROTO_H #define RTCP_PROTO_H /* Copyright (c) 2004-2005, Swedish Institute of Computer Science. All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. Neither the name of the Institute nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. 
THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS `AS IS' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. Author: Adam Dunkels */ /* The protothread implementation used here is the one originally written by Adam Dunkels (and Oliver Schmidt). I've lifted out only the parts relevant to this project, and the changes were so slight that it still seems sane to include the license under which their protothread library was released under. */ /** Local continuations Local continuations form the basis for the protothread implementation. A local continuation can be set in a function to capture its state. After it has been set, a local continuation can be resumed in order to restore the state of the function at the point where the local continuation was set. */ /** Implementation of local continuations based on the switch() statement. */ /* WARNING! lc implementation using switch() does not work if an LC_SET() is done within another switch() statement! */ typedef unsigned int rtcp_lc_t; /** Initialize a local continuation. * Also unsets any previously set continuation state. */ #define LC_INIT(s) s = 0; /** Resume a local continuation. * Resumes a previously set local continuation, restoring the state of the * function when the local continuation was set. If it was not set previously, * the resume operation does nothing. 
*/
#define LC_RESUME(s) switch(s) { case 0:

/** Set a local continuation.
 * Saves the state of the function at the point where the operation was last
 * executed. This state does not include the call stack or local (automatic)
 * variables, only the program counter (and such CPU registers that need to be
 * saved).
 */
#define LC_SET(s) s = __LINE__; case __LINE__:

/** Mark the end of a local continuation usage.
 * The end operation signifies that local continuations should not be used
 * further on in the function. This operation is only required by some
 * implementations of local continuations, like the one here that relies on
 * the switch construct.
 */
#define LC_END(s) }

/** Protothread representation: holds only the local continuation. */
typedef struct rtcp_pt_t {
    rtcp_lc_t lc;
} rtcp_pt_t;

/* Status codes returned by a protothread function to its scheduler. */
#define PT_WAITING 0
#define PT_YIELDED 1
#define PT_EXITED 2
#define PT_ENDED 3

/** Initialize a protothread.
 * Initialization must be done prior to starting execution of the protothread.
 */
#define PT_INIT(pt) LC_INIT((pt)->lc)

/** Declaration of a protothread.
 * All protothreads must be declared with this macro, which constrains the
 * signature of the function implementing it to return a character.
 */
#define PT_THREAD(fun_name_and_args) char fun_name_and_args

/** Declare the start of a protothread inside the function that implements it.
 * Should be placed at the beginning of the function. All C statements above
 * this will be executed each time the protothread is scheduled.
 */
#define PT_BEGIN(pt) \
{ char PT_YIELD_FLAG = 1; struct timespec PT_YIELD_TIME; \
(void)PT_YIELD_FLAG; (void)PT_YIELD_TIME; \
LC_RESUME((pt)->lc)

/** Declare the end of a protothread inside the function that implements it.
 * Always used together with a matching PT_BEGIN() macro.
 */
#define PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; \
PT_INIT(pt); return PT_ENDED; }

/** Block and wait until a condition is true.
 * Blocks the protothread until the specified condition is true.
 */
#define PT_WAIT_UNTIL(pt, condition) \
do { \
LC_SET((pt)->lc); \
if(!(condition)){ return PT_WAITING; } \
} while(0)

/** Block and wait while a condition is true.
 * Blocks the protothread while the specified condition is true.
 */
#define PT_WAIT_WHILE(pt, condition) PT_WAIT_UNTIL(pt, !(condition))

/** Block and wait until a child protothread completes.
 * Schedules a child protothread. The current protothread will block until the
 * child protothread completes. The child protothread must be manually
 * initialized with the PT_INIT() macro before PT_WAIT_THREAD() is used.
 */
#define PT_WAIT_THREAD(pt, thread) PT_WAIT_WHILE((pt), PT_SCHEDULE(thread))

/** Spawn a child protothread and wait until it exits.
 * This macro can only be used inside a protothread.
 */
#define PT_SPAWN(pt, child, thread) \
do { \
PT_INIT(child); \
PT_WAIT_THREAD(pt, thread); \
} while(0)

/** Restart the protothread.
 * Causes the running protothread to block and to restart its execution
 * at the place of the PT_BEGIN() macro.
 */
#define PT_RESTART(pt) \
do { \
PT_INIT(pt); \
return PT_WAITING; \
} while(0)

/** Exit the protothread.
 * If the protothread was spawned by another one, the parent protothread will
 * become unblocked and can continue to run.
 */
#define PT_EXIT(pt) \
do { \
PT_INIT(pt); \
return PT_EXITED; \
} while(0)

/** Schedule a protothread.
 * The return value of this expression is non-zero if the protothread is
 * running, or zero if it has exited.
 */
#define PT_SCHEDULE(f) ((f) < PT_EXITED)

/** Yield from the current protothread.
 * Essentially, it acts as a one-time blocking wait to be rescheduled.
 */
#define PT_YIELD(pt) \
do { \
PT_YIELD_FLAG = 0; \
LC_SET((pt)->lc); \
if(PT_YIELD_FLAG == 0){ return PT_YIELDED; } \
} while(0)

/** Yield from the current protothread until a condition occurs.
 * Essentially, it acts as a one-time blocking wait, and then a conditional
 * wait (always blocking at least once).
 */
#define PT_YIELD_UNTIL(pt, condition) \
do { \
PT_YIELD_FLAG = 0; \
LC_SET((pt)->lc); \
if(PT_YIELD_FLAG == 0 || !(condition)){ return PT_YIELDED; } \
} while(0)

#endif /* RTCP_PROTO_H */

#ifndef RTCP_H
#define RTCP_H

/* MIT License

Copyright © 2023 Benedict Blaise

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the “Software”), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. */

/** rtcproto: run-to-completion protothreads

PROTOTHREADS

Protothreads are extremely lightweight stackless threads designed for
severely memory constrained systems such as small embedded systems or
sensor network nodes. Protothreads can be used with or without an
underlying operating system. Protothreads provides a blocking context on
top of an event-driven system, without the overhead of per-thread stacks.
The purpose of protothreads is to implement sequential flow of control
without complex state machines or full multi-threading.

The protothread implementation used here is the one originally written by
Adam Dunkels and Oliver Schmidt.
I've lifted out only the parts relevant to this project, and the changes
were so slight that it still seemed sane to include the license under
which their protothread library was released. Their full license text can
be found in "proto.h".

LOCAL STATES

One drawback of protothreads is the fact that their local continuation
only stores the point of execution in a function, and no local state
whatsoever, such as any automatic (stack) variables. The usual remedy for
this in memory constrained systems is to use static variables in
functions, and to never have more than one instance of a protothread
running, which works well in contexts such as the Contiki operating
system's.

In the run-to-completion protothread scheduler, while it is still true
that only one protothread may be executing at any one time, this is only
the result of the style of scheduling it uses; its methods allow for more
flexibility. This means that the local states of each protothread
invocation can be stored separately while that particular invocation
waits to be scheduled.

COOPERATIVE SCHEDULING

When the scheduler is expected to return control by a timestamp
regardless of whether the scheduled work is done, it provides
protothreads with an extra timestamp parameter by which they are also
expected to return control.

This library aims to provide convenient macros that allow protothreads to
cooperate with their executing scheduler by mechanisms such as yielding
conditionally if a timepoint has passed, or generic loops that avoid the
overhead of clock checks on every iteration, checking only at every
couple of iterations.

TICKING AND SHRINKING

Memory is shrunk lazily to avoid cyclical deallocation/allocation on
systems that require and relinquish capacity in regular bursts. This
works by ticking the data structure regularly, to let it know that a unit
of time has passed.
Each time, the expected required capacity is decreased, and when it falls
below a threshold, the next write operation will reallocate to a smaller
footprint that accommodates the actual load.

PERFORMANCE STATISTICS

Important performance statistics are recorded for each protothread, to
help diagnose performance bottlenecks. See the documentation on the
rtcp_stats_t type for what these are.

Before requesting these statistics, the protothread function has to be
registered with the scheduler either manually, or automatically through
queueing calls to that protothread.

SINGLE HEADER

The library is built with meson for the time being, but an up-to-date
version of a single-header rtcproto.h should be available in the source
tree. If it happens to be out of date, you can build it with "$ make".

All declarations should be at the top, and all definitions should be at
the bottom of this header. Do "#define RTCPROTO_IMPLEMENTATION" before
including this header in *exactly one* C file to create the
implementation.

ANSI C & C99 & POSIX 2001

The language of the implementation itself is ANSI C (C89), but the
library depends on "stdint.h" for exact-width integer types (C99) and
"unistd.h" for measuring wall-clock time accurately with clock_gettime
(SUSv2, POSIX.1-2001). */

#define _POSIX_C_SOURCE 200112L

/* NOTE(review): the original header names of the following includes were
   lost in a previous transformation ("#include" with no target is not
   valid C); the list below was reconstructed from the identifiers used in
   this file -- confirm it against the upstream sources. */
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

/** Function executing a scheduled protothread.
 * All protothread functions must conform to this type if they are scheduled.
\param proto protothread representation
\param state user-defined local state associated with the protothread
\param end NULL if the scheduler expects the protothread to complete, or a
timestamp by which it is expected to suspend execution
*/
typedef char (*fn_prthread_t)(rtcp_pt_t* proto, void* state,
    struct timespec* end);

/** User-defined index update function.
* Useful when two values are associated in two different data structures, * when they need to know the key/position of the other value in the * associated structure at all times. \param value stored element whose index was updated \param index the updated index of the element \param user optional user pointer */ typedef void (*fn_idx_update_t)(void* value, int index, void* user); void rtcp_assert_always(int cond, char* fmt, ...); unsigned int rtcp_ceil_power_of_two(unsigned int x); int rtcp_capacity_from_size_and_factor(int size, float factor); void rtcp_timespec_norm(struct timespec* spec); int rtcp_timespec_diff_nsec(struct timespec* start, struct timespec* end); int rtcp_timespec_less(struct timespec* start, struct timespec* end); void rtcp_timespec_next(struct timespec* start); void rtcp_timespec_prev(struct timespec* start); struct timespec rtcp_timespec_add(struct timespec* start, struct timespec* end); struct timespec rtcp_timespec_diff(struct timespec* start, struct timespec* end); /* Clock-based protothread yielding for cooperative scheduling. */ /** Yield from the current protothread if a timepoint has passed. * Essentially, it acts as a conditional blocking wait to be rescheduled. * If 'time' is a null pointer, it will never yield. */ #define PT_YIELD_COOP(pt, time) \ do { \ LC_SET((pt)->lc); \ clock_gettime(CLOCK_REALTIME, &PT_YIELD_TIME); \ if((time) && rtcp_timespec_less((time), &PT_YIELD_TIME)) \ { return PT_YIELDED; } \ } while(0) /** Loop while checking for a cooperative yield only every couple iterations. * Useful with thin loops where the overhead of a clock_gettime every iteration * would waste considerable performance compared to the loop body. * * Specify your loop body within a new block using curly brackets '{}' right * after this macro. Before the closing bracket, you must use PT_LOOP_INCR. 
\param start statement that initializes your loop \param end your loop condition expression \param counter a symbol to serve as an integer counter \param between how many increments should pass between yield checks */ #define PT_LOOP(start, end, counter, between) \ (start); \ while((end)) \ { \ (counter) = 0; \ while((counter) < (between) && (end)) /** Increment loop while checking for a cooperative yield. * This macro is used in tandem with PT_LOOP. \param pt protothread \param time ending timepoint \param incr your loop increment statement \param counter a symbol to serve as an integer counter */ #define PT_LOOP_INCR(pt, time, incr, counter) \ ++(counter); (incr); \ } \ PT_YIELD_COOP(pt, time); #endif /* RTCP_H */ #ifndef RTCP_HASH_H #define RTCP_HASH_H #define _POSIX_C_SOURCE 200112L #include #include #include /** User-defined hashmap iteration function. */ typedef void (*fn_hmap_iter_t)(void*); /** Hashmap bucket states. A bucket can be empty, deleted, or occupied. */ enum { RTCP_HMAP_NIL = 0, RTCP_HMAP_DEL, RTCP_HMAP_OCC }; /** Generic hashmap data structure. * * Uses open addressing, linear probing, and a load factor of 1/2. * Hashes keys with 3 iterations of the romu_mono32 pseudorandom generator. * * Expands its memory pool as required, but shrinks it lazily. */ typedef struct rtcp_hmap_t { /** Currently allocated capacity. */ int capacity; /** Dynamically shrinking/expanding capacity. */ int targetcapa; /** Currently used capacity. */ int size; /** Size of the stored type, in bytes. */ int typesize; /** Bucket states. */ char* records; /** Bucket keys. */ fn_prthread_t* keys; /** Bucket values. 
*/ void* buckets; } rtcp_hmap_t; void rtcp_hmap_init(rtcp_hmap_t* hmap); int rtcp_hmap_malloc(rtcp_hmap_t* hmap, int capacity, int typesize); void rtcp_hmap_free(rtcp_hmap_t* hmap); void rtcp_hmap_move(rtcp_hmap_t* dest, rtcp_hmap_t* src); int rtcp_hmap_realloc(rtcp_hmap_t* hmap, int newcapa); unsigned int rtcp_hmap_hashkey(fn_prthread_t key, int size); float rtcp_hmap_loadfact(rtcp_hmap_t* hmap); void rtcp_hmap_tick(rtcp_hmap_t* hmap); void rtcp_hmap_use_size(rtcp_hmap_t* hmap); int rtcp_hmap_shrink(rtcp_hmap_t* hmap); int rtcp_hmap_add(rtcp_hmap_t* hmap, fn_prthread_t key, void* val); void* rtcp_hmap_get(rtcp_hmap_t* hmap, fn_prthread_t key); int rtcp_hmap_get_idx(rtcp_hmap_t* hmap, fn_prthread_t key); void rtcp_hmap_del(rtcp_hmap_t* hmap, fn_prthread_t key); void rtcp_hmap_iter(rtcp_hmap_t* hmap, fn_hmap_iter_t callback); #endif /* RTCP_HASH_H */ #ifndef RTCP_UVEC_H #define RTCP_UVEC_H #define _POSIX_C_SOURCE 200112L #include #include #include /** Generic unordered vector data structure. * * A user-defined callback function will be invoked whenever a value is moved * into a new position in the vector. * * Expands its memory pool as required, but shrinks it lazily. */ typedef struct rtcp_uvec_t { /** Currently allocated capacity. */ int capacity; /** Dynamically shrinking/expanding capacity. */ int targetcapa; /** Currently used capacity. */ int size; /** Size of the stored type, in bytes. */ int typesize; /** Stored values. */ void* values; /** User-defined value index update callback. */ fn_idx_update_t callback; /** User pointer for the index update callback. 
*/ void* user; } rtcp_uvec_t; void rtcp_uvec_init(rtcp_uvec_t* uvec); void rtcp_uvec_cb(rtcp_uvec_t* uvec, fn_idx_update_t callback, void* user); int rtcp_uvec_malloc(rtcp_uvec_t* uvec, int capacity, int typesize); void rtcp_uvec_free(rtcp_uvec_t* uvec); int rtcp_uvec_realloc(rtcp_uvec_t* uvec, int newcapa); void rtcp_uvec_tick(rtcp_uvec_t* uvec); void rtcp_uvec_use_size(rtcp_uvec_t* uvec); int rtcp_uvec_shrink(rtcp_uvec_t* uvec); int rtcp_uvec_add(rtcp_uvec_t* uvec, void* val); int rtcp_uvec_del(rtcp_uvec_t* uvec, int idx); void* rtcp_uvec_get(rtcp_uvec_t* uvec, int idx); #endif /* RTCP_UVEC_H */ #ifndef RTCP_HEAP_H #define RTCP_HEAP_H #define _POSIX_C_SOURCE 200112L #include #include #include /** User-defined comparison function for use in a generic heap. */ typedef int (*fn_heap_compare_t)(void*, void*); /** Generic heap data structure. * * The comparison function is user-defined, so a "less" comparison will yield * a min-heap, and a "greater" comparison will yield a max-heap. * * A user-defined callback function will be invoked whenever a value is moved * into a new position on the heap. Currently called on every move. * * Expands its memory pool as required, but shrinks it lazily. */ typedef struct rtcp_heap_t { /** Currently allocated capacity. */ int capacity; /** Dynamically shrinking/expanding capacity. */ int targetcapa; /** Currently used capacity. */ int size; /** Size of the stored type, in bytes. */ int typesize; /** Stored values. */ void* values; /** Value swap storage. */ void* swap; /** User-defined comparison function. */ fn_heap_compare_t compare; /** User-defined value index update callback. */ fn_idx_update_t callback; /** User pointer for the index update callback. 
*/ void* user; } rtcp_heap_t; void rtcp_heap_init(rtcp_heap_t* heap); void rtcp_heap_cb(rtcp_heap_t* heap, fn_idx_update_t callback, void* user); int rtcp_heap_malloc(rtcp_heap_t* heap, int capacity, int typesize, fn_heap_compare_t compare); void rtcp_heap_free(rtcp_heap_t* heap); int rtcp_heap_realloc(rtcp_heap_t* heap, int newcapa); void rtcp_heap_swap(rtcp_heap_t* heap, int a, int b); void rtcp_heap_tick(rtcp_heap_t* heap); void rtcp_heap_use_size(rtcp_heap_t* heap); int rtcp_heap_shrink(rtcp_heap_t* heap); int rtcp_heap_property(rtcp_heap_t* heap); void rtcp_heap_swim(rtcp_heap_t* heap, int idx); void rtcp_heap_sink(rtcp_heap_t* heap, int idx); int rtcp_heap_insert(rtcp_heap_t* heap, void* val); void* rtcp_heap_peek(rtcp_heap_t* heap); void rtcp_heap_extract(rtcp_heap_t* heap, void* val); void* rtcp_heap_get(rtcp_heap_t* heap, int idx); #endif /* RTCP_HEAP_H */ #ifndef ROMU_H #define ROMU_H #include #include #define ROTL(d,lrot) ((d<<(lrot)) | (d>>(8*sizeof(d)-(lrot)))) uint32_t romu_mono32_init(uint32_t seed); uint16_t romu_mono32_random(uint32_t* state); #endif /* ROMU_H */ #ifndef RTCP_SCHED_H #define RTCP_SCHED_H #define _POSIX_C_SOURCE 200112L /** Type storing statistics associated with a certain protothread type */ typedef struct rtcp_stats_t { /** Number of entries into / invocations of the protothead. */ int entries; /** Total time spent in that protothread. */ struct timespec total; /** Local continuation of the maximal overrun. */ rtcp_lc_t max_lc; /** Time length of the maximal overrun. */ struct timespec max_over; } rtcp_stats_t; void rtcp_stats_init(rtcp_stats_t* stats); void rtcp_stats_update(rtcp_stats_t* stats, rtcp_lc_t lc, struct timespec actual, struct timespec overrun); /** Scheduler call. */ typedef struct rtcp_call_t { /** Expected invocation time (absolute timestamp). */ struct timespec stamp; /** Sequence number. */ unsigned int seq; /** Protothread to be invoked. */ fn_prthread_t func; /** Protothread representation. 
*/
    rtcp_pt_t pt;
    /** Index of the local state associated with the call in the local state
    storage associated with the function to be invoked. */
    int uvecidx;
} rtcp_call_t;

/* Heap comparator and index-update hooks for the call queue. */
int rtcp_call_comp_less(void* a, void* b);
void rtcp_call_idx_update(void* val, int idx, void* user);

/** Prefix that serves as a header in memory for each local state. */
typedef struct rtcp_prefix_t {
    /** Index of the protothread in the call queue the state belongs to. */
    int heapidx;
} rtcp_prefix_t;

void rtcp_prefix_idx_update(void* val, int idx, void* user);

/** Type storing local states associated with a certain protothread type. */
typedef struct rtcp_assoc_t {
    /** Stores data blocks of (rtcp_prefix_t + local state). */
    rtcp_uvec_t locals;
    /** Run statistics. */
    rtcp_stats_t stats;
} rtcp_assoc_t;

void rtcp_assoc_init(rtcp_assoc_t* assoc);
void rtcp_assoc_malloc(rtcp_assoc_t* assoc, int typesize);
void rtcp_assoc_free(rtcp_assoc_t* assoc);
void rtcp_assoc_tick(rtcp_assoc_t* assoc);
void rtcp_assoc_shrink(rtcp_assoc_t* assoc);

/** Maximum number of scheduled calls that can be waiting at any one time. */
#define RTCP_SCHED_CALLS_MAX ((unsigned int)((((unsigned int)~0) >> 1) + 1))

/** Run-to-completion scheduler.
 *
 * Uses sequence numbers to distinguish calls having equal timestamps, so they
 * can be invoked in a first-in first-out order. However, only up to half of
 * an unsigned int number of calls may be waiting at any one time, so that the
 * head and tail of the sequence can be ordered correctly.
 *
 * Expands its memory pool as required, but shrinks it lazily.
 */
typedef struct rtcp_sched_t {
    /** Call queue. Stores rtcp_call_t's. */
    rtcp_heap_t calls;
    /** Local state storages associated with protothread types.
    Stores rtcp_assoc_t's. */
    rtcp_hmap_t assoc;
    /** Sequence number to distinguish equal timestamps. */
    unsigned int seq;
} rtcp_sched_t;

int rtcp_sched_malloc(rtcp_sched_t* sched);
void rtcp_sched_free(rtcp_sched_t* sched);
/* Per-assoc helpers used when iterating the assoc hashmap. */
void rtcp_sched_free_assoc(void* val);
void rtcp_sched_tick_assoc(void* val);
void rtcp_sched_shrink_assoc(void* val);
void rtcp_sched_tick(rtcp_sched_t* sched);
void rtcp_sched_shrink(rtcp_sched_t* sched);
int rtcp_sched_func_get_assoc(rtcp_sched_t* sched, fn_prthread_t func,
    int statesize, rtcp_assoc_t** out);
int rtcp_sched_calls_queued(rtcp_sched_t* sched);
int rtcp_sched_queue(rtcp_sched_t* sched, struct timespec stamp,
    fn_prthread_t func, void* state, int statesize);

/** Convenience macro, potential footgun.
 * Think of the state parameter as if you were passing it by value.
 */
#define rtcp_sched_mqueue(sched, stamp, func, state) rtcp_sched_queue(sched, stamp, func, &(state), sizeof(state))

void rtcp_sched_call(rtcp_sched_t* sched, struct timespec end, int endignore);
void rtcp_sched_until(rtcp_sched_t* sched, struct timespec end);
void rtcp_sched_flush(rtcp_sched_t* sched);
void rtcp_sched_flush_until(rtcp_sched_t* sched, struct timespec end);
void rtcp_sched_func_stats(rtcp_sched_t* sched, fn_prthread_t func,
    rtcp_stats_t* out);

#endif /* RTCP_SCHED_H */

#ifdef RTCPROTO_IMPLEMENTATION

void rtcp_assert_always(int cond, char* fmt, ...)
{ va_list argp; va_start(argp, fmt); if(!cond){ fprintf(stderr, "rtcp_assert_always: "); vfprintf(stderr, fmt, argp); exit(1); } va_end(argp); } unsigned int rtcp_ceil_power_of_two(unsigned int x) { int n = 0; while(x > 1){ ++n; x >>= 1; } return x << (n + 1); } void rtcp_timespec_norm(struct timespec* spec) { spec->tv_sec = spec->tv_sec + (spec->tv_nsec / 1000000000); spec->tv_nsec = spec->tv_nsec - (spec->tv_nsec / 1000000000) * 1000000000; } int rtcp_timespec_diff_nsec(struct timespec* start, struct timespec* end) { if(start->tv_sec - end->tv_sec){ return 1000000000 * (end->tv_sec - start->tv_sec - 1) + 1000000000 + end->tv_nsec - start->tv_nsec; } else { return end->tv_nsec - start->tv_nsec; } } int rtcp_timespec_less(struct timespec* start, struct timespec* end) { return (start->tv_sec < end->tv_sec) ? 1 : (start->tv_sec > end->tv_sec) ? 0 : (start->tv_nsec < end->tv_nsec); } void rtcp_timespec_next(struct timespec* start) { start->tv_sec += (start->tv_nsec == LONG_MAX); ++start->tv_nsec; } void rtcp_timespec_prev(struct timespec* start) { start->tv_sec -= (start->tv_nsec == 0); --start->tv_nsec; } struct timespec rtcp_timespec_add(struct timespec* start, struct timespec* end) { struct timespec ret; time_t start_sec_from_nsec; time_t end_sec_from_nsec; start_sec_from_nsec = start->tv_nsec / 1000000000; end_sec_from_nsec = end->tv_nsec / 1000000000; ret.tv_sec = end->tv_sec + start->tv_sec + end_sec_from_nsec + start_sec_from_nsec; ret.tv_nsec = (end->tv_nsec - end_sec_from_nsec * 1000000000) + (start->tv_nsec - start_sec_from_nsec * 1000000000); return ret; } struct timespec rtcp_timespec_diff(struct timespec* start, struct timespec* end) { struct timespec ret; time_t sec_diff; struct timespec start_norm = *start; struct timespec end_norm = *end; rtcp_timespec_norm(&start_norm); rtcp_timespec_norm(&end_norm); sec_diff = end_norm.tv_sec - start_norm.tv_sec; ret.tv_sec = sec_diff - (sec_diff > 0); ret.tv_nsec = (sec_diff > 0) * 1000000000 + (end_norm.tv_nsec - 
start_norm.tv_nsec); return ret; } void rtcp_hmap_init(rtcp_hmap_t* hmap) { int i; /* Initialize values */ hmap->size = 0; /* Initialize records to empty */ for(i = 0; i < hmap->capacity; ++i){ hmap->records[i] = RTCP_HMAP_NIL; } } int rtcp_hmap_malloc(rtcp_hmap_t* hmap, int capacity, int typesize) { /* Store parameters */ hmap->capacity = capacity; rtcp_assert_always(hmap->capacity > 0, "rtcp_hmap_malloc: capacity > 0\n"); hmap->typesize = typesize; rtcp_assert_always(hmap->typesize > 0, "rtcp_hmap_malloc: typesize > 0\n"); /* Initialize values */ hmap->targetcapa = hmap->capacity; /* Allocate memory */ hmap->records = malloc(hmap->capacity * sizeof(char)); if(!hmap->records){ return errno; } hmap->keys = malloc(hmap->capacity * sizeof(fn_prthread_t)); if(!hmap->keys){ return errno; } hmap->buckets = malloc(hmap->capacity * hmap->typesize); if(!hmap->buckets){ return errno; } return 0; } void rtcp_hmap_free(rtcp_hmap_t* hmap) { /* Free memory */ assert(hmap->records); free(hmap->records); assert(hmap->keys); free(hmap->keys); assert(hmap->buckets); free(hmap->buckets); } void rtcp_hmap_move(rtcp_hmap_t* dest, rtcp_hmap_t* src) { /* Free the destination */ rtcp_hmap_free(dest); /* Initialize values */ dest->capacity = src->capacity; dest->size = src->size; dest->typesize = src->typesize; /* Move memory */ dest->records = src->records; dest->keys = src->keys; dest->buckets = src->buckets; /* Reset memory in source */ src->records = NULL; src->keys = NULL; src->buckets = NULL; } int rtcp_hmap_realloc(rtcp_hmap_t* hmap, int newcapa) { int ret; rtcp_hmap_t newhmap; int i; /* Allocate a new hmap with the new capacity */ ret = rtcp_hmap_malloc(&newhmap, newcapa, hmap->typesize); if(ret){ return ret; } /* Initialize hmap */ rtcp_hmap_init(&newhmap); /* Handle old items */ for(i = 0; i < hmap->capacity; ++i){ /* Skip non-occupied buckets */ if(hmap->records[i] != RTCP_HMAP_OCC){ continue; } /* Add element to the new hmap */ rtcp_hmap_add(&newhmap, hmap->keys[i], 
((uint8_t*)hmap->buckets) + i * hmap->typesize);
    }
    /* Move the new hmap into the old one */
    rtcp_hmap_move(hmap, &newhmap);
    return 0;
}

/** Hash a protothread function pointer into a bucket index in
 * [0, capacity): three iterations of the romu_mono32 generator seeded
 * with the pointer value, reduced modulo the bucket count. */
unsigned int rtcp_hmap_hashkey(fn_prthread_t key, int capacity)
{
    uint32_t romustt = romu_mono32_init((uint32_t)(uintptr_t)key);
    assert(capacity > 0);
    romu_mono32_random(&romustt);
    romu_mono32_random(&romustt);
    return romu_mono32_random(&romustt) % capacity;
}

/** Advance the lazy-shrink clock: one tick lowers the target capacity. */
void rtcp_hmap_tick(rtcp_hmap_t* hmap)
{
    /* Decrease target capacity until a lower bound */
    hmap->targetcapa = hmap->targetcapa > 1 ?
        hmap->targetcapa - 1 : hmap->targetcapa;
}

/** Record the current size as recent usage for the lazy shrinker. */
void rtcp_hmap_use_size(rtcp_hmap_t* hmap)
{
    int cpot = rtcp_ceil_power_of_two(
        hmap->size > hmap->targetcapa - 1 ?
        hmap->size : hmap->targetcapa - 1);
    /* Set target capacity to indicate usage */
    hmap->targetcapa = cpot > 0 ? cpot : 1;
}

/** Shrink the allocation once the target capacity has fallen far enough.
 * Returns 0 on success or errno from the reallocation. */
int rtcp_hmap_shrink(rtcp_hmap_t* hmap)
{
    int ret;
    int cpot;
    assert(hmap->targetcapa > 0);
    assert(hmap->capacity > 0);
    /* If the target capacity is less than the next shrink */
    if(hmap->targetcapa < hmap->capacity / 2){
        /* Reallocate with the log2-ceiling of the target */
        cpot = rtcp_ceil_power_of_two(hmap->size);
        ret = rtcp_hmap_realloc(hmap, cpot > 0 ?
            cpot : 1);
        if(ret){ return ret; }
    }
    assert(hmap->capacity > 0);
    assert(hmap->capacity >= hmap->size);
    return 0;
}

/** Insert a key/value pair; the key must not already be present.
 * Returns 0 on success or errno if an expansion fails.
 * Probes twice: first for the slot to reuse (empty OR deleted), then on
 * through all non-empty slots to assert the key is not already stored. */
int rtcp_hmap_add(rtcp_hmap_t* hmap, fn_prthread_t key, void* val)
{
    int ret;
    int i, j, found;
    /* If the expansion load factor is exceeded, expand and rehash */
    if(hmap->size + 1 > hmap->capacity / 2){
        ret = rtcp_hmap_realloc(hmap, hmap->capacity * 2);
        if(ret){ return ret; }
    }
    /* New size */
    ++hmap->size;
    assert(hmap->size <= hmap->capacity);
    /* Initialize */
    i = 0;
    j = rtcp_hmap_hashkey(key, hmap->capacity);
    /* Step through occupied buckets */
    while(i < hmap->capacity && hmap->records[j] == RTCP_HMAP_OCC){
        ++i;
        j = (j + 1) % hmap->capacity;
    }
    rtcp_assert_always(i < hmap->capacity,
        "rtcp_hmap_add: failed to find non-occupied bucket\n");
    /* Mark found bucket */
    found = j;
    /* Step through non-empty buckets */
    while(i < hmap->capacity && hmap->records[j] != RTCP_HMAP_NIL){
        rtcp_assert_always(
            !(hmap->records[j] == RTCP_HMAP_OCC && hmap->keys[j] == key),
            "rtcp_hmap_add: key already present\n");
        ++i;
        j = (j + 1) % hmap->capacity;
    }
    /* Write record */
    hmap->records[found] = RTCP_HMAP_OCC;
    hmap->keys[found] = key;
    memcpy(
        ((uint8_t*)hmap->buckets) + found * hmap->typesize,
        (uint8_t*)val,
        hmap->typesize);
    /* Use the current size, shrink if necessary */
    rtcp_hmap_use_size(hmap);
    rtcp_hmap_shrink(hmap);
    return 0;
}

/** Look up a value by key; returns NULL when the key is absent. */
void* rtcp_hmap_get(rtcp_hmap_t* hmap, fn_prthread_t key)
{
    int idx = rtcp_hmap_get_idx(hmap, key);
    return idx >= 0 ?
((uint8_t*)hmap->buckets + idx * hmap->typesize) : NULL;
}

/** Find the bucket index of a key, or -1 when the key is absent.
 * Probes over non-empty (occupied or deleted) buckets only. */
int rtcp_hmap_get_idx(rtcp_hmap_t* hmap, fn_prthread_t key)
{
    int i, j;
    /* Initialize */
    i = 0;
    j = rtcp_hmap_hashkey(key, hmap->capacity);
    /* Step through nonempty records */
    while(i < hmap->capacity && hmap->records[j] != RTCP_HMAP_NIL){
        /* If an occupied record with a matching key was found, return it */
        if(hmap->records[j] == RTCP_HMAP_OCC && hmap->keys[j] == key){
            return j;
        }
        ++i;
        j = (j + 1) % hmap->capacity;
    }
    /* Return invalid bucket index */
    return -1;
}

/** Delete a key (no-op when absent); may trigger a lazy shrink. */
void rtcp_hmap_del(rtcp_hmap_t* hmap, fn_prthread_t key)
{
    int idx = rtcp_hmap_get_idx(hmap, key);
    /* If a valid bucket index was found, mark it as deleted */
    if(idx >= 0){
        hmap->records[idx] = RTCP_HMAP_DEL;
        --hmap->size;
    }
    /* Use the current size, shrink if necessary */
    rtcp_hmap_use_size(hmap);
    rtcp_hmap_shrink(hmap);
}

/** Invoke 'callback' on the value of every occupied bucket. */
void rtcp_hmap_iter(rtcp_hmap_t* hmap, fn_hmap_iter_t callback)
{
    int i;
    /* Check condition for each bucket */
    for(i = 0; i < hmap->capacity; ++i){
        /* Proceed only for occupied buckets */
        if(hmap->records[i] != RTCP_HMAP_OCC){ continue; }
        callback(((uint8_t*)hmap->buckets) + i * hmap->typesize);
    }
}

/** Reset a vector to empty and clear its callback. */
void rtcp_uvec_init(rtcp_uvec_t* uvec)
{
    /* Initialize values */
    uvec->size = 0;
    uvec->callback = NULL;
    uvec->user = NULL;
}

/** Install the index-update callback and its user pointer. */
void rtcp_uvec_cb(rtcp_uvec_t* uvec, fn_idx_update_t callback, void* user)
{
    /* Initialize values */
    uvec->callback = callback;
    uvec->user = user;
}

/** Allocate vector storage for 'capacity' items of 'typesize' bytes.
 * Returns 0 on success or errno on allocation failure. */
int rtcp_uvec_malloc(rtcp_uvec_t* uvec, int capacity, int typesize)
{
    /* Store parameters */
    uvec->capacity = capacity;
    rtcp_assert_always(uvec->capacity > 0, "rtcp_uvec_malloc: capacity > 0\n");
    uvec->typesize = typesize;
    rtcp_assert_always(uvec->typesize > 0, "rtcp_uvec_malloc: typesize > 0\n");
    /* Initialize values */
    uvec->targetcapa = uvec->capacity;
    /* Allocate memory */
    uvec->values = malloc(uvec->capacity * uvec->typesize);
    if(!uvec->values){ return errno; }
    return 0;
}

/** Release vector storage; the values array must be live. */
void rtcp_uvec_free(rtcp_uvec_t* uvec)
{
    /* Free memory */
    assert(uvec->values);
    free(uvec->values);
}

/** Resize the allocation to 'newcapa' items.
 * Returns 0 on success or errno; the old block stays valid on failure. */
int rtcp_uvec_realloc(rtcp_uvec_t* uvec, int newcapa)
{
    void* newvalues;
    /* Attempt to change the size of the allocated memory block */
    newvalues = realloc(uvec->values, newcapa * uvec->typesize);
    if(!newvalues){ return errno; }
    /* Attach block if allocation succeeded */
    uvec->values = newvalues;
    /* Note new capacity */
    uvec->capacity = newcapa;
    return 0;
}

/** Advance the lazy-shrink clock: one tick lowers the target capacity. */
void rtcp_uvec_tick(rtcp_uvec_t* uvec)
{
    /* Decrease target capacity until a lower bound */
    uvec->targetcapa = uvec->targetcapa > 1 ?
        uvec->targetcapa - 1 : uvec->targetcapa;
}

/** Record the current size as recent usage for the lazy shrinker. */
void rtcp_uvec_use_size(rtcp_uvec_t* uvec)
{
    int cpot;
    assert(uvec->targetcapa > 0);
    cpot = rtcp_ceil_power_of_two(
        uvec->size > uvec->targetcapa - 1 ?
        uvec->size : uvec->targetcapa - 1);
    /* Set target capacity to indicate usage */
    uvec->targetcapa = cpot > 0 ? cpot : 1;
}

/** Shrink the allocation once the target capacity has fallen far enough.
 * Returns 0 on success or errno from the reallocation. */
int rtcp_uvec_shrink(rtcp_uvec_t* uvec)
{
    int ret;
    int cpot;
    cpot = rtcp_ceil_power_of_two(uvec->size);
    assert(uvec->targetcapa > 0);
    assert(uvec->capacity > 0);
    /* If the target capacity less-or-equal than the next shrink */
    if(uvec->targetcapa <= uvec->capacity / 2){
        /* Reallocate with the log2-ceiling of the target */
        ret = rtcp_uvec_realloc(uvec, cpot > 0 ?
cpot : 1); if(ret){ return ret; } } assert(uvec->capacity > 0); assert(uvec->capacity >= uvec->size); return 0; } int rtcp_uvec_add(rtcp_uvec_t* uvec, void* val) { int ret; /* If the new size exceeds the current capacity, reallocate */ if(uvec->size + 1 > uvec->capacity){ ret = rtcp_uvec_realloc(uvec, uvec->capacity * 2); if(ret){ return ret; } } assert(uvec->size + 1 <= uvec->capacity); /* New size */ ++uvec->size; /* Write value */ if(val){ memcpy( ((uint8_t*)uvec->values) + (uvec->size - 1) * uvec->typesize, val, uvec->typesize ); } /* Value update callback */ if(uvec->callback){ uvec->callback( ((uint8_t*)uvec->values) + (uvec->size - 1) * uvec->typesize, (uvec->size - 1), uvec->user ); } /* Use the current size, shrink if necessary */ rtcp_uvec_use_size(uvec); rtcp_uvec_shrink(uvec); return 0; } int rtcp_uvec_del(rtcp_uvec_t* uvec, int idx) { rtcp_assert_always(idx >= 0 && idx < uvec->size, "rtcp_uvec_del: invalid index\n"); /* New size */ --uvec->size; /* If the vector is non-empty after this delete */ if(uvec->size > 0 && idx != uvec->size){ /* Write last value into the deleted */ memcpy( ((uint8_t*)uvec->values) + idx * uvec->typesize, ((uint8_t*)uvec->values) + uvec->size * uvec->typesize, uvec->typesize ); /* Value update callback */ if(uvec->callback){ uvec->callback( ((uint8_t*)uvec->values) + idx * uvec->typesize, idx, uvec->user ); } } /* Use the current size, shrink if necessary */ rtcp_uvec_use_size(uvec); rtcp_uvec_shrink(uvec); return 0; } void* rtcp_uvec_get(rtcp_uvec_t* uvec, int idx) { rtcp_assert_always(idx >= 0 && idx < uvec->size, "rtcp_uvec_get: invalid index\n"); return ((uint8_t*)uvec->values) + idx * uvec->typesize; } void rtcp_heap_init(rtcp_heap_t* heap) { /* Initialize values */ heap->size = 0; heap->callback = NULL; heap->user = NULL; } void rtcp_heap_cb(rtcp_heap_t* heap, fn_idx_update_t callback, void* user) { /* Initialize values */ heap->callback = callback; heap->user = user; } int rtcp_heap_malloc(rtcp_heap_t* heap, int 
capacity, int typesize, fn_heap_compare_t compare)
{
    /* Store parameters */
    heap->capacity = capacity;
    rtcp_assert_always(heap->capacity > 0, "rtcp_heap_malloc: capacity > 0\n");
    heap->typesize = typesize;
    rtcp_assert_always(heap->typesize > 0, "rtcp_heap_malloc: typesize > 0\n");
    heap->compare = compare;
    rtcp_assert_always(heap->compare != NULL,
        "rtcp_heap_malloc: compare != NULL\n");

    /* The adaptive shrink target starts at the initial capacity */
    heap->targetcapa = heap->capacity;

    /* Allocate element storage */
    heap->values = malloc(heap->capacity * heap->typesize);
    if(!heap->values){ return errno; }

    /* Allocate the one-element scratch buffer used by rtcp_heap_swap().
       FIX: on failure, free the element storage allocated above -- the
       original code returned with heap->values leaked. */
    heap->swap = malloc(heap->typesize);
    if(!heap->swap){
        int err = errno;
        free(heap->values);
        heap->values = NULL;
        return err;
    }

    return 0;
}

/** Release the heap's element storage and swap scratch buffer. */
void rtcp_heap_free(rtcp_heap_t* heap)
{
    assert(heap->values);
    free(heap->values);
    assert(heap->swap);
    free(heap->swap);
}

/**
 * Resize the element storage to `newcapa` elements.
 * On failure the old block stays valid and errno is returned.
 */
int rtcp_heap_realloc(rtcp_heap_t* heap, int newcapa)
{
    void* newvalues;

    /* Attempt to change the size of the allocated memory block */
    newvalues = realloc(heap->values, newcapa * heap->typesize);
    if(!newvalues){ return errno; }

    /* Attach block only after the allocation succeeded */
    heap->values = newvalues;
    heap->capacity = newcapa;

    return 0;
}

/**
 * Exchange elements `a` and `b` through the scratch buffer, then notify
 * the owner (if a callback is set) of both elements' new indices.
 */
void rtcp_heap_swap(rtcp_heap_t* heap, int a, int b)
{
    memcpy(heap->swap,
        ((uint8_t*)heap->values) + a * heap->typesize, heap->typesize);
    memcpy(((uint8_t*)heap->values) + a * heap->typesize,
        ((uint8_t*)heap->values) + b * heap->typesize, heap->typesize);
    memcpy(((uint8_t*)heap->values) + b * heap->typesize,
        heap->swap, heap->typesize);

    /* Value update callbacks */
    if(heap->callback){
        heap->callback(((uint8_t*)heap->values) + a * heap->typesize,
            a, heap->user);
        heap->callback(((uint8_t*)heap->values) + b * heap->typesize,
            b, heap->user);
    }
}

void rtcp_heap_tick(rtcp_heap_t* heap)
{
    /* Decrease target capacity until a lower bound */
    heap->targetcapa = heap->targetcapa > 1 ?
heap->targetcapa - 1 : heap->targetcapa;
}

/** Bump the shrink target to cover the current size (power-of-two ceiling). */
void rtcp_heap_use_size(rtcp_heap_t* heap)
{
    int need;
    int cpot;

    assert(heap->targetcapa > 0);

    need = heap->size > heap->targetcapa - 1 ?
        heap->size : heap->targetcapa - 1;
    cpot = rtcp_ceil_power_of_two(need);

    /* Set target capacity to indicate usage */
    heap->targetcapa = cpot > 0 ? cpot : 1;
}

/** Shrink element storage once usage has stayed low. Returns 0 or errno. */
int rtcp_heap_shrink(rtcp_heap_t* heap)
{
    int cpot = rtcp_ceil_power_of_two(heap->size);

    assert(heap->targetcapa > 0);
    assert(heap->capacity > 0);

    /* Only shrink once the target has fallen to half the capacity or less */
    if(heap->targetcapa <= heap->capacity / 2){
        /* Reallocate with the log2-ceiling of the current size */
        int ret = rtcp_heap_realloc(heap, cpot > 0 ? cpot : 1);
        if(ret){
            return ret;
        }
    }

    assert(heap->capacity > 0);
    assert(heap->capacity >= heap->size);

    return 0;
}

int rtcp_heap_property(rtcp_heap_t* heap)
{
    int i, j, j2;
    int prop;
    int desc;

    /* Heap property true for zero elements */
    prop = 1;

    /* Loop through first nodes of every level */
    for(i = 0; i < heap->size; i = i * 2 + 1){
        /* Loop through all nodes of the current level */
        j2 = i * 2 + 1;
        j2 = j2 < heap->size ?
j2 : heap->size; for(j = i; j < j2; ++j){ /* Compare root with descendants */ desc = j * 2 + 1; if(desc < heap->size){ prop = prop && heap->compare( ((uint8_t*)heap->values) + j * heap->typesize, ((uint8_t*)heap->values) + desc * heap->typesize ); } desc = j * 2 + 2; if(desc < heap->size){ prop = prop && heap->compare( ((uint8_t*)heap->values) + j * heap->typesize, ((uint8_t*)heap->values) + desc * heap->typesize ); } } } return prop; } void rtcp_heap_swim(rtcp_heap_t* heap, int idx) { int ancestor; /* Sift value upwards, until it either: - reaches the root node - does not violate the heap property relative to its ancestor */ ancestor = (idx - 1) / 2; while( idx > 0 && !heap->compare( ((uint8_t*)heap->values) + ancestor * heap->typesize, ((uint8_t*)heap->values) + idx * heap->typesize) ){ rtcp_heap_swap(heap, ancestor, idx); idx = (idx - 1) / 2; ancestor = (idx - 1) / 2; } } void rtcp_heap_sink(rtcp_heap_t* heap, int idx) { int desc1, desc2; /* Sift value upwards, until it either: - reaches the bottommost level - does not violate the heap property relative to its descendant */ while(idx < heap->size){ desc1 = idx * 2 + 1; desc2 = idx * 2 + 2; /* Select descendant based on comparison if there are two */ if( desc2 < heap->size && heap->compare( ((uint8_t*)heap->values) + desc2 * heap->typesize, ((uint8_t*)heap->values) + desc1 * heap->typesize) ){ desc1 = desc2; } /* If the selected descendant is valid, compare it, swap if necessary */ if( desc1 < heap->size && !heap->compare( ((uint8_t*)heap->values) + idx * heap->typesize, ((uint8_t*)heap->values) + desc1 * heap->typesize) ){ rtcp_heap_swap(heap, idx, desc1); idx = desc1; continue; } /* If no swap occurred, the sink is complete */ break; } } int rtcp_heap_insert(rtcp_heap_t* heap, void* val) { int ret; /* If the new size exceeds the current capacity, reallocate */ if(heap->size + 1 > heap->capacity){ ret = rtcp_heap_realloc(heap, heap->capacity * 2); if(ret){ return ret; } } assert(heap->size + 1 <= 
heap->capacity); /* New size */ ++heap->size; /* Write value */ memcpy( ((uint8_t*)heap->values) + (heap->size - 1) * heap->typesize, val, heap->typesize ); /* Value update callback */ if(heap->callback){ heap->callback( ((uint8_t*)heap->values) + (heap->size - 1) * heap->typesize, heap->size - 1, heap->user ); } /* Sift up new element to restore the heap property */ rtcp_heap_swim(heap, heap->size - 1); /* Use the current size, shrink if necessary */ rtcp_heap_use_size(heap); rtcp_heap_shrink(heap); return 0; } void* rtcp_heap_peek(rtcp_heap_t* heap) { return heap->size > 0 ? heap->values : NULL; } void rtcp_heap_extract(rtcp_heap_t* heap, void* val) { rtcp_assert_always(heap->size > 0, "rtcp_heap_extract: empty heap\n"); /* Peek only if the destination space is non-null */ if(val){ memcpy(val, ((uint8_t*)heap->values), heap->typesize); } /* If there are remaining elements after the deletion */ if(heap->size > 1){ /* Place last element at the root */ memcpy( ((uint8_t*)heap->values), ((uint8_t*)heap->values) + (heap->size - 1) * heap->typesize, heap->typesize ); /* Value update callback */ if(heap->callback){ heap->callback(((uint8_t*)heap->values), 0, heap->user); } } /* New size */ --heap->size; /* Sift down the element to restore the heap property */ rtcp_heap_sink(heap, 0); /* Use the current size, shrink if necessary */ rtcp_heap_use_size(heap); rtcp_heap_shrink(heap); } void* rtcp_heap_get(rtcp_heap_t* heap, int idx) { rtcp_assert_always(idx >= 0 && idx < heap->size, "rtcp_heap_get: invalid index\n"); return ((uint8_t*)heap->values) + idx * heap->typesize; } /* Romu Pseudorandom Number Generators Copyright 2020 Mark A. Overton Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ /* Website: romu-random.org Paper: http://arxiv.org/abs/2002.11331 Copy and paste the generator you want from those below. To compile, you will need to #include and use the ROTL definition below. */ /* ===== RomuMono32 32-bit arithmetic: Suitable only up to 2^26 output-values. Outputs 16-bit numbers. Fixed period of (2^32)-47. Must be seeded using the romuMono32_init function. Capacity = 2^27 bytes. Register pressure = 2. State size = 32 bits. */ uint32_t romu_mono32_init(uint32_t seed) { return (seed & 0x1fffffffu) + 1156979152u; /* Accepts 29 seed-bits. */ } uint16_t romu_mono32_random(uint32_t* state) { uint16_t result = *state >> 16; *state *= 3611795771u; *state = ROTL(*state,12); return result; } void rtcp_stats_init(rtcp_stats_t* stats) { /* Initialize values */ stats->entries = 0; stats->total.tv_sec = stats->total.tv_nsec = 0; stats->max_lc = 0; stats->max_over.tv_sec = stats->max_over.tv_nsec = 0; } void rtcp_stats_update(rtcp_stats_t* stats, rtcp_lc_t lc, struct timespec actual, struct timespec overrun) { /* Update entry count */ ++stats->entries; /* Update total runtime */ stats->total = rtcp_timespec_add(&stats->total, &actual); /* Update maximum overrun */ if(rtcp_timespec_less(&stats->max_over, &overrun)){ stats->max_lc = lc; stats->max_over = overrun; } } int rtcp_call_comp_less(void* a, void* b) { rtcp_call_t* ca = (rtcp_call_t*)a; rtcp_call_t* cb = (rtcp_call_t*)b; /* If the timestamps are equal, check the sequence numbers */ if( ca->stamp.tv_sec == cb->stamp.tv_sec && ca->stamp.tv_nsec == cb->stamp.tv_nsec ){ /* Sequence numbers must be guaranteed to be 
nonidentical */ assert(ca->seq != cb->seq); /* Wrapping comparison of sequence numbers: (a < b) should be true when: we try to increment each number to reach the other in a wrapping fashion, and we have to increment 'a' less than 'b' for that to happen. */ return (unsigned int)(ca->seq - cb->seq) > RTCP_SCHED_CALLS_MAX; } /* Otherwise, just compare the timestamps themselves */ else { return rtcp_timespec_less( &((rtcp_call_t*)a)->stamp, &((rtcp_call_t*)b)->stamp ); } } void rtcp_call_idx_update(void* val, int idx, void* user) { rtcp_call_t* call; rtcp_hmap_t* hmap; rtcp_prefix_t* prefix; rtcp_assoc_t* assoc; call = (rtcp_call_t*)val; hmap = (rtcp_hmap_t*)user; /* Get data associated with the call's function */ assoc = rtcp_hmap_get(hmap, call->func); assert(assoc); /* Get data associated with the current call */ prefix = (rtcp_prefix_t*) rtcp_uvec_get(&assoc->locals, call->uvecidx); /* Set the updated heap index */ prefix->heapidx = idx; } void rtcp_prefix_idx_update(void* val, int idx, void* user) { rtcp_prefix_t* prefix; rtcp_heap_t* heap; rtcp_call_t* call; prefix = (rtcp_prefix_t*)val; heap = (rtcp_heap_t*)user; /* Get the call associated with the current local state */ call = rtcp_heap_get(heap, prefix->heapidx); assert(call); /* Set the updated vector index */ call->uvecidx = idx; } void rtcp_assoc_init(rtcp_assoc_t* assoc) { /* Initialize structure storing local states */ rtcp_uvec_init(&assoc->locals); /* Initialize structure storing statistics */ rtcp_stats_init(&assoc->stats); } void rtcp_assoc_malloc(rtcp_assoc_t* assoc, int typesize) { /* Allocate local state */ rtcp_uvec_malloc(&assoc->locals, 1, typesize); } void rtcp_assoc_free(rtcp_assoc_t* assoc) { /* Free memory */ rtcp_uvec_free(&assoc->locals); } void rtcp_assoc_tick(rtcp_assoc_t* assoc) { /* Tick structure storing local states */ rtcp_uvec_tick(&assoc->locals); } void rtcp_assoc_shrink(rtcp_assoc_t* assoc) { /* Shrink structure storing local states */ rtcp_uvec_shrink(&assoc->locals); } int 
rtcp_sched_malloc(rtcp_sched_t* sched) { int ret; (void)sched; /* Allocate call queue */ ret = rtcp_heap_malloc(&sched->calls, 1, sizeof(rtcp_call_t), rtcp_call_comp_less); if(ret){ return ret; } rtcp_heap_init(&sched->calls); /* Allocate data associated to calls */ ret = rtcp_hmap_malloc(&sched->assoc, 1, sizeof(rtcp_assoc_t)); if(ret){ return ret; } rtcp_hmap_init(&sched->assoc); /* Initialize sequence number to distinguish equal-time timestamps */ sched->seq = 0; return 0; } void rtcp_sched_free(rtcp_sched_t* sched) { /* Free data structures associated with functions */ rtcp_hmap_iter(&sched->assoc, rtcp_sched_free_assoc); /* Free memory */ rtcp_heap_free(&sched->calls); rtcp_hmap_free(&sched->assoc); } void rtcp_sched_free_assoc(void* val) { rtcp_assoc_free((rtcp_assoc_t*)val); } void rtcp_sched_tick_assoc(void* val) { rtcp_assoc_tick((rtcp_assoc_t*)val); } void rtcp_sched_shrink_assoc(void* val) { rtcp_assoc_shrink((rtcp_assoc_t*)val); } void rtcp_sched_tick(rtcp_sched_t* sched) { /* Tick data structures associated with functions */ rtcp_hmap_iter(&sched->assoc, rtcp_sched_tick_assoc); /* Tick immediate member structures */ rtcp_heap_tick(&sched->calls); rtcp_hmap_tick(&sched->assoc); } void rtcp_sched_shrink(rtcp_sched_t* sched) { /* Shrink data structures associated with functions */ rtcp_hmap_iter(&sched->assoc, rtcp_sched_shrink_assoc); /* Shrink immediate member structures */ rtcp_heap_shrink(&sched->calls); rtcp_hmap_shrink(&sched->assoc); } int rtcp_sched_func_get_assoc(rtcp_sched_t* sched, fn_prthread_t func, int statesize, rtcp_assoc_t** out) { int ret; rtcp_assoc_t* assoc; rtcp_assoc_t newassoc; int localsize; rtcp_assert_always(statesize >= 0, "rtcp_sched_queue: statesize >= 0\n"); /* Calculate actual size of local state */ localsize = sizeof(rtcp_prefix_t) + statesize; /* Attempt to get the storage associated with the function */ if(!(assoc = rtcp_hmap_get(&sched->assoc, func))){ /* Allocate & initialize associated data structure */ 
rtcp_assoc_malloc(&newassoc, localsize);
        rtcp_assoc_init(&newassoc);

        /* Add and retrieve associated data */
        ret = rtcp_hmap_add(&sched->assoc, func, &newassoc);
        if(ret){ return ret; }
        assoc = rtcp_hmap_get(&sched->assoc, func);
    }

    /* Assert that the size of the associated data is as expected */
    rtcp_assert_always(localsize == assoc->locals.typesize,
        "rtcp_sched_queue: non-matching statesize\n");

    /* Return associated data structure if it was requested */
    if(out){ *out = assoc; }

    return 0;
}

/* Number of calls currently waiting in the scheduler queue. */
int rtcp_sched_calls_queued(rtcp_sched_t* sched)
{
    return sched->calls.size;
}

/* Queue a protothread call at timestamp `stamp`, copying `statesize` bytes
   of initial local state from `state`. Returns 0 on success, or an errno
   value from a failed allocation. */
int rtcp_sched_queue(rtcp_sched_t* sched, struct timespec stamp,
    fn_prthread_t func, void* state, int statesize)
{
    int ret;
    rtcp_assoc_t* assoc;
    int uvecidx;
    rtcp_prefix_t* prefix;
    rtcp_call_t call;

    /* Get or create data structure associated with function */
    if((ret = rtcp_sched_func_get_assoc(sched, func, statesize, &assoc)
    )){ return ret; }

    /* Get the index where the local state will be added to the vector */
    uvecidx = assoc->locals.size;

    /* Unset uvec callback (the heap insert below fixes up indices instead) */
    rtcp_uvec_cb(&assoc->locals, NULL, NULL);

    /* Add associated local state (NULL: slot reserved, written below) */
    ret = rtcp_uvec_add(&assoc->locals, NULL);
    if(ret){ return ret; }

    /* Write prefix portion of local state */
    prefix = (rtcp_prefix_t*)
        rtcp_uvec_get(&assoc->locals, assoc->locals.size - 1);
    assert(prefix);

    /* Write state portion of local state.
       (assumes `state` points to at least `statesize` readable bytes --
       TODO(review): confirm callers never pass NULL with statesize > 0) */
    memcpy(((uint8_t*)prefix) + sizeof(rtcp_prefix_t), state, statesize);

    /* Set heap callback so inserted/moved calls update prefix->heapidx */
    rtcp_heap_cb(&sched->calls, rtcp_call_idx_update, &sched->assoc);

    /* Schedule call */
    call.stamp = stamp;
    call.seq = sched->seq;
    call.func = func;
    PT_INIT(&call.pt);
    call.uvecidx = uvecidx;
    rtcp_heap_insert(&sched->calls, &call);

    /* Increment sequence number */
    ++sched->seq;

    return 0;
}

/* Run the next queued call. `end` is the deadline handed to the protothread;
   a non-zero `endignore` passes a NULL deadline (run without time limit). */
void rtcp_sched_call(rtcp_sched_t* sched, struct timespec end, int endignore)
{
    int status;
    rtcp_call_t* call;
    rtcp_prefix_t* prefix;
    rtcp_assoc_t* assoc;
    struct timespec limit;
    struct timespec run_start;
    struct timespec run_end;
    struct timespec
time_zero = {0}; /* Get next call in queue */ call = rtcp_heap_peek(&sched->calls); /* Get data associated with the call's function */ assoc = rtcp_hmap_get(&sched->assoc, call->func); assert(assoc); /* Get local state associated with call */ prefix = (rtcp_prefix_t*) rtcp_uvec_get(&assoc->locals, call->uvecidx); assert(prefix->heapidx == 0); /* Copy time limit the protothread will receive a pointer to */ limit = end; /* Execute call */ clock_gettime(CLOCK_REALTIME, &run_start); status = call->func( &call->pt, ((uint8_t*)prefix) + sizeof(rtcp_prefix_t), endignore ? NULL : &limit ); clock_gettime(CLOCK_REALTIME, &run_end); /* Assert that the return status is a recognizable one */ rtcp_assert_always(status >= PT_WAITING && status <= PT_ENDED, "rtcp_sched_call: unrecognized return status from protothread\n"); /* Remove call and its local state from queue if it has completed */ if(status >= PT_EXITED){ /* Set uvec callback */ rtcp_uvec_cb(&assoc->locals, rtcp_prefix_idx_update, &sched->calls); /* Delete local state associated with call */ rtcp_uvec_del(&assoc->locals, call->uvecidx); /* Extract call from queue */ rtcp_heap_extract(&sched->calls, NULL); } /* Update statistics for the call's function */ rtcp_stats_update(&assoc->stats, call->pt.lc, rtcp_timespec_diff(&run_start, &run_end), !endignore && rtcp_timespec_less(&end, &run_end) ? 
rtcp_timespec_diff(&end, &run_end) : time_zero ); } void rtcp_sched_until(rtcp_sched_t* sched, struct timespec end) { struct timespec now; /* Set heap callback */ rtcp_heap_cb(&sched->calls, rtcp_call_idx_update, &sched->assoc); /* Schedule calls until either: - the designated timepoint has passed - there are no more calls waiting to run */ while( rtcp_heap_peek(&sched->calls) && clock_gettime(CLOCK_REALTIME, &now) == 0 && rtcp_timespec_less(&now, &end) ){ rtcp_sched_call(sched, end, 0); } } void rtcp_sched_flush(rtcp_sched_t* sched) { struct timespec now; /* Set heap callback */ rtcp_heap_cb(&sched->calls, rtcp_call_idx_update, &sched->assoc); /* Schedule calls until: - there are no more calls waiting to run */ while( rtcp_heap_peek(&sched->calls) ){ rtcp_sched_call(sched, now, 1); } } void rtcp_sched_flush_until(rtcp_sched_t* sched, struct timespec end) { rtcp_call_t* call; /* Set heap callback */ rtcp_heap_cb(&sched->calls, rtcp_call_idx_update, &sched->assoc); /* Schedule calls until either: - the next call is beyond the specified timepoint - there are no more calls waiting to run */ while( (call = rtcp_heap_peek(&sched->calls)) && rtcp_timespec_less(&call->stamp, &end) ){ rtcp_sched_call(sched, end, 0); } } void rtcp_sched_func_stats(rtcp_sched_t* sched, fn_prthread_t func, rtcp_stats_t* out) { rtcp_assoc_t* assoc; /* Get data associated with the protothread function */ assoc = rtcp_hmap_get(&sched->assoc, func); rtcp_assert_always(assoc != NULL, "rtcp_sched_func_stats: unknown function\n"); *out = assoc->stats; } #endif /* RTCPROTO_IMPLEMENTATION */ #endif /* RTCPROTO_H */