From b649f1f1c41988563a35887144a063f34db80bb4 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Sat, 14 Sep 2024 10:51:57 +0800 Subject: [PATCH] fix(stream):adj build file state res --- include/libs/executor/storageapi.h | 6 +-- include/libs/stream/tstreamFileState.h | 10 ++-- .../executor/src/streamtimewindowoperator.c | 41 ++++++++-------- source/libs/stream/src/streamUpdate.c | 4 +- source/libs/stream/src/tstreamFileState.c | 48 ++++++++++++------- 5 files changed, 62 insertions(+), 47 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 61ae034450..ff42643b75 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -408,9 +408,9 @@ typedef struct SStateStore { SStreamStateCur* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key); SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key); - struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, - uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, - const char* id, int64_t ckId, int8_t type); + int32_t (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, + GetTsFun fp, void* pFile, TSKEY delMark, const char* id, int64_t ckId, int8_t type, + struct SStreamFileState** ppFileState); void (*streamFileStateDestroy)(struct SStreamFileState* pFileState); void (*streamFileStateClear)(struct SStreamFileState* pFileState); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 8c005fa994..6f1a1b3b98 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -45,9 +45,9 @@ typedef int32_t (*_state_fun_get_fn)(SStreamFileState* pFileState, void* pKey, i typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2); -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, - GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, - int8_t type); +int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, + void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type, + struct SStreamFileState** ppFileState); void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState); bool needClearDiskBuff(SStreamFileState* pFileState); @@ -63,7 +63,7 @@ int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); -void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId); +int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); @@ -89,7 +89,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen); SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen); -void recoverSesssion(SStreamFileState* pFileState, int64_t ckId); +int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId); void sessionWinStateClear(SStreamFileState* pFileState); void sessionWinStateCleanup(void* pBuff); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 6196647eff..b9484decdc 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1991,10 +1991,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->pUpdatedMap = NULL; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols); - pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( - tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH); - QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno); + pInfo->pState->pFileState = NULL; + code = + pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, + compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), + pHandle->checkpointId, STREAM_STATE_BUFF_HASH, &pInfo->pState->pFileState); + QUERY_CHECK_CODE(code, lino, _error); pInfo->dataVersion = 0; pInfo->recvGetAll = false; @@ -2176,39 +2178,33 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); int32_t lino = 0; int32_t code = createSpecialDataBlock(STREAM_CLEAR, &pSup->pScanBlock); - if (code) { - return code; - } + QUERY_CHECK_CODE(code, lino, _end); pSup->gap = gap; pSup->stateKeySize = keySize; pSup->stateKeyType = keyType; pSup->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx)); - if (pSup->pDummyCtx == NULL) { - return terrno; - } + QUERY_CHECK_NULL(pSup->pDummyCtx, code, lino, _end, terrno); pSup->stateStore = *pStore; pSup->pSessionAPI = pApi; initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput); pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); - if (!pSup->pState) { - return terrno; - } + QUERY_CHECK_NULL(pSup->pState, code, lino, _end, terrno); + *(pSup->pState) = *pState; pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); - pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( + pSup->pState->pFileState = NULL; + code = pSup->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, - pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT); - QUERY_CHECK_NULL(pSup->pState->pFileState, code, lino, _end, terrno); + pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState); + QUERY_CHECK_CODE(code, lino, _end); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); - if (!pSup->pResultRows) { - return terrno; - } + QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno); for (int32_t i = 0; i < numOfOutput; ++i) { pExpSup->pCtx[i].saveHandle.pState = pSup->pState; @@ -5348,10 +5344,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* int32_t funResSize = getMaxFunResSize(pSup, numOfCols); pInfo->stateStore = pTaskInfo->storageAPI.stateStore; - pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( + pInfo->pState->pFileState = NULL; + code = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH); - QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, &pInfo->pState->pFileState); + QUERY_CHECK_CODE(code, lino, _error); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 9a1a30ac7b..cc80e27467 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -345,7 +345,7 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p void** pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; TSKEY maxTs = *(TSKEY*)taosArrayGet(pInfo->pTsBuckets, index); - if (ts < maxTs - pInfo->watermark && maxTs != INT64_MIN) { + if (maxTs != INT64_MIN && ts < maxTs - pInfo->watermark) { // this window has been closed. if (pInfo->pCloseWinSBF) { code = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen, &res); @@ -585,6 +585,8 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { int32_t sBfSize = 0; if (tDecodeI32(&decoder, &sBfSize) < 0) return -1; pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void*)); + QUERY_CHECK_NULL(pInfo->pTsSBFs, code, lino, _error, terrno); + for (int32_t i = 0; i < sBfSize; i++) { SScalableBf* pSBf = NULL; code = tScalableBfDecode(&decoder, &pSBf); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index aa237efd04..abb796b0b7 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -127,9 +127,9 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { int32_t tmp = taosEncodeFixedI64(&buff, *pKey); } -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, - GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, - int8_t type) { +int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, + void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type, + SStreamFileState** ppFileState) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (memSize <= 0) { @@ -194,10 +194,11 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ // todo(liuyao) optimize if (type == STREAM_STATE_BUFF_HASH) { - recoverSnapshot(pFileState, checkpointId); + code = recoverSnapshot(pFileState, checkpointId); } else { - recoverSesssion(pFileState, checkpointId); + code = recoverSesssion(pFileState, checkpointId); } + QUERY_CHECK_CODE(code, lino, _error); void* valBuf = NULL; int32_t len = 0; @@ -208,14 +209,14 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark); } taosMemoryFreeClear(valBuf); - return pFileState; + (*ppFileState) = pFileState; _error: if (code != TSDB_CODE_SUCCESS) { + streamFileStateDestroy(pFileState); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - streamFileStateDestroy(pFileState); - return NULL; + return code; } void destroyRowBuffPos(SRowBuffPos* pPos) { @@ -806,8 +807,10 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { return code; } -void recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { +int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t winRes = TSDB_CODE_SUCCESS; if (pFileState->maxTs != INT64_MIN) { int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN @@ -818,7 +821,7 @@ void recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore, INT64_MAX); int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount); - while (code == TSDB_CODE_SUCCESS) { + while (winRes == TSDB_CODE_SUCCESS) { if (pFileState->curRowCount >= recoverNum) { break; } @@ -826,22 +829,34 @@ void recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { void* pVal = NULL; int32_t vlen = 0; SSessionKey key = {0}; - code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen); - if (code != TSDB_CODE_SUCCESS) { + winRes = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen); + if (winRes != TSDB_CODE_SUCCESS) { break; } + + if (vlen != pFileState->rowSize) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } + SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen); - code = putSessionWinResultBuff(pFileState, pPos); - if (code != TSDB_CODE_SUCCESS) { + winRes = putSessionWinResultBuff(pFileState, pPos); + if (winRes != TSDB_CODE_SUCCESS) { break; } - code = streamStateSessionCurPrev_rocksdb(pCur); + winRes = streamStateSessionCurPrev_rocksdb(pCur); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } streamStateFreeCur(pCur); + return code; } -void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { +int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t winCode = TSDB_CODE_SUCCESS; @@ -896,6 +911,7 @@ _end: qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } streamStateFreeCur(pCur); + return code; } int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }