minor changes

This commit is contained in:
Shengliang Guan 2022-02-28 14:21:18 +08:00
parent 7747ab2324
commit 99bc4379f0
4 changed files with 101 additions and 101 deletions

View File

@ -13,8 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_SCHED_H
#define _TD_UTIL_SCHED_H
#ifndef _TD_UTIL_SCHED_H_
#define _TD_UTIL_SCHED_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
@ -36,7 +38,7 @@ typedef struct SSchedMsg {
* @param label the label of the queue
* @return the created queue scheduler
*/
void *taosInitScheduler(int capacity, int numOfThreads, const char *label);
void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label);
/**
* Create a thread-safe ring-buffer based task queue and return the instance.
@ -47,7 +49,7 @@ void *taosInitScheduler(int capacity, int numOfThreads, const char *label);
* @param tmrCtrl the timer controller, tmr_ctrl_t*
* @return the created queue scheduler
*/
void *taosInitSchedulerWithInfo(int capacity, int numOfThreads, const char *label, void *tmrCtrl);
void *taosInitSchedulerWithInfo(int32_t capacity, int32_t numOfThreads, const char *label, void *tmrCtrl);
/**
* Clean up the queue scheduler instance and free the memory.
@ -68,4 +70,4 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg);
}
#endif
#endif /*_TD_UTIL_SCHED_H*/
#endif /*_TD_UTIL_SCHED_H_*/

View File

@ -16,22 +16,22 @@
#ifndef _TD_UTIL_SKILIST_H
#define _TD_UTIL_SKILIST_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "taos.h"
#include "tarray.h"
#include "tfunctional.h"
#define MAX_SKIP_LIST_LEVEL 15
#ifdef __cplusplus
extern "C" {
#endif
#define MAX_SKIP_LIST_LEVEL 15
#define SKIP_LIST_RECORD_PERFORMANCE 0
// For key property setting
#define SL_ALLOW_DUP_KEY (uint8_t)0x0 // Allow duplicate key exists (for tag index usage)
#define SL_ALLOW_DUP_KEY (uint8_t)0x0 // Allow duplicate key exists (for tag index usage)
#define SL_DISCARD_DUP_KEY (uint8_t)0x1 // Discard duplicate key (for data update=0 case)
#define SL_UPDATE_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert (for data update!=0 case)
#define SL_UPDATE_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert (for data update!=0 case)
// For thread safety setting
#define SL_THREAD_SAFE (uint8_t)0x4
@ -39,17 +39,17 @@ extern "C" {
typedef char *SSkipListKey;
typedef char *(*__sl_key_fn_t)(const void *);
typedef void (*sl_patch_row_fn_t)(void * pDst, const void * pSrc);
typedef void* (*iter_next_fn_t)(void *iter);
typedef void (*sl_patch_row_fn_t)(void *pDst, const void *pSrc);
typedef void *(*iter_next_fn_t)(void *iter);
typedef struct SSkipListNode {
uint8_t level;
void * pData;
uint8_t level;
void *pData;
struct SSkipListNode *forwards[];
} SSkipListNode;
#define SL_GET_NODE_DATA(n) (n)->pData
#define SL_NODE_GET_FORWARD_POINTER(n, l) (n)->forwards[(l)]
#define SL_GET_NODE_DATA(n) (n)->pData
#define SL_NODE_GET_FORWARD_POINTER(n, l) (n)->forwards[(l)]
#define SL_NODE_GET_BACKWARD_POINTER(n, l) (n)->forwards[(n)->level + (l)]
/*
@ -100,14 +100,10 @@ typedef struct tSkipListState {
uint64_t nTotalElapsedTimeForInsert;
} tSkipListState;
typedef enum {
SSkipListPutSuccess = 0,
SSkipListPutEarlyStop = 1,
SSkipListPutSkipOne = 2
} SSkipListPutStatus;
typedef enum { SSkipListPutSuccess = 0, SSkipListPutEarlyStop = 1, SSkipListPutSkipOne = 2 } SSkipListPutStatus;
typedef struct SSkipList {
unsigned int seed;
uint32_t seed;
__compar_fn_t comparFn;
__sl_key_fn_t keyFn;
pthread_rwlock_t *lock;
@ -117,41 +113,41 @@ typedef struct SSkipList {
uint8_t type; // static info above
uint8_t level;
uint32_t size;
SSkipListNode * pHead; // point to the first element
SSkipListNode * pTail; // point to the last element
SSkipListNode *pHead; // point to the first element
SSkipListNode *pTail; // point to the last element
#if SKIP_LIST_RECORD_PERFORMANCE
tSkipListState state; // skiplist state
#endif
tGenericSavedFunc* insertHandleFn;
tGenericSavedFunc *insertHandleFn;
} SSkipList;
typedef struct SSkipListIterator {
SSkipList * pSkipList;
SSkipList *pSkipList;
SSkipListNode *cur;
int32_t step; // the number of nodes that have been checked already
int32_t order; // order of the iterator
SSkipListNode *next; // next points to the true qualified node in skiplist
int32_t step; // the number of nodes that have been checked already
int32_t order; // order of the iterator
SSkipListNode *next; // next points to the true qualified node in skiplist
} SSkipListIterator;
#define SL_IS_THREAD_SAFE(s) (((s)->flags) & SL_THREAD_SAFE)
#define SL_DUP_MODE(s) (((s)->flags) & ((((uint8_t)1) << 2) - 1))
#define SL_IS_THREAD_SAFE(s) (((s)->flags) & SL_THREAD_SAFE)
#define SL_DUP_MODE(s) (((s)->flags) & ((((uint8_t)1) << 2) - 1))
#define SL_GET_NODE_KEY(s, n) ((s)->keyFn((n)->pData))
#define SL_GET_MIN_KEY(s) SL_GET_NODE_KEY(s, SL_NODE_GET_FORWARD_POINTER((s)->pHead, 0))
#define SL_GET_MAX_KEY(s) SL_GET_NODE_KEY((s), SL_NODE_GET_BACKWARD_POINTER((s)->pTail, 0))
#define SL_SIZE(s) (s)->size
#define SL_GET_MIN_KEY(s) SL_GET_NODE_KEY(s, SL_NODE_GET_FORWARD_POINTER((s)->pHead, 0))
#define SL_GET_MAX_KEY(s) SL_GET_NODE_KEY((s), SL_NODE_GET_BACKWARD_POINTER((s)->pTail, 0))
#define SL_SIZE(s) (s)->size
SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, __compar_fn_t comparFn, uint8_t flags,
__sl_key_fn_t fn);
void tSkipListDestroy(SSkipList *pSkipList);
SSkipListNode * tSkipListPut(SSkipList *pSkipList, void *pData);
SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData);
void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t iterate);
SArray * tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey);
SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey);
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel);
SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList);
SSkipListIterator *tSkipListCreateIterFromVal(SSkipList *pSkipList, const char *val, int32_t type, int32_t order);
bool tSkipListIterNext(SSkipListIterator *iter);
SSkipListNode * tSkipListIterGet(SSkipListIterator *iter);
void * tSkipListDestroyIter(SSkipListIterator *iter);
SSkipListNode *tSkipListIterGet(SSkipListIterator *iter);
void *tSkipListDestroyIter(SSkipListIterator *iter);
uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key);
void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode);
@ -159,4 +155,4 @@ void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNod
}
#endif
#endif /*_TD_UTIL_SKILIST_H*/
#endif /*_TD_UTIL_SKILIST_H*/

View File

@ -13,34 +13,35 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdef.h"
#include "tutil.h"
#include "tlog.h"
#define _DEFAULT_SOURCE
#include "tsched.h"
#include "tdef.h"
#include "tlog.h"
#include "ttimer.h"
#include "tutil.h"
#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue.
#define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue.
typedef struct {
char label[TSDB_LABEL_LEN];
tsem_t emptySem;
tsem_t fullSem;
pthread_mutex_t queueMutex;
int fullSlot;
int emptySlot;
int queueSize;
int numOfThreads;
pthread_t * qthread;
SSchedMsg * queue;
int32_t fullSlot;
int32_t emptySlot;
int32_t queueSize;
int32_t numOfThreads;
pthread_t *qthread;
SSchedMsg *queue;
bool stop;
void* pTmrCtrl;
void* pTimer;
void *pTmrCtrl;
void *pTimer;
} SSchedQueue;
static void *taosProcessSchedQueue(void *param);
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label) {
SSchedQueue *pSched = (SSchedQueue *)calloc(sizeof(SSchedQueue), 1);
if (pSched == NULL) {
uError("%s: no enough memory for pSched", label);
@ -62,7 +63,7 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
}
pSched->queueSize = queueSize;
tstrncpy(pSched->label, label, sizeof(pSched->label)); // fix buffer overflow
tstrncpy(pSched->label, label, sizeof(pSched->label)); // fix buffer overflow
pSched->fullSlot = 0;
pSched->emptySlot = 0;
@ -73,7 +74,7 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
return NULL;
}
if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
if (tsem_init(&pSched->emptySem, 0, (uint32_t)pSched->queueSize) != 0) {
uError("init %s:empty semaphore failed(%s)", label, strerror(errno));
taosCleanUpScheduler(pSched);
return NULL;
@ -86,11 +87,11 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
}
pSched->stop = false;
for (int i = 0; i < numOfThreads; ++i) {
for (int32_t i = 0; i < numOfThreads; ++i) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
int code = pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched);
int32_t code = pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched);
pthread_attr_destroy(&attr);
if (code != 0) {
uError("%s: failed to create rpc thread(%s)", label, strerror(errno));
@ -105,8 +106,8 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
return (void *)pSched;
}
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) {
SSchedQueue* pSched = taosInitScheduler(queueSize, numOfThreads, label);
void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) {
SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label);
if (tmrCtrl != NULL && pSched != NULL) {
pSched->pTmrCtrl = tmrCtrl;
@ -119,7 +120,7 @@ void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *lab
void *taosProcessSchedQueue(void *scheduler) {
SSchedMsg msg;
SSchedQueue *pSched = (SSchedQueue *)scheduler;
int ret = 0;
int32_t ret = 0;
char name[16] = {0};
snprintf(name, tListLen(name), "%s-taskQ", pSched->label);
@ -164,7 +165,7 @@ void *taosProcessSchedQueue(void *scheduler) {
void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)queueScheduler;
int ret = 0;
int32_t ret = 0;
if (pSched == NULL) {
uError("sched is not ready, msg:%p is dropped", pMsg);
@ -200,12 +201,12 @@ void taosCleanUpScheduler(void *param) {
if (pSched == NULL) return;
pSched->stop = true;
for (int i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) {
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) {
tsem_post(&pSched->fullSem);
}
}
for (int i = 0; i < pSched->numOfThreads; ++i) {
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) {
pthread_join(pSched->qthread[i], NULL);
}
@ -214,14 +215,14 @@ void taosCleanUpScheduler(void *param) {
tsem_destroy(&pSched->emptySem);
tsem_destroy(&pSched->fullSem);
pthread_mutex_destroy(&pSched->queueMutex);
if (pSched->pTimer) {
taosTmrStopA(&pSched->pTimer);
}
if (pSched->queue) free(pSched->queue);
if (pSched->qthread) free(pSched->qthread);
free(pSched); // fix memory leak
free(pSched); // fix memory leak
}
// for debug purpose, dump the scheduler status every 1min.
@ -230,11 +231,11 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
if (pSched == NULL || pSched->pTimer == NULL || pSched->pTimer != tmrId) {
return;
}
int32_t size = ((pSched->emptySlot - pSched->fullSlot) + pSched->queueSize) % pSched->queueSize;
if (size > 0) {
uDebug("scheduler:%s, current tasks in queue:%d, task thread:%d", pSched->label, size, pSched->numOfThreads);
}
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
}

View File

@ -14,13 +14,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tcompare.h"
#define _DEFAULT_SOURCE
#include "tskiplist.h"
#include "tutil.h"
#include "tcompare.h"
#include "tlog.h"
#include "tutil.h"
static int initForwardBackwardPtr(SSkipList *pSkipList);
static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur);
static int32_t initForwardBackwardPtr(SSkipList *pSkipList);
static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur);
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode);
static void tSkipListCorrectLevel(SSkipList *pSkipList);
static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
@ -31,10 +32,9 @@ static SSkipListNode *tSkipListNewNode(uint8_t level);
static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward,
bool hasDup);
static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList);
static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList);
static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList);
static FORCE_INLINE int32_t tSkipListWLock(SSkipList *pSkipList);
static FORCE_INLINE int32_t tSkipListRLock(SSkipList *pSkipList);
static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList);
static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList);
SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, __compar_fn_t comparFn, uint8_t flags,
@ -138,21 +138,21 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
bool hasDup = false;
char * pKey = NULL;
char * pDataKey = NULL;
int compare = 0;
char *pKey = NULL;
char *pDataKey = NULL;
int32_t compare = 0;
tSkipListWLock(pSkipList);
void* pData = iterate(iter);
if(pData == NULL) return;
void *pData = iterate(iter);
if (pData == NULL) return;
// backward to put the first data
hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
tSkipListPutImpl(pSkipList, pData, backward, false, hasDup);
for (int level = 0; level < pSkipList->maxLevel; level++) {
for (int32_t level = 0; level < pSkipList->maxLevel; level++) {
forward[level] = SL_NODE_GET_BACKWARD_POINTER(backward[level], level);
}
@ -165,12 +165,12 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
pKey = SL_GET_MAX_KEY(pSkipList);
compare = pSkipList->comparFn(pDataKey, pKey);
if (compare > 0) {
for (int i = 0; i < pSkipList->maxLevel; i++) {
for (int32_t i = 0; i < pSkipList->maxLevel; i++) {
forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i);
}
} else {
SSkipListNode *px = pSkipList->pHead;
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
for (int32_t i = pSkipList->maxLevel - 1; i >= 0; --i) {
if (i < pSkipList->level) {
// set new px
if (forward[i] != pSkipList->pHead) {
@ -357,7 +357,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1);
int32_t id = 1;
char * prev = NULL;
char *prev = NULL;
while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p);
@ -433,21 +433,21 @@ static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t
return iter;
}
static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList) {
static FORCE_INLINE int32_t tSkipListWLock(SSkipList *pSkipList) {
if (pSkipList->lock) {
return pthread_rwlock_wrlock(pSkipList->lock);
}
return 0;
}
static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList) {
static FORCE_INLINE int32_t tSkipListRLock(SSkipList *pSkipList) {
if (pSkipList->lock) {
return pthread_rwlock_rdlock(pSkipList->lock);
}
return 0;
}
static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList) {
static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList) {
if (pSkipList->lock) {
return pthread_rwlock_unlock(pSkipList->lock);
}
@ -455,12 +455,12 @@ static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList) {
}
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) {
int compare = 0;
int32_t compare = 0;
bool hasDupKey = false;
char * pDataKey = pSkipList->keyFn(pData);
char *pDataKey = pSkipList->keyFn(pData);
if (pSkipList->size == 0) {
for (int i = 0; i < pSkipList->maxLevel; i++) {
for (int32_t i = 0; i < pSkipList->maxLevel; i++) {
backward[i] = pSkipList->pTail;
}
} else {
@ -470,7 +470,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
pKey = SL_GET_MAX_KEY(pSkipList);
compare = pSkipList->comparFn(pDataKey, pKey);
if (compare >= 0) {
for (int i = 0; i < pSkipList->maxLevel; i++) {
for (int32_t i = 0; i < pSkipList->maxLevel; i++) {
backward[i] = pSkipList->pTail;
}
@ -481,7 +481,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
pKey = SL_GET_MIN_KEY(pSkipList);
compare = pSkipList->comparFn(pDataKey, pKey);
if (compare < 0) {
for (int i = 0; i < pSkipList->maxLevel; i++) {
for (int32_t i = 0; i < pSkipList->maxLevel; i++) {
backward[i] = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i);
}
@ -489,7 +489,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
}
SSkipListNode *px = pSkipList->pTail;
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
for (int32_t i = pSkipList->maxLevel - 1; i >= 0; --i) {
if (i < pSkipList->level) {
SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i);
while (p != pSkipList->pHead) {
@ -532,7 +532,8 @@ static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode)
// Function must be called after calling tSkipListRemoveNodeImpl() function
static void tSkipListCorrectLevel(SSkipList *pSkipList) {
while (pSkipList->level > 0 && SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == pSkipList->pTail) {
while (pSkipList->level > 0 &&
SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == pSkipList->pTail) {
pSkipList->level -= 1;
}
}
@ -636,7 +637,7 @@ static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_
return pNode;
}
static int initForwardBackwardPtr(SSkipList *pSkipList) {
static int32_t initForwardBackwardPtr(SSkipList *pSkipList) {
uint32_t maxLevel = pSkipList->maxLevel;
// head info
@ -685,12 +686,12 @@ static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipL
pSkipList->insertHandleFn->args[1] = pNode->pData;
pData = genericInvoke(pSkipList->insertHandleFn);
}
if(pData) {
if (pData) {
atomic_store_ptr(&(pNode->pData), pData);
}
} else {
//for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
if(pSkipList->insertHandleFn) {
// for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
if (pSkipList->insertHandleFn) {
pSkipList->insertHandleFn->args[0] = NULL;
pSkipList->insertHandleFn->args[1] = NULL;
genericInvoke(pSkipList->insertHandleFn);