smax-clib v0.9
A C/C++ client library for SMA-X
Loading...
Searching...
No Matches
smax-queue.c File Reference

Functions to support pipelined pull requests from SMA-X. Because they don't requite a sequence of round-trips, pipelined pulls can be orders of magnitude faster than staggered regular pull requests. More...

Macros

#define _POSIX_C_SOURCE   199309
 For clock_gettime()
 

Functions

XSyncPointsmaxCreateSyncPoint ()
 
void smaxDestroySyncPoint (XSyncPoint *s)
 
int smaxQueue (const char *table, const char *key, XType type, int count, void *value, XMeta *meta)
 
int smaxQueueCallback (void(*f)(void *), void *arg)
 
int smaxSetMaxPendingPulls (int n)
 
int smaxSync (XSyncPoint *sync, int timeoutMillis)
 
int smaxWaitQueueComplete (int timeoutMillis)
 

Detailed Description

Functions to support pipelined pull requests from SMA-X. Because they don't requite a sequence of round-trips, pipelined pulls can be orders of magnitude faster than staggered regular pull requests.

Date
Jun 25, 2019
Author
Attila Kovacs

Function Documentation

◆ smaxCreateSyncPoint()

XSyncPoint * smaxCreateSyncPoint ( )

Creates a synchronization point that can be waited upon until all elements queued prior to creation are processed (retrieved from the database.

Returns
Pointer to a newly created synchronization point that can be waited upon.
See also
smaxSync()
smaxQueue()
smaxQueueCallback()

References XSyncPoint::isComplete, XSyncPoint::lock, XSyncPoint::status, X_INCOMPLETE, and X_SUCCESS.

◆ smaxDestroySyncPoint()

void smaxDestroySyncPoint ( XSyncPoint s)

Destroys a synchronization point, releasing the memory space allocated to it.

Parameters
sPointer to the synchronization point to discard.

References XSyncPoint::isComplete, and XSyncPoint::lock.

◆ smaxQueue()

int smaxQueue ( const char *  table,
const char *  key,
XType  type,
int  count,
void *  value,
XMeta meta 
)

Queues a pull requests for pipelined data retrieval. Because pipelined pulls are executed on a separate Redis client from the one used for sharing values, e.g. via smaxShare(), there is no guarantee as to the order of this pull operation and previously initiated shares from the same thread. This would only be an issue if you are trying to use queued read to read back a value you have just shared – which is not really a good use case anyway, as it generates network traffic for not real reason. But, if you must read back a value you have shared, you probably should use a regular smaxPull() call to ensure ordering.

Parameters
tableHash table name.
keyVariable name under which the data is stored.
typeSMA-X variable type, e.g. X_FLOAT or X_CHARS(40), of the buffer.
countNumber of points to retrieve into the buffer.
[out]valuePointer to the buffer to which the data is to be retrieved.
[out]metaPointer to the corresponding metadata structure, or NULL.
Returns
X_SUCCESS (0) if successful X_NAME_INVALID if the table and key are both NULL X_NULL if the value field is NULL or the return value of xQueue().
See also
smaxPull()
smaxLazyPull()
smaxCreateSyncPoint()
smaxQueueCallback()

References REDISX_PIPELINE_CHANNEL, SMAX_PIPE_READ_TIMEOUT_MILLIS, x_error(), X_GROUP_INVALID, X_NAME_INVALID, X_NO_SERVICE, X_NULL, X_SUCCESS, x_trace(), and xStringCopyOf().

◆ smaxQueueCallback()

int smaxQueueCallback ( void(*)(void *)  f,
void *  arg 
)

Adds a callback function to the queue to be called with the specified argument once all prior requests in the queue have been fullfilled (retrieved from the database).

As a general rule callbacks added to the pipeline should return very fast, and avoid blocking operations for the most part (using mutexes that may block for very short periods only may be excepted). If the user needs to do more processing, or make blocking calls (e.g. IO operartions) that may not return for longer periods, the callback should fire off processing in a separate thread, or else simply move the result into another asynchronous processing queue.

Parameters
fThe callback function that takes a pointer argument
argArgument to call the specified function with.
Returns
X_SUCCESS (0) or else X_NULL if the function parameter is NULL.
See also
smaxCreateSyncPoint()
smaxQueue()

References x_error(), X_NULL, and X_SUCCESS.

◆ smaxSetMaxPendingPulls()

int smaxSetMaxPendingPulls ( int  n)

Configures how many pull requests can be queued in when piped pulls are enabled. If the queue reaches the specified limit, no new pull requests can be submitted until responses arrive, draining the queue somewhat.

Parameters
nThe maximum number of pull requests that can be queued.
Returns
TRUE if the argument was valid, and the queue size was set to it, otherwise FALSE

References x_error(), X_FAILURE, and X_SUCCESS.

◆ smaxSync()

int smaxSync ( XSyncPoint sync,
int  timeoutMillis 
)

Waits for the queue to reach the specified sync point, up to an optional timeout limit.

Parameters
syncPointer to a queued synchronization point.
timeoutMillisAn optional timeout in milliseconds. When set to a positive value The call will be guaranteed to return in the specified interval, whether or not the pipelined reads all succeeded. The return value can be used to check for errors or if the call timed out before all data were collected. If X_TIMEDOUT is returned, smax_end_bulk_pulls() may be called again to allow more time for the queued read operations to complete. 0 or negative timeout values will cause the call to wait indefinitely until reads are complete.
Returns
X_SUCCESS (0) if all reads have completed successfully, or the first read error that was enountered (e.g. RM_INVALID_KEY), or: X_TIMEDOUT if the call timed out while still awaiting data for the queued read requests. X_NULL if the SyncPoint argument is NULL, or its mutex/condition field have not been initialized. X_FAILURE if the SyncPoint's mutex has not been initialized.

or the first pull error encountered in the queue since the current batch began.

See also
smaxCreateSyncPoint()
smaxWaitQueueComplete()

References XSyncPoint::isComplete, XSyncPoint::lock, XSyncPoint::status, x_error(), X_FAILURE, X_INCOMPLETE, X_NULL, X_SUCCESS, X_TIMEDOUT, x_trace(), and xvprintf.

◆ smaxWaitQueueComplete()

int smaxWaitQueueComplete ( int  timeoutMillis)

Waits until all queued pull requests have been retrieved from the database, or until the specified timeout it reached.

Parameters
timeoutMillisAn optional timeout in milliseconds. When set to a positive value The call will be guaranteed to return in the specified interval, whether or not the pipelined reads all succeeded. The return value can be used to check for errors or if the call timed out before all data were collected. If X_TIMEDOUT is returned, smax_end_bulk_pulls() may be called again to allow more time for the queued read operations to complete. 0 or negative timeout values will cause the call to wait indefinitely until reads are complete.
Returns
X_SUCCESS (0) if all reads have completed successfully, or the first read error that was enountered (e.g. RM_INVALID_KEY), or: X_TIMEDOUT if the call timed out while still awaiting data for the queued read requests.
See also
smaxSync()

References XSyncPoint::isComplete, XSyncPoint::lock, smaxSync(), XSyncPoint::status, X_INCOMPLETE, and X_SUCCESS.