adj stream function return

This commit is contained in:
54liuyao 2024-07-15 14:32:31 +08:00
parent 503822a9c5
commit 4efd774ce8
1 changed files with 27 additions and 7 deletions

View File

@ -224,17 +224,20 @@ _end:
return code;
}
static SScalableBf* getSBf(SUpdateInfo* pInfo, TSKEY ts) {
int32_t code = 0;
static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (ts <= 0) {
return NULL;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
if (pInfo->minTS < 0) {
pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
}
int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval);
if (index < 0) {
return NULL;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
if (index >= pInfo->numSBFs) {
uint64_t count = index + 1 - pInfo->numSBFs;
@ -246,9 +249,17 @@ static SScalableBf* getSBf(SUpdateInfo* pInfo, TSKEY ts) {
if (res == NULL) {
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &res);
TSDB_CHECK_CODE(code, lino, _end);
taosArrayPush(pInfo->pTsSBFs, &res);
}
return res;
(*ppSBf) = res;
_end:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid) {
@ -258,6 +269,7 @@ bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid) {
}
TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol) {
int32_t code = TSDB_CODE_SUCCESS;
if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN;
TSKEY maxTs = INT64_MIN;
void* pPkVal = NULL;
@ -281,7 +293,11 @@ TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t p
maxLen = colDataGetRowLength(pPkDataInfo, i);
}
}
SScalableBf* pSBf = getSBf(pInfo, ts);
SScalableBf* pSBf = NULL;
int32_t code = getSBf(pInfo, ts, &pSBf);
if (code != TSDB_CODE_SUCCESS) {
uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
if (pSBf) {
if (primaryKeyCol >= 0) {
pPkVal = colDataGetData(pPkDataInfo, i);
@ -320,7 +336,11 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p
return true;
}
SScalableBf* pSBf = getSBf(pInfo, ts);
SScalableBf* pSBf = NULL;
int32_t code = getSBf(pInfo, ts, &pSBf);
if (code != TSDB_CODE_SUCCESS) {
uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
int32_t size = taosHashGetSize(pInfo->pMap);
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) ||