From 756fefaa292c22bd294c50017acc7e38807cc7cd Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 16 May 2023 18:56:05 +0800 Subject: [PATCH 01/10] cache/batchread: load columns in one trip --- source/dnode/vnode/src/inc/tsdb.h | 24 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 676 ++++++++++++++++---- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 24 +- 3 files changed, 567 insertions(+), 157 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b2bc9abf33..486becdf96 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -346,13 +346,17 @@ struct STsdbFS { }; typedef struct { - rocksdb_t *db; - rocksdb_options_t *options; - rocksdb_flushoptions_t *flushoptions; - rocksdb_writeoptions_t *writeoptions; - rocksdb_readoptions_t *readoptions; - rocksdb_writebatch_t *writebatch; - TdThreadMutex rMutex; + rocksdb_t *db; + rocksdb_comparator_t *my_comparator; + rocksdb_cache_t *blockcache; + rocksdb_block_based_table_options_t *tableoptions; + rocksdb_options_t *options; + rocksdb_flushoptions_t *flushoptions; + rocksdb_writeoptions_t *writeoptions; + rocksdb_readoptions_t *readoptions; + rocksdb_writebatch_t *writebatch; + TdThreadMutex rMutex; + STSchema *pTSchema; } SRocksCache; struct STsdb { @@ -782,7 +786,7 @@ typedef struct SLDataIter { #define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row)) int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, - bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter); + bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); @@ -822,13 +826,15 @@ typedef struct SCacheRowsReader { typedef struct { TSKEY ts; + int8_t dirty; SColVal colVal; } SLastCol; int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); -int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype); +int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype); +int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype); int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index c0a8de5743..a637ba4968 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -46,7 +46,13 @@ static void tsdbCloseBICache(STsdb *pTsdb) { } } -#define ROCKS_KEY_LEN 64 +#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t)) + +typedef struct { + tb_uid_t uid; + int16_t cid; + int8_t ltype; +} SLastKey; static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { SVnode *pVnode = pTsdb->pVnode; @@ -62,9 +68,56 @@ static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { } } +static const char *myCmpName(void *state) { + (void)state; + return "myCmp"; +} + +static void myCmpDestroy(void *state) { (void)state; } + +static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) { + (void)state; + (void)alen; + (void)blen; + SLastKey *lhs = (SLastKey *)a; + SLastKey *rhs = (SLastKey *)b; + + if (lhs->uid < rhs->uid) { + return -1; + } else if (lhs->uid > rhs->uid) { + return 1; + } + + if (lhs->cid < rhs->cid) { + return -1; + } else if (lhs->cid > rhs->cid) { + return 1; + } + + if (lhs->ltype < rhs->ltype) { + return -1; + } else if (lhs->ltype > rhs->ltype) { + return 1; + } + + return 0; +} + static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { int32_t code = 0; + rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName); + if (NULL == cmp) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + rocksdb_cache_t *cache = rocksdb_cache_create_lru(5 * 1024 * 1024); + pTsdb->rCache.blockcache = cache; + + rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create(); + pTsdb->rCache.tableoptions = tableoptions; + rocksdb_options_t *options = rocksdb_options_create(); if (NULL == options) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -72,6 +125,9 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { } rocksdb_options_set_create_if_missing(options, 1); + rocksdb_options_set_comparator(options, cmp); + rocksdb_block_based_options_set_block_cache(tableoptions, cache); + rocksdb_options_set_block_based_table_factory(options, tableoptions); // rocksdb_options_set_inplace_update_support(options, 1); // rocksdb_options_set_allow_concurrent_memtable_write(options, 0); @@ -80,12 +136,12 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err2; } - // rocksdb_writeoptions_disable_WAL(writeoptions, 1); + rocksdb_writeoptions_disable_WAL(writeoptions, 1); rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); if (NULL == readoptions) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err2; + goto _err3; } char *err = NULL; @@ -94,19 +150,23 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { rocksdb_t *db = rocksdb_open(options, cachePath, &err); if (NULL == db) { - code = -1; - goto _err3; + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err4; } rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create(); if (NULL == flushoptions) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err4; + goto _err5; } rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); pTsdb->rCache.writebatch = writebatch; + pTsdb->rCache.my_comparator = cmp; pTsdb->rCache.options = options; pTsdb->rCache.writeoptions = writeoptions; pTsdb->rCache.readoptions = readoptions; @@ -115,15 +175,22 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL); + pTsdb->rCache.pTSchema = NULL; + return code; +_err5: + rocksdb_close(pTsdb->rCache.db); _err4: rocksdb_readoptions_destroy(readoptions); _err3: rocksdb_writeoptions_destroy(writeoptions); _err2: rocksdb_options_destroy(options); + rocksdb_block_based_options_destroy(tableoptions); + rocksdb_cache_destroy(cache); _err: + rocksdb_comparator_destroy(cmp); return code; } @@ -134,7 +201,11 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_options_destroy(pTsdb->rCache.options); + rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions); + rocksdb_cache_destroy(pTsdb->rCache.blockcache); + rocksdb_comparator_destroy(pTsdb->rCache.my_comparator); taosThreadMutexDestroy(&pTsdb->rCache.rMutex); + taosMemoryFree(pTsdb->rCache.pTSchema); } int32_t tsdbCacheCommit(STsdb *pTsdb) { @@ -191,15 +262,15 @@ void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { *size = length; } -static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char const *lstring) { +static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) { SLastCol *pLastCol = NULL; - char *err = NULL; - size_t vlen = 0; - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring); - char *value = NULL; - value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, key, klen, &vlen, &err); + char *err = NULL; + size_t vlen = 0; + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + size_t klen = ROCKS_KEY_LEN; + char *value = NULL; + value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err); if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); rocksdb_free(err); @@ -210,18 +281,40 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char c return pLastCol; } +static void rocksMayWrite(STsdb *pTsdb) { + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + + int count = rocksdb_writebatch_count(wb); + if (count >= 1024) { + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + + rocksdb_writebatch_clear(wb); + } +} + int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; // 1, fetch schema - STSchema *pTSchema = NULL; + STSchema *pTSchema = pTsdb->rCache.pTSchema; int32_t sver = TSDBROW_SVERSION(pRow); - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; - } + if (!pTSchema || sver != pTSchema->version) { + if (pTSchema) { + taosMemoryFree(pTSchema); + } + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + pTsdb->rCache.pTSchema = pTSchema; + } // 2, iterate col values into array SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); @@ -229,22 +322,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow tsdbRowIterOpen(&iter, pRow, pTSchema); for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - /* - if (IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - - pColVal->value.pData = NULL; - code = tRealloc(&pColVal->value.pData, pColVal->value.nData); - if (code) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } - */ taosArrayPush(aColVal, pColVal); } @@ -254,23 +331,18 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow int num_keys = TARRAY_SIZE(aColVal); char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); + char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN * 2); for (int i = 0; i < num_keys; ++i) { SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); int16_t cid = pColVal->cid; - char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); - int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid); - if (last_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid); - if (lr_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - keys_list[i] = keys; - keys_list[num_keys + i] = keys + ROCKS_KEY_LEN; - keys_list_sizes[i] = last_key_len; - keys_list_sizes[num_keys + i] = lr_key_len; + memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = 1, .uid = uid, .cid = cid}, ROCKS_KEY_LEN); + memcpy(key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN, &(SLastKey){.ltype = 0, .uid = uid, .cid = cid}, + ROCKS_KEY_LEN); + keys_list[i] = key_list + i * ROCKS_KEY_LEN; + keys_list[num_keys + i] = key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN; + keys_list_sizes[i] = ROCKS_KEY_LEN; + keys_list_sizes[num_keys + i] = ROCKS_KEY_LEN; } char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); @@ -278,12 +350,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosThreadMutexLock(&pTsdb->rCache.rMutex); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); - for (int i = 0; i < num_keys; ++i) { - taosMemoryFree(keys_list[i]); - } for (int i = 0; i < num_keys * 2; ++i) { rocksdb_free(errs[i]); } + taosMemoryFree(key_list); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); taosMemoryFree(errs); @@ -292,19 +362,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); - if (COL_VAL_IS_VALUE(pColVal)) { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); - - if (NULL == pLastCol || pLastCol->ts <= keyTs) { - char *value = NULL; - size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pColVal->cid); - rocksdb_writebatch_put(wb, key, klen, value, vlen); - taosMemoryFree(value); - } - } if (!COL_VAL_IS_NONE(pColVal)) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); @@ -313,11 +370,26 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow char *value = NULL; size_t vlen = 0; tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pColVal->cid); - rocksdb_writebatch_put(wb, key, klen, value, vlen); + SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); taosMemoryFree(value); } + + if (COL_VAL_IS_VALUE(pColVal)) { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + + if (NULL == pLastCol || pLastCol->ts <= keyTs) { + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; + size_t klen = ROCKS_KEY_LEN; + + rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); + taosMemoryFree(value); + } + } } rocksdb_free(values_list[i]); @@ -326,18 +398,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } + rocksMayWrite(pTsdb); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - rocksdb_writebatch_clear(wb); _exit: taosArrayDestroy(aColVal); - taosMemoryFree(pTSchema); + // taosMemoryFree(pTSchema); return code; } @@ -356,38 +422,36 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); - -int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) { - static char const *alstring[2] = {"last_row", "last"}; - char const *lstring = alstring[ltype]; +#if 1 +int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { rocksdb_writebatch_t *wb = NULL; int32_t code = 0; SArray *pCidList = pr->pCidList; int num_keys = TARRAY_SIZE(pCidList); - char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *)); - size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + + char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *)); + size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t)); + char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN); for (int i = 0; i < num_keys; ++i) { int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); - char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); - int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring); - if (last_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - - keys_list[i] = keys; - keys_list_sizes[i] = last_key_len; + memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}, ROCKS_KEY_LEN); + keys_list[i] = key_list + i * ROCKS_KEY_LEN; + keys_list_sizes[i] = ROCKS_KEY_LEN; } + char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); - char **errs = taosMemoryCalloc(num_keys, sizeof(char *)); + char **errs = taosMemoryMalloc(num_keys * sizeof(char *)); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); for (int i = 0; i < num_keys; ++i) { - taosMemoryFree(keys_list[i]); - rocksdb_free(errs[i]); + if (errs[i]) { + rocksdb_free(errs[i]); + } } + taosMemoryFree(key_list); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); taosMemoryFree(errs); @@ -403,7 +467,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR } else { taosThreadMutexLock(&pTsdb->rCache.rMutex); - pLastCol = tsdbCacheLookup(pTsdb, uid, cid, lstring); + pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); if (!pLastCol) { // recalc: load from tsdb int16_t aCols[1] = {cid}; @@ -432,9 +496,10 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR char *value = NULL; size_t vlen = 0; tsdbCacheSerialize(pLastCol, &value, &vlen); - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, pLastCol->colVal.cid, lstring); - rocksdb_writebatch_put(wb, key, klen, value, vlen); + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); taosMemoryFree(value); } else { @@ -442,21 +507,13 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR } if (wb) { - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } - - rocksdb_writebatch_clear(wb); + rocksMayWrite(pTsdb); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); } taosArrayPush(pLastArray, pLastCol); - taosArrayDestroy(pTmpColArray); if (freeCol) { taosMemoryFree(pLastCol); @@ -467,6 +524,356 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR return code; } +#endif + +static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t slotid, tb_uid_t uid, int16_t cid, + int8_t ltype) { + SLastCol *pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); + if (!pLastCol) { + rocksdb_writebatch_t *wb = NULL; + + taosThreadMutexLock(&pTsdb->rCache.rMutex); + pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); + if (!pLastCol) { + // recalc: load from tsdb + int16_t aCols[1] = {cid}; + int16_t slotIds[1] = {slotid}; + SArray *pTmpColArray = NULL; + + if (ltype) { + mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); + } else { + mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); + } + + if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) { + pLastCol = taosArrayGet(pTmpColArray, 0); + } + + // still null, then make up a none col value + SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[slotid].type)}; + if (!pLastCol) { + pLastCol = &noneCol; + } + + // store result back to rocks cache + wb = pTsdb->rCache.writebatch; + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); + taosMemoryFree(value); + + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + taosArrayDestroy(pTmpColArray); + } + + if (wb) { + rocksMayWrite(pTsdb); + } + + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + } + + return pLastCol; +} + +static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) { + SLastCol *pLastCol = (SLastCol *)value; + + // TODO: add dirty flag to SLastCol + if (pLastCol->dirty) { + // TODO: queue into dirty list, free it after save to backstore + } else { + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) { + taosMemoryFree(pLastCol->colVal.value.pData); + } + + taosMemoryFree(value); + } +} + +typedef struct { + int idx; + SLastKey key; +} SIdxKey; + +static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, + SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + rocksdb_writebatch_t *wb = NULL; + SArray *pTmpColArray = NULL; + int num_keys = TARRAY_SIZE(remainCols); + int16_t *aCols = taosMemoryMalloc(num_keys * sizeof(int16_t)); + int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + + for (int i = 0; i < num_keys; ++i) { + SIdxKey *idxKey = taosArrayGet(remainCols, i); + aCols[i] = idxKey->key.cid; + slotIds[i] = pr->pSlotIds[idxKey->idx]; + } + + if (ltype) { + mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds); + } else { + mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds); + } + + SLRUCache *pCache = pTsdb->lruCache; + for (int i = 0; i < num_keys; ++i) { + SIdxKey *idxKey = taosArrayGet(remainCols, i); + SLastCol *pLastCol = NULL; + + if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) { + pLastCol = taosArrayGet(pTmpColArray, i); + } + + // still null, then make up a none col value + SLastCol noneCol = {.ts = TSKEY_MIN, + .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; + if (!pLastCol) { + pLastCol = &noneCol; + } + + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, + TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + + // store result back to rocks cache + wb = pTsdb->rCache.writebatch; + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + + SLastKey *key = &idxKey->key; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); + taosMemoryFree(value); + + taosArraySet(pLastArray, idxKey->idx, pLastCol); + // taosArrayRemove(remainCols, i); + } + + if (wb) { + rocksMayWrite(pTsdb); + } + + taosArrayDestroy(pTmpColArray); + + taosMemoryFree(aCols); + taosMemoryFree(slotIds); + + return code; +} + +static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, + SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + int num_keys = TARRAY_SIZE(remainCols); + char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *)); + size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t)); + char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN); + for (int i = 0; i < num_keys; ++i) { + int16_t cid = *(int16_t *)taosArrayGet(remainCols, i); + + memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN); + keys_list[i] = key_list + i * ROCKS_KEY_LEN; + keys_list_sizes[i] = ROCKS_KEY_LEN; + } + + char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); + size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + char **errs = taosMemoryMalloc(num_keys * sizeof(char *)); + rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, + keys_list_sizes, values_list, values_list_sizes, errs); + for (int i = 0; i < num_keys; ++i) { + if (errs[i]) { + rocksdb_free(errs[i]); + } + } + taosMemoryFree(key_list); + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(errs); + + SLRUCache *pCache = pTsdb->lruCache; + for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + SIdxKey *idxKey = taosArrayGet(remainCols, j); + int16_t cid = idxKey->key.cid; + SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; + if (pLastCol) { + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + reallocVarData(&pLastCol->colVal); + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, + NULL, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + + SLastCol lastCol = *pLastCol; + reallocVarData(&lastCol.colVal); + + taosArraySet(pLastArray, idxKey->idx, &lastCol); + taosArrayRemove(remainCols, j); + + taosMemoryFree(values_list[i]); + } else { + ++j; + } + } + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + + if (TARRAY_SIZE(remainCols) > 0) { + code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype); + } + + return code; +} + +int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + SLRUCache *pCache = pTsdb->lruCache; + SArray *pCidList = pr->pCidList; + int num_keys = TARRAY_SIZE(pCidList); + + SArray *remainCols = NULL; + + for (int i = 0; i < num_keys; ++i) { + int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + + LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); + + SLastCol lastCol = *pLastCol; + reallocVarData(&lastCol.colVal); + taosArrayPush(pLastArray, &lastCol); + + if (h) { + taosLRUCacheRelease(pCache, h, false); + } + } else { + SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; + + taosArrayPush(pLastArray, &noneCol); + + if (!remainCols) { + remainCols = taosArrayInit(num_keys, sizeof(SIdxKey)); + } + taosArrayPush(remainCols, &(SIdxKey){i, *key}); + } + } + + if (remainCols && TARRAY_SIZE(remainCols) > 0) { + taosThreadMutexLock(&pTsdb->lruMutex); + for (int i = 0; i < TARRAY_SIZE(remainCols);) { + SIdxKey *idxKey = taosArrayGet(remainCols, i); + LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); + + SLastCol lastCol = *pLastCol; + reallocVarData(&lastCol.colVal); + + taosArraySet(pLastArray, idxKey->idx, &lastCol); + if (h) { + taosLRUCacheRelease(pCache, h, false); + } + + taosArrayRemove(remainCols, i); + } else { + ++i; + } + } + + code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, pr, ltype); + + taosThreadMutexUnlock(&pTsdb->lruMutex); + } + + if (remainCols) { + taosArrayDestroy(remainCols); + } + + return code; +} + +int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + SLRUCache *pCache = pTsdb->lruCache; + SArray *pCidList = pr->pCidList; + int num_keys = TARRAY_SIZE(pCidList); + + for (int i = 0; i < num_keys; ++i) { + SLastCol *pLastCol = NULL; + int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); + if (!h) { + taosThreadMutexLock(&pTsdb->lruMutex); + h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); + if (!h) { + pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype); + + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h, + TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + } + + taosThreadMutexUnlock(&pTsdb->lruMutex); + } + + pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); + + SLastCol lastCol = *pLastCol; + reallocVarData(&lastCol.colVal); + + if (h) { + taosLRUCacheRelease(pCache, h, false); + } + + taosArrayPush(pLastArray, &lastCol); + } + + return code; +} int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0; @@ -486,19 +893,15 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE for (int i = 0; i < num_keys; ++i) { int16_t cid = pTSchema->columns[i].colId; - char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); - int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid); - if (last_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid); - if (lr_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } + size_t klen = ROCKS_KEY_LEN; + char *keys = taosMemoryCalloc(2, klen); + ((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + keys_list[i] = keys; - keys_list[num_keys + i] = keys + ROCKS_KEY_LEN; - keys_list_sizes[i] = last_key_len; - keys_list_sizes[num_keys + i] = lr_key_len; + keys_list[num_keys + i] = keys + klen; + keys_list_sizes[i] = klen; + keys_list_sizes[num_keys + i] = klen; } char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); @@ -520,16 +923,18 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE for (int i = 0; i < num_keys; ++i) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid); - rocksdb_writebatch_delete(wb, key, klen); + SLastKey *key = &(SLastKey){.ltype = 1, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = sizeof(*key); + + rocksdb_writebatch_delete(wb, (char *)key, klen); } pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pLastCol->colVal.cid); - rocksdb_writebatch_delete(wb, key, klen); + SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = sizeof(*key); + + rocksdb_writebatch_delete(wb, (char *)key, klen); } rocksdb_free(values_list[i]); @@ -538,14 +943,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } + rocksMayWrite(pTsdb); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - rocksdb_writebatch_clear(wb); _exit: taosMemoryFree(pTSchema); @@ -1111,7 +1510,7 @@ typedef struct { SMergeTree mergeTree; SMergeTree *pMergeTree; SSttBlockLoadInfo *pLoadInfo; - SLDataIter* pDataIter; + SLDataIter *pDataIter; int64_t lastTs; } SFSLastNextRowIter; @@ -1159,7 +1558,8 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa } tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid, &(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX}, - &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true, state->pDataIter); + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true, + state->pDataIter); state->pMergeTree = &state->mergeTree; state->state = SFSLASTNEXTROW_BLOCKROW; } @@ -1394,11 +1794,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie tBlockDataReset(state->pBlockData); TABLEID tid = {.suid = state->suid, .uid = state->uid}; int nTmpCols = nCols; - if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID && nCols == 1) { - nTmpCols = 0; + bool hasTs = false; + if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) { + --nTmpCols; skipBlock = false; + hasTs = true; } - code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, aCols, nTmpCols); + code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, hasTs ? aCols + 1 : aCols, nTmpCols); if (code) goto _err; code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData); @@ -1730,8 +2132,8 @@ typedef struct { } CacheNextRowIter; static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, - SSttBlockLoadInfo *pLoadInfo, SLDataIter* pLDataIter, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader, - SDataFReader **pDataFReaderLast, int64_t lastTs) { + SSttBlockLoadInfo *pLoadInfo, SLDataIter *pLDataIter, STsdbReadSnap *pReadSnap, + SDataFReader **pDataFReader, SDataFReader **pDataFReaderLast, int64_t lastTs) { int code = 0; STbData *pMem = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 53103e9fbb..f6e37d9427 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -168,8 +168,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, } SVnodeCfg* pCfg = &((SVnode*)pVnode)->config; - - int32_t numOfStt = pCfg->sttTrigger; + int32_t numOfStt = pCfg->sttTrigger; p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt); if (p->pLoadInfo == NULL) { tsdbCacherowsReaderClose(p); @@ -203,7 +202,7 @@ void* tsdbCacherowsReaderClose(void* pReader) { taosMemoryFree(p->pSchema); } - taosMemoryFreeClear(p->pDataIter); + taosMemoryFree(p->pDataIter); taosMemoryFree(p->pCurrSchema); destroyLastBlockLoadInfo(p->pLoadInfo); @@ -294,21 +293,23 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 pr->pDataFReader = NULL; pr->pDataFReaderLast = NULL; - int32_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; + int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; // retrieve the only one last row of all tables in the uid list. if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { int64_t st = taosGetTimestampUs(); int64_t totalLastTs = INT64_MAX; + for (int32_t i = 0; i < pr->numOfTables; ++i) { STableKeyInfo* pKeyInfo = &pr->pTableList[i]; - tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); + tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); + // tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); if (TARRAY_SIZE(pRow) <= 0) { taosArrayClearEx(pRow, freeItem); continue; } - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + SLastCol* pColVal = taosArrayGet(pRow, 0); if (COL_VAL_IS_NONE(&pColVal->colVal)) { taosArrayClearEx(pRow, freeItem); continue; @@ -361,7 +362,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } - if (taosArrayGetSize(pTableUidList) == 0) { + if (TARRAY_SIZE(pTableUidList) == 0) { taosArrayPush(pTableUidList, &pKeyInfo->uid); } else { taosArraySet(pTableUidList, 0, &pKeyInfo->uid); @@ -375,9 +376,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { - STableKeyInfo* pKeyInfo = &pr->pTableList[i]; + tb_uid_t uid = pr->pTableList[i].uid; - tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); + tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype); if (TARRAY_SIZE(pRow) <= 0) { taosArrayClearEx(pRow, freeItem); continue; @@ -391,9 +392,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr); taosArrayClearEx(pRow, freeItem); - taosArrayPush(pTableUidList, &pKeyInfo->uid); + taosArrayPush(pTableUidList, &uid); - pr->tableIndex += 1; + ++pr->tableIndex; if (pResBlock->info.rows >= pResBlock->info.capacity) { goto _end; } @@ -419,5 +420,6 @@ _end: taosMemoryFree(pRes); taosArrayDestroyEx(pRow, freeItem); taosArrayDestroyEx(pLastCols, freeItem); + return code; } From 86e3c1d20d8eb8413cf5022e9aed40546c2c0180 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 17 May 2023 14:05:28 +0800 Subject: [PATCH 02/10] cache/stt: remove ts from merge tree loading --- source/dnode/vnode/src/tsdb/tsdbCache.c | 24 ++++++++++++--------- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 21 ++++++++++++------ 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index a637ba4968..3890e3d1f7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -716,9 +716,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA SLRUCache *pCache = pTsdb->lruCache; for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); - SIdxKey *idxKey = taosArrayGet(remainCols, j); - int16_t cid = idxKey->key.cid; - SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; + SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j]; if (pLastCol) { SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); *pTmpLastCol = *pLastCol; @@ -747,6 +745,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA ++j; } } + taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); @@ -759,14 +758,13 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { int32_t code = 0; + SArray *remainCols = NULL; SLRUCache *pCache = pTsdb->lruCache; SArray *pCidList = pr->pCidList; int num_keys = TARRAY_SIZE(pCidList); - SArray *remainCols = NULL; - for (int i = 0; i < num_keys; ++i) { - int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); + int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i]; SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; @@ -775,7 +773,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol lastCol = *pLastCol; - reallocVarData(&lastCol.colVal); + // reallocVarData(&lastCol.colVal); taosArrayPush(pLastArray, &lastCol); if (h) { @@ -796,7 +794,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache if (remainCols && TARRAY_SIZE(remainCols) > 0) { taosThreadMutexLock(&pTsdb->lruMutex); for (int i = 0; i < TARRAY_SIZE(remainCols);) { - SIdxKey *idxKey = taosArrayGet(remainCols, i); + SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); @@ -1551,9 +1549,15 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa if (code) goto _err; } + int nTmpCols = nCols; + bool hasTs = false; + if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) { + --nTmpCols; + hasTs = true; + } for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) { - state->pLoadInfo[i].colIds = aCols; - state->pLoadInfo[i].numOfCols = nCols; + state->pLoadInfo[i].colIds = hasTs ? aCols + 1 : aCols; + state->pLoadInfo[i].numOfCols = nTmpCols; state->pLoadInfo[i].isLast = isLast; } tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid, diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index f6e37d9427..d15eb0a911 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -306,12 +306,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); // tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); if (TARRAY_SIZE(pRow) <= 0) { - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); continue; } SLastCol* pColVal = taosArrayGet(pRow, 0); if (COL_VAL_IS_NONE(&pColVal->colVal)) { - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); continue; } @@ -368,7 +370,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArraySet(pTableUidList, 0, &pKeyInfo->uid); } - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); } if (hasRes) { @@ -380,17 +383,20 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype); if (TARRAY_SIZE(pRow) <= 0) { - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); continue; } SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); if (COL_VAL_IS_NONE(&pColVal->colVal)) { - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); continue; } saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr); - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); taosArrayPush(pTableUidList, &uid); @@ -418,7 +424,8 @@ _end: } taosMemoryFree(pRes); - taosArrayDestroyEx(pRow, freeItem); + // taosArrayDestroyEx(pRow, freeItem); + taosArrayDestroy(pRow); taosArrayDestroyEx(pLastCols, freeItem); return code; From e95613ec10910c74a52e1d66de820f3ccd87b5c5 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 17 May 2023 14:32:02 +0800 Subject: [PATCH 03/10] cache/commit: update & commit cache if cache not off --- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 9 ++++++--- source/dnode/vnode/src/vnd/vnodeCommit.c | 6 ++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 97b648201c..803a4f6c2d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -302,12 +302,12 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) { return rowsNum; } -void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj* pTableMap, int64_t *rowsNum) { +void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) { taosRLockLatch(&pMemTable->latch); for (int32_t i = 0; i < pMemTable->nBucket; ++i) { STbData *pTbData = pMemTable->aBucket[i]; while (pTbData) { - void* p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); + void *p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); if (p == NULL) { pTbData = pTbData->next; continue; @@ -673,7 +673,10 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, if (key.ts >= pTbData->maxKey) { pTbData->maxKey = key.ts; } - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + + if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { + tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + } // SMemTable pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 74168591d2..77453fd894 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -439,8 +439,10 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { code = tsdbCommit(pVnode->pTsdb, pInfo); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbCacheCommit(pVnode->pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); + if (!TSDB_CACHE_NO(pVnode->config)) { + code = tsdbCacheCommit(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } if (VND_IS_RSMA(pVnode)) { code = smaCommit(pVnode->pSma, pInfo); From b892b1fdfe521f46415deddad405afaea5e28e35 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 17 May 2023 15:26:10 +0800 Subject: [PATCH 04/10] cache/flag: fix cache flag compiling issue --- source/dnode/vnode/src/inc/tsdb.h | 4 --- source/dnode/vnode/src/inc/vnodeInt.h | 6 +++- source/dnode/vnode/src/tsdb/tsdbCache.c | 48 ++++++++++++------------- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 486becdf96..9baf38bca8 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -305,10 +305,6 @@ void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proa // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); -#define TSDB_CACHE_NO(c) ((c).cacheLast == 0) -#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0) -#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0) - // tsdbDiskData ============================================================================================== int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder); void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d7f0ef041a..c92cdd32b0 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -197,7 +197,7 @@ void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); -int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); @@ -405,6 +405,10 @@ struct SVnode { #define VND_IS_RSMA(v) ((v)->config.isRsma == 1) #define VND_IS_TSMA(v) ((v)->config.isTsma == 1) +#define TSDB_CACHE_NO(c) ((c).cacheLast == 0) +#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0) +#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0) + struct STbUidStore { tb_uid_t suid; SArray* tbUids; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3890e3d1f7..8f0d541d49 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -208,10 +208,26 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { taosMemoryFree(pTsdb->rCache.pTSchema); } +static void rocksMayWrite(STsdb *pTsdb, bool force) { + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + + if (force || rocksdb_writebatch_count(wb) >= 1024) { + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + + rocksdb_writebatch_clear(wb); + } +} + int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t code = 0; char *err = NULL; + rocksMayWrite(pTsdb, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); @@ -281,22 +297,6 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t return pLastCol; } -static void rocksMayWrite(STsdb *pTsdb) { - rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - - int count = rocksdb_writebatch_count(wb); - if (count >= 1024) { - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } - - rocksdb_writebatch_clear(wb); - } -} - int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; @@ -398,7 +398,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - rocksMayWrite(pTsdb); + rocksMayWrite(pTsdb, false); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); _exit: @@ -507,7 +507,7 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR } if (wb) { - rocksMayWrite(pTsdb); + rocksMayWrite(pTsdb, false); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); @@ -575,7 +575,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl } if (wb) { - rocksMayWrite(pTsdb); + rocksMayWrite(pTsdb, false); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); @@ -672,7 +672,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } if (wb) { - rocksMayWrite(pTsdb); + rocksMayWrite(pTsdb, false); } taosArrayDestroy(pTmpColArray); @@ -734,10 +734,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA code = -1; } - SLastCol lastCol = *pLastCol; - reallocVarData(&lastCol.colVal); + // SLastCol lastCol = *pLastCol; + // reallocVarData(&lastCol.colVal); - taosArraySet(pLastArray, idxKey->idx, &lastCol); + taosArraySet(pLastArray, idxKey->idx, pLastCol); taosArrayRemove(remainCols, j); taosMemoryFree(values_list[i]); @@ -941,7 +941,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - rocksMayWrite(pTsdb); + rocksMayWrite(pTsdb, false); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); _exit: From f428a68ec1c585f22d02784dea9c4ef437f98007 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 17 May 2023 16:08:48 +0800 Subject: [PATCH 05/10] cache/update: update lru when deleting or updating --- source/dnode/vnode/src/tsdb/tsdbCache.c | 113 ++++++++++++++++-------- 1 file changed, 75 insertions(+), 38 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 8f0d541d49..2eddc29c0e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -297,6 +297,31 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t return pLastCol; } +static void reallocVarData(SColVal *pColVal) { + if (IS_VAR_DATA_TYPE(pColVal->type)) { + uint8_t *pVal = pColVal->value.pData; + pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); + if (pColVal->value.nData) { + memcpy(pColVal->value.pData, pVal, pColVal->value.nData); + } + } +} + +static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) { + SLastCol *pLastCol = (SLastCol *)value; + + // TODO: add dirty flag to SLastCol + if (pLastCol->dirty) { + // TODO: queue into dirty list, free it after save to backstore + } else { + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) { + taosMemoryFree(pLastCol->colVal.value.pData); + } + + taosMemoryFree(value); + } +} + int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; @@ -373,6 +398,24 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; size_t klen = ROCKS_KEY_LEN; rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); + + pLastCol = (SLastCol *)value; + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + reallocVarData(&pLastCol->colVal); + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, + NULL, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + taosMemoryFree(value); } @@ -384,9 +427,26 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow size_t vlen = 0; tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; - size_t klen = ROCKS_KEY_LEN; - rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); + rocksdb_writebatch_put(wb, (char *)&key, ROCKS_KEY_LEN, value, vlen); + + pLastCol = (SLastCol *)value; + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + reallocVarData(&pLastCol->colVal); + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, + tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + taosMemoryFree(value); } } @@ -407,16 +467,6 @@ _exit: return code; } -static void reallocVarData(SColVal *pColVal) { - if (IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } -} - static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); @@ -584,21 +634,6 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl return pLastCol; } -static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) { - SLastCol *pLastCol = (SLastCol *)value; - - // TODO: add dirty flag to SLastCol - if (pLastCol->dirty) { - // TODO: queue into dirty list, free it after save to backstore - } else { - if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) { - taosMemoryFree(pLastCol->colVal.value.pData); - } - - taosMemoryFree(value); - } -} - typedef struct { int idx; SLastKey key; @@ -734,9 +769,6 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA code = -1; } - // SLastCol lastCol = *pLastCol; - // reallocVarData(&lastCol.colVal); - taosArraySet(pLastArray, idxKey->idx, pLastCol); taosArrayRemove(remainCols, j); @@ -875,16 +907,19 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0; - // 1, fetch schema - STSchema *pTSchema = NULL; - int32_t sver = -1; - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; + // fetch schema + STSchema *pTSchema = pTsdb->rCache.pTSchema; + if (!pTSchema) { + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + pTsdb->rCache.pTSchema = pTSchema; } - // 3, build keys & multi get from rocks + // build keys & multi get from rocks int num_keys = pTSchema->numOfCols; char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); @@ -925,6 +960,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE size_t klen = sizeof(*key); rocksdb_writebatch_delete(wb, (char *)key, klen); + taosLRUCacheErase(pTsdb->lruCache, key, klen); } pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); @@ -933,6 +969,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE size_t klen = sizeof(*key); rocksdb_writebatch_delete(wb, (char *)key, klen); + taosLRUCacheErase(pTsdb->lruCache, key, klen); } rocksdb_free(values_list[i]); From 765cc72618b13fee47baedaa9eb2f5d263785c23 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 17 May 2023 18:56:03 +0800 Subject: [PATCH 06/10] cache/memtable: update cache when flag is on --- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 2eddc29c0e..fe7e808db3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -314,7 +314,7 @@ static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) { if (pLastCol->dirty) { // TODO: queue into dirty list, free it after save to backstore } else { - if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) { + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) /* && pLastCol->colVal.value.nData > 0*/) { taosMemoryFree(pLastCol->colVal.value.pData); } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 803a4f6c2d..80967a906f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -737,7 +737,9 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, if (key.ts >= pTbData->maxKey) { pTbData->maxKey = key.ts; } - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { + tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + } // SMemTable pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); From 0f8b7c87f7ba1733dfe17f488f12bc68d435c2f7 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 18 May 2023 13:15:25 +0800 Subject: [PATCH 07/10] cache/delete: fix rocks key length --- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index fe7e808db3..5d7df048f8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -927,7 +927,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE int16_t cid = pTSchema->columns[i].colId; size_t klen = ROCKS_KEY_LEN; - char *keys = taosMemoryCalloc(2, klen); + char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); ((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; From 3b316051bfb73c922c83ea823a5866da1e7e2695 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 18 May 2023 13:51:07 +0800 Subject: [PATCH 08/10] cache/schema: fix schema double free --- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 5d7df048f8..bc79cacc4b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -982,7 +982,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosThreadMutexUnlock(&pTsdb->rCache.rMutex); _exit: - taosMemoryFree(pTSchema); + // taosMemoryFree(pTSchema); return code; } From f95867d2558b014fd965aaa2f32323ed4bbe8b71 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 18 May 2023 14:48:08 +0800 Subject: [PATCH 09/10] cache/del: fix lru key length --- source/dnode/vnode/src/tsdb/tsdbCache.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index bc79cacc4b..505bb4b906 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -932,7 +932,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; keys_list[i] = keys; - keys_list[num_keys + i] = keys + klen; + keys_list[num_keys + i] = keys + sizeof(SLastKey); keys_list_sizes[i] = klen; keys_list_sizes[num_keys + i] = klen; } @@ -940,6 +940,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); taosThreadMutexLock(&pTsdb->rCache.rMutex); + rocksMayWrite(pTsdb, true); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); for (int i = 0; i < num_keys; ++i) { @@ -957,7 +958,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { SLastKey *key = &(SLastKey){.ltype = 1, .uid = uid, .cid = pLastCol->colVal.cid}; - size_t klen = sizeof(*key); + size_t klen = ROCKS_KEY_LEN; rocksdb_writebatch_delete(wb, (char *)key, klen); taosLRUCacheErase(pTsdb->lruCache, key, klen); @@ -966,7 +967,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = pLastCol->colVal.cid}; - size_t klen = sizeof(*key); + size_t klen = ROCKS_KEY_LEN; rocksdb_writebatch_delete(wb, (char *)key, klen); taosLRUCacheErase(pTsdb->lruCache, key, klen); @@ -978,7 +979,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - rocksMayWrite(pTsdb, false); + rocksMayWrite(pTsdb, true); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); _exit: From 35f74ce2e8ffc439eccc574361238065459ce857 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 18 May 2023 18:35:06 +0800 Subject: [PATCH 10/10] cache/schema: not cache schema --- source/dnode/vnode/src/tsdb/tsdbCache.c | 36 ++++++++++--------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 505bb4b906..845fd2f304 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -326,20 +326,15 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow int32_t code = 0; // 1, fetch schema - STSchema *pTSchema = pTsdb->rCache.pTSchema; + STSchema *pTSchema = NULL; int32_t sver = TSDBROW_SVERSION(pRow); - if (!pTSchema || sver != pTSchema->version) { - if (pTSchema) { - taosMemoryFree(pTSchema); - } - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; - } - pTsdb->rCache.pTSchema = pTSchema; + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; } + // 2, iterate col values into array SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); @@ -463,7 +458,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow _exit: taosArrayDestroy(aColVal); - // taosMemoryFree(pTSchema); + taosMemoryFree(pTSchema); return code; } @@ -908,15 +903,12 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0; // fetch schema - STSchema *pTSchema = pTsdb->rCache.pTSchema; - if (!pTSchema) { - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; - } - - pTsdb->rCache.pTSchema = pTSchema; + STSchema *pTSchema = NULL; + int sver = -1; + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; } // build keys & multi get from rocks @@ -983,7 +975,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosThreadMutexUnlock(&pTsdb->rCache.rMutex); _exit: - // taosMemoryFree(pTSchema); + taosMemoryFree(pTSchema); return code; }