From 25542c26c7998641bf62039a35a8edbac318944f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 19 Apr 2023 10:43:13 +0800 Subject: [PATCH 01/20] fix:remove tmPushMsg in consumer --- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqUtil.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8f26d5868c..aca2729731 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -535,7 +535,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_store_32(&pHandle->epoch, -1); // remove if it has been register in the push manager, and return one empty block to consumer - tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); + //tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5ac747947f..81dd8abc3e 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -265,7 +265,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + //code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); taosWUnLockLatch(&pTq->lock); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b62bf27def..3c8687fa4d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -447,11 +447,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp walApplyVer(pVnode->pWal, version); - if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { + //if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ - vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } + //vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); + //return -1; + //} // commit if need if (needCommit) { From 1061eef1441c56058b7fc4d31df51c7d9e0d157a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 19 Apr 2023 10:49:18 +0800 Subject: [PATCH 02/20] fix:move consumer msg from fetch thread to query thread --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d61eb3ec03..fc724f2b45 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -519,7 +519,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 3c8687fa4d..579ef8a952 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -487,11 +487,16 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) { + if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } + if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) { + vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); + return 0; + } + SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { case TDMT_SCH_QUERY: @@ -499,6 +504,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); + case TDMT_VND_TMQ_CONSUME: + return tqProcessPollReq(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; @@ -508,17 +515,12 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || - pMsg->msgType == TDMT_VND_BATCH_META || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && + pMsg->msgType == TDMT_VND_BATCH_META) && !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } - if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) { - vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); - return 0; - } - switch (pMsg->msgType) { case TDMT_SCH_FETCH: case TDMT_SCH_MERGE_FETCH: @@ -537,8 +539,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); - case TDMT_VND_TMQ_CONSUME: - return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: From 14fd2e790402c11f0c68d118f685ab6ee87fd626 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 19 Apr 2023 11:44:52 +0800 Subject: [PATCH 03/20] fix:remove lock for consume handler --- source/dnode/vnode/src/tq/tq.c | 22 +++++++++++----------- source/dnode/vnode/src/tq/tqUtil.c | 16 ++++++++-------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aca2729731..bccfe2b9e1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -325,25 +325,25 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // 2. check re-balance status - taosRLockLatch(&pTq->lock); +// taosRLockLatch(&pTq->lock); if (pHandle->consumerId != consumerId) { tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; - taosRUnLockLatch(&pTq->lock); +// taosRUnLockLatch(&pTq->lock); return -1; } - taosRUnLockLatch(&pTq->lock); +// taosRUnLockLatch(&pTq->lock); // 3. update the epoch value - taosWLockLatch(&pTq->lock); +// taosWLockLatch(&pTq->lock); int32_t savedEpoch = pHandle->epoch; if (savedEpoch < reqEpoch) { tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); pHandle->epoch = reqEpoch; } - taosWUnLockLatch(&pTq->lock); +// taosWUnLockLatch(&pTq->lock); char buf[80]; tFormatOffset(buf, 80, &reqOffset); @@ -358,12 +358,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); - taosWLockLatch(&pTq->lock); - int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); - if (code != 0) { - tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); - } - taosWUnLockLatch(&pTq->lock); +// taosWLockLatch(&pTq->lock); +// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); +// if (code != 0) { +// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); +// } +// taosWUnLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 81dd8abc3e..ab1be15271 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -254,7 +254,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); // lock - taosWLockLatch(&pTq->lock); +// taosWLockLatch(&pTq->lock); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); @@ -263,12 +263,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, } // till now, all data has been transferred to consumer, new data needs to push client once arrived. - if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && - dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - //code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); - taosWUnLockLatch(&pTq->lock); - return code; - } +// if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && +// dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { +// //code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); +// taosWUnLockLatch(&pTq->lock); +// return code; +// } code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); @@ -281,7 +281,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tFormatOffset(buf, 80, &dataRsp.rspOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); - taosWUnLockLatch(&pTq->lock); +// taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); } return code; From 594d68b8a4b86f116f489e927607e25687c356a1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 19 Apr 2023 11:46:59 +0800 Subject: [PATCH 04/20] fix:remove lock for consume handler --- source/dnode/vnode/src/tq/tq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bccfe2b9e1..36fcb35791 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -357,7 +357,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); - + int32_t code = 0; // taosWLockLatch(&pTq->lock); // int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); // if (code != 0) { From 6f94281ab7ebb76b3ccd1b26be87f7e12912bacf Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 19 Apr 2023 17:30:58 +0800 Subject: [PATCH 05/20] fix:add log for wal --- include/libs/wal/wal.h | 2 ++ source/libs/wal/src/walRead.c | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index b51289de5e..835f786d97 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -149,6 +149,8 @@ typedef struct SWalReader { int64_t capacity; // int8_t curInvalid; // int8_t curStopped; + int64_t bodyCnt; + int64_t bodyTotalSize; TdThreadMutex mutex; SWalFilterCond cond; // TODO remove it diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index dc3ff3e6de..9a46b4af2a 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -262,8 +262,8 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { SWalCont *pReadHead = &pReader->pHead->head; int64_t ver = pReadHead->version; - wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d", pReader->pWal->cfg.vgId, ver, - pReadHead->bodyLen); + wInfo("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total cnt:%"PRId64 ", total size:%"PRId64, pReader->pWal->cfg.vgId, ver, + pReadHead->bodyLen, pReader->bodyCnt, pReader->bodyTotalSize); if (pReader->capacity < pReadHead->bodyLen) { SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); @@ -300,6 +300,8 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pReader->pWal->cfg.vgId, ver); pReader->curVersion = ver + 1; + pReader->bodyCnt++; + pReader->bodyTotalSize += pReadHead->bodyLen; return 0; } From 4d9d1b520d95b55af544638e69a51affea055ed1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 19 Apr 2023 18:16:57 +0800 Subject: [PATCH 06/20] fix:add log for wal --- source/libs/wal/src/walRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 9a46b4af2a..09b6db6afe 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -262,7 +262,7 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { SWalCont *pReadHead = &pReader->pHead->head; int64_t ver = pReadHead->version; - wInfo("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total cnt:%"PRId64 ", total size:%"PRId64, pReader->pWal->cfg.vgId, ver, + wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total cnt:%"PRId64 ", total size:%"PRId64, pReader->pWal->cfg.vgId, ver, pReadHead->bodyLen, pReader->bodyCnt, pReader->bodyTotalSize); if (pReader->capacity < pReadHead->bodyLen) { From 77e03bfd782eb5609e596ea69f90b44967cdc351 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 23 Apr 2023 20:14:49 +0800 Subject: [PATCH 07/20] opti:change push mgr to consume msg for subscribe --- source/dnode/mnode/impl/src/mndConsumer.c | 2 - source/dnode/vnode/src/inc/tq.h | 2 + source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 62 ++++++++++---- source/dnode/vnode/src/tq/tqPush.c | 100 +++++++++++----------- source/dnode/vnode/src/tq/tqUtil.c | 27 +++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 10 +-- 7 files changed, 123 insertions(+), 81 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 65a2fa72a2..ca71e17d7e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -449,7 +449,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 1. check consumer status int32_t status = atomic_load_32(&pConsumer->status); -#if 1 if (status == MQ_CONSUMER_STATUS__LOST_REBD) { mInfo("try to recover consumer:0x%" PRIx64, consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); @@ -463,7 +462,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); } -#endif if (status != MQ_CONSUMER_STATUS__READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index acc0d29382..e1b1092c28 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -100,6 +100,7 @@ typedef struct { SWalRef* pRef; STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec + SRpcMsg* msg; } STqHandle; typedef struct { @@ -114,6 +115,7 @@ struct STQ { int64_t walLogLastVer; SRWLatch lock; SHashObj* pPushMgr; // consumerId -> STqPushEntry + SArray * pPushArray; SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 416bc6cdc7..b24cb7e136 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -214,6 +214,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit); +int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a78239a4b5..ae4a7e1d61 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -71,6 +71,11 @@ static void destroyTqHandle(void* data) { walCloseReader(pData->pWalReader); tqCloseReader(pData->execHandle.pTqReader); } + if(pData->msg != NULL) { + rpcFreeCont(pData->msg->pCont); + taosMemoryFree(pData->msg); + pData->msg = NULL; + } } static void tqPushEntryFree(void* data) { @@ -104,6 +109,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pHandle, destroyTqHandle); + pTq->pPushArray = taosArrayInit(8, POINTER_BYTES); + taosInitRWLatch(&pTq->lock); pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree); @@ -152,6 +159,7 @@ void tqClose(STQ* pTq) { tqMetaClose(pTq); streamMetaClose(pTq->pStreamMeta); taosMemoryFree(pTq); + taosArrayDestroy(pTq->pPushArray); } void tqNotifyClose(STQ* pTq) { @@ -350,25 +358,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // 2. check re-balance status -// taosRLockLatch(&pTq->lock); + taosRLockLatch(&pTq->lock); if (pHandle->consumerId != consumerId) { tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; -// taosRUnLockLatch(&pTq->lock); + taosRUnLockLatch(&pTq->lock); return -1; } -// taosRUnLockLatch(&pTq->lock); - - // 3. update the epoch value -// taosWLockLatch(&pTq->lock); - int32_t savedEpoch = pHandle->epoch; - if (savedEpoch < reqEpoch) { - tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, - reqEpoch); - pHandle->epoch = reqEpoch; - } -// taosWUnLockLatch(&pTq->lock); + taosRUnLockLatch(&pTq->lock); char buf[80]; tFormatOffset(buf, 80, &reqOffset); @@ -560,8 +558,20 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_store_32(&pHandle->epoch, -1); // remove if it has been register in the push manager, and return one empty block to consumer - //tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); - +// tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); + for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++) { + void* handle = taosArrayGetP(pTq->pPushArray, i); + if(handle == pHandle) { + tqInfo("vgId:%d remove handle when switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); + taosArrayRemove(pTq->pPushArray, i); + break; + } + } + if(pHandle->msg != NULL) { + rpcFreeCont(pHandle->msg->pCont); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); @@ -1067,6 +1077,28 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { return 0; } +int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { + int32_t vgId = TD_VID(pTq->pVnode); + tqDebug("vgId:%d start set submit for subscribe", vgId); + + taosWLockLatch(&pTq->lock); + for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){ + STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i); + if(pHandle->msg == NULL){ + tqError("pHandle->msg should not be null"); + } + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen}; + tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } + taosArrayClear(pTq->pPushArray); + // unlock + taosWUnLockLatch(&pTq->lock); + + return 0; +} + int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { #if 0 void* pIter = NULL; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 7a1a6b7454..d2d17792d3 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -268,59 +268,61 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6 } } + int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); - int32_t len = msgLen - sizeof(SSubmitReq2Msg); - int32_t vgId = TD_VID(pTq->pVnode); +// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); +// int32_t len = msgLen - sizeof(SSubmitReq2Msg); +// int32_t vgId = TD_VID(pTq->pVnode); if (msgType == TDMT_VND_SUBMIT) { + tqProcessSubmitReqForSubscribe(pTq); // lock push mgr to avoid potential msg lost - taosWLockLatch(&pTq->lock); - - int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); - if (numOfRegisteredPush > 0) { - tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", - vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); - - void* data = taosMemoryMalloc(len); - if (data == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); - taosWUnLockLatch(&pTq->lock); - return -1; - } - - memcpy(data, pReq, len); - - SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); - void* pIter = NULL; - - while (1) { - pIter = taosHashIterate(pTq->pPushMgr, pIter); - if (pIter == NULL) { - break; - } - - STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; - - STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); - if (pHandle == NULL) { - tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, - pPushEntry->subKey); - continue; - } - - STqExecHandle* pExec = &pHandle->execHandle; - doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); - } - - doRemovePushedEntry(cachedKey, pTq); - taosArrayDestroyEx(cachedKey, freeItem); - taosMemoryFree(data); - } - - // unlock - taosWUnLockLatch(&pTq->lock); +// taosWLockLatch(&pTq->lock); +// +// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); +// if (numOfRegisteredPush > 0) { +// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", +// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); +// +// void* data = taosMemoryMalloc(len); +// if (data == NULL) { +// terrno = TSDB_CODE_OUT_OF_MEMORY; +// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); +// taosWUnLockLatch(&pTq->lock); +// return -1; +// } +// +// memcpy(data, pReq, len); +// +// SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); +// void* pIter = NULL; +// +// while (1) { +// pIter = taosHashIterate(pTq->pPushMgr, pIter); +// if (pIter == NULL) { +// break; +// } +// +// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; +// +// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); +// if (pHandle == NULL) { +// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, +// pPushEntry->subKey); +// continue; +// } +// +// STqExecHandle* pExec = &pHandle->execHandle; +// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); +// } +// +// doRemovePushedEntry(cachedKey, pTq); +// taosArrayDestroyEx(cachedKey, freeItem); +// taosMemoryFree(data); +// } +// +// // unlock +// taosWUnLockLatch(&pTq->lock); } tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks)); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 128ddedf6d..f76e641f2b 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -169,22 +169,29 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); - // lock -// taosWLockLatch(&pTq->lock); - qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if(code != 0) { goto end; } - // till now, all data has been transferred to consumer, new data needs to push client once arrived. -// if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && -// dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { -// //code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); -// taosWUnLockLatch(&pTq->lock); -// return code; -// } +// till now, all data has been transferred to consumer, new data needs to push client once arrived. + if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && + dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { +// code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + // lock + taosWLockLatch(&pTq->lock); + if(pHandle->msg != NULL){ + tqError("pHandle->msg should be null"); + } + pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); + pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); + memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); + taosArrayPush(pTq->pPushArray, &pHandle); + taosWUnLockLatch(&pTq->lock); + return code; + } code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 579ef8a952..b29081170d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -447,11 +447,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp walApplyVer(pVnode->pWal, version); - //if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { - /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ - //vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); - //return -1; - //} + if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { +// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ + vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); + return -1; + } // commit if need if (needCommit) { From 8572f4a32fa6db44b563d01be8c0ce3aeb271af3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 24 Apr 2023 19:41:30 +0800 Subject: [PATCH 08/20] opti:change push mgr to consume msg for subscribe --- source/client/src/clientTmq.c | 4 ++-- source/common/src/tglobal.c | 8 ++++---- source/common/src/tmsg.c | 6 +++--- source/dnode/vnode/src/tq/tq.c | 12 +++++++----- source/dnode/vnode/src/tq/tqUtil.c | 7 ++++++- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 16a4f55840..f05a314e44 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1702,7 +1702,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms - tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, + tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } @@ -1710,7 +1710,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); - tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, + tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; #if 0 diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index aa35b298e6..8cd3d7f5ab 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1274,10 +1274,10 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi taosSetAllDebugFlag(cfgGetItem(pCfg, "debugFlag")->i32, false); if (taosMulModeMkDir(tsLogDir, 0777) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - printf("failed to create dir:%s since %s", tsLogDir, terrstr()); - cfgCleanup(pCfg); - return -1; +// terrno = TAOS_SYSTEM_ERROR(errno); +// printf("failed to create dir:%s since %s", tsLogDir, terrstr()); +// cfgCleanup(pCfg); +// return -1; } if (taosInitLog(logname, logFileNum) != 0) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d9802244b7..cd980d028c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5328,9 +5328,9 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t headLen = sizeof(SMsgHead); - SMsgHead *pHead = buf; - pHead->vgId = pReq->head.vgId; - pHead->contLen = pReq->head.contLen; +// SMsgHead *pHead = buf; +// pHead->vgId = pReq->head.vgId; +// pHead->contLen = pReq->head.contLen; SDecoder decoder = {0}; tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ae4a7e1d61..73c7075d51 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1084,13 +1084,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { taosWLockLatch(&pTq->lock); for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){ STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i); - if(pHandle->msg == NULL){ + if(ASSERT(pHandle->msg != NULL)){ tqError("pHandle->msg should not be null"); + break; + }else{ + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; + tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; } - SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen}; - tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; } taosArrayClear(pTq->pPushArray); // unlock diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index f76e641f2b..3f92414c34 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -181,15 +181,20 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); // lock taosWLockLatch(&pTq->lock); - if(pHandle->msg != NULL){ + if(ASSERT(pHandle->msg == NULL)){ tqError("pHandle->msg should be null"); + taosWUnLockLatch(&pTq->lock); + goto end; } pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); + pHandle->msg->contLen = pMsg->contLen; + tqError("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen); taosArrayPush(pTq->pPushArray, &pHandle); taosWUnLockLatch(&pTq->lock); + tDeleteSMqDataRsp(&dataRsp); return code; } From 168e6f8936dca0586e2a7a4e57d50310f4bccc9c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 24 Apr 2023 19:44:10 +0800 Subject: [PATCH 09/20] opti:change push mgr to consume msg for subscribe --- source/common/src/tglobal.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8cd3d7f5ab..aa35b298e6 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1274,10 +1274,10 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi taosSetAllDebugFlag(cfgGetItem(pCfg, "debugFlag")->i32, false); if (taosMulModeMkDir(tsLogDir, 0777) != 0) { -// terrno = TAOS_SYSTEM_ERROR(errno); -// printf("failed to create dir:%s since %s", tsLogDir, terrstr()); -// cfgCleanup(pCfg); -// return -1; + terrno = TAOS_SYSTEM_ERROR(errno); + printf("failed to create dir:%s since %s", tsLogDir, terrstr()); + cfgCleanup(pCfg); + return -1; } if (taosInitLog(logname, logFileNum) != 0) { From 1c63408b3e85510e1a372286d7be8c92fc30fb89 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 24 Apr 2023 20:18:20 +0800 Subject: [PATCH 10/20] opti:change push mgr to consume msg for subscribe --- include/libs/wal/wal.h | 2 -- source/client/src/clientTmq.c | 7 ++++--- source/libs/wal/src/walRead.c | 6 ++---- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 835f786d97..b51289de5e 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -149,8 +149,6 @@ typedef struct SWalReader { int64_t capacity; // int8_t curInvalid; // int8_t curStopped; - int64_t bodyCnt; - int64_t bodyTotalSize; TdThreadMutex mutex; SWalFilterCond cond; // TODO remove it diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f05a314e44..9e60f8b04d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1702,7 +1702,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms - tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, + tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } @@ -1710,7 +1710,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); - tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, + tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; #if 0 @@ -1805,12 +1805,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { " total:%" PRId64 " reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); + pVg->emptyBlockReceiveTs = taosGetTimestampMs(); taosFreeQitem(pollRspWrapper); } else { // build rsp int64_t numOfRows = 0; SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tmq->totalRows += numOfRows; - + pVg->emptyBlockReceiveTs = 0; tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 09b6db6afe..6154e30938 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -262,8 +262,8 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { SWalCont *pReadHead = &pReader->pHead->head; int64_t ver = pReadHead->version; - wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total cnt:%"PRId64 ", total size:%"PRId64, pReader->pWal->cfg.vgId, ver, - pReadHead->bodyLen, pReader->bodyCnt, pReader->bodyTotalSize); + wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver, + pReadHead->bodyLen); if (pReader->capacity < pReadHead->bodyLen) { SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); @@ -300,8 +300,6 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pReader->pWal->cfg.vgId, ver); pReader->curVersion = ver + 1; - pReader->bodyCnt++; - pReader->bodyTotalSize += pReadHead->bodyLen; return 0; } From 41bec8560a2587edabe2dd8a55841883a5a77b74 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 25 Apr 2023 09:58:28 +0800 Subject: [PATCH 11/20] opti:change push mgr to consume msg for subscribe --- source/dnode/vnode/src/tq/tq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 73c7075d51..8e35183ebb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -158,8 +158,8 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq->path); tqMetaClose(pTq); streamMetaClose(pTq->pStreamMeta); - taosMemoryFree(pTq); taosArrayDestroy(pTq->pPushArray); + taosMemoryFree(pTq); } void tqNotifyClose(STQ* pTq) { From d1e5d6e0f992d39986926a3e092f9cde1d67a7cf Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 25 Apr 2023 16:23:58 +0800 Subject: [PATCH 12/20] fix:pHandle->msg is not null if rebalance --- source/client/src/clientTmq.c | 5 +++-- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqUtil.c | 11 +++++------ 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9e60f8b04d..54e929c9a4 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1363,6 +1363,7 @@ CREATE_MSG_FAIL: typedef struct SVgroupSaveInfo { STqOffsetVal offset; int64_t numOfRows; + int32_t vgStatus; } SVgroupSaveInfo; static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap, @@ -1398,7 +1399,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .currentOffset = offsetNew, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, - .vgStatus = TMQ_VG_STATUS__IDLE, + .vgStatus = pInfo != NULL ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, .numOfRows = numOfRows, @@ -1457,7 +1458,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows}; + SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows, .vgStatus = pVgCur->vgStatus}; taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8e35183ebb..0080feadbe 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1079,11 +1079,11 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); - tqDebug("vgId:%d start set submit for subscribe", vgId); taosWLockLatch(&pTq->lock); for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){ STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i); + tqDebug("vgId:%d start set submit for pHandle:%p", vgId, pHandle); if(ASSERT(pHandle->msg != NULL)){ tqError("pHandle->msg should not be null"); break; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 3f92414c34..663dc8bbb9 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -181,17 +181,16 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); // lock taosWLockLatch(&pTq->lock); - if(ASSERT(pHandle->msg == NULL)){ - tqError("pHandle->msg should be null"); - taosWUnLockLatch(&pTq->lock); - goto end; +// tqDebug("data is over, register to handle:%p, msg:%p", pHandle, pHandle->msg); + if(pHandle->msg == NULL){ + pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); } - pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; - tqError("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen); + tqDebug("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen); taosArrayPush(pTq->pPushArray, &pHandle); taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); From 224d87b1313e1474660918d3791b8146b41b76fd Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 26 Apr 2023 17:58:14 +0800 Subject: [PATCH 13/20] fix:cosume null if rebalance --- source/client/src/clientTmq.c | 5 ++--- source/dnode/vnode/src/tq/tq.c | 10 ++++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 54e929c9a4..9e60f8b04d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1363,7 +1363,6 @@ CREATE_MSG_FAIL: typedef struct SVgroupSaveInfo { STqOffsetVal offset; int64_t numOfRows; - int32_t vgStatus; } SVgroupSaveInfo; static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap, @@ -1399,7 +1398,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .currentOffset = offsetNew, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, - .vgStatus = pInfo != NULL ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, + .vgStatus = TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, .numOfRows = numOfRows, @@ -1458,7 +1457,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows, .vgStatus = pVgCur->vgStatus}; + SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows}; taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0080feadbe..7539761f4e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -368,6 +368,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } taosRUnLockLatch(&pTq->lock); + // 3. update the epoch value + taosWLockLatch(&pTq->lock); + int32_t savedEpoch = pHandle->epoch; + if (savedEpoch < reqEpoch) { + tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, + reqEpoch); + pHandle->epoch = reqEpoch; + } + taosWUnLockLatch(&pTq->lock); + char buf[80]; tFormatOffset(buf, 80, &reqOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, From 8677b56a4fe606f918d8579243efb8c12b1cc57a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 27 Apr 2023 15:44:44 +0800 Subject: [PATCH 14/20] fix:assert error in tqProcessSubmitReqForSubscribe if put pHandle to array twice --- source/client/src/clientSml.c | 2 +- source/dnode/vnode/src/inc/tq.h | 3 +- source/dnode/vnode/src/tq/tq.c | 36 ++------ source/dnode/vnode/src/tq/tqPush.c | 144 ----------------------------- source/dnode/vnode/src/tq/tqUtil.c | 2 +- 5 files changed, 11 insertions(+), 176 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index cac559b0c1..f727715a54 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -534,7 +534,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL; if (index) { if (colField[*index].type != kv->type) { - uError("SML:0x%" PRIx64 " point type and db type mismatch. point type: %d, db type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key); + uError("SML:0x%" PRIx64 " point type and db type mismatch. db type: %d, point type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key); return TSDB_CODE_SML_INVALID_DATA; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index e1b1092c28..080e72c504 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -114,8 +114,7 @@ struct STQ { char* path; int64_t walLogLastVer; SRWLatch lock; - SHashObj* pPushMgr; // consumerId -> STqPushEntry - SArray * pPushArray; + SHashObj* pPushMgr; // consumerId -> STqHandle SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7539761f4e..00684652f0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -78,18 +78,6 @@ static void destroyTqHandle(void* data) { } } -static void tqPushEntryFree(void* data) { - STqPushEntry* p = *(void**)data; - if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - tDeleteSMqDataRsp(p->pDataRsp); - } else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) { - tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp); - } - - taosMemoryFree(p->pDataRsp); - taosMemoryFree(p); -} - static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && pLeft->val.version <= pRight->val.version; @@ -109,11 +97,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pHandle, destroyTqHandle); - pTq->pPushArray = taosArrayInit(8, POINTER_BYTES); - taosInitRWLatch(&pTq->lock); - pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree); + pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); @@ -158,7 +143,6 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq->path); tqMetaClose(pTq); streamMetaClose(pTq->pStreamMeta); - taosArrayDestroy(pTq->pPushArray); taosMemoryFree(pTq); } @@ -569,14 +553,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // remove if it has been register in the push manager, and return one empty block to consumer // tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); - for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++) { - void* handle = taosArrayGetP(pTq->pPushArray, i); - if(handle == pHandle) { - tqInfo("vgId:%d remove handle when switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); - taosArrayRemove(pTq->pPushArray, i); - break; - } - } + taosHashRemove(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t)); + if(pHandle->msg != NULL) { rpcFreeCont(pHandle->msg->pCont); taosMemoryFree(pHandle->msg); @@ -1091,8 +1069,9 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); taosWLockLatch(&pTq->lock); - for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){ - STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i); + void *pIter = taosHashIterate(pTq->pPushMgr, NULL); + while(pIter){ + STqHandle* pHandle = *(STqHandle**)pIter; tqDebug("vgId:%d start set submit for pHandle:%p", vgId, pHandle); if(ASSERT(pHandle->msg != NULL)){ tqError("pHandle->msg should not be null"); @@ -1103,8 +1082,9 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { taosMemoryFree(pHandle->msg); pHandle->msg = NULL; } + pIter = taosHashIterate(pTq->pPushMgr, pIter); } - taosArrayClear(pTq->pPushArray); + taosHashClear(pTq->pPushMgr); // unlock taosWUnLockLatch(&pTq->lock); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index d2d17792d3..dca988cbbd 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -206,69 +206,6 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ } #endif -typedef struct { - void* pKey; - int64_t keyLen; -} SItem; - -static void recordPushedEntry(SArray* cachedKey, void* pIter); -static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq); - -static void freeItem(void* param) { - SItem* p = (SItem*)param; - taosMemoryFree(p->pKey); -} - -static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData, - int32_t dataLen, SArray* pCachedKey) { - STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; - - SMqDataRsp* pRsp = pPushEntry->pDataRsp; - if (pRsp->reqOffset.version >= ver) { - tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId, - pRsp->reqOffset.version, ver); - return; - } - - qTaskInfo_t pTaskInfo = pExec->task; - - // prepare scan mem data - SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver}; - - if (qStreamSetScanMemData(pTaskInfo, submit) != 0) { - return; - } - qStreamSetOpen(pTaskInfo); - // here start to scan submit block to extract the subscribed data - int32_t totalRows = 0; - - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) { - tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr()); - } - - if (pDataBlock == NULL) { - break; - } - - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); - pRsp->blockNum++; - totalRows += pDataBlock->info.rows; - } - - tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum, - totalRows); - - if (pRsp->blockNum > 0) { - tqOffsetResetToLog(&pRsp->rspOffset, ver); - tqPushDataRsp(pTq, pPushEntry); - recordPushedEntry(pCachedKey, pIter); - } -} - - int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { // void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); // int32_t len = msgLen - sizeof(SSubmitReq2Msg); @@ -363,84 +300,3 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } - -int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type) { - uint64_t consumerId = pRequest->consumerId; - int32_t vgId = TD_VID(pTq->pVnode); - STqHandle* pTqHandle = pHandle; - - STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); - if (pPushEntry == NULL) { - tqDebug("tmq poll: consumer:0x%" PRIx64 ", vgId:%d failed to malloc, size:%d", consumerId, vgId, - (int32_t)sizeof(STqPushEntry)); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pPushEntry->info = pRpcMsg->info; - memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); - - if (type == TMQ_MSG_TYPE__TAOSX_RSP) { - pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(STaosxRsp)); - memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(STaosxRsp)); - } else if (type == TMQ_MSG_TYPE__POLL_RSP) { - pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(SMqDataRsp)); - memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(SMqDataRsp)); - } - - SMqRspHead* pHead = &pPushEntry->pDataRsp->head; - pHead->consumerId = consumerId; - pHead->epoch = pRequest->epoch; - pHead->mqMsgType = type; - - taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*)); - - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d", - consumerId, pTqHandle->subKey, pDataRsp->reqOffset.version, vgId, taosHashGetSize(pTq->pPushMgr)); - return 0; -} - -int32_t tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) { - int32_t vgId = TD_VID(pTq->pVnode); - STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen); - - if (pEntry != NULL) { - uint64_t cId = (*pEntry)->pDataRsp->head.consumerId; - ASSERT(consumerId == cId); - - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId, - (*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1); - - if (rspConsumer) { // rsp the old consumer with empty block. - tqPushDataRsp(pTq, *pEntry); - } - - taosHashRemove(pTq->pPushMgr, pKey, keyLen); - } - - return 0; -} - -void recordPushedEntry(SArray* cachedKey, void* pIter) { - size_t kLen = 0; - void* key = taosHashGetKey(pIter, &kLen); - SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen}; - taosArrayPush(cachedKey, &item); -} - -void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); - int32_t numOfKeys = (int32_t)taosArrayGetSize(pCachedKeys); - - for (int32_t i = 0; i < numOfKeys; i++) { - SItem* pItem = taosArrayGet(pCachedKeys, i); - if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) { - tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*)pItem->pKey); - } - } - - if (numOfKeys > 0) { - tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr)); - } -} diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 663dc8bbb9..2398ef41f4 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -191,7 +191,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; tqDebug("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen); - taosArrayPush(pTq->pPushArray, &pHandle); + taosHashPut(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t), &pHandle, POINTER_BYTES); taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); return code; From db5b5c828e602ddc3211e2a10af5f5de72bf74b4 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 27 Apr 2023 18:35:10 +0800 Subject: [PATCH 15/20] fix:add log for msg push --- source/client/src/clientTmq.c | 6 +++--- source/dnode/vnode/src/tq/tq.c | 34 ++++++++++++++++-------------- source/dnode/vnode/src/tq/tqUtil.c | 5 +++-- utils/test/c/tmqSim.c | 2 +- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 76384fbe6a..33324552dc 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1377,7 +1377,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN); - tscDebug("consumer:0x%" PRIx64 ", update topic:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); + tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); for (int32_t j = 0; j < vgNumGet; j++) { @@ -1447,14 +1447,14 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); if (pTopicCur->vgs) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); - tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur); + tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); char buf[80]; tFormatOffset(buf, 80, &pVgCur->currentOffset); - tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, + tscDebug("consumer:0x%" PRIx64 ", doUpdateLocalEp current vg, epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, tmq->epoch, pVgCur->vgId, vgKey, buf); SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows}; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 00684652f0..7f082c748b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -553,8 +553,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // remove if it has been register in the push manager, and return one empty block to consumer // tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); - taosHashRemove(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t)); - + int32_t ret = taosHashRemove(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t)); + tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); if(pHandle->msg != NULL) { rpcFreeCont(pHandle->msg->pCont); taosMemoryFree(pHandle->msg); @@ -1069,22 +1069,24 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); taosWLockLatch(&pTq->lock); - void *pIter = taosHashIterate(pTq->pPushMgr, NULL); - while(pIter){ - STqHandle* pHandle = *(STqHandle**)pIter; - tqDebug("vgId:%d start set submit for pHandle:%p", vgId, pHandle); - if(ASSERT(pHandle->msg != NULL)){ - tqError("pHandle->msg should not be null"); - break; - }else{ - SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; - tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; + if(taosHashGetSize(pTq->pPushMgr) > 0){ + void *pIter = taosHashIterate(pTq->pPushMgr, NULL); + while(pIter){ + STqHandle* pHandle = *(STqHandle**)pIter; + tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId); + if(ASSERT(pHandle->msg != NULL)){ + tqError("pHandle->msg should not be null"); + break; + }else{ + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; + tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } + pIter = taosHashIterate(pTq->pPushMgr, pIter); } - pIter = taosHashIterate(pTq->pPushMgr, pIter); + taosHashClear(pTq->pPushMgr); } - taosHashClear(pTq->pPushMgr); // unlock taosWUnLockLatch(&pTq->lock); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 2398ef41f4..da8dc1d379 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -190,8 +190,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; - tqDebug("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen); - taosHashPut(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t), &pHandle, POINTER_BYTES); + int32_t ret = taosHashPut(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t), &pHandle, POINTER_BYTES); + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); + taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); return code; diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index f2de219f4e..d98a45f0d3 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -232,7 +232,7 @@ void saveConfigToLogFile() { taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]); } taosFprintfFile(g_fp, "\n"); - taosFprintfFile(g_fp, " expect rows: %" PRIx64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt); + taosFprintfFile(g_fp, " expect rows: %" PRId64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt); } char tmpString[128]; From 1ee1b0422c710f035f2e2dcfc9d00eac030b1e19 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 27 Apr 2023 23:44:27 +0800 Subject: [PATCH 16/20] fix:change push mgr from SArray to Hash --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 5 ++-- source/dnode/vnode/src/tq/tq.c | 28 ++++++++++------------- source/dnode/vnode/src/tq/tqPush.c | 33 +++++++++++++++++++++++++++ source/dnode/vnode/src/tq/tqUtil.c | 14 +----------- 5 files changed, 49 insertions(+), 33 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 080e72c504..1b29b34073 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -147,7 +147,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type); -int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); +int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle); // tqMeta int32_t tqMetaOpen(STQ* pTq); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b24cb7e136..7668d45108 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -193,9 +193,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode); void tqNotifyClose(STQ*); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); -int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type); -int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); +int tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg); +int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7f082c748b..53a40eb839 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -98,7 +98,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosHashSetFreeFp(pTq->pHandle, destroyTqHandle); taosInitRWLatch(&pTq->lock); - pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); @@ -220,17 +220,19 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData return 0; } -int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { - SMqDataRsp* pRsp = pPushEntry->pDataRsp; - SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; - doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); +int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle) { + SMqDataRsp dataRsp = {0}; + dataRsp.head.consumerId = pHandle->consumerId; + dataRsp.head.epoch = pHandle->epoch; + dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + doSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP); char buf1[80] = {0}; char buf2[80] = {0}; - tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset); - tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset); + tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); + tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", - TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); + TD_VID(pTq->pVnode), dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); return 0; } @@ -552,14 +554,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_store_32(&pHandle->epoch, -1); // remove if it has been register in the push manager, and return one empty block to consumer -// tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); - int32_t ret = taosHashRemove(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t)); - tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); - if(pHandle->msg != NULL) { - rpcFreeCont(pHandle->msg->pCont); - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; - } + tqUnregisterPushHandle(pTq, pHandle); + atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index dca988cbbd..1ee19b7a7b 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -300,3 +300,36 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } + + +int32_t tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg) { + int32_t vgId = TD_VID(pTq->pVnode); + STqHandle* pHandle = (STqHandle*) handle; + if(pHandle->msg == NULL){ + pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); + } + + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); + pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); + memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); + pHandle->msg->contLen = pMsg->contLen; + int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); + return 0; +} + +int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { + STqHandle *pHandle = (STqHandle*)handle; + int32_t vgId = TD_VID(pTq->pVnode); + + int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); + tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); + if(pHandle->msg != NULL) { + tqPushDataRsp(pTq, pHandle); + + rpcFreeCont(pHandle->msg->pCont); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } + return 0; +} diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index da8dc1d379..1f06132d2f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -178,21 +178,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { -// code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); // lock taosWLockLatch(&pTq->lock); -// tqDebug("data is over, register to handle:%p, msg:%p", pHandle, pHandle->msg); - if(pHandle->msg == NULL){ - pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); - } - - memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); - pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); - memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); - pHandle->msg->contLen = pMsg->contLen; - int32_t ret = taosHashPut(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t), &pHandle, POINTER_BYTES); - tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); - + code = tqRegisterPushEntry(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); return code; From cce868d140249866698d80aa5e84c487299279aa Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 28 Apr 2023 09:45:16 +0800 Subject: [PATCH 17/20] fix:offset encode assert error --- source/client/src/clientTmq.c | 5 +++-- source/common/src/tmsg.c | 8 ++------ source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 3 +-- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 33324552dc..0a31eac09c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1790,8 +1790,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->epSet = *pollRspWrapper->pEpset; } - // update the local offset value only for the returned values. - pVg->currentOffset = pDataRsp->rspOffset; + if(pDataRsp->rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pDataRsp->rspOffset; // update the local offset value only for the returned values. + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); char buf[80]; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index cbf856a799..2639b81c39 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6839,10 +6839,8 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1; } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1; - } else if (pOffsetVal->type < 0) { - // do nothing } else { - ASSERT(0); + // do nothing } return 0; } @@ -6854,10 +6852,8 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1; } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1; - } else if (pOffsetVal->type < 0) { - // do nothing } else { - ASSERT(0); + // do nothing } return 0; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 1b29b34073..30b2fb74ca 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -114,7 +114,7 @@ struct STQ { char* path; int64_t walLogLastVer; SRWLatch lock; - SHashObj* pPushMgr; // consumerId -> STqHandle + SHashObj* pPushMgr; // subKey -> STqHandle SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 53a40eb839..bd29c8019d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -551,13 +551,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } taosWLockLatch(&pTq->lock); - atomic_store_32(&pHandle->epoch, -1); + atomic_store_32(&pHandle->epoch, 0); // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_add_fetch_32(&pHandle->epoch, 1); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { qStreamCloseTsdbReader(pTaskInfo); From 18d05ff69fc159e5979695c9477720232ef39fc4 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 28 Apr 2023 15:53:56 +0800 Subject: [PATCH 18/20] fix:memory leak --- source/dnode/vnode/src/tq/tqPush.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 1ee19b7a7b..68240195d7 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -307,10 +307,14 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg) { STqHandle* pHandle = (STqHandle*) handle; if(pHandle->msg == NULL){ pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); + pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); + }else{ + void *tmp = pHandle->msg->pCont; + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); + pHandle->msg->pCont = tmp; } - memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); - pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); From 8d84e8f8a54e69e45848ed66c20703204d9350ab Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 28 Apr 2023 19:29:17 +0800 Subject: [PATCH 19/20] fix:[TD-23788] client wait if task status error in taosx transform data --- source/client/src/clientTmq.c | 8 ++++++-- source/dnode/vnode/src/tq/tqUtil.c | 6 ++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0a31eac09c..b5ae9116ef 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1830,7 +1830,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; + if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); @@ -1848,7 +1850,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; + if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->taosxRsp.blockNum == 0) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 1f06132d2f..1b5e498a7f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -211,6 +211,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); + qTaskInfo_t task = pHandle->execHandle.task; + if(qTaskIsExecuting(task)){ + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); + tDeleteSTaosxRsp(&taosxRsp); + return code; + } if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { From 3b66e63444f1f9076ed54b23078ba2ae50b44857 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 28 Apr 2023 20:07:54 +0800 Subject: [PATCH 20/20] fix:[TD-23788] client wait if task status error in taosx transform data --- source/dnode/vnode/src/tq/tqUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 1b5e498a7f..d186c63871 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -213,7 +213,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqInitTaosxRsp(&taosxRsp, pRequest); qTaskInfo_t task = pHandle->execHandle.task; if(qTaskIsExecuting(task)){ - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); tDeleteSTaosxRsp(&taosxRsp); return code; }