From ee34c3bb3e442786594f5c23fbc2fff17198d16b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 21 Jul 2022 11:20:30 +0800 Subject: [PATCH 1/6] fix(query): add query involved column info --- include/libs/executor/executor.h | 2 +- source/dnode/vnode/src/inc/tq.h | 3 +- source/dnode/vnode/src/tq/tq.c | 3 +- source/dnode/vnode/src/tq/tqMeta.c | 3 +- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executor.c | 3 +- source/libs/executor/src/executorimpl.c | 46 ++++++++++++++++++------- 7 files changed, 43 insertions(+), 18 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 16e1a1c395..65244ec11a 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -64,7 +64,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); * @param SReadHandle * @return */ -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols); +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper); /** * Set the input data block for the stream scan. diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 07bee22a1f..cde8fd7064 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -88,7 +88,8 @@ typedef struct { STqExecTb execTb; STqExecDb execDb; }; - int32_t numOfCols; // number of out pout column, temporarily used + int32_t numOfCols; // number of out pout column, temporarily used + SSchemaWrapper *pSchemaWrapper; // columns that are involved in query } STqExecHandle; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f6862621f9..a9ed954828 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -506,7 +506,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .initTqReader = true, .version = ver, }; - pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols); + pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, + &pHandle->execHandle.pSchemaWrapper); ASSERT(pHandle->execHandle.execCol.task[i]); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 468490350a..83e852c79e 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -93,7 +93,8 @@ int32_t tqMetaOpen(STQ* pTq) { .version = handle.snapshotVer, }; - handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols); + handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, + &handle.execHandle.pSchemaWrapper); ASSERT(handle.execHandle.execCol.task[i]); void* scanner = NULL; qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2cc4058b3b..3665a2539f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -164,6 +164,7 @@ typedef struct { char* dbname; int32_t tversion; SSchemaWrapper* sw; + SSchemaWrapper* qsw; } SSchemaInfo; typedef struct SExecTaskInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0aa50cdb0a..c92ff93cc5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -104,7 +104,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO return code; } -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols) { +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper) { if (msg == NULL) { // TODO create raw scan return NULL; @@ -138,6 +138,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n } } + *pSchemaWrapper = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);; return pTaskInfo; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5d5eded132..2e606ae6de 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4139,32 +4139,52 @@ static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRea static SArray* extractColumnInfo(SNodeList* pNodeList); -int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) { +int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) { SMetaReader mr = {0}; metaReaderInit(&mr, pHandle->meta, 0); - int32_t code = metaGetTableEntryByUid(&mr, uid); + int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid); if (code != TSDB_CODE_SUCCESS) { metaReaderClear(&mr); return terrno; } - pTaskInfo->schemaInfo.tablename = strdup(mr.me.name); + SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo; + pSchemaInfo->tablename = strdup(mr.me.name); if (mr.me.type == TSDB_SUPER_TABLE) { - pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; + pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; } else if (mr.me.type == TSDB_CHILD_TABLE) { tDecoderClear(&mr.coder); tb_uid_t suid = mr.me.ctbEntry.suid; metaGetTableEntryByUid(&mr, suid); - pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; + pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; } else { - pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); + pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); } metaReaderClear(&mr); + + int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols); + SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + pqSw->pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema)); + pqSw->version = pSchemaInfo->sw->version; + + for(int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*) nodesListGetNode(pScanNode->pScanCols, i); + SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; + + for(int32_t j = 0; j < pSchemaInfo->sw->nCols; ++j) { + if (pColNode->colId == pSchemaInfo->sw->pSchema[j].colId) { + pqSw->pSchema[pqSw->nCols++] = pSchemaInfo->sw->pSchema[j]; + break; + } + } + } + + pSchemaInfo->qsw = pqSw; return TSDB_CODE_SUCCESS; } @@ -4175,8 +4195,8 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { } taosMemoryFree(pSchemaInfo->tablename); - taosMemoryFree(pSchemaInfo->sw->pSchema); - taosMemoryFree(pSchemaInfo->sw); + tDeleteSSchemaWrapper(pSchemaInfo->sw); + tDeleteSSchemaWrapper(pSchemaInfo->qsw); } static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) { @@ -4377,7 +4397,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo); + code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo); if (code) { pTaskInfo->code = terrno; return NULL; @@ -4397,7 +4417,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo); + code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo); if (code) { pTaskInfo->code = terrno; return NULL; @@ -4479,7 +4499,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo); + code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; From ef0d705c5fceae6148fba404ebf3eeaf8d364732 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 21 Jul 2022 11:27:20 +0800 Subject: [PATCH 2/6] fix(query): close del file read handle. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 6c3d0648e0..648108a2c6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1944,17 +1944,20 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (pDelFile) { SDelFReader* pDelFReader = NULL; code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); - if (code) { + if (code != TSDB_CODE_SUCCESS) { goto _err; } SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx)); if (aDelIdx == NULL) { + tsdbDelFReaderClose(&pDelFReader); goto _err; } code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL); - if (code) { + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(aDelIdx); + tsdbDelFReaderClose(&pDelFReader); goto _err; } @@ -1964,6 +1967,8 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (pIdx != NULL) { code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL); if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(aDelIdx); + tsdbDelFReaderClose(&pDelFReader); goto _err; } } From 2a437ee38acd792509ec28353606ae636043fe7c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 21 Jul 2022 11:35:31 +0800 Subject: [PATCH 3/6] fix(query): close del handle when successing in read del file content. --- source/client/CMakeLists.txt | 4 ++-- source/dnode/vnode/src/tsdb/tsdbRead.c | 12 +++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/source/client/CMakeLists.txt b/source/client/CMakeLists.txt index 0b259169dc..129e20e5de 100644 --- a/source/client/CMakeLists.txt +++ b/source/client/CMakeLists.txt @@ -51,6 +51,6 @@ target_link_libraries( PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom ) -#if(${BUILD_TEST}) +if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) -#endif(${BUILD_TEST}) \ No newline at end of file +endif(${BUILD_TEST}) \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b121d1b912..0e557d9fa0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1965,11 +1965,13 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (pIdx != NULL) { code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(aDelIdx); - tsdbDelFReaderClose(&pDelFReader); - goto _err; - } + } + + taosArrayDestroy(aDelIdx); + tsdbDelFReaderClose(&pDelFReader); + + if (code != TSDB_CODE_SUCCESS) { + goto _err; } } From 9d565752c49b3003902257623c22d0546e519b7b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 21 Jul 2022 13:51:39 +0800 Subject: [PATCH 4/6] fix(query): extract schema before creating stream scanner. --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/executorimpl.c | 8 ++------ source/libs/executor/src/scanoperator.c | 11 ++++++++--- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 3665a2539f..b36a5ebdd1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -869,7 +869,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup); + SExecTaskInfo* pTaskInfo); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2e6ea3f2b3..b00dc9dba5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -154,7 +154,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n } } - *pSchemaWrapper = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);; + *pSchemaWrapper = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw); return pTaskInfo; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1d9b2235c7..bba1626669 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4440,11 +4440,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - STimeWindowAggSupp twSup = { - .waterMark = pTableScanNode->watermark, - .calTrigger = pTableScanNode->triggerType, - .maxTs = INT64_MIN, - }; if (pHandle->vnode) { int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); @@ -4454,7 +4449,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } } - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo, &twSup); + extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo); + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 459444de34..d3ab004d9b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1524,7 +1524,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { } SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup) { + SExecTaskInfo* pTaskInfo) { SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -1538,6 +1538,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pTagCond = pTagCond; + pInfo->twAggSup = (STimeWindowAggSupp){ + .waterMark = pTableScanNode->watermark, + .calTrigger = pTableScanNode->triggerType, + .maxTs = INT64_MIN, + }; + int32_t numOfCols = 0; pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); @@ -1590,7 +1596,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } if (pTSInfo->interval.interval > 0) { - pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pTwSup->waterMark); + pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pInfo->twAggSup.waterMark); } else { pInfo->pUpdateInfo = NULL; } @@ -1630,7 +1636,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->deleteDataIndex = 0; pInfo->pDeleteDataRes = createPullDataBlock(); pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; - pInfo->twAggSup = *pTwSup; pOperator->name = "StreamScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; From 8e42bf85e899400cecfd2ad4c1a1fe53a5c0c10e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 21 Jul 2022 15:47:22 +0800 Subject: [PATCH 5/6] fix(query): fix border check. --- source/common/src/tdatablock.c | 4 ++++ source/dnode/vnode/src/tsdb/tsdbRead.c | 3 +-- source/libs/executor/src/executorMain.c | 4 ++++ source/libs/executor/src/executorimpl.c | 29 +++++++++++++++---------- 4 files changed, 27 insertions(+), 13 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c674728fe6..136d86dac4 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1163,9 +1163,13 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumn->info.type)) { pColumn->varmeta.length = 0; + if (pColumn->varmeta.offset > 0) { + memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows); + } } else { if (pColumn->nullbitmap != NULL) { memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); + memset(pColumn->pData, 0, pColumn->info.bytes * numOfRows); } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0e557d9fa0..bad1037123 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2539,8 +2539,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); - - if (pDumpInfo->rowIndex >= pBlock->nRow) { + if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) { *state = CHECK_FILEBLOCK_CONT; } } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index c3efbf9336..e0020a496e 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -199,6 +199,10 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { void qDestroyTask(qTaskInfo_t qTaskHandle) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle; + if (pTaskInfo == NULL) { + return; + } + qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); queryCostStatis(pTaskInfo); // print the query cost summary diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1924275be3..08f63a17db 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4145,11 +4145,16 @@ static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRea static SArray* extractColumnInfo(SNodeList* pNodeList); +SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); + int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) { SMetaReader mr = {0}; metaReaderInit(&mr, pHandle->meta, 0); int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid); if (code != TSDB_CODE_SUCCESS) { + qError("failed to get the table meta, uid:0x%"PRIx64", suid:0x%"PRIx64 ", %s", pScanNode->uid, pScanNode->suid, + GET_TASKID(pTaskInfo)); + metaReaderClear(&mr); return terrno; } @@ -4173,25 +4178,27 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, metaReaderClear(&mr); + pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode); + return TSDB_CODE_SUCCESS; +} + +SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols); SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); pqSw->pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema)); - pqSw->version = pSchemaInfo->sw->version; for(int32_t i = 0; i < numOfCols; ++i) { - STargetNode* pNode = (STargetNode*) nodesListGetNode(pScanNode->pScanCols, i); + STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i); SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; - for(int32_t j = 0; j < pSchemaInfo->sw->nCols; ++j) { - if (pColNode->colId == pSchemaInfo->sw->pSchema[j].colId) { - pqSw->pSchema[pqSw->nCols++] = pSchemaInfo->sw->pSchema[j]; - break; - } - } + SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++]; + pSchema->colId = pColNode->colId; + pSchema->type = pColNode->node.resType.type; + pSchema->type = pColNode->node.resType.bytes; + strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name)); } - pSchemaInfo->qsw = pqSw; - return TSDB_CODE_SUCCESS; + return pqSw; } static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { @@ -4449,7 +4456,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } } - extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo); + pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); return pOperator; From 0c326410fdf08e72d143eaf6c635d4cf69ebaeef Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 21 Jul 2022 16:54:07 +0800 Subject: [PATCH 6/6] fix(query): set value for varchar type in fill and check for null ptr before cleanup. --- source/common/src/tdatablock.c | 4 +- source/libs/executor/src/executorimpl.c | 5 -- source/libs/executor/src/tfill.c | 86 ++++++++++++++----------- 3 files changed, 50 insertions(+), 45 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 136d86dac4..1792a18c07 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1169,7 +1169,9 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { } else { if (pColumn->nullbitmap != NULL) { memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); - memset(pColumn->pData, 0, pColumn->info.bytes * numOfRows); + if (pColumn->pData != NULL) { + memset(pColumn->pData, 0, pColumn->info.bytes * numOfRows); + } } } } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 08f63a17db..3b2965c733 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1647,11 +1647,6 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); } -int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) { - int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows); - return pBlock->info.rows; -} - void queryCostStatis(SExecTaskInfo* pTaskInfo) { STaskCostInfo* pSummary = &pTaskInfo->cost; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 31c079e55f..90ffff5faf 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -66,12 +66,32 @@ static void setNullRow(SSDataBlock* pBlock, int64_t ts, int32_t rowIndex) { static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey); +static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32_t rowIndex, int64_t currentKey) { + if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); + colDataAppend(pDst, rowIndex, (char*)&v, false); + } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { + double v = 0; + GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); + colDataAppend(pDst, rowIndex, (char*)&v, false); + } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { + int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); + colDataAppend(pDst, rowIndex, (char*)&v, false); + } else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) { + colDataAppend(pDst, rowIndex, (const char*)¤tKey, false); + } else { // varchar/nchar data + colDataAppendNULL(pDst, rowIndex); + } +} + static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts, bool outOfBound) { SPoint point1, point2, point; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); -// set the primary timestamp column value + // set the primary timestamp column value int32_t index = pBlock->info.rows; // set the other values @@ -160,30 +180,13 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* } else { // fill with user specified value for each column for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag) /* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) { + if (TSDB_COL_IS_TAG(pCol->flag)) { continue; } SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; - SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); - if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { - float v = 0; - GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); - colDataAppend(pDst, index, (char*)&v, false); - } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { - double v = 0; - GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); - colDataAppend(pDst, index, (char*)&v, false); - } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { - int64_t v = 0; - GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); - colDataAppend(pDst, index, (char*)&v, false); - } else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false); - } else { // varchar/nchar data - colDataAppendNULL(pDst, index); - } + doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey); } } @@ -273,7 +276,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t return outputRows; } } else { - assert(pFillInfo->currentKey == ts); + ASSERT(pFillInfo->currentKey == ts); int32_t index = pBlock->info.rows; if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) { @@ -295,27 +298,32 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); char* src = colDataGetData(pSrc, pFillInfo->index); - if (i == 0 || (/*pCol->functionId != FUNCTION_COUNT &&*/ !colDataIsNull_s(pSrc, pFillInfo->index)) /*|| - (pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)*/) { + if (/*i == 0 || (*/!colDataIsNull_s(pSrc, pFillInfo->index)) { bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); colDataAppend(pDst, index, src, isNull); saveColData(pFillInfo->prev, i, src, isNull); - } else { // i > 0 and data is null , do interpolation - if (pFillInfo->type == TSDB_FILL_PREV) { - SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i); - doSetVal(pDst, index, pKey); - } else if (pFillInfo->type == TSDB_FILL_LINEAR) { - bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); - colDataAppend(pDst, index, src, isNull); - saveColData(pFillInfo->prev, i, src, isNull); - } else if (pFillInfo->type == TSDB_FILL_NULL) { - colDataAppendNULL(pDst, index); - } else if (pFillInfo->type == TSDB_FILL_NEXT) { - SGroupKeys* pKey = taosArrayGet(pFillInfo->next, i); - doSetVal(pDst, index, pKey); - } else { - SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; - colDataAppend(pDst, index, (char*)&pVar->i, false); + } else { + if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) { + colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false); + } else { // i > 0 and data is null , do interpolation + if (pFillInfo->type == TSDB_FILL_PREV) { + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next; + SGroupKeys* pKey = taosArrayGet(p, i); + doSetVal(pDst, index, pKey); + } else if (pFillInfo->type == TSDB_FILL_LINEAR) { + bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); + colDataAppend(pDst, index, src, isNull); + saveColData(pFillInfo->prev, i, src, isNull); // todo: + } else if (pFillInfo->type == TSDB_FILL_NULL) { + colDataAppendNULL(pDst, index); + } else if (pFillInfo->type == TSDB_FILL_NEXT) { + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev; + SGroupKeys* pKey = taosArrayGet(p, i); + doSetVal(pDst, index, pKey); + } else { + SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; + doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey); + } } } }