tsdb/cache: update lru when committing

This commit is contained in:
Minglei Jin 2024-11-15 17:04:22 +08:00
parent e3f57a1678
commit 66535adb70
3 changed files with 137 additions and 1 deletions

View File

@ -250,6 +250,7 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
int32_t tsdbMemTableUids(SMemTable *pMemTable, SArray *aUid);
// STbData
int32_t tsdbGetNRowsInTbData(STbData *pTbData);

View File

@ -493,10 +493,126 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
return 0;
}
static int32_t tsdbCacheQueryReseek(void *pQHandle) {
int32_t code = 0;
SCacheRowsReader *pReader = pQHandle;
code = taosThreadMutexTryLock(&pReader->readerMutex);
if (code == 0) {
// pause current reader's state if not paused, save ts & version for resuming
// just wait for the big all tables' snapshot untaking for now
code = TSDB_CODE_VND_QUERY_BUSY;
(void)taosThreadMutexUnlock(&pReader->readerMutex);
return code;
} else if (code == EBUSY) {
return TSDB_CODE_VND_QUERY_BUSY;
} else {
return -1;
}
}
static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = imem->pTsdb;
STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid);
STbDataIter iter = {0};
tsdbTbDataIterOpen(pIMem, NULL, 1, &iter);
TSDBROW *pMemRow = tsdbTbDataIterGet(&iter);
if (pMemRow) {
//*ppRow = pMemRow;
// if non deleted, foundLastrow = true;
} else {
TAOS_RETURN(code);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
TAOS_RETURN(code);
}
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
if (!pTsdb) return 0;
if (!pTsdb->imem) return 0;
int32_t code = 0;
int32_t lino = 0;
SMemTable *imem = pTsdb->imem;
int32_t nTbData = imem->nTbData;
int64_t nRow = imem->nRow;
int64_t nDel = imem->nDel;
SArray *aUid = NULL;
SArray *ctxArray = NULL;
if (nRow == 0 || nTbData == 0) return 0;
// get all last/last_row cols of all uids from imem
// 1, get all uids from imem
aUid = taosArrayInit(nTbData, sizeof(TABLEID));
TAOS_CHECK_GOTO(tsdbMemTableUids(imem, aUid), &lino, _exit);
// 2, read all uid's last/last with tomb filtered
if (!pTsdb->rCache.ctxArray) {
pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
if (!pTsdb->rCache.ctxArray) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
}
}
ctxArray = pTsdb->rCache.ctxArray;
for (int32_t i = 0; i < TARRAY_SIZE(aUid); ++i) {
TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i];
TAOS_CHECK_GOTO(tsdbLoadFromImem(imem, tid, ctxArray), &lino, _exit);
}
// 3, update cols into lru
for (int32_t i = 0; i < TARRAY_SIZE(aUid); ++i) {
TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i];
TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, tid.suid, tid.uid, ctxArray), &lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
}
taosArrayDestroy(aUid);
taosArrayClear(ctxArray);
TAOS_RETURN(code);
}
int32_t tsdbCacheCommit(STsdb *pTsdb) {
int32_t code = 0;
char *err = NULL;
// 0, tsdbCacheUpdateFromIMem if updateCacheBatch
// flush dirty data of lru into rocks with
// 4, and update when writing if !updateCacheBatch
code = tsdbCacheUpdateFromIMem(pTsdb);
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
TAOS_RETURN(code);
}
char *err = NULL;
SLRUCache *pCache = pTsdb->lruCache;
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;

View File

@ -321,6 +321,25 @@ void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *
taosRUnLockLatch(&pMemTable->latch);
}
int32_t tsdbMemTableUids(SMemTable *pMemTable, SArray *aUid) {
int32_t code = 0;
taosRLockLatch(&pMemTable->latch);
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
STbData *pTbData = pMemTable->aBucket[i];
while (pTbData) {
if (!taosArrayPush(aUid, &(TABLEID){.suid = pTbData->suid, .uid = pTbData->uid})) {
TAOS_RETURN(terrno);
}
pTbData = pTbData->next;
}
}
taosRUnLockLatch(&pMemTable->latch);
return code;
}
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
int32_t code = 0;