Merge pull request #9521 from taosdata/feature/dnode3
set queue to a named type instead of void
This commit is contained in:
commit
234e2c53a0
|
@ -22,59 +22,57 @@ extern "C" {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
||||||
This set of API for queue is designed specially for vnode/mnode. The main purpose is to
|
This set of API for queue is designed specially for vnode/mnode. The main purpose is to
|
||||||
consume all the items instead of one item from a queue by one single read. Also, it can
|
consume all the items instead of one item from a queue by one single read. Also, it can
|
||||||
combine multiple queues into a queue set, a consumer thread can consume a queue set via
|
combine multiple queues into a queue set, a consumer thread can consume a queue set via
|
||||||
a single API instead of looping every queue by itself.
|
a single API instead of looping every queue by itself.
|
||||||
|
|
||||||
Notes:
|
Notes:
|
||||||
1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe
|
1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe
|
||||||
2: after taosCloseQueue/taosCloseQset is called, read/write operation APIs are not safe.
|
2: after taosCloseQueue/taosCloseQset is called, read/write operation APIs are not safe.
|
||||||
3: read/write operation APIs are multi-thread safe
|
3: read/write operation APIs are multi-thread safe
|
||||||
|
|
||||||
To remove the limitation and make this set of queue APIs multi-thread safe, REF(tref.c)
|
To remove the limitation and make this set of queue APIs multi-thread safe, REF(tref.c)
|
||||||
shall be used to set up the protection.
|
shall be used to set up the protection.
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
typedef void *taos_queue;
|
typedef struct STaosQueue STaosQueue;
|
||||||
typedef void *taos_qset;
|
typedef struct STaosQset STaosQset;
|
||||||
typedef void *taos_qall;
|
typedef struct STaosQall STaosQall;
|
||||||
typedef void (*FProcessItem)(void *ahandle, void *pItem);
|
typedef void (*FProcessItem)(void *ahandle, void *pItem);
|
||||||
typedef void (*FProcessItems)(void *ahandle, taos_qall qall, int numOfItems);
|
typedef void (*FProcessItems)(void *ahandle, STaosQall *qall, int32_t numOfItems);
|
||||||
|
|
||||||
taos_queue taosOpenQueue();
|
STaosQueue *taosOpenQueue();
|
||||||
void taosCloseQueue(taos_queue);
|
void taosCloseQueue(STaosQueue *queue);
|
||||||
void taosSetQueueFp(taos_queue, FProcessItem, FProcessItems);
|
void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp);
|
||||||
void *taosAllocateQitem(int size);
|
void *taosAllocateQitem(int32_t size);
|
||||||
void taosFreeQitem(void *pItem);
|
void taosFreeQitem(void *pItem);
|
||||||
int taosWriteQitem(taos_queue, void *pItem);
|
int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
|
||||||
int taosReadQitem(taos_queue, void **pItem);
|
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
|
||||||
bool taosQueueEmpty(taos_queue);
|
bool taosQueueEmpty(STaosQueue *queue);
|
||||||
|
|
||||||
taos_qall taosAllocateQall();
|
STaosQall *taosAllocateQall();
|
||||||
void taosFreeQall(taos_qall);
|
void taosFreeQall(STaosQall *qall);
|
||||||
int taosReadAllQitems(taos_queue, taos_qall);
|
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall);
|
||||||
int taosGetQitem(taos_qall, void **pItem);
|
int32_t taosGetQitem(STaosQall *qall, void **ppItem);
|
||||||
void taosResetQitems(taos_qall);
|
void taosResetQitems(STaosQall *qall);
|
||||||
|
|
||||||
taos_qset taosOpenQset();
|
STaosQset *taosOpenQset();
|
||||||
void taosCloseQset();
|
void taosCloseQset(STaosQset *qset);
|
||||||
void taosQsetThreadResume(taos_qset param);
|
void taosQsetThreadResume(STaosQset *qset);
|
||||||
int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
|
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle);
|
||||||
void taosRemoveFromQset(taos_qset, taos_queue);
|
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue);
|
||||||
int taosGetQueueNumber(taos_qset);
|
int32_t taosGetQueueNumber(STaosQset *qset);
|
||||||
|
|
||||||
int taosReadQitemFromQset(taos_qset, void **pItem, void **ahandle, FProcessItem *);
|
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp);
|
||||||
int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **ahandle, FProcessItems *);
|
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp);
|
||||||
|
|
||||||
int taosGetQueueItemsNumber(taos_queue param);
|
int32_t taosGetQueueItemsNumber(STaosQueue *queue);
|
||||||
int taosGetQsetItemsNumber(taos_qset param);
|
int32_t taosGetQsetItemsNumber(STaosQset *qset);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_UTIL_QUEUE_H*/
|
#endif /*_TD_UTIL_QUEUE_H*/
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ typedef struct SWorkerPool {
|
||||||
int32_t max; // max number of workers
|
int32_t max; // max number of workers
|
||||||
int32_t min; // min number of workers
|
int32_t min; // min number of workers
|
||||||
int32_t num; // current number of workers
|
int32_t num; // current number of workers
|
||||||
taos_qset qset;
|
STaosQset *qset;
|
||||||
const char *name;
|
const char *name;
|
||||||
SWorker *workers;
|
SWorker *workers;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
|
@ -44,8 +44,8 @@ typedef struct SWorkerPool {
|
||||||
typedef struct SMWorker {
|
typedef struct SMWorker {
|
||||||
int32_t id; // worker id
|
int32_t id; // worker id
|
||||||
pthread_t thread; // thread
|
pthread_t thread; // thread
|
||||||
taos_qall qall;
|
STaosQall *qall;
|
||||||
taos_qset qset; // queue set
|
STaosQset *qset; // queue set
|
||||||
SMWorkerPool *pool;
|
SMWorkerPool *pool;
|
||||||
} SMWorker;
|
} SMWorker;
|
||||||
|
|
||||||
|
@ -57,15 +57,15 @@ typedef struct SMWorkerPool {
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
} SMWorkerPool;
|
} SMWorkerPool;
|
||||||
|
|
||||||
int32_t tWorkerInit(SWorkerPool *pool);
|
int32_t tWorkerInit(SWorkerPool *pool);
|
||||||
void tWorkerCleanup(SWorkerPool *pool);
|
void tWorkerCleanup(SWorkerPool *pool);
|
||||||
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp);
|
STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp);
|
||||||
void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue);
|
void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue);
|
||||||
|
|
||||||
int32_t tMWorkerInit(SMWorkerPool *pool);
|
int32_t tMWorkerInit(SMWorkerPool *pool);
|
||||||
void tMWorkerCleanup(SMWorkerPool *pool);
|
void tMWorkerCleanup(SMWorkerPool *pool);
|
||||||
taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp);
|
STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp);
|
||||||
void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue);
|
void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,7 +140,7 @@ void dmnInitOption(SDnodeOpt *pOption) {
|
||||||
pOption->sver = 30000000; //3.0.0.0
|
pOption->sver = 30000000; //3.0.0.0
|
||||||
pOption->numOfCores = tsNumOfCores;
|
pOption->numOfCores = tsNumOfCores;
|
||||||
pOption->numOfSupportVnodes = tsNumOfSupportVnodes;
|
pOption->numOfSupportVnodes = tsNumOfSupportVnodes;
|
||||||
pOption->numOfCommitThreads = 1;
|
pOption->numOfCommitThreads = tsNumOfCommitThreads;
|
||||||
pOption->statusInterval = tsStatusInterval;
|
pOption->statusInterval = tsStatusInterval;
|
||||||
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
|
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
|
||||||
pOption->ratioOfQueryCores = tsRatioOfQueryCores;
|
pOption->ratioOfQueryCores = tsRatioOfQueryCores;
|
||||||
|
|
|
@ -64,7 +64,7 @@ typedef struct {
|
||||||
int32_t maxNum;
|
int32_t maxNum;
|
||||||
void *queueFp;
|
void *queueFp;
|
||||||
SDnode *pDnode;
|
SDnode *pDnode;
|
||||||
taos_queue queue;
|
STaosQueue *queue;
|
||||||
union {
|
union {
|
||||||
SWorkerPool pool;
|
SWorkerPool pool;
|
||||||
SMWorkerPool mpool;
|
SMWorkerPool mpool;
|
||||||
|
@ -92,7 +92,7 @@ typedef struct {
|
||||||
SDnodeEps *dnodeEps;
|
SDnodeEps *dnodeEps;
|
||||||
pthread_t *threadId;
|
pthread_t *threadId;
|
||||||
SRWLatch latch;
|
SRWLatch latch;
|
||||||
taos_queue pMgmtQ;
|
STaosQueue *pMgmtQ;
|
||||||
SWorkerPool mgmtPool;
|
SWorkerPool mgmtPool;
|
||||||
} SDnodeMgmt;
|
} SDnodeMgmt;
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
#include "dndTransport.h"
|
#include "dndTransport.h"
|
||||||
#include "dndWorker.h"
|
#include "dndWorker.h"
|
||||||
|
|
||||||
static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs);
|
static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs);
|
||||||
|
|
||||||
static SBnode *dndAcquireBnode(SDnode *pDnode) {
|
static SBnode *dndAcquireBnode(SDnode *pDnode) {
|
||||||
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
|
@ -286,7 +286,7 @@ static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t code) {
|
static void dndSendBnodeErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) {
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
SRpcMsg *pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
taosGetQitem(qall, (void **)&pMsg);
|
taosGetQitem(qall, (void **)&pMsg);
|
||||||
|
@ -294,7 +294,7 @@ static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t cod
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs) {
|
static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SBnode *pBnode = dndAcquireBnode(pDnode);
|
SBnode *pBnode = dndAcquireBnode(pDnode);
|
||||||
if (pBnode == NULL) {
|
if (pBnode == NULL) {
|
||||||
dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
|
dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
|
|
@ -27,20 +27,20 @@ typedef struct {
|
||||||
} SWrapperCfg;
|
} SWrapperCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
int8_t dropped;
|
int8_t dropped;
|
||||||
int8_t accessState;
|
int8_t accessState;
|
||||||
uint64_t dbUid;
|
uint64_t dbUid;
|
||||||
char *db;
|
char *db;
|
||||||
char *path;
|
char *path;
|
||||||
SVnode *pImpl;
|
SVnode *pImpl;
|
||||||
taos_queue pWriteQ;
|
STaosQueue *pWriteQ;
|
||||||
taos_queue pSyncQ;
|
STaosQueue *pSyncQ;
|
||||||
taos_queue pApplyQ;
|
STaosQueue *pApplyQ;
|
||||||
taos_queue pQueryQ;
|
STaosQueue *pQueryQ;
|
||||||
taos_queue pFetchQ;
|
STaosQueue* pFetchQ;
|
||||||
} SVnodeObj;
|
} SVnodeObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -72,9 +72,9 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
||||||
|
|
||||||
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
||||||
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
||||||
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
|
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
|
||||||
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
|
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
|
||||||
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
|
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
|
||||||
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
@ -768,7 +768,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
|
||||||
vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp);
|
vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
|
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
|
@ -804,7 +804,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
|
||||||
taosArrayDestroy(pArray);
|
taosArrayDestroy(pArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SRpcMsg *pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
|
@ -815,7 +815,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
|
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SRpcMsg *pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
|
@ -826,7 +826,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
|
static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pQueue == NULL) {
|
if (pQueue == NULL) {
|
||||||
|
|
|
@ -14,26 +14,29 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "ulog.h"
|
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
|
#include "ulog.h"
|
||||||
|
|
||||||
|
typedef struct STaosQnode STaosQnode;
|
||||||
|
|
||||||
typedef struct STaosQnode {
|
typedef struct STaosQnode {
|
||||||
struct STaosQnode *next;
|
STaosQnode *next;
|
||||||
char item[];
|
char item[];
|
||||||
} STaosQnode;
|
} STaosQnode;
|
||||||
|
|
||||||
typedef struct STaosQueue {
|
typedef struct STaosQueue {
|
||||||
int32_t itemSize;
|
int32_t itemSize;
|
||||||
int32_t numOfItems;
|
int32_t numOfItems;
|
||||||
struct STaosQnode *head;
|
STaosQnode *head;
|
||||||
struct STaosQnode *tail;
|
STaosQnode *tail;
|
||||||
struct STaosQueue *next; // for queue set
|
STaosQueue *next; // for queue set
|
||||||
struct STaosQset *qset; // for queue set
|
STaosQset *qset; // for queue set
|
||||||
void *ahandle; // for queue set
|
void *ahandle; // for queue set
|
||||||
FProcessItem itemFp;
|
FProcessItem itemFp;
|
||||||
FProcessItems itemsFp;
|
FProcessItems itemsFp;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
} STaosQueue;
|
} STaosQueue;
|
||||||
|
|
||||||
typedef struct STaosQset {
|
typedef struct STaosQset {
|
||||||
|
@ -52,8 +55,8 @@ typedef struct STaosQall {
|
||||||
int32_t numOfItems;
|
int32_t numOfItems;
|
||||||
} STaosQall;
|
} STaosQall;
|
||||||
|
|
||||||
taos_queue taosOpenQueue() {
|
STaosQueue *taosOpenQueue() {
|
||||||
STaosQueue *queue = (STaosQueue *)calloc(sizeof(STaosQueue), 1);
|
STaosQueue *queue = calloc(sizeof(STaosQueue), 1);
|
||||||
if (queue == NULL) {
|
if (queue == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -65,16 +68,14 @@ taos_queue taosOpenQueue() {
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosSetQueueFp(taos_queue param, FProcessItem itemFp, FProcessItems itemsFp) {
|
void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp) {
|
||||||
if (param == NULL) return;
|
if (queue == NULL) return;
|
||||||
STaosQueue *queue = (STaosQueue *)param;
|
|
||||||
queue->itemFp = itemFp;
|
queue->itemFp = itemFp;
|
||||||
queue->itemsFp = itemsFp;
|
queue->itemsFp = itemsFp;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCloseQueue(taos_queue param) {
|
void taosCloseQueue(STaosQueue *queue) {
|
||||||
if (param == NULL) return;
|
if (queue == NULL) return;
|
||||||
STaosQueue *queue = (STaosQueue *)param;
|
|
||||||
STaosQnode *pTemp;
|
STaosQnode *pTemp;
|
||||||
STaosQset *qset;
|
STaosQset *qset;
|
||||||
|
|
||||||
|
@ -98,9 +99,8 @@ void taosCloseQueue(taos_queue param) {
|
||||||
uTrace("queue:%p is closed", queue);
|
uTrace("queue:%p is closed", queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool taosQueueEmpty(taos_queue param) {
|
bool taosQueueEmpty(STaosQueue *queue) {
|
||||||
if (param == NULL) return true;
|
if (queue == NULL) return true;
|
||||||
STaosQueue *queue = (STaosQueue *)param;
|
|
||||||
|
|
||||||
bool empty = false;
|
bool empty = false;
|
||||||
pthread_mutex_lock(&queue->mutex);
|
pthread_mutex_lock(&queue->mutex);
|
||||||
|
@ -112,7 +112,7 @@ bool taosQueueEmpty(taos_queue param) {
|
||||||
return empty;
|
return empty;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosAllocateQitem(int size) {
|
void *taosAllocateQitem(int32_t size) {
|
||||||
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
|
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
|
||||||
|
|
||||||
if (pNode == NULL) return NULL;
|
if (pNode == NULL) return NULL;
|
||||||
|
@ -129,9 +129,8 @@ void taosFreeQitem(void *param) {
|
||||||
free(temp);
|
free(temp);
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosWriteQitem(taos_queue param, void *item) {
|
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
|
||||||
STaosQueue *queue = (STaosQueue *)param;
|
STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
|
||||||
STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode));
|
|
||||||
pNode->next = NULL;
|
pNode->next = NULL;
|
||||||
|
|
||||||
pthread_mutex_lock(&queue->mutex);
|
pthread_mutex_lock(&queue->mutex);
|
||||||
|
@ -146,7 +145,7 @@ int taosWriteQitem(taos_queue param, void *item) {
|
||||||
|
|
||||||
queue->numOfItems++;
|
queue->numOfItems++;
|
||||||
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
||||||
uTrace("item:%p is put into queue:%p, items:%d", item, queue, queue->numOfItems);
|
uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems);
|
||||||
|
|
||||||
pthread_mutex_unlock(&queue->mutex);
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
|
||||||
|
@ -155,22 +154,21 @@ int taosWriteQitem(taos_queue param, void *item) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosReadQitem(taos_queue param, void **pitem) {
|
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
||||||
STaosQueue *queue = (STaosQueue *)param;
|
|
||||||
STaosQnode *pNode = NULL;
|
STaosQnode *pNode = NULL;
|
||||||
int code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
pthread_mutex_lock(&queue->mutex);
|
pthread_mutex_lock(&queue->mutex);
|
||||||
|
|
||||||
if (queue->head) {
|
if (queue->head) {
|
||||||
pNode = queue->head;
|
pNode = queue->head;
|
||||||
*pitem = pNode->item;
|
*ppItem = pNode->item;
|
||||||
queue->head = pNode->next;
|
queue->head = pNode->next;
|
||||||
if (queue->head == NULL) queue->tail = NULL;
|
if (queue->head == NULL) queue->tail = NULL;
|
||||||
queue->numOfItems--;
|
queue->numOfItems--;
|
||||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
||||||
code = 1;
|
code = 1;
|
||||||
uDebug("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems);
|
uDebug("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&queue->mutex);
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
@ -178,18 +176,13 @@ int taosReadQitem(taos_queue param, void **pitem) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosAllocateQall() {
|
STaosQall *taosAllocateQall() { return calloc(sizeof(STaosQall), 1); }
|
||||||
void *p = calloc(sizeof(STaosQall), 1);
|
|
||||||
return p;
|
|
||||||
}
|
|
||||||
|
|
||||||
void taosFreeQall(void *param) { free(param); }
|
void taosFreeQall(STaosQall *qall) { free(qall); }
|
||||||
|
|
||||||
int taosReadAllQitems(taos_queue param, taos_qall p2) {
|
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
|
||||||
STaosQueue *queue = (STaosQueue *)param;
|
int32_t code = 0;
|
||||||
STaosQall *qall = (STaosQall *)p2;
|
bool empty;
|
||||||
int code = 0;
|
|
||||||
bool empty;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&queue->mutex);
|
pthread_mutex_lock(&queue->mutex);
|
||||||
|
|
||||||
|
@ -219,29 +212,25 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosGetQitem(taos_qall param, void **pitem) {
|
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
|
||||||
STaosQall *qall = (STaosQall *)param;
|
|
||||||
STaosQnode *pNode;
|
STaosQnode *pNode;
|
||||||
int num = 0;
|
int32_t num = 0;
|
||||||
|
|
||||||
pNode = qall->current;
|
pNode = qall->current;
|
||||||
if (pNode) qall->current = pNode->next;
|
if (pNode) qall->current = pNode->next;
|
||||||
|
|
||||||
if (pNode) {
|
if (pNode) {
|
||||||
*pitem = pNode->item;
|
*ppItem = pNode->item;
|
||||||
num = 1;
|
num = 1;
|
||||||
uTrace("item:%p is fetched", *pitem);
|
uTrace("item:%p is fetched", *ppItem);
|
||||||
}
|
}
|
||||||
|
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosResetQitems(taos_qall param) {
|
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
|
||||||
STaosQall *qall = (STaosQall *)param;
|
|
||||||
qall->current = qall->start;
|
|
||||||
}
|
|
||||||
|
|
||||||
taos_qset taosOpenQset() {
|
STaosQset *taosOpenQset() {
|
||||||
STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1);
|
STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1);
|
||||||
if (qset == NULL) {
|
if (qset == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -255,9 +244,8 @@ taos_qset taosOpenQset() {
|
||||||
return qset;
|
return qset;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCloseQset(taos_qset param) {
|
void taosCloseQset(STaosQset *qset) {
|
||||||
if (param == NULL) return;
|
if (qset == NULL) return;
|
||||||
STaosQset *qset = (STaosQset *)param;
|
|
||||||
|
|
||||||
// remove all the queues from qset
|
// remove all the queues from qset
|
||||||
pthread_mutex_lock(&qset->mutex);
|
pthread_mutex_lock(&qset->mutex);
|
||||||
|
@ -279,16 +267,12 @@ void taosCloseQset(taos_qset param) {
|
||||||
// tsem_post 'qset->sem', so that reader threads waiting for it
|
// tsem_post 'qset->sem', so that reader threads waiting for it
|
||||||
// resumes execution and return, should only be used to signal the
|
// resumes execution and return, should only be used to signal the
|
||||||
// thread to exit.
|
// thread to exit.
|
||||||
void taosQsetThreadResume(taos_qset param) {
|
void taosQsetThreadResume(STaosQset *qset) {
|
||||||
STaosQset *qset = (STaosQset *)param;
|
|
||||||
uDebug("qset:%p, it will exit", qset);
|
uDebug("qset:%p, it will exit", qset);
|
||||||
tsem_post(&qset->sem);
|
tsem_post(&qset->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
|
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
|
||||||
STaosQueue *queue = (STaosQueue *)p2;
|
|
||||||
STaosQset *qset = (STaosQset *)p1;
|
|
||||||
|
|
||||||
if (queue->qset) return -1;
|
if (queue->qset) return -1;
|
||||||
|
|
||||||
pthread_mutex_lock(&qset->mutex);
|
pthread_mutex_lock(&qset->mutex);
|
||||||
|
@ -309,10 +293,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
|
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
|
||||||
STaosQueue *queue = (STaosQueue *)p2;
|
|
||||||
STaosQset *qset = (STaosQset *)p1;
|
|
||||||
|
|
||||||
STaosQueue *tqueue = NULL;
|
STaosQueue *tqueue = NULL;
|
||||||
|
|
||||||
pthread_mutex_lock(&qset->mutex);
|
pthread_mutex_lock(&qset->mutex);
|
||||||
|
@ -353,18 +334,17 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
|
||||||
uTrace("queue:%p is removed from qset:%p", queue, qset);
|
uTrace("queue:%p is removed from qset:%p", queue, qset);
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosGetQueueNumber(taos_qset param) { return ((STaosQset *)param)->numOfQueues; }
|
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
|
||||||
|
|
||||||
int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProcessItem *itemFp) {
|
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp) {
|
||||||
STaosQset *qset = (STaosQset *)param;
|
|
||||||
STaosQnode *pNode = NULL;
|
STaosQnode *pNode = NULL;
|
||||||
int code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
tsem_wait(&qset->sem);
|
tsem_wait(&qset->sem);
|
||||||
|
|
||||||
pthread_mutex_lock(&qset->mutex);
|
pthread_mutex_lock(&qset->mutex);
|
||||||
|
|
||||||
for (int i = 0; i < qset->numOfQueues; ++i) {
|
for (int32_t i = 0; i < qset->numOfQueues; ++i) {
|
||||||
if (qset->current == NULL) qset->current = qset->head;
|
if (qset->current == NULL) qset->current = qset->head;
|
||||||
STaosQueue *queue = qset->current;
|
STaosQueue *queue = qset->current;
|
||||||
if (queue) qset->current = queue->next;
|
if (queue) qset->current = queue->next;
|
||||||
|
@ -375,7 +355,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces
|
||||||
|
|
||||||
if (queue->head) {
|
if (queue->head) {
|
||||||
pNode = queue->head;
|
pNode = queue->head;
|
||||||
*pitem = pNode->item;
|
*ppItem = pNode->item;
|
||||||
if (ahandle) *ahandle = queue->ahandle;
|
if (ahandle) *ahandle = queue->ahandle;
|
||||||
if (itemFp) *itemFp = queue->itemFp;
|
if (itemFp) *itemFp = queue->itemFp;
|
||||||
queue->head = pNode->next;
|
queue->head = pNode->next;
|
||||||
|
@ -383,7 +363,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces
|
||||||
queue->numOfItems--;
|
queue->numOfItems--;
|
||||||
atomic_sub_fetch_32(&qset->numOfItems, 1);
|
atomic_sub_fetch_32(&qset->numOfItems, 1);
|
||||||
code = 1;
|
code = 1;
|
||||||
uTrace("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems);
|
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&queue->mutex);
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
@ -395,18 +375,15 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FProcessItems *itemsFp) {
|
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp) {
|
||||||
STaosQset *qset = (STaosQset *)param;
|
|
||||||
STaosQueue *queue;
|
STaosQueue *queue;
|
||||||
STaosQall *qall = (STaosQall *)p2;
|
int32_t code = 0;
|
||||||
int code = 0;
|
|
||||||
|
|
||||||
tsem_wait(&qset->sem);
|
tsem_wait(&qset->sem);
|
||||||
pthread_mutex_lock(&qset->mutex);
|
pthread_mutex_lock(&qset->mutex);
|
||||||
|
|
||||||
for(int i=0; i<qset->numOfQueues; ++i) {
|
for (int32_t i = 0; i < qset->numOfQueues; ++i) {
|
||||||
if (qset->current == NULL)
|
if (qset->current == NULL) qset->current = qset->head;
|
||||||
qset->current = qset->head;
|
|
||||||
queue = qset->current;
|
queue = qset->current;
|
||||||
if (queue) qset->current = queue->next;
|
if (queue) qset->current = queue->next;
|
||||||
if (queue == NULL) break;
|
if (queue == NULL) break;
|
||||||
|
@ -427,34 +404,32 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FPr
|
||||||
queue->tail = NULL;
|
queue->tail = NULL;
|
||||||
queue->numOfItems = 0;
|
queue->numOfItems = 0;
|
||||||
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
|
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
|
||||||
for (int j=1; j<qall->numOfItems; ++j) tsem_wait(&qset->sem);
|
for (int32_t j = 1; j < qall->numOfItems; ++j) tsem_wait(&qset->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&queue->mutex);
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
|
||||||
if (code != 0) break;
|
if (code != 0) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&qset->mutex);
|
pthread_mutex_unlock(&qset->mutex);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosGetQueueItemsNumber(taos_queue param) {
|
int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
|
||||||
STaosQueue *queue = (STaosQueue *)param;
|
|
||||||
if (!queue) return 0;
|
if (!queue) return 0;
|
||||||
|
|
||||||
int num;
|
int32_t num;
|
||||||
pthread_mutex_lock(&queue->mutex);
|
pthread_mutex_lock(&queue->mutex);
|
||||||
num = queue->numOfItems;
|
num = queue->numOfItems;
|
||||||
pthread_mutex_unlock(&queue->mutex);
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosGetQsetItemsNumber(taos_qset param) {
|
int32_t taosGetQsetItemsNumber(STaosQset *qset) {
|
||||||
STaosQset *qset = (STaosQset *)param;
|
|
||||||
if (!qset) return 0;
|
if (!qset) return 0;
|
||||||
|
|
||||||
int num = 0;
|
int32_t num = 0;
|
||||||
pthread_mutex_lock(&qset->mutex);
|
pthread_mutex_lock(&qset->mutex);
|
||||||
num = qset->numOfItems;
|
num = qset->numOfItems;
|
||||||
pthread_mutex_unlock(&qset->mutex);
|
pthread_mutex_unlock(&qset->mutex);
|
||||||
|
|
|
@ -85,9 +85,9 @@ static void *tWorkerThreadFp(SWorker *worker) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) {
|
STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) {
|
||||||
pthread_mutex_lock(&pool->mutex);
|
pthread_mutex_lock(&pool->mutex);
|
||||||
taos_queue queue = taosOpenQueue();
|
STaosQueue *queue = taosOpenQueue();
|
||||||
if (queue == NULL) {
|
if (queue == NULL) {
|
||||||
pthread_mutex_unlock(&pool->mutex);
|
pthread_mutex_unlock(&pool->mutex);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -121,7 +121,7 @@ taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp)
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tWorkerFreeQueue(SWorkerPool *pool, void *queue) {
|
void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue) {
|
||||||
taosCloseQueue(queue);
|
taosCloseQueue(queue);
|
||||||
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
||||||
}
|
}
|
||||||
|
@ -195,11 +195,11 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) {
|
STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) {
|
||||||
pthread_mutex_lock(&pool->mutex);
|
pthread_mutex_lock(&pool->mutex);
|
||||||
SMWorker *worker = pool->workers + pool->nextId;
|
SMWorker *worker = pool->workers + pool->nextId;
|
||||||
|
|
||||||
taos_queue *queue = taosOpenQueue();
|
STaosQueue *queue = taosOpenQueue();
|
||||||
if (queue == NULL) {
|
if (queue == NULL) {
|
||||||
pthread_mutex_unlock(&pool->mutex);
|
pthread_mutex_unlock(&pool->mutex);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -250,7 +250,7 @@ taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems f
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue) {
|
void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue) {
|
||||||
taosCloseQueue(queue);
|
taosCloseQueue(queue);
|
||||||
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue