diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 7122b8f10f..8727ce0174 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -224,17 +224,20 @@ _end: return code; } -static SScalableBf* getSBf(SUpdateInfo* pInfo, TSKEY ts) { - int32_t code = 0; +static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (ts <= 0) { - return NULL; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); } if (pInfo->minTS < 0) { pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval); } int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval); if (index < 0) { - return NULL; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); } if (index >= pInfo->numSBFs) { uint64_t count = index + 1 - pInfo->numSBFs; @@ -246,9 +249,17 @@ static SScalableBf* getSBf(SUpdateInfo* pInfo, TSKEY ts) { if (res == NULL) { int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND); code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &res); + TSDB_CHECK_CODE(code, lino, _end); + taosArrayPush(pInfo->pTsSBFs, &res); } - return res; + (*ppSBf) = res; + +_end: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid) { @@ -258,6 +269,7 @@ bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid) { } TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol) { + int32_t code = TSDB_CODE_SUCCESS; if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN; TSKEY maxTs = INT64_MIN; void* pPkVal = NULL; @@ -281,7 +293,11 @@ TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t p maxLen = colDataGetRowLength(pPkDataInfo, i); } } - SScalableBf* pSBf = getSBf(pInfo, ts); + SScalableBf* pSBf = NULL; + int32_t code = getSBf(pInfo, ts, &pSBf); + if (code != TSDB_CODE_SUCCESS) { + uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } if (pSBf) { if (primaryKeyCol >= 0) { pPkVal = colDataGetData(pPkDataInfo, i); @@ -320,7 +336,11 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p return true; } - SScalableBf* pSBf = getSBf(pInfo, ts); + SScalableBf* pSBf = NULL; + int32_t code = getSBf(pInfo, ts, &pSBf); + if (code != TSDB_CODE_SUCCESS) { + uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } int32_t size = taosHashGetSize(pInfo->pMap); if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) ||