From 70adc0ea3314fa5ca87b4ef530bf2cafd61b7050 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Oct 2022 11:27:33 +0800 Subject: [PATCH] fix(query): fix coverity issues. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 2 +- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 82 +++++++++++--------- source/dnode/vnode/src/tsdb/tsdbRead.c | 44 ++++++++--- source/libs/executor/inc/executorimpl.h | 8 +- source/libs/executor/src/cachescanoperator.c | 4 +- source/libs/executor/src/dataDeleter.c | 7 +- source/libs/executor/src/executor.c | 6 +- source/libs/executor/src/executorimpl.c | 76 +++++------------- source/libs/executor/src/groupoperator.c | 75 +++++++++++++----- source/libs/executor/src/joinoperator.c | 28 +++++-- source/libs/executor/src/scanoperator.c | 8 +- source/libs/executor/src/sortoperator.c | 20 ++--- source/libs/executor/src/tlinearhash.c | 1 + source/libs/executor/src/tsort.c | 40 ++++++---- 14 files changed, 229 insertions(+), 172 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 20352bb502..ed491ca182 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -194,7 +194,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { - STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); + STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pr->pTableList, i); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 2f4fdfc5f8..8808f3c50c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -120,44 +120,46 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { return &pInfo->blockData[1]; } - pInfo->currentLoadBlockIndex ^= 1; - if (pIter->pSttBlk != NULL) { // current block not loaded yet - int64_t st = taosGetTimestampUs(); - - SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex]; - - TABLEID id = {0}; - if (pIter->pSttBlk->suid != 0) { - id.suid = pIter->pSttBlk->suid; - } else { - id.uid = pIter->uid; - } - - code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols); - if (code != TSDB_CODE_SUCCESS) { - goto _exit; - } - - code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock); - if (code != TSDB_CODE_SUCCESS) { - goto _exit; - } - - double el = (taosGetTimestampUs() - st) / 1000.0; - pInfo->elapsedTime += el; - pInfo->loadBlocks += 1; - - tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64 - ", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s", - pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el, - idStr); - - pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk; - tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr); - - pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1; + if (pIter->pSttBlk == NULL) { + return NULL; } + // current block not loaded yet + pInfo->currentLoadBlockIndex ^= 1; + int64_t st = taosGetTimestampUs(); + + SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex]; + + TABLEID id = {0}; + if (pIter->pSttBlk->suid != 0) { + id.suid = pIter->pSttBlk->suid; + } else { + id.uid = pIter->uid; + } + + code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + + code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + + double el = (taosGetTimestampUs() - st) / 1000.0; + pInfo->elapsedTime += el; + pInfo->loadBlocks += 1; + + tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64 + ", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s", + pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el, + idStr); + + pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk; + tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr); + + pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1; return &pInfo->blockData[pInfo->currentLoadBlockIndex]; _exit: @@ -259,7 +261,8 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; + *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); if (*pIter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -336,7 +339,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t (*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1; } + return code; + _exit: + taosMemoryFree(*pIter); return code; } @@ -473,7 +479,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { int32_t iBlockL = pIter->iSttBlk; SBlockData *pBlockData = loadLastBlock(pIter, idStr); - if (pBlockData == NULL && terrno != TSDB_CODE_SUCCESS) { + if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) { goto _exit; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 452e8c5d55..6580951ce0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1047,11 +1047,16 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; } -static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) { +static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); if (pBlockInfo != NULL) { STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); - int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); + if (pScanInfo == NULL) { + tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, %s", pBlockInfo->uid, idStr); + return TSDB_CODE_INVALID_PARA; + } + + int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk); } @@ -1135,7 +1140,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte pBlockIter->index = asc ? 0 : (numOfBlocks - 1); cleanupBlockOrderSupporter(&sup); - doSetCurrentBlock(pBlockIter); + doSetCurrentBlock(pBlockIter, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -1175,12 +1180,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte taosMemoryFree(pTree); pBlockIter->index = asc ? 0 : (numOfBlocks - 1); - doSetCurrentBlock(pBlockIter); + doSetCurrentBlock(pBlockIter, pReader->idStr); return TSDB_CODE_SUCCESS; } -static bool blockIteratorNext(SDataBlockIter* pBlockIter) { +static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) { bool asc = ASCENDING_TRAVERSE(pBlockIter->order); int32_t step = asc ? 1 : -1; @@ -1189,7 +1194,7 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) { } pBlockIter->index += step; - doSetCurrentBlock(pBlockIter); + doSetCurrentBlock(pBlockIter, idStr); return true; } @@ -1260,7 +1265,7 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx); } - doSetCurrentBlock(pBlockIter); + doSetCurrentBlock(pBlockIter, ""); return TSDB_CODE_SUCCESS; } @@ -2190,6 +2195,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } static int32_t buildComposedDataBlock(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pResBlock = pReader->pResBlock; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); @@ -2200,6 +2207,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo = NULL; if (pBlockInfo != NULL) { pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + if (pBlockScanInfo == NULL) { + code = TSDB_CODE_INVALID_PARA; + tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, total tables:%d, %s", + pBlockInfo->uid, taosHashGetSize(pReader->status.pTableMap), pReader->idStr); + goto _end; + } + SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); @@ -2276,7 +2290,7 @@ _end: pResBlock->info.rows, el, pReader->idStr); } - return TSDB_CODE_SUCCESS; + return code; } void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; } @@ -2732,7 +2746,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { // current block are exhausted, try the next file block if (pDumpInfo->allDumped) { // try next data block in current file - bool hasNext = blockIteratorNext(&pReader->status.blockIter); + bool hasNext = blockIteratorNext(&pReader->status.blockIter, pReader->idStr); if (hasNext) { // check for the next block in the block accessed order list initBlockDumpInfo(pReader, pBlockIter); } else { @@ -3849,8 +3863,14 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { return pReader->pResBlock->pDataBlock; } - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter); - STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); + STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + if (pBlockScanInfo == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, + taosHashGetSize(pReader->status.pTableMap), pReader->idStr); + return NULL; + } int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { @@ -3979,7 +3999,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows); pTableBlockInfo->blockRowsHisto[bucketIndex]++; - hasNext = blockIteratorNext(&pStatus->blockIter); + hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr); } else { code = initForFirstBlockInFile(pReader, pBlockIter); if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fc996a6003..f9442bf46b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -955,9 +955,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, - int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); @@ -981,9 +979,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo, int32_t numOfChild); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, - SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index ebf23ba6bc..1074678efd 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -162,7 +162,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (pTableList->map != NULL) { int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t)); - pInfo->pRes->info.groupId = *groupId; + if (groupId != NULL) { + pInfo->pRes->info.groupId = *groupId; + } } else { ASSERT(taosArrayGetSize(pTableList->pTableList) == 1); STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0); diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 55978855d1..e9c3bfe34d 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -227,8 +227,11 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { while (!taosQueueEmpty(pDeleter->pDataBlocks)) { SDataDeleterBuf* pBuf = NULL; taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); - taosMemoryFreeClear(pBuf->pData); - taosFreeQitem(pBuf); + + if (pBuf != NULL) { + taosMemoryFreeClear(pBuf->pData); + taosFreeQitem(pBuf); + } } taosCloseQueue(pDeleter->pDataBlocks); taosThreadMutexDestroy(&pDeleter->mutex); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ba687ab7fc..4f4b6364e9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -358,15 +358,15 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SDataSinkMgtCfg cfg = {.maxDataBlockNum = 10000, .maxDataBlockNumPerQuery = 5000}; code = dsDataSinkMgtInit(&cfg); if (code != TSDB_CODE_SUCCESS) { - qError("failed to dsDataSinkMgtInit, code: %s", tstrerror(code)); + qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str); goto _error; } if (handle) { void* pSinkParam = NULL; code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle); - if (code != TSDB_CODE_SUCCESS) { - qError("failed to createDataSinkParam, code: %s", tstrerror(code)); + if (code != TSDB_CODE_SUCCESS || pSinkParam == NULL) { + qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str); goto _error; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8e53cc2c33..a17ba79819 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3047,32 +3047,40 @@ void cleanupExprSupp(SExprSupp* pSupp) { taosMemoryFree(pSupp->rowEntryInfoOffset); } -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, - int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,SExecTaskInfo* pTaskInfo) { SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc); + initBasicInfo(&pInfo->binfo, pResBlock); + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = NULL; + if (pAggNode->pExprs != NULL) { + pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + } + + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initBasicInfo(&pInfo->binfo, pResultBlock); code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pInfo->binfo.mergeResultBlock = mergeResult; + pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->groupId = UINT64_MAX; - pInfo->pCondition = pCondition; + pInfo->pCondition = pAggNode->node.pConditions; pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; pOperator->blocking = true; @@ -3331,8 +3339,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT return pTaskInfo; } -static SArray* extractColumnInfo(SNodeList* pNodeList); - SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) { @@ -3709,22 +3715,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) { SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - - int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = NULL; - if (pAggNode->pExprs != NULL) { - pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); - } - if (pAggNode->pGroupKeys != NULL) { - SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); - pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, - pScalarExprInfo, numOfScalarExpr, pTaskInfo); + pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo); } else { - pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions, - pScalarExprInfo, numOfScalarExpr, pAggNode->mergeDataBlock, pTaskInfo); + pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -3814,39 +3808,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOptr; } -SArray* extractColumnInfo(SNodeList* pNodeList) { - size_t numOfCols = LIST_LENGTH(pNodeList); - SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); - if (pList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - for (int32_t i = 0; i < numOfCols; ++i) { - STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); - - if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) { - SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; - - SColumn c = extractColumnFromColumnNode(pColNode); - taosArrayPush(pList, &c); - } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) { - SValueNode* pValNode = (SValueNode*)pNode->pExpr; - SColumn c = {0}; - c.slotId = pNode->slotId; - c.colId = pNode->slotId; - c.type = pValNode->node.type; - c.bytes = pValNode->node.resType.bytes; - c.scale = pValNode->node.resType.scale; - c.precision = pValNode->node.resType.precision; - - taosArrayPush(pList, &c); - } - } - - return pList; -} - static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -4070,6 +4031,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead (*pTaskInfo)->sql = sql; sql = NULL; + (*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 4ca4ef7b3f..60b11b7326 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -30,6 +30,7 @@ static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDa static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup); +static SArray* extractColumnInfo(SNodeList* pNodeList); static void freeGroupKey(void* param) { SGroupKeys* pKey = (SGroupKeys*)param; @@ -61,7 +62,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = taosArrayGet(pGroupColList, i); + SColumn* pCol = (SColumn*) taosArrayGet(pGroupColList, i); (*keyLen) += pCol->bytes; // actual data + null_flag SGroupKeys key = {0}; @@ -396,41 +397,48 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { return buildGroupResultDataBlock(pOperator); } -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, - SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo) { SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pGroupCols = pGroupColList; - pInfo->pCondition = pCondition; + SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc); + initBasicInfo(&pInfo->binfo, pResBlock); + + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = NULL; + if (pAggNode->pExprs != NULL) { + pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + } + + pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys); + pInfo->pCondition = pAggNode->node.pConditions; int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); if (code != TSDB_CODE_SUCCESS) { goto _error; } - code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { goto _error; } initResultSizeInfo(&pOperator->resultInfo, 4096); - code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str); + code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initBasicInfo(&pInfo->binfo, pResultBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); pOperator->name = "GroupbyAggOperator"; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; - // pOperator->operatorType = OP_Groupby; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -451,8 +459,6 @@ _error: } static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { - // SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SPartitionOperatorInfo* pInfo = pOperator->info; for (int32_t j = 0; j < pBlock->info.rows; ++j) { @@ -760,7 +766,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols); - pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys); if (pPartNode->pExprs != NULL) { @@ -781,14 +786,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition uint32_t defaultPgsz = 0; uint32_t defaultBufsz = 0; - SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc); - getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz); + pInfo->binfo.pRes = createResDataBlock(pPartNode->node.pOutputDataBlockDesc); + getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; pTaskInfo->code = terrno; qError("Create partition operator info failed since %s", terrstr(terrno)); - blockDataDestroy(pResBlock); goto _error; } @@ -797,8 +801,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition goto _error; } - pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf)); - pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity); + pInfo->rowCapacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf)); + pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity); code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -808,7 +812,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; - pInfo->binfo.pRes = pResBlock; pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->info = pInfo; @@ -1102,3 +1105,37 @@ _error: taosMemoryFreeClear(pOperator); return NULL; } + + +SArray* extractColumnInfo(SNodeList* pNodeList) { + size_t numOfCols = LIST_LENGTH(pNodeList); + SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); + + if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) { + SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; + + SColumn c = extractColumnFromColumnNode(pColNode); + taosArrayPush(pList, &c); + } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) { + SValueNode* pValNode = (SValueNode*)pNode->pExpr; + SColumn c = {0}; + c.slotId = pNode->slotId; + c.colId = pNode->slotId; + c.type = pValNode->node.type; + c.bytes = pValNode->node.resType.bytes; + c.scale = pValNode->node.resType.scale; + c.precision = pValNode->node.resType.precision; + + taosArrayPush(pList, &c); + } + } + + return pList; +} diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 53cfa6c27a..eab0307e01 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -59,15 +59,16 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + + int32_t code = TSDB_CODE_SUCCESS; if (pOperator == NULL || pInfo == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } + int32_t numOfCols = 0; SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); - - int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); - + SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = pResBlock; @@ -84,8 +85,18 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) { pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (pInfo->pCondAfterMerge == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge); pLogicCond->pParameterList = nodesMakeList(); + if (pLogicCond->pParameterList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions)); nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); pLogicCond->condType = LOGIC_COND_TYPE_AND; @@ -106,7 +117,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); - int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); + code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -114,9 +125,12 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t return pOperator; _error: - taosMemoryFree(pInfo); + if (pInfo != NULL) { + destroyMergeJoinOperator(pInfo); + } + taosMemoryFree(pOperator); - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = code; return NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 109e402b45..62ac94dd60 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -494,7 +494,13 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData}; SScalarParam param = {.columnData = pColInfoData}; - fpSet.process(&srcParam, 1, ¶m); + + if (fpSet.process != NULL) { + fpSet.process(&srcParam, 1, ¶m); + } else { + qError("failed to get the corresponding callback function, functionId:%d", functionId); + } + colDataDestroy(&infoData); } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 7ca3de5214..c00b5c4802 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -719,12 +719,16 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; - pInfo->binfo.pRes = createResDataBlock(pDescNode); - int32_t rowSize = pInfo->binfo.pRes->info.rowSize; - if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { + int32_t code = TSDB_CODE_SUCCESS; + if (pInfo == NULL || pOperator == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } + pInfo->binfo.pRes = createResDataBlock(pDescNode); + int32_t rowSize = pInfo->binfo.pRes->info.rowSize; + ASSERT(rowSize < 100 * 1024 * 1024); + SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); int32_t numOfOutputCols = 0; SArray* pColMatchColInfo = @@ -737,6 +741,9 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pInputBlock = pInputBlock; + pInfo->bufPageSize = getProperSortPageSize(rowSize); + pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. + pOperator->name = "MultiwayMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->blocking = false; @@ -744,15 +751,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pInfo->bufPageSize = getProperSortPageSize(rowSize); - - // one additional is reserved for merged result. - pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); - pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL, destroyMultiwayMergeOperatorInfo, NULL, NULL, getMultiwayMergeExplainExecInfo); - int32_t code = appendDownstream(pOperator, downStreams, numStreams); + code = appendDownstream(pOperator, downStreams, numStreams); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index b133041fdc..4204692514 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -251,6 +251,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_ if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; printf("tHash Init failed since %s", terrstr(terrno)); + taosMemoryFree(pHashObj); return NULL; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 03248f5069..7ad5c1365c 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -140,6 +140,7 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource int32_t* sourceId, SArray* pPageIdList) { SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); if (pSource == NULL) { + taosArrayDestroy(pPageIdList); return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -155,6 +156,7 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource int32_t numOfRows = (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize; ASSERT(numOfRows > 0); + return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows); } @@ -224,6 +226,22 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int int32_t code = 0; + // multi-pass internal merge sort is required + if (pHandle->pBuf == NULL) { + if (!osTempSpaceAvailable()) { + code = TSDB_CODE_NO_AVAIL_DISK; + qError("Sort compare init failed since %s", terrstr(code)); + return code; + } + + code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, + "sortComparInit", tsTempDir); + dBufSetPrintInfo(pHandle->pBuf); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + if (pHandle->type == SORT_SINGLESOURCE_SORT) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; @@ -245,22 +263,6 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int releaseBufPage(pHandle->pBuf, pPage); } } else { - // multi-pass internal merge sort is required - if (pHandle->pBuf == NULL) { - if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_AVAIL_DISK; - code = terrno; - qError("Sort compare init failed since %s", terrstr(terrno)); - return code; - } - code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, - "sortComparInit", tsTempDir); - dBufSetPrintInfo(pHandle->pBuf); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; pSource->src.pBlock = pHandle->fetchfp(pSource->param); @@ -507,12 +509,14 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { int32_t code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle); if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pResList); return code; } code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pResList); return code; } @@ -520,12 +524,16 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { while (1) { SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); if (pDataBlock == NULL) { + taosArrayDestroy(pResList); + taosArrayDestroy(pPageIdList); break; } int32_t pageId = -1; void* pPage = getNewBufPage(pHandle->pBuf, &pageId); if (pPage == NULL) { + taosArrayDestroy(pResList); + taosArrayDestroy(pPageIdList); return terrno; }