From 76f56d940b19ce48df661dff6bb6993a47e9a2ba Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 16 Aug 2024 10:26:09 +0800 Subject: [PATCH] fix issue --- .../executor/src/streamtimewindowoperator.c | 28 +++++++++++++------ source/libs/stream/src/tstreamFileState.c | 25 +++++++++++------ 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 756a6d71e1..5c12db1ab9 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -477,10 +477,12 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pMidRetriveRes); blockDataDestroy(pInfo->pMidPulloverRes); - pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); + if (pInfo->stateStore.streamFileStateDestroy != NULL) { + pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); + } taosArrayDestroy(pInfo->pMidPullDatas); - if (pInfo->pState->dump == 1) { + if (pInfo->pState !=NULL && pInfo->pState->dump == 1) { taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner); taosMemoryFreeClear(pInfo->pState->pTdbState); } @@ -1953,12 +1955,14 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; + pInfo->stateStore = pTaskInfo->storageAPI.stateStore; int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols); pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH); + QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno); + pInfo->dataVersion = 0; - pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; pInfo->recvPullover = false; pInfo->recvRetrive = false; @@ -2032,7 +2036,9 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { tSimpleHashCleanup(pSup->pResultRows); destroyDiskbasedBuf(pSup->pResultBuf); blockDataDestroy(pSup->pScanBlock); - pSup->stateStore.streamFileStateDestroy(pSup->pState->pFileState); + if (pSup->stateStore.streamFileStateDestroy != NULL) { + pSup->stateStore.streamFileStateDestroy(pSup->pState->pFileState); + } taosMemoryFreeClear(pSup->pState); taosMemoryFreeClear(pSup->pDummyCtx); } @@ -2141,7 +2147,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, SStorageAPI* pApi, int32_t tsIndex) { pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); - + int32_t lino = 0; int32_t code = createSpecialDataBlock(STREAM_CLEAR, &pSup->pScanBlock); if (code) { return code; @@ -2156,6 +2162,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in } pSup->stateStore = *pStore; + pSup->pSessionAPI = pApi; initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput); pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); @@ -2168,6 +2175,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT); + QUERY_CHECK_NULL(pSup->pState->pFileState, code, lino, _end, terrno); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); @@ -2179,8 +2187,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pExpSup->pCtx[i].saveHandle.pState = pSup->pState; } - pSup->pSessionAPI = pApi; - return TSDB_CODE_SUCCESS; +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) { @@ -5308,9 +5319,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->pUpdatedMap = NULL; int32_t funResSize = getMaxFunResSize(pSup, numOfCols); + pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH); + QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -5319,7 +5332,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); - pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 5dacd4c80c..3cdbad2dd5 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -131,21 +131,27 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; } if (rowSize == 0) { + code = TSDB_CODE_INVALID_PARA; goto _error; } SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState)); - if (!pFileState) { - goto _error; - } + QUERY_CHECK_NULL(pFileState, code, lino, _error, terrno); + rowSize += selectRowSize; pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->usedBuffs = tdListNew(POINTER_BYTES); + QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _error, terrno); + pFileState->freeBuffs = tdListNew(POINTER_BYTES); + QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _error, terrno); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount); if (type == STREAM_STATE_BUFF_HASH) { @@ -171,10 +177,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->cfName = taosStrdup("sess"); pFileState->stateFunctionGetFn = getSessionRowBuff; } - - if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) { - goto _error; - } + QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _error, terrno); pFileState->keyLen = keySize; pFileState->rowSize = rowSize; @@ -188,6 +191,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->flushMark = INT64_MIN; pFileState->maxTs = INT64_MIN; pFileState->id = taosStrdup(taskId); + QUERY_CHECK_NULL(pFileState->id, code, lino, _error, terrno); // todo(liuyao) optimize if (type == STREAM_STATE_BUFF_HASH) { @@ -198,8 +202,8 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ void* valBuf = NULL; int32_t len = 0; - int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len); - if (code == TSDB_CODE_SUCCESS) { + int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len); + if (tmpRes == TSDB_CODE_SUCCESS) { ASSERT(len == sizeof(TSKEY)); streamFileStateDecode(&pFileState->flushMark, valBuf, len); qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark); @@ -208,6 +212,9 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ return pFileState; _error: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } streamFileStateDestroy(pFileState); return NULL; }