Soletta™ Framework
Framework for making IoT devices

Full online documentation | C API Index
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
Design Patterns

Table of Contents

This page will explain the common design patterns employed by Soletta.

IO Streams

Soletta has its own pattern for dealing with input/output streams. This section will describe how the soletta pattern is implemented and how to use it.

The aim of this section is to guide the developer to provide a unified API for all kind of streams, making it easier for a third party developer to migrate its application to any stream related IO, if necessary.

The pattern is divided in three categories: The stream configuration, the write API and the read API.

Configuration

The stream should be configured using a struct during its creation, this struct should provide the following fields.

Below there's a example of a configuration struct.

const void *user_data;
void (*on_feed_done)(void *user_data, struct my_stream_api_handle *handle, struct sol_blob *blob, int status);
ssize_t (*on_data)(void *user_data, struct my_stream_api_handle *handle, const struct sol_buffer *buf);
size_t feed_size;
};

Writing

Write operations must be async and the data that will be transferred must be provided as a pointer to sol_blob. One should follow the API below:

int my_stream_api_feed(struct my_stream_api_handle *handle, struct sol_blob *blob);

Every blob requested to be written, must be queued in order to be sent. If there's no more space to queue more blobs (sum of all queued blobs >= feed_size) the function write should return -ENOSPC.

Every time a blob is fully written the on_feed_done must be called in order to inform the user that the write operation was completed. The on_feed_done should have the following signature:

void (*on_feed_done)(void *user_data, my_stream_api_handle *handle, struct sol_blob *blob, int status);

One should warn the user that it's not necessary to use sol_blob_unref(), because the stream API will already do that. The status variable should be 0 on success and -errno on error. The user should be able to cancel the stream operation from the on_feed_done.

Below there's an example of a stream device write implementation.

const void *user_data;
void (*on_feed_done)(void *user_data, struct my_stream_api_handle *handle, struct sol_blob *blob, int status);
ssize_t (*on_data)(void *user_data, struct my_stream_api_handle *handle, const struct sol_buffer *buf);
struct sol_buffer rx;
size_t feed_size;
size_t pending_bytes;
size_t written;
int dev;
};
//The write operation itself
static bool
_can_write(void *data, int fd, uint32_t active_flags)
{
struct my_stream_api_handle *handle = data;
struct sol_blob *blob;
ssize_t status;
bool r = true;
//Write the blob
blob = sol_ptr_vector_get(&handle->pending_blobs, 0);
//Write into the stream
status = my_stream_device_write(handle->dev, (char *)blob->mem + handle->written, blob->size);
if (status < 0) {
if (status == EAGAIN || status == EINTR)
return true;
else {
SOL_WRN("Could not write to the stream device!");
handle->write_monitor = NULL;
return false;
}
}
//Update how many bytes have been written
handle->written += status;
handle->pending_bytes -= status;
//Blob was completly written. Inform the user.
if (handle->written == blob->size) {
//Do we still have more data ?
r = false;
handle->write_monitor = NULL;
}
//Reset the written counter
handle->written = 0;
/*
Inform the user.
Since it's completly safe to call my_stream_api_close() inside on_feed_done().
Informing the user should be the last thing to do.
*/
if (handle->on_feed_done)
handle->on_feed_done((void *)handle->user_data, handle, blob, status);
sol_blob_unref(blob); //NOTE: that we unref the blob, not the user!
}
return r;
}
int
my_stream_api_feed(struct my_stream_api_handle *handle, struct sol_blob *blob)
{
size_t total;
int r;
SOL_NULL_CHECK(handle, -EINVAL);
SOL_NULL_CHECK(blob, -EINVAL);
//Do not try to write with the uart is going to be closed
SOL_EXP_CHECK(handle->reentrant.delete_me, -EINVAL);
total = handle->pending_bytes + blob->size;
//feed_size was set and the total must not exceed feed_size
if (handle->feed_size && total >= handle->feed_size)
return -ENOSPC; //Try again later
//Store the blob and ref it, since it will written later.
r = sol_ptr_vector_append(&handle->pending_blobs, blob);
SOL_INT_CHECK(r, < 0, r);
sol_blob_ref(blob);
handle->pending_bytes = total;
//Schedule the write operation
if (!handle->write_monitor) {
SOL_FD_FLAGS_OUT, _can_write, handle);
SOL_NULL_CHECK_GOTO(handle->write_monitor, err_monitor);
}
return 0;
err_monitor:
return -ENOMEM;
}

Reading

Since this is a stream, there's no direct way for a third party developer to request a read operation. In order to be able to read from the stream, the third party developer must provide the on_data duration the stream configuration/creation. The on_data is provided during the stream creation, one must inform the user every time data is avaible to read. The on_data has the following signature:

ssize_t (*on_data)(void *user_data, my_stream_api_handle *handle, const struct sol_buffer *buf);

Note that the on_data returns a ssize_t, the returned value must be -errno if an error happened or the number of bytes consumed from buf (0 is valid). The consumed bytes will be removed from the buf. It's important to note that the buf should not be modified and any references to its data should not be kept after on_data returns. Also just like on_feed_done, the user should be able to close/delete the stream handle inside on_data. Extra care must be taken in order to do not crash.

Below there's an example of the stream device read implementation.

//Delivery the data to the user
static bool
_inform_user(void *data)
{
struct my_stream_api_handle *handle = data;
ssize_t r;
//Flag that the handle is in use
//Inform the user
r = handle->on_data((void *)handle->user_data, handle, &handle->rx);
}
//Remove the data.
if (r < 0)
SOL_ERR("Something wrong happened %zd", r);
else
sol_buffer_remove_data(&handle->rx, 0, r);
//Close the handle
if (handle->reentrant.delete_me) {
api_close(handle);
}
handle->read_timeout = NULL;
return false;
}
//Still need to callback the user with remaining data, keep the timer running
if (handle->rx.used)
return true;
handle->read_timeout = NULL;
return false;
}
//Actually read from the device
static bool
_can_read(void *data, int fd, uint32_t active_flags)
{
struct my_stream_api_handle *handle = data;
size_t remaining = handle->rx.capacity - handle->rx.used;
ssize_t status;
//The rx buffer is full. Try to expand it in order to store more data.
if (!remaining && !(handle->rx.flags & SOL_BUFFER_FLAGS_FIXED_CAPACITY)) {
int err;
SOL_INT_CHECK(err, < 0, true);
remaining = DEFAULT_BUFFER_SIZE;
}
if (remaining > 0) {
//Append data to the buffer
status = my_stream_device_read(handle->dev, sol_buffer_at_end(&handle->rx), remaining);
if (status < 0) {
if (status == EAGAIN || status == EINTR)
return true;
else {
SOL_WRN("Could not read to the stream device!");
handle->read_monitor = NULL;
return false;
}
}
handle->rx.used += status;
}
if (!handle->read_timeout)
handle->read_timeout = sol_timeout_add(0, _inform_user, handle);
return true;
}
//Stream creation, where on_data is configured
my_stream_api_new(const struct my_stream_api_config *config, int dev)
{
struct my_stream_api_handle *handle;
size_t data_buffer_size = 0;
void *buf = NULL;
//By default the rx buffer will not be limited
handle = calloc(1, sizeof(struct my_stream_api_handle));
SOL_NULL_CHECK(handle, NULL);
handle->on_feed_done = config->on_feed_done;
handle->feed_size = config->feed_size;
handle->user_data = config->user_data;
//The user does not want to read from the stream, ignore rx configuration.
if (config->on_data) {
handle->on_data = config->on_data;
data_buffer_size = config->data_buffer_size;
//The rx buffer has a fixed size and we should respect that.
if (data_buffer_size) {
buf = malloc(data_buffer_size);
SOL_NULL_CHECK_GOTO(buf, err_buf);
} //else - The user is informing that the rx buffer should be unlimited
//Setup input monitor
handle->read_monitor = my_stream_device_add_io_monitor(handle->dev, SOL_FD_FLAGS_IN, _can_read, handle);
SOL_NULL_CHECK_GOTO(handle->read_monitor, err_monitor);
}
sol_buffer_init_flags(&handle->rx, buf, data_buffer_size, flags);
handle->dev = dev;
return handle;
err_monitor:
free(buf);
err_buf:
free(handle);
return NULL;
}
static void
{
uint16_t i;
struct sol_blob *blob;
//Inform that some blobs where not sent
if (handle->on_feed_done)
handle->on_feed_done((void *)handle->user_data, handle, blob, -ECANCELED);
}
//Last chance to consume the rx buffer
if (handle->rx.used)
handle->on_data((void *)handle->user_data, handle, &handle->rx);
sol_buffer_fini(&handle->rx);
free(handle);
}
void
{
SOL_NULL_CHECK(handle);
if (handle->read_timeout) {
handle->read_timeout = NULL;
}
if (handle->read_monitor) {
handle->read_monitor = NULL;
}
if (handle->write_monitor) {
handle->write_monitor = NULL;
}
//The user is trying to delete the handle from the on_data, do not delete it now.
api_close(handle);
}
}

Usage Example

In the following example, it will be demonstrated how one can correctly use a stream API. The example is consists in an UART producer producing more data than an UART consumer can read. By doing so, it will be demonstrated how flow control can be employed.

First we start configuring the streams and creating the streams.

static void
startup(void)
{
struct sol_uart_config producer_config = {
.data_bits = SOL_UART_DATA_BITS_8,
.stop_bits = SOL_UART_STOP_BITS_ONE,
.on_feed_done = producer_data_written,
.feed_size = FEED_SIZE //Note that feed buffer is limited!
};
struct sol_uart_config consumer_config = {
.data_bits = SOL_UART_DATA_BITS_8,
.stop_bits = SOL_UART_STOP_BITS_ONE,
};
char **argv;
int argc;
argc = sol_argc();
argv = sol_argv();
if (argc < 3) {
fprintf(stderr, "Usage: %s <producerUART> <consumerUART>\n", argv[0]);
goto err_exit;
}
producer = sol_uart_open(argv[1], &producer_config);
if (!producer) {
fprintf(stderr, "Could not create the producer!\n");
goto err_exit;
}
consumer = sol_uart_open(argv[2], &consumer_config);
if (!consumer) {
fprintf(stderr, "Could not create the consumer\n");
goto err_exit;
}
//This will produce all the data to be sent
fprintf(stderr, "Could not create the producer timeout!\n");
goto err_exit;
}
return;
err_exit:
}

The producer will create its data every 10 ms ticks and try to send it. Since the producer buffer is limited sol_uart_feed() will start to return -ENOSPC, because the sum of all blobs are limited to feed_size.

Below we can see how the data is produced and sent.

static bool
send_blob(struct sol_blob *blob)
{
int err;
bool r = true;
//Actually write the blob into UART bus
err = sol_uart_feed(producer, blob);
if (err < 0) {
//Oh no, there's no more space left.
if (err == -ENOSPC) {
pending_blob = blob;
printf("No space left in the tx buffer - saving blob for later. Data: %.*s\n",
} else {
fprintf(stderr, "Could not not perform an UART write - Reason: %s\n",
r = false;
}
} else {
if (blob == pending_blob)
pending_blob = NULL; //Flag that data production can start once again!
}
return r;
}
static bool
producer_make_data(void *data)
{
void *v;
size_t size;
struct sol_blob *blob;
static uint16_t packets_created = 0;
bool keep_running = true;
int r;
//Stop the production until the pendind blob is sent
if (pending_blob) {
printf("Waiting for blob data: %.*s to be transferred.\n",
return true;
}
packets_created++;
//Generate data
if (packets_created != MAX_PACKETS)
r = sol_util_uuid_gen(true, true, &buf);
else {
keep_running = false;
}
if (r < 0) {
fprintf(stderr, "Could not create the UUID - Reason: %s\n",
goto err_exit;
}
v = sol_buffer_steal(&buf, &size);
v, size + 1);
if (!blob) {
fprintf(stderr, "Could not alloc memory for the blob\n");
goto err_exit;
}
//Send it
if (!send_blob(blob))
goto err_exit;
if (!keep_running)
goto exit;
return true;
err_exit:
exit:
return false;
}

It's important to note that every time sol_uart_feed() returns -ENOSPC the data production is ceased and the blob that could not be sent is stored at the pending_blob variable.

Every time a blob is completely written, the on_feed_done is called, in this case producer_data_written function. In this function the pending_blob variable is checked to see if it's not NULL. In case it's not NULL, we try to send it.

static void
producer_data_written(void *data, struct sol_uart *uart, struct sol_blob *blob, int status)
{
struct sol_str_slice slice;
slice = sol_str_slice_from_blob(blob);
if (status < 0) {
fprintf(stderr, "Could not write the UUID %.*s - Reason: %s\n", SOL_STR_SLICE_PRINT(slice),
sol_util_strerrora(-status));
} else {
printf("Producer: UUID %.*s written\n", SOL_STR_SLICE_PRINT(slice));
if (pending_blob) { //If we have a pending blob now it's the time to try to send it!
fprintf(stderr, "Could not send the pending blob!\n");
}
}
}
}

By stopping the data production when sol_uart_feed() returns -ENOSPC we control the data input flow. After sol_uart_feed() starts to accept blobs, the data production can start once again.

Now it's time to take a look at our consumer. It's a very naive consumer, it will just print the data to stdout every time the nul byte is found. In addiction it will close the UART input if it receives the "close" string. Note that is completely safe to close the UART from the on_data (the same applies to on_feed_done!)

static ssize_t
consumer_read_available(void *data, struct sol_uart *uart, const struct sol_buffer *buf)
{
struct sol_str_slice slice = sol_buffer_get_slice(buf);
char *sep;
sep = memchr(slice.data, '\0', slice.len);
if (!sep)
return 0; //Bytes will not be removed fom the rx buffer
//Close the UART!
if (sol_str_slice_str_contains(slice, "close")) {
sol_uart_close(uart); //This is completely safe
consumer = NULL;
printf("\n\n** Consumer **: Received the close command\n\n");
} else {
printf("\n\n** Consumer ** : Received UUID %.*s\n\n",
}
return slice.len; //slice.len bytes will be removed from the rx buffer
}

Now that you have the stream usage basics, you can start coding your apps!

Implementations

Soletta currently implements streams for the following modules: