From 34ce872eafa1f9d7d08bdd694cc69046efec668a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 10:16:09 +0800 Subject: [PATCH 1/6] refactor: do some internal refactor. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 11 +++++-- source/libs/stream/src/streamCheckpoint.c | 36 ++++++++++++++-------- source/libs/stream/src/streamData.c | 7 +++-- source/libs/stream/src/streamDispatch.c | 7 ----- 4 files changed, 37 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b56c474ed5..a4c490e9b5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -553,8 +553,15 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) return code; } - tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId, - pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_INVALID_MSG; + } else { + tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); + } code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamTaskId, req.downstreamNodeId); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f7c61b48e3..640e2af94f 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -94,12 +94,17 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, i } int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + return TSDB_CODE_INVALID_MSG; + } // todo this status may not be set here. // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); - ASSERT(code == TSDB_CODE_SUCCESS); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to handle gen-checkpoint event, failed to start checkpoint procedure", pTask->id.idStr); + return code; + } pTask->chkInfo.pActiveInfo->transId = pReq->transId; pTask->chkInfo.pActiveInfo->activeId = pReq->checkpointId; @@ -112,7 +117,10 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo } int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) { - ASSERT(pTask->info.taskLevel != TASK_LEVEL__SOURCE); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", pTask->id.idStr); + return TSDB_CODE_INVALID_MSG; + } if (pRsp->rspCode != TSDB_CODE_SUCCESS) { stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr, @@ -258,7 +266,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock } if (p->upstreamTaskId == pBlock->srcTaskId) { - ASSERT(p->checkpointId == checkpointId); stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", prev recvTs:%" PRId64 " discard", pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); @@ -320,7 +327,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { - ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0); if (pTask->chkInfo.startTs == 0) { pTask->chkInfo.startTs = taosGetTimestampMs(); pTask->execInfo.checkpoint += 1; @@ -410,8 +416,6 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId int32_t notReady = 0; int32_t transId = 0; - ASSERT(total > 0 && (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG)); - // 1. not in checkpoint status now SStreamTaskState pStat = streamTaskGetStatus(pTask); if (pStat.state != TASK_STATUS__CK) { @@ -799,6 +803,13 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, quit", id, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + // check the status every 100ms if (streamTaskShouldStop(pTask)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); @@ -843,7 +854,6 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // send msg to retrieve checkpoint trigger msg SArray* pList = pTask->upstreamInfo.pList; - ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE); SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); if (pNotSendList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1085,10 +1095,12 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask); int32_t total = streamTaskGetNumOfDownstream(pTask); - stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d", - pTask->id.idStr, taskId, vgId, numOfConfirmed, total); - - ASSERT(taskId != 0); + if (taskId == 0) { + stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId); + } else { + stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d", + pTask->id.idStr, taskId, vgId, numOfConfirmed, total); + } } static int32_t uploadCheckpointToS3(const char* id, const char* path) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 57e5322e38..eb846b5a92 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -175,9 +175,10 @@ int32_t streamDataSubmitNew(SPackedData* pData, int32_t type, SStreamDataSubmit* } void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { - ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); - taosMemoryFree(pDataSubmit->submit.msgStr); - taosFreeQitem(pDataSubmit); + if (pDataSubmit != NULL && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT) { + taosMemoryFree(pDataSubmit->submit.msgStr); + taosFreeQitem(pDataSubmit); + } } int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 255afb44f9..d245548ce5 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -96,8 +96,6 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r int32_t code = 0; void* buf = NULL; int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList); - ASSERT(sz > 0); - for (int32_t i = 0; i < sz; i++) { req->reqId = tGenIdPI64(); SStreamUpstreamEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); @@ -107,7 +105,6 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r tEncodeSize(tEncodeStreamRetrieveReq, req, len, code); if (code != 0) { - ASSERT(0); return code; } @@ -946,8 +943,6 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { SArray* pList = pTask->chkInfo.pActiveInfo->pReadyMsgList; streamMutexLock(&pTask->chkInfo.pActiveInfo->lock); - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - if (taosArrayGetSize(pList) == 1) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, 0); tmsgSendRsp(&pInfo->msg); @@ -1122,8 +1117,6 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId, SEpSet* pEpset, int64_t checkpointId) { - ASSERT(upstreamTaskId != 0); - pReadyInfo->upstreamTaskId = upstreamTaskId; pReadyInfo->upstreamNodeEpset = *pEpset; pReadyInfo->upstreamNodeId = upstreamNodeId; From 183f33af876cc8aa8cc3b9757269e700ef7b8ee6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 10:35:00 +0800 Subject: [PATCH 2/6] fix(stream): fix syntax error. --- source/libs/stream/src/streamCheckpoint.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 640e2af94f..acac5dfc9e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -805,7 +805,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, quit", id, ref); + stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref); streamMetaReleaseTask(pTask->pMeta, pTask); return; } From 9a2ee547194f0a97f20c4aa7dc08fad0b2bd6405 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 11:06:20 +0800 Subject: [PATCH 3/6] fix(stream): check return value. --- source/common/src/tdatablock.c | 112 +++++++++++++++++++++++++++++++-- 1 file changed, 107 insertions(+), 5 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 82bd1b24f6..b489314e21 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -644,6 +644,10 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b SDataBlockInfo* pInfo = &pDataBlock->info; SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex); + if (pColInfoData == NULL) { + return terrno; + } + if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) { return 0; } @@ -685,6 +689,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); + if (pCol1 == NULL || pCol2 == NULL) { + return terrno; + } capacity = pDest->info.capacity; int32_t ret = colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); @@ -709,6 +716,9 @@ int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); + if (pCol2 == NULL || pCol1 == NULL) { + return terrno; + } code = colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows); if (code) { @@ -729,6 +739,10 @@ void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + continue; + } + if (IS_VAR_DATA_TYPE(pCol->info.type)) { pCol->varmeta.length = pCol->varmeta.offset[pBlock->info.rows - numOfRows]; memset(pCol->varmeta.offset + pBlock->info.rows - numOfRows, 0, sizeof(*pCol->varmeta.offset) * numOfRows); @@ -760,6 +774,10 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + continue; + } + total += colDataGetFullLength(pColInfoData, pBlock->info.rows); } @@ -861,6 +879,10 @@ int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t r for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); + if (pColData == NULL || pDstCol == NULL) { + continue; + } + for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { bool isNull = false; if (pBlock->pBlockAgg == NULL) { @@ -908,6 +930,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + continue; + } + if (IS_VAR_DATA_TYPE(pCol->info.type)) { memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t)); pStart += numOfRows * sizeof(int32_t); @@ -958,6 +984,9 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + continue; + } if (IS_VAR_DATA_TYPE(pCol->info.type)) { size_t metaSize = pBlock->info.rows * sizeof(int32_t); @@ -965,6 +994,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + pCol->varmeta.offset = (int32_t*)tmp; memcpy(pCol->varmeta.offset, pStart, metaSize); pStart += metaSize; @@ -1039,6 +1069,10 @@ int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + continue; + } + pCol->hasNull = true; if (IS_VAR_DATA_TYPE(pCol->info.type)) { @@ -1087,6 +1121,10 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfo == NULL) { + continue; + } + rowSize += pColInfo->info.bytes; } @@ -1114,8 +1152,11 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - rowSize += pColInfo->info.bytes; + if (pColInfo == NULL) { + continue; + } + rowSize += pColInfo->info.bytes; if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { rowSize += sizeof(int32_t); } else { @@ -1193,6 +1234,9 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = &pCols[i]; SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); + if (pSrc == NULL) { + continue; + } if (IS_VAR_DATA_TYPE(pSrc->info.type)) { if (pSrc->varmeta.length != 0) { @@ -1228,8 +1272,11 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i); - pCols[i].info = pColInfoData->info; + if (pColInfoData == NULL) { + continue; + } + pCols[i].info = pColInfoData->info; if (IS_VAR_DATA_TYPE(pCols[i].info.type)) { pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t)); pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length); @@ -1256,8 +1303,11 @@ static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i); - pColInfoData->info = pCols[i].info; + if (pColInfoData == NULL) { + continue; + } + pColInfoData->info = pCols[i].info; if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { taosMemoryFreeClear(pColInfoData->varmeta.offset); pColInfoData->varmeta = pCols[i].varmeta; @@ -1301,8 +1351,15 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); + if (pInfo == NULL) { + continue; + } SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); + if (pColInfoData == NULL) { + continue; + } + if (pColInfoData->hasNull) { sortColumnHasNull = true; } @@ -1319,6 +1376,9 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { if (!varTypeSort) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0); SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0); + if (pColInfoData == NULL || pOrder == NULL) { + return errno; + } int64_t p0 = taosGetTimestampUs(); @@ -1346,7 +1406,14 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) { struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i); + if (pInfo == NULL) { + continue; + } + pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); + if (pInfo->pColData == NULL) { + continue; + } pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order); } @@ -1399,6 +1466,10 @@ void blockDataEmpty(SSDataBlock* pDataBlock) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (p == NULL) { + continue; + } + colInfoDataCleanup(p, pInfo->capacity); } @@ -1417,6 +1488,10 @@ void blockDataReset(SSDataBlock* pDataBlock) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (p == NULL) { + continue; + } + p->hasNull = false; p->reassigned = false; if (IS_VAR_DATA_TYPE(p->info.type)) { @@ -1527,6 +1602,10 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (p == NULL) { + return terrno; + } + code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, false); if (code) { return code; @@ -1544,6 +1623,10 @@ void blockDataFreeRes(SSDataBlock* pBlock) { int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + continue; + } + colDataDestroy(pColInfoData); } @@ -1579,6 +1662,10 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { size_t numOfCols = taosArrayGetSize(src->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(src->pDataBlock, i); + if (p == NULL) { + return terrno; + } + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; code = blockDataAppendColInfo(dst, &colInfo); if (code) { @@ -1594,7 +1681,7 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i); - if (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type))) { + if (pSrc == NULL || pDst == NULL || (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type)))) { continue; } @@ -1622,6 +1709,10 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i); + if (pDstCol == NULL || pSrcCol == NULL) { + continue; + } + int32_t ret = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info); if (ret < 0) { code = ret; @@ -3149,15 +3240,26 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) { if (!pDataBlock || !pOrderInfo) return 0; for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i); + if (pOrder == NULL) { + continue; + } + pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId); + if (pOrder->pColData == NULL) { + continue; + } + pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order); } + SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock}; - int32_t rowIdx = 0, nextRowIdx = 1; + + int32_t rowIdx = 0, nextRowIdx = 1; for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) { if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) { break; } } + return nextRowIdx; } From 0bcfe11e84d57e4d13a14548237f7c7fb987a73b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 5 Aug 2024 11:55:36 +0800 Subject: [PATCH 4/6] refactor remve backend code --- include/libs/stream/tstream.h | 2 +- source/libs/stream/inc/streamBackendRocksdb.h | 3 +++ source/libs/stream/src/streamBackendRocksdb.c | 17 ++++++++++++-- source/libs/stream/src/streamTask.c | 22 ++++++++++--------- source/libs/stream/src/streamTaskSm.c | 2 +- 5 files changed, 32 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 90cb06ff42..9c59e3f3ec 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -534,7 +534,7 @@ void tFreeStreamTask(SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); -void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status); +void streamFreeTaskState(SStreamTask* pTask, int8_t remove); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 0f158591b4..3bb4532db3 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -81,6 +81,7 @@ typedef struct { int64_t dataWritten; void* pMeta; + int8_t removeAllFiles; } STaskDbWrapper; @@ -152,6 +153,8 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); void* taskDbAddRef(void* pTaskDb); void taskDbRemoveRef(void* pTaskDb); +void taskDbRemoveAllFiles(void* pTaskDb); + int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); void streamStateDestroyCompar(void* arg); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 7396c6b7c6..8498c9118a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2331,6 +2331,15 @@ void taskDbRemoveRef(void* pTaskDb) { (void)taosReleaseRef(taskDbWrapperId, pBackend->refId); } +void taskDbRemoveAllFiles(void* pTaskDb) { + if (pTaskDb == NULL) { + return; + } + + STaskDbWrapper* pBackend = pTaskDb; + atomic_store_8(&pBackend->removeAllFiles, 1); +} + void taskDbInitOpt(STaskDbWrapper* pTaskDb) { rocksdb_env_t* env = rocksdb_create_default_env(); @@ -2573,8 +2582,7 @@ void taskDbDestroy(void* pDb, bool flush) { stDebug("succ to destroy stream backend:%p", wrapper); int8_t nCf = tListLen(ginitDict); - - if (flush) { + if (flush && wrapper->removeAllFiles == 0) { if (wrapper->db && wrapper->pCf) { rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); rocksdb_flushoptions_set_wait(flushOpt, 1); @@ -2636,6 +2644,11 @@ void taskDbDestroy(void* pDb, bool flush) { taskDbDestroyChkpOpt(wrapper); taosMemoryFree(wrapper->idstr); + + if (wrapper->removeAllFiles) { + char* err = NULL; + taosRemoveDir(wrapper->path); + } taosMemoryFree(wrapper->path); taosMemoryFree(wrapper); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f07fd81953..90167e446e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -275,7 +275,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } streamTaskCleanupCheckInfo(&pTask->taskCheckInfo); - streamFreeTaskState(pTask, status1); + streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0); if (pTask->pNameMap) { tSimpleHashCleanup(pTask->pNameMap); @@ -296,14 +296,14 @@ void tFreeStreamTask(SStreamTask* pTask) { taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); pTask->outputInfo.pNodeEpsetUpdateList = NULL; - if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { - char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128); - sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr); - taosRemoveDir(path); + // if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { + // char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128); + // sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr); + // taosRemoveDir(path); - stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path); - taosMemoryFree(path); - } + // stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path); + // taosMemoryFree(path); + // } if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); @@ -316,10 +316,12 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x free task completed", taskId); } -void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status) { +void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { if (pTask->pState != NULL) { stDebug("s-task:0x%x start to free task state", pTask->id.taskId); - streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); + streamStateClose(pTask->pState, remove); + + taskDbRemoveAllFiles(pTask->pBackend); taskDbRemoveRef(pTask->pBackend); pTask->pBackend = NULL; pTask->pState = NULL; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index d3c39da6bd..04969c2b48 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -98,7 +98,7 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv static int32_t stopTaskSuccFn(SStreamTask* pTask) { SStreamTaskSM* pSM = pTask->status.pSM; - streamFreeTaskState(pTask, pSM->current.state); + streamFreeTaskState(pTask,pSM->current.state == TASK_STATUS__DROPPING ? 1 : 0); return TSDB_CODE_SUCCESS; } From 40537001a2bab42ca1bf529ce1e3b10e647a0df3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 11:57:18 +0800 Subject: [PATCH 5/6] fix(stream): check return value. --- source/libs/executor/inc/executil.h | 9 ++-- source/libs/executor/src/aggregateoperator.c | 30 +++++------ source/libs/executor/src/cachescanoperator.c | 4 +- .../libs/executor/src/countwindowoperator.c | 10 +++- .../libs/executor/src/eventwindowoperator.c | 10 +++- source/libs/executor/src/executil.c | 17 +++++-- source/libs/executor/src/filloperator.c | 19 ++++--- source/libs/executor/src/groupoperator.c | 51 ++++++++++--------- source/libs/executor/src/projectoperator.c | 30 +++++++---- source/libs/executor/src/scanoperator.c | 22 +++++--- source/libs/executor/src/sortoperator.c | 10 +++- .../executor/src/streamcountwindowoperator.c | 10 +++- .../executor/src/streameventwindowoperator.c | 10 +++- source/libs/executor/src/streamfilloperator.c | 17 +++++-- .../executor/src/streamtimewindowoperator.c | 42 ++++++++++++--- source/libs/executor/src/sysscanoperator.c | 5 +- source/libs/executor/src/timesliceoperator.c | 10 +++- source/libs/executor/src/timewindowoperator.c | 38 +++++++++++--- 18 files changed, 234 insertions(+), 110 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index f3ceb33f64..2adc863baf 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -174,9 +174,9 @@ SArray* makeColumnArrayFromList(SNodeList* pNodeList); int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type, SColMatchInfo* pMatchInfo); -int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); -int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); -SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); +int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); +int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); +int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset, SFunctionStateStore* pStore); @@ -197,9 +197,6 @@ char* getStreamOpName(uint16_t opType); void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr); void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr); -void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); -void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); - TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols); void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 7e105d2260..7c63120fcf 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -73,7 +73,13 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t lino = 0; + int32_t code = 0; + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = NULL; + SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -89,29 +95,23 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num); + TSDB_CHECK_CODE(code, lino, _error); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); - int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = NULL; if (pAggNode->pExprs != NULL) { - pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + TSDB_CHECK_CODE(code, lino, _error); } code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 9d49c8e9ca..81d55ec092 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -200,7 +200,9 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl if (pScanNode->scan.pScanPseudoCols != NULL) { SExprSupp* p = &pInfo->pseudoExprSup; - p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs); + code = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->pExprInfo, &p->numOfExprs); + TSDB_CHECK_CODE(code, lino, _error); + p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); } diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index b7aa57e4b1..8d2ad4cbad 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -256,14 +256,19 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy if (pCountWindowNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pCountWindowNode->window.pExprs, NULL, &numOfScalarExpr); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pCountWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } size_t keyBufSize = 0; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + initResultSizeInfo(&pOperator->resultInfo, 4096); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, @@ -286,6 +291,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy if (pInfo->windowCount != pInfo->windowSliding) { numOfItem = pInfo->windowCount / pInfo->windowSliding + 1; } + pInfo->countSup.pWinStates = taosArrayInit_s(itemSize, numOfItem); if (!pInfo->countSup.pWinStates) { goto _error; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 629afbbb8e..6a39cac525 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -84,7 +84,10 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy if (pEventWindowNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr); + SExprInfo* pScalarExprInfo = NULL; + + code = createExprInfo(pEventWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } @@ -95,7 +98,10 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + initResultSizeInfo(&pOperator->resultInfo, 4096); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 5957d08a18..cdc8cc6dd5 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1822,7 +1822,10 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) { return pExprs; } -SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) { +int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) { + QRY_OPTR_CHECK(pExprInfo); + + int32_t code = 0; int32_t numOfFuncs = LIST_LENGTH(pNodeList); int32_t numOfGroupKeys = 0; if (pGroupKeys != NULL) { @@ -1831,10 +1834,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* *numOfExprs = numOfFuncs + numOfGroupKeys; if (*numOfExprs == 0) { - return NULL; + return code; } SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo)); + if (pExprs == NULL) { + return terrno; + } for (int32_t i = 0; i < (*numOfExprs); ++i) { STargetNode* pTargetNode = NULL; @@ -1845,15 +1851,16 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* } SExprInfo* pExp = &pExprs[i]; - int32_t code = createExprFromTargetNode(pExp, pTargetNode); + code = createExprFromTargetNode(pExp, pTargetNode); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pExprs); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - return NULL; + return code; } } - return pExprs; + *pExprInfo = pExprs; + return code; } // set the output buffer for the selectivity + tag query diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index c4ef74608a..d88e09273f 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -455,6 +455,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); int32_t code = 0; + int32_t lino = 0; SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -464,21 +465,23 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi } pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); - SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr); + QUERY_CHECK_CODE(code, lino, _error); + pOperator->exprSupp.pExprInfo = pExprInfo; SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp; - pNoFillSupp->pExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->numOfExprs); + code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->pExprInfo, &pNoFillSupp->numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); code = initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index d88aef8fb7..43b2f5ab6d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -560,7 +560,8 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo int32_t numOfScalarExpr = 0; SExprInfo* pScalarExprInfo = NULL; if (pAggNode->pExprs != NULL) { - pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); } pInfo->pGroupCols = NULL; @@ -578,7 +579,11 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo QUERY_CHECK_CODE(code, lino, _error); int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -1125,42 +1130,42 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - pTaskInfo->code = code = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = code = terrno; goto _error; } int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols); + pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); if (pPartNode->needBlockOutputTsOrder) { SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId}; pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo)); if (!pInfo->pOrderInfoArr) { - terrno = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = terrno; goto _error; } + void* tmp = taosArrayPush(pInfo->pOrderInfoArr, &order); QUERY_CHECK_NULL(tmp, code, lino, _error, terrno); } if (pPartNode->pExprs != NULL) { int32_t num = 0; - SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num); + SExprInfo* pExprInfo1 = NULL; + code = createExprInfo(pPartNode->pExprs, NULL, &pExprInfo1, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = terrno; - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); if (pInfo->pGroupSet == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - pTaskInfo->code = terrno; goto _error; } @@ -1170,22 +1175,17 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc); code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_DISKSPACE; - pTaskInfo->code = terrno; qError("Create partition operator info failed since %s, tempDir:%s", terrstr(), tsTempDir); goto _error; } code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } @@ -1195,8 +1195,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity); code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } @@ -1210,8 +1208,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } @@ -1224,7 +1220,7 @@ _error: } pTaskInfo->code = code; taosMemoryFreeClear(pOperator); - return code; + TAOS_RETURN(code); } int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, @@ -1663,7 +1659,10 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart if (pPartNode->part.pExprs != NULL) { int32_t num = 0; - SExprInfo* pCalExprInfo = createExprInfo(pPartNode->part.pExprs, NULL, &num); + SExprInfo* pCalExprInfo = NULL; + code = createExprInfo(pPartNode->part.pExprs, NULL, &pCalExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } @@ -1724,7 +1723,9 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart QUERY_CHECK_CODE(code, lino, _error); int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPartNode->part.pTargets, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 295180652d..7185f74254 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -108,9 +108,13 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* int32_t lino = 0; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols); + TSDB_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc); + TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = pResBlock; @@ -258,14 +262,13 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; - - SExprSupp* pSup = &pOperator->exprSupp; - SSDataBlock* pRes = pInfo->pRes; - SSDataBlock* pFinalRes = pProjectInfo->pFinalRes; - int32_t code = 0; - int64_t st = 0; - int32_t order = pInfo->inputTsOrder; - int32_t scanFlag = 0; + SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pRes = pInfo->pRes; + SSDataBlock* pFinalRes = pProjectInfo->pFinalRes; + int32_t code = 0; + int64_t st = 0; + int32_t order = pInfo->inputTsOrder; + int32_t scanFlag = 0; blockDataCleanup(pFinalRes); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -465,11 +468,16 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; int32_t numOfExpr = 0; - SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr); + TSDB_CHECK_CODE(code, lino, _error); if (pPhyNode->pExprs != NULL) { int32_t num = 0; - SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num); + SExprInfo* pSExpr = NULL; + code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index acc3de3447..d491ffb524 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1338,7 +1338,10 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa if (pScanNode->pScanPseudoCols != NULL) { SExprSupp* pSup = &pInfo->base.pseudoSup; - pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs); + pSup->pExprInfo = NULL; + code = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); } @@ -3981,13 +3984,12 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* // create the pseduo columns info if (pTableScanNode->scan.pScanPseudoCols != NULL) { - pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); + code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr); + QUERY_CHECK_CODE(code, lino, _error); } code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); pInfo->pRes = createDataBlockFromDescNode(pDescNode); code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes); @@ -4539,7 +4541,11 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc; int32_t numOfExprs = 0; - SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &pExprInfo, &numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -5694,7 +5700,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR if (pTableScanNode->scan.pScanPseudoCols != NULL) { SExprSupp* pSup = &pInfo->base.pseudoSup; - pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); + code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index a0c56df49c..a08787d358 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -60,6 +60,8 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN QRY_OPTR_CHECK(pOptrInfo); int32_t code = 0; + int32_t lino = 0; + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -71,7 +73,9 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); + code = createExprInfo(pSortNode->pExprs, NULL, &pOperator->exprSupp.pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + pOperator->exprSupp.numOfExprs = numOfCols; int32_t numOfOutputCols = 0; code = @@ -770,7 +774,9 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pSortPhyNode->pExprs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); pSup->pExprInfo = pExprInfo; pSup->numOfExprs = numOfCols; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 6adc60b79e..8ac73b44f6 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -823,13 +823,19 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); if (pCountNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pCountNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pCountNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } SExprSupp* pExpSup = &pOperator->exprSupp; - SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 17ef2fe41f..1311216c06 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -864,7 +864,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); if (pEventNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pEventNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pEventNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -884,7 +887,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pEventNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 480814f6a0..39aedd9d59 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1190,7 +1190,11 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod } pFillSup->numOfFillCols = numOfFillCols; int32_t numOfNotFillCols = 0; - SExprInfo* noFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols); + SExprInfo* noFillExprInfo = NULL; + + code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExprInfo, &numOfNotFillCols); + QUERY_CHECK_CODE(code, lino, _end); + pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols, (const SNodeListNode*)(pPhyFillNode->pValues)); pFillSup->type = convertFillType(pPhyFillNode->mode); @@ -1201,7 +1205,10 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod code = initResultBuf(pFillSup); QUERY_CHECK_CODE(code, lino, _end); - SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols); + SExprInfo* noFillExpr = NULL; + code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExpr, &numOfNotFillCols); + QUERY_CHECK_CODE(code, lino, _end); + code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore); QUERY_CHECK_CODE(code, lino, _end); @@ -1343,7 +1350,11 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval; int32_t numOfFillCols = 0; - SExprInfo* pFillExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &numOfFillCols); + SExprInfo* pFillExprInfo = NULL; + + code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI); if (!pInfo->pFillSup) { code = TSDB_CODE_FAILED; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7462d71a8a..3c696a1be8 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1880,13 +1880,20 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN initResultSizeInfo(&pOperator->resultInfo, 4096); if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + + code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); @@ -3690,7 +3697,10 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode initResultSizeInfo(&pOperator->resultInfo, 4096); if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -3698,7 +3708,10 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode } SExprSupp* pExpSup = &pOperator->exprSupp; - SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -4831,7 +4844,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); if (pStateNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } @@ -4849,7 +4865,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -5126,7 +5145,10 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); + + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); pInfo->interval = (SInterval){ @@ -5174,7 +5196,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + + code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 5f4bbd66ce..90c760136e 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2716,7 +2716,10 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP pInfo->uid = (pBlockScanNode->suid != 0) ? pBlockScanNode->suid : pBlockScanNode->uid; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 6eaef50491..99a66efecb 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1126,13 +1126,19 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN SExprSupp* pSup = &pOperator->exprSupp; int32_t numOfExprs = 0; - SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pInterpPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(pSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); if (pInterpPhyNode->pExprs != NULL) { int32_t num = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pInterpPhyNode->pExprs, NULL, &pScalarExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a1ec923352..989eb97327 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1298,7 +1298,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode QUERY_CHECK_CODE(code, lino, _error); int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -1336,7 +1339,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode if (pPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -1578,7 +1584,10 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy if (pStateNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -1603,7 +1612,10 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + initResultSizeInfo(&pOperator->resultInfo, 4096); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, @@ -1682,7 +1694,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); @@ -1709,7 +1724,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -2012,7 +2030,9 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge initResultSizeInfo(&pOperator->resultInfo, 512); int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); @@ -2312,7 +2332,9 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva } int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, From 093e7ef0bb0127a1307025ea45f8b0c6e41d07d0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 5 Aug 2024 12:06:01 +0800 Subject: [PATCH 6/6] refactor remve backend code --- source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 2 +- source/libs/stream/src/streamTask.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 3bb4532db3..3a5d72576b 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -153,7 +153,7 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); void* taskDbAddRef(void* pTaskDb); void taskDbRemoveRef(void* pTaskDb); -void taskDbRemoveAllFiles(void* pTaskDb); +void taskDbSetClearFileFlag(void* pTaskDb); int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 8498c9118a..e3f747fb22 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2331,7 +2331,7 @@ void taskDbRemoveRef(void* pTaskDb) { (void)taosReleaseRef(taskDbWrapperId, pBackend->refId); } -void taskDbRemoveAllFiles(void* pTaskDb) { +void taskDbSetClearFileFlag(void* pTaskDb) { if (pTaskDb == NULL) { return; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 90167e446e..c5b1284560 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -321,7 +321,7 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { stDebug("s-task:0x%x start to free task state", pTask->id.taskId); streamStateClose(pTask->pState, remove); - taskDbRemoveAllFiles(pTask->pBackend); + taskDbSetClearFileFlag(pTask->pBackend); taskDbRemoveRef(pTask->pBackend); pTask->pBackend = NULL; pTask->pState = NULL;