From e9550a6b2cef5d83fe72a700576f7cf16801493f Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 18 Oct 2024 15:40:37 +0800 Subject: [PATCH] feat(stream):save force window close scan range --- include/libs/executor/executor.h | 2 +- include/libs/executor/storageapi.h | 4 +- include/libs/stream/tstreamUpdate.h | 4 +- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/src/executor.c | 8 +- source/libs/executor/src/scanoperator.c | 160 +++++++++++++++++++++--- source/libs/stream/src/streamSched.c | 3 +- source/libs/stream/src/streamUpdate.c | 90 ++++++------- 8 files changed, 190 insertions(+), 82 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index cd78e97e39..438b4c869d 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -222,7 +222,7 @@ int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); bool qStreamScanhistoryFinished(qTaskInfo_t tinfo); int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo); void qResetTaskInfoCode(qTaskInfo_t tinfo); -int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval); +int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval, STimeWindow* pLastWindow); int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo); int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index cd426a3a3a..db0d6339c8 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -417,8 +417,8 @@ typedef struct SStateStore { SUpdateInfo** ppInfo); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo); - int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen); - int32_t (*updateInfoDeserialize)(void* buf, int32_t bufLen, SUpdateInfo* pInfo); + int32_t (*updateInfoSerialize)(SEncoder* pEncoder, const SUpdateInfo* pInfo); + int32_t (*updateInfoDeserialize)(SDecoder* pDeCoder, SUpdateInfo* pInfo); SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key); SStreamStateCur* (*streamStateCountSeekKeyPrev)(SStreamState* pState, const SSessionKey* pKey, COUNT_TYPE count); diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index 06465e79e5..32712736c2 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -36,8 +36,8 @@ bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid); void updateInfoDestroy(SUpdateInfo* pInfo); void updateInfoAddCloseWindowSBF(SUpdateInfo* pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo* pInfo); -int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen); -int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo); +int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo); +int32_t updateInfoDeserialize(SDecoder* pDeCoder, SUpdateInfo* pInfo); void windowSBfDelete(SUpdateInfo* pInfo, uint64_t count); int32_t windowSBfAdd(SUpdateInfo* pInfo, uint64_t count); bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 26d59a90ba..aa7344758a 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -537,6 +537,7 @@ typedef struct SStreamScanInfo { int8_t pkColType; int32_t pkColLen; bool useGetResultRange; + STimeWindow lastScanRange; } SStreamScanInfo; typedef struct { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index f3764f1fd8..a5211f2da0 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1092,9 +1092,9 @@ _end: return code; } -static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterMark, SInterval* pInterval) { +static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterMark, SInterval* pInterval, STimeWindow* pLastWindow) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - return getOpratorIntervalInfo(pOperator->pDownstream[0], pWaterMark, pInterval); + return getOpratorIntervalInfo(pOperator->pDownstream[0], pWaterMark, pInterval, pLastWindow); } SStreamScanInfo* pScanOp = (SStreamScanInfo*) pOperator->info; *pWaterMark = pScanOp->twAggSup.waterMark; @@ -1102,10 +1102,10 @@ static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterM return TSDB_CODE_SUCCESS; } -int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval) { +int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval, STimeWindow* pLastWindow) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; - return getOpratorIntervalInfo(pOperator, pWaterMark, pInterval); + return getOpratorIntervalInfo(pOperator, pWaterMark, pInterval, pLastWindow); } int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f10cb14686..32012c9fbb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3313,31 +3313,78 @@ _end: } int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff, int32_t* pLen) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; - int32_t len = 0; - code = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo, &len); - QUERY_CHECK_CODE(code, lino, _end); + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t len = 0; + SEncoder* pEnCoder = NULL; + SEncoder* pScanEnCoder = NULL; len += encodeSTimeWindowAggSupp(NULL, &pInfo->twAggSup); + SEncoder encoder = {0}; + pEnCoder = &encoder; + tEncoderInit(pEnCoder, NULL, 0); + if (tStartEncode(pEnCoder) != 0) { + code = TSDB_CODE_STREAM_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } + code = pInfo->stateStore.updateInfoSerialize(pEnCoder, pInfo->pUpdateInfo); + QUERY_CHECK_CODE(code, lino, _end); + + if (tEncodeI64(pEnCoder, pInfo->lastScanRange.skey) < 0) { + code = TSDB_CODE_STREAM_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } + if (tEncodeI64(pEnCoder, pInfo->lastScanRange.ekey) < 0) { + code = TSDB_CODE_STREAM_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } + + tEndEncode(pEnCoder); + len += encoder.pos; + tEncoderClear(pEnCoder); + pEnCoder = NULL; + *pBuff = taosMemoryCalloc(1, len); if (!(*pBuff)) { code = terrno; QUERY_CHECK_CODE(code, lino, _end); } void* buf = *pBuff; - (void)encodeSTimeWindowAggSupp(&buf, &pInfo->twAggSup); + int32_t stwLen = encodeSTimeWindowAggSupp(&buf, &pInfo->twAggSup); - int32_t tmp = 0; - code = pInfo->stateStore.updateInfoSerialize(buf, len, pInfo->pUpdateInfo, &tmp); + SEncoder scanEncoder = {0}; + pScanEnCoder = &scanEncoder; + tEncoderInit(pScanEnCoder, buf, len - stwLen); + if (tStartEncode(pScanEnCoder) != 0) { + code = TSDB_CODE_FAILED; + QUERY_CHECK_CODE(code, lino, _end); + } + code = pInfo->stateStore.updateInfoSerialize(pScanEnCoder, pInfo->pUpdateInfo); QUERY_CHECK_CODE(code, lino, _end); + if (tEncodeI64(pScanEnCoder, pInfo->lastScanRange.skey) < 0) { + code = TSDB_CODE_STREAM_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } + if (tEncodeI64(pScanEnCoder, pInfo->lastScanRange.ekey) < 0) { + code = TSDB_CODE_STREAM_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } + *pLen = len; _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } + if (pEnCoder != NULL) { + tEndEncode(pEnCoder); + tEncoderClear(pEnCoder); + } + if (pScanEnCoder != NULL) { + tEndEncode(pScanEnCoder); + tEncoderClear(pScanEnCoder); + } return code; } @@ -3366,28 +3413,70 @@ _end: // other properties are recovered from the execution plan void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SDecoder* pDeCoder = NULL; if (!pBuff || len == 0) { - return; + lino = __LINE__; + goto _end; } void* buf = pBuff; buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); int32_t tlen = len - encodeSTimeWindowAggSupp(NULL, &pInfo->twAggSup); if (tlen == 0) { - return; + lino = __LINE__; + goto _end; } void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); if (!pUpInfo) { - return; + lino = __LINE__; + goto _end; } - int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo); + SDecoder decoder = {0}; + pDeCoder = &decoder; + tDecoderInit(pDeCoder, buf, tlen); + if (tStartDecode(&decoder) < 0) { + lino = __LINE__; + goto _end; + } + + code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo); if (code == TSDB_CODE_SUCCESS) { pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); pInfo->pUpdateInfo = pUpInfo; } else { taosMemoryFree(pUpInfo); + lino = __LINE__; + goto _end; } + + if (tDecodeIsEnd(pDeCoder)) { + lino = __LINE__; + goto _end; + } + + SET_WIN_KEY_INVALID(pInfo->lastScanRange.skey); + SET_WIN_KEY_INVALID(pInfo->lastScanRange.ekey); + + if (tDecodeI64(pDeCoder, &pInfo->lastScanRange.skey) < 0) { + lino = __LINE__; + goto _end; + } + + if (tDecodeI64(pDeCoder, &pInfo->lastScanRange.ekey) < 0) { + lino = __LINE__; + goto _end; + } + +_end: + if (pDeCoder != NULL) { + tEndDecode(pDeCoder); + tDecoderClear(pDeCoder); + } + qInfo("%s end at line %d", __func__, lino); } + static bool hasScanRange(SStreamScanInfo* pInfo) { SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup; return pSup && pSup->pScanBlock->info.rows > 0 && (isStateWindow(pInfo) || isCountWindow(pInfo)); @@ -3634,6 +3723,7 @@ FETCH_NEXT_BLOCK: case STREAM_GET_RESULT: { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->updateResIndex = 0; + pInfo->lastScanRange = pBlock->info.window; TSKEY endKey = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval); if (pInfo->useGetResultRange == true) { endKey = pBlock->info.window.ekey; @@ -4141,6 +4231,8 @@ void streamScanReleaseState(SOperatorInfo* pOperator) { int32_t lino = 0; SStreamScanInfo* pInfo = pOperator->info; void* pBuff = NULL; + SEncoder* pEnCoder = NULL; + SEncoder* pScanEnCoder = NULL; if (!pInfo->pState) { return; } @@ -4148,18 +4240,36 @@ void streamScanReleaseState(SOperatorInfo* pOperator) { qDebug("stask:%s streamScanReleaseState cancel", GET_TASKID(pOperator->pTaskInfo)); return; } - int32_t len = 0; - code = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo, &len); + int32_t len = 0; + SEncoder encoder = {0}; + pEnCoder = &encoder; + tEncoderInit(pEnCoder, NULL, 0); + if (tStartEncode(pEnCoder) != 0) { + code = TSDB_CODE_FAILED; + QUERY_CHECK_CODE(code, lino, _end); + } + code = pInfo->stateStore.updateInfoSerialize(pEnCoder, pInfo->pUpdateInfo); QUERY_CHECK_CODE(code, lino, _end); + tEndEncode(pEnCoder); + len += encoder.pos; + tEncoderClear(pEnCoder); + pEnCoder = NULL; + pBuff = taosMemoryCalloc(1, len); if (!pBuff) { code = terrno; QUERY_CHECK_CODE(code, lino, _end); } - int32_t tmp = 0; - code = pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo, &tmp); + SEncoder scanEncoder = {0}; + pScanEnCoder = &scanEncoder; + tEncoderInit(pScanEnCoder, pBuff, len); + if (tStartEncode(pScanEnCoder) != 0) { + code = TSDB_CODE_FAILED; + QUERY_CHECK_CODE(code, lino, _end); + } + code = pInfo->stateStore.updateInfoSerialize(pScanEnCoder, pInfo->pUpdateInfo); QUERY_CHECK_CODE(code, lino, _end); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), @@ -4168,12 +4278,21 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } + if (pEnCoder != NULL) { + tEndEncode(pEnCoder); + tEncoderClear(pEnCoder); + } + if (pScanEnCoder != NULL) { + tEndEncode(pScanEnCoder); + tEncoderClear(pScanEnCoder); + } taosMemoryFree(pBuff); } void streamScanReloadState(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; + SDecoder* pDeCoder = NULL; SStreamScanInfo* pInfo = pOperator->info; if (!pInfo->pState) { return; @@ -4194,7 +4313,10 @@ void streamScanReloadState(SOperatorInfo* pOperator) { QUERY_CHECK_CODE(code, lino, _end); } - int32_t winCode = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo); + SDecoder decoder = {0}; + pDeCoder = &decoder; + tDecoderInit(pDeCoder, pBuff, len); + int32_t winCode = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo); taosMemoryFree(pBuff); if (winCode == TSDB_CODE_SUCCESS && pInfo->pUpdateInfo) { if (pInfo->pUpdateInfo->minTS < 0) { @@ -4231,6 +4353,10 @@ void streamScanReloadState(SOperatorInfo* pOperator) { } _end: + if (pDeCoder != NULL) { + tEndDecode(pDeCoder); + tDecoderClear(pDeCoder); + } if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 82c5b07c59..cfb033fb71 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -33,7 +33,8 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { if ((pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) && (pTask->info.taskLevel == TASK_LEVEL__SOURCE)) { int64_t waterMark = 0; SInterval interval = {0}; - code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval); + STimeWindow lastTimeWindow = {0}; + code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval, &lastTimeWindow); if (code) { stError("s-task:%s failed to init scheduler info, code:%s", id, tstrerror(code)); return; diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 764045eed4..a3cfa00127 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -441,76 +441,69 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo* pInfo) { pInfo->pCloseWinSBF = NULL; } -int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen) { +int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (!pInfo) { return TSDB_CODE_SUCCESS; } - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) != 0) { - code = TSDB_CODE_FAILED; - QUERY_CHECK_CODE(code, lino, _end); - } - int32_t size = taosArrayGetSize(pInfo->pTsBuckets); - if (tEncodeI32(&encoder, size) < 0) { + if (tEncodeI32(pEncoder, size) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } for (int32_t i = 0; i < size; i++) { TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i); - if (tEncodeI64(&encoder, *pTs) < 0) { + if (tEncodeI64(pEncoder, *pTs) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } } - if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) { + if (tEncodeU64(pEncoder, pInfo->numBuckets) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs); - if (tEncodeI32(&encoder, sBfSize) < 0) { + if (tEncodeI32(pEncoder, sBfSize) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } for (int32_t i = 0; i < sBfSize; i++) { SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i); - if (tScalableBfEncode(pSBf, &encoder) < 0) { + if (tScalableBfEncode(pSBf, pEncoder) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } } - if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) { + if (tEncodeU64(pEncoder, pInfo->numSBFs) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } - if (tEncodeI64(&encoder, pInfo->interval) < 0) { + if (tEncodeI64(pEncoder, pInfo->interval) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } - if (tEncodeI64(&encoder, pInfo->watermark) < 0) { + if (tEncodeI64(pEncoder, pInfo->watermark) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } - if (tEncodeI64(&encoder, pInfo->minTS) < 0) { + if (tEncodeI64(pEncoder, pInfo->minTS) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } - if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) { + if (tScalableBfEncode(pInfo->pCloseWinSBF, pEncoder) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } int32_t mapSize = taosHashGetSize(pInfo->pMap); - if (tEncodeI32(&encoder, mapSize) < 0) { + if (tEncodeI32(pEncoder, mapSize) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } @@ -518,60 +511,51 @@ int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, size_t keyLen = 0; while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) { void* key = taosHashGetKey(pIte, &keyLen); - if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) { + if (tEncodeU64(pEncoder, *(uint64_t*)key) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } int32_t valueSize = taosHashGetValueSize(pIte); - if (tEncodeBinary(&encoder, (const uint8_t*)pIte, valueSize) < 0) { + if (tEncodeBinary(pEncoder, (const uint8_t*)pIte, valueSize) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } } - if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) { + if (tEncodeU64(pEncoder, pInfo->maxDataVersion) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } - if (tEncodeI32(&encoder, pInfo->pkColLen) < 0) { + if (tEncodeI32(pEncoder, pInfo->pkColLen) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } - if (tEncodeI8(&encoder, pInfo->pkColType) < 0) { + if (tEncodeI8(pEncoder, pInfo->pkColType) < 0) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); } - tEndEncode(&encoder); - - int32_t tlen = encoder.pos; - *pLen = tlen; - _end: - tEncoderClear(&encoder); if (code != TSDB_CODE_SUCCESS) { uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; } -int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { +int32_t updateInfoDeserialize(SDecoder* pDeCoder, SUpdateInfo* pInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; QUERY_CHECK_NULL(pInfo, code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); - SDecoder decoder = {0}; - tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - + int32_t size = 0; - if (tDecodeI32(&decoder, &size) < 0) return -1; + if (tDecodeI32(pDeCoder, &size) < 0) return -1; pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno); TSKEY ts = INT64_MIN; for (int32_t i = 0; i < size; i++) { - if (tDecodeI64(&decoder, &ts) < 0) return -1; + if (tDecodeI64(pDeCoder, &ts) < 0) return -1; void* tmp = taosArrayPush(pInfo->pTsBuckets, &ts); if (!tmp) { code = terrno; @@ -579,16 +563,16 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { } } - if (tDecodeU64(&decoder, &pInfo->numBuckets) < 0) return -1; + if (tDecodeU64(pDeCoder, &pInfo->numBuckets) < 0) return -1; int32_t sBfSize = 0; - if (tDecodeI32(&decoder, &sBfSize) < 0) return -1; + if (tDecodeI32(pDeCoder, &sBfSize) < 0) return -1; pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void*)); QUERY_CHECK_NULL(pInfo->pTsSBFs, code, lino, _error, terrno); for (int32_t i = 0; i < sBfSize; i++) { SScalableBf* pSBf = NULL; - code = tScalableBfDecode(&decoder, &pSBf); + code = tScalableBfDecode(pDeCoder, &pSBf); QUERY_CHECK_CODE(code, lino, _error); void* tmp = taosArrayPush(pInfo->pTsSBFs, &pSBf); @@ -598,36 +582,36 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { } } - if (tDecodeU64(&decoder, &pInfo->numSBFs) < 0) return -1; - if (tDecodeI64(&decoder, &pInfo->interval) < 0) return -1; - if (tDecodeI64(&decoder, &pInfo->watermark) < 0) return -1; - if (tDecodeI64(&decoder, &pInfo->minTS) < 0) return -1; + if (tDecodeU64(pDeCoder, &pInfo->numSBFs) < 0) return -1; + if (tDecodeI64(pDeCoder, &pInfo->interval) < 0) return -1; + if (tDecodeI64(pDeCoder, &pInfo->watermark) < 0) return -1; + if (tDecodeI64(pDeCoder, &pInfo->minTS) < 0) return -1; - code = tScalableBfDecode(&decoder, &pInfo->pCloseWinSBF); + code = tScalableBfDecode(pDeCoder, &pInfo->pCloseWinSBF); if (code != TSDB_CODE_SUCCESS) { pInfo->pCloseWinSBF = NULL; code = TSDB_CODE_SUCCESS; } int32_t mapSize = 0; - if (tDecodeI32(&decoder, &mapSize) < 0) return -1; + if (tDecodeI32(pDeCoder, &mapSize) < 0) return -1; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK); uint64_t uid = 0; void* pVal = NULL; uint32_t valSize = 0; for (int32_t i = 0; i < mapSize; i++) { - if (tDecodeU64(&decoder, &uid) < 0) return -1; - if (tDecodeBinary(&decoder, (uint8_t**)&pVal, &valSize) < 0) return -1; + if (tDecodeU64(pDeCoder, &uid) < 0) return -1; + if (tDecodeBinary(pDeCoder, (uint8_t**)&pVal, &valSize) < 0) return -1; code = taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize); QUERY_CHECK_CODE(code, lino, _error); } QUERY_CHECK_CONDITION((mapSize == taosHashGetSize(pInfo->pMap)), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); - if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1; + if (tDecodeU64(pDeCoder, &pInfo->maxDataVersion) < 0) return -1; - if (tDecodeI32(&decoder, &pInfo->pkColLen) < 0) return -1; - if (tDecodeI8(&decoder, &pInfo->pkColType) < 0) return -1; + if (tDecodeI32(pDeCoder, &pInfo->pkColLen) < 0) return -1; + if (tDecodeI8(pDeCoder, &pInfo->pkColType) < 0) return -1; pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen); QUERY_CHECK_NULL(pInfo->pKeyBuff, code, lino, _error, terrno); @@ -643,10 +627,6 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { pInfo->comparePkCol = NULL; } - tEndDecode(&decoder); - - tDecoderClear(&decoder); - _error: if (code != TSDB_CODE_SUCCESS) { uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));