Merge pull request #16488 from taosdata/feature/TD-14761

fix: cols num error in tmq for query
This commit is contained in:
Shengliang Guan 2022-08-29 19:27:56 +08:00 committed by GitHub
commit fe4d13da3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 13 additions and 9 deletions

View File

@ -88,7 +88,7 @@ typedef struct {
STqExecTb execTb;
STqExecDb execDb;
};
// int32_t numOfCols; // number of out pout column, temporarily used
int32_t numOfCols; // number of out pout column, temporarily used
SSchemaWrapper* pSchemaWrapper; // columns that are involved in query
} STqExecHandle;

View File

@ -596,7 +596,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
req.qmsg = NULL;
pHandle->execHandle.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, NULL,
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
&pHandle->execHandle.pSchemaWrapper);
ASSERT(pHandle->execHandle.task);
void* scanner = NULL;

View File

@ -110,7 +110,12 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
taosArrayPush(pRsp->blockSchema, &pSW);
}
}
tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
}else{
tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
}
pRsp->blockNum++;
if (pOffset->type == TMQ_OFFSET__LOG) {
continue;

View File

@ -260,7 +260,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
handle.execHandle.task = qCreateQueueExecTaskInfo(
handle.execHandle.execCol.qmsg, &reader, NULL, &handle.execHandle.pSchemaWrapper);
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.task);
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.task, &scanner);

View File

@ -177,13 +177,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
// extract the number of output columns
SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
if(numOfCols) *numOfCols = 0;
*numOfCols = 0;
SNode* pNode;
FOREACH(pNode, pDescNode->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
if(numOfCols) ++(*numOfCols);
++(*numOfCols);
}
}

View File

@ -250,15 +250,14 @@ class TDTestCase:
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1()
# self.tmqCase2()
self.tmqCase2()
self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
self.tmqCase1()
# self.tmqCase2()
self.tmqCase2()
def stop(self):
tdSql.close()