Merge branch '3.0' of github.com:taosdata/TDengine into feature/vnode

This commit is contained in:
Hongze Cheng 2021-12-30 19:31:19 +08:00
commit 9f6ab6da0d
17 changed files with 256 additions and 240 deletions

View File

@ -181,6 +181,7 @@ typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj);
typedef int32_t (*SdbDeployFp)(SMnode *pMnode); typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);
typedef struct { typedef struct {
ESdbType sdbType; ESdbType sdbType;
@ -279,7 +280,7 @@ void sdbRelease(SSdb *pSdb, void *pObj);
* *
* @param pSdb The sdb object. * @param pSdb The sdb object.
* @param type The type of the table. * @param type The type of the table.
* @param type The initial iterator of the table. * @param pIter The initial iterator of the table.
* @param pObj The object of the row just fetched. * @param pObj The object of the row just fetched.
* @return void* The next iterator of the table. * @return void* The next iterator of the table.
*/ */
@ -289,11 +290,22 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj);
* @brief Cancel a traversal * @brief Cancel a traversal
* *
* @param pSdb The sdb object. * @param pSdb The sdb object.
* @param pIter The iterator of the table.
* @param type The initial iterator of table. * @param type The initial iterator of table.
*/ */
void sdbCancelFetch(SSdb *pSdb, void *pIter); void sdbCancelFetch(SSdb *pSdb, void *pIter);
/**
* @brief Traverse a sdb
*
* @param pSdb The sdb object.
* @param type The initial iterator of table.
* @param fp The function pointer.
* @param p1 The callback param.
* @param p2 The callback param.
* @param p3 The callback param.
*/
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3);
/** /**
* @brief Get the number of rows in the table * @brief Get the number of rows in the table
* *

View File

@ -37,7 +37,6 @@ extern int32_t mqttDebugFlag;
extern int32_t monDebugFlag; extern int32_t monDebugFlag;
extern int32_t uDebugFlag; extern int32_t uDebugFlag;
extern int32_t rpcDebugFlag; extern int32_t rpcDebugFlag;
extern int32_t odbcDebugFlag;
extern int32_t qDebugFlag; extern int32_t qDebugFlag;
extern int32_t wDebugFlag; extern int32_t wDebugFlag;
extern int32_t sDebugFlag; extern int32_t sDebugFlag;

View File

@ -22,59 +22,57 @@ extern "C" {
/* /*
This set of API for queue is designed specially for vnode/mnode. The main purpose is to This set of API for queue is designed specially for vnode/mnode. The main purpose is to
consume all the items instead of one item from a queue by one single read. Also, it can consume all the items instead of one item from a queue by one single read. Also, it can
combine multiple queues into a queue set, a consumer thread can consume a queue set via combine multiple queues into a queue set, a consumer thread can consume a queue set via
a single API instead of looping every queue by itself. a single API instead of looping every queue by itself.
Notes: Notes:
1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe 1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe
2: after taosCloseQueue/taosCloseQset is called, read/write operation APIs are not safe. 2: after taosCloseQueue/taosCloseQset is called, read/write operation APIs are not safe.
3: read/write operation APIs are multi-thread safe 3: read/write operation APIs are multi-thread safe
To remove the limitation and make this set of queue APIs multi-thread safe, REF(tref.c) To remove the limitation and make this set of queue APIs multi-thread safe, REF(tref.c)
shall be used to set up the protection. shall be used to set up the protection.
*/ */
typedef void *taos_queue; typedef struct STaosQueue STaosQueue;
typedef void *taos_qset; typedef struct STaosQset STaosQset;
typedef void *taos_qall; typedef struct STaosQall STaosQall;
typedef void (*FProcessItem)(void *ahandle, void *pItem); typedef void (*FProcessItem)(void *ahandle, void *pItem);
typedef void (*FProcessItems)(void *ahandle, taos_qall qall, int numOfItems); typedef void (*FProcessItems)(void *ahandle, STaosQall *qall, int32_t numOfItems);
taos_queue taosOpenQueue(); STaosQueue *taosOpenQueue();
void taosCloseQueue(taos_queue); void taosCloseQueue(STaosQueue *queue);
void taosSetQueueFp(taos_queue, FProcessItem, FProcessItems); void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp);
void *taosAllocateQitem(int size); void *taosAllocateQitem(int32_t size);
void taosFreeQitem(void *pItem); void taosFreeQitem(void *pItem);
int taosWriteQitem(taos_queue, void *pItem); int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
int taosReadQitem(taos_queue, void **pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
bool taosQueueEmpty(taos_queue); bool taosQueueEmpty(STaosQueue *queue);
taos_qall taosAllocateQall(); STaosQall *taosAllocateQall();
void taosFreeQall(taos_qall); void taosFreeQall(STaosQall *qall);
int taosReadAllQitems(taos_queue, taos_qall); int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall);
int taosGetQitem(taos_qall, void **pItem); int32_t taosGetQitem(STaosQall *qall, void **ppItem);
void taosResetQitems(taos_qall); void taosResetQitems(STaosQall *qall);
taos_qset taosOpenQset(); STaosQset *taosOpenQset();
void taosCloseQset(); void taosCloseQset(STaosQset *qset);
void taosQsetThreadResume(taos_qset param); void taosQsetThreadResume(STaosQset *qset);
int taosAddIntoQset(taos_qset, taos_queue, void *ahandle); int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle);
void taosRemoveFromQset(taos_qset, taos_queue); void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue);
int taosGetQueueNumber(taos_qset); int32_t taosGetQueueNumber(STaosQset *qset);
int taosReadQitemFromQset(taos_qset, void **pItem, void **ahandle, FProcessItem *); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp);
int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **ahandle, FProcessItems *); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp);
int taosGetQueueItemsNumber(taos_queue param); int32_t taosGetQueueItemsNumber(STaosQueue *queue);
int taosGetQsetItemsNumber(taos_qset param); int32_t taosGetQsetItemsNumber(STaosQset *qset);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_UTIL_QUEUE_H*/ #endif /*_TD_UTIL_QUEUE_H*/

View File

@ -35,7 +35,7 @@ typedef struct SWorkerPool {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t min; // min number of workers int32_t min; // min number of workers
int32_t num; // current number of workers int32_t num; // current number of workers
taos_qset qset; STaosQset *qset;
const char *name; const char *name;
SWorker *workers; SWorker *workers;
pthread_mutex_t mutex; pthread_mutex_t mutex;
@ -44,8 +44,8 @@ typedef struct SWorkerPool {
typedef struct SMWorker { typedef struct SMWorker {
int32_t id; // worker id int32_t id; // worker id
pthread_t thread; // thread pthread_t thread; // thread
taos_qall qall; STaosQall *qall;
taos_qset qset; // queue set STaosQset *qset; // queue set
SMWorkerPool *pool; SMWorkerPool *pool;
} SMWorker; } SMWorker;
@ -57,15 +57,15 @@ typedef struct SMWorkerPool {
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SMWorkerPool; } SMWorkerPool;
int32_t tWorkerInit(SWorkerPool *pool); int32_t tWorkerInit(SWorkerPool *pool);
void tWorkerCleanup(SWorkerPool *pool); void tWorkerCleanup(SWorkerPool *pool);
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp); STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp);
void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue); void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue);
int32_t tMWorkerInit(SMWorkerPool *pool); int32_t tMWorkerInit(SMWorkerPool *pool);
void tMWorkerCleanup(SMWorkerPool *pool); void tMWorkerCleanup(SMWorkerPool *pool);
taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp); STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp);
void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue); void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -140,7 +140,7 @@ void dmnInitOption(SDnodeOpt *pOption) {
pOption->sver = 30000000; //3.0.0.0 pOption->sver = 30000000; //3.0.0.0
pOption->numOfCores = tsNumOfCores; pOption->numOfCores = tsNumOfCores;
pOption->numOfSupportVnodes = tsNumOfSupportVnodes; pOption->numOfSupportVnodes = tsNumOfSupportVnodes;
pOption->numOfCommitThreads = 1; pOption->numOfCommitThreads = tsNumOfCommitThreads;
pOption->statusInterval = tsStatusInterval; pOption->statusInterval = tsStatusInterval;
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
pOption->ratioOfQueryCores = tsRatioOfQueryCores; pOption->ratioOfQueryCores = tsRatioOfQueryCores;

View File

@ -64,7 +64,7 @@ typedef struct {
int32_t maxNum; int32_t maxNum;
void *queueFp; void *queueFp;
SDnode *pDnode; SDnode *pDnode;
taos_queue queue; STaosQueue *queue;
union { union {
SWorkerPool pool; SWorkerPool pool;
SMWorkerPool mpool; SMWorkerPool mpool;
@ -92,7 +92,7 @@ typedef struct {
SDnodeEps *dnodeEps; SDnodeEps *dnodeEps;
pthread_t *threadId; pthread_t *threadId;
SRWLatch latch; SRWLatch latch;
taos_queue pMgmtQ; STaosQueue *pMgmtQ;
SWorkerPool mgmtPool; SWorkerPool mgmtPool;
} SDnodeMgmt; } SDnodeMgmt;

View File

@ -19,7 +19,7 @@
#include "dndTransport.h" #include "dndTransport.h"
#include "dndWorker.h" #include "dndWorker.h"
static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs);
static SBnode *dndAcquireBnode(SDnode *pDnode) { static SBnode *dndAcquireBnode(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt; SBnodeMgmt *pMgmt = &pDnode->bmgmt;
@ -286,7 +286,7 @@ static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t code) { static void dndSendBnodeErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
@ -294,7 +294,7 @@ static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t cod
} }
} }
static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs) { static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) {
SBnode *pBnode = dndAcquireBnode(pDnode); SBnode *pBnode = dndAcquireBnode(pDnode);
if (pBnode == NULL) { if (pBnode == NULL) {
dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);

View File

@ -27,20 +27,20 @@ typedef struct {
} SWrapperCfg; } SWrapperCfg;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t refCount; int32_t refCount;
int32_t vgVersion; int32_t vgVersion;
int8_t dropped; int8_t dropped;
int8_t accessState; int8_t accessState;
uint64_t dbUid; uint64_t dbUid;
char *db; char *db;
char *path; char *path;
SVnode *pImpl; SVnode *pImpl;
taos_queue pWriteQ; STaosQueue *pWriteQ;
taos_queue pSyncQ; STaosQueue *pSyncQ;
taos_queue pApplyQ; STaosQueue *pApplyQ;
taos_queue pQueryQ; STaosQueue *pQueryQ;
taos_queue pFetchQ; STaosQueue* pFetchQ;
} SVnodeObj; } SVnodeObj;
typedef struct { typedef struct {
@ -72,9 +72,9 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
@ -768,7 +768,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp); vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp);
} }
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
@ -804,7 +804,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
@ -815,7 +815,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
} }
} }
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
@ -826,7 +826,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
} }
} }
static int32_t dndWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) {
int32_t code = 0; int32_t code = 0;
if (pQueue == NULL) { if (pQueue == NULL) {

View File

@ -117,7 +117,7 @@ typedef struct {
int64_t rebootTime; int64_t rebootTime;
int64_t lastAccessTime; int64_t lastAccessTime;
int32_t accessTimes; int32_t accessTimes;
int16_t numOfVnodes; int32_t numOfVnodes;
int32_t numOfSupportVnodes; int32_t numOfSupportVnodes;
int32_t numOfCores; int32_t numOfCores;
EDndReason offlineReason; EDndReason offlineReason;

View File

@ -549,7 +549,7 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO
return 0; return 0;
} }
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
SSdbRaw *pCommitRaw = mndDbActionEncode(pNewDb); SSdbRaw *pCommitRaw = mndDbActionEncode(pNewDb);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
@ -725,7 +725,7 @@ static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
STransAction action = {0}; STransAction action = {0};
SVnodeGid * pVgid = pVgroup->vnodeGid + vn; SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1; if (pDnode == NULL) return -1;

View File

@ -196,6 +196,8 @@ static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return pDnode; return pDnode;
} }
sdbRelease(pSdb, pDnode);
} }
return NULL; return NULL;
@ -419,10 +421,9 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SCreateDnodeMsg *pCreate = pMsg->rpcMsg.pCont; SCreateDnodeMsg *pCreate = pMsg->rpcMsg.pCont;
pCreate->port = htonl(pCreate->port);
mDebug("dnode:%s:%d, start to create", pCreate->fqdn, pCreate->port); mDebug("dnode:%s:%d, start to create", pCreate->fqdn, pCreate->port);
pCreate->port = htonl(pCreate->port);
if (pCreate->fqdn[0] == 0 || pCreate->port <= 0 || pCreate->port > UINT16_MAX) { if (pCreate->fqdn[0] == 0 || pCreate->port <= 0 || pCreate->port > UINT16_MAX) {
terrno = TSDB_CODE_MND_INVALID_DNODE_EP; terrno = TSDB_CODE_MND_INVALID_DNODE_EP;
mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr()); mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr());

View File

@ -219,6 +219,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
} }
pEpSet->numOfEps++; pEpSet->numOfEps++;
sdbRelease(pSdb, pObj);
} }
} }

View File

@ -79,17 +79,17 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
for (int32_t i = 0; i < redoLogNum; ++i) { for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
rawDataLen += sdbGetRawTotalSize(pTmp); rawDataLen += (sdbGetRawTotalSize(pTmp) + 4);
} }
for (int32_t i = 0; i < undoLogNum; ++i) { for (int32_t i = 0; i < undoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
rawDataLen += sdbGetRawTotalSize(pTmp); rawDataLen += (sdbGetRawTotalSize(pTmp) + 4);
} }
for (int32_t i = 0; i < commitLogNum; ++i) { for (int32_t i = 0; i < commitLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
rawDataLen += sdbGetRawTotalSize(pTmp); rawDataLen += (sdbGetRawTotalSize(pTmp) + 4);
} }
for (int32_t i = 0; i < redoActionNum; ++i) { for (int32_t i = 0; i < redoActionNum; ++i) {
@ -437,7 +437,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans); SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) { if (pRaw == NULL) {
mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr());
return -1; return -1;
} }
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
@ -835,7 +835,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans); SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) { if (pRaw == NULL) {
mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr());
} }
sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

View File

@ -238,39 +238,48 @@ SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *p
return pDrop; return pDrop;
} }
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SDnodeObj *pDnode = pObj;
pDnode->numOfVnodes = 0;
return true;
}
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SDnodeObj *pDnode = pObj;
SArray *pArray = p1;
pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
if (online && pDnode->numOfSupportVnodes > 0) {
taosArrayPush(pArray, pDnode);
}
bool isMnode = mndIsMnode(pMnode, pDnode->id);
mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, pDnode->numOfVnodes,
pDnode->numOfSupportVnodes, isMnode, online);
if (isMnode) {
pDnode->numOfVnodes++;
}
return true;
}
static SArray *mndBuildDnodesArray(SMnode *pMnode) { static SArray *mndBuildDnodesArray(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfDnodes = mndGetDnodeSize(pMnode); int32_t numOfDnodes = mndGetDnodeSize(pMnode);
SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj)); SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
if (pArray == NULL) { if (pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
void *pIter = NULL; sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
while (1) { sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, NULL, NULL);
SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break;
int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
bool isMnode = mndIsMnode(pMnode, pDnode->id);
if (isMnode) {
pDnode->numOfVnodes++;
}
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
if (online) {
taosArrayPush(pArray, pDnode);
}
mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, numOfVnodes,
pDnode->numOfSupportVnodes, isMnode, online);
sdbRelease(pSdb, pDnode);
}
return pArray; return pArray;
} }
@ -302,7 +311,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr
pVgid->role = TAOS_SYNC_STATE_FOLLOWER; pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
} }
mDebug("db:%s, vgId:%d, vindex:%d dnodeId:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId); mDebug("db:%s, vgId:%d, vn:%d dnode:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId);
pDnode->numOfVnodes++; pDnode->numOfVnodes++;
} }
@ -412,6 +421,20 @@ static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SVgObj *pVgroup = pObj;
int64_t uid = *(int64_t *)p1;
int8_t *pReplica = p2;
int32_t *pNumOfVgroups = p3;
if (pVgroup->dbUid == uid) {
*pReplica = MAX(*pReplica, pVgroup->replica);
(*pNumOfVgroups)++;
}
return true;
}
static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) { static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName); SDbObj *pDb = mndAcquireDb(pMnode, dbName);
@ -420,25 +443,10 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep
return -1; return -1;
} }
int8_t replica = 1; *pReplica = 1;
int32_t numOfVgroups = 0; *pNumOfVgroups = 0;
sdbTraverse(pSdb, SDB_VGROUP, mndGetVgroupMaxReplicaFp, &pDb->uid, pReplica, pNumOfVgroups);
void *pIter = NULL; mndReleaseDb(pMnode, pDb);
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) {
replica = MAX(replica, pVgroup->replica);
numOfVgroups++;
}
sdbRelease(pSdb, pVgroup);
}
*pReplica = replica;
*pNumOfVgroups = numOfVgroups;
return 0; return 0;
} }
@ -540,25 +548,23 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = pObj;
int32_t numOfVnodes = 0; int32_t dnodeId = *(int32_t *)p1;
void *pIter = NULL; int32_t *pNumOfVnodes = (int32_t *)p2;
while (1) { for (int32_t v = 0; v < pVgroup->replica; ++v) {
SVgObj *pVgroup = NULL; if (pVgroup->vnodeGid[v].dnodeId == dnodeId) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); (*pNumOfVnodes)++;
if (pIter == NULL) break;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
if (pVgroup->vnodeGid[v].dnodeId == dnodeId) {
numOfVnodes++;
}
} }
sdbRelease(pSdb, pVgroup);
} }
return true;
}
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
int32_t numOfVnodes = 0;
sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &numOfVnodes, NULL);
return numOfVnodes; return numOfVnodes;
} }

View File

@ -336,6 +336,30 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) {
taosRUnLockLatch(pLock); taosRUnLockLatch(pLock);
} }
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) {
SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return;
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) {
SSdbRow *pRow = *ppRow;
if (pRow->status == SDB_STATUS_READY) {
bool isContinue = (*fp)(pSdb->pMnode, pRow->pObj, p1, p2, p3);
if (!isContinue) {
taosHashCancelIterate(hash, ppRow);
break;
}
}
ppRow = taosHashIterate(hash, ppRow);
}
taosRUnLockLatch(pLock);
}
int32_t sdbGetSize(SSdb *pSdb, ESdbType type) { int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return 0; if (hash == NULL) return 0;

View File

@ -14,26 +14,29 @@
*/ */
#include "os.h" #include "os.h"
#include "ulog.h"
#include "taoserror.h" #include "taoserror.h"
#include "tqueue.h" #include "tqueue.h"
#include "ulog.h"
typedef struct STaosQnode STaosQnode;
typedef struct STaosQnode { typedef struct STaosQnode {
struct STaosQnode *next; STaosQnode *next;
char item[]; char item[];
} STaosQnode; } STaosQnode;
typedef struct STaosQueue { typedef struct STaosQueue {
int32_t itemSize; int32_t itemSize;
int32_t numOfItems; int32_t numOfItems;
struct STaosQnode *head; STaosQnode *head;
struct STaosQnode *tail; STaosQnode *tail;
struct STaosQueue *next; // for queue set STaosQueue *next; // for queue set
struct STaosQset *qset; // for queue set STaosQset *qset; // for queue set
void *ahandle; // for queue set void *ahandle; // for queue set
FProcessItem itemFp; FProcessItem itemFp;
FProcessItems itemsFp; FProcessItems itemsFp;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} STaosQueue; } STaosQueue;
typedef struct STaosQset { typedef struct STaosQset {
@ -52,8 +55,8 @@ typedef struct STaosQall {
int32_t numOfItems; int32_t numOfItems;
} STaosQall; } STaosQall;
taos_queue taosOpenQueue() { STaosQueue *taosOpenQueue() {
STaosQueue *queue = (STaosQueue *)calloc(sizeof(STaosQueue), 1); STaosQueue *queue = calloc(sizeof(STaosQueue), 1);
if (queue == NULL) { if (queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
@ -65,16 +68,14 @@ taos_queue taosOpenQueue() {
return queue; return queue;
} }
void taosSetQueueFp(taos_queue param, FProcessItem itemFp, FProcessItems itemsFp) { void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp) {
if (param == NULL) return; if (queue == NULL) return;
STaosQueue *queue = (STaosQueue *)param;
queue->itemFp = itemFp; queue->itemFp = itemFp;
queue->itemsFp = itemsFp; queue->itemsFp = itemsFp;
} }
void taosCloseQueue(taos_queue param) { void taosCloseQueue(STaosQueue *queue) {
if (param == NULL) return; if (queue == NULL) return;
STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pTemp; STaosQnode *pTemp;
STaosQset *qset; STaosQset *qset;
@ -98,9 +99,8 @@ void taosCloseQueue(taos_queue param) {
uTrace("queue:%p is closed", queue); uTrace("queue:%p is closed", queue);
} }
bool taosQueueEmpty(taos_queue param) { bool taosQueueEmpty(STaosQueue *queue) {
if (param == NULL) return true; if (queue == NULL) return true;
STaosQueue *queue = (STaosQueue *)param;
bool empty = false; bool empty = false;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
@ -112,7 +112,7 @@ bool taosQueueEmpty(taos_queue param) {
return empty; return empty;
} }
void *taosAllocateQitem(int size) { void *taosAllocateQitem(int32_t size) {
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
if (pNode == NULL) return NULL; if (pNode == NULL) return NULL;
@ -129,9 +129,8 @@ void taosFreeQitem(void *param) {
free(temp); free(temp);
} }
int taosWriteQitem(taos_queue param, void *item) { int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
STaosQueue *queue = (STaosQueue *)param; STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode));
pNode->next = NULL; pNode->next = NULL;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
@ -146,7 +145,7 @@ int taosWriteQitem(taos_queue param, void *item) {
queue->numOfItems++; queue->numOfItems++;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
uTrace("item:%p is put into queue:%p, items:%d", item, queue, queue->numOfItems); uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems);
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
@ -155,22 +154,21 @@ int taosWriteQitem(taos_queue param, void *item) {
return 0; return 0;
} }
int taosReadQitem(taos_queue param, void **pitem) { int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int code = 0; int32_t code = 0;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
if (queue->head) { if (queue->head) {
pNode = queue->head; pNode = queue->head;
*pitem = pNode->item; *ppItem = pNode->item;
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL; if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--; queue->numOfItems--;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1; code = 1;
uDebug("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems); uDebug("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
@ -178,18 +176,13 @@ int taosReadQitem(taos_queue param, void **pitem) {
return code; return code;
} }
void *taosAllocateQall() { STaosQall *taosAllocateQall() { return calloc(sizeof(STaosQall), 1); }
void *p = calloc(sizeof(STaosQall), 1);
return p;
}
void taosFreeQall(void *param) { free(param); } void taosFreeQall(STaosQall *qall) { free(qall); }
int taosReadAllQitems(taos_queue param, taos_qall p2) { int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
STaosQueue *queue = (STaosQueue *)param; int32_t code = 0;
STaosQall *qall = (STaosQall *)p2; bool empty;
int code = 0;
bool empty;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
@ -219,29 +212,25 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) {
return code; return code;
} }
int taosGetQitem(taos_qall param, void **pitem) { int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
STaosQall *qall = (STaosQall *)param;
STaosQnode *pNode; STaosQnode *pNode;
int num = 0; int32_t num = 0;
pNode = qall->current; pNode = qall->current;
if (pNode) qall->current = pNode->next; if (pNode) qall->current = pNode->next;
if (pNode) { if (pNode) {
*pitem = pNode->item; *ppItem = pNode->item;
num = 1; num = 1;
uTrace("item:%p is fetched", *pitem); uTrace("item:%p is fetched", *ppItem);
} }
return num; return num;
} }
void taosResetQitems(taos_qall param) { void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
STaosQall *qall = (STaosQall *)param;
qall->current = qall->start;
}
taos_qset taosOpenQset() { STaosQset *taosOpenQset() {
STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1); STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1);
if (qset == NULL) { if (qset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -255,9 +244,8 @@ taos_qset taosOpenQset() {
return qset; return qset;
} }
void taosCloseQset(taos_qset param) { void taosCloseQset(STaosQset *qset) {
if (param == NULL) return; if (qset == NULL) return;
STaosQset *qset = (STaosQset *)param;
// remove all the queues from qset // remove all the queues from qset
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
@ -279,16 +267,12 @@ void taosCloseQset(taos_qset param) {
// tsem_post 'qset->sem', so that reader threads waiting for it // tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the // resumes execution and return, should only be used to signal the
// thread to exit. // thread to exit.
void taosQsetThreadResume(taos_qset param) { void taosQsetThreadResume(STaosQset *qset) {
STaosQset *qset = (STaosQset *)param;
uDebug("qset:%p, it will exit", qset); uDebug("qset:%p, it will exit", qset);
tsem_post(&qset->sem); tsem_post(&qset->sem);
} }
int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
STaosQueue *queue = (STaosQueue *)p2;
STaosQset *qset = (STaosQset *)p1;
if (queue->qset) return -1; if (queue->qset) return -1;
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
@ -309,10 +293,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
return 0; return 0;
} }
void taosRemoveFromQset(taos_qset p1, taos_queue p2) { void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
STaosQueue *queue = (STaosQueue *)p2;
STaosQset *qset = (STaosQset *)p1;
STaosQueue *tqueue = NULL; STaosQueue *tqueue = NULL;
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
@ -353,18 +334,17 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
uTrace("queue:%p is removed from qset:%p", queue, qset); uTrace("queue:%p is removed from qset:%p", queue, qset);
} }
int taosGetQueueNumber(taos_qset param) { return ((STaosQset *)param)->numOfQueues; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProcessItem *itemFp) { int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp) {
STaosQset *qset = (STaosQset *)param;
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int code = 0; int32_t code = 0;
tsem_wait(&qset->sem); tsem_wait(&qset->sem);
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
for (int i = 0; i < qset->numOfQueues; ++i) { for (int32_t i = 0; i < qset->numOfQueues; ++i) {
if (qset->current == NULL) qset->current = qset->head; if (qset->current == NULL) qset->current = qset->head;
STaosQueue *queue = qset->current; STaosQueue *queue = qset->current;
if (queue) qset->current = queue->next; if (queue) qset->current = queue->next;
@ -375,7 +355,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces
if (queue->head) { if (queue->head) {
pNode = queue->head; pNode = queue->head;
*pitem = pNode->item; *ppItem = pNode->item;
if (ahandle) *ahandle = queue->ahandle; if (ahandle) *ahandle = queue->ahandle;
if (itemFp) *itemFp = queue->itemFp; if (itemFp) *itemFp = queue->itemFp;
queue->head = pNode->next; queue->head = pNode->next;
@ -383,7 +363,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces
queue->numOfItems--; queue->numOfItems--;
atomic_sub_fetch_32(&qset->numOfItems, 1); atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1; code = 1;
uTrace("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems); uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
@ -395,18 +375,15 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces
return code; return code;
} }
int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FProcessItems *itemsFp) { int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp) {
STaosQset *qset = (STaosQset *)param;
STaosQueue *queue; STaosQueue *queue;
STaosQall *qall = (STaosQall *)p2; int32_t code = 0;
int code = 0;
tsem_wait(&qset->sem); tsem_wait(&qset->sem);
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
for(int i=0; i<qset->numOfQueues; ++i) { for (int32_t i = 0; i < qset->numOfQueues; ++i) {
if (qset->current == NULL) if (qset->current == NULL) qset->current = qset->head;
qset->current = qset->head;
queue = qset->current; queue = qset->current;
if (queue) qset->current = queue->next; if (queue) qset->current = queue->next;
if (queue == NULL) break; if (queue == NULL) break;
@ -427,34 +404,32 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FPr
queue->tail = NULL; queue->tail = NULL;
queue->numOfItems = 0; queue->numOfItems = 0;
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
for (int j=1; j<qall->numOfItems; ++j) tsem_wait(&qset->sem); for (int32_t j = 1; j < qall->numOfItems; ++j) tsem_wait(&qset->sem);
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
if (code != 0) break; if (code != 0) break;
} }
pthread_mutex_unlock(&qset->mutex); pthread_mutex_unlock(&qset->mutex);
return code; return code;
} }
int taosGetQueueItemsNumber(taos_queue param) { int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
STaosQueue *queue = (STaosQueue *)param;
if (!queue) return 0; if (!queue) return 0;
int num; int32_t num;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
num = queue->numOfItems; num = queue->numOfItems;
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
return num; return num;
} }
int taosGetQsetItemsNumber(taos_qset param) { int32_t taosGetQsetItemsNumber(STaosQset *qset) {
STaosQset *qset = (STaosQset *)param;
if (!qset) return 0; if (!qset) return 0;
int num = 0; int32_t num = 0;
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
num = qset->numOfItems; num = qset->numOfItems;
pthread_mutex_unlock(&qset->mutex); pthread_mutex_unlock(&qset->mutex);

View File

@ -85,9 +85,9 @@ static void *tWorkerThreadFp(SWorker *worker) {
return NULL; return NULL;
} }
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) { STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) {
pthread_mutex_lock(&pool->mutex); pthread_mutex_lock(&pool->mutex);
taos_queue queue = taosOpenQueue(); STaosQueue *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
return NULL; return NULL;
@ -121,7 +121,7 @@ taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp)
return queue; return queue;
} }
void tWorkerFreeQueue(SWorkerPool *pool, void *queue) { void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue) {
taosCloseQueue(queue); taosCloseQueue(queue);
uDebug("worker:%s, queue:%p is freed", pool->name, queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue);
} }
@ -195,11 +195,11 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) {
return NULL; return NULL;
} }
taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) { STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) {
pthread_mutex_lock(&pool->mutex); pthread_mutex_lock(&pool->mutex);
SMWorker *worker = pool->workers + pool->nextId; SMWorker *worker = pool->workers + pool->nextId;
taos_queue *queue = taosOpenQueue(); STaosQueue *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
return NULL; return NULL;
@ -250,7 +250,7 @@ taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems f
return queue; return queue;
} }
void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue) { void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue) {
taosCloseQueue(queue); taosCloseQueue(queue);
uDebug("worker:%s, queue:%p is freed", pool->name, queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue);
} }