From 064c46bc7bc4ff2f5c3ab61167e75e18bf01f944 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 9 May 2023 09:17:26 +0800 Subject: [PATCH] cache/dclp: double check to load from tsdb --- source/dnode/vnode/src/tsdb/tsdbCache.c | 97 ++++++++++++++++--------- 1 file changed, 64 insertions(+), 33 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 63b0084849..a0c61a8699 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -190,6 +190,25 @@ void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { *size = length; } +static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char const *lstring) { + SLastCol *pLastCol = NULL; + + char *err = NULL; + size_t vlen = 0; + char key[ROCKS_KEY_LEN]; + size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring); + char *value = NULL; + value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, key, klen, &vlen, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + + pLastCol = tsdbCacheDeserialize(value); + + return pLastCol; +} + int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; @@ -362,6 +381,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR taosMemoryFree(errs); for (int i = 0; i < num_keys; ++i) { + bool freeCol = true; SArray *pTmpColArray = NULL; SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); @@ -376,48 +396,59 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR } } } else { - // recalc: load from tsdb - int16_t aCols[1] = {cid}; - int16_t slotIds[1] = {pr->pSlotIds[i]}; - pTmpColArray = NULL; + taosThreadMutexLock(&pTsdb->rCache.rMutex); - if (ltype) { - mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); - } else { - mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); - } - - if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) { - pLastCol = taosArrayGet(pTmpColArray, 0); - } - - // still null, then make up a none col value + pLastCol = tsdbCacheLookup(pTsdb, uid, cid, lstring); if (!pLastCol) { - pLastCol = &noneCol; + // recalc: load from tsdb + int16_t aCols[1] = {cid}; + int16_t slotIds[1] = {pr->pSlotIds[i]}; + pTmpColArray = NULL; + + if (ltype) { + mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); + } else { + mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); + } + + if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) { + pLastCol = taosArrayGet(pTmpColArray, 0); + } + + // still null, then make up a none col value + if (!pLastCol) { + pLastCol = &noneCol; + freeCol = false; + } + + // store result back to rocks cache + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + char key[ROCKS_KEY_LEN]; + size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, pLastCol->colVal.cid, lstring); + rocksdb_writebatch_put(wb, key, klen, value, vlen); + + taosMemoryFree(value); + + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } } - // store result back to rocks cache - rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - char *value = NULL; - size_t vlen = 0; - tsdbCacheSerialize(pLastCol, &value, &vlen); - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid); - rocksdb_writebatch_put(wb, key, klen, value, vlen); - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } - - taosMemoryFree(value); + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); } taosArrayPush(pLastArray, pLastCol); taosArrayDestroy(pTmpColArray); - taosMemoryFree(values_list[i]); + if (freeCol) { + taosMemoryFree(pLastCol); + } } taosMemoryFree(values_list); taosMemoryFree(values_list_sizes);