329 lines
8.3 KiB
C

/*
* Copyright (c) 2007 Vreixo Formoso
*
* This file is part of the libisofs project; you can redistribute it and/or
* modify it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation. See COPYING file for details.
*/
/*
* Synchronized ring buffer, works with a writer thread and a read thread.
*
* TODO #00010 : optimize ring buffer
* - write/read at the end of buffer requires a second mutex_lock, even if
* there's enought space/data at the beginning
* - pre-buffer for writes < BLOCK_SIZE
*
*/
#include "buffer.h"
#include "libburn/libburn.h"
#include "ecma119.h"
#include <pthread.h>
#include <string.h>
#ifndef MIN
# define MIN(a, b) (((a) < (b)) ? (a) : (b))
#endif
struct iso_ring_buffer
{
uint8_t *buf;
/*
* Max number of bytes in buffer
*/
size_t cap;
/*
* Number of bytes available.
*/
size_t size;
/* position for reading and writing, offset from buf */
size_t rpos;
size_t wpos;
/*
* flags to report if read or writer threads ends execution
* 0 not finished, 1 finished ok, 2 finish with error
*/
unsigned int rend :2;
unsigned int wend :2;
/* just for statistical purposes */
unsigned int times_full;
unsigned int times_empty;
pthread_mutex_t mutex;
pthread_cond_t empty;
pthread_cond_t full;
};
/**
* Create a new buffer.
*
* The created buffer should be freed with iso_ring_buffer_free()
*
* @param size
* Number of blocks in buffer. You should supply a number >= 32, otherwise
* size will be ignored and 32 will be used by default, which leads to a
* 64 KiB buffer.
* @return
* 1 success, < 0 error
*/
int iso_ring_buffer_new(size_t size, IsoRingBuffer **rbuf)
{
IsoRingBuffer *buffer;
if (rbuf == NULL) {
return ISO_NULL_POINTER;
}
buffer = malloc(sizeof(IsoRingBuffer));
if (buffer == NULL) {
return ISO_OUT_OF_MEM;
}
buffer->cap = (size > 32 ? size : 32) * BLOCK_SIZE;
buffer->buf = malloc(buffer->cap);
if (buffer->buf == NULL) {
free(buffer);
return ISO_OUT_OF_MEM;
}
buffer->size = 0;
buffer->wpos = 0;
buffer->rpos = 0;
buffer->times_full = 0;
buffer->times_empty = 0;
buffer->rend = buffer->wend = 0;
/* init mutex and waiting queues */
pthread_mutex_init(&buffer->mutex, NULL);
pthread_cond_init(&buffer->empty, NULL);
pthread_cond_init(&buffer->full, NULL);
*rbuf = buffer;
return ISO_SUCCESS;
}
void iso_ring_buffer_free(IsoRingBuffer *buf)
{
if (buf == NULL) {
return;
}
free(buf->buf);
pthread_mutex_destroy(&buf->mutex);
pthread_cond_destroy(&buf->empty);
pthread_cond_destroy(&buf->full);
free(buf);
}
/**
* Write count bytes into buffer. It blocks until all bytes where written or
* reader close the buffer.
*
* @param buf
* the buffer
* @param data
* pointer to a memory region of at least coun bytes, from which data
* will be read.
* @param
* Number of bytes to write
* @return
* 1 succes, 0 read finished, < 0 error
*/
int iso_ring_buffer_write(IsoRingBuffer *buf, uint8_t *data, size_t count)
{
size_t len;
int bytes_write = 0;
if (buf == NULL || data == NULL) {
return ISO_NULL_POINTER;
}
while (bytes_write < count) {
pthread_mutex_lock(&buf->mutex);
while (buf->size == buf->cap) {
/*
* Note. There's only a writer, so we have no race conditions.
* Thus, the while(buf->size == buf->cap) is used here
* only to propertly detect the reader has been cancelled
*/
if (buf->rend) {
/* the read procces has been finished */
pthread_mutex_unlock(&buf->mutex);
return 0;
}
buf->times_full++;
/* wait until space available */
pthread_cond_wait(&buf->full, &buf->mutex);
}
len = MIN(count - bytes_write, buf->cap - buf->size);
if (buf->wpos + len > buf->cap) {
len = buf->cap - buf->wpos;
}
memcpy(buf->buf + buf->wpos, data + bytes_write, len);
buf->wpos = (buf->wpos + len) % (buf->cap);
bytes_write += len;
buf->size += len;
/* wake up reader */
pthread_cond_signal(&buf->empty);
pthread_mutex_unlock(&buf->mutex);
}
return ISO_SUCCESS;
}
/**
* Read count bytes from the buffer into dest. It blocks until the desired
* bytes has been read. If the writer finishes before outputting enought
* bytes, 0 (EOF) is returned, the number of bytes already read remains
* unknown.
*
* @return
* 1 success, 0 EOF, < 0 error
*/
int iso_ring_buffer_read(IsoRingBuffer *buf, uint8_t *dest, size_t count)
{
size_t len;
int bytes_read = 0;
if (buf == NULL || dest == NULL) {
return ISO_NULL_POINTER;
}
while (bytes_read < count) {
pthread_mutex_lock(&buf->mutex);
while (buf->size == 0) {
/*
* Note. There's only a reader, so we have no race conditions.
* Thus, the while(buf->size == 0) is used here just to ensure
* a reader detects the EOF propertly if the writer has been
* canceled while the reader was waiting
*/
if (buf->wend) {
/* the writer procces has been finished */
pthread_mutex_unlock(&buf->mutex);
return 0; /* EOF */
}
buf->times_empty++;
/* wait until data available */
pthread_cond_wait(&buf->empty, &buf->mutex);
}
len = MIN(count - bytes_read, buf->size);
if (buf->rpos + len > buf->cap) {
len = buf->cap - buf->rpos;
}
memcpy(dest + bytes_read, buf->buf + buf->rpos, len);
buf->rpos = (buf->rpos + len) % (buf->cap);
bytes_read += len;
buf->size -= len;
/* wake up the writer */
pthread_cond_signal(&buf->full);
pthread_mutex_unlock(&buf->mutex);
}
return ISO_SUCCESS;
}
void iso_ring_buffer_writer_close(IsoRingBuffer *buf, int error)
{
pthread_mutex_lock(&buf->mutex);
buf->wend = error ? 2 : 1;
/* ensure no reader is waiting */
pthread_cond_signal(&buf->empty);
pthread_mutex_unlock(&buf->mutex);
}
void iso_ring_buffer_reader_close(IsoRingBuffer *buf, int error)
{
pthread_mutex_lock(&buf->mutex);
if (buf->rend) {
/* reader already closed */
pthread_mutex_unlock(&buf->mutex);
return;
}
buf->rend = error ? 2 : 1;
/* ensure no writer is waiting */
pthread_cond_signal(&buf->full);
pthread_mutex_unlock(&buf->mutex);
}
/**
* Get the times the buffer was full.
*/
unsigned int iso_ring_buffer_get_times_full(IsoRingBuffer *buf)
{
return buf->times_full;
}
/**
* Get the times the buffer was empty.
*/
unsigned int iso_ring_buffer_get_times_empty(IsoRingBuffer *buf)
{
return buf->times_empty;
}
/**
* Get the status of the buffer used by a burn_source.
*
* @param b
* A burn_source previously obtained with
* iso_image_create_burn_source().
* @param size
* Will be filled with the total size of the buffer, in bytes
* @param free_bytes
* Will be filled with the bytes currently available in buffer
* @return
* < 0 error, > 0 state:
* 1="active" : input and consumption are active
* 2="ending" : input has ended without error
* 3="failing" : input had error and ended,
* 5="abandoned" : consumption has ended prematurely
* 6="ended" : consumption has ended without input error
* 7="aborted" : consumption has ended after input error
*/
int iso_ring_buffer_get_status(struct burn_source *b, size_t *size,
size_t *free_bytes)
{
int ret;
IsoRingBuffer *buf;
if (b == NULL) {
return ISO_NULL_POINTER;
}
buf = ((Ecma119Image*)(b->data))->buffer;
/* get mutex */
pthread_mutex_lock(&buf->mutex);
if (size) {
*size = buf->cap;
}
if (free_bytes) {
*free_bytes = buf->cap - buf->size;
}
ret = (buf->rend ? 4 : 0) + (buf->wend + 1);
pthread_mutex_unlock(&buf->mutex);
return ret;
}