From 1ee1b0422c710f035f2e2dcfc9d00eac030b1e19 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 27 Apr 2023 23:44:27 +0800 Subject: [PATCH] fix:change push mgr from SArray to Hash --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 5 ++-- source/dnode/vnode/src/tq/tq.c | 28 ++++++++++------------- source/dnode/vnode/src/tq/tqPush.c | 33 +++++++++++++++++++++++++++ source/dnode/vnode/src/tq/tqUtil.c | 14 +----------- 5 files changed, 49 insertions(+), 33 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 080e72c504..1b29b34073 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -147,7 +147,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type); -int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); +int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle); // tqMeta int32_t tqMetaOpen(STQ* pTq); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b24cb7e136..7668d45108 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -193,9 +193,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode); void tqNotifyClose(STQ*); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); -int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type); -int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); +int tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg); +int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7f082c748b..53a40eb839 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -98,7 +98,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosHashSetFreeFp(pTq->pHandle, destroyTqHandle); taosInitRWLatch(&pTq->lock); - pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); @@ -220,17 +220,19 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData return 0; } -int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { - SMqDataRsp* pRsp = pPushEntry->pDataRsp; - SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; - doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); +int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle) { + SMqDataRsp dataRsp = {0}; + dataRsp.head.consumerId = pHandle->consumerId; + dataRsp.head.epoch = pHandle->epoch; + dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + doSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP); char buf1[80] = {0}; char buf2[80] = {0}; - tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset); - tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset); + tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); + tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", - TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); + TD_VID(pTq->pVnode), dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); return 0; } @@ -552,14 +554,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_store_32(&pHandle->epoch, -1); // remove if it has been register in the push manager, and return one empty block to consumer -// tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); - int32_t ret = taosHashRemove(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t)); - tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); - if(pHandle->msg != NULL) { - rpcFreeCont(pHandle->msg->pCont); - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; - } + tqUnregisterPushHandle(pTq, pHandle); + atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index dca988cbbd..1ee19b7a7b 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -300,3 +300,36 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } + + +int32_t tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg) { + int32_t vgId = TD_VID(pTq->pVnode); + STqHandle* pHandle = (STqHandle*) handle; + if(pHandle->msg == NULL){ + pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); + } + + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); + pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); + memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); + pHandle->msg->contLen = pMsg->contLen; + int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); + return 0; +} + +int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { + STqHandle *pHandle = (STqHandle*)handle; + int32_t vgId = TD_VID(pTq->pVnode); + + int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); + tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); + if(pHandle->msg != NULL) { + tqPushDataRsp(pTq, pHandle); + + rpcFreeCont(pHandle->msg->pCont); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } + return 0; +} diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index da8dc1d379..1f06132d2f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -178,21 +178,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { -// code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); // lock taosWLockLatch(&pTq->lock); -// tqDebug("data is over, register to handle:%p, msg:%p", pHandle, pHandle->msg); - if(pHandle->msg == NULL){ - pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); - } - - memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); - pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); - memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); - pHandle->msg->contLen = pMsg->contLen; - int32_t ret = taosHashPut(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t), &pHandle, POINTER_BYTES); - tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); - + code = tqRegisterPushEntry(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); return code;