diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 90cb06ff42..9c59e3f3ec 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -534,7 +534,7 @@ void tFreeStreamTask(SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); -void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status); +void streamFreeTaskState(SStreamTask* pTask, int8_t remove); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 82bd1b24f6..b489314e21 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -644,6 +644,10 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b SDataBlockInfo* pInfo = &pDataBlock->info; SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex); + if (pColInfoData == NULL) { + return terrno; + } + if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) { return 0; } @@ -685,6 +689,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); + if (pCol1 == NULL || pCol2 == NULL) { + return terrno; + } capacity = pDest->info.capacity; int32_t ret = colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); @@ -709,6 +716,9 @@ int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); + if (pCol2 == NULL || pCol1 == NULL) { + return terrno; + } code = colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows); if (code) { @@ -729,6 +739,10 @@ void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + continue; + } + if (IS_VAR_DATA_TYPE(pCol->info.type)) { pCol->varmeta.length = pCol->varmeta.offset[pBlock->info.rows - numOfRows]; memset(pCol->varmeta.offset + pBlock->info.rows - numOfRows, 0, sizeof(*pCol->varmeta.offset) * numOfRows); @@ -760,6 +774,10 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + continue; + } + total += colDataGetFullLength(pColInfoData, pBlock->info.rows); } @@ -861,6 +879,10 @@ int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t r for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); + if (pColData == NULL || pDstCol == NULL) { + continue; + } + for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { bool isNull = false; if (pBlock->pBlockAgg == NULL) { @@ -908,6 +930,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + continue; + } + if (IS_VAR_DATA_TYPE(pCol->info.type)) { memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t)); pStart += numOfRows * sizeof(int32_t); @@ -958,6 +984,9 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + continue; + } if (IS_VAR_DATA_TYPE(pCol->info.type)) { size_t metaSize = pBlock->info.rows * sizeof(int32_t); @@ -965,6 +994,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + pCol->varmeta.offset = (int32_t*)tmp; memcpy(pCol->varmeta.offset, pStart, metaSize); pStart += metaSize; @@ -1039,6 +1069,10 @@ int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + continue; + } + pCol->hasNull = true; if (IS_VAR_DATA_TYPE(pCol->info.type)) { @@ -1087,6 +1121,10 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfo == NULL) { + continue; + } + rowSize += pColInfo->info.bytes; } @@ -1114,8 +1152,11 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - rowSize += pColInfo->info.bytes; + if (pColInfo == NULL) { + continue; + } + rowSize += pColInfo->info.bytes; if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { rowSize += sizeof(int32_t); } else { @@ -1193,6 +1234,9 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = &pCols[i]; SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); + if (pSrc == NULL) { + continue; + } if (IS_VAR_DATA_TYPE(pSrc->info.type)) { if (pSrc->varmeta.length != 0) { @@ -1228,8 +1272,11 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i); - pCols[i].info = pColInfoData->info; + if (pColInfoData == NULL) { + continue; + } + pCols[i].info = pColInfoData->info; if (IS_VAR_DATA_TYPE(pCols[i].info.type)) { pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t)); pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length); @@ -1256,8 +1303,11 @@ static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i); - pColInfoData->info = pCols[i].info; + if (pColInfoData == NULL) { + continue; + } + pColInfoData->info = pCols[i].info; if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { taosMemoryFreeClear(pColInfoData->varmeta.offset); pColInfoData->varmeta = pCols[i].varmeta; @@ -1301,8 +1351,15 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); + if (pInfo == NULL) { + continue; + } SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); + if (pColInfoData == NULL) { + continue; + } + if (pColInfoData->hasNull) { sortColumnHasNull = true; } @@ -1319,6 +1376,9 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { if (!varTypeSort) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0); SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0); + if (pColInfoData == NULL || pOrder == NULL) { + return errno; + } int64_t p0 = taosGetTimestampUs(); @@ -1346,7 +1406,14 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) { struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i); + if (pInfo == NULL) { + continue; + } + pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); + if (pInfo->pColData == NULL) { + continue; + } pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order); } @@ -1399,6 +1466,10 @@ void blockDataEmpty(SSDataBlock* pDataBlock) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (p == NULL) { + continue; + } + colInfoDataCleanup(p, pInfo->capacity); } @@ -1417,6 +1488,10 @@ void blockDataReset(SSDataBlock* pDataBlock) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (p == NULL) { + continue; + } + p->hasNull = false; p->reassigned = false; if (IS_VAR_DATA_TYPE(p->info.type)) { @@ -1527,6 +1602,10 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (p == NULL) { + return terrno; + } + code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, false); if (code) { return code; @@ -1544,6 +1623,10 @@ void blockDataFreeRes(SSDataBlock* pBlock) { int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + continue; + } + colDataDestroy(pColInfoData); } @@ -1579,6 +1662,10 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { size_t numOfCols = taosArrayGetSize(src->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(src->pDataBlock, i); + if (p == NULL) { + return terrno; + } + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; code = blockDataAppendColInfo(dst, &colInfo); if (code) { @@ -1594,7 +1681,7 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i); - if (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type))) { + if (pSrc == NULL || pDst == NULL || (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type)))) { continue; } @@ -1622,6 +1709,10 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i); + if (pDstCol == NULL || pSrcCol == NULL) { + continue; + } + int32_t ret = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info); if (ret < 0) { code = ret; @@ -3149,15 +3240,26 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) { if (!pDataBlock || !pOrderInfo) return 0; for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i); + if (pOrder == NULL) { + continue; + } + pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId); + if (pOrder->pColData == NULL) { + continue; + } + pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order); } + SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock}; - int32_t rowIdx = 0, nextRowIdx = 1; + + int32_t rowIdx = 0, nextRowIdx = 1; for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) { if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) { break; } } + return nextRowIdx; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b56c474ed5..a4c490e9b5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -553,8 +553,15 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) return code; } - tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId, - pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_INVALID_MSG; + } else { + tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); + } code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamTaskId, req.downstreamNodeId); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index f3ceb33f64..2adc863baf 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -174,9 +174,9 @@ SArray* makeColumnArrayFromList(SNodeList* pNodeList); int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type, SColMatchInfo* pMatchInfo); -int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); -int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); -SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); +int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); +int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); +int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset, SFunctionStateStore* pStore); @@ -197,9 +197,6 @@ char* getStreamOpName(uint16_t opType); void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr); void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr); -void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); -void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); - TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols); void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 7e105d2260..7c63120fcf 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -73,7 +73,13 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t lino = 0; + int32_t code = 0; + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = NULL; + SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -89,29 +95,23 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num); + TSDB_CHECK_CODE(code, lino, _error); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); - int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = NULL; if (pAggNode->pExprs != NULL) { - pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + TSDB_CHECK_CODE(code, lino, _error); } code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 137f05c356..75c7d8af29 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -201,7 +201,9 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl if (pScanNode->scan.pScanPseudoCols != NULL) { SExprSupp* p = &pInfo->pseudoExprSup; - p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs); + code = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->pExprInfo, &p->numOfExprs); + TSDB_CHECK_CODE(code, lino, _error); + p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_NULL(p->pCtx, code, lino, _error, terrno); diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index b7aa57e4b1..8d2ad4cbad 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -256,14 +256,19 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy if (pCountWindowNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pCountWindowNode->window.pExprs, NULL, &numOfScalarExpr); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pCountWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } size_t keyBufSize = 0; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + initResultSizeInfo(&pOperator->resultInfo, 4096); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, @@ -286,6 +291,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy if (pInfo->windowCount != pInfo->windowSliding) { numOfItem = pInfo->windowCount / pInfo->windowSliding + 1; } + pInfo->countSup.pWinStates = taosArrayInit_s(itemSize, numOfItem); if (!pInfo->countSup.pWinStates) { goto _error; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 90a0c7927e..fcd202e221 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -84,7 +84,10 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy if (pEventWindowNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr); + SExprInfo* pScalarExprInfo = NULL; + + code = createExprInfo(pEventWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } @@ -95,7 +98,10 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + initResultSizeInfo(&pOperator->resultInfo, 4096); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index d0f0fce8fc..fb8f5bd43e 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1838,7 +1838,10 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) { return pExprs; } -SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) { +int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) { + QRY_OPTR_CHECK(pExprInfo); + + int32_t code = 0; int32_t numOfFuncs = LIST_LENGTH(pNodeList); int32_t numOfGroupKeys = 0; if (pGroupKeys != NULL) { @@ -1847,10 +1850,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* *numOfExprs = numOfFuncs + numOfGroupKeys; if (*numOfExprs == 0) { - return NULL; + return code; } SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo)); + if (pExprs == NULL) { + return terrno; + } for (int32_t i = 0; i < (*numOfExprs); ++i) { STargetNode* pTargetNode = NULL; @@ -1861,15 +1867,16 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* } SExprInfo* pExp = &pExprs[i]; - int32_t code = createExprFromTargetNode(pExp, pTargetNode); + code = createExprFromTargetNode(pExp, pTargetNode); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pExprs); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - return NULL; + return code; } } - return pExprs; + *pExprInfo = pExprs; + return code; } // set the output buffer for the selectivity + tag query diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 41036dfd94..7026129546 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -458,6 +458,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); int32_t code = 0; + int32_t lino = 0; SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -467,21 +468,23 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi } pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); - SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr); + QUERY_CHECK_CODE(code, lino, _error); + pOperator->exprSupp.pExprInfo = pExprInfo; SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp; - pNoFillSupp->pExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->numOfExprs); + code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->pExprInfo, &pNoFillSupp->numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); code = initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 4371879cdc..b00571055b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -560,7 +560,8 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo int32_t numOfScalarExpr = 0; SExprInfo* pScalarExprInfo = NULL; if (pAggNode->pExprs != NULL) { - pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); } pInfo->pGroupCols = NULL; @@ -578,7 +579,11 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo QUERY_CHECK_CODE(code, lino, _error); int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -1128,42 +1133,42 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - pTaskInfo->code = code = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = code = terrno; goto _error; } int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols); + pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); if (pPartNode->needBlockOutputTsOrder) { SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId}; pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo)); if (!pInfo->pOrderInfoArr) { - terrno = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = terrno; goto _error; } + void* tmp = taosArrayPush(pInfo->pOrderInfoArr, &order); QUERY_CHECK_NULL(tmp, code, lino, _error, terrno); } if (pPartNode->pExprs != NULL) { int32_t num = 0; - SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num); + SExprInfo* pExprInfo1 = NULL; + code = createExprInfo(pPartNode->pExprs, NULL, &pExprInfo1, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = terrno; - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); if (pInfo->pGroupSet == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - pTaskInfo->code = terrno; goto _error; } @@ -1173,22 +1178,17 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc); code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_DISKSPACE; - pTaskInfo->code = terrno; qError("Create partition operator info failed since %s, tempDir:%s", terrstr(), tsTempDir); goto _error; } code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } @@ -1200,8 +1200,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } @@ -1215,8 +1213,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } @@ -1229,7 +1225,7 @@ _error: } pTaskInfo->code = code; taosMemoryFreeClear(pOperator); - return code; + TAOS_RETURN(code); } int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, @@ -1673,7 +1669,10 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart if (pPartNode->part.pExprs != NULL) { int32_t num = 0; - SExprInfo* pCalExprInfo = createExprInfo(pPartNode->part.pExprs, NULL, &num); + SExprInfo* pCalExprInfo = NULL; + code = createExprInfo(pPartNode->part.pExprs, NULL, &pCalExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } @@ -1734,7 +1733,9 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart QUERY_CHECK_CODE(code, lino, _error); int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPartNode->part.pTargets, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 295180652d..7185f74254 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -108,9 +108,13 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* int32_t lino = 0; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols); + TSDB_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc); + TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = pResBlock; @@ -258,14 +262,13 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; - - SExprSupp* pSup = &pOperator->exprSupp; - SSDataBlock* pRes = pInfo->pRes; - SSDataBlock* pFinalRes = pProjectInfo->pFinalRes; - int32_t code = 0; - int64_t st = 0; - int32_t order = pInfo->inputTsOrder; - int32_t scanFlag = 0; + SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pRes = pInfo->pRes; + SSDataBlock* pFinalRes = pProjectInfo->pFinalRes; + int32_t code = 0; + int64_t st = 0; + int32_t order = pInfo->inputTsOrder; + int32_t scanFlag = 0; blockDataCleanup(pFinalRes); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -465,11 +468,16 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; int32_t numOfExpr = 0; - SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr); + TSDB_CHECK_CODE(code, lino, _error); if (pPhyNode->pExprs != NULL) { int32_t num = 0; - SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num); + SExprInfo* pSExpr = NULL; + code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2daf9e1fc7..5aa2268311 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1341,7 +1341,10 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa if (pScanNode->pScanPseudoCols != NULL) { SExprSupp* pSup = &pInfo->base.pseudoSup; - pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs); + pSup->pExprInfo = NULL; + code = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno); @@ -3991,13 +3994,12 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* // create the pseduo columns info if (pTableScanNode->scan.pScanPseudoCols != NULL) { - pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); + code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr); + QUERY_CHECK_CODE(code, lino, _error); } code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); pInfo->pRes = createDataBlockFromDescNode(pDescNode); code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes); @@ -4549,7 +4551,11 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc; int32_t numOfExprs = 0; - SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &pExprInfo, &numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -5710,7 +5716,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR if (pTableScanNode->scan.pScanPseudoCols != NULL) { SExprSupp* pSup = &pInfo->base.pseudoSup; - pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); + code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index a4e1e0b648..858f26ad18 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -61,6 +61,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN int32_t code = 0; int32_t lino = 0; + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -72,7 +73,9 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); + code = createExprInfo(pSortNode->pExprs, NULL, &pOperator->exprSupp.pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + pOperator->exprSupp.numOfExprs = numOfCols; int32_t numOfOutputCols = 0; code = @@ -772,7 +775,9 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pSortPhyNode->pExprs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); pSup->pExprInfo = pExprInfo; pSup->numOfExprs = numOfCols; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 27809f8a69..66b63249b6 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -825,13 +825,19 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); if (pCountNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pCountNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pCountNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } SExprSupp* pExpSup = &pOperator->exprSupp; - SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 72f8172bc4..fb31f659b2 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -871,7 +871,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); if (pEventNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pEventNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pEventNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -891,7 +894,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pEventNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index c0949ec012..274316972e 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1202,7 +1202,11 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod } pFillSup->numOfFillCols = numOfFillCols; int32_t numOfNotFillCols = 0; - SExprInfo* noFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols); + SExprInfo* noFillExprInfo = NULL; + + code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExprInfo, &numOfNotFillCols); + QUERY_CHECK_CODE(code, lino, _end); + pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols, (const SNodeListNode*)(pPhyFillNode->pValues)); pFillSup->type = convertFillType(pPhyFillNode->mode); @@ -1213,7 +1217,10 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod code = initResultBuf(pFillSup); QUERY_CHECK_CODE(code, lino, _end); - SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols); + SExprInfo* noFillExpr = NULL; + code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExpr, &numOfNotFillCols); + QUERY_CHECK_CODE(code, lino, _end); + code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore); QUERY_CHECK_CODE(code, lino, _end); @@ -1356,7 +1363,11 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval; int32_t numOfFillCols = 0; - SExprInfo* pFillExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &numOfFillCols); + SExprInfo* pFillExprInfo = NULL; + + code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI); if (!pInfo->pFillSup) { code = TSDB_CODE_FAILED; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 90d00b369a..fd918f1514 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1884,13 +1884,20 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN initResultSizeInfo(&pOperator->resultInfo, 4096); if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + + code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); @@ -3712,7 +3719,10 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode initResultSizeInfo(&pOperator->resultInfo, 4096); if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -3720,7 +3730,10 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode } SExprSupp* pExpSup = &pOperator->exprSupp; - SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -4865,7 +4878,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); if (pStateNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } @@ -4883,7 +4899,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -5164,7 +5183,10 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); + + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); pInfo->interval = (SInterval){ @@ -5213,7 +5235,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + + code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 80b4b646bf..9bb11eef61 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2727,7 +2727,10 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP pInfo->uid = (pBlockScanNode->suid != 0) ? pBlockScanNode->suid : pBlockScanNode->uid; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 00f88934d9..29730c6919 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1129,13 +1129,19 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN SExprSupp* pSup = &pOperator->exprSupp; int32_t numOfExprs = 0; - SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pInterpPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(pSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); if (pInterpPhyNode->pExprs != NULL) { int32_t num = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pInterpPhyNode->pExprs, NULL, &pScalarExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a1ec923352..989eb97327 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1298,7 +1298,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode QUERY_CHECK_CODE(code, lino, _error); int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -1336,7 +1339,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode if (pPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -1578,7 +1584,10 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy if (pStateNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -1603,7 +1612,10 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + initResultSizeInfo(&pOperator->resultInfo, 4096); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, @@ -1682,7 +1694,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); @@ -1709,7 +1724,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -2012,7 +2030,9 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge initResultSizeInfo(&pOperator->resultInfo, 512); int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); @@ -2312,7 +2332,9 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva } int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 0f158591b4..3a5d72576b 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -81,6 +81,7 @@ typedef struct { int64_t dataWritten; void* pMeta; + int8_t removeAllFiles; } STaskDbWrapper; @@ -152,6 +153,8 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); void* taskDbAddRef(void* pTaskDb); void taskDbRemoveRef(void* pTaskDb); +void taskDbSetClearFileFlag(void* pTaskDb); + int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); void streamStateDestroyCompar(void* arg); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 7396c6b7c6..e3f747fb22 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2331,6 +2331,15 @@ void taskDbRemoveRef(void* pTaskDb) { (void)taosReleaseRef(taskDbWrapperId, pBackend->refId); } +void taskDbSetClearFileFlag(void* pTaskDb) { + if (pTaskDb == NULL) { + return; + } + + STaskDbWrapper* pBackend = pTaskDb; + atomic_store_8(&pBackend->removeAllFiles, 1); +} + void taskDbInitOpt(STaskDbWrapper* pTaskDb) { rocksdb_env_t* env = rocksdb_create_default_env(); @@ -2573,8 +2582,7 @@ void taskDbDestroy(void* pDb, bool flush) { stDebug("succ to destroy stream backend:%p", wrapper); int8_t nCf = tListLen(ginitDict); - - if (flush) { + if (flush && wrapper->removeAllFiles == 0) { if (wrapper->db && wrapper->pCf) { rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); rocksdb_flushoptions_set_wait(flushOpt, 1); @@ -2636,6 +2644,11 @@ void taskDbDestroy(void* pDb, bool flush) { taskDbDestroyChkpOpt(wrapper); taosMemoryFree(wrapper->idstr); + + if (wrapper->removeAllFiles) { + char* err = NULL; + taosRemoveDir(wrapper->path); + } taosMemoryFree(wrapper->path); taosMemoryFree(wrapper); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f7c61b48e3..acac5dfc9e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -94,12 +94,17 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, i } int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + return TSDB_CODE_INVALID_MSG; + } // todo this status may not be set here. // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); - ASSERT(code == TSDB_CODE_SUCCESS); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to handle gen-checkpoint event, failed to start checkpoint procedure", pTask->id.idStr); + return code; + } pTask->chkInfo.pActiveInfo->transId = pReq->transId; pTask->chkInfo.pActiveInfo->activeId = pReq->checkpointId; @@ -112,7 +117,10 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo } int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) { - ASSERT(pTask->info.taskLevel != TASK_LEVEL__SOURCE); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", pTask->id.idStr); + return TSDB_CODE_INVALID_MSG; + } if (pRsp->rspCode != TSDB_CODE_SUCCESS) { stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr, @@ -258,7 +266,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock } if (p->upstreamTaskId == pBlock->srcTaskId) { - ASSERT(p->checkpointId == checkpointId); stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", prev recvTs:%" PRId64 " discard", pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); @@ -320,7 +327,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { - ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0); if (pTask->chkInfo.startTs == 0) { pTask->chkInfo.startTs = taosGetTimestampMs(); pTask->execInfo.checkpoint += 1; @@ -410,8 +416,6 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId int32_t notReady = 0; int32_t transId = 0; - ASSERT(total > 0 && (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG)); - // 1. not in checkpoint status now SStreamTaskState pStat = streamTaskGetStatus(pTask); if (pStat.state != TASK_STATUS__CK) { @@ -799,6 +803,13 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + // check the status every 100ms if (streamTaskShouldStop(pTask)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); @@ -843,7 +854,6 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // send msg to retrieve checkpoint trigger msg SArray* pList = pTask->upstreamInfo.pList; - ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE); SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); if (pNotSendList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1085,10 +1095,12 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask); int32_t total = streamTaskGetNumOfDownstream(pTask); - stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d", - pTask->id.idStr, taskId, vgId, numOfConfirmed, total); - - ASSERT(taskId != 0); + if (taskId == 0) { + stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId); + } else { + stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d", + pTask->id.idStr, taskId, vgId, numOfConfirmed, total); + } } static int32_t uploadCheckpointToS3(const char* id, const char* path) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 57e5322e38..eb846b5a92 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -175,9 +175,10 @@ int32_t streamDataSubmitNew(SPackedData* pData, int32_t type, SStreamDataSubmit* } void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { - ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); - taosMemoryFree(pDataSubmit->submit.msgStr); - taosFreeQitem(pDataSubmit); + if (pDataSubmit != NULL && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT) { + taosMemoryFree(pDataSubmit->submit.msgStr); + taosFreeQitem(pDataSubmit); + } } int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 255afb44f9..d245548ce5 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -96,8 +96,6 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r int32_t code = 0; void* buf = NULL; int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList); - ASSERT(sz > 0); - for (int32_t i = 0; i < sz; i++) { req->reqId = tGenIdPI64(); SStreamUpstreamEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); @@ -107,7 +105,6 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r tEncodeSize(tEncodeStreamRetrieveReq, req, len, code); if (code != 0) { - ASSERT(0); return code; } @@ -946,8 +943,6 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { SArray* pList = pTask->chkInfo.pActiveInfo->pReadyMsgList; streamMutexLock(&pTask->chkInfo.pActiveInfo->lock); - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - if (taosArrayGetSize(pList) == 1) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, 0); tmsgSendRsp(&pInfo->msg); @@ -1122,8 +1117,6 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId, SEpSet* pEpset, int64_t checkpointId) { - ASSERT(upstreamTaskId != 0); - pReadyInfo->upstreamTaskId = upstreamTaskId; pReadyInfo->upstreamNodeEpset = *pEpset; pReadyInfo->upstreamNodeId = upstreamNodeId; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f07fd81953..c5b1284560 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -275,7 +275,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } streamTaskCleanupCheckInfo(&pTask->taskCheckInfo); - streamFreeTaskState(pTask, status1); + streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0); if (pTask->pNameMap) { tSimpleHashCleanup(pTask->pNameMap); @@ -296,14 +296,14 @@ void tFreeStreamTask(SStreamTask* pTask) { taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); pTask->outputInfo.pNodeEpsetUpdateList = NULL; - if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { - char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128); - sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr); - taosRemoveDir(path); + // if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { + // char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128); + // sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr); + // taosRemoveDir(path); - stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path); - taosMemoryFree(path); - } + // stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path); + // taosMemoryFree(path); + // } if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); @@ -316,10 +316,12 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x free task completed", taskId); } -void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status) { +void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { if (pTask->pState != NULL) { stDebug("s-task:0x%x start to free task state", pTask->id.taskId); - streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); + streamStateClose(pTask->pState, remove); + + taskDbSetClearFileFlag(pTask->pBackend); taskDbRemoveRef(pTask->pBackend); pTask->pBackend = NULL; pTask->pState = NULL; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index d3c39da6bd..04969c2b48 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -98,7 +98,7 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv static int32_t stopTaskSuccFn(SStreamTask* pTask) { SStreamTaskSM* pSM = pTask->status.pSM; - streamFreeTaskState(pTask, pSM->current.state); + streamFreeTaskState(pTask,pSM->current.state == TASK_STATUS__DROPPING ? 1 : 0); return TSDB_CODE_SUCCESS; }