From 9189f886a9994b13d5ddf0951f003cf2ccee95ee Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 10 Jul 2022 16:34:45 +0800 Subject: [PATCH] feat(tmq): support consume from tsdb then wal --- include/libs/executor/executor.h | 1 + source/dnode/vnode/src/inc/tq.h | 2 ++ source/dnode/vnode/src/tq/tq.c | 3 +++ source/dnode/vnode/src/tq/tqExec.c | 8 +++++++- source/libs/executor/src/scanoperator.c | 3 +++ 5 files changed, 16 insertions(+), 1 deletion(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index cff4b0234c..a95293a5b1 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -36,6 +36,7 @@ typedef struct SReadHandle { void* vnode; void* mnd; SMsgCb* pMsgCb; + int64_t version; bool initMetaReader; bool initTableReader; bool initTqReader; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index f58bbe0d68..8abaac6dff 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -89,6 +89,8 @@ typedef struct { STqExecTb execTb; STqExecDb execDb; }; + // TODO remove it + int64_t tsdbEndVer; } STqExecHandle; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e0afc6c80f..ae0f7f56a2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -483,6 +483,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { /*for (int32_t i = 0; i < 5; i++) {*/ /*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/ /*}*/ + int64_t ver = walGetCommittedVer(pTq->pVnode->pWal); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { pHandle->execHandle.execCol.qmsg = req.qmsg; req.qmsg = NULL; @@ -493,6 +494,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .vnode = pTq->pVnode, .initTableReader = true, .initTqReader = true, + .version = ver, }; pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); ASSERT(pHandle->execHandle.execCol.task[i]); @@ -501,6 +503,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ASSERT(scanner); pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); ASSERT(pHandle->execHandle.pExecReader[i]); + pHandle->execHandle.tsdbEndVer = ver; } } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { for (int32_t i = 0; i < 5; i++) { diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 49cf42b083..3ee274ced1 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -96,6 +96,12 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset } } + if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + tqOffsetResetToLog(pOffset, pExec->tsdbEndVer + 1); + qStreamPrepareScan(task, pOffset); + continue; + } + void* meta = qStreamExtractMetaMsg(task); if (meta != NULL) { // tq add meta to rsp @@ -107,7 +113,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset ASSERT(pRsp->rspOffset.type != 0); - if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { + if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e42510ceba..f7155b2431 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1493,6 +1493,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle) { SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info; + if (pHandle->version > 0) { + pSTInfo->cond.endVersion = pHandle->version; + } SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); if (pHandle->initTableReader) {