feat(stream):save force window close scan range

This commit is contained in:
54liuyao 2024-10-18 15:40:37 +08:00
parent 6a2a5b80a1
commit e9550a6b2c
8 changed files with 190 additions and 82 deletions

View File

@ -222,7 +222,7 @@ int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo);
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
void qResetTaskInfoCode(qTaskInfo_t tinfo);
int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval);
int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval, STimeWindow* pLastWindow);
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);

View File

@ -417,8 +417,8 @@ typedef struct SStateStore {
SUpdateInfo** ppInfo);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo);
int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen);
int32_t (*updateInfoDeserialize)(void* buf, int32_t bufLen, SUpdateInfo* pInfo);
int32_t (*updateInfoSerialize)(SEncoder* pEncoder, const SUpdateInfo* pInfo);
int32_t (*updateInfoDeserialize)(SDecoder* pDeCoder, SUpdateInfo* pInfo);
SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* (*streamStateCountSeekKeyPrev)(SStreamState* pState, const SSessionKey* pKey, COUNT_TYPE count);

View File

@ -36,8 +36,8 @@ bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid);
void updateInfoDestroy(SUpdateInfo* pInfo);
void updateInfoAddCloseWindowSBF(SUpdateInfo* pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo* pInfo);
int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen);
int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo);
int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo);
int32_t updateInfoDeserialize(SDecoder* pDeCoder, SUpdateInfo* pInfo);
void windowSBfDelete(SUpdateInfo* pInfo, uint64_t count);
int32_t windowSBfAdd(SUpdateInfo* pInfo, uint64_t count);
bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);

View File

@ -537,6 +537,7 @@ typedef struct SStreamScanInfo {
int8_t pkColType;
int32_t pkColLen;
bool useGetResultRange;
STimeWindow lastScanRange;
} SStreamScanInfo;
typedef struct {

View File

@ -1092,9 +1092,9 @@ _end:
return code;
}
static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterMark, SInterval* pInterval) {
static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterMark, SInterval* pInterval, STimeWindow* pLastWindow) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
return getOpratorIntervalInfo(pOperator->pDownstream[0], pWaterMark, pInterval);
return getOpratorIntervalInfo(pOperator->pDownstream[0], pWaterMark, pInterval, pLastWindow);
}
SStreamScanInfo* pScanOp = (SStreamScanInfo*) pOperator->info;
*pWaterMark = pScanOp->twAggSup.waterMark;
@ -1102,10 +1102,10 @@ static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterM
return TSDB_CODE_SUCCESS;
}
int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval) {
int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval, STimeWindow* pLastWindow) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
return getOpratorIntervalInfo(pOperator, pWaterMark, pInterval);
return getOpratorIntervalInfo(pOperator, pWaterMark, pInterval, pLastWindow);
}
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {

View File

@ -3313,31 +3313,78 @@ _end:
}
int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff, int32_t* pLen) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t len = 0;
code = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo, &len);
QUERY_CHECK_CODE(code, lino, _end);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t len = 0;
SEncoder* pEnCoder = NULL;
SEncoder* pScanEnCoder = NULL;
len += encodeSTimeWindowAggSupp(NULL, &pInfo->twAggSup);
SEncoder encoder = {0};
pEnCoder = &encoder;
tEncoderInit(pEnCoder, NULL, 0);
if (tStartEncode(pEnCoder) != 0) {
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
code = pInfo->stateStore.updateInfoSerialize(pEnCoder, pInfo->pUpdateInfo);
QUERY_CHECK_CODE(code, lino, _end);
if (tEncodeI64(pEnCoder, pInfo->lastScanRange.skey) < 0) {
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI64(pEnCoder, pInfo->lastScanRange.ekey) < 0) {
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
tEndEncode(pEnCoder);
len += encoder.pos;
tEncoderClear(pEnCoder);
pEnCoder = NULL;
*pBuff = taosMemoryCalloc(1, len);
if (!(*pBuff)) {
code = terrno;
QUERY_CHECK_CODE(code, lino, _end);
}
void* buf = *pBuff;
(void)encodeSTimeWindowAggSupp(&buf, &pInfo->twAggSup);
int32_t stwLen = encodeSTimeWindowAggSupp(&buf, &pInfo->twAggSup);
int32_t tmp = 0;
code = pInfo->stateStore.updateInfoSerialize(buf, len, pInfo->pUpdateInfo, &tmp);
SEncoder scanEncoder = {0};
pScanEnCoder = &scanEncoder;
tEncoderInit(pScanEnCoder, buf, len - stwLen);
if (tStartEncode(pScanEnCoder) != 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
code = pInfo->stateStore.updateInfoSerialize(pScanEnCoder, pInfo->pUpdateInfo);
QUERY_CHECK_CODE(code, lino, _end);
if (tEncodeI64(pScanEnCoder, pInfo->lastScanRange.skey) < 0) {
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI64(pScanEnCoder, pInfo->lastScanRange.ekey) < 0) {
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
*pLen = len;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (pEnCoder != NULL) {
tEndEncode(pEnCoder);
tEncoderClear(pEnCoder);
}
if (pScanEnCoder != NULL) {
tEndEncode(pScanEnCoder);
tEncoderClear(pScanEnCoder);
}
return code;
}
@ -3366,28 +3413,70 @@ _end:
// other properties are recovered from the execution plan
void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SDecoder* pDeCoder = NULL;
if (!pBuff || len == 0) {
return;
lino = __LINE__;
goto _end;
}
void* buf = pBuff;
buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
int32_t tlen = len - encodeSTimeWindowAggSupp(NULL, &pInfo->twAggSup);
if (tlen == 0) {
return;
lino = __LINE__;
goto _end;
}
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
if (!pUpInfo) {
return;
lino = __LINE__;
goto _end;
}
int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo);
SDecoder decoder = {0};
pDeCoder = &decoder;
tDecoderInit(pDeCoder, buf, tlen);
if (tStartDecode(&decoder) < 0) {
lino = __LINE__;
goto _end;
}
code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
if (code == TSDB_CODE_SUCCESS) {
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
pInfo->pUpdateInfo = pUpInfo;
} else {
taosMemoryFree(pUpInfo);
lino = __LINE__;
goto _end;
}
if (tDecodeIsEnd(pDeCoder)) {
lino = __LINE__;
goto _end;
}
SET_WIN_KEY_INVALID(pInfo->lastScanRange.skey);
SET_WIN_KEY_INVALID(pInfo->lastScanRange.ekey);
if (tDecodeI64(pDeCoder, &pInfo->lastScanRange.skey) < 0) {
lino = __LINE__;
goto _end;
}
if (tDecodeI64(pDeCoder, &pInfo->lastScanRange.ekey) < 0) {
lino = __LINE__;
goto _end;
}
_end:
if (pDeCoder != NULL) {
tEndDecode(pDeCoder);
tDecoderClear(pDeCoder);
}
qInfo("%s end at line %d", __func__, lino);
}
static bool hasScanRange(SStreamScanInfo* pInfo) {
SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
return pSup && pSup->pScanBlock->info.rows > 0 && (isStateWindow(pInfo) || isCountWindow(pInfo));
@ -3634,6 +3723,7 @@ FETCH_NEXT_BLOCK:
case STREAM_GET_RESULT: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->updateResIndex = 0;
pInfo->lastScanRange = pBlock->info.window;
TSKEY endKey = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval);
if (pInfo->useGetResultRange == true) {
endKey = pBlock->info.window.ekey;
@ -4141,6 +4231,8 @@ void streamScanReleaseState(SOperatorInfo* pOperator) {
int32_t lino = 0;
SStreamScanInfo* pInfo = pOperator->info;
void* pBuff = NULL;
SEncoder* pEnCoder = NULL;
SEncoder* pScanEnCoder = NULL;
if (!pInfo->pState) {
return;
}
@ -4148,18 +4240,36 @@ void streamScanReleaseState(SOperatorInfo* pOperator) {
qDebug("stask:%s streamScanReleaseState cancel", GET_TASKID(pOperator->pTaskInfo));
return;
}
int32_t len = 0;
code = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo, &len);
int32_t len = 0;
SEncoder encoder = {0};
pEnCoder = &encoder;
tEncoderInit(pEnCoder, NULL, 0);
if (tStartEncode(pEnCoder) != 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
code = pInfo->stateStore.updateInfoSerialize(pEnCoder, pInfo->pUpdateInfo);
QUERY_CHECK_CODE(code, lino, _end);
tEndEncode(pEnCoder);
len += encoder.pos;
tEncoderClear(pEnCoder);
pEnCoder = NULL;
pBuff = taosMemoryCalloc(1, len);
if (!pBuff) {
code = terrno;
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t tmp = 0;
code = pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo, &tmp);
SEncoder scanEncoder = {0};
pScanEnCoder = &scanEncoder;
tEncoderInit(pScanEnCoder, pBuff, len);
if (tStartEncode(pScanEnCoder) != 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
code = pInfo->stateStore.updateInfoSerialize(pScanEnCoder, pInfo->pUpdateInfo);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME),
@ -4168,12 +4278,21 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (pEnCoder != NULL) {
tEndEncode(pEnCoder);
tEncoderClear(pEnCoder);
}
if (pScanEnCoder != NULL) {
tEndEncode(pScanEnCoder);
tEncoderClear(pScanEnCoder);
}
taosMemoryFree(pBuff);
}
void streamScanReloadState(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SDecoder* pDeCoder = NULL;
SStreamScanInfo* pInfo = pOperator->info;
if (!pInfo->pState) {
return;
@ -4194,7 +4313,10 @@ void streamScanReloadState(SOperatorInfo* pOperator) {
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t winCode = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo);
SDecoder decoder = {0};
pDeCoder = &decoder;
tDecoderInit(pDeCoder, pBuff, len);
int32_t winCode = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
taosMemoryFree(pBuff);
if (winCode == TSDB_CODE_SUCCESS && pInfo->pUpdateInfo) {
if (pInfo->pUpdateInfo->minTS < 0) {
@ -4231,6 +4353,10 @@ void streamScanReloadState(SOperatorInfo* pOperator) {
}
_end:
if (pDeCoder != NULL) {
tEndDecode(pDeCoder);
tDecoderClear(pDeCoder);
}
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}

View File

@ -33,7 +33,8 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
if ((pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) && (pTask->info.taskLevel == TASK_LEVEL__SOURCE)) {
int64_t waterMark = 0;
SInterval interval = {0};
code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval);
STimeWindow lastTimeWindow = {0};
code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval, &lastTimeWindow);
if (code) {
stError("s-task:%s failed to init scheduler info, code:%s", id, tstrerror(code));
return;

View File

@ -441,76 +441,69 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo* pInfo) {
pInfo->pCloseWinSBF = NULL;
}
int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen) {
int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (!pInfo) {
return TSDB_CODE_SUCCESS;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) != 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
if (tEncodeI32(&encoder, size) < 0) {
if (tEncodeI32(pEncoder, size) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < size; i++) {
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i);
if (tEncodeI64(&encoder, *pTs) < 0) {
if (tEncodeI64(pEncoder, *pTs) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) {
if (tEncodeU64(pEncoder, pInfo->numBuckets) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
if (tEncodeI32(&encoder, sBfSize) < 0) {
if (tEncodeI32(pEncoder, sBfSize) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
if (tScalableBfEncode(pSBf, &encoder) < 0) {
if (tScalableBfEncode(pSBf, pEncoder) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) {
if (tEncodeU64(pEncoder, pInfo->numSBFs) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI64(&encoder, pInfo->interval) < 0) {
if (tEncodeI64(pEncoder, pInfo->interval) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI64(&encoder, pInfo->watermark) < 0) {
if (tEncodeI64(pEncoder, pInfo->watermark) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI64(&encoder, pInfo->minTS) < 0) {
if (tEncodeI64(pEncoder, pInfo->minTS) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) {
if (tScalableBfEncode(pInfo->pCloseWinSBF, pEncoder) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t mapSize = taosHashGetSize(pInfo->pMap);
if (tEncodeI32(&encoder, mapSize) < 0) {
if (tEncodeI32(pEncoder, mapSize) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
@ -518,60 +511,51 @@ int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo,
size_t keyLen = 0;
while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) {
if (tEncodeU64(pEncoder, *(uint64_t*)key) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t valueSize = taosHashGetValueSize(pIte);
if (tEncodeBinary(&encoder, (const uint8_t*)pIte, valueSize) < 0) {
if (tEncodeBinary(pEncoder, (const uint8_t*)pIte, valueSize) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) {
if (tEncodeU64(pEncoder, pInfo->maxDataVersion) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI32(&encoder, pInfo->pkColLen) < 0) {
if (tEncodeI32(pEncoder, pInfo->pkColLen) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI8(&encoder, pInfo->pkColType) < 0) {
if (tEncodeI8(pEncoder, pInfo->pkColType) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
*pLen = tlen;
_end:
tEncoderClear(&encoder);
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
int32_t updateInfoDeserialize(SDecoder* pDeCoder, SUpdateInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
QUERY_CHECK_NULL(pInfo, code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(&decoder, &size) < 0) return -1;
if (tDecodeI32(pDeCoder, &size) < 0) return -1;
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno);
TSKEY ts = INT64_MIN;
for (int32_t i = 0; i < size; i++) {
if (tDecodeI64(&decoder, &ts) < 0) return -1;
if (tDecodeI64(pDeCoder, &ts) < 0) return -1;
void* tmp = taosArrayPush(pInfo->pTsBuckets, &ts);
if (!tmp) {
code = terrno;
@ -579,16 +563,16 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
}
}
if (tDecodeU64(&decoder, &pInfo->numBuckets) < 0) return -1;
if (tDecodeU64(pDeCoder, &pInfo->numBuckets) < 0) return -1;
int32_t sBfSize = 0;
if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
if (tDecodeI32(pDeCoder, &sBfSize) < 0) return -1;
pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void*));
QUERY_CHECK_NULL(pInfo->pTsSBFs, code, lino, _error, terrno);
for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = NULL;
code = tScalableBfDecode(&decoder, &pSBf);
code = tScalableBfDecode(pDeCoder, &pSBf);
QUERY_CHECK_CODE(code, lino, _error);
void* tmp = taosArrayPush(pInfo->pTsSBFs, &pSBf);
@ -598,36 +582,36 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
}
}
if (tDecodeU64(&decoder, &pInfo->numSBFs) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->interval) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->watermark) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->minTS) < 0) return -1;
if (tDecodeU64(pDeCoder, &pInfo->numSBFs) < 0) return -1;
if (tDecodeI64(pDeCoder, &pInfo->interval) < 0) return -1;
if (tDecodeI64(pDeCoder, &pInfo->watermark) < 0) return -1;
if (tDecodeI64(pDeCoder, &pInfo->minTS) < 0) return -1;
code = tScalableBfDecode(&decoder, &pInfo->pCloseWinSBF);
code = tScalableBfDecode(pDeCoder, &pInfo->pCloseWinSBF);
if (code != TSDB_CODE_SUCCESS) {
pInfo->pCloseWinSBF = NULL;
code = TSDB_CODE_SUCCESS;
}
int32_t mapSize = 0;
if (tDecodeI32(&decoder, &mapSize) < 0) return -1;
if (tDecodeI32(pDeCoder, &mapSize) < 0) return -1;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK);
uint64_t uid = 0;
void* pVal = NULL;
uint32_t valSize = 0;
for (int32_t i = 0; i < mapSize; i++) {
if (tDecodeU64(&decoder, &uid) < 0) return -1;
if (tDecodeBinary(&decoder, (uint8_t**)&pVal, &valSize) < 0) return -1;
if (tDecodeU64(pDeCoder, &uid) < 0) return -1;
if (tDecodeBinary(pDeCoder, (uint8_t**)&pVal, &valSize) < 0) return -1;
code = taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize);
QUERY_CHECK_CODE(code, lino, _error);
}
QUERY_CHECK_CONDITION((mapSize == taosHashGetSize(pInfo->pMap)), code, lino, _error,
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1;
if (tDecodeU64(pDeCoder, &pInfo->maxDataVersion) < 0) return -1;
if (tDecodeI32(&decoder, &pInfo->pkColLen) < 0) return -1;
if (tDecodeI8(&decoder, &pInfo->pkColType) < 0) return -1;
if (tDecodeI32(pDeCoder, &pInfo->pkColLen) < 0) return -1;
if (tDecodeI8(pDeCoder, &pInfo->pkColType) < 0) return -1;
pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen);
QUERY_CHECK_NULL(pInfo->pKeyBuff, code, lino, _error, terrno);
@ -643,10 +627,6 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
pInfo->comparePkCol = NULL;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));