diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ef37b41f30..d3d3eeb107 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -250,6 +250,7 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum); +int32_t tsdbMemTableUids(SMemTable *pMemTable, SArray *aUid); // STbData int32_t tsdbGetNRowsInTbData(STbData *pTbData); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index eb0e7b684e..2564e89d1a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -493,10 +493,126 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { return 0; } +static int32_t tsdbCacheQueryReseek(void *pQHandle) { + int32_t code = 0; + SCacheRowsReader *pReader = pQHandle; + + code = taosThreadMutexTryLock(&pReader->readerMutex); + if (code == 0) { + // pause current reader's state if not paused, save ts & version for resuming + // just wait for the big all tables' snapshot untaking for now + + code = TSDB_CODE_VND_QUERY_BUSY; + (void)taosThreadMutexUnlock(&pReader->readerMutex); + + return code; + } else if (code == EBUSY) { + return TSDB_CODE_VND_QUERY_BUSY; + } else { + return -1; + } +} + +static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) { + int32_t code = 0; + int32_t lino = 0; + STsdb *pTsdb = imem->pTsdb; + + STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid); + + STbDataIter iter = {0}; + tsdbTbDataIterOpen(pIMem, NULL, 1, &iter); + TSDBROW *pMemRow = tsdbTbDataIterGet(&iter); + if (pMemRow) { + //*ppRow = pMemRow; + // if non deleted, foundLastrow = true; + + } else { + TAOS_RETURN(code); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + } + + TAOS_RETURN(code); +} + +static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray); + +static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) { + if (!pTsdb) return 0; + if (!pTsdb->imem) return 0; + + int32_t code = 0; + int32_t lino = 0; + SMemTable *imem = pTsdb->imem; + int32_t nTbData = imem->nTbData; + int64_t nRow = imem->nRow; + int64_t nDel = imem->nDel; + SArray *aUid = NULL; + SArray *ctxArray = NULL; + + if (nRow == 0 || nTbData == 0) return 0; + + // get all last/last_row cols of all uids from imem + // 1, get all uids from imem + aUid = taosArrayInit(nTbData, sizeof(TABLEID)); + TAOS_CHECK_GOTO(tsdbMemTableUids(imem, aUid), &lino, _exit); + + // 2, read all uid's last/last with tomb filtered + if (!pTsdb->rCache.ctxArray) { + pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx)); + if (!pTsdb->rCache.ctxArray) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); + } + } + ctxArray = pTsdb->rCache.ctxArray; + + for (int32_t i = 0; i < TARRAY_SIZE(aUid); ++i) { + TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i]; + + TAOS_CHECK_GOTO(tsdbLoadFromImem(imem, tid, ctxArray), &lino, _exit); + } + + // 3, update cols into lru + for (int32_t i = 0; i < TARRAY_SIZE(aUid); ++i) { + TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i]; + + TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, tid.suid, tid.uid, ctxArray), &lino, _exit); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel); + } + + taosArrayDestroy(aUid); + taosArrayClear(ctxArray); + + TAOS_RETURN(code); +} + int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t code = 0; - char *err = NULL; + // 0, tsdbCacheUpdateFromIMem if updateCacheBatch + // flush dirty data of lru into rocks with + // 4, and update when writing if !updateCacheBatch + + code = tsdbCacheUpdateFromIMem(pTsdb); + if (code) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + + TAOS_RETURN(code); + } + + char *err = NULL; SLRUCache *pCache = pTsdb->lruCache; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index eb22335311..617bfc1e17 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -321,6 +321,25 @@ void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t * taosRUnLockLatch(&pMemTable->latch); } +int32_t tsdbMemTableUids(SMemTable *pMemTable, SArray *aUid) { + int32_t code = 0; + + taosRLockLatch(&pMemTable->latch); + for (int32_t i = 0; i < pMemTable->nBucket; ++i) { + STbData *pTbData = pMemTable->aBucket[i]; + while (pTbData) { + if (!taosArrayPush(aUid, &(TABLEID){.suid = pTbData->suid, .uid = pTbData->uid})) { + TAOS_RETURN(terrno); + } + + pTbData = pTbData->next; + } + } + taosRUnLockLatch(&pMemTable->latch); + + return code; +} + static int32_t tsdbMemTableRehash(SMemTable *pMemTable) { int32_t code = 0;