adj stream operator result

This commit is contained in:
54liuyao 2024-07-18 11:26:50 +08:00
parent 4e6b898c9a
commit 99f4484328
8 changed files with 89 additions and 42 deletions

View File

@ -367,7 +367,8 @@ typedef struct SStateStore {
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen, int32_t* pWinCode);
int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);
int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode);
int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen,
int32_t* pWinCode);
void (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key);
void (*streamStateSessionReset)(SStreamState* pState, void* pVal);
void (*streamStateSessionClear)(SStreamState* pState);
@ -385,8 +386,8 @@ typedef struct SStateStore {
int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType,
int32_t pkLen, SUpdateInfo** ppInfo);
TSKEY(*updateInfoFillBlockData)
(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol);
int32_t (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol,
int32_t primaryKeyCol, TSKEY* pMaxResTs);
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
bool (*isIncrementalTimeStamp)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);

View File

@ -25,9 +25,12 @@
extern "C" {
#endif
int32_t updateInfoInitP(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo);
int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo);
TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol);
int32_t updateInfoInitP(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen,
SUpdateInfo** ppInfo);
int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen,
SUpdateInfo** ppInfo);
int32_t updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol,
TSKEY* pMaxResTs);
bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid);
void updateInfoDestroy(SUpdateInfo* pInfo);

View File

@ -34,7 +34,7 @@ typedef struct SScalableBf {
int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf** ppSBf);
int32_t tScalableBfPutNoCheck(SScalableBf* pSBf, const void* keyBuf, uint32_t len);
int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len);
int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len, int32_t* winRes);
int32_t tScalableBfNoContain(const SScalableBf* pSBf, const void* keyBuf, uint32_t len);
void tScalableBfDestroy(SScalableBf* pSBf);
int32_t tScalableBfEncode(const SScalableBf* pSBf, SEncoder* pEncoder);

View File

@ -2659,7 +2659,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->pRecoverRes != NULL) {
calBlockTbName(pInfo, pInfo->pRecoverRes, 0);
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex, pInfo->primaryKeyIndex);
TSKEY maxTs = INT64_MIN;
pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex, pInfo->primaryKeyIndex, &maxTs);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
}
if (pInfo->pCreateTbRes->info.rows > 0) {

View File

@ -4565,7 +4565,9 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
SStateWindowInfo nextInfo = {0};
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey,
pSeKeyBuf[i].groupId, i);
getStateWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo, &nextInfo);
code = getStateWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo, &nextInfo);
TSDB_CHECK_CODE(code, lino, _end);
bool cpRes = compareWinStateKey(curInfo.pStateKey, nextInfo.pStateKey);
qDebug("===stream=== reload state. next window info %" PRId64 ", %" PRIu64 ", compare:%d",
nextInfo.winInfo.sessionWin.win.skey, nextInfo.winInfo.sessionWin.groupId, cpRes);

View File

@ -172,7 +172,8 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
windowSBfAdd(pInfo, bfSize);
code = windowSBfAdd(pInfo, bfSize);
TSDB_CHECK_CODE(code, lino, _end);
pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
if (pInfo->pTsBuckets == NULL) {
@ -246,7 +247,9 @@ static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
if (index >= pInfo->numSBFs) {
uint64_t count = index + 1 - pInfo->numSBFs;
windowSBfDelete(pInfo, count);
windowSBfAdd(pInfo, count);
code = windowSBfAdd(pInfo, count);
TSDB_CHECK_CODE(code, lino, _end);
index = pInfo->numSBFs - 1;
}
SScalableBf* res = taosArrayGetP(pInfo->pTsSBFs, index);
@ -276,9 +279,13 @@ bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid) {
return false;
}
TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol) {
int32_t updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol, TSKEY* pMaxResTs) {
int32_t code = TSDB_CODE_SUCCESS;
if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN;
int32_t lino = 0;
if (pBlock == NULL || pBlock->info.rows == 0) {
(*pMaxResTs) = INT64_MIN;
goto _end;
}
TSKEY maxTs = INT64_MIN;
void* pPkVal = NULL;
void* pMaxPkVal = NULL;
@ -302,10 +309,9 @@ TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t p
}
}
SScalableBf* pSBf = NULL;
int32_t code = getSBf(pInfo, ts, &pSBf);
if (code != TSDB_CODE_SUCCESS) {
uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = getSBf(pInfo, ts, &pSBf);
TSDB_CHECK_CODE(code, lino, _end);
if (pSBf) {
if (primaryKeyCol >= 0) {
pPkVal = colDataGetData(pPkDataInfo, i);
@ -313,18 +319,29 @@ TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t p
}
int32_t buffLen = getKeyBuff(ts, tbUid, pPkVal, len, pInfo->pKeyBuff);
// we don't care whether the data is updated or not
tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen);
int32_t winRes = 0;
code = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen, &winRes);
TSDB_CHECK_CODE(code, lino, _end);
}
}
void* pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol) == -1) {
int32_t valueLen = getValueBuff(maxTs, pMaxPkVal, maxLen, pInfo->pValueBuff);
taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen);
code = taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen);
TSDB_CHECK_CODE(code, lino, _end);
}
return maxTs;
(*pMaxResTs) = maxTs;
_end:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t res = TSDB_CODE_FAILED;
int32_t buffLen = 0;
@ -335,7 +352,8 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p
if (ts < maxTs - pInfo->watermark) {
// this window has been closed.
if (pInfo->pCloseWinSBF) {
res = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen);
code = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen, &res);
TSDB_CHECK_CODE(code, lino, _end);
if (res == TSDB_CODE_SUCCESS) {
return false;
} else {
@ -346,16 +364,16 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p
}
SScalableBf* pSBf = NULL;
int32_t code = getSBf(pInfo, ts, &pSBf);
if (code != TSDB_CODE_SUCCESS) {
uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = getSBf(pInfo, ts, &pSBf);
TSDB_CHECK_CODE(code, lino, _end);
int32_t size = taosHashGetSize(pInfo->pMap);
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) ||
(pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1)) {
int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
TSDB_CHECK_CODE(code, lino, _end);
// pSBf may be a null pointer
if (pSBf) {
res = tScalableBfPutNoCheck(pSBf, pInfo->pKeyBuff, buffLen);
@ -365,7 +383,8 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p
// pSBf may be a null pointer
if (pSBf) {
res = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen);
code = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen, &res);
TSDB_CHECK_CODE(code, lino, _end);
}
if (!pMapMaxTs && maxTs < ts) {
@ -378,6 +397,11 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p
} else if (res == TSDB_CODE_SUCCESS) {
return false;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
// check from tsdb api
return true;
}
@ -596,7 +620,8 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
for (int32_t i = 0; i < mapSize; i++) {
if (tDecodeU64(&decoder, &uid) < 0) return -1;
if (tDecodeBinary(&decoder, (uint8_t**)&pVal, &valSize) < 0) return -1;
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize);
code = taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize);
TSDB_CHECK_CODE(code, lino, _error);
}
ASSERT(mapSize == taosHashGetSize(pInfo->pMap));
if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1;
@ -633,7 +658,11 @@ bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void
res = false;
} else {
int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
int32_t code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
if (code != TSDB_CODE_SUCCESS) {
res = false;
uError("%s failed at line %d since %d", __func__, __LINE__, code);
}
}
return res;
}

View File

@ -97,19 +97,20 @@ _error:
return code;
}
int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len) {
int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len, int32_t* winRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pSBf->status == SBF_INVALID) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
TSDB_CHECK_CODE(code, lino, _end);
}
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
int32_t size = taosArrayGetSize(pSBf->bfArray);
for (int32_t i = size - 2; i >= 0; --i) {
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
(*winRes) = TSDB_CODE_FAILED;
goto _end;
}
}
@ -120,12 +121,12 @@ int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len) {
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
if (code != TSDB_CODE_SUCCESS) {
pSBf->status = SBF_INVALID;
TSDB_CHECK_CODE(code, lino, _error);
TSDB_CHECK_CODE(code, lino, _end);
}
}
return tBloomFilterPutHash(pNormalBf, h1, h2);
(*winRes) = tBloomFilterPutHash(pNormalBf, h1, h2);
_error:
_end:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}

View File

@ -14,8 +14,8 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) {
GTEST_ASSERT_NE(0, tBloomFilterInit(100, -0.1, &pBFTmp));
GTEST_ASSERT_NE(0, tBloomFilterInit(0, 0.01, &pBFTmp));
SBloomFilter *pBF1 = NULL;
int32_t code = tBloomFilterInit(100, 0.005, &pBF1);
SBloomFilter* pBF1 = NULL;
int32_t code = tBloomFilterInit(100, 0.005, &pBF1);
GTEST_ASSERT_EQ(0, code);
GTEST_ASSERT_EQ(pBF1->numBits, 1152);
GTEST_ASSERT_EQ(pBF1->numUnits, 1152 / 64);
@ -83,7 +83,9 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) {
int64_t index = 0;
for (; count < 100; index++) {
int64_t ts = index + ts1;
if (tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS) {
int32_t res = TSDB_CODE_SUCCESS;
int32_t code = tScalableBfPut(pSBF1, &ts, sizeof(int64_t), &res);
if (res == TSDB_CODE_SUCCESS) {
count++;
}
}
@ -91,7 +93,9 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) {
for (; count < 300; index++) {
int64_t ts = index + ts1;
if (tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS) {
int32_t res = TSDB_CODE_SUCCESS;
int32_t code = tScalableBfPut(pSBF1, &ts, sizeof(int64_t), &res);
if (res == TSDB_CODE_SUCCESS) {
count++;
}
}
@ -99,7 +103,9 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) {
for (; count < 700; index++) {
int64_t ts = index + ts1;
if (tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS) {
int32_t res = TSDB_CODE_SUCCESS;
int32_t code = tScalableBfPut(pSBF1, &ts, sizeof(int64_t), &res);
if (res == TSDB_CODE_SUCCESS) {
count++;
}
}
@ -107,7 +113,9 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) {
for (; count < 1500; index++) {
int64_t ts = index + ts1;
if (tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS) {
int32_t res = TSDB_CODE_SUCCESS;
int32_t code = tScalableBfPut(pSBF1, &ts, sizeof(int64_t), &res);
if (res == TSDB_CODE_SUCCESS) {
count++;
}
}
@ -132,7 +140,9 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) {
GTEST_ASSERT_EQ(0, tScalableBfInit(size, 0.001, &pSBF4));
for (int64_t i = 0; i < 1000; i++) {
int64_t ts = i + ts1;
GTEST_ASSERT_EQ(tScalableBfPut(pSBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
int32_t res = TSDB_CODE_SUCCESS;
int32_t code = tScalableBfPut(pSBF4, &ts, sizeof(int64_t), &res);
GTEST_ASSERT_EQ(res, TSDB_CODE_SUCCESS);
}
for (int64_t i = 0; i < 1000; i++) {