adj stream function return

This commit is contained in:
54liuyao 2024-07-15 13:38:34 +08:00
parent 60b268bbbe
commit 9b067d075f
10 changed files with 181 additions and 148 deletions

View File

@ -381,7 +381,7 @@ typedef struct SStateStore {
void** ppVal, int32_t* pVLen);
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo);
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol);
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
@ -391,7 +391,7 @@ typedef struct SStateStore {
void (*windowSBfDelete)(SUpdateInfo* pInfo, uint64_t count);
int32_t (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count);
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
int32_t (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo);
int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo);

View File

@ -15,18 +15,18 @@
#ifndef _TSTREAMUPDATE_H_
#define _TSTREAMUPDATE_H_
#include "storageapi.h"
#include "taosdef.h"
#include "tarray.h"
#include "tcommon.h"
#include "tmsg.h"
#include "storageapi.h"
#ifdef __cplusplus
extern "C" {
#endif
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
int32_t updateInfoInitP(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo);
int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo);
TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol);
bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid);

View File

@ -1369,7 +1369,7 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
pScanInfo->pPartScalarSup = pExpr;
pScanInfo->pPartTbnameSup = pTbnameExpr;
if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen);
pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
}
}

View File

@ -487,8 +487,11 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
pScanInfo->windowSup.parentType = type;
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo =
pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen);
int32_t code = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate,
pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at since %s", __func__, __LINE__, tstrerror(code));
}
}
pScanInfo->interval = pInfo->interval;
@ -1769,8 +1772,8 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
pScanInfo->pState = pAggSup->pState;
if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen);
pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
}
pScanInfo->twAggSup = *pTwSup;
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;

View File

@ -18,6 +18,7 @@
#include "tencode.h"
#include "tstreamUpdate.h"
#include "ttime.h"
#include "tutil.h"
#define DEFAULT_FALSE_POSITIVE 0.01
#define DEFAULT_BUCKET_SIZE 131072
@ -34,7 +35,8 @@
static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }
int compareKeyTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) {
return compareInt64Val(pTs1, pTs2);;
return compareInt64Val(pTs1, pTs2);
;
}
int compareKeyTsAndPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) {
@ -138,14 +140,19 @@ static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t w
return watermark;
}
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen) {
return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp, pkType, pkLen);
int32_t updateInfoInitP(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen,
SUpdateInfo** ppInfo) {
return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp, pkType, pkLen, ppInfo);
}
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen) {
int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen,
SUpdateInfo** ppInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SUpdateInfo* pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
if (pInfo == NULL) {
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
pInfo->pTsBuckets = NULL;
pInfo->pTsSBFs = NULL;
@ -162,14 +169,16 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void*));
if (pInfo->pTsSBFs == NULL) {
updateInfoDestroy(pInfo);
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
windowSBfAdd(pInfo, bfSize);
pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
if (pInfo->pTsBuckets == NULL) {
updateInfoDestroy(pInfo);
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
TSKEY dumy = 0;
@ -181,19 +190,38 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
if (!pInfo->pMap) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
pInfo->maxDataVersion = 0;
pInfo->pkColLen = pkLen;
pInfo->pkColType = pkType;
pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pkLen);
if (!pInfo->pKeyBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pkLen);
if (!pInfo->pValueBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
if (pkLen != 0) {
pInfo->comparePkRowFn = compareKeyTsAndPk;
pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC);;
pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC);
;
} else {
pInfo->comparePkRowFn = compareKeyTs;
pInfo->comparePkCol = NULL;
}
return pInfo;
(*ppInfo) = pInfo;
_end:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static SScalableBf* getSBf(SUpdateInfo* pInfo, TSKEY ts) {
@ -295,7 +323,8 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* p
SScalableBf* pSBf = getSBf(pInfo, ts);
int32_t size = taosHashGetSize(pInfo->pMap);
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1 )) {
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) ||
(pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1)) {
int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
// pSBf may be a null pointer
@ -479,7 +508,8 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen);
if (pInfo->pkColLen != 0) {
pInfo->comparePkRowFn = compareKeyTsAndPk;
pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);;
pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);
;
} else {
pInfo->comparePkRowFn = compareKeyTs;
pInfo->comparePkCol = NULL;

View File

@ -14,8 +14,7 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) {
GTEST_ASSERT_EQ(0, tBloomFilterInit(100, -0.1, &pBFTmp));
GTEST_ASSERT_EQ(0, tBloomFilterInit(0, 0.01, &pBFTmp));
SBloomFilter *pBF1 = NULL,
int32_t code = tBloomFilterInit(100, 0.005, &pBF1);
SBloomFilter *pBF1 = NULL, int32_t code = tBloomFilterInit(100, 0.005, &pBF1);
GTEST_ASSERT_EQ(0, code);
GTEST_ASSERT_EQ(pBF1->numBits, 1152);
GTEST_ASSERT_EQ(pBF1->numUnits, 1152 / 64);
@ -128,7 +127,8 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) {
}
int64_t size = 10000;
SScalableBf *pSBF4 = tScalableBfInit(size, 0.001);
SScalableBf* pSBF4 = NULL;
GTEST_ASSERT_EQ(0, tScalableBfInit(size, 0.001, pSBF4));
for (int64_t i = 0; i < 1000; i++) {
int64_t ts = i + ts1;
GTEST_ASSERT_EQ(tScalableBfPut(pSBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);