What’s Mediastreamer2. Data movement mechanism

Igor Plastov
Published in Level Up Coding
Nov 25, 2020

(previous article What’s Mediastreamer2. Filters development)

Data movement in the mediastreamer is performed using queues described by the queue_t structure. Strings of messages of type mblk_t move along the queues. The messages themselves do not contain data blocks, only references to the previous message, the next message, and a data block. In addition, and I want to emphasize this, there is a field for a link to another message of the same type, which allows messages to be organized into a singly linked list. We will call a group of messages united by such a list a tuple. Thus, any element of the queue can be either a single mblk_t message or the head of a tuple of mblk_t messages. Each message in a tuple can have its own data block. We’ll discuss why tuples are needed a little later.

As mentioned above, the message itself does not contain a block of data; it only contains a pointer to the memory area where the block is stored. In this respect, the overall picture of how the mediastreamer works resembles the door warehouse in the cartoon “Monsters, Inc.”

Now, moving up the hierarchy, let’s consider in detail the listed entities of the data transfer mechanism in the mediastreamer.

5.1 Data block dblk_t

A data block consists of a header and a data buffer. The header is described by the dblk_t structure:

Listing 5.1: Structure dblk_t

typedef struct datab
{
unsigned char *db_base; /* Pointer to the beginning of the data buffer. */
unsigned char *db_lim; /* Pointer to the end of the data buffer. */
void (*db_freefn)(void*); /* The function of freeing memory when deleting a block. */
int db_ref; /* Reference counter. */
} dblk_t;

The structure fields contain pointers to the beginning of the buffer, the end of the buffer, and the function that frees the data buffer. The last element in the header is the reference count db_ref; when it reaches zero, this serves as a signal to remove the block from memory. If the data block was created by the datab_alloc() function, the data buffer is placed in memory immediately after the header. In all other cases, the buffer can be located somewhere else. The data buffer contains signal samples or other data that we want to process with filters.

A new instance of the data block is created using the function:

dblk_t *datab_alloc(int size);

Its input parameter is the size of the data that the block will store. More memory than that is allocated so that the header, a datab structure, can be placed at the beginning of the allocated memory. With other functions this is not always the case: the data buffer may be located separately from the header of the data block. During creation, the structure fields are set so that db_base points to the beginning of the data area and db_lim to its end. The reference count db_ref is set to one. The pointer to the function that frees the data, db_freefn, is set to zero.
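The layout described above can be illustrated with a minimal, self-contained sketch. It is not the library source: sketch_dblk_t and sketch_datab_alloc() are hypothetical names, and plain malloc() stands in for whatever allocator the library uses.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for dblk_t; the fields mirror Listing 5.1. */
typedef struct sketch_datab {
    unsigned char *db_base;     /* start of the data buffer */
    unsigned char *db_lim;      /* end of the data buffer */
    void (*db_freefn)(void *);  /* function that frees the buffer, if any */
    int db_ref;                 /* reference counter */
} sketch_dblk_t;

/* Allocate the header and the buffer as one contiguous piece of memory. */
sketch_dblk_t *sketch_datab_alloc(int size)
{
    assert(size >= 0);
    sketch_dblk_t *db = malloc(sizeof(sketch_dblk_t) + (size_t)size);
    if (db == NULL)
        return NULL;
    /* The buffer starts right behind the header. */
    db->db_base = (unsigned char *)db + sizeof(sketch_dblk_t);
    db->db_lim = db->db_base + size;
    db->db_freefn = NULL;  /* no separate buffer to free */
    db->db_ref = 1;        /* the creator holds the only reference */
    return db;
}
```

Because the header and the buffer share one allocation in this sketch, a single free() on the returned pointer releases both.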

5.2 Message mblk_t

As mentioned, the queue elements are of type mblk_t, which is defined as follows:

Listing 5.2: Structure mblk_t

typedef struct msgb
{
struct msgb *b_prev; // Pointer to the previous item in the list.
struct msgb *b_next; // Pointer to the next item in the list.
struct msgb *b_cont; // Pointer for “gluing” other messages to a message to create a message tuple.
struct datab *b_datap; // Pointer to the structure of the data block.
unsigned char *b_rptr; // Pointer to the beginning of the data area to read data from the b_datap buffer.
unsigned char *b_wptr; // Pointer to the beginning of the data area to write data to the b_datap buffer.
uint32_t reserved1; // Reserved field1, media streamer puts service information there.
uint32_t reserved2; // Reserved field2, media streamer places service information there.
#if defined(ORTP_TIMESTAMP)
struct timeval timestamp;
#endif
ortp_recv_addr_t recv_addr;
} mblk_t;

The mblk_t structure at the beginning contains the b_prev, b_next pointers, which are needed to organize a doubly linked list (which is the queue_t queue).

Then comes the b_cont pointer, which is used only when the message is part of a tuple. For the last message in the tuple, this pointer remains null.

Next, we see b_datap, a pointer to the data block for which the message exists. It is followed by pointers into the block’s data buffer. The b_rptr field indicates the location from which data will be read from the buffer. The b_wptr field indicates the location at which data will be written to the buffer.

The remaining fields are of a service nature and do not relate to the operation of the data transfer mechanism.

Figure 5.1 shows a single message named m1 and data block d1.

Figure 5.1: Message mblk_t

Figure 5.2 depicts a tuple of three messages: m1, m1_1, and m1_2.

Figure 5.2: A tuple of three messages mblk_t

5.2.1 Functions for working with mblk_t

A new message mblk_t is created by the function allocb():

mblk_t *allocb(int size, int pri);

It allocates a new mblk_t message with a data block of the specified size. The second argument, pri, is not used in this version of the library; the BPRI_MED macro should be passed in its place (the macro expands to zero). While the function runs, memory is allocated for the new message structure and mblk_init() is called, which zeroes all fields of the created instance. Then a data buffer is created using the datab_alloc() function described above. After that, the fields of the structure are set:

mp->b_datap = datab;
mp->b_rptr = mp->b_wptr = datab->db_base;
mp->b_next = mp->b_prev = mp->b_cont = NULL;

At the output, we get a new message with initialized fields and an empty data buffer. To add data to a message, you need to copy it to the data block buffer:

memcpy(msg->b_rptr, data, size);

Here data is a pointer to the data source and size is the size of the data.

Then you need to update the pointer to the write point so that it again points to the beginning of the free area in the buffer:

msg->b_wptr = msg->b_wptr + size;
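The create, copy, advance sequence above can be tried out without the library. The following is only a sketch under stated assumptions: sketch_allocb(), sketch_write(), sketch_data_size() and sketch_free() are hypothetical helpers, the structures are reduced versions of Listings 5.1 and 5.2, and the bounds check in sketch_write() is my addition.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, reduced stand-ins for dblk_t and mblk_t. */
typedef struct sketch_datab {
    unsigned char *db_base;
    unsigned char *db_lim;
    int db_ref;
} sketch_dblk_t;

typedef struct sketch_msgb {
    struct sketch_msgb *b_prev, *b_next, *b_cont;
    sketch_dblk_t *b_datap;
    unsigned char *b_rptr;
    unsigned char *b_wptr;
} sketch_mblk_t;

/* Create an empty message with a data buffer of `size` bytes. */
sketch_mblk_t *sketch_allocb(int size)
{
    assert(size >= 0);
    sketch_mblk_t *mp = malloc(sizeof(*mp));
    sketch_dblk_t *db = malloc(sizeof(*db) + (size_t)size);
    if (mp == NULL || db == NULL) {
        free(mp);
        free(db);
        return NULL;
    }
    db->db_base = (unsigned char *)(db + 1);  /* buffer follows the header */
    db->db_lim = db->db_base + size;
    db->db_ref = 1;
    mp->b_datap = db;
    mp->b_rptr = mp->b_wptr = db->db_base;    /* nothing to read yet */
    mp->b_next = mp->b_prev = mp->b_cont = NULL;
    return mp;
}

/* Copy `size` bytes into the free area and advance the write pointer.
   Returns 0 on success, -1 if the data does not fit. */
int sketch_write(sketch_mblk_t *mp, const void *data, int size)
{
    if (size < 0 || size > mp->b_datap->db_lim - mp->b_wptr)
        return -1;
    memcpy(mp->b_wptr, data, (size_t)size);
    mp->b_wptr += size;
    return 0;
}

/* Amount of data available for reading in one message. */
int sketch_data_size(const sketch_mblk_t *mp)
{
    return (int)(mp->b_wptr - mp->b_rptr);
}

void sketch_free(sketch_mblk_t *mp)
{
    free(mp->b_datap);
    free(mp);
}
```

Writing through b_wptr rather than b_rptr gives the same result for a fresh message and keeps working when the message already holds data.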

If you want to create a message from an existing buffer, without copying, then use the function esballoc() :

mblk_t *esballoc(uint8_t *buf, int size, int pri, void (*freefn)(void*));

After creating the message and the data block structure, it sets their pointers to the data at the address buf. That is, in this case the data buffer is not located right after the header fields of the data block, as it is when the data block is created by datab_alloc(). The buffer passed to the function stays where it was, but by means of pointers it is attached to the newly created data block header, which in turn is attached to the message.
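To make the difference from datab_alloc() visible, here is a small sketch of a header that wraps a buffer living elsewhere. All names (sketch_datab_wrap(), sketch_datab_unref(), sketch_count_release()) are hypothetical, and calling the free function with the buffer address on the last release is my assumption about how such a callback would be used.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for dblk_t; the fields mirror Listing 5.1. */
typedef struct sketch_datab {
    unsigned char *db_base;
    unsigned char *db_lim;
    void (*db_freefn)(void *);
    int db_ref;
} sketch_dblk_t;

/* Wrap an existing buffer: only the header is allocated, the data is not moved. */
sketch_dblk_t *sketch_datab_wrap(unsigned char *buf, int size, void (*freefn)(void *))
{
    assert(buf != NULL && size >= 0);
    sketch_dblk_t *db = malloc(sizeof(*db));
    if (db == NULL)
        return NULL;
    db->db_base = buf;
    db->db_lim = buf + size;
    db->db_freefn = freefn;
    db->db_ref = 1;
    return db;
}

/* Drop one reference; on the last one, hand the buffer to db_freefn (if set). */
void sketch_datab_unref(sketch_dblk_t *db)
{
    if (--db->db_ref > 0)
        return;
    if (db->db_freefn != NULL)
        db->db_freefn(db->db_base);
    free(db);
}

/* Example callback: counts how many times a buffer was released. */
int sketch_release_count = 0;

void sketch_count_release(void *buf)
{
    (void)buf;
    sketch_release_count++;
}
```

With this arrangement the owner of the buffer decides how it is released, for example by returning it to a pool instead of the heap.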

Several data blocks can be concatenated to one message mblk_t. This is done with the appendb() function:

mblk_t * appendb(mblk_t *mp, const char *data, int size, bool_t pad);

Here mp is the message to which one more data block will be added; data is a pointer to the block, a copy of which will be added to the message; size is the size of the data; pad is a flag indicating that the size of the allocated memory should be aligned to a 4-byte boundary (padding is done with zeros).

If there is enough space in the existing message data buffer, the new data is placed right after the data already there. If the free space in the message data buffer is less than size, a new message with a sufficiently large buffer is created and the data is copied into its buffer. The new message is attached to the original one through the b_cont pointer. In this case, the message becomes a tuple.

If you need to add one more data block to the tuple, then you need to use the function msgappend():

void msgappend(mblk_t *mp, const char *data, int size, bool_t pad);

It finds the last message in the tuple (the one whose b_cont is null) and calls appendb() for that message.

You can find out the size of the data in a message or tuple using the function msgdsize():

int msgdsize(const mblk_t *mp);

It loops through all the messages in the tuple and returns the total amount of data in their data buffers. For each message, the amount of data is calculated as follows:

mp->b_wptr - mp->b_rptr
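The way a message grows into a tuple, and the way its size is then counted, can be sketched as follows. This is a simplified model, not the library code: the sketch_ names are hypothetical, the pad argument is left out, and the buffer limit is stored in the message itself instead of a separate data block header.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, reduced stand-in for mblk_t with an embedded buffer. */
typedef struct sketch_msgb {
    struct sketch_msgb *b_cont;  /* next message of the tuple, NULL at the tail */
    unsigned char *b_rptr;       /* read position */
    unsigned char *b_wptr;       /* write position */
    unsigned char *lim;          /* end of the buffer */
    unsigned char buf[];         /* data buffer */
} sketch_mblk_t;

sketch_mblk_t *sketch_allocb(int size)
{
    assert(size >= 0);
    sketch_mblk_t *mp = malloc(sizeof(*mp) + (size_t)size);
    if (mp == NULL)
        return NULL;
    mp->b_cont = NULL;
    mp->b_rptr = mp->b_wptr = mp->buf;
    mp->lim = mp->buf + size;
    return mp;
}

/* Append to the tail message mp: use its free space or, if the data does
   not fit, chain a new message through b_cont. Returns the message that
   received the data, or NULL on allocation failure. */
sketch_mblk_t *sketch_appendb(sketch_mblk_t *mp, const void *data, int size)
{
    assert(size >= 0 && mp->b_cont == NULL);
    if (size > mp->lim - mp->b_wptr) {
        mp->b_cont = sketch_allocb(size);  /* mp is now the head of a tuple */
        if (mp->b_cont == NULL)
            return NULL;
        mp = mp->b_cont;
    }
    memcpy(mp->b_wptr, data, (size_t)size);
    mp->b_wptr += size;
    return mp;
}

/* Find the last message of the tuple and append there. */
void sketch_msgappend(sketch_mblk_t *mp, const void *data, int size)
{
    while (mp->b_cont != NULL)
        mp = mp->b_cont;
    sketch_appendb(mp, data, size);
}

/* Sum b_wptr - b_rptr over every message of the tuple. */
int sketch_msgdsize(const sketch_mblk_t *mp)
{
    int total = 0;
    for (; mp != NULL; mp = mp->b_cont)
        total += (int)(mp->b_wptr - mp->b_rptr);
    return total;
}

void sketch_freemsg(sketch_mblk_t *mp)
{
    while (mp != NULL) {
        sketch_mblk_t *next = mp->b_cont;
        free(mp);
        mp = next;
    }
}
```

Starting from a 4-byte message, appending 2 bytes keeps a single message, while appending 4 more bytes no longer fits and produces a tuple of two messages whose total size is 6.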

To combine two tuples, use the function concatb() :

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

It concatenates the tuple newm to the tail of mp and returns a pointer to the last message of the resulting tuple.

If necessary, a tuple can be turned into one message with a single data block. This is done by the function msgpullup():

void msgpullup(mblk_t *mp,int len);

If len is -1, the size of the allocated buffer is determined automatically. If len is a positive number, a buffer of this size is created and the data of the tuple’s messages is copied into it. If the buffer runs out of space, copying stops there. The first message of the tuple receives the new buffer with the copied data. The remaining messages are deleted and their memory is returned to the heap.

When an mblk_t structure is deleted, the reference count of its data block is taken into account: if it drops to zero when freeb() is called, the data buffer is deleted along with the mblk_t instance that points to it.

Initializing new message fields mblk_init():

void mblk_init(mblk_t *mp);

Adding one more piece of data to the message appendb():

mblk_t * appendb(mblk_t *mp, const char *data, size_t size, bool_t pad);

If the new data does not fit into the free space of the message data buffer, a separately created message with a buffer of the required size is attached to the message (the pointer to the added message is stored in the first message), and the message turns into a tuple.

Adding a chunk of data to a tuple msgappend():

void msgappend(mblk_t *mp, const char *data, size_t size, bool_t pad);

The function calls appendb() in a loop.

Combining two tuples into one concatb():

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

Message newm will be appended to mp.

Making a copy of a single message copyb():

mblk_t *copyb(const mblk_t *mp);

Full copy of a tuple with all data blocks copymsg():

mblk_t *copymsg(const mblk_t *mp);

The elements of the tuple are copied by the function copyb().

Making a light copy of a message dupb(). In this case the data block is not copied, but its reference counter db_ref is incremented:

mblk_t *dupb(mblk_t *mp);

Making a light copy of a tuple dupmsg(). Data blocks are not copied; only their reference counters db_ref are incremented:

mblk_t *dupmsg(mblk_t* m);

Joining all messages of a tuple into one message msgpullup():

void msgpullup(mblk_t *mp,size_t len);

If len is -1, then the size of the allocated buffer will be set automatically.

Delete message or tuple freemsg():

void freemsg(mblk_t *mp);

The reference count of the data block is decremented by one. If at the same time it reaches zero, then the data block is also deleted.

Counting the total amount of data in a message or tuple msgdsize():

size_t msgdsize(const mblk_t *mp);

Copying the content of the reserved fields of one message to another message (in fact, these fields contain flags that are used by the mediastreamer):

void mblk_meta_copy(const mblk_t *source, mblk_t *dest);
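The interplay between light copies and deletion is easiest to see in a small model. The sketch below uses hypothetical names (sketch_allocb(), sketch_dupb(), sketch_freeb()) and reduced structures. The return value of sketch_freeb() is added only so that the effect can be observed.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, reduced stand-ins for dblk_t and mblk_t. */
typedef struct sketch_datab {
    unsigned char *db_base;
    int db_ref;                 /* number of messages that point to the block */
} sketch_dblk_t;

typedef struct sketch_msgb {
    sketch_dblk_t *b_datap;
    unsigned char *b_rptr;
    unsigned char *b_wptr;
} sketch_mblk_t;

sketch_mblk_t *sketch_allocb(int size)
{
    assert(size >= 0);
    sketch_mblk_t *mp = malloc(sizeof(*mp));
    sketch_dblk_t *db = malloc(sizeof(*db) + (size_t)size);
    if (mp == NULL || db == NULL) {
        free(mp);
        free(db);
        return NULL;
    }
    db->db_base = (unsigned char *)(db + 1);
    db->db_ref = 1;
    mp->b_datap = db;
    mp->b_rptr = mp->b_wptr = db->db_base;
    return mp;
}

/* Light copy: a new message header that shares the same data block. */
sketch_mblk_t *sketch_dupb(sketch_mblk_t *mp)
{
    sketch_mblk_t *copy = malloc(sizeof(*copy));
    if (copy == NULL)
        return NULL;
    *copy = *mp;             /* same block, same read and write positions */
    mp->b_datap->db_ref++;   /* one more message refers to the block */
    return copy;
}

/* Delete a message; returns 1 if the data block was released as well. */
int sketch_freeb(sketch_mblk_t *mp)
{
    int released = 0;
    if (--mp->b_datap->db_ref == 0) {
        free(mp->b_datap);
        released = 1;
    }
    free(mp);
    return released;
}
```

After one sketch_dupb() the block is referenced twice, so deleting the original message leaves the data intact, and only deleting the copy releases the block.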

5.3 Queue queue_t

The message queue in the mediastreamer is implemented as a circular doubly linked list. Each element of the list contains a pointer to a data block with signal samples. As a result, only pointers to the data blocks move along the queue, while the data itself stays in place. The queue_t structure that describes the queue is shown below:

Listing 5.3: Structure queue_t

typedef struct _queue
{
mblk_t _q_stopper; /* Idle queue element, does not point to data, is used only for queue management. When initializing a queue(qinit()) its pointers are configured to point to itself. */
int q_mcount; /* Number of items in the queue. */
} queue_t;

The structure contains the field _q_stopper of type mblk_t; its b_next pointer points to the first item (message) in the queue. The second field of the structure is the counter of messages in the queue. Figure 5.3 below shows a queue named q1 containing four messages: m1, m2, m3, and m4.

Figure 5.3: Queue of 4 messages

Figure 5.4 shows a queue named q1 containing four messages m1, m2, m3, m4, where m2 is the head of a tuple that contains two more messages, m2_1 and m2_2.

Figure 5.4: Queue of 3 messages and a tuple

5.3.1 Functions for working with queue_t

Queue initialization:

void qinit(queue_t *q);

The _q_stopper field (hereinafter we will call it the “stopper”) is initialized with the mblk_init() function, and its pointers to the previous and next elements are set to point to the stopper itself. The queue item counter is reset to zero.

Adding a new item (message):

void putq(queue_t *q, mblk_t *m);

A new element m is added to the end of the list: the pointers are adjusted so that the stopper becomes the next element for m, and m becomes the previous element for the stopper. The queue item counter is incremented.

Retrieving an item from the queue:

mblk_t * getq(queue_t *q);

The message after the stopper is retrieved and the element counter is decremented. If there are no elements in the queue other than the stopper, 0 is returned.

Inserting a message into the queue:

void insq(queue_t *q, mblk_t *emp, mblk_t *mp);

The mp element is inserted before the emp element. If emp is 0, then the message is added to the tail of the queue.

Removing a given message mp from the queue:

void remq(queue_t *q, mblk_t *mp);

The element counter is decremented by 1.

Reading a pointer to the first item in the queue:

mblk_t * peekq(queue_t *q);

Reading a pointer to the last item in the queue (the item stays in the queue):

mblk_t *ms_queue_peek_last (queue_t *q);

Removing all items from the queue while removing the items themselves:

void flushq(queue_t *q, int how);

The how argument is not used. The queue item counter is set to zero.

Macro for reading the pointer to the last element of the queue:

mblk_t * qlast(queue_t *q);

When working with message queues, keep in mind that calling ms_queue_put(q, m) with a null message pointer makes the function loop forever, and your program will freeze. The function ms_queue_next(q, m) behaves similarly.
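The role of the stopper in initialization, insertion, and retrieval can be shown with a self-contained model of the circular list. The names sketch_qinit(), sketch_putq() and sketch_getq() are hypothetical, the message carries an int payload instead of a data block, and the assert on a null message is my own guard, not library behaviour.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced stand-in for mblk_t. */
typedef struct sketch_msgb {
    struct sketch_msgb *b_prev;
    struct sketch_msgb *b_next;
    int payload;  /* stands in for the data block */
} sketch_mblk_t;

/* Hypothetical stand-in for queue_t (Listing 5.3). */
typedef struct sketch_queue {
    sketch_mblk_t _q_stopper;  /* idle element that closes the ring */
    int q_mcount;              /* number of messages in the queue */
} sketch_queue_t;

/* An empty queue is a stopper that points to itself in both directions. */
void sketch_qinit(sketch_queue_t *q)
{
    q->_q_stopper.b_prev = &q->_q_stopper;
    q->_q_stopper.b_next = &q->_q_stopper;
    q->_q_stopper.payload = 0;
    q->q_mcount = 0;
}

/* Add mp at the tail, i.e. between the current last element and the stopper. */
void sketch_putq(sketch_queue_t *q, sketch_mblk_t *mp)
{
    assert(mp != NULL);
    sketch_mblk_t *last = q->_q_stopper.b_prev;
    mp->b_prev = last;
    mp->b_next = &q->_q_stopper;
    last->b_next = mp;
    q->_q_stopper.b_prev = mp;
    q->q_mcount++;
}

/* Take the message that follows the stopper, or NULL if the queue is empty. */
sketch_mblk_t *sketch_getq(sketch_queue_t *q)
{
    sketch_mblk_t *first = q->_q_stopper.b_next;
    if (first == &q->_q_stopper)
        return NULL;
    q->_q_stopper.b_next = first->b_next;
    first->b_next->b_prev = &q->_q_stopper;
    first->b_prev = first->b_next = NULL;
    q->q_mcount--;
    return first;
}
```

Thanks to the stopper, neither function needs a special case for the first or the last element: there is always a valid neighbour on both sides.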

5.3.2 Connecting filters

The queues described above are used to transfer messages from one filter to another, or from one filter to several. Filters and the connections between them form a directed graph. We will use the general term “pin” for an input or an output of a filter. To describe how filters are connected to each other, the mediastreamer uses the concept of a “signal point”. A signal point is the structure MSCPoint, which contains a pointer to a filter and the number of one of its pins; in other words, it describes the connection of one of the filter’s inputs or outputs.

Signal point of data processing graph

Listing 5.4: Structure MSCPoint

typedef struct _MSCPoint
{
struct _MSFilter *filter; /* Pointer to the media streamer filter. */
int pin; /* The number of one of the filter inputs or outputs, i.e. pin. */
} MSCPoint;

Filter pins are numbered starting from zero. The connection of two pins with a message queue is described by the structure MSQueue, which contains a message queue and pointers to the two signal points that it connects:

typedef struct _MSQueue
{
queue_t q;
MSCPoint prev;
MSCPoint next;
}MSQueue;

We will call this structure a “signal link”. Each mediastreamer filter contains a table of input signal links and a table of output signal links (MSQueue). The size of the tables is set when the filter is created; we already did this in chapter 4, using an exported variable of type MSFilterDesc when developing our own filter. Listing 5.5 below shows the structure MSFilter, which describes any filter in the mediastreamer:

Listing 5.5: Structure MSFilter

struct _MSFilter{
MSFilterDesc *desc; /* Pointer to filter descriptor. */
/* Protected attributes, they cannot be shifted or removed, otherwise the work with plugins will be broken. */
ms_mutex_t lock; /* Mutex. */
MSQueue **inputs; /* Input links table. */
MSQueue **outputs; /* Output links table. */
struct _MSFactory *factory; /* Pointer to the factory that created this filter instance. */
void *padding; /* Not used, will be used if protected fields are added. */
void *data; /* Pointer to an arbitrary structure for storing data for the internal state of the filter and intermediate calculations. */
struct _MSTicker *ticker; /* Pointer to the ticker object; it must not be null when process() is called. */
/*private attributes, they can be moved and changed at any time*/
MSList *notify_callbacks; /* List of callbacks used to handle filter events. */
uint32_t last_tick; /* Number of the last tick on which process() was called. */
MSFilterStats *stats; /* Filter statistics.*/
int postponed_task; /* Number of pending tasks. Some filters may postpone data processing (the process() call) for several ticks.*/
bool_t seen; /* Flag used by the ticker to mark that it has already served this filter instance on the current tick.*/
};
typedef struct _MSFilter MSFilter;

After we have connected the filters in a C program as intended (but have not yet connected the ticker), we have created a directed graph whose nodes are instances of the MSFilter structure and whose edges are instances of the signal link MSQueue.
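How one signal link ends up in the tables of two filters can be sketched in a few lines. This is an illustration only: sketch_link() and the sketch_ structures are hypothetical, the tables have a fixed size, the message queue inside the link is omitted, and refusing to reuse an occupied pin is an assumption of the sketch.

```c
#include <assert.h>
#include <stdlib.h>

#define SKETCH_MAX_PINS 4  /* hypothetical fixed table size */

struct sketch_filter;

/* Hypothetical stand-in for MSCPoint (Listing 5.4). */
typedef struct sketch_cpoint {
    struct sketch_filter *filter;
    int pin;
} sketch_cpoint_t;

/* Hypothetical stand-in for MSQueue without the queue itself. */
typedef struct sketch_link {
    sketch_cpoint_t prev;  /* signal point the messages come from */
    sketch_cpoint_t next;  /* signal point the messages go to */
} sketch_link_t;

typedef struct sketch_filter {
    sketch_link_t *inputs[SKETCH_MAX_PINS];   /* input links table */
    sketch_link_t *outputs[SKETCH_MAX_PINS];  /* output links table */
} sketch_filter_t;

/* Connect output pin1 of f1 to input pin2 of f2.
   Returns the new link, or NULL if a pin is occupied or memory runs out. */
sketch_link_t *sketch_link(sketch_filter_t *f1, int pin1,
                           sketch_filter_t *f2, int pin2)
{
    assert(pin1 >= 0 && pin1 < SKETCH_MAX_PINS);
    assert(pin2 >= 0 && pin2 < SKETCH_MAX_PINS);
    if (f1->outputs[pin1] != NULL || f2->inputs[pin2] != NULL)
        return NULL;
    sketch_link_t *l = malloc(sizeof(*l));
    if (l == NULL)
        return NULL;
    l->prev.filter = f1;
    l->prev.pin = pin1;
    l->next.filter = f2;
    l->next.pin = pin2;
    f1->outputs[pin1] = l;  /* the same link is visible from both filters */
    f2->inputs[pin2] = l;
    return l;
}
```

The same link object is reachable from the output table of the upstream filter and from the input table of the downstream one, which is what later lets the graph be walked in either direction.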

5.4 Behind-the-scenes ticker activity

When I told you that the ticker is a filter that serves as a clock source, that was not the whole truth. A ticker is an object that, on each clock tick, executes the process() functions of all filters to which it is directly or indirectly connected. When we connect a ticker to a filter of the graph in a C program, we show the ticker the graph it will control from then on, until we disconnect it. After being connected, the ticker examines the graph entrusted to it and builds a list of the filters it includes. In order not to count the same filter twice, it marks the filters it finds by setting their seen flag. The search follows the tables of signal links that each filter has.

During this exploration of the graph, the ticker checks whether at least one of the filters acts as a source of data blocks. If none is found, the graph is considered incorrect and the ticker aborts the program.

If the graph turns out to be “correct”, the preprocess() function is called for each filter found, to initialize it. As soon as the time comes for the next processing cycle (by default, every 10 milliseconds), the ticker calls the process() function for all the source filters found earlier, and then for the remaining filters in the list. If a filter has input links, its process() function is called repeatedly until the queues of its input links are empty. After that, the ticker moves on to the next filter in the list and “scrolls” it until its input links are free of messages. The ticker moves from filter to filter until the end of the list. This completes the processing of the tick.
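The processing of one tick can be modelled in a few lines. The sketch makes strong simplifications that should be kept in mind: sketch_tick() and the sketch_filter structure are hypothetical, each link is reduced to a counter of waiting messages, every filter has at most one input and one output, and the filter list is assumed to be already in processing order.

```c
#include <assert.h>
#include <stddef.h>

typedef struct sketch_filter sketch_filter_t;

struct sketch_filter {
    void (*process)(sketch_filter_t *f);
    int *input;   /* messages waiting on the input link, NULL for a source */
    int *output;  /* messages on the output link, NULL for a sink */
    int calls;    /* how many times process() has run */
};

/* One tick: run every source once, then run each remaining filter
   until its input link is empty. */
void sketch_tick(sketch_filter_t **filters, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        assert(filters[i]->process != NULL);
        if (filters[i]->input == NULL)
            filters[i]->process(filters[i]);
    }
    for (size_t i = 0; i < n; i++) {
        sketch_filter_t *f = filters[i];
        if (f->input == NULL)
            continue;
        while (*f->input > 0)  /* never ends if process() leaves data behind */
            f->process(f);
    }
}

/* Example source: puts two messages on its output link per call. */
void sketch_source_process(sketch_filter_t *f)
{
    *f->output += 2;
    f->calls++;
}

/* Example sink: takes one message from its input link per call. */
void sketch_sink_process(sketch_filter_t *f)
{
    *f->input -= 1;
    f->calls++;
}
```

The while loop shows why a filter must not leave messages on its input: in this model a process() function that consumes nothing would keep the ticker spinning on the same filter.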

Now we return to tuples and talk about why such an entity was added to the mediastreamer. In general, the amount of data required by the algorithm working inside a filter neither equals nor is a multiple of the size of the input data buffers. For example, suppose we write a filter that performs a radix-2 Fast Fourier Transform, which can only process blocks of data whose size is a power of two. Let it be 512 samples. If the data comes from a telephone channel, the data buffer of each input message brings us 160 signal samples. It is tempting not to take data from the input until the required amount has accumulated in the signal link. But in that case there will be a conflict with the ticker, which will keep trying, unsuccessfully, to scroll the filter until its input link is empty. Earlier we called this rule the third principle of the filter. According to this principle, the process() function of a filter must take all the data from its input queues.

In addition, it is not possible to take exactly 512 samples from the input, since only whole blocks can be taken; that is, the filter has to take 640 samples, use 512 of them, and store the remainder until a new portion of data has accumulated. Thus our filter, in addition to its main work, must provide auxiliary logic for the intermediate storage of input data. To solve this general problem, the developers of the mediastreamer created a special object, MSBufferizer (the bufferiser), which solves this task using tuples.

5.5 Buffering object MSBufferizer

This is an object that accumulates input data inside the filter and starts handing it over for processing as soon as there is enough of it to run the filter algorithm. While the buffer is accumulating data, the filter works in idle mode, without consuming processor time. But as soon as the function that reads from the buffer returns a nonzero value, the filter’s process() function starts taking data from the buffer and processing it in portions of the required size until the data is exhausted. Unclaimed data remains in the buffer as the first element of the tuple, to which subsequent blocks of new input data are attached.

The structure MSBufferizer, which describes the bufferiser, is shown in Listing 5.6.

Listing 5.6: Structure MSBufferizer

struct _MSBufferizer
{
queue_t q; /* Message queue. */
int size; /* The total size of the data currently in the buffer. */
};
typedef struct _MSBufferizer MSBufferizer;

5.5.1 Functions for working with MSBufferizer

Creating a new instance of the bufferiser:

MSBufferizer * ms_bufferizer_new(void);

Memory is allocated, initialized in ms_bufferizer_init(), and a pointer is returned.

Initialization function:

void ms_bufferizer_init(MSBufferizer *obj);

Queue q is initialized, the size field is set to zero.

Adding a message:

void ms_bufferizer_put(MSBufferizer *obj, mblk_t *m);

Message m is added to the queue. The computed size of the data blocks is added to size.

Buffering all messages in the data queue of link q:

void ms_bufferizer_put_from_queue(MSBufferizer *obj, MSQueue *q);

Transferring messages from link q to the buffer is performed using the ms_bufferizer_put() function.

Reading from the buffer:

int ms_bufferizer_read(MSBufferizer *obj, uint8_t *data, int datalen);

If the amount of data accumulated in the bufferiser is less than the requested amount (datalen), the function returns zero and no data is copied into data. Otherwise, data is copied sequentially from the tuples in the buffer. Each tuple is deleted after it has been copied, and its memory is freed. Copying ends when datalen bytes have been copied. If the space in data runs out in the middle of a source data block, the message is shortened to the remaining, not yet copied part. On the next call, copying continues from that point.

Reading the amount of data that is currently available in the bufferiser:

int ms_bufferizer_get_avail(MSBufferizer *obj);

Returns the bufferiser’s size field.

Discarding some of the data in the bufferiser:

void ms_bufferizer_skip_bytes(MSBufferizer *obj, int bytes);

The specified number of bytes of data are retrieved and discarded. The oldest data is discarded.

Deleting all messages in the bufferiser:

void ms_bufferizer_flush(MSBufferizer *obj);

The data counter is reset to zero.

Deleting all messages in the bufferiser:

void ms_bufferizer_uninit(MSBufferizer *obj);

The counter is not reset.

Removing the bufferiser and freeing memory:

void ms_bufferizer_destroy(MSBufferizer *obj);

Examples of using the bufferiser can be found in the source code of several mediastreamer filters, for example in the filter MS_L16_ENC, which swaps bytes in samples from network order to host order: l16.c
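Independently of that source file, the accumulate-then-read behaviour itself can be modelled with a short, self-contained sketch. It is not the MSBufferizer code: the sketch_ names are hypothetical, data is copied on put instead of keeping the original messages, and sizes are counted in bytes.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* One stored block; stands in for a queued message. */
typedef struct sketch_chunk {
    struct sketch_chunk *next;
    size_t rpos;           /* read position inside data[] */
    size_t len;            /* number of valid bytes in data[] */
    unsigned char data[];
} sketch_chunk_t;

typedef struct sketch_bufferizer {
    sketch_chunk_t *head, *tail;
    size_t size;           /* total number of unread bytes */
} sketch_bufferizer_t;

void sketch_bufferizer_init(sketch_bufferizer_t *b)
{
    b->head = b->tail = NULL;
    b->size = 0;
}

/* Store a copy of `len` bytes; returns 0 on success, -1 on allocation failure. */
int sketch_bufferizer_put(sketch_bufferizer_t *b, const void *data, size_t len)
{
    sketch_chunk_t *c = malloc(sizeof(*c) + len);
    if (c == NULL)
        return -1;
    c->next = NULL;
    c->rpos = 0;
    c->len = len;
    memcpy(c->data, data, len);
    if (b->tail != NULL)
        b->tail->next = c;
    else
        b->head = c;
    b->tail = c;
    b->size += len;
    return 0;
}

/* All-or-nothing read: returns datalen, or 0 if fewer bytes are stored. */
size_t sketch_bufferizer_read(sketch_bufferizer_t *b, unsigned char *out, size_t datalen)
{
    if (b->size < datalen)
        return 0;
    size_t done = 0;
    while (done < datalen) {
        sketch_chunk_t *c = b->head;
        assert(c != NULL);             /* guaranteed by the size check above */
        size_t n = c->len - c->rpos;
        if (n > datalen - done)
            n = datalen - done;
        memcpy(out + done, c->data + c->rpos, n);
        c->rpos += n;                  /* a partly read chunk keeps its tail */
        done += n;
        if (c->rpos == c->len) {       /* chunk exhausted: unlink and free it */
            b->head = c->next;
            if (b->head == NULL)
                b->tail = NULL;
            free(c);
        }
    }
    b->size -= datalen;
    return datalen;
}
```

With 160-byte input blocks and 512-byte reads, the first three blocks give 480 bytes and a read returns zero; the fourth block brings the total to 640, the read succeeds, and 128 bytes stay in the buffer for the next round.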

In the next chapter, we will look at debugging filters.

(next article What’s Mediastreamer2. Debugging crafted filters)

P.S.

If you are interested in how to easily draw figures like 5.1–5.4, you can read my article: Synergy of Graphviz and the C/C++ Preprocessor
