fix(query): fix coverity issues.

This commit is contained in:
Haojun Liao 2022-10-20 11:27:33 +08:00
parent 00e56d678b
commit 70adc0ea33
14 changed files with 229 additions and 172 deletions

View File

@ -194,7 +194,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
} else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pr->pTableList, i);
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;

View File

@ -120,44 +120,46 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
return &pInfo->blockData[1];
}
pInfo->currentLoadBlockIndex ^= 1;
if (pIter->pSttBlk != NULL) { // current block not loaded yet
int64_t st = taosGetTimestampUs();
SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
TABLEID id = {0};
if (pIter->pSttBlk->suid != 0) {
id.suid = pIter->pSttBlk->suid;
} else {
id.uid = pIter->uid;
}
code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
double el = (taosGetTimestampUs() - st) / 1000.0;
pInfo->elapsedTime += el;
pInfo->loadBlocks += 1;
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el,
idStr);
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr);
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
if (pIter->pSttBlk == NULL) {
return NULL;
}
// current block not loaded yet
pInfo->currentLoadBlockIndex ^= 1;
int64_t st = taosGetTimestampUs();
SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
TABLEID id = {0};
if (pIter->pSttBlk->suid != 0) {
id.suid = pIter->pSttBlk->suid;
} else {
id.uid = pIter->uid;
}
code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
double el = (taosGetTimestampUs() - st) / 1000.0;
pInfo->elapsedTime += el;
pInfo->loadBlocks += 1;
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el,
idStr);
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr);
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
_exit:
@ -259,7 +261,8 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
const char *idStr) {
int32_t code = 0;
int32_t code = TSDB_CODE_SUCCESS;
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
if (*pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -336,7 +339,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
(*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
}
return code;
_exit:
taosMemoryFree(*pIter);
return code;
}
@ -473,7 +479,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
int32_t iBlockL = pIter->iSttBlk;
SBlockData *pBlockData = loadLastBlock(pIter, idStr);
if (pBlockData == NULL && terrno != TSDB_CODE_SUCCESS) {
if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) {
goto _exit;
}

View File

@ -1047,11 +1047,16 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
if (pBlockInfo != NULL) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
if (pScanInfo == NULL) {
tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, %s", pBlockInfo->uid, idStr);
return TSDB_CODE_INVALID_PARA;
}
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk);
}
@ -1135,7 +1140,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
cleanupBlockOrderSupporter(&sup);
doSetCurrentBlock(pBlockIter);
doSetCurrentBlock(pBlockIter, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
@ -1175,12 +1180,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
taosMemoryFree(pTree);
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
doSetCurrentBlock(pBlockIter);
doSetCurrentBlock(pBlockIter, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
bool asc = ASCENDING_TRAVERSE(pBlockIter->order);
int32_t step = asc ? 1 : -1;
@ -1189,7 +1194,7 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
}
pBlockIter->index += step;
doSetCurrentBlock(pBlockIter);
doSetCurrentBlock(pBlockIter, idStr);
return true;
}
@ -1260,7 +1265,7 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t
ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
}
doSetCurrentBlock(pBlockIter);
doSetCurrentBlock(pBlockIter, "");
return TSDB_CODE_SUCCESS;
}
@ -2190,6 +2195,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
}
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pResBlock = pReader->pResBlock;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
@ -2200,6 +2207,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
STableBlockScanInfo* pBlockScanInfo = NULL;
if (pBlockInfo != NULL) {
pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pBlockScanInfo == NULL) {
code = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, total tables:%d, %s",
pBlockInfo->uid, taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
goto _end;
}
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
@ -2276,7 +2290,7 @@ _end:
pResBlock->info.rows, el, pReader->idStr);
}
return TSDB_CODE_SUCCESS;
return code;
}
void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; }
@ -2732,7 +2746,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// current block are exhausted, try the next file block
if (pDumpInfo->allDumped) {
// try next data block in current file
bool hasNext = blockIteratorNext(&pReader->status.blockIter);
bool hasNext = blockIteratorNext(&pReader->status.blockIter, pReader->idStr);
if (hasNext) { // check for the next block in the block accessed order list
initBlockDumpInfo(pReader, pBlockIter);
} else {
@ -3849,8 +3863,14 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
return pReader->pResBlock->pDataBlock;
}
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pBlockScanInfo == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
return NULL;
}
int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
@ -3979,7 +3999,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
pTableBlockInfo->blockRowsHisto[bucketIndex]++;
hasNext = blockIteratorNext(&pStatus->blockIter);
hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {

View File

@ -955,9 +955,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
SExecTaskInfo* pTaskInfo);
@ -981,9 +979,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);

View File

@ -162,7 +162,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (pTableList->map != NULL) {
int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t));
pInfo->pRes->info.groupId = *groupId;
if (groupId != NULL) {
pInfo->pRes->info.groupId = *groupId;
}
} else {
ASSERT(taosArrayGetSize(pTableList->pTableList) == 1);
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0);

View File

@ -227,8 +227,11 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
SDataDeleterBuf* pBuf = NULL;
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
taosMemoryFreeClear(pBuf->pData);
taosFreeQitem(pBuf);
if (pBuf != NULL) {
taosMemoryFreeClear(pBuf->pData);
taosFreeQitem(pBuf);
}
}
taosCloseQueue(pDeleter->pDataBlocks);
taosThreadMutexDestroy(&pDeleter->mutex);

View File

@ -358,15 +358,15 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 10000, .maxDataBlockNumPerQuery = 5000};
code = dsDataSinkMgtInit(&cfg);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to dsDataSinkMgtInit, code: %s", tstrerror(code));
qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
goto _error;
}
if (handle) {
void* pSinkParam = NULL;
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to createDataSinkParam, code: %s", tstrerror(code));
if (code != TSDB_CODE_SUCCESS || pSinkParam == NULL) {
qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
goto _error;
}

View File

@ -3047,32 +3047,40 @@ void cleanupExprSupp(SExprSupp* pSupp) {
taosMemoryFree(pSupp->rowEntryInfoOffset);
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,SExecTaskInfo* pTaskInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock);
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
if (pAggNode->pExprs != NULL) {
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initBasicInfo(&pInfo->binfo, pResultBlock);
code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->binfo.mergeResultBlock = mergeResult;
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
pInfo->groupId = UINT64_MAX;
pInfo->pCondition = pCondition;
pInfo->pCondition = pAggNode->node.pConditions;
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
pOperator->blocking = true;
@ -3331,8 +3339,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
return pTaskInfo;
}
static SArray* extractColumnInfo(SNodeList* pNodeList);
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
@ -3709,22 +3715,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
if (pAggNode->pExprs != NULL) {
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
}
if (pAggNode->pGroupKeys != NULL) {
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
pScalarExprInfo, numOfScalarExpr, pTaskInfo);
pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo);
} else {
pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions,
pScalarExprInfo, numOfScalarExpr, pAggNode->mergeDataBlock, pTaskInfo);
pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
@ -3814,39 +3808,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOptr;
}
SArray* extractColumnInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
SColumn c = extractColumnFromColumnNode(pColNode);
taosArrayPush(pList, &c);
} else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
SValueNode* pValNode = (SValueNode*)pNode->pExpr;
SColumn c = {0};
c.slotId = pNode->slotId;
c.colId = pNode->slotId;
c.type = pValNode->node.type;
c.bytes = pValNode->node.resType.bytes;
c.scale = pValNode->node.resType.scale;
c.precision = pValNode->node.resType.precision;
taosArrayPush(pList, &c);
}
}
return pList;
}
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
@ -4070,6 +4031,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->sql = sql;
sql = NULL;
(*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

View File

@ -30,6 +30,7 @@ static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDa
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
static SArray* extractColumnInfo(SNodeList* pNodeList);
static void freeGroupKey(void* param) {
SGroupKeys* pKey = (SGroupKeys*)param;
@ -61,7 +62,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pGroupColList, i);
SColumn* pCol = (SColumn*) taosArrayGet(pGroupColList, i);
(*keyLen) += pCol->bytes; // actual data + null_flag
SGroupKeys key = {0};
@ -396,41 +397,48 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
return buildGroupResultDataBlock(pOperator);
}
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo) {
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->pGroupCols = pGroupColList;
pInfo->pCondition = pCondition;
SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock);
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
if (pAggNode->pExprs != NULL) {
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
}
pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys);
pInfo->pCondition = pAggNode->node.pConditions;
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str);
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initBasicInfo(&pInfo->binfo, pResultBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "GroupbyAggOperator";
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
@ -451,8 +459,6 @@ _error:
}
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SPartitionOperatorInfo* pInfo = pOperator->info;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
@ -760,7 +766,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
if (pPartNode->pExprs != NULL) {
@ -781,14 +786,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0;
SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
pInfo->binfo.pRes = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
pTaskInfo->code = terrno;
qError("Create partition operator info failed since %s", terrstr(terrno));
blockDataDestroy(pResBlock);
goto _error;
}
@ -797,8 +801,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto _error;
}
pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf));
pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity);
pInfo->rowCapacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf));
pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -808,7 +812,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pInfo->binfo.pRes = pResBlock;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->info = pInfo;
@ -1102,3 +1105,37 @@ _error:
taosMemoryFreeClear(pOperator);
return NULL;
}
SArray* extractColumnInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
SColumn c = extractColumnFromColumnNode(pColNode);
taosArrayPush(pList, &c);
} else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
SValueNode* pValNode = (SValueNode*)pNode->pExpr;
SColumn c = {0};
c.slotId = pNode->slotId;
c.colId = pNode->slotId;
c.type = pValNode->node.type;
c.bytes = pValNode->node.resType.bytes;
c.scale = pValNode->node.resType.scale;
c.precision = pValNode->node.resType.precision;
taosArrayPush(pList, &c);
}
}
return pList;
}

View File

@ -59,15 +59,16 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
int32_t numOfCols = 0;
SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = pResBlock;
@ -84,8 +85,18 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (pInfo->pCondAfterMerge == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge);
pLogicCond->pParameterList = nodesMakeList();
if (pLogicCond->pParameterList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions));
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
pLogicCond->condType = LOGIC_COND_TYPE_AND;
@ -106,7 +117,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -114,9 +125,12 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
return pOperator;
_error:
taosMemoryFree(pInfo);
if (pInfo != NULL) {
destroyMergeJoinOperator(pInfo);
}
taosMemoryFree(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
pTaskInfo->code = code;
return NULL;
}

View File

@ -494,7 +494,13 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
if (fpSet.process != NULL) {
fpSet.process(&srcParam, 1, &param);
} else {
qError("failed to get the corresponding callback function, functionId:%d", functionId);
}
colDataDestroy(&infoData);
}

View File

@ -719,12 +719,16 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
pInfo->binfo.pRes = createResDataBlock(pDescNode);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) {
int32_t code = TSDB_CODE_SUCCESS;
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
pInfo->binfo.pRes = createResDataBlock(pDescNode);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
ASSERT(rowSize < 100 * 1024 * 1024);
SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo =
@ -737,6 +741,9 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo = pColMatchColInfo;
pInfo->pInputBlock = pInputBlock;
pInfo->bufPageSize = getProperSortPageSize(rowSize);
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
pOperator->name = "MultiwayMerge";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
pOperator->blocking = false;
@ -744,15 +751,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pInfo->bufPageSize = getProperSortPageSize(rowSize);
// one additional is reserved for merged result.
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1);
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL,
destroyMultiwayMergeOperatorInfo, NULL, NULL, getMultiwayMergeExplainExecInfo);
int32_t code = appendDownstream(pOperator, downStreams, numStreams);
code = appendDownstream(pOperator, downStreams, numStreams);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}

View File

@ -251,6 +251,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
printf("tHash Init failed since %s", terrstr(terrno));
taosMemoryFree(pHashObj);
return NULL;
}

View File

@ -140,6 +140,7 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource
int32_t* sourceId, SArray* pPageIdList) {
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
if (pSource == NULL) {
taosArrayDestroy(pPageIdList);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
@ -155,6 +156,7 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource
int32_t numOfRows =
(getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
ASSERT(numOfRows > 0);
return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
}
@ -224,6 +226,22 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
int32_t code = 0;
// multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) {
if (!osTempSpaceAvailable()) {
code = TSDB_CODE_NO_AVAIL_DISK;
qError("Sort compare init failed since %s", terrstr(code));
return code;
}
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
"sortComparInit", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i];
@ -245,22 +263,6 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
releaseBufPage(pHandle->pBuf, pPage);
}
} else {
// multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) {
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
code = terrno;
qError("Sort compare init failed since %s", terrstr(terrno));
return code;
}
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
"sortComparInit", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
@ -507,12 +509,14 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
int32_t code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
return code;
}
code =
tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
return code;
}
@ -520,12 +524,16 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
while (1) {
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
if (pDataBlock == NULL) {
taosArrayDestroy(pResList);
taosArrayDestroy(pPageIdList);
break;
}
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
if (pPage == NULL) {
taosArrayDestroy(pResList);
taosArrayDestroy(pPageIdList);
return terrno;
}