diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 8afe55b0a1..a746fa8443 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -367,7 +367,8 @@ typedef struct SStateStore { int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen, int32_t* pWinCode); int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen); - int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode); + int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen, + int32_t* pWinCode); void (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key); void (*streamStateSessionReset)(SStreamState* pState, void* pVal); void (*streamStateSessionClear)(SStreamState* pState); @@ -385,8 +386,8 @@ typedef struct SStateStore { int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo); - TSKEY(*updateInfoFillBlockData) - (SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol); + int32_t (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, + int32_t primaryKeyCol, TSKEY* pMaxResTs); bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); bool (*isIncrementalTimeStamp)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index e1b33de429..06465e79e5 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -25,9 +25,12 @@ extern "C" { #endif -int32_t updateInfoInitP(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo); -int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo); -TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol); +int32_t updateInfoInitP(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, + SUpdateInfo** ppInfo); +int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, + SUpdateInfo** ppInfo); +int32_t updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol, + TSKEY* pMaxResTs); bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid); void updateInfoDestroy(SUpdateInfo* pInfo); diff --git a/include/util/tscalablebf.h b/include/util/tscalablebf.h index f82493aca2..3f0f6d6e68 100644 --- a/include/util/tscalablebf.h +++ b/include/util/tscalablebf.h @@ -34,7 +34,7 @@ typedef struct SScalableBf { int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf** ppSBf); int32_t tScalableBfPutNoCheck(SScalableBf* pSBf, const void* keyBuf, uint32_t len); -int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len); +int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len, int32_t* winRes); int32_t tScalableBfNoContain(const SScalableBf* pSBf, const void* keyBuf, uint32_t len); void tScalableBfDestroy(SScalableBf* pSBf); int32_t tScalableBfEncode(const SScalableBf* pSBf, SEncoder* pEncoder); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index cf464ad918..885a21d5e4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2659,7 +2659,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pInfo->pRecoverRes != NULL) { calBlockTbName(pInfo, pInfo->pRecoverRes, 0); if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { - TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex, pInfo->primaryKeyIndex); + TSKEY maxTs = INT64_MIN; + pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex, pInfo->primaryKeyIndex, &maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); } if (pInfo->pCreateTbRes->info.rows > 0) { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5a90207125..3feb3156d8 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -4565,7 +4565,9 @@ void streamStateReloadState(SOperatorInfo* pOperator) { SStateWindowInfo nextInfo = {0}; qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, i); - getStateWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo, &nextInfo); + code = getStateWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo, &nextInfo); + TSDB_CHECK_CODE(code, lino, _end); + bool cpRes = compareWinStateKey(curInfo.pStateKey, nextInfo.pStateKey); qDebug("===stream=== reload state. next window info %" PRId64 ", %" PRIu64 ", compare:%d", nextInfo.winInfo.sessionWin.win.skey, nextInfo.winInfo.sessionWin.groupId, cpRes); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index e3e07136c6..68e48bb378 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -172,7 +172,8 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _end); } - windowSBfAdd(pInfo, bfSize); + code = windowSBfAdd(pInfo, bfSize); + TSDB_CHECK_CODE(code, lino, _end); pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY)); if (pInfo->pTsBuckets == NULL) { @@ -246,7 +247,9 @@ static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) { if (index >= pInfo->numSBFs) { uint64_t count = index + 1 - pInfo->numSBFs; windowSBfDelete(pInfo, count); - windowSBfAdd(pInfo, count); + code = windowSBfAdd(pInfo, count); + TSDB_CHECK_CODE(code, lino, _end); + index = pInfo->numSBFs - 1; } SScalableBf* res = taosArrayGetP(pInfo->pTsSBFs, index); @@ -276,9 +279,13 @@ bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid) { return false; } -TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol) { +int32_t updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol, TSKEY* pMaxResTs) { int32_t code = TSDB_CODE_SUCCESS; - if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN; + int32_t lino = 0; + if (pBlock == NULL || pBlock->info.rows == 0) { + (*pMaxResTs) = INT64_MIN; + goto _end; + } TSKEY maxTs = INT64_MIN; void* pPkVal = NULL; void* pMaxPkVal = NULL; @@ -302,10 +309,9 @@ TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t p } } SScalableBf* pSBf = NULL; - int32_t code = getSBf(pInfo, ts, &pSBf); - if (code != TSDB_CODE_SUCCESS) { - uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + code = getSBf(pInfo, ts, &pSBf); + TSDB_CHECK_CODE(code, lino, _end); + if (pSBf) { if (primaryKeyCol >= 0) { pPkVal = colDataGetData(pPkDataInfo, i); @@ -313,18 +319,29 @@ TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t p } int32_t buffLen = getKeyBuff(ts, tbUid, pPkVal, len, pInfo->pKeyBuff); // we don't care whether the data is updated or not - tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen); + int32_t winRes = 0; + code = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen, &winRes); + TSDB_CHECK_CODE(code, lino, _end); } } void* pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol) == -1) { int32_t valueLen = getValueBuff(maxTs, pMaxPkVal, maxLen, pInfo->pValueBuff); - taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen); + code = taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen); + TSDB_CHECK_CODE(code, lino, _end); } - return maxTs; + (*pMaxResTs) = maxTs; + +_end: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; int32_t res = TSDB_CODE_FAILED; int32_t buffLen = 0; @@ -335,7 +352,8 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p if (ts < maxTs - pInfo->watermark) { // this window has been closed. if (pInfo->pCloseWinSBF) { - res = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen); + code = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen, &res); + TSDB_CHECK_CODE(code, lino, _end); if (res == TSDB_CODE_SUCCESS) { return false; } else { @@ -346,16 +364,16 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p } SScalableBf* pSBf = NULL; - int32_t code = getSBf(pInfo, ts, &pSBf); - if (code != TSDB_CODE_SUCCESS) { - uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + code = getSBf(pInfo, ts, &pSBf); + TSDB_CHECK_CODE(code, lino, _end); int32_t size = taosHashGetSize(pInfo->pMap); if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1)) { int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff); - taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen); + code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen); + TSDB_CHECK_CODE(code, lino, _end); + // pSBf may be a null pointer if (pSBf) { res = tScalableBfPutNoCheck(pSBf, pInfo->pKeyBuff, buffLen); @@ -365,7 +383,8 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p // pSBf may be a null pointer if (pSBf) { - res = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen); + code = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen, &res); + TSDB_CHECK_CODE(code, lino, _end); } if (!pMapMaxTs && maxTs < ts) { @@ -378,6 +397,11 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p } else if (res == TSDB_CODE_SUCCESS) { return false; } + +_end: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } // check from tsdb api return true; } @@ -596,7 +620,8 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { for (int32_t i = 0; i < mapSize; i++) { if (tDecodeU64(&decoder, &uid) < 0) return -1; if (tDecodeBinary(&decoder, (uint8_t**)&pVal, &valSize) < 0) return -1; - taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize); + code = taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize); + TSDB_CHECK_CODE(code, lino, _error); } ASSERT(mapSize == taosHashGetSize(pInfo->pMap)); if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1; @@ -633,7 +658,11 @@ bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void res = false; } else { int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff); - taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen); + int32_t code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen); + if (code != TSDB_CODE_SUCCESS) { + res = false; + uError("%s failed at line %d since %d", __func__, __LINE__, code); + } } return res; } diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c index 2e6dfbc96d..95f67d364a 100644 --- a/source/util/src/tscalablebf.c +++ b/source/util/src/tscalablebf.c @@ -97,19 +97,20 @@ _error: return code; } -int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len) { +int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len, int32_t* winRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (pSBf->status == SBF_INVALID) { code = TSDB_CODE_FAILED; - TSDB_CHECK_CODE(code, lino, _error); + TSDB_CHECK_CODE(code, lino, _end); } uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); int32_t size = taosArrayGetSize(pSBf->bfArray); for (int32_t i = size - 2; i >= 0; --i) { if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; + (*winRes) = TSDB_CODE_FAILED; + goto _end; } } @@ -120,12 +121,12 @@ int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len) { pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf); if (code != TSDB_CODE_SUCCESS) { pSBf->status = SBF_INVALID; - TSDB_CHECK_CODE(code, lino, _error); + TSDB_CHECK_CODE(code, lino, _end); } } - return tBloomFilterPutHash(pNormalBf, h1, h2); + (*winRes) = tBloomFilterPutHash(pNormalBf, h1, h2); -_error: +_end: if (code != TSDB_CODE_SUCCESS) { uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } diff --git a/source/util/test/bloomFilterTest.cpp b/source/util/test/bloomFilterTest.cpp index 9d83cbf086..4a48677809 100644 --- a/source/util/test/bloomFilterTest.cpp +++ b/source/util/test/bloomFilterTest.cpp @@ -14,8 +14,8 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) { GTEST_ASSERT_NE(0, tBloomFilterInit(100, -0.1, &pBFTmp)); GTEST_ASSERT_NE(0, tBloomFilterInit(0, 0.01, &pBFTmp)); - SBloomFilter *pBF1 = NULL; - int32_t code = tBloomFilterInit(100, 0.005, &pBF1); + SBloomFilter* pBF1 = NULL; + int32_t code = tBloomFilterInit(100, 0.005, &pBF1); GTEST_ASSERT_EQ(0, code); GTEST_ASSERT_EQ(pBF1->numBits, 1152); GTEST_ASSERT_EQ(pBF1->numUnits, 1152 / 64); @@ -83,7 +83,9 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) { int64_t index = 0; for (; count < 100; index++) { int64_t ts = index + ts1; - if (tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS) { + int32_t res = TSDB_CODE_SUCCESS; + int32_t code = tScalableBfPut(pSBF1, &ts, sizeof(int64_t), &res); + if (res == TSDB_CODE_SUCCESS) { count++; } } @@ -91,7 +93,9 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) { for (; count < 300; index++) { int64_t ts = index + ts1; - if (tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS) { + int32_t res = TSDB_CODE_SUCCESS; + int32_t code = tScalableBfPut(pSBF1, &ts, sizeof(int64_t), &res); + if (res == TSDB_CODE_SUCCESS) { count++; } } @@ -99,7 +103,9 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) { for (; count < 700; index++) { int64_t ts = index + ts1; - if (tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS) { + int32_t res = TSDB_CODE_SUCCESS; + int32_t code = tScalableBfPut(pSBF1, &ts, sizeof(int64_t), &res); + if (res == TSDB_CODE_SUCCESS) { count++; } } @@ -107,7 +113,9 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) { for (; count < 1500; index++) { int64_t ts = index + ts1; - if (tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS) { + int32_t res = TSDB_CODE_SUCCESS; + int32_t code = tScalableBfPut(pSBF1, &ts, sizeof(int64_t), &res); + if (res == TSDB_CODE_SUCCESS) { count++; } } @@ -132,7 +140,9 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) { GTEST_ASSERT_EQ(0, tScalableBfInit(size, 0.001, &pSBF4)); for (int64_t i = 0; i < 1000; i++) { int64_t ts = i + ts1; - GTEST_ASSERT_EQ(tScalableBfPut(pSBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS); + int32_t res = TSDB_CODE_SUCCESS; + int32_t code = tScalableBfPut(pSBF4, &ts, sizeof(int64_t), &res); + GTEST_ASSERT_EQ(res, TSDB_CODE_SUCCESS); } for (int64_t i = 0; i < 1000; i++) {