From 18b121c69cdb6cfcb3950d1996c770ab4e1c6433 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Mar 2023 13:45:43 +0800 Subject: [PATCH] fix(tmq): fix the invalid write and do some internal refactor. --- source/client/src/clientTmq.c | 17 +++--- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 83 +++--------------------------- source/dnode/vnode/src/tq/tqRead.c | 11 ++-- utils/test/c/tmqSim.c | 15 +++--- 5 files changed, 31 insertions(+), 97 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index af819bf20f..77bbd0be1a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -24,7 +24,7 @@ #include "tref.h" #include "ttimer.h" -#define EMPTY_BLOCK_POLL_IDLE_DURATION 100 +#define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 struct SMqMgmt { @@ -148,7 +148,8 @@ typedef struct { typedef struct { int8_t tmqRspType; - int32_t epoch; + int32_t epoch; // epoch can be used to guard the vgHandle + int32_t vgId; SMqClientVg* vgHandle; SMqClientTopic* topicHandle; uint64_t reqId; @@ -1329,6 +1330,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { pRspWrapper->topicHandle = pTopic; pRspWrapper->reqId = requestId; pRspWrapper->pEpset = pMsg->pEpSet; + pRspWrapper->vgId = pVg->vgId; pMsg->pEpSet = NULL; if (rspType == TMQ_MSG_TYPE__POLL_RSP) { @@ -1747,7 +1749,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms - tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch, + tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } @@ -1864,9 +1866,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return pRsp; } } else { - SMqClientVg* pVg = pollRspWrapper->vgHandle; tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1886,7 +1887,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return pRsp; } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1933,7 +1934,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1955,7 +1956,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj; int64_t startTime = taosGetTimestampMs(); - tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime); + tscDebug("consumer:0x%" PRIx64 " start to poll at %"PRId64", timeout:%" PRId64, tmq->consumerId, startTime, timeout); #if 0 tmqHandleAllDelayedTask(tmq); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ee2bd007ce..c2b38f5cd1 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -141,7 +141,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); // tqRead int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset); -int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId); // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1ee799a8cf..1feabcf9f9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -309,70 +309,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con return 0; } -//int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) { -//#if 0 -// A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); -// A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); -// -// if (pRsp->withSchema) { -// A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); -// } else { -// A(taosArrayGetSize(pRsp->blockSchema) == 0); -// } -// -// if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { -// if (pRsp->blockNum > 0) { -// A(pRsp->rspOffset.version > pRsp->reqOffset.version); -// } else { -// A(pRsp->rspOffset.version >= pRsp->reqOffset.version); -// } -// } -//#endif -// -// int32_t len = 0; -// int32_t code = 0; -// tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code); -// if (code < 0) { -// return -1; -// } -// -// int32_t tlen = sizeof(SMqRspHead) + len; -// void* buf = rpcMallocCont(tlen); -// if (buf == NULL) { -// terrno = TSDB_CODE_OUT_OF_MEMORY; -// return -1; -// } -// -// ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP; -// ((SMqRspHead*)buf)->epoch = pReq->epoch; -// ((SMqRspHead*)buf)->consumerId = pReq->consumerId; -// -// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); -// -// SEncoder encoder = {0}; -// tEncoderInit(&encoder, abuf, len); -// tEncodeSTaosxRsp(&encoder, pRsp); -// tEncoderClear(&encoder); -// -// SRpcMsg rsp = { -// .info = pMsg->info, -// .pCont = buf, -// .contLen = tlen, -// .code = 0, -// }; -// -// tmsgSendRsp(&rsp); -// -// char buf1[80] = {0}; -// char buf2[80] = {0}; -// tFormatOffset(buf1, 80, &pRsp->reqOffset); -// tFormatOffset(buf2, 80, &pRsp->rspOffset); -// -// tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s", -// TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); -// return 0; -//} - static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && pLeft->val.version <= pRight->val.version; @@ -615,9 +551,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* // NOTE: this pHandle->consumerId may have been changed already. tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 - ", ts:%" PRId64, + ", ts:%" PRId64", reqId:0x%"PRIx64, consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, - dataRsp.rspOffset.ts); + dataRsp.rspOffset.ts, pRequest->reqId); tDeleteSMqDataRsp(&dataRsp); return code; @@ -680,17 +616,12 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* break; } - if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { + if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); -// if (terrno == 0) { // failed to seek to given ver, but no errors happen. -// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, (SMqDataRsp*) &taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); -// return code; -// } else { // error happens, return to consumers - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; -// } + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return code; } SWalCont* pHead = &pCkHead->head; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 1c8088be7e..7f9563ae5f 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -183,23 +183,24 @@ end: return tbSuid == realTbSuid; } -int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) { int32_t code = 0; + int32_t vgId = TD_VID(pTq->pVnode); taosThreadMutexLock(&pHandle->pWalReader->mutex); int64_t offset = *fetchOffset; while (1) { if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { - tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", - pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%"PRIx64, + pHandle->consumerId, pHandle->epoch, vgId, offset, reqId); *fetchOffset = offset - 1; code = -1; goto END; } - tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset, - TMSG_INFO((*ppCkHead)->head.msgType)); + tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId, + pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId); if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { code = walFetchBody(pHandle->pWalReader, ppCkHead); diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index cb7b501298..69debe7ab5 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -688,16 +688,17 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { } static int32_t g_once_commit_flag = 0; -static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { - taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); - if (0 == g_once_commit_flag) { - g_once_commit_flag = 1; - notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); +static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); + + if (0 == g_once_commit_flag) { + g_once_commit_flag = 1; + notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); } - char tmpString[128]; - taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); + char tmpString[128]; + taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); } void build_consumer(SThreadInfo* pInfo) {