diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 7fb7e64b49..fcbeaedadf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -234,10 +234,15 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { pTsdb->rCache.suid = -1; pTsdb->rCache.uid = -1; pTsdb->rCache.pTSchema = NULL; - pTsdb->rCache.ctxArray = NULL; + pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx)); + if (!pTsdb->rCache.ctxArray) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err7); + } TAOS_RETURN(code); +_err7: + (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex); _err6: rocksdb_writebatch_destroy(writebatch); _err5: @@ -594,7 +599,9 @@ static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema); } -static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) { +static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray); + +static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid) { int32_t code = 0; int32_t lino = 0; STsdb *pTsdb = imem->pTsdb; @@ -608,6 +615,7 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) SSHashObj *iColHash = NULL; int32_t sver; int32_t nCol; + SArray *ctxArray = pTsdb->rCache.ctxArray; STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid); @@ -702,11 +710,14 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline); } + TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, tid.suid, tid.uid, ctxArray), &lino, _exit); + _exit: if (code) { tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); } + taosArrayClear(ctxArray); // destroy any allocated resource tSimpleHashCleanup(iColHash); if (pMemDelData) { @@ -719,8 +730,6 @@ _exit: TAOS_RETURN(code); } -static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray); - static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) { if (!pTsdb) return 0; if (!pTsdb->imem) return 0; @@ -732,31 +741,21 @@ static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) { int64_t nRow = imem->nRow; int64_t nDel = imem->nDel; SArray *aUid = NULL; - SArray *ctxArray = NULL; if (nRow == 0 || nTbData == 0) return 0; // get all last/last_row cols of all uids from imem // 1, get all uids from imem aUid = taosArrayInit(nTbData, sizeof(TABLEID)); - TAOS_CHECK_GOTO(tsdbMemTableUids(imem, aUid), &lino, _exit); - - // 2, read all uid's last/last with tomb filtered - if (!pTsdb->rCache.ctxArray) { - pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx)); - if (!pTsdb->rCache.ctxArray) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); - } + if (!aUid) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); } - ctxArray = pTsdb->rCache.ctxArray; + TAOS_CHECK_GOTO(tsdbMemTableUids(imem, aUid), &lino, _exit); for (int32_t i = 0; i < TARRAY_SIZE(aUid); ++i) { TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i]; - TAOS_CHECK_GOTO(tsdbLoadFromImem(imem, tid, ctxArray), &lino, _exit); - - TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, tid.suid, tid.uid, ctxArray), &lino, _exit); - taosArrayClear(ctxArray); + TAOS_CHECK_GOTO(tsdbLoadFromImem(imem, tid), &lino, _exit); } _exit: @@ -767,7 +766,6 @@ _exit: } taosArrayDestroy(aUid); - taosArrayClear(ctxArray); TAOS_RETURN(code); } @@ -778,6 +776,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { // 0, tsdbCacheUpdateFromIMem if updateCacheBatch // flush dirty data of lru into rocks with // 4, and update when writing if !updateCacheBatch + // 5, merge cache & mem if updateCacheBatch if (tsUpdateCacheBatch) { code = tsdbCacheUpdateFromIMem(pTsdb); @@ -1609,13 +1608,6 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; int32_t nCol = pTSchema->numOfCols; - - if (!pTsdb->rCache.ctxArray) { - pTsdb->rCache.ctxArray = taosArrayInit(nCol * 2, sizeof(SLastUpdateCtx)); - if (!pTsdb->rCache.ctxArray) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); - } - } SArray *ctxArray = pTsdb->rCache.ctxArray; // 1. prepare by lrow