Merge pull request #27836 from taosdata/fix/ly_stream

fix(stream):fix issue when ts less than 0
This commit is contained in:
Haojun Liao 2024-09-12 16:24:55 +08:00 committed by GitHub
commit 54545c394f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 4 additions and 8 deletions

View File

@ -155,7 +155,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
}
pInfo->pTsBuckets = NULL;
pInfo->pTsSBFs = NULL;
pInfo->minTS = -1;
pInfo->minTS = INT64_MIN;
pInfo->interval = adjustInterval(interval, precision);
pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark);
pInfo->numSBFs = 0;
@ -181,7 +181,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
QUERY_CHECK_CODE(code, lino, _end);
}
TSKEY dumy = 0;
TSKEY dumy = INT64_MIN;
for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
void* tmp = taosArrayPush(pInfo->pTsBuckets, &dumy);
if (!tmp) {
@ -231,11 +231,7 @@ _end:
static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (ts < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
if (pInfo->minTS < 0) {
if (pInfo->minTS == INT64_MIN) {
pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
}
int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval);
@ -349,7 +345,7 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p
void** pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
TSKEY maxTs = *(TSKEY*)taosArrayGet(pInfo->pTsBuckets, index);
if (ts < maxTs - pInfo->watermark) {
if (ts < maxTs - pInfo->watermark && maxTs != INT64_MIN) {
// this window has been closed.
if (pInfo->pCloseWinSBF) {
code = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen, &res);