From 5f4cb41e288740e69a8f69d139af8e11400d5d66 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Mar 2023 23:45:41 +0800 Subject: [PATCH] fix(tmq): set correct start offset value. --- source/client/src/clientTmq.c | 12 ++++++++---- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/dnode/vnode/src/tq/tqScan.c | 2 +- source/libs/executor/src/executor.c | 18 +++++++++++------- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 111ca28cdc..a16ddce0aa 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -535,10 +535,14 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN atomic_add_fetch_32(&pParamSet->totalRspNum, 1); SEp* pEp = GET_ACTIVE_EP(&pVg->epSet); - tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%" PRId64 " prev:%" PRId64 - ", ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, - tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, - pEp->port, index + 1, totalVgroups, pMsgSendInfo->requestId); + char offsetBuf[80] = {0}; + tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->val); + + char commitBuf[80] = {0}; + tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->committedOffset); + tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, + tmq->consumerId, pOffset->subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1, + totalVgroups, pMsgSendInfo->requestId); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 484a0559a8..d4bdd633e9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -399,8 +399,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand char formatBuf[80]; tFormatOffset(formatBuf, 80, pOffsetVal); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.", - consumerId, pHandle->subKey, vgId, formatBuf); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%"PRIx64, + consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId); return 0; } else { // no poll occurs in this vnode for this topic, let's seek to the right offset value. diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 00695e14f4..85a62c4dd1 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -92,7 +92,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - tqDebug("vgId:%d, tmq task start to execute, consumer:0x%" PRIx64, vgId, pHandle->consumerId); + tqDebug("vgId:%d, tmq task start to execute, consumer:0x%"PRIx64, vgId, pHandle->consumerId); code = qExecTask(task, &pDataBlock, &ts); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1b71a41586..60463bad5f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1156,6 +1156,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } STableKeyInfo keyInfo = {.uid = uid}; + int64_t oldSkey = pScanBaseInfo->cond.twindows.skey; + + // let's start from the next ts that returned to consumer. + pScanBaseInfo->cond.twindows.skey = ts + 1; + if (pScanBaseInfo->dataReader == NULL) { int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, pScanInfo->pResBlock, &pScanBaseInfo->dataReader, NULL); @@ -1164,21 +1169,20 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT terrno = code; return -1; } + + qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts %" PRId64 " table index:%d, total:%d, %s", + uid, ts, pScanInfo->currentTable, numOfTables, id); } else { tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1); - int64_t oldSkey = pScanBaseInfo->cond.twindows.skey; - - // let's start from the next ts that returned to consumer. - pScanBaseInfo->cond.twindows.skey = ts + 1; tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); - - // restore the key value - pScanBaseInfo->cond.twindows.skey = oldSkey; pScanInfo->scanTimes = 0; qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", uid, ts, pScanInfo->currentTable, numOfTables, id); } + + // restore the key value + pScanBaseInfo->cond.twindows.skey = oldSkey; } else { qError("invalid pOffset->type:%d, %s", pOffset->type, id); return -1;