tsdb/cache: get first row from imem

This commit is contained in:
Minglei Jin 2024-11-19 14:36:16 +08:00
parent 66535adb70
commit ffab4e8886
1 changed files with 118 additions and 40 deletions

View File

@ -513,24 +513,133 @@ static int32_t tsdbCacheQueryReseek(void *pQHandle) {
}
}
static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) {
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
bool deleted = false;
while (*iSkyline > 0) {
TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);
if (key->ts > pItemBack->ts) {
return false;
} else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
// if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
return true;
} else {
if (*iSkyline > 1) {
--*iSkyline;
} else {
return false;
}
}
} else {
if (*iSkyline > 1) {
--*iSkyline;
} else {
return false;
}
}
}
return deleted;
}
// Get next non-deleted row from imem
static int32_t tsdbImemGetNextRow(SMemTable *imem, TABLEID tid, STbDataIter *pTbIter, TSDBROW **ppRow, SArray *pSkyline,
int64_t iSkyline) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = imem->pTsdb;
STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid);
STbDataIter iter = {0};
tsdbTbDataIterOpen(pIMem, NULL, 1, &iter);
TSDBROW *pMemRow = tsdbTbDataIterGet(&iter);
// tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
if (pMemRow) {
//*ppRow = pMemRow;
// if non deleted, foundLastrow = true;
// if non deleted, return the found row.
TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, &iSkyline);
if (!deleted) {
*ppRow = pMemRow;
TAOS_RETURN(code);
}
} else {
TAOS_RETURN(code);
}
// continue to find the non-deleted first row from imem
TAOS_CHECK_GOTO(tsdbImemGetNextRow(imem, tid, pTbIter, ppRow, pSkyline, iSkyline), &lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
TAOS_RETURN(code);
}
// Get first non-deleted row from imem
static int32_t tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, TABLEID tid, STbDataIter *pTbIter, TSDBROW **ppRow,
SArray *pSkyline, int64_t iSkyline) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = imem->pTsdb;
tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
if (pMemRow) {
// if non deleted, return the found row.
TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, &iSkyline);
if (!deleted) {
*ppRow = pMemRow;
TAOS_RETURN(code);
}
} else {
TAOS_RETURN(code);
}
// continue to find the non-deleted first row from imem, using get next row
TAOS_CHECK_GOTO(tsdbImemGetNextRow(imem, tid, pTbIter, ppRow, pSkyline, iSkyline), &lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
TAOS_RETURN(code);
}
static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = imem->pTsdb;
SArray *pMemDelData = NULL;
SArray *pTombData = NULL;
SArray *pSkyline = NULL;
int64_t iSkyline = 0;
STbDataIter iter = {0};
TSDBROW *pMemRow = NULL;
STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid);
// load imem tomb data and build skyline
TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);
// tsdbBuildDeleteSkyline
size_t delSize = TARRAY_SIZE(pMemDelData);
if (delSize > 0) {
TAOS_CHECK_GOTO(tsdbBuildDeleteSkyline(pTombData, 0, (int32_t)(delSize - 1), pSkyline), &lino, _exit);
iSkyline = taosArrayGetSize(pSkyline) - 1;
}
TAOS_CHECK_GOTO(tsdbImemGetFirstRow(imem, pIMem, tid, &iter, &pMemRow, pSkyline, iSkyline), &lino, _exit);
// iter first row to last_row/last col values to ctxArray, and mark last null col ids
// continue to get next row to fill null last col values
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
@ -2957,37 +3066,6 @@ _err:
TAOS_RETURN(code);
}
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
bool deleted = false;
while (*iSkyline > 0) {
TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);
if (key->ts > pItemBack->ts) {
return false;
} else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
// if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
return true;
} else {
if (*iSkyline > 1) {
--*iSkyline;
} else {
return false;
}
}
} else {
if (*iSkyline > 1) {
--*iSkyline;
} else {
return false;
}
}
}
return deleted;
}
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols);
typedef int32_t (*_next_row_clear_fn_t)(void *iter);