opt bloom filter
This commit is contained in:
parent
d8946dbedd
commit
85cd290aa5
|
@ -312,7 +312,7 @@ typedef struct SUpdateInfo {
|
|||
char* pKeyBuff;
|
||||
char* pValueBuff;
|
||||
|
||||
bool (*comparePkRowFn)(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn);
|
||||
int (*comparePkRowFn)(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn);
|
||||
__compar_fn_t comparePkCol;
|
||||
} SUpdateInfo;
|
||||
|
||||
|
|
|
@ -888,7 +888,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
|
|||
|
||||
if ((!IS_FINAL_INTERVAL_OP(pOperator) && pInfo->ignoreExpiredData && pSDataBlock->info.type != STREAM_PULL_DATA &&
|
||||
checkExpiredData(&pInfo->stateStore, pInfo->pUpdateInfo, &pInfo->twAggSup, pSDataBlock->info.id.uid,
|
||||
nextWin.ekey, NULL, 0)) ||
|
||||
nextWin.ekey, pPkVal, pkLen)) ||
|
||||
!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
|
||||
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
|
||||
if (startPos < 0) {
|
||||
|
@ -2147,7 +2147,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
|||
}
|
||||
if (!IS_FINAL_SESSION_OP(pOperator) && pInfo->ignoreExpiredData &&
|
||||
checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup,
|
||||
pSDataBlock->info.id.uid, endTsCols[i], NULL, 0)) {
|
||||
pSDataBlock->info.id.uid, endTsCols[i], pPkVal, pkLen)) {
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -33,19 +33,18 @@
|
|||
|
||||
static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }
|
||||
|
||||
bool lowerThanTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) {
|
||||
return *((TSKEY*)pTs1) < *((TSKEY*)pTs2);
|
||||
int compareKeyTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) {
|
||||
return compareInt64Val(pTs1, pTs2);;
|
||||
}
|
||||
|
||||
bool lowerThanPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) {
|
||||
int compareKeyTsAndPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) {
|
||||
int res = compareInt64Val(pValue1, pTs);
|
||||
if (res < 0) {
|
||||
return true;
|
||||
} else if (res == 0) {
|
||||
if (res != 0) {
|
||||
return res;
|
||||
} else {
|
||||
void* pk1 = (char*)pValue1 + sizeof(TSKEY);
|
||||
return (cmpPkFn(pk1, pPkVal) < 0);
|
||||
return cmpPkFn(pk1, pPkVal);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t getKeyBuff(TSKEY ts, int64_t tbUid, void* pVal, int32_t len, char* buff) {
|
||||
|
@ -174,10 +173,10 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
|
|||
pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pkLen);
|
||||
pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pkLen);
|
||||
if (pkLen != 0) {
|
||||
pInfo->comparePkRowFn = lowerThanPk;
|
||||
pInfo->comparePkRowFn = compareKeyTsAndPk;
|
||||
pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC);;
|
||||
} else {
|
||||
pInfo->comparePkRowFn = lowerThanTs;
|
||||
pInfo->comparePkRowFn = compareKeyTs;
|
||||
pInfo->comparePkCol = NULL;
|
||||
}
|
||||
return pInfo;
|
||||
|
@ -250,7 +249,7 @@ TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t p
|
|||
}
|
||||
}
|
||||
void *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
|
||||
if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol)) {
|
||||
if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol) == -1) {
|
||||
int32_t valueLen = getValueBuff(maxTs, pMaxPkVal, maxLen, pInfo->pValueBuff);
|
||||
taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen);
|
||||
}
|
||||
|
@ -281,7 +280,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* p
|
|||
SScalableBf *pSBf = getSBf(pInfo, ts);
|
||||
|
||||
int32_t size = taosHashGetSize(pInfo->pMap);
|
||||
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol))) {
|
||||
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1 )) {
|
||||
int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
|
||||
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
|
||||
// pSBf may be a null pointer
|
||||
|
@ -443,7 +442,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
|||
for (int32_t i = 0; i < mapSize; i++) {
|
||||
if (tDecodeU64(&decoder, &uid) < 0) return -1;
|
||||
if (tDecodeBinary(&decoder, (uint8_t**)&pVal, &valSize) < 0) return -1;
|
||||
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &pVal, valSize);
|
||||
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize);
|
||||
}
|
||||
ASSERT(mapSize == taosHashGetSize(pInfo->pMap));
|
||||
if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1;
|
||||
|
@ -454,11 +453,11 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
|||
pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen);
|
||||
pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen);
|
||||
if (pInfo->pkColLen != 0) {
|
||||
pInfo->comparePkRowFn = lowerThanPk;
|
||||
pInfo->comparePkCol = NULL;
|
||||
pInfo->comparePkRowFn = compareKeyTsAndPk;
|
||||
pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);;
|
||||
} else {
|
||||
pInfo->comparePkRowFn = lowerThanTs;
|
||||
pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);
|
||||
pInfo->comparePkRowFn = compareKeyTs;
|
||||
pInfo->comparePkCol = NULL;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
@ -470,7 +469,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
|||
bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) {
|
||||
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
||||
bool res = true;
|
||||
if (pMapMaxTs && !pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol)) {
|
||||
if (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == 1) {
|
||||
res = false;
|
||||
} else {
|
||||
int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
|
||||
|
|
Loading…
Reference in New Issue