/******************************************/ /* streamers.c 0.3.0 (1999-Dec-18-Sat) */ /* Adam M. Costello */ /******************************************/ /* An example implementation of the interface of streamers.h 0.3.x. */ /* This is ANSI C code. */ #include #include #include #include "bytestream.h" #include "integer.h" #include "random.h" #include "streamers.h" #include "timer.h" #include "warnf.h" /* Used for the pseudo-random number generator: */ static const uint32 rand_factor = 0x41c64e6d, rand_increment = 0xb; /* error message: */ static const char *concurrent = "concurrent method invocation"; /***************/ /* byte source */ struct byte_source { unsigned long bytes_created; /* number of bytes created */ unsigned long bytes_sent; /* number of bytes sent */ void *sink; /* our byte sink */ bytestream_transfer_method *transfer; /* sink's transfer method */ timer_id timer; /* timer for next burst */ int active; /* boolean: timer exists */ int anon_timers; /* number of timers w/o IDs */ uint32 rand_state; /* latest random number */ unsigned long burst_size; /* size of current burst */ unsigned char *buffer; /* the data in the burst */ struct streamer_parameters parameters; /* configuration parameters */ int lock; /* boolean: executing method */ }; /* At the top of every method we verify that the lock is 0 and */ /* set it to 1. We always reset it to 0 before returning. */ /* transfer is 0 if byte_source_talk_to() has not been called yet. */ /* Conceptually, the timer exists while the burst */ /* is being created, hence the name active. */ /* There is a buffer iff bytes_sent < bytes_created. */ /* When there is not a buffer, buffer == 0. */ void *byte_source_create(void) { struct byte_source *source; struct streamer_parameters *parameters; source = malloc(sizeof (struct byte_source)); if (!source) failf(out_of_memory); source->transfer = 0; /* We're not talking to anyone yet. */ source->active = 0; /* There is no scheduled next burst. */ source->anon_timers = 0; /* No outstanding timers without IDs. */ parameters = &source->parameters; /* Default parameters: */ parameters->total_bytes = 100001; parameters->seed = 0; parameters->rate = 8000.0; parameters->burst_min = 100; parameters->burst_max = 100; source->lock = 0; return source; } static timer_handler create_burst; static void schedule_source(struct byte_source *source) /* Schedules the next burst. There must be */ /* no outstanding timer when this is called. */ { struct streamer_parameters *parameters = &source->parameters; double delay, now = timer_now(); if (source->bytes_sent >= parameters->total_bytes || parameters->rate <= 0.0) { source->active = 0; /* warnf("debug: schedule_source() aborting\n"); */ return; } source->burst_size = random_discrete_uniform(parameters->burst_min, parameters->burst_max); delay = source->burst_size / parameters->rate * 8; source->timer = timer_set(create_burst, now + delay, source, 0); source->active = 1; /* warnf("debug: source burst scheduled for %.3f\n", now + delay); */ } static void send(struct byte_source *source) /* Send all unsent bytes. */ { unsigned char *buffer; unsigned long bytes_created = source->bytes_created, bytes_sent = source->bytes_sent, burst_size = source->burst_size, length; assert(bytes_sent <= bytes_created); length = bytes_created - bytes_sent; if (length == 0) return; assert(burst_size >= length); buffer = source->buffer + burst_size - length; bytes_sent += source->transfer(source->sink, source, buffer, length); source->bytes_sent = bytes_sent; if (bytes_sent == bytes_created) { free(source->buffer); source->buffer = 0; schedule_source(source); } } static void create_burst(double now, void *who, void *what) /* Creates a burst of data. */ { struct byte_source *source = who; unsigned long bytes_created = source->bytes_created, bytes_sent = source->bytes_sent, total_bytes = source->parameters.total_bytes, burst_size = source->burst_size, max_burst, i; unsigned char *buffer; uint32 rand_state = source->rand_state; if (source->lock) failf(code_ref, concurrent); source->lock = 1; assert(bytes_sent == bytes_created); if (bytes_created >= total_bytes) { source->lock = 0; return; } max_burst = total_bytes - bytes_created; if (burst_size > max_burst) burst_size = source->burst_size = max_burst; bytes_created += burst_size; source->bytes_created = bytes_created; buffer = malloc(burst_size); if (!buffer) failf(out_of_memory); source->buffer = buffer; for (i = 0; i < burst_size; ++i) { rand_state = (rand_state + rand_increment) * rand_factor; buffer[i] = (rand_state >> 24) & 0xff; } source->rand_state = rand_state; source->active = 0; send(source); source->lock = 0; } void byte_source_config(void *vsource, struct streamer_parameters parameters) { struct byte_source *source = vsource; if (source->lock) failf(code_ref, concurrent); source->lock = 1; source->parameters = parameters; assert(parameters.burst_min <= parameters.burst_max); assert(parameters.burst_min >= 1); /* Burst creation in progress is aborted: */ if (source->active) timer_cancel(source->timer); if (source->transfer && source->bytes_sent == source->bytes_created) { schedule_source(source); } source->lock = 0; } void byte_source_talk_to( void *vsource, void *sink, bytestream_transfer_method *transfer ) { struct byte_source *source = vsource; if (source->lock) failf(code_ref, concurrent); source->lock = 1; assert(!source->transfer); source->bytes_created = 0; source->bytes_sent = 0; source->sink = sink; source->transfer = transfer; source->rand_state = source->parameters.seed; source->buffer = 0; schedule_source(source); source->lock = 0; } static timer_handler call_send; static void call_send(double now, void *who, void *what) { struct byte_source *source = who; if (source->lock) failf(code_ref, concurrent); source->lock = 1; --source->anon_timers; send(source); source->lock = 0; } void byte_source_go_ahead(void *vsource, void *sink) { struct byte_source *source = vsource; double now = timer_now(); if (source->lock) failf(code_ref, concurrent); source->lock = 1; /* Make sure we're expecting this caller: */ if (!source->transfer || sink != source->sink) { /* warnf("debug: byte_source_go_ahead() aborting\n"); */ source->lock = 0; return; } timer_set(call_send, now, source, 0); ++source->anon_timers; /* warnf("debug: byte_source_go_ahead() setting timer at %.3f\n", now); */ source->lock = 0; } unsigned long byte_source_sent(void *vsource) { struct byte_source *source = vsource; return source->bytes_sent; } void byte_source_delete(void *vsource) { struct byte_source *source = vsource; if (source->anon_timers) { failf("Sorry, byte_source_delete() doesn't work if called while it has\n" "a timer set to expire now.\n"); } if (source->active) timer_cancel(source->timer); if (source->buffer) free(source->buffer); free(source); } /*************/ /* byte sink */ /* The infinite queue mentioned in streamers.h doesn't */ /* actually exist. The data is checked as it arrives. */ struct byte_sink { unsigned long bytes_acceptable; /* number of acceptable bytes */ unsigned long bytes_received; /* number of bytes received */ unsigned long bytes_wrong; /* no. of wrong bytes received */ void *source; /* our byte source */ bytestream_go_ahead_method *go_ahead; /* source's go-ahead method */ timer_id timer; /* timer for processing target */ int active; /* boolean: timer exists */ uint32 rand_state; /* latest random number */ unsigned long processing_target; /* see below */ int obligated; /* boolean: must call go_ahead */ struct streamer_parameters parameters; /* configuration parameters */ int lock; /* boolean: executing method */ }; /* At the top of every method we verify that the lock is 0 and */ /* set it to 1. We always reset it to 0 before returning. */ /* go_ahead is 0 if byte_sink_listen_to() has not been called yet. */ /* Conceptually, the timer is set while data is being processed, */ /* hence the name active. */ /* processing_target is the number of bytes that will have been */ /* processed when the timer expires. Or, if there is no outstanding */ /* timer, the number of bytes that have been processed, which equals */ /* bytes_received (if we had unprocessed bytes, we'd be processing */ /* them, so there would be a timer). */ void *byte_sink_create(void) { struct byte_sink *sink; struct streamer_parameters *parameters; sink = malloc(sizeof (struct byte_sink)); if (!sink) failf(out_of_memory); sink->go_ahead = 0; /* Means we're not listening to anyone yet. */ sink->active = 0; /* Means there is no processing happening. */ parameters = &sink->parameters; /* Default parameters: */ parameters->total_bytes = 100001; parameters->seed = 0; parameters->rate = 0.0; parameters->burst_min = 100002; parameters->burst_max = 100002; sink->lock = 0; return sink; } static timer_handler finish_processing; static void schedule_sink(struct byte_sink *sink, double now) /* Schedules the next processing target. There must */ /* be no outstanding timer when this is called. */ { struct streamer_parameters *parameters = &sink->parameters; unsigned long bytes_acceptable = sink->bytes_acceptable, bytes_received = sink->bytes_received, processing_target = sink->processing_target; double delay; assert(processing_target <= bytes_received); assert(bytes_received <= bytes_acceptable); /* warnf("debug: processing_target = %lu\n" " bytes_received = %lu\n" " bytes_acceptable = %lu\n", processing_target, bytes_received, bytes_acceptable); */ if (processing_target == bytes_received) { if (processing_target == bytes_acceptable) { sink->bytes_acceptable += random_discrete_uniform(parameters->burst_min, parameters->burst_max); if (sink->obligated) { /* warnf("debug: calling go_ahead()\n"); */ assert(sink->lock); sink->lock = 2; /* Distinguish this particular case. */ sink->go_ahead(sink->source, sink); sink->lock = 1; sink->obligated = 0; } } sink->active = 0; return; } if (parameters->rate <= 0.0) { sink->active = 0; return; } delay = (bytes_received - processing_target) / parameters->rate * 8; sink->timer = timer_set(finish_processing, now + delay, sink, 0); sink->processing_target = bytes_received; sink->active = 1; } static void finish_processing(double now, void *who, void *what) /* Handles timer expiration by calling schedule_sink(). */ { struct byte_sink *sink = who; if (sink->lock) failf(code_ref, concurrent); sink->lock = 1; schedule_sink(sink,now); sink->lock = 0; } void byte_sink_config(void *vsink, struct streamer_parameters parameters) { struct byte_sink *sink = vsink; if (sink->lock) failf(code_ref, concurrent); sink->lock = 1; sink->parameters = parameters; assert(parameters.burst_min <= parameters.burst_max); assert(parameters.burst_min >= 1); /* Processing in progress finishes early: */ if (sink->active) timer_cancel(sink->timer); if (sink->go_ahead) schedule_sink(sink, timer_now()); sink->lock = 0; } void byte_sink_listen_to( void *vsink, void *source, bytestream_go_ahead_method *go_ahead ) { struct byte_sink *sink = vsink; if (sink->lock) failf(code_ref, concurrent); sink->lock = 1; assert(!sink->go_ahead); sink->bytes_acceptable = 0; sink->bytes_received = 0; sink->bytes_wrong = 0; sink->source = source; sink->go_ahead = go_ahead; sink->rand_state = sink->parameters.seed; sink->processing_target = 0; sink->obligated = 0; schedule_sink(sink, timer_now()); sink->lock = 0; } size_t byte_sink_transfer( void *vsink, void *source, const unsigned char *buffer, size_t length ) { struct byte_sink *sink = vsink; unsigned long bytes_acceptable = sink->bytes_acceptable, bytes_received = sink->bytes_received, total_bytes = sink->parameters.total_bytes; size_t i, new_bytes; uint32 rand_state = sink->rand_state; double now = timer_now(); if (sink->lock == 2) { failf("byte_sink_transfer() called from within go-ahead method!\n"); } if (sink->lock) failf(code_ref, concurrent); sink->lock = 1; assert(bytes_received <= bytes_acceptable); new_bytes = bytes_received + length <= bytes_acceptable ? length : bytes_acceptable - bytes_received; for (i = 0; i < new_bytes; ++i) { rand_state = (rand_state + rand_increment) * rand_factor; if (buffer[i] != ((rand_state >> 24) & 0xff)) { warnf("received wrong byte at position %lu\n", sink->bytes_received + i); ++sink->bytes_wrong; } } sink->rand_state = rand_state; sink->bytes_received += new_bytes; if (sink->bytes_received == total_bytes) warnf("received all bytes\n"); if (sink->bytes_received > total_bytes) warnf("received too many bytes\n"); if (new_bytes < length) sink->obligated = 1; /* warnf("debug: time %6.3f byte_sink_transfer() returning %5lu\n", now, (unsigned long) new_bytes); */ if (new_bytes && !sink->active) schedule_sink(sink,now); sink->lock = 0; return new_bytes; } unsigned long byte_sink_received(void *vsink) { struct byte_sink *sink = vsink; return sink->bytes_received; } unsigned long byte_sink_wrong(void *vsink) { struct byte_sink *sink = vsink; return sink->bytes_wrong; } void byte_sink_delete(void *vsink) { struct byte_sink *sink = vsink; if (sink->active) timer_cancel(sink->timer); free(sink); }