From dca556a7e8aa542d60e9d96818b24a414550febd Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Mon, 30 Sep 2024 16:52:02 +0800 Subject: [PATCH 01/12] enh: delay allocation of array in LRU-insert --- source/util/src/tlrucache.c | 39 ++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 69832cd46c..92dcf015a6 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -372,23 +372,34 @@ static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) { static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle, bool freeOnFail) { LRUStatus status = TAOS_LRU_STATUS_OK; - SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); - if (!lastReferenceList) { - taosLRUEntryFree(e); - return TAOS_LRU_STATUS_FAIL; + SLRUEntry *toFree = NULL; + SArray *lastReferenceList = NULL; + if (shard->usage + e->totalCharge > shard->capacity) { + lastReferenceList = taosArrayInit(16, POINTER_BYTES); + if (!lastReferenceList) { + taosLRUEntryFree(e); + return TAOS_LRU_STATUS_FAIL; + } } (void)taosThreadMutexLock(&shard->mutex); - taosLRUCacheShardEvictLRU(shard, e->totalCharge, lastReferenceList); + if (shard->usage + e->totalCharge > shard->capacity && shard->lru.next != &shard->lru) { + if (!lastReferenceList) { + lastReferenceList = taosArrayInit(16, POINTER_BYTES); + if (!lastReferenceList) { + taosLRUEntryFree(e); + (void)taosThreadMutexUnlock(&shard->mutex); + return TAOS_LRU_STATUS_FAIL; + } + } + taosLRUCacheShardEvictLRU(shard, e->totalCharge, lastReferenceList); + } if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) { TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); if (handle == NULL) { - if (!taosArrayPush(lastReferenceList, &e)) { - taosLRUEntryFree(e); - goto _exit; - } + toFree = e; } else { if (freeOnFail) { taosLRUEntryFree(e); @@ -413,11 +424,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * taosLRUCacheShardLRURemove(shard, old); shard->usage -= old->totalCharge; - if (!taosArrayPush(lastReferenceList, &old)) { - taosLRUEntryFree(e); - taosLRUEntryFree(old); - goto _exit; - } + toFree = old; } } if (handle == NULL) { @@ -434,6 +441,10 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * _exit: (void)taosThreadMutexUnlock(&shard->mutex); + if (toFree) { + taosLRUEntryFree(toFree); + } + for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { SLRUEntry *entry = taosArrayGetP(lastReferenceList, i); From 39af874ff8e8c50ae9efaeeee030518f5538914e Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 25 Oct 2024 14:20:07 +0800 Subject: [PATCH 02/12] enh(tsdb/cache): move last ref array into shard instance --- source/util/src/tlrucache.c | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 69832cd46c..bccf37c7d6 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -14,12 +14,12 @@ */ #define _DEFAULT_SOURCE +#include "tlrucache.h" #include "os.h" #include "taoserror.h" #include "tarray.h" #include "tdef.h" #include "tlog.h" -#include "tlrucache.h" #include "tutil.h" typedef struct SLRUEntry SLRUEntry; @@ -248,6 +248,7 @@ struct SLRUCacheShard { SLRUEntryTable table; size_t usage; // Memory size for entries residing in the cache. size_t lruUsage; // Memory size for entries residing only in the LRU list. + SArray *lastReferenceList; TdThreadMutex mutex; }; @@ -367,25 +368,34 @@ static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) { (void)taosThreadMutexDestroy(&shard->mutex); taosLRUEntryTableCleanup(&shard->table); + + taosArrayDestroy(shard->lastReferenceList); } static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle, bool freeOnFail) { LRUStatus status = TAOS_LRU_STATUS_OK; - SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); - if (!lastReferenceList) { - taosLRUEntryFree(e); - return TAOS_LRU_STATUS_FAIL; - } (void)taosThreadMutexLock(&shard->mutex); - taosLRUCacheShardEvictLRU(shard, e->totalCharge, lastReferenceList); + if (!shard->lastReferenceList) { + shard->lastReferenceList = taosArrayInit(16, POINTER_BYTES); + if (!shard->lastReferenceList) { + taosLRUEntryFree(e); + (void)taosThreadMutexUnlock(&shard->mutex); + + return TAOS_LRU_STATUS_FAIL; + } + } else { + taosArrayClear(shard->lastReferenceList); + } + + taosLRUCacheShardEvictLRU(shard, e->totalCharge, shard->lastReferenceList); if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) { TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); if (handle == NULL) { - if (!taosArrayPush(lastReferenceList, &e)) { + if (!taosArrayPush(shard->lastReferenceList, &e)) { taosLRUEntryFree(e); goto _exit; } @@ -413,7 +423,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * taosLRUCacheShardLRURemove(shard, old); shard->usage -= old->totalCharge; - if (!taosArrayPush(lastReferenceList, &old)) { + if (!taosArrayPush(shard->lastReferenceList, &old)) { taosLRUEntryFree(e); taosLRUEntryFree(old); goto _exit; @@ -434,12 +444,11 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * _exit: (void)taosThreadMutexUnlock(&shard->mutex); - for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { - SLRUEntry *entry = taosArrayGetP(lastReferenceList, i); + for (int i = 0; i < taosArrayGetSize(shard->lastReferenceList); ++i) { + SLRUEntry *entry = taosArrayGetP(shard->lastReferenceList, i); taosLRUEntryFree(entry); } - taosArrayDestroy(lastReferenceList); return status; } @@ -733,7 +742,8 @@ void taosLRUCacheCleanup(SLRUCache *cache) { } LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge, - _taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, LRUPriority priority, void *ud) { + _taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, + LRUPriority priority, void *ud) { uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen); uint32_t shardIndex = hash & cache->shardedCache.shardMask; From 69e930be6646561d4e190961c9342602ae0a4cbc Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 28 Oct 2024 10:18:48 +0800 Subject: [PATCH 03/12] Revert "enh(tsdb/cache): move last ref array into shard instance" This reverts commit 39af874ff8e8c50ae9efaeeee030518f5538914e. --- source/util/src/tlrucache.c | 36 +++++++++++++----------------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index bccf37c7d6..69832cd46c 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -14,12 +14,12 @@ */ #define _DEFAULT_SOURCE -#include "tlrucache.h" #include "os.h" #include "taoserror.h" #include "tarray.h" #include "tdef.h" #include "tlog.h" +#include "tlrucache.h" #include "tutil.h" typedef struct SLRUEntry SLRUEntry; @@ -248,7 +248,6 @@ struct SLRUCacheShard { SLRUEntryTable table; size_t usage; // Memory size for entries residing in the cache. size_t lruUsage; // Memory size for entries residing only in the LRU list. - SArray *lastReferenceList; TdThreadMutex mutex; }; @@ -368,34 +367,25 @@ static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) { (void)taosThreadMutexDestroy(&shard->mutex); taosLRUEntryTableCleanup(&shard->table); - - taosArrayDestroy(shard->lastReferenceList); } static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle, bool freeOnFail) { LRUStatus status = TAOS_LRU_STATUS_OK; + SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); + if (!lastReferenceList) { + taosLRUEntryFree(e); + return TAOS_LRU_STATUS_FAIL; + } (void)taosThreadMutexLock(&shard->mutex); - if (!shard->lastReferenceList) { - shard->lastReferenceList = taosArrayInit(16, POINTER_BYTES); - if (!shard->lastReferenceList) { - taosLRUEntryFree(e); - (void)taosThreadMutexUnlock(&shard->mutex); - - return TAOS_LRU_STATUS_FAIL; - } - } else { - taosArrayClear(shard->lastReferenceList); - } - - taosLRUCacheShardEvictLRU(shard, e->totalCharge, shard->lastReferenceList); + taosLRUCacheShardEvictLRU(shard, e->totalCharge, lastReferenceList); if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) { TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); if (handle == NULL) { - if (!taosArrayPush(shard->lastReferenceList, &e)) { + if (!taosArrayPush(lastReferenceList, &e)) { taosLRUEntryFree(e); goto _exit; } @@ -423,7 +413,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * taosLRUCacheShardLRURemove(shard, old); shard->usage -= old->totalCharge; - if (!taosArrayPush(shard->lastReferenceList, &old)) { + if (!taosArrayPush(lastReferenceList, &old)) { taosLRUEntryFree(e); taosLRUEntryFree(old); goto _exit; @@ -444,11 +434,12 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * _exit: (void)taosThreadMutexUnlock(&shard->mutex); - for (int i = 0; i < taosArrayGetSize(shard->lastReferenceList); ++i) { - SLRUEntry *entry = taosArrayGetP(shard->lastReferenceList, i); + for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { + SLRUEntry *entry = taosArrayGetP(lastReferenceList, i); taosLRUEntryFree(entry); } + taosArrayDestroy(lastReferenceList); return status; } @@ -742,8 +733,7 @@ void taosLRUCacheCleanup(SLRUCache *cache) { } LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge, - _taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, - LRUPriority priority, void *ud) { + _taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, LRUPriority priority, void *ud) { uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen); uint32_t shardIndex = hash & cache->shardedCache.shardMask; From cf3e84e79d88e4227aa40c6aabee1e3ab5199d36 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 29 Oct 2024 12:25:25 +0800 Subject: [PATCH 04/12] tsdb/cache: cache schema to reuse --- source/dnode/vnode/src/inc/tsdb.h | 4 +++- source/dnode/vnode/src/tsdb/tsdbCache.c | 26 +++++++++++++++++++++++-- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ed8f99ec75..f1ee5267a1 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -342,7 +342,9 @@ typedef struct { rocksdb_writeoptions_t *writeoptions; rocksdb_readoptions_t *readoptions; rocksdb_writebatch_t *writebatch; - TdThreadMutex writeBatchMutex; + TdThreadMutex writeBatchMutex; + tb_uid_t suid; + tb_uid_t uid; STSchema *pTSchema; } SRocksCache; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 5583e464ed..fa375a9fa0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -221,7 +221,7 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); - TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6) ; + TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6); pTsdb->rCache.writebatch = writebatch; pTsdb->rCache.my_comparator = cmp; @@ -230,6 +230,8 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { pTsdb->rCache.readoptions = readoptions; pTsdb->rCache.flushoptions = flushoptions; pTsdb->rCache.db = db; + pTsdb->rCache.suid = -1; + pTsdb->rCache.uid = -1; pTsdb->rCache.pTSchema = NULL; TAOS_RETURN(code); @@ -1302,6 +1304,22 @@ _exit: TAOS_RETURN(code); } +static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) { + if (suid) { + if (pTsdb->rCache.suid == suid) { + pTsdb->rCache.uid = uid; + return 0; + } + } else if (pTsdb->rCache.uid == uid) { + return 0; + } + + pTsdb->rCache.suid = suid; + pTsdb->rCache.uid = uid; + tDestroyTSchema(pTsdb->rCache.pTSchema); + return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTsdb->rCache.pTSchema); +} + int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, SRow **aRow) { int32_t code = 0, lino = 0; @@ -1314,7 +1332,11 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 SArray *ctxArray = NULL; SSHashObj *iColHash = NULL; + TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid), &lino, _exit); + pTSchema = pTsdb->rCache.pTSchema; + /* TAOS_CHECK_GOTO(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema), &lino, _exit); + */ TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; int32_t nCol = pTSchema->numOfCols; @@ -1393,7 +1415,7 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 } _exit: - taosMemoryFreeClear(pTSchema); + // taosMemoryFreeClear(pTSchema); taosArrayDestroy(ctxArray); tSimpleHashCleanup(iColHash); From e364d5aa66479400f3e2fe2ccae136a1467042c1 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 29 Oct 2024 14:41:14 +0800 Subject: [PATCH 05/12] tsdb/cache: invalidate cached schema --- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/meta/metaTable.c | 6 ++++++ source/dnode/vnode/src/tsdb/tsdbCache.c | 14 ++++++++++++-- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index fc98d6578b..7673e562a8 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -222,6 +222,7 @@ int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey); int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey); +void tsdbCacheInvalidateSchema(STsdb* pTsdb, tb_uid_t suid, tb_uid_t uid); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 21d12ef77d..f525a28bd6 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -620,6 +620,8 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { } } if (uids) taosArrayDestroy(uids); + + tsdbCacheInvalidateSchema(pTsdb, pReq->suid, -1); } metaWLock(pMeta); @@ -1945,6 +1947,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl break; } + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, entry.uid); + } + entry.version = version; // do actual write diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index fa375a9fa0..abc1e214a4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -493,7 +493,7 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t code = 0; char *err = NULL; - + /* SLRUCache *pCache = pTsdb->lruCache; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; @@ -511,7 +511,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { rocksdb_free(err); code = TSDB_CODE_FAILED; } - + */ TAOS_RETURN(code); } @@ -1304,6 +1304,16 @@ _exit: TAOS_RETURN(code); } +void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) { + if (suid) { + if (pTsdb->rCache.suid == suid) { + pTsdb->rCache.suid = -1; + } + } else if (pTsdb->rCache.uid == uid) { + pTsdb->rCache.uid = -1; + } +} + static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) { if (suid) { if (pTsdb->rCache.suid == suid) { From 6b67c2ebf0be26e47810ce2719f3320b6a61f937 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 29 Oct 2024 15:07:13 +0800 Subject: [PATCH 06/12] uncomment tsdb cache commit --- source/dnode/vnode/src/tsdb/tsdbCache.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index abc1e214a4..801e89c2b6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -493,7 +493,7 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t code = 0; char *err = NULL; - /* + SLRUCache *pCache = pTsdb->lruCache; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; @@ -511,7 +511,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { rocksdb_free(err); code = TSDB_CODE_FAILED; } - */ + TAOS_RETURN(code); } From 8b4b7c141e1d2adedf9b0b3e40194e8812cea7e6 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 30 Oct 2024 10:22:19 +0800 Subject: [PATCH 07/12] fix schema update with sver param --- source/dnode/vnode/src/tsdb/tsdbCache.c | 76 ++++++++++++------------- source/util/src/tlrucache.c | 20 +++++-- 2 files changed, 51 insertions(+), 45 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 801e89c2b6..16812b16f4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1136,19 +1136,17 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray (void)taosThreadMutexLock(&pTsdb->lruMutex); for (int i = 0; i < num_keys; ++i) { - SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i); - - int8_t lflag = updCtx->lflag; - SRowKey *pRowKey = &updCtx->tsdbRowKey.key; - SColVal *pColVal = &updCtx->colVal; + SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i]; + int8_t lflag = updCtx->lflag; + SRowKey *pRowKey = &updCtx->tsdbRowKey.key; + SColVal *pColVal = &updCtx->colVal; if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) { continue; } SLastKey *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid}; - size_t klen = ROCKS_KEY_LEN; - LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { @@ -1304,28 +1302,30 @@ _exit: TAOS_RETURN(code); } -void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) { - if (suid) { - if (pTsdb->rCache.suid == suid) { +void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { + if (pTsdb->rCache.pTSchema && pTsdb->rCache.suid == suid) { + if (pTsdb->rCache.suid && sver == pTsdb->rCache.sver) { + pTsdb->rCache.sver = -1; pTsdb->rCache.suid = -1; + } else if (pTsdb->rCache.uid == uid && pTsdb->rCache.sver == sver) { + pTsdb->rCache.sver = -1; + pTsdb->rCache.uid = -1; } - } else if (pTsdb->rCache.uid == uid) { - pTsdb->rCache.uid = -1; } } -static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) { - if (suid) { - if (pTsdb->rCache.suid == suid) { - pTsdb->rCache.uid = uid; +static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { + if (pTsdb->rCache.pTSchema && pTsdb->rCache.suid == suid) { + if (pTsdb->rCache.suid && sver == pTsdb->rCache.sver) { + return 0; + } else if (pTsdb->rCache.uid == uid && pTsdb->rCache.sver == sver) { return 0; } - } else if (pTsdb->rCache.uid == uid) { - return 0; } pTsdb->rCache.suid = suid; pTsdb->rCache.uid = uid; + pTsdb->rCache.sver = sver; tDestroyTSchema(pTsdb->rCache.pTSchema); return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTsdb->rCache.pTSchema); } @@ -1335,36 +1335,27 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 int32_t code = 0, lino = 0; // 1. prepare last - TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; - + TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; STSchema *pTSchema = NULL; int32_t sver = TSDBROW_SVERSION(&lRow); SArray *ctxArray = NULL; SSHashObj *iColHash = NULL; - TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid), &lino, _exit); + TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit); pTSchema = pTsdb->rCache.pTSchema; - /* - TAOS_CHECK_GOTO(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema), &lino, _exit); - */ TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; int32_t nCol = pTSchema->numOfCols; - ctxArray = taosArrayInit(nCol, sizeof(SLastUpdateCtx)); - iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + ctxArray = taosArrayInit(nCol * 2, sizeof(SLastUpdateCtx)); // 1. prepare by lrow STsdbRowKey tsdbRowKey = {0}; tsdbRowGetKey(&lRow, &tsdbRowKey); STSDBRowIter iter = {0}; - code = tsdbRowIterOpen(&iter, &lRow, pTSchema); - if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s tsdbRowIterOpen failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); - TAOS_CHECK_GOTO(code, &lino, _exit); - } + TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit); + int32_t iCol = 0; for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; @@ -1372,15 +1363,19 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 TAOS_CHECK_GOTO(terrno, &lino, _exit); } - if (!COL_VAL_IS_VALUE(pColVal)) { + if (COL_VAL_IS_VALUE(pColVal)) { + updateCtx.lflag = LFLAG_LAST; + if (!taosArrayPush(ctxArray, &updateCtx)) { + TAOS_CHECK_GOTO(terrno, &lino, _exit); + } + } else { + if (!iColHash) { + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + } + if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); } - continue; - } - updateCtx.lflag = LFLAG_LAST; - if (!taosArrayPush(ctxArray, &updateCtx)) { - TAOS_CHECK_GOTO(terrno, &lino, _exit); } } tsdbRowClose(&iter); @@ -1425,7 +1420,10 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 } _exit: - // taosMemoryFreeClear(pTSchema); + if (code) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + } + taosArrayDestroy(ctxArray); tSimpleHashCleanup(iColHash); diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 92dcf015a6..160808f956 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -14,12 +14,12 @@ */ #define _DEFAULT_SOURCE +#include "tlrucache.h" #include "os.h" #include "taoserror.h" #include "tarray.h" #include "tdef.h" #include "tlog.h" -#include "tlrucache.h" #include "tutil.h" typedef struct SLRUEntry SLRUEntry; @@ -110,7 +110,7 @@ struct SLRUEntryTable { }; static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) { - table->lengthBits = 4; + table->lengthBits = 16; table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *)); if (!table->list) { TAOS_RETURN(terrno); @@ -168,7 +168,14 @@ static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable *table, const void *k while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) { entry = &(*entry)->nextHash; } - + /* + SLRUEntry *pentry = table->list[hash >> (32 - table->lengthBits)]; + SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)]; + while (pentry && (pentry->hash != hash || memcmp(key, pentry->keyData, keyLen) != 0)) { + entry = &pentry->nextHash; + pentry = *entry; + } + */ return entry; } @@ -371,9 +378,9 @@ static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) { static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle, bool freeOnFail) { - LRUStatus status = TAOS_LRU_STATUS_OK; + LRUStatus status = TAOS_LRU_STATUS_OK; SLRUEntry *toFree = NULL; - SArray *lastReferenceList = NULL; + SArray *lastReferenceList = NULL; if (shard->usage + e->totalCharge > shard->capacity) { lastReferenceList = taosArrayInit(16, POINTER_BYTES); if (!lastReferenceList) { @@ -744,7 +751,8 @@ void taosLRUCacheCleanup(SLRUCache *cache) { } LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge, - _taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, LRUPriority priority, void *ud) { + _taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, + LRUPriority priority, void *ud) { uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen); uint32_t shardIndex = hash & cache->shardedCache.shardMask; From d8df6db06ab039e47a6356b3870e15454d5d83e9 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 30 Oct 2024 10:22:19 +0800 Subject: [PATCH 08/12] fix schema update with sver param --- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/meta/metaTable.c | 4 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 76 ++++++++++++------------- source/util/src/tlrucache.c | 20 +++++-- 5 files changed, 55 insertions(+), 48 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f1ee5267a1..c1123db7a3 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -343,6 +343,7 @@ typedef struct { rocksdb_readoptions_t *readoptions; rocksdb_writebatch_t *writebatch; TdThreadMutex writeBatchMutex; + int32_t sver; tb_uid_t suid; tb_uid_t uid; STSchema *pTSchema; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7673e562a8..98f11086bc 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -222,7 +222,7 @@ int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey); int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey); -void tsdbCacheInvalidateSchema(STsdb* pTsdb, tb_uid_t suid, tb_uid_t uid); +void tsdbCacheInvalidateSchema(STsdb* pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index f525a28bd6..5c3516a962 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -621,7 +621,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { } if (uids) taosArrayDestroy(uids); - tsdbCacheInvalidateSchema(pTsdb, pReq->suid, -1); + tsdbCacheInvalidateSchema(pTsdb, pReq->suid, -1, pReq->schemaRow.version); } metaWLock(pMeta); @@ -1948,7 +1948,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl } if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { - tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, entry.uid); + tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, entry.uid, pSchema->version); } entry.version = version; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 801e89c2b6..16812b16f4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1136,19 +1136,17 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray (void)taosThreadMutexLock(&pTsdb->lruMutex); for (int i = 0; i < num_keys; ++i) { - SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i); - - int8_t lflag = updCtx->lflag; - SRowKey *pRowKey = &updCtx->tsdbRowKey.key; - SColVal *pColVal = &updCtx->colVal; + SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i]; + int8_t lflag = updCtx->lflag; + SRowKey *pRowKey = &updCtx->tsdbRowKey.key; + SColVal *pColVal = &updCtx->colVal; if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) { continue; } SLastKey *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid}; - size_t klen = ROCKS_KEY_LEN; - LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { @@ -1304,28 +1302,30 @@ _exit: TAOS_RETURN(code); } -void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) { - if (suid) { - if (pTsdb->rCache.suid == suid) { +void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { + if (pTsdb->rCache.pTSchema && pTsdb->rCache.suid == suid) { + if (pTsdb->rCache.suid && sver == pTsdb->rCache.sver) { + pTsdb->rCache.sver = -1; pTsdb->rCache.suid = -1; + } else if (pTsdb->rCache.uid == uid && pTsdb->rCache.sver == sver) { + pTsdb->rCache.sver = -1; + pTsdb->rCache.uid = -1; } - } else if (pTsdb->rCache.uid == uid) { - pTsdb->rCache.uid = -1; } } -static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) { - if (suid) { - if (pTsdb->rCache.suid == suid) { - pTsdb->rCache.uid = uid; +static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { + if (pTsdb->rCache.pTSchema && pTsdb->rCache.suid == suid) { + if (pTsdb->rCache.suid && sver == pTsdb->rCache.sver) { + return 0; + } else if (pTsdb->rCache.uid == uid && pTsdb->rCache.sver == sver) { return 0; } - } else if (pTsdb->rCache.uid == uid) { - return 0; } pTsdb->rCache.suid = suid; pTsdb->rCache.uid = uid; + pTsdb->rCache.sver = sver; tDestroyTSchema(pTsdb->rCache.pTSchema); return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTsdb->rCache.pTSchema); } @@ -1335,36 +1335,27 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 int32_t code = 0, lino = 0; // 1. prepare last - TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; - + TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; STSchema *pTSchema = NULL; int32_t sver = TSDBROW_SVERSION(&lRow); SArray *ctxArray = NULL; SSHashObj *iColHash = NULL; - TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid), &lino, _exit); + TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit); pTSchema = pTsdb->rCache.pTSchema; - /* - TAOS_CHECK_GOTO(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema), &lino, _exit); - */ TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; int32_t nCol = pTSchema->numOfCols; - ctxArray = taosArrayInit(nCol, sizeof(SLastUpdateCtx)); - iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + ctxArray = taosArrayInit(nCol * 2, sizeof(SLastUpdateCtx)); // 1. prepare by lrow STsdbRowKey tsdbRowKey = {0}; tsdbRowGetKey(&lRow, &tsdbRowKey); STSDBRowIter iter = {0}; - code = tsdbRowIterOpen(&iter, &lRow, pTSchema); - if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s tsdbRowIterOpen failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); - TAOS_CHECK_GOTO(code, &lino, _exit); - } + TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit); + int32_t iCol = 0; for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; @@ -1372,15 +1363,19 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 TAOS_CHECK_GOTO(terrno, &lino, _exit); } - if (!COL_VAL_IS_VALUE(pColVal)) { + if (COL_VAL_IS_VALUE(pColVal)) { + updateCtx.lflag = LFLAG_LAST; + if (!taosArrayPush(ctxArray, &updateCtx)) { + TAOS_CHECK_GOTO(terrno, &lino, _exit); + } + } else { + if (!iColHash) { + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + } + if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); } - continue; - } - updateCtx.lflag = LFLAG_LAST; - if (!taosArrayPush(ctxArray, &updateCtx)) { - TAOS_CHECK_GOTO(terrno, &lino, _exit); } } tsdbRowClose(&iter); @@ -1425,7 +1420,10 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 } _exit: - // taosMemoryFreeClear(pTSchema); + if (code) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + } + taosArrayDestroy(ctxArray); tSimpleHashCleanup(iColHash); diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 92dcf015a6..160808f956 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -14,12 +14,12 @@ */ #define _DEFAULT_SOURCE +#include "tlrucache.h" #include "os.h" #include "taoserror.h" #include "tarray.h" #include "tdef.h" #include "tlog.h" -#include "tlrucache.h" #include "tutil.h" typedef struct SLRUEntry SLRUEntry; @@ -110,7 +110,7 @@ struct SLRUEntryTable { }; static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) { - table->lengthBits = 4; + table->lengthBits = 16; table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *)); if (!table->list) { TAOS_RETURN(terrno); @@ -168,7 +168,14 @@ static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable *table, const void *k while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) { entry = &(*entry)->nextHash; } - + /* + SLRUEntry *pentry = table->list[hash >> (32 - table->lengthBits)]; + SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)]; + while (pentry && (pentry->hash != hash || memcmp(key, pentry->keyData, keyLen) != 0)) { + entry = &pentry->nextHash; + pentry = *entry; + } + */ return entry; } @@ -371,9 +378,9 @@ static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) { static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle, bool freeOnFail) { - LRUStatus status = TAOS_LRU_STATUS_OK; + LRUStatus status = TAOS_LRU_STATUS_OK; SLRUEntry *toFree = NULL; - SArray *lastReferenceList = NULL; + SArray *lastReferenceList = NULL; if (shard->usage + e->totalCharge > shard->capacity) { lastReferenceList = taosArrayInit(16, POINTER_BYTES); if (!lastReferenceList) { @@ -744,7 +751,8 @@ void taosLRUCacheCleanup(SLRUCache *cache) { } LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge, - _taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, LRUPriority priority, void *ud) { + _taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, + LRUPriority priority, void *ud) { uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen); uint32_t shardIndex = hash & cache->shardedCache.shardMask; From ba88253bf19be9a21855eb8e8a7edf0d012bb7ca Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 30 Oct 2024 14:42:13 +0800 Subject: [PATCH 09/12] lru: remove commented section --- source/util/src/tlrucache.c | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 160808f956..43a220af50 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -168,14 +168,7 @@ static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable *table, const void *k while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) { entry = &(*entry)->nextHash; } - /* - SLRUEntry *pentry = table->list[hash >> (32 - table->lengthBits)]; - SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)]; - while (pentry && (pentry->hash != hash || memcmp(key, pentry->keyData, keyLen) != 0)) { - entry = &pentry->nextHash; - pentry = *entry; - } - */ + return entry; } From 6da22e64838a677c7bd68e8d017771963fb0a01e Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Wed, 30 Oct 2024 16:13:27 +0800 Subject: [PATCH 10/12] enh: tsdb cache schema --- source/dnode/vnode/src/tsdb/tsdbCache.c | 37 ++++++++++++++----------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 16812b16f4..334725d9d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -230,6 +230,7 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { pTsdb->rCache.readoptions = readoptions; pTsdb->rCache.flushoptions = flushoptions; pTsdb->rCache.db = db; + pTsdb->rCache.sver = -1; pTsdb->rCache.suid = -1; pTsdb->rCache.uid = -1; pTsdb->rCache.pTSchema = NULL; @@ -1303,31 +1304,35 @@ _exit: } void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { - if (pTsdb->rCache.pTSchema && pTsdb->rCache.suid == suid) { - if (pTsdb->rCache.suid && sver == pTsdb->rCache.sver) { - pTsdb->rCache.sver = -1; - pTsdb->rCache.suid = -1; - } else if (pTsdb->rCache.uid == uid && pTsdb->rCache.sver == sver) { - pTsdb->rCache.sver = -1; - pTsdb->rCache.uid = -1; - } + SRocksCache *pRCache = &pTsdb->rCache; + if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return; + + if (suid > 0 && suid == pRCache->suid) { + pRCache->sver = -1; + pRCache->suid = -1; + } + if (suid == 0 && uid == pRCache->uid) { + pRCache->sver = -1; + pRCache->uid = -1; } } static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { - if (pTsdb->rCache.pTSchema && pTsdb->rCache.suid == suid) { - if (pTsdb->rCache.suid && sver == pTsdb->rCache.sver) { + SRocksCache *pRCache = &pTsdb->rCache; + if (pRCache->pTSchema && sver == pRCache->sver) { + if (suid > 0 && suid == pRCache->suid) { return 0; - } else if (pTsdb->rCache.uid == uid && pTsdb->rCache.sver == sver) { + } + if (suid == 0 && uid == pRCache->uid) { return 0; } } - pTsdb->rCache.suid = suid; - pTsdb->rCache.uid = uid; - pTsdb->rCache.sver = sver; - tDestroyTSchema(pTsdb->rCache.pTSchema); - return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTsdb->rCache.pTSchema); + pRCache->suid = suid; + pRCache->uid = uid; + pRCache->sver = sver; + tDestroyTSchema(pRCache->pTSchema); + return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema); } int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, From fde2a56076be942bfef4df63b9813240e9c05ba9 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 31 Oct 2024 11:47:32 +0800 Subject: [PATCH 11/12] enh: tsdbCacheRowFormatUpdate check for memory funcs --- source/dnode/vnode/src/tsdb/tsdbCache.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 8e4a53845a..1ae7dfeff7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1350,6 +1350,9 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 int32_t nCol = pTSchema->numOfCols; ctxArray = taosArrayInit(nCol * 2, sizeof(SLastUpdateCtx)); + if (ctxArray == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); + } // 1. prepare by lrow STsdbRowKey tsdbRowKey = {0}; @@ -1362,20 +1365,27 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; if (!taosArrayPush(ctxArray, &updateCtx)) { + tsdbRowClose(&iter); TAOS_CHECK_GOTO(terrno, &lino, _exit); } if (COL_VAL_IS_VALUE(pColVal)) { updateCtx.lflag = LFLAG_LAST; if (!taosArrayPush(ctxArray, &updateCtx)) { + tsdbRowClose(&iter); TAOS_CHECK_GOTO(terrno, &lino, _exit); } } else { if (!iColHash) { iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (ctxArray == NULL) { + tsdbRowClose(&iter); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); + } } if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) { + tsdbRowClose(&iter); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); } } From 2879fafc1c6e9a27e268e630336949f3d54d1183 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 31 Oct 2024 14:54:01 +0800 Subject: [PATCH 12/12] fix: typo issue --- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 1ae7dfeff7..2cef541cdb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1378,7 +1378,7 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 } else { if (!iColHash) { iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); - if (ctxArray == NULL) { + if (iColHash == NULL) { tsdbRowClose(&iter); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); }