From bf3231ab9fa89ae567597594c4ffafe77e9e8f78 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 29 Aug 2022 17:30:50 +0800 Subject: [PATCH] fix: cols num error in tmq for query --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqExec.c | 7 ++++++- source/dnode/vnode/src/tq/tqMeta.c | 2 +- source/libs/executor/src/executor.c | 4 ++-- tests/system-test/7-tmq/stbTagFilter-1ctb.py | 5 ++--- 6 files changed, 13 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 7c394c4baf..a97c8ff132 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -88,7 +88,7 @@ typedef struct { STqExecTb execTb; STqExecDb execDb; }; -// int32_t numOfCols; // number of out pout column, temporarily used + int32_t numOfCols; // number of out pout column, temporarily used SSchemaWrapper* pSchemaWrapper; // columns that are involved in query } STqExecHandle; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 26db68a1d4..54f764c6b3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -596,7 +596,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe req.qmsg = NULL; pHandle->execHandle.task = - qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, NULL, + qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, &pHandle->execHandle.pSchemaWrapper); ASSERT(pHandle->execHandle.task); void* scanner = NULL; diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index e21125f3a4..a0b8141cfb 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -110,7 +110,12 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* taosArrayPush(pRsp->blockSchema, &pSW); } } - tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); + + if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){ + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + }else{ + tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); + } pRsp->blockNum++; if (pOffset->type == TMQ_OFFSET__LOG) { continue; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 6b6717ff57..a192d1f863 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -260,7 +260,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { handle.execHandle.task = qCreateQueueExecTaskInfo( - handle.execHandle.execCol.qmsg, &reader, NULL, &handle.execHandle.pSchemaWrapper); + handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); ASSERT(handle.execHandle.task); void* scanner = NULL; qExtractStreamScanner(handle.execHandle.task, &scanner); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 271a65647d..124f4b44b0 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -177,13 +177,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n // extract the number of output columns SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc; - if(numOfCols) *numOfCols = 0; + *numOfCols = 0; SNode* pNode; FOREACH(pNode, pDescNode->pSlots) { SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; if (pSlotDesc->output) { - if(numOfCols) ++(*numOfCols); + ++(*numOfCols); } } diff --git a/tests/system-test/7-tmq/stbTagFilter-1ctb.py b/tests/system-test/7-tmq/stbTagFilter-1ctb.py index 526ff7181e..6cb152342b 100644 --- a/tests/system-test/7-tmq/stbTagFilter-1ctb.py +++ b/tests/system-test/7-tmq/stbTagFilter-1ctb.py @@ -250,15 +250,14 @@ class TDTestCase: tdLog.printNoPrefix("=============================================") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.tmqCase1() - # self.tmqCase2() + self.tmqCase2() self.prepareTestEnv() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.tmqCase1() - # self.tmqCase2() - + self.tmqCase2() def stop(self): tdSql.close()