From 139a4c7fb1e8f403f6e40df0f4e2bdab757c9346 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 29 Oct 2021 17:11:15 +0800 Subject: [PATCH] adjust tqueue codes --- include/util/tqueue.h | 21 +-- include/util/tworker.h | 54 +++---- source/server/mnode/src/mnodeWorker.c | 116 +++++++++------- source/server/vnode/inc/vnodeWrite.h | 7 +- source/server/vnode/src/vnodeInt.c | 20 +-- source/server/vnode/src/vnodeMgmt.c | 24 ++-- source/server/vnode/src/vnodeRead.c | 42 +++--- source/server/vnode/src/vnodeWrite.c | 193 ++++++++++++-------------- source/util/src/tqueue.c | 168 +++++++++++----------- source/util/src/tworker.c | 64 ++++----- 10 files changed, 339 insertions(+), 370 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index cd897435fa..dc16222c2b 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -37,21 +37,24 @@ shall be used to set up the protection. */ -typedef void* taos_queue; -typedef void* taos_qset; -typedef void* taos_qall; +typedef void *taos_queue; +typedef void *taos_qset; +typedef void *taos_qall; +typedef void *(*FProcessOneItem)(void *pItem, void *ahandle); +typedef void *(*FProcessAllItem)(taos_qall qall, int numOfItems, void *ahandle); taos_queue taosOpenQueue(); void taosCloseQueue(taos_queue); +void taosSetQueueFp(taos_queue, FProcessOneItem, FProcessAllItem fp); void *taosAllocateQitem(int size); -void taosFreeQitem(void *item); -int taosWriteQitem(taos_queue, int type, void *item); -int taosReadQitem(taos_queue, int *type, void **pitem); +void taosFreeQitem(void *pItem); +int taosWriteQitem(taos_queue, void *pItem); +int taosReadQitem(taos_queue, void **pItem); taos_qall taosAllocateQall(); void taosFreeQall(taos_qall); int taosReadAllQitems(taos_queue, taos_qall); -int taosGetQitem(taos_qall, int *type, void **pitem); +int taosGetQitem(taos_qall, void **pItem); void taosResetQitems(taos_qall); taos_qset taosOpenQset(); @@ -61,8 +64,8 @@ int taosAddIntoQset(taos_qset, taos_queue, void *ahandle); void taosRemoveFromQset(taos_qset, taos_queue); int taosGetQueueNumber(taos_qset); -int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle); -int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **handle); +int taosReadQitemFromQset(taos_qset, void **pItem, void **ahandle, FProcessOneItem *); +int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **ahandle, FProcessAllItem *); int taosGetQueueItemsNumber(taos_queue param); int taosGetQsetItemsNumber(taos_qset param); diff --git a/include/util/tworker.h b/include/util/tworker.h index 47ad0bd9e7..591ebf8967 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -22,13 +22,6 @@ extern "C" { #endif -typedef int32_t (*ProcessStartFp)(void *ahandle, void *pMsg, int32_t qtype); -typedef void (*ProcessEndFp)(void *ahandle, void *pMsg, int32_t qtype, int32_t code); - -typedef bool (*ProcessWriteStartFp)(void *ahandle, void *pMsg, int32_t qtype); -typedef void (*ProcessWriteSyncFp)(void *ahandle, int32_t code); -typedef void (*ProcessWriteEndFp)(void *ahandle, void *pMsg, int32_t qtype); - typedef struct SWorker { int32_t id; // worker ID pthread_t thread; // thread @@ -40,41 +33,36 @@ typedef struct SWorkerPool { int32_t min; // min number of workers int32_t num; // current number of workers taos_qset qset; - const char * name; - ProcessStartFp startFp; - ProcessEndFp endFp; - SWorker * workers; + const char *name; + SWorker *workers; pthread_mutex_t mutex; } SWorkerPool; -typedef struct SWriteWorker { - int32_t id; // worker id - pthread_t thread; // thread - taos_qall qall; - taos_qset qset; // queue set - struct SWriteWorkerPool *pool; -} SWriteWorker; +typedef struct SMWorker { + int32_t id; // worker id + pthread_t thread; // thread + taos_qall qall; + taos_qset qset; // queue set + struct SMWorkerPool *pool; +} SMWorker; -typedef struct SWriteWorkerPool { - int32_t max; // max number of workers - int32_t nextId; // from 0 to max-1, cyclic - const char * name; - ProcessWriteStartFp startFp; - ProcessWriteSyncFp syncFp; - ProcessWriteEndFp endFp; - SWriteWorker * workers; - pthread_mutex_t mutex; -} SWriteWorkerPool; +typedef struct SMWorkerPool { + int32_t max; // max number of workers + int32_t nextId; // from 0 to max-1, cyclic + const char *name; + SMWorker *workers; + pthread_mutex_t mutex; +} SMWorkerPool; int32_t tWorkerInit(SWorkerPool *pool); void tWorkerCleanup(SWorkerPool *pool); -taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle); +taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessOneItem fp); void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue); -int32_t tWriteWorkerInit(SWriteWorkerPool *pool); -void tWriteWorkerCleanup(SWriteWorkerPool *pool); -taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle); -void tWriteWorkerFreeQueue(SWriteWorkerPool *pool, taos_queue queue); +int32_t tMWorkerInit(SMWorkerPool *pool); +void tMWorkerCleanup(SMWorkerPool *pool); +taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessAllItem fp); +void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue); #ifdef __cplusplus } diff --git a/source/server/mnode/src/mnodeWorker.c b/source/server/mnode/src/mnodeWorker.c index 84beddb2da..fc370538aa 100644 --- a/source/server/mnode/src/mnodeWorker.c +++ b/source/server/mnode/src/mnodeWorker.c @@ -28,10 +28,10 @@ static struct { SWorkerPool write; SWorkerPool peerReq; SWorkerPool peerRsp; - taos_queue readQ; - taos_queue writeQ; - taos_queue peerReqQ; - taos_queue peerRspQ; + taos_queue readQ; + taos_queue writeQ; + taos_queue peerReqQ; + taos_queue peerRspQ; int32_t (*writeMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *); int32_t (*readMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *); int32_t (*peerReqFp[TSDB_MSG_TYPE_MAX])(SMnMsg *); @@ -81,7 +81,7 @@ static void mnodeDispatchToWriteQueue(SRpcMsg *pRpcMsg) { rpcSendResponse(&rpcRsp); } else { mTrace("msg:%p, app:%p type:%s is put into wqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); - taosWriteQitem(tsMworker.writeQ, TAOS_QTYPE_RPC, pMsg); + taosWriteQitem(tsMworker.writeQ, pMsg); } } @@ -93,7 +93,7 @@ void mnodeReDispatchToWriteQueue(SMnMsg *pMsg) { mnodeSendRedirectMsg(&pMsg->rpcMsg, true); mnodeCleanupMsg(pMsg); } else { - taosWriteQitem(tsMworker.writeQ, TAOS_QTYPE_RPC, pMsg); + taosWriteQitem(tsMworker.writeQ, pMsg); } } @@ -107,7 +107,7 @@ static void mnodeDispatchToReadQueue(SRpcMsg *pRpcMsg) { rpcSendResponse(&rpcRsp); } else { mTrace("msg:%p, app:%p type:%s is put into rqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); - taosWriteQitem(tsMworker.readQ, TAOS_QTYPE_RPC, pMsg); + taosWriteQitem(tsMworker.readQ, pMsg); } } @@ -125,7 +125,7 @@ static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) { } else { mTrace("msg:%p, app:%p type:%s is put into peer req queue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); - taosWriteQitem(tsMworker.peerReqQ, TAOS_QTYPE_RPC, pMsg); + taosWriteQitem(tsMworker.peerReqQ, pMsg); } } @@ -140,13 +140,13 @@ void mnodeDispatchToPeerRspQueue(SRpcMsg *pRpcMsg) { } else { mTrace("msg:%p, app:%p type:%s is put into peer rsp queue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); - taosWriteQitem(tsMworker.peerRspQ, TAOS_QTYPE_RPC, pMsg); + taosWriteQitem(tsMworker.peerRspQ, pMsg); } // rpcFreeCont(pRpcMsg->pCont); } -static void mnodeSendRpcRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t code) { +void mnodeSendRsp(SMnMsg *pMsg, int32_t code) { if (pMsg == NULL) return; if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) { @@ -155,22 +155,16 @@ static void mnodeSendRpcRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t } SRpcMsg rpcRsp = { - .handle = pMsg->rpcMsg.handle, - .pCont = pMsg->rpcRsp.rsp, - .contLen = pMsg->rpcRsp.len, - .code = code, + .handle = pMsg->rpcMsg.handle, + .pCont = pMsg->rpcRsp.rsp, + .contLen = pMsg->rpcRsp.len, + .code = code, }; rpcSendResponse(&rpcRsp); mnodeCleanupMsg(pMsg); } -void mnodeSendRsp(SMnMsg *pMsg, int32_t code) { mnodeSendRpcRsp(NULL, pMsg, 0, code); } - -static void mnodeProcessPeerRspEnd(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t code) { - mnodeCleanupMsg(pMsg); -} - static void mnodeInitMsgFp() { // // peer req // tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeDispatchToPeerQueue; @@ -290,13 +284,15 @@ static void mnodeInitMsgFp() { // tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessKillConnectionMsg; } -static int32_t mnodeProcessWriteReq(void *unused, SMnMsg *pMsg, int32_t qtype) { +static void mnodeProcessWriteReq(SMnMsg *pMsg, void *unused) { int32_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; + void *ahandle = pMsg->rpcMsg.ahandle; + int32_t code = 0; if (pMsg->rpcMsg.pCont == NULL) { mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]); - return TSDB_CODE_MND_INVALID_MSG_LEN; + code = TSDB_CODE_MND_INVALID_MSG_LEN; + goto PROCESS_WRITE_REQ_END; } if (!mnodeIsMaster()) { @@ -309,31 +305,39 @@ static int32_t mnodeProcessWriteReq(void *unused, SMnMsg *pMsg, int32_t qtype) { mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType], epSet->numOfEps, epSet->inUse); - return TSDB_CODE_RPC_REDIRECT; + code = TSDB_CODE_RPC_REDIRECT; + goto PROCESS_WRITE_REQ_END; } if (tsMworker.writeMsgFp[msgType] == NULL) { mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]); - return TSDB_CODE_MND_MSG_NOT_PROCESSED; + code = TSDB_CODE_MND_MSG_NOT_PROCESSED; + goto PROCESS_WRITE_REQ_END; } - return (*tsMworker.writeMsgFp[msgType])(pMsg); + code = (*tsMworker.writeMsgFp[msgType])(pMsg); + +PROCESS_WRITE_REQ_END: + mnodeSendRsp(pMsg, code); } -static int32_t mnodeProcessReadReq(void* unused, SMnMsg *pMsg, int32_t qtype) { +static void mnodeProcessReadReq(SMnMsg *pMsg, void *unused) { int32_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; + void *ahandle = pMsg->rpcMsg.ahandle; + int32_t code = 0; if (pMsg->rpcMsg.pCont == NULL) { mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]); - return TSDB_CODE_MND_INVALID_MSG_LEN; + code = TSDB_CODE_MND_INVALID_MSG_LEN; + goto PROCESS_READ_REQ_END; } if (!mnodeIsMaster()) { SMnRsp *rpcRsp = &pMsg->rpcRsp; SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); if (!epSet) { - return TSDB_CODE_MND_OUT_OF_MEMORY; + code = TSDB_CODE_MND_OUT_OF_MEMORY; + goto PROCESS_READ_REQ_END; } mnodeGetMnodeEpSetForShell(epSet, true); rpcRsp->rsp = epSet; @@ -341,25 +345,32 @@ static int32_t mnodeProcessReadReq(void* unused, SMnMsg *pMsg, int32_t qtype) { mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType], epSet->numOfEps, epSet->inUse); - return TSDB_CODE_RPC_REDIRECT; + code = TSDB_CODE_RPC_REDIRECT; + goto PROCESS_READ_REQ_END; } if (tsMworker.readMsgFp[msgType] == NULL) { mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]); - return TSDB_CODE_MND_MSG_NOT_PROCESSED; + code = TSDB_CODE_MND_MSG_NOT_PROCESSED; + goto PROCESS_READ_REQ_END; } mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]); - return (*tsMworker.readMsgFp[msgType])(pMsg); + code = (*tsMworker.readMsgFp[msgType])(pMsg); + +PROCESS_READ_REQ_END: + mnodeSendRsp(pMsg, code); } -static int32_t mnodeProcessPeerReq(void *unused, SMnMsg *pMsg, int32_t qtype) { +static void mnodeProcessPeerReq(SMnMsg *pMsg, void *unused) { int32_t msgType = pMsg->rpcMsg.msgType; - void * ahandle = pMsg->rpcMsg.ahandle; + void *ahandle = pMsg->rpcMsg.ahandle; + int32_t code = 0; if (pMsg->rpcMsg.pCont == NULL) { mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]); - return TSDB_CODE_MND_INVALID_MSG_LEN; + code = TSDB_CODE_MND_INVALID_MSG_LEN; + goto PROCESS_PEER_REQ_END; } if (!mnodeIsMaster()) { @@ -372,24 +383,29 @@ static int32_t mnodeProcessPeerReq(void *unused, SMnMsg *pMsg, int32_t qtype) { mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType], epSet->numOfEps, epSet->inUse); - return TSDB_CODE_RPC_REDIRECT; + code = TSDB_CODE_RPC_REDIRECT; + goto PROCESS_PEER_REQ_END; } if (tsMworker.peerReqFp[msgType] == NULL) { mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]); - return TSDB_CODE_MND_MSG_NOT_PROCESSED; + code = TSDB_CODE_MND_MSG_NOT_PROCESSED; + goto PROCESS_PEER_REQ_END; } - return (*tsMworker.peerReqFp[msgType])(pMsg); + code = (*tsMworker.peerReqFp[msgType])(pMsg); + +PROCESS_PEER_REQ_END: + mnodeSendRsp(pMsg, code); } -static int32_t mnodeProcessPeerRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype) { +static void mnodeProcessPeerRsp(SMnMsg *pMsg, void *unused) { int32_t msgType = pMsg->rpcMsg.msgType; SRpcMsg *pRpcMsg = &pMsg->rpcMsg; if (!mnodeIsMaster()) { mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]); - return 0; + mnodeCleanupMsg(pMsg); } if (tsMworker.peerRspFp[msgType]) { @@ -398,7 +414,7 @@ static int32_t mnodeProcessPeerRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype) { mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]); } - return 0; + mnodeCleanupMsg(pMsg); } int32_t mnodeInitWorker() { @@ -406,20 +422,16 @@ int32_t mnodeInitWorker() { SWorkerPool *pPool = &tsMworker.write; pPool->name = "mnode-write"; - pPool->startFp = (ProcessStartFp)mnodeProcessWriteReq; - pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp; pPool->min = 1; pPool->max = 1; if (tWorkerInit(pPool) != 0) { return TSDB_CODE_MND_OUT_OF_MEMORY; } else { - tsMworker.writeQ = tWorkerAllocQueue(pPool, NULL); + tsMworker.writeQ = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)mnodeProcessWriteReq); } pPool = &tsMworker.read; pPool->name = "mnode-read"; - pPool->startFp = (ProcessStartFp)mnodeProcessReadReq; - pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp; pPool->min = 2; pPool->max = (int32_t)(tsNumOfCores * tsNumOfThreadsPerCore / 2); pPool->max = MAX(2, pPool->max); @@ -427,31 +439,27 @@ int32_t mnodeInitWorker() { if (tWorkerInit(pPool) != 0) { return TSDB_CODE_MND_OUT_OF_MEMORY; } else { - tsMworker.readQ = tWorkerAllocQueue(pPool, NULL); + tsMworker.readQ = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)mnodeProcessReadReq); } pPool = &tsMworker.peerReq; pPool->name = "mnode-peer-req"; - pPool->startFp = (ProcessStartFp)mnodeProcessPeerReq; - pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp; pPool->min = 1; pPool->max = 1; if (tWorkerInit(pPool) != 0) { return TSDB_CODE_MND_OUT_OF_MEMORY; } else { - tsMworker.peerReqQ = tWorkerAllocQueue(pPool, NULL); + tsMworker.peerReqQ = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)mnodeProcessPeerReq); } pPool = &tsMworker.peerRsp; pPool->name = "mnode-peer-rsp"; - pPool->startFp = (ProcessStartFp)mnodeProcessPeerRsp; - pPool->endFp = (ProcessEndFp)mnodeProcessPeerRspEnd; pPool->min = 1; pPool->max = 1; if (tWorkerInit(pPool) != 0) { return TSDB_CODE_MND_OUT_OF_MEMORY; } else { - tsMworker.peerRspQ = tWorkerAllocQueue(pPool, NULL); + tsMworker.peerRspQ = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)mnodeProcessPeerRsp); } mInfo("mnode worker is initialized"); diff --git a/source/server/vnode/inc/vnodeWrite.h b/source/server/vnode/inc/vnodeWrite.h index 0bb670de5b..8500607694 100644 --- a/source/server/vnode/inc/vnodeWrite.h +++ b/source/server/vnode/inc/vnodeWrite.h @@ -23,11 +23,12 @@ extern "C" { int32_t vnodeInitWrite(); void vnodeCleanupWrite(); + taos_queue vnodeAllocWriteQueue(SVnode *pVnode); void vnodeFreeWriteQueue(taos_queue pQueue); - -void vnodeProcessWriteMsg(SRpcMsg *pRpcMsg); -int32_t vnodeProcessWalMsg(SVnode *pVnode, SWalHead *pHead); +taos_queue vnodeAllocApplyQueue(SVnode *pVnode); +void vnodeFreeApplyQueue(taos_queue pQueue); +void vnodeProcessWriteReq(SRpcMsg *pRpcMsg); void vnodeStartWrite(SVnode *pVnode); void vnodeStopWrite(SVnode *pVnode); diff --git a/source/server/vnode/src/vnodeInt.c b/source/server/vnode/src/vnodeInt.c index 9e1739a68e..4061c04d94 100644 --- a/source/server/vnode/src/vnodeInt.c +++ b/source/server/vnode/src/vnodeInt.c @@ -54,17 +54,17 @@ static void vnodeInitMsgFp() { tsVint.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; - tsVint.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; - tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; - tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; - tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; - tsVint.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; - tsVint.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteReq; + tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteReq; + tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteReq; + tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteReq; + tsVint.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteReq; + tsVint.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteReq; // mq related - tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg; - tsVint.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; - tsVint.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; - tsVint.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteReq; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteReq; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteReq; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteReq; tsVint.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; // mq related end diff --git a/source/server/vnode/src/vnodeMgmt.c b/source/server/vnode/src/vnodeMgmt.c index e0e76d5b56..dfb3c95b8d 100644 --- a/source/server/vnode/src/vnodeMgmt.c +++ b/source/server/vnode/src/vnodeMgmt.c @@ -194,22 +194,18 @@ static int32_t vnodeProcessAlterStreamReq(SRpcMsg *pMsg) { return TSDB_CODE_VND_MSG_NOT_PROCESSED; } -static int32_t vnodeProcessMgmtStart(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype) { +static void vnodeProcessMgmtReq(SVnMgmtMsg *pMgmt, void *unused) { SRpcMsg *pMsg = &pMgmt->rpcMsg; int32_t msgType = pMsg->msgType; + int32_t code = 0; if (tsVmgmt.msgFp[msgType]) { vTrace("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[msgType]); - return (*tsVmgmt.msgFp[msgType])(pMsg); + code = (*tsVmgmt.msgFp[msgType])(pMsg); } else { vError("msg:%p, ahandle:%p type:%s not processed since no handle", pMgmt, pMsg->ahandle, taosMsg[msgType]); - return TSDB_CODE_DND_MSG_NOT_PROCESSED; + code = TSDB_CODE_DND_MSG_NOT_PROCESSED; } -} - -static void vnodeProcessMgmtEnd(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype, int32_t code) { - SRpcMsg *pMsg = &pMgmt->rpcMsg; - vTrace("msg:%p, is processed, result:%s", pMgmt, tstrerror(code)); SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; rpcSendResponse(&rsp); @@ -235,9 +231,9 @@ static int32_t vnodeWriteToMgmtQueue(SRpcMsg *pMsg) { memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen); if (pMsg->msgType == TSDB_MSG_TYPE_MD_CREATE_VNODE) { - return taosWriteQitem(tsVmgmt.createQueue, TAOS_QTYPE_RPC, pMgmt); + return taosWriteQitem(tsVmgmt.createQueue, pMgmt); } else { - return taosWriteQitem(tsVmgmt.workerQueue, TAOS_QTYPE_RPC, pMgmt); + return taosWriteQitem(tsVmgmt.workerQueue, pMgmt); } } @@ -257,27 +253,23 @@ int32_t vnodeInitMgmt() { SWorkerPool *pPool = &tsVmgmt.createPool; pPool->name = "vnode-mgmt-create"; - pPool->startFp = (ProcessStartFp)vnodeProcessMgmtStart; - pPool->endFp = (ProcessEndFp)vnodeProcessMgmtEnd; pPool->min = 1; pPool->max = 1; if (tWorkerInit(pPool) != 0) { return TSDB_CODE_VND_OUT_OF_MEMORY; } - tsVmgmt.createQueue = tWorkerAllocQueue(pPool, NULL); + tsVmgmt.createQueue = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)vnodeProcessMgmtReq); pPool = &tsVmgmt.workerPool; pPool->name = "vnode-mgmt-worker"; - pPool->startFp = (ProcessStartFp)vnodeProcessMgmtStart; - pPool->endFp = (ProcessEndFp)vnodeProcessMgmtEnd; pPool->min = 1; pPool->max = 1; if (tWorkerInit(pPool) != 0) { return TSDB_CODE_VND_OUT_OF_MEMORY; } - tsVmgmt.workerQueue = tWorkerAllocQueue(pPool, NULL); + tsVmgmt.workerQueue = tWorkerAllocQueue(pPool, NULL, (FProcessOneItem)vnodeProcessMgmtReq); vInfo("vmgmt is initialized"); return TSDB_CODE_SUCCESS; diff --git a/source/server/vnode/src/vnodeRead.c b/source/server/vnode/src/vnodeRead.c index 2ca2a81739..ce6348c992 100644 --- a/source/server/vnode/src/vnodeRead.c +++ b/source/server/vnode/src/vnodeRead.c @@ -71,9 +71,9 @@ static int32_t vnodeWriteToRQueue(SVnode *pVnode, void *pCont, int32_t contLen, atomic_add_fetch_32(&pVnode->queuedRMsg, 1); if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) { - return taosWriteQitem(pVnode->pFetchQ, qtype, pRead); + return taosWriteQitem(pVnode->pFetchQ, pRead); } else { - return taosWriteQitem(pVnode->pQueryQ, qtype, pRead); + return taosWriteQitem(pVnode->pQueryQ, pRead); } } @@ -136,18 +136,6 @@ static void vnodeInitReadMsgFp() { tsVread.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessConsumeMsg; } -static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) { - int32_t msgType = pRead->msgType; - if (tsVread.msgFp[msgType] == NULL) { - vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]); - return TSDB_CODE_VND_MSG_NOT_PROCESSED; - } else { - vTrace("msg:%p, app:%p type:%s will be processed", pRead, pRead->rpcAhandle, taosMsg[msgType]); - } - - return (*tsVread.msgFp[msgType])(pVnode, pRead); -} - static void vnodeSendReadRsp(SReadMsg *pRead, int32_t code) { SRpcMsg rpcRsp = { .handle = pRead->rpcHandle, @@ -159,8 +147,18 @@ static void vnodeSendReadRsp(SReadMsg *pRead, int32_t code) { rpcSendResponse(&rpcRsp); } -static void vnodeProcessReadEnd(SVnode *pVnode, SReadMsg *pRead, int32_t qtype, int32_t code) { - if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { +static void vnodeProcessReadReq(SReadMsg *pRead, SVnode *pVnode) { + int32_t msgType = pRead->msgType; + int32_t code = 0; + if (tsVread.msgFp[msgType] == NULL) { + vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]); + code = TSDB_CODE_VND_MSG_NOT_PROCESSED; + } else { + vTrace("msg:%p, app:%p type:%s will be processed", pRead, pRead->rpcAhandle, taosMsg[msgType]); + code = (*tsVread.msgFp[msgType])(pVnode, pRead); + } + + if (/*qtype == TAOS_QTYPE_RPC && */ code != TSDB_CODE_QRY_NOT_READY) { vnodeSendReadRsp(pRead, code); } else { if (code == TSDB_CODE_QRY_HAS_RSP) { @@ -181,16 +179,12 @@ int32_t vnodeInitRead() { SWorkerPool *pPool = &tsVread.query; pPool->name = "vquery"; - pPool->startFp = (ProcessStartFp)vnodeProcessReadStart; - pPool->endFp = (ProcessEndFp)vnodeProcessReadEnd; pPool->min = (int32_t)threadsForQuery; pPool->max = pPool->min; if (tWorkerInit(pPool) != 0) return -1; pPool = &tsVread.fetch; pPool->name = "vfetch"; - pPool->startFp = (ProcessStartFp)vnodeProcessReadStart; - pPool->endFp = (ProcessEndFp)vnodeProcessReadEnd; pPool->min = MIN(maxFetchThreads, tsNumOfCores); pPool->max = pPool->min; if (tWorkerInit(pPool) != 0) return -1; @@ -205,9 +199,13 @@ void vnodeCleanupRead() { vInfo("vread is closed"); } -taos_queue vnodeAllocQueryQueue(SVnode *pVnode) { return tWorkerAllocQueue(&tsVread.query, pVnode); } +taos_queue vnodeAllocQueryQueue(SVnode *pVnode) { + return tWorkerAllocQueue(&tsVread.query, pVnode, (FProcessOneItem)vnodeProcessReadReq); +} -taos_queue vnodeAllocFetchQueue(SVnode *pVnode) { return tWorkerAllocQueue(&tsVread.fetch, pVnode); } +taos_queue vnodeAllocFetchQueue(SVnode *pVnode) { + return tWorkerAllocQueue(&tsVread.fetch, pVnode, (FProcessOneItem)vnodeProcessReadReq); +} void vnodeFreeQueryQueue(taos_queue pQueue) { tWorkerFreeQueue(&tsVread.query, pQueue); } diff --git a/source/server/vnode/src/vnodeWrite.c b/source/server/vnode/src/vnodeWrite.c index f3258af0bf..119cd29537 100644 --- a/source/server/vnode/src/vnodeWrite.c +++ b/source/server/vnode/src/vnodeWrite.c @@ -32,15 +32,13 @@ typedef struct { } SVnWriteMsg; static struct { - SWriteWorkerPool pool; - int64_t queuedBytes; - int32_t queuedMsgs; + SMWorkerPool pool; + int64_t queuedBytes; + int32_t queuedMsgs; } tsVwrite = {0}; void vnodeStartWrite(SVnode *pVnode) {} -void vnodeStoprite(SVnode *pVnode) {} - -void vnodeWaitWriteCompleted(SVnode *pVnode) { +void vnodeStopWrite(SVnode *pVnode) { while (pVnode->queuedWMsg > 0) { vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg); taosMsleep(10); @@ -86,7 +84,7 @@ static int32_t vnodeWriteToWQueue(SVnode *pVnode, SWalHead *pHead, int32_t qtype atomic_add_fetch_32(&tsVwrite.queuedMsgs, 1); atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->queuedWMsg, 1); - taosWriteQitem(pVnode->pWriteQ, pWrite->qtype, pWrite); + taosWriteQitem(pVnode->pWriteQ, pWrite); return TSDB_CODE_SUCCESS; } @@ -101,11 +99,7 @@ static void vnodeFreeFromWQueue(SVnode *pVnode, SVnWriteMsg *pWrite) { vnodeRelease(pVnode); } -int32_t vnodeProcessWalMsg(SVnode *pVnode, SWalHead *pHead) { - return vnodeWriteToWQueue(pVnode, pHead, TAOS_QTYPE_WAL, NULL); -} - -void vnodeProcessWriteMsg(SRpcMsg *pRpcMsg) { +void vnodeProcessWriteReq(SRpcMsg *pRpcMsg) { int32_t code; SMsgHead *pMsg = pRpcMsg->pCont; @@ -132,109 +126,104 @@ void vnodeProcessWriteMsg(SRpcMsg *pRpcMsg) { rpcFreeCont(pRpcMsg->pCont); } -static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t qtype) { - SWalHead *pHead = &pWrite->walHead; - SVnRsp * pRet = &pWrite->rspRet; - int32_t msgType = pHead->msgType; - - vTrace("vgId:%d, msg:%s will be processed, hver:%" PRIu64, pVnode->vgId, taosMsg[pHead->msgType], pHead->version); - - // write into WAL -#if 0 - pWrite->code = walWrite(pVnode->wal, pHead); - if (pWrite->code < 0) return false; - - - pVnode->version = pHead->version; -#endif - // write data locally - switch (msgType) { - case TSDB_MSG_TYPE_SUBMIT: - pRet->len = sizeof(SSubmitRsp); - pRet->rsp = rpcMallocCont(pRet->len); - pWrite->code = vnodeProcessSubmitReq(pVnode, (void*)pHead->cont, pRet->rsp); - break; - case TSDB_MSG_TYPE_MD_CREATE_TABLE: - pWrite->code = vnodeProcessCreateTableReq(pVnode, (void*)pHead->cont, NULL); - break; - case TSDB_MSG_TYPE_MD_DROP_TABLE: - pWrite->code = vnodeProcessDropTableReq(pVnode, (void*)pHead->cont, NULL); - break; - case TSDB_MSG_TYPE_MD_ALTER_TABLE: - pWrite->code = vnodeProcessAlterTableReq(pVnode, (void*)pHead->cont, NULL); - break; - case TSDB_MSG_TYPE_MD_DROP_STABLE: - pWrite->code = vnodeProcessDropStableReq(pVnode, (void*)pHead->cont, NULL); - break; - case TSDB_MSG_TYPE_UPDATE_TAG_VAL: - pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL); - break; - //mq related - case TSDB_MSG_TYPE_MQ_CONNECT: - pWrite->code = vnodeProcessMqConnectReq(pVnode, (void*)pHead->cont, NULL); - break; - case TSDB_MSG_TYPE_MQ_DISCONNECT: - pWrite->code = vnodeProcessMqDisconnectReq(pVnode, (void*)pHead->cont, NULL); - break; - case TSDB_MSG_TYPE_MQ_ACK: - pWrite->code = vnodeProcessMqAckReq(pVnode, (void*)pHead->cont, NULL); - break; - case TSDB_MSG_TYPE_MQ_RESET: - pWrite->code = vnodeProcessMqResetReq(pVnode, (void*)pHead->cont, NULL); - break; - //mq related end - default: - pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED; - break; - } - - if (pWrite->code < 0) return false; - - // update fsync - return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT); +static void vnodeProcessWrite(SVnWriteMsg *pWrite, SVnode *pVnode) { } + // SWalHead *pHead = &pWrite->walHead; + // SVnRsp * pRet = &pWrite->rspRet; + // int32_t msgType = pHead->msgType; -static void vnodeFsync(SVnode *pVnode, bool fsync) { -#if 0 - walFsync(pVnode->wal, fsync); -#endif -} + // vTrace("vgId:%d, msg:%s will be processed, hver:%" PRIu64, pVnode->vgId, taosMsg[pHead->msgType], pHead->version); -static void vnodeProcessWriteEnd(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t qtype, int32_t code) { - if (qtype == TAOS_QTYPE_RPC) { - SRpcMsg rpcRsp = { - .handle = pWrite->rpcMsg.handle, - .pCont = pWrite->rspRet.rsp, - .contLen = pWrite->rspRet.len, - .code = pWrite->code, - }; - rpcSendResponse(&rpcRsp); - } else { - if (pWrite->rspRet.rsp) { - rpcFreeCont(pWrite->rspRet.rsp); - } - } - vnodeFreeFromWQueue(pVnode, pWrite); -} + // // write data locally + // switch (msgType) { + // case TSDB_MSG_TYPE_SUBMIT: + // pRet->len = sizeof(SSubmitRsp); + // pRet->rsp = rpcMallocCont(pRet->len); + // pWrite->code = vnodeProcessSubmitReq(pVnode, (void*)pHead->cont, pRet->rsp); + // break; + // case TSDB_MSG_TYPE_MD_CREATE_TABLE: + // pWrite->code = vnodeProcessCreateTableReq(pVnode, (void*)pHead->cont, NULL); + // break; + // case TSDB_MSG_TYPE_MD_DROP_TABLE: + // pWrite->code = vnodeProcessDropTableReq(pVnode, (void*)pHead->cont, NULL); + // break; + // case TSDB_MSG_TYPE_MD_ALTER_TABLE: + // pWrite->code = vnodeProcessAlterTableReq(pVnode, (void*)pHead->cont, NULL); + // break; + // case TSDB_MSG_TYPE_MD_DROP_STABLE: + // pWrite->code = vnodeProcessDropStableReq(pVnode, (void*)pHead->cont, NULL); + // break; + // case TSDB_MSG_TYPE_UPDATE_TAG_VAL: + // pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL); + // break; + // //mq related + // case TSDB_MSG_TYPE_MQ_CONNECT: + // pWrite->code = vnodeProcessMqConnectReq(pVnode, (void*)pHead->cont, NULL); + // break; + // case TSDB_MSG_TYPE_MQ_DISCONNECT: + // pWrite->code = vnodeProcessMqDisconnectReq(pVnode, (void*)pHead->cont, NULL); + // break; + // case TSDB_MSG_TYPE_MQ_ACK: + // pWrite->code = vnodeProcessMqAckReq(pVnode, (void*)pHead->cont, NULL); + // break; + // case TSDB_MSG_TYPE_MQ_RESET: + // pWrite->code = vnodeProcessMqResetReq(pVnode, (void*)pHead->cont, NULL); + // break; + // //mq related end + // default: + // pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED; + // break; + // } + + // if (pWrite->code < 0) return false; + + // // update fsync + // return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT); + + + // // walFsync(pVnode->wal, fsync); + + +// static void vnodeProcessWriteEnd(SVnWriteMsg *pWrite, SVnode *pVnode) { +// if (qtype == TAOS_QTYPE_RPC) { +// SRpcMsg rpcRsp = { +// .handle = pWrite->rpcMsg.handle, +// .pCont = pWrite->rspRet.rsp, +// .contLen = pWrite->rspRet.len, +// .code = pWrite->code, +// }; +// rpcSendResponse(&rpcRsp); +// } else { +// if (pWrite->rspRet.rsp) { +// rpcFreeCont(pWrite->rspRet.rsp); +// } +// } +// vnodeFreeFromWQueue(pVnode, pWrite); +// } int32_t vnodeInitWrite() { - SWriteWorkerPool *pPool = &tsVwrite.pool; - pPool->name = "vwrite"; + SMWorkerPool *pPool = &tsVwrite.pool; + pPool->name = "vnode-write"; pPool->max = tsNumOfCores; - pPool->startFp = (ProcessWriteStartFp)vnodeProcessWriteStart; - pPool->syncFp = (ProcessWriteSyncFp)vnodeFsync; - pPool->endFp = (ProcessWriteEndFp)vnodeProcessWriteEnd; - if (tWriteWorkerInit(pPool) != 0) return -1; + if (tMWorkerInit(pPool) != 0) return -1; vInfo("vwrite is initialized, max worker %d", pPool->max); return TSDB_CODE_SUCCESS; } void vnodeCleanupWrite() { - tWriteWorkerCleanup(&tsVwrite.pool); + tMWorkerCleanup(&tsVwrite.pool); vInfo("vwrite is closed"); } -taos_queue vnodeAllocWriteQueue(SVnode *pVnode) { return tWriteWorkerAllocQueue(&tsVwrite.pool, pVnode); } +taos_queue vnodeAllocWriteQueue(SVnode *pVnode) { + return tMWorkerAllocQueue(&tsVwrite.pool, pVnode, NULL); +} -void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } +void vnodeFreeWriteQueue(taos_queue pQueue) { tMWorkerFreeQueue(&tsVwrite.pool, pQueue); } + +taos_queue vnodeAllocApplyQueue(SVnode *pVnode) { + return tMWorkerAllocQueue(&tsVwrite.pool, pVnode, NULL); +} + +void vnodeFreeApplyQueue(taos_queue pQueue) { tMWorkerFreeQueue(&tsVwrite.pool, pQueue); } diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index da1fb1837f..bb0303c04a 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -19,41 +19,41 @@ #include "tqueue.h" typedef struct STaosQnode { - int type; - struct STaosQnode *next; - char item[]; + struct STaosQnode *next; + char item[]; } STaosQnode; typedef struct STaosQueue { - int32_t itemSize; - int32_t numOfItems; - struct STaosQnode *head; - struct STaosQnode *tail; - struct STaosQueue *next; // for queue set - struct STaosQset *qset; // for queue set - void *ahandle; // for queue set - pthread_mutex_t mutex; + int32_t itemSize; + int32_t numOfItems; + struct STaosQnode *head; + struct STaosQnode *tail; + struct STaosQueue *next; // for queue set + struct STaosQset *qset; // for queue set + void *ahandle; // for queue set + FProcessOneItem fpOneItem; + FProcessAllItem fpAllItem; + pthread_mutex_t mutex; } STaosQueue; typedef struct STaosQset { - STaosQueue *head; - STaosQueue *current; - pthread_mutex_t mutex; - int32_t numOfQueues; - int32_t numOfItems; - tsem_t sem; + STaosQueue *head; + STaosQueue *current; + pthread_mutex_t mutex; + int32_t numOfQueues; + int32_t numOfItems; + tsem_t sem; } STaosQset; typedef struct STaosQall { - STaosQnode *current; - STaosQnode *start; - int32_t itemSize; - int32_t numOfItems; -} STaosQall; - + STaosQnode *current; + STaosQnode *start; + int32_t itemSize; + int32_t numOfItems; +} STaosQall; + taos_queue taosOpenQueue() { - - STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1); + STaosQueue *queue = (STaosQueue *)calloc(sizeof(STaosQueue), 1); if (queue == NULL) { terrno = TSDB_CODE_COM_OUT_OF_MEMORY; return NULL; @@ -65,6 +65,13 @@ taos_queue taosOpenQueue() { return queue; } +void taosSetQueueFp(taos_queue param, FProcessOneItem fpOneItem, FProcessAllItem fpAllItem) { + if (param == NULL) return; + STaosQueue *queue = (STaosQueue *)param; + queue->fpOneItem = fpOneItem; + queue->fpAllItem = fpAllItem; +} + void taosCloseQueue(taos_queue param) { if (param == NULL) return; STaosQueue *queue = (STaosQueue *)param; @@ -72,17 +79,17 @@ void taosCloseQueue(taos_queue param) { STaosQset *qset; pthread_mutex_lock(&queue->mutex); - STaosQnode *pNode = queue->head; + STaosQnode *pNode = queue->head; queue->head = NULL; qset = queue->qset; pthread_mutex_unlock(&queue->mutex); - if (queue->qset) taosRemoveFromQset(qset, queue); + if (queue->qset) taosRemoveFromQset(qset, queue); while (pNode) { pTemp = pNode; pNode = pNode->next; - free (pTemp); + free(pTemp); } pthread_mutex_destroy(&queue->mutex); @@ -93,7 +100,7 @@ void taosCloseQueue(taos_queue param) { void *taosAllocateQitem(int size) { STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); - + if (pNode == NULL) return NULL; uTrace("item:%p, node:%p is allocated", pNode->item, pNode); return (void *)pNode->item; @@ -108,10 +115,9 @@ void taosFreeQitem(void *param) { free(temp); } -int taosWriteQitem(taos_queue param, int type, void *item) { +int taosWriteQitem(taos_queue param, void *item) { STaosQueue *queue = (STaosQueue *)param; STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode)); - pNode->type = type; pNode->next = NULL; pthread_mutex_lock(&queue->mutex); @@ -121,12 +127,12 @@ int taosWriteQitem(taos_queue param, int type, void *item) { queue->tail = pNode; } else { queue->head = pNode; - queue->tail = pNode; + queue->tail = pNode; } queue->numOfItems++; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); - uTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems); + uTrace("item:%p is put into queue:%p, items:%d", item, queue, queue->numOfItems); pthread_mutex_unlock(&queue->mutex); @@ -135,7 +141,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) { return 0; } -int taosReadQitem(taos_queue param, int *type, void **pitem) { +int taosReadQitem(taos_queue param, void **pitem) { STaosQueue *queue = (STaosQueue *)param; STaosQnode *pNode = NULL; int code = 0; @@ -143,17 +149,15 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) { pthread_mutex_lock(&queue->mutex); if (queue->head) { - pNode = queue->head; - *pitem = pNode->item; - *type = pNode->type; - queue->head = pNode->next; - if (queue->head == NULL) - queue->tail = NULL; - queue->numOfItems--; - if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); - code = 1; - uDebug("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, *type, queue->numOfItems); - } + pNode = queue->head; + *pitem = pNode->item; + queue->head = pNode->next; + if (queue->head == NULL) queue->tail = NULL; + queue->numOfItems--; + if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); + code = 1; + uDebug("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems); + } pthread_mutex_unlock(&queue->mutex); @@ -165,9 +169,7 @@ void *taosAllocateQall() { return p; } -void taosFreeQall(void *param) { - free(param); -} +void taosFreeQall(void *param) { free(param); } int taosReadAllQitems(taos_queue param, taos_qall p2) { STaosQueue *queue = (STaosQueue *)param; @@ -203,33 +205,30 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) { return code; } -int taosGetQitem(taos_qall param, int *type, void **pitem) { +int taosGetQitem(taos_qall param, void **pitem) { STaosQall *qall = (STaosQall *)param; STaosQnode *pNode; int num = 0; pNode = qall->current; - if (pNode) - qall->current = pNode->next; - + if (pNode) qall->current = pNode->next; + if (pNode) { *pitem = pNode->item; - *type = pNode->type; num = 1; - uTrace("item:%p is fetched, type:%d", *pitem, *type); + uTrace("item:%p is fetched", *pitem); } return num; } void taosResetQitems(taos_qall param) { - STaosQall *qall = (STaosQall *)param; + STaosQall *qall = (STaosQall *)param; qall->current = qall->start; } taos_qset taosOpenQset() { - - STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1); + STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1); if (qset == NULL) { terrno = TSDB_CODE_COM_OUT_OF_MEMORY; return NULL; @@ -276,7 +275,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { STaosQueue *queue = (STaosQueue *)p2; STaosQset *qset = (STaosQset *)p1; - if (queue->qset) return -1; + if (queue->qset) return -1; pthread_mutex_lock(&qset->mutex); @@ -299,7 +298,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { void taosRemoveFromQset(taos_qset p1, taos_queue p2) { STaosQueue *queue = (STaosQueue *)p2; STaosQset *qset = (STaosQset *)p1; - + STaosQueue *tqueue = NULL; pthread_mutex_lock(&qset->mutex); @@ -313,7 +312,7 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) { tqueue = qset->head->next; while (tqueue) { assert(tqueue->qset); - if (tqueue== queue) { + if (tqueue == queue) { prev->next = tqueue->next; break; } else { @@ -333,29 +332,26 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) { queue->next = NULL; pthread_mutex_unlock(&queue->mutex); } - } - + } + pthread_mutex_unlock(&qset->mutex); uTrace("queue:%p is removed from qset:%p", queue, qset); } -int taosGetQueueNumber(taos_qset param) { - return ((STaosQset *)param)->numOfQueues; -} +int taosGetQueueNumber(taos_qset param) { return ((STaosQset *)param)->numOfQueues; } -int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phandle) { +int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProcessOneItem *fpOneItem) { STaosQset *qset = (STaosQset *)param; STaosQnode *pNode = NULL; int code = 0; - + tsem_wait(&qset->sem); pthread_mutex_lock(&qset->mutex); - for(int i=0; inumOfQueues; ++i) { - if (qset->current == NULL) - qset->current = qset->head; + for (int i = 0; i < qset->numOfQueues; ++i) { + if (qset->current == NULL) qset->current = qset->head; STaosQueue *queue = qset->current; if (queue) qset->current = queue->next; if (queue == NULL) break; @@ -364,18 +360,17 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand pthread_mutex_lock(&queue->mutex); if (queue->head) { - pNode = queue->head; - *pitem = pNode->item; - if (type) *type = pNode->type; - if (phandle) *phandle = queue->ahandle; - queue->head = pNode->next; - if (queue->head == NULL) - queue->tail = NULL; - queue->numOfItems--; - atomic_sub_fetch_32(&qset->numOfItems, 1); - code = 1; - uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, pNode->type, queue->numOfItems); - } + pNode = queue->head; + *pitem = pNode->item; + if (ahandle) *ahandle = queue->ahandle; + if (fpOneItem) *fpOneItem = queue->fpOneItem; + queue->head = pNode->next; + if (queue->head == NULL) queue->tail = NULL; + queue->numOfItems--; + atomic_sub_fetch_32(&qset->numOfItems, 1); + code = 1; + uTrace("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems); + } pthread_mutex_unlock(&queue->mutex); if (pNode) break; @@ -383,10 +378,10 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand pthread_mutex_unlock(&qset->mutex); - return code; + return code; } -int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { +int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FProcessAllItem *fpAllItem) { STaosQset *qset = (STaosQset *)param; STaosQueue *queue; STaosQall *qall = (STaosQall *)p2; @@ -411,8 +406,9 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { qall->numOfItems = queue->numOfItems; qall->itemSize = queue->itemSize; code = qall->numOfItems; - *phandle = queue->ahandle; - + if (ahandle) *ahandle = queue->ahandle; + if (fpAllItem) *fpAllItem = queue->fpAllItem; + queue->head = NULL; queue->tail = NULL; queue->numOfItems = 0; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index abed265e0b..12f1aac9f9 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -58,11 +58,11 @@ void tWorkerCleanup(SWorkerPool *pool) { } static void *tWorkerThreadFp(SWorker *worker) { - SWorkerPool *pool = worker->pool; + SWorkerPool *pool = worker->pool; + FProcessOneItem fp = NULL; - void * msg = NULL; - void * ahandle = NULL; - int32_t qtype = 0; + void *msg = NULL; + void *ahandle = NULL; int32_t code = 0; taosBlockSIGPIPE(); @@ -70,19 +70,20 @@ static void *tWorkerThreadFp(SWorker *worker) { uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { - if (taosReadQitemFromQset(pool->qset, &qtype, (void **)&msg, &ahandle) == 0) { + if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) { uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; } - code = (*pool->startFp)(ahandle, msg, qtype); - if (pool->endFp) (*pool->endFp)(ahandle, msg, qtype, code); + if (fp) { + (*fp)(msg, ahandle); + } } return NULL; } -taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle) { +taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessOneItem fp) { pthread_mutex_lock(&pool->mutex); taos_queue queue = taosOpenQueue(); if (queue == NULL) { @@ -90,6 +91,7 @@ taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle) { return NULL; } + taosSetQueueFp(queue, fp, NULL); taosAddIntoQset(pool->qset, queue, ahandle); // spawn a thread to process queue @@ -122,14 +124,14 @@ void tWorkerFreeQueue(SWorkerPool *pool, void *queue) { uDebug("worker:%s, queue:%p is freed", pool->name, queue); } -int32_t tWriteWorkerInit(SWriteWorkerPool *pool) { +int32_t tMWorkerInit(SMWorkerPool *pool) { pool->nextId = 0; - pool->workers = calloc(sizeof(SWriteWorker), pool->max); + pool->workers = calloc(sizeof(SMWorker), pool->max); if (pool->workers == NULL) return -1; pthread_mutex_init(&pool->mutex, NULL); for (int32_t i = 0; i < pool->max; ++i) { - SWriteWorker *worker = pool->workers + i; + SMWorker *worker = pool->workers + i; worker->id = i; worker->qall = NULL; worker->qset = NULL; @@ -140,16 +142,16 @@ int32_t tWriteWorkerInit(SWriteWorkerPool *pool) { return 0; } -void tWriteWorkerCleanup(SWriteWorkerPool *pool) { +void tMWorkerCleanup(SMWorkerPool *pool) { for (int32_t i = 0; i < pool->max; ++i) { - SWriteWorker *worker = pool->workers + i; + SMWorker *worker = pool->workers + i; if (taosCheckPthreadValid(worker->thread)) { if (worker->qset) taosQsetThreadResume(worker->qset); } } for (int32_t i = 0; i < pool->max; ++i) { - SWriteWorker *worker = pool->workers + i; + SMWorker *worker = pool->workers + i; if (taosCheckPthreadValid(worker->thread)) { pthread_join(worker->thread, NULL); taosFreeQall(worker->qall); @@ -163,11 +165,12 @@ void tWriteWorkerCleanup(SWriteWorkerPool *pool) { uInfo("worker:%s is closed", pool->name); } -static void *tWriteWorkerThreadFp(SWriteWorker *worker) { - SWriteWorkerPool *pool = worker->pool; +static void *tWriteWorkerThreadFp(SMWorker *worker) { + SMWorkerPool *pool = worker->pool; + FProcessAllItem fp = NULL; - void * msg = NULL; - void * ahandle = NULL; + void *msg = NULL; + void *ahandle = NULL; int32_t numOfMsgs = 0; int32_t qtype = 0; @@ -176,34 +179,23 @@ static void *tWriteWorkerThreadFp(SWriteWorker *worker) { uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { - numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle); + numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp); if (numOfMsgs == 0) { uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); break; } - bool fsync = false; - for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(worker->qall, &qtype, (void **)&msg); - fsync = fsync | (*pool->startFp)(ahandle, msg, qtype); - } - - (*pool->syncFp)(ahandle, fsync); - - // browse all items, and process them one by one - taosResetQitems(worker->qall); - for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(worker->qall, &qtype, (void **)&msg); - (*pool->endFp)(ahandle, msg, qtype); + if (fp) { + (*fp)(worker->qall, numOfMsgs, ahandle); } } return NULL; } -taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle) { +taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessAllItem fp) { pthread_mutex_lock(&pool->mutex); - SWriteWorker *worker = pool->workers + pool->nextId; + SMWorker *worker = pool->workers + pool->nextId; taos_queue *queue = taosOpenQueue(); if (queue == NULL) { @@ -211,6 +203,8 @@ taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle) { return NULL; } + taosSetQueueFp(queue, NULL, fp); + if (worker->qset == NULL) { worker->qset = taosOpenQset(); if (worker->qset == NULL) { @@ -254,7 +248,7 @@ taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle) { return queue; } -void tWriteWorkerFreeQueue(SWriteWorkerPool *pool, taos_queue queue) { +void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue) { taosCloseQueue(queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue); }