fix(query): fix coverity issues.
This commit is contained in:
parent
6e95d18643
commit
6ebf6b29bd
|
@ -1891,6 +1891,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
} else {
|
||||
init = true;
|
||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
||||
if (pSchema == NULL) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tRowMergerInit(&merge, piRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2286,7 +2290,7 @@ _end:
|
|||
if (pResBlock->info.rows > 0) {
|
||||
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
|
||||
" rows:%d, elapsed time:%.2f ms %s",
|
||||
pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||
pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||
pResBlock->info.rows, el, pReader->idStr);
|
||||
}
|
||||
|
||||
|
|
|
@ -966,9 +966,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_
|
|||
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode,
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, bool isStream);
|
||||
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
|
|
@ -3049,12 +3049,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
|
|||
SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
int32_t numOfScalarExpr = 0;
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
if (pAggNode->pExprs != NULL) {
|
||||
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
|
||||
}
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
|
@ -3065,6 +3059,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
|
|||
goto _error;
|
||||
}
|
||||
|
||||
int32_t numOfScalarExpr = 0;
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
if (pAggNode->pExprs != NULL) {
|
||||
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
|
||||
}
|
||||
|
||||
code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -3720,28 +3720,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
|
||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
SInterval interval = {.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
|
||||
|
||||
STimeWindowAggSupp as = {
|
||||
.waterMark = pIntervalPhyNode->window.watermark,
|
||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
};
|
||||
ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
|
||||
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
|
||||
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
|
||||
pTaskInfo, isStream);
|
||||
|
||||
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
|
||||
pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, isStream);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
|
||||
pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
|
||||
|
|
|
@ -1751,9 +1751,7 @@ void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
|
|||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode,
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, bool isStream) {
|
||||
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -1761,46 +1759,63 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
goto _error;
|
||||
}
|
||||
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->window.node.pOutputDataBlockDesc);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
|
||||
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SInterval interval = {.interval = pPhyNode->interval,
|
||||
.sliding = pPhyNode->sliding,
|
||||
.intervalUnit = pPhyNode->intervalUnit,
|
||||
.slidingUnit = pPhyNode->slidingUnit,
|
||||
.offset = pPhyNode->offset,
|
||||
.precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision};
|
||||
|
||||
STimeWindowAggSupp as = {
|
||||
.waterMark = pPhyNode->window.watermark,
|
||||
.calTrigger = pPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
};
|
||||
|
||||
ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
pInfo->interval = *pInterval;
|
||||
pInfo->interval = interval;
|
||||
pInfo->execModel = pTaskInfo->execModel;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->twAggSup = as;
|
||||
pInfo->pCondition = pPhyNode->window.node.pConditions;
|
||||
pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
|
||||
|
||||
if (pPhyNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar);
|
||||
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
if (isStream) {
|
||||
ASSERT(numOfCols > 0);
|
||||
ASSERT(num > 0);
|
||||
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
|
||||
}
|
||||
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
||||
|
||||
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, pInfo);
|
||||
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, pInfo);
|
||||
if (pInfo->timeWindowInterpo) {
|
||||
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
|
||||
if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
|
||||
|
@ -1827,7 +1842,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
return pOperator;
|
||||
|
||||
_error:
|
||||
destroyIntervalOperatorInfo(pInfo);
|
||||
if (pInfo != NULL) {
|
||||
destroyIntervalOperatorInfo(pInfo);
|
||||
}
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
|
|
|
@ -524,8 +524,6 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
|||
while (1) {
|
||||
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
|
||||
if (pDataBlock == NULL) {
|
||||
taosArrayDestroy(pResList);
|
||||
taosArrayDestroy(pPageIdList);
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -2988,8 +2988,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
|
||||
int32_t rowIndex) {
|
||||
SInputColumnInfoData* pColInfo = &pCtx->input;
|
||||
|
||||
if (pOutput->hasResult) {
|
||||
if (isFirst) {
|
||||
if (pInput->ts > pOutput->ts) {
|
||||
|
|
Loading…
Reference in New Issue