From 50487255b33a79f2d63f6009db00426d8e799c88 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 26 May 2023 19:02:40 +0800 Subject: [PATCH 1/4] fix:put poll to push manager if wal not exist when offset is latest --- source/dnode/vnode/src/inc/tq.h | 1 - source/dnode/vnode/src/tq/tqUtil.c | 61 ++++++----------------------- source/libs/executor/src/executor.c | 3 ++ 3 files changed, 16 insertions(+), 49 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ef36b8429a..e6ba5d8636 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -106,7 +106,6 @@ typedef struct { // STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec SRpcMsg* msg; - int32_t noDataPollCnt; tq_handle_status status; } STqHandle; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 622cc5b049..33d2a238c5 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -16,7 +16,6 @@ #include "tq.h" #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) -#define NO_POLL_CNT 5 static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp); @@ -88,15 +87,12 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { return 0; } -static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, - SRpcMsg* pMsg, bool* pBlockReturned) { +static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest) { uint64_t consumerId = pRequest->consumerId; STqOffsetVal reqOffset = pRequest->reqOffset; STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey); int32_t vgId = TD_VID(pTq->pVnode); - *pBlockReturned = false; - // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value. if (pOffset != NULL) { *pOffsetVal = pOffset->val; @@ -129,28 +125,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); - - tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, - pHandle->subKey, vgId, dataRsp.rspOffset.version); - int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); - tDeleteSMqDataRsp(&dataRsp); - - *pBlockReturned = true; - return code; - } else { - STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, pRequest); - tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - - *pBlockReturned = true; - return code; - } + // offset set to previous version when init + tqOffsetResetToLog(pOffsetVal, walGetLastVer(pTq->pVnode->pWal)); } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, vgId, pRequest->subKey); @@ -173,25 +149,19 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); - if(code != 0) { + if(code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) { goto end; } // till now, all data has been transferred to consumer, new data needs to push client once arrived. - if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && - dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - if(pHandle->noDataPollCnt >= NO_POLL_CNT){ // send poll result to client if no data 5 times to avoid lost data - pHandle->noDataPollCnt = 0; - // lock - taosWLockLatch(&pTq->lock); - code = tqRegisterPushHandle(pTq, pHandle, pMsg); - taosWUnLockLatch(&pTq->lock); - tDeleteSMqDataRsp(&dataRsp); - return code; - } - else{ - pHandle->noDataPollCnt++; - } + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST || (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && + dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId)) { + // lock + taosWLockLatch(&pTq->lock); + code = tqRegisterPushHandle(pTq, pHandle, pMsg); + taosWUnLockLatch(&pTq->lock); + tDeleteSMqDataRsp(&dataRsp); + return code; } // NOTE: this pHandle->consumerId may have been changed already. @@ -326,16 +296,11 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ // 1. reset the offset if needed if (IS_OFFSET_RESET_TYPE(reqOffset.type)) { // handle the reset offset cases, according to the consumer's choice. - bool blockReturned = false; - code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned); + code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest); if (code != 0) { return code; } - // empty block returned, quit - if (blockReturned) { - return 0; - } } else { // use the consumer specified offset // the offset value can not be monotonious increase?? offset = reqOffset; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a73deffa52..fb0b9957f8 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1080,6 +1080,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT const char* id = GET_TASKID(pTaskInfo); // if pOffset equal to current offset, means continue consume + char buf[80] = {0}; + tFormatOffset(buf, 80, &pTaskInfo->streamInfo.currentOffset); + qDebug("currentOffset:%s", buf); if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) { return 0; } From a3e214b9e8eb24b842ad22b96f09e83bcf9e2632 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 26 May 2023 19:17:04 +0800 Subject: [PATCH 2/4] fix:put poll to push manager if wal not exist when offset is latest --- include/libs/wal/wal.h | 3 ++- source/dnode/vnode/src/tq/tqUtil.c | 12 +++--------- source/libs/executor/src/executor.c | 3 --- source/libs/wal/src/walRef.c | 17 +++++++++-------- 4 files changed, 14 insertions(+), 21 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 7e106eefde..7af218b78e 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -205,7 +205,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); -SWalRef *walRefFirstVer(SWal *, SWalRef *); +void walRefFirstVer(SWal *, SWalRef *); +void walRefLastVer(SWal *, SWalRef *); SWalRef *walRefCommittedVer(SWal *); SWalRef *walOpenRef(SWal *); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 33d2a238c5..5eaf7b240b 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -115,18 +115,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand tqOffsetResetToData(pOffsetVal, 0, 0); } } else { - pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); - if (pHandle->pRef == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - // offset set to previous version when init + walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { - // offset set to previous version when init - tqOffsetResetToLog(pOffsetVal, walGetLastVer(pTq->pVnode->pWal)); + walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); + tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer); } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, vgId, pRequest->subKey); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fb0b9957f8..a73deffa52 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1080,9 +1080,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT const char* id = GET_TASKID(pTaskInfo); // if pOffset equal to current offset, means continue consume - char buf[80] = {0}; - tFormatOffset(buf, 80, &pTaskInfo->streamInfo.currentOffset); - qDebug("currentOffset:%s", buf); if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) { return 0; } diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 6aba661926..eb36389f1d 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -63,21 +63,22 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) { return 0; } -SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) { - if (pRef == NULL) { - pRef = walOpenRef(pWal); - if (pRef == NULL) { - return NULL; - } - } +void walRefFirstVer(SWal *pWal, SWalRef *pRef) { taosThreadMutexLock(&pWal->mutex); int64_t ver = walGetFirstVer(pWal); pRef->refVer = ver; taosThreadMutexUnlock(&pWal->mutex); wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver); +} - return pRef; +void walRefLastVer(SWal *pWal, SWalRef *pRef) { + taosThreadMutexLock(&pWal->mutex); + int64_t ver = walGetLastVer(pWal); + pRef->refVer = ver; + + taosThreadMutexUnlock(&pWal->mutex); + wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver); } SWalRef *walRefCommittedVer(SWal *pWal) { From 5eb1c559e5553f3d6bff2ab71bfd7e11e6d4d1da Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 29 May 2023 15:51:00 +0800 Subject: [PATCH 3/4] fix:save offset if latest --- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/dnode/vnode/src/tq/tqUtil.c | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 17eac7d096..7557ddb4e9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -377,7 +377,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } taosWUnLockLatch(&pTq->lock); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle); taosMsleep(10); } @@ -410,7 +410,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { while (tqIsHandleExec(pHandle)) { - tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); taosMsleep(10); } if (pHandle->pRef) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5eaf7b240b..e73aed8966 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -121,6 +121,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer); + STqOffset offset = {0}; + strcpy(offset.subKey, pRequest->subKey); + if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { + terrno = TSDB_CODE_PAR_INTERNAL_ERROR; + return -1; + } } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, vgId, pRequest->subKey); From 9eb15e41efc8a3b936746151163018420bf17b48 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 29 May 2023 16:44:03 +0800 Subject: [PATCH 4/4] fix:put poll to push manager if wal not exist when offset is latest --- source/dnode/vnode/src/tq/tqUtil.c | 40 ++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index e73aed8966..d30ed638f1 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -87,12 +87,13 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { return 0; } -static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest) { +static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) { uint64_t consumerId = pRequest->consumerId; STqOffsetVal reqOffset = pRequest->reqOffset; STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey); int32_t vgId = TD_VID(pTq->pVnode); + *pBlockReturned = false; // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value. if (pOffset != NULL) { *pOffsetVal = pOffset->val; @@ -120,12 +121,27 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); - tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer); - STqOffset offset = {0}; - strcpy(offset.subKey, pRequest->subKey); - if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { - terrno = TSDB_CODE_PAR_INTERNAL_ERROR; - return -1; + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); + + tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, + pHandle->subKey, vgId, dataRsp.rspOffset.version); + int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + tDeleteSMqDataRsp(&dataRsp); + + *pBlockReturned = true; + return code; + } else { + STaosxRsp taosxRsp = {0}; + tqInitTaosxRsp(&taosxRsp, pRequest); + tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer); + int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + + *pBlockReturned = true; + return code; } } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", @@ -154,8 +170,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, } // till now, all data has been transferred to consumer, new data needs to push client once arrived. - if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST || (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && - dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId)) { + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) { // lock taosWLockLatch(&pTq->lock); code = tqRegisterPushHandle(pTq, pHandle, pMsg); @@ -296,11 +311,16 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ // 1. reset the offset if needed if (IS_OFFSET_RESET_TYPE(reqOffset.type)) { // handle the reset offset cases, according to the consumer's choice. - code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest); + bool blockReturned = false; + code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned); if (code != 0) { return code; } + // empty block returned, quit + if (blockReturned) { + return 0; + } } else { // use the consumer specified offset // the offset value can not be monotonious increase?? offset = reqOffset;