tsdb/cache: remove uid array when loading from imem
This commit is contained in:
parent
d20ef164b3
commit
451093188d
|
@ -250,7 +250,7 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
||||||
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter);
|
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter);
|
||||||
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
||||||
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
|
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
|
||||||
int32_t tsdbMemTableUids(SMemTable *pMemTable, SArray *aUid);
|
int32_t tsdbMemTableSaveToCache(SMemTable *pMemTable, void *func);
|
||||||
|
|
||||||
// STbData
|
// STbData
|
||||||
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
|
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
|
||||||
|
|
|
@ -546,8 +546,8 @@ static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get first non-deleted row from imem
|
// Get first non-deleted row from imem
|
||||||
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, TABLEID tid, STbDataIter *pTbIter,
|
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
|
||||||
SArray *pSkyline, int64_t *piSkyline) {
|
int64_t *piSkyline) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
|
tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
|
||||||
|
@ -601,7 +601,7 @@ static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t
|
||||||
|
|
||||||
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
|
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
|
||||||
|
|
||||||
static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid) {
|
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
STsdb *pTsdb = imem->pTsdb;
|
STsdb *pTsdb = imem->pTsdb;
|
||||||
|
@ -618,7 +618,7 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid) {
|
||||||
SArray *ctxArray = pTsdb->rCache.ctxArray;
|
SArray *ctxArray = pTsdb->rCache.ctxArray;
|
||||||
STsdbRowKey tsdbRowKey = {0};
|
STsdbRowKey tsdbRowKey = {0};
|
||||||
|
|
||||||
STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid);
|
STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid);
|
||||||
|
|
||||||
// load imem tomb data and build skyline
|
// load imem tomb data and build skyline
|
||||||
TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);
|
TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);
|
||||||
|
@ -630,14 +630,14 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid) {
|
||||||
iSkyline = taosArrayGetSize(pSkyline) - 1;
|
iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMemRow = tsdbImemGetFirstRow(imem, pIMem, tid, &tbIter, pSkyline, &iSkyline);
|
pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline);
|
||||||
if (!pMemRow) {
|
if (!pMemRow) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
// iter first row to last_row/last col values to ctxArray, and mark last null col ids
|
// iter first row to last_row/last col values to ctxArray, and mark last null col ids
|
||||||
sver = TSDBROW_SVERSION(pMemRow);
|
sver = TSDBROW_SVERSION(pMemRow);
|
||||||
TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, tid.suid, tid.uid, sver), &lino, _exit);
|
TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
|
||||||
pTSchema = pTsdb->rCache.pTSchema;
|
pTSchema = pTsdb->rCache.pTSchema;
|
||||||
nCol = pTSchema->numOfCols;
|
nCol = pTSchema->numOfCols;
|
||||||
|
|
||||||
|
@ -706,7 +706,7 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid) {
|
||||||
pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
|
pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, tid.suid, tid.uid, ctxArray), &lino, _exit);
|
TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -738,23 +738,10 @@ static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
|
||||||
int32_t nTbData = imem->nTbData;
|
int32_t nTbData = imem->nTbData;
|
||||||
int64_t nRow = imem->nRow;
|
int64_t nRow = imem->nRow;
|
||||||
int64_t nDel = imem->nDel;
|
int64_t nDel = imem->nDel;
|
||||||
SArray *aUid = NULL;
|
|
||||||
|
|
||||||
if (nRow == 0 || nTbData == 0) return 0;
|
if (nRow == 0 || nTbData == 0) return 0;
|
||||||
|
|
||||||
// get all last/last_row cols of all uids from imem
|
TAOS_CHECK_GOTO(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem), &lino, _exit);
|
||||||
// 1, get all uids from imem
|
|
||||||
aUid = taosArrayInit(nTbData, sizeof(TABLEID));
|
|
||||||
if (!aUid) {
|
|
||||||
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
|
||||||
}
|
|
||||||
TAOS_CHECK_GOTO(tsdbMemTableUids(imem, aUid), &lino, _exit);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < TARRAY_SIZE(aUid); ++i) {
|
|
||||||
TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i];
|
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(tsdbLoadFromImem(imem, tid), &lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -763,8 +750,6 @@ _exit:
|
||||||
tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
|
tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(aUid);
|
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -325,21 +325,23 @@ void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *
|
||||||
taosRUnLockLatch(&pMemTable->latch);
|
taosRUnLockLatch(&pMemTable->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbMemTableUids(SMemTable *pMemTable, SArray *aUid) {
|
typedef int32_t (*__tsdb_cache_update)(SMemTable *imem, int64_t suid, int64_t uid);
|
||||||
int32_t code = 0;
|
|
||||||
|
int32_t tsdbMemTableSaveToCache(SMemTable *pMemTable, void *func) {
|
||||||
|
int32_t code = 0;
|
||||||
|
__tsdb_cache_update cb = (__tsdb_cache_update)func;
|
||||||
|
|
||||||
taosRLockLatch(&pMemTable->latch);
|
|
||||||
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
|
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
|
||||||
STbData *pTbData = pMemTable->aBucket[i];
|
STbData *pTbData = pMemTable->aBucket[i];
|
||||||
while (pTbData) {
|
while (pTbData) {
|
||||||
if (!taosArrayPush(aUid, &(TABLEID){.suid = pTbData->suid, .uid = pTbData->uid})) {
|
code = (*cb)(pMemTable, pTbData->suid, pTbData->uid);
|
||||||
TAOS_RETURN(terrno);
|
if (!code) {
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
pTbData = pTbData->next;
|
pTbData = pTbData->next;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pMemTable->latch);
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue