diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index bf7a2a869a..ec92bd56dd 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -312,7 +312,7 @@ typedef struct SUpdateInfo { char* pKeyBuff; char* pValueBuff; - bool (*comparePkRowFn)(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn); + int (*comparePkRowFn)(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn); __compar_fn_t comparePkCol; } SUpdateInfo; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 0f6d78caa3..08ce0e25f1 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -888,7 +888,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat if ((!IS_FINAL_INTERVAL_OP(pOperator) && pInfo->ignoreExpiredData && pSDataBlock->info.type != STREAM_PULL_DATA && checkExpiredData(&pInfo->stateStore, pInfo->pUpdateInfo, &pInfo->twAggSup, pSDataBlock->info.id.uid, - nextWin.ekey, NULL, 0)) || + nextWin.ekey, pPkVal, pkLen)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { @@ -2147,7 +2147,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } if (!IS_FINAL_SESSION_OP(pOperator) && pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup, - pSDataBlock->info.id.uid, endTsCols[i], NULL, 0)) { + pSDataBlock->info.id.uid, endTsCols[i], pPkVal, pkLen)) { i++; continue; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 263400b7d3..0adb7050c6 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -33,19 +33,18 @@ static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); } -bool lowerThanTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) { - return *((TSKEY*)pTs1) < *((TSKEY*)pTs2); +int compareKeyTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) { + return compareInt64Val(pTs1, pTs2);; } -bool lowerThanPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) { +int compareKeyTsAndPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) { int res = compareInt64Val(pValue1, pTs); - if (res < 0) { - return true; - } else if (res == 0) { + if (res != 0) { + return res; + } else { void* pk1 = (char*)pValue1 + sizeof(TSKEY); - return (cmpPkFn(pk1, pPkVal) < 0); + return cmpPkFn(pk1, pPkVal); } - return false; } int32_t getKeyBuff(TSKEY ts, int64_t tbUid, void* pVal, int32_t len, char* buff) { @@ -174,10 +173,10 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pkLen); pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pkLen); if (pkLen != 0) { - pInfo->comparePkRowFn = lowerThanPk; + pInfo->comparePkRowFn = compareKeyTsAndPk; pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC);; } else { - pInfo->comparePkRowFn = lowerThanTs; + pInfo->comparePkRowFn = compareKeyTs; pInfo->comparePkCol = NULL; } return pInfo; @@ -250,7 +249,7 @@ TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t p } } void *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); - if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol)) { + if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol) == -1) { int32_t valueLen = getValueBuff(maxTs, pMaxPkVal, maxLen, pInfo->pValueBuff); taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen); } @@ -281,7 +280,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* p SScalableBf *pSBf = getSBf(pInfo, ts); int32_t size = taosHashGetSize(pInfo->pMap); - if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol))) { + if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1 )) { int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff); taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen); // pSBf may be a null pointer @@ -443,7 +442,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { for (int32_t i = 0; i < mapSize; i++) { if (tDecodeU64(&decoder, &uid) < 0) return -1; if (tDecodeBinary(&decoder, (uint8_t**)&pVal, &valSize) < 0) return -1; - taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &pVal, valSize); + taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize); } ASSERT(mapSize == taosHashGetSize(pInfo->pMap)); if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1; @@ -454,11 +453,11 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen); pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen); if (pInfo->pkColLen != 0) { - pInfo->comparePkRowFn = lowerThanPk; - pInfo->comparePkCol = NULL; + pInfo->comparePkRowFn = compareKeyTsAndPk; + pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);; } else { - pInfo->comparePkRowFn = lowerThanTs; - pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC); + pInfo->comparePkRowFn = compareKeyTs; + pInfo->comparePkCol = NULL; } tEndDecode(&decoder); @@ -470,7 +469,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); bool res = true; - if (pMapMaxTs && !pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol)) { + if (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == 1) { res = false; } else { int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);