diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b2bc9abf33..9baf38bca8 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -305,10 +305,6 @@ void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proa // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); -#define TSDB_CACHE_NO(c) ((c).cacheLast == 0) -#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0) -#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0) - // tsdbDiskData ============================================================================================== int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder); void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder); @@ -346,13 +342,17 @@ struct STsdbFS { }; typedef struct { - rocksdb_t *db; - rocksdb_options_t *options; - rocksdb_flushoptions_t *flushoptions; - rocksdb_writeoptions_t *writeoptions; - rocksdb_readoptions_t *readoptions; - rocksdb_writebatch_t *writebatch; - TdThreadMutex rMutex; + rocksdb_t *db; + rocksdb_comparator_t *my_comparator; + rocksdb_cache_t *blockcache; + rocksdb_block_based_table_options_t *tableoptions; + rocksdb_options_t *options; + rocksdb_flushoptions_t *flushoptions; + rocksdb_writeoptions_t *writeoptions; + rocksdb_readoptions_t *readoptions; + rocksdb_writebatch_t *writebatch; + TdThreadMutex rMutex; + STSchema *pTSchema; } SRocksCache; struct STsdb { @@ -782,7 +782,7 @@ typedef struct SLDataIter { #define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row)) int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, - bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter); + bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); @@ -822,13 +822,15 @@ typedef struct SCacheRowsReader { typedef struct { TSKEY ts; + int8_t dirty; SColVal colVal; } SLastCol; int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); -int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype); +int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype); +int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype); int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 230e8b8f88..f056311388 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -197,7 +197,7 @@ void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); -int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); @@ -403,6 +403,10 @@ struct SVnode { #define VND_IS_RSMA(v) ((v)->config.isRsma == 1) #define VND_IS_TSMA(v) ((v)->config.isTsma == 1) +#define TSDB_CACHE_NO(c) ((c).cacheLast == 0) +#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0) +#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0) + struct STbUidStore { tb_uid_t suid; SArray* tbUids; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index c0a8de5743..845fd2f304 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -46,7 +46,13 @@ static void tsdbCloseBICache(STsdb *pTsdb) { } } -#define ROCKS_KEY_LEN 64 +#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t)) + +typedef struct { + tb_uid_t uid; + int16_t cid; + int8_t ltype; +} SLastKey; static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { SVnode *pVnode = pTsdb->pVnode; @@ -62,9 +68,56 @@ static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { } } +static const char *myCmpName(void *state) { + (void)state; + return "myCmp"; +} + +static void myCmpDestroy(void *state) { (void)state; } + +static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) { + (void)state; + (void)alen; + (void)blen; + SLastKey *lhs = (SLastKey *)a; + SLastKey *rhs = (SLastKey *)b; + + if (lhs->uid < rhs->uid) { + return -1; + } else if (lhs->uid > rhs->uid) { + return 1; + } + + if (lhs->cid < rhs->cid) { + return -1; + } else if (lhs->cid > rhs->cid) { + return 1; + } + + if (lhs->ltype < rhs->ltype) { + return -1; + } else if (lhs->ltype > rhs->ltype) { + return 1; + } + + return 0; +} + static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { int32_t code = 0; + rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName); + if (NULL == cmp) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + rocksdb_cache_t *cache = rocksdb_cache_create_lru(5 * 1024 * 1024); + pTsdb->rCache.blockcache = cache; + + rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create(); + pTsdb->rCache.tableoptions = tableoptions; + rocksdb_options_t *options = rocksdb_options_create(); if (NULL == options) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -72,6 +125,9 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { } rocksdb_options_set_create_if_missing(options, 1); + rocksdb_options_set_comparator(options, cmp); + rocksdb_block_based_options_set_block_cache(tableoptions, cache); + rocksdb_options_set_block_based_table_factory(options, tableoptions); // rocksdb_options_set_inplace_update_support(options, 1); // rocksdb_options_set_allow_concurrent_memtable_write(options, 0); @@ -80,12 +136,12 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err2; } - // rocksdb_writeoptions_disable_WAL(writeoptions, 1); + rocksdb_writeoptions_disable_WAL(writeoptions, 1); rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); if (NULL == readoptions) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err2; + goto _err3; } char *err = NULL; @@ -94,19 +150,23 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { rocksdb_t *db = rocksdb_open(options, cachePath, &err); if (NULL == db) { - code = -1; - goto _err3; + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err4; } rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create(); if (NULL == flushoptions) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err4; + goto _err5; } rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); pTsdb->rCache.writebatch = writebatch; + pTsdb->rCache.my_comparator = cmp; pTsdb->rCache.options = options; pTsdb->rCache.writeoptions = writeoptions; pTsdb->rCache.readoptions = readoptions; @@ -115,15 +175,22 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL); + pTsdb->rCache.pTSchema = NULL; + return code; +_err5: + rocksdb_close(pTsdb->rCache.db); _err4: rocksdb_readoptions_destroy(readoptions); _err3: rocksdb_writeoptions_destroy(writeoptions); _err2: rocksdb_options_destroy(options); + rocksdb_block_based_options_destroy(tableoptions); + rocksdb_cache_destroy(cache); _err: + rocksdb_comparator_destroy(cmp); return code; } @@ -134,13 +201,33 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_options_destroy(pTsdb->rCache.options); + rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions); + rocksdb_cache_destroy(pTsdb->rCache.blockcache); + rocksdb_comparator_destroy(pTsdb->rCache.my_comparator); taosThreadMutexDestroy(&pTsdb->rCache.rMutex); + taosMemoryFree(pTsdb->rCache.pTSchema); +} + +static void rocksMayWrite(STsdb *pTsdb, bool force) { + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + + if (force || rocksdb_writebatch_count(wb) >= 1024) { + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + + rocksdb_writebatch_clear(wb); + } } int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t code = 0; char *err = NULL; + rocksMayWrite(pTsdb, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); @@ -191,15 +278,15 @@ void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { *size = length; } -static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char const *lstring) { +static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) { SLastCol *pLastCol = NULL; - char *err = NULL; - size_t vlen = 0; - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring); - char *value = NULL; - value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, key, klen, &vlen, &err); + char *err = NULL; + size_t vlen = 0; + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + size_t klen = ROCKS_KEY_LEN; + char *value = NULL; + value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err); if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); rocksdb_free(err); @@ -210,12 +297,38 @@ static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char c return pLastCol; } +static void reallocVarData(SColVal *pColVal) { + if (IS_VAR_DATA_TYPE(pColVal->type)) { + uint8_t *pVal = pColVal->value.pData; + pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); + if (pColVal->value.nData) { + memcpy(pColVal->value.pData, pVal, pColVal->value.nData); + } + } +} + +static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) { + SLastCol *pLastCol = (SLastCol *)value; + + // TODO: add dirty flag to SLastCol + if (pLastCol->dirty) { + // TODO: queue into dirty list, free it after save to backstore + } else { + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) /* && pLastCol->colVal.value.nData > 0*/) { + taosMemoryFree(pLastCol->colVal.value.pData); + } + + taosMemoryFree(value); + } +} + int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; // 1, fetch schema STSchema *pTSchema = NULL; int32_t sver = TSDBROW_SVERSION(pRow); + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); if (code != TSDB_CODE_SUCCESS) { terrno = code; @@ -229,22 +342,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow tsdbRowIterOpen(&iter, pRow, pTSchema); for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - /* - if (IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - - pColVal->value.pData = NULL; - code = tRealloc(&pColVal->value.pData, pColVal->value.nData); - if (code) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } - */ taosArrayPush(aColVal, pColVal); } @@ -254,23 +351,18 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow int num_keys = TARRAY_SIZE(aColVal); char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); + char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN * 2); for (int i = 0; i < num_keys; ++i) { SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); int16_t cid = pColVal->cid; - char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); - int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid); - if (last_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid); - if (lr_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - keys_list[i] = keys; - keys_list[num_keys + i] = keys + ROCKS_KEY_LEN; - keys_list_sizes[i] = last_key_len; - keys_list_sizes[num_keys + i] = lr_key_len; + memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = 1, .uid = uid, .cid = cid}, ROCKS_KEY_LEN); + memcpy(key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN, &(SLastKey){.ltype = 0, .uid = uid, .cid = cid}, + ROCKS_KEY_LEN); + keys_list[i] = key_list + i * ROCKS_KEY_LEN; + keys_list[num_keys + i] = key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN; + keys_list_sizes[i] = ROCKS_KEY_LEN; + keys_list_sizes[num_keys + i] = ROCKS_KEY_LEN; } char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); @@ -278,12 +370,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosThreadMutexLock(&pTsdb->rCache.rMutex); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); - for (int i = 0; i < num_keys; ++i) { - taosMemoryFree(keys_list[i]); - } for (int i = 0; i < num_keys * 2; ++i) { rocksdb_free(errs[i]); } + taosMemoryFree(key_list); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); taosMemoryFree(errs); @@ -292,19 +382,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); - if (COL_VAL_IS_VALUE(pColVal)) { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); - - if (NULL == pLastCol || pLastCol->ts <= keyTs) { - char *value = NULL; - size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pColVal->cid); - rocksdb_writebatch_put(wb, key, klen, value, vlen); - taosMemoryFree(value); - } - } if (!COL_VAL_IS_NONE(pColVal)) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); @@ -313,11 +390,61 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow char *value = NULL; size_t vlen = 0; tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pColVal->cid); - rocksdb_writebatch_put(wb, key, klen, value, vlen); + SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); + + pLastCol = (SLastCol *)value; + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + reallocVarData(&pLastCol->colVal); + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, + NULL, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + taosMemoryFree(value); } + + if (COL_VAL_IS_VALUE(pColVal)) { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + + if (NULL == pLastCol || pLastCol->ts <= keyTs) { + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; + + rocksdb_writebatch_put(wb, (char *)&key, ROCKS_KEY_LEN, value, vlen); + + pLastCol = (SLastCol *)value; + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + reallocVarData(&pLastCol->colVal); + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, + tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + + taosMemoryFree(value); + } + } } rocksdb_free(values_list[i]); @@ -326,14 +453,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } + rocksMayWrite(pTsdb, false); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - rocksdb_writebatch_clear(wb); _exit: taosArrayDestroy(aColVal); @@ -341,53 +462,41 @@ _exit: return code; } -static void reallocVarData(SColVal *pColVal) { - if (IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } -} - static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); - -int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) { - static char const *alstring[2] = {"last_row", "last"}; - char const *lstring = alstring[ltype]; +#if 1 +int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { rocksdb_writebatch_t *wb = NULL; int32_t code = 0; SArray *pCidList = pr->pCidList; int num_keys = TARRAY_SIZE(pCidList); - char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *)); - size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + + char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *)); + size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t)); + char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN); for (int i = 0; i < num_keys; ++i) { int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); - char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); - int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring); - if (last_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - - keys_list[i] = keys; - keys_list_sizes[i] = last_key_len; + memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}, ROCKS_KEY_LEN); + keys_list[i] = key_list + i * ROCKS_KEY_LEN; + keys_list_sizes[i] = ROCKS_KEY_LEN; } + char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); - char **errs = taosMemoryCalloc(num_keys, sizeof(char *)); + char **errs = taosMemoryMalloc(num_keys * sizeof(char *)); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); for (int i = 0; i < num_keys; ++i) { - taosMemoryFree(keys_list[i]); - rocksdb_free(errs[i]); + if (errs[i]) { + rocksdb_free(errs[i]); + } } + taosMemoryFree(key_list); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); taosMemoryFree(errs); @@ -403,7 +512,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR } else { taosThreadMutexLock(&pTsdb->rCache.rMutex); - pLastCol = tsdbCacheLookup(pTsdb, uid, cid, lstring); + pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); if (!pLastCol) { // recalc: load from tsdb int16_t aCols[1] = {cid}; @@ -432,9 +541,10 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR char *value = NULL; size_t vlen = 0; tsdbCacheSerialize(pLastCol, &value, &vlen); - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, pLastCol->colVal.cid, lstring); - rocksdb_writebatch_put(wb, key, klen, value, vlen); + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); taosMemoryFree(value); } else { @@ -442,21 +552,13 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR } if (wb) { - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } - - rocksdb_writebatch_clear(wb); + rocksMayWrite(pTsdb, false); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); } taosArrayPush(pLastArray, pLastCol); - taosArrayDestroy(pTmpColArray); if (freeCol) { taosMemoryFree(pLastCol); @@ -467,43 +569,370 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR return code; } +#endif + +static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t slotid, tb_uid_t uid, int16_t cid, + int8_t ltype) { + SLastCol *pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); + if (!pLastCol) { + rocksdb_writebatch_t *wb = NULL; + + taosThreadMutexLock(&pTsdb->rCache.rMutex); + pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); + if (!pLastCol) { + // recalc: load from tsdb + int16_t aCols[1] = {cid}; + int16_t slotIds[1] = {slotid}; + SArray *pTmpColArray = NULL; + + if (ltype) { + mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); + } else { + mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); + } + + if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) { + pLastCol = taosArrayGet(pTmpColArray, 0); + } + + // still null, then make up a none col value + SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[slotid].type)}; + if (!pLastCol) { + pLastCol = &noneCol; + } + + // store result back to rocks cache + wb = pTsdb->rCache.writebatch; + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); + taosMemoryFree(value); + + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + taosArrayDestroy(pTmpColArray); + } + + if (wb) { + rocksMayWrite(pTsdb, false); + } + + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + } + + return pLastCol; +} + +typedef struct { + int idx; + SLastKey key; +} SIdxKey; + +static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, + SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + rocksdb_writebatch_t *wb = NULL; + SArray *pTmpColArray = NULL; + int num_keys = TARRAY_SIZE(remainCols); + int16_t *aCols = taosMemoryMalloc(num_keys * sizeof(int16_t)); + int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + + for (int i = 0; i < num_keys; ++i) { + SIdxKey *idxKey = taosArrayGet(remainCols, i); + aCols[i] = idxKey->key.cid; + slotIds[i] = pr->pSlotIds[idxKey->idx]; + } + + if (ltype) { + mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds); + } else { + mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds); + } + + SLRUCache *pCache = pTsdb->lruCache; + for (int i = 0; i < num_keys; ++i) { + SIdxKey *idxKey = taosArrayGet(remainCols, i); + SLastCol *pLastCol = NULL; + + if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) { + pLastCol = taosArrayGet(pTmpColArray, i); + } + + // still null, then make up a none col value + SLastCol noneCol = {.ts = TSKEY_MIN, + .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; + if (!pLastCol) { + pLastCol = &noneCol; + } + + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, + TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + + // store result back to rocks cache + wb = pTsdb->rCache.writebatch; + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + + SLastKey *key = &idxKey->key; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); + taosMemoryFree(value); + + taosArraySet(pLastArray, idxKey->idx, pLastCol); + // taosArrayRemove(remainCols, i); + } + + if (wb) { + rocksMayWrite(pTsdb, false); + } + + taosArrayDestroy(pTmpColArray); + + taosMemoryFree(aCols); + taosMemoryFree(slotIds); + + return code; +} + +static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, + SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + int num_keys = TARRAY_SIZE(remainCols); + char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *)); + size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t)); + char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN); + for (int i = 0; i < num_keys; ++i) { + int16_t cid = *(int16_t *)taosArrayGet(remainCols, i); + + memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN); + keys_list[i] = key_list + i * ROCKS_KEY_LEN; + keys_list_sizes[i] = ROCKS_KEY_LEN; + } + + char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); + size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + char **errs = taosMemoryMalloc(num_keys * sizeof(char *)); + rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, + keys_list_sizes, values_list, values_list_sizes, errs); + for (int i = 0; i < num_keys; ++i) { + if (errs[i]) { + rocksdb_free(errs[i]); + } + } + taosMemoryFree(key_list); + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(errs); + + SLRUCache *pCache = pTsdb->lruCache; + for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j]; + if (pLastCol) { + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + reallocVarData(&pLastCol->colVal); + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, + NULL, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + + taosArraySet(pLastArray, idxKey->idx, pLastCol); + taosArrayRemove(remainCols, j); + + taosMemoryFree(values_list[i]); + } else { + ++j; + } + } + + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + + if (TARRAY_SIZE(remainCols) > 0) { + code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype); + } + + return code; +} + +int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + SArray *remainCols = NULL; + SLRUCache *pCache = pTsdb->lruCache; + SArray *pCidList = pr->pCidList; + int num_keys = TARRAY_SIZE(pCidList); + + for (int i = 0; i < num_keys; ++i) { + int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i]; + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + + LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); + + SLastCol lastCol = *pLastCol; + // reallocVarData(&lastCol.colVal); + taosArrayPush(pLastArray, &lastCol); + + if (h) { + taosLRUCacheRelease(pCache, h, false); + } + } else { + SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; + + taosArrayPush(pLastArray, &noneCol); + + if (!remainCols) { + remainCols = taosArrayInit(num_keys, sizeof(SIdxKey)); + } + taosArrayPush(remainCols, &(SIdxKey){i, *key}); + } + } + + if (remainCols && TARRAY_SIZE(remainCols) > 0) { + taosThreadMutexLock(&pTsdb->lruMutex); + for (int i = 0; i < TARRAY_SIZE(remainCols);) { + SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; + LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); + + SLastCol lastCol = *pLastCol; + reallocVarData(&lastCol.colVal); + + taosArraySet(pLastArray, idxKey->idx, &lastCol); + if (h) { + taosLRUCacheRelease(pCache, h, false); + } + + taosArrayRemove(remainCols, i); + } else { + ++i; + } + } + + code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, pr, ltype); + + taosThreadMutexUnlock(&pTsdb->lruMutex); + } + + if (remainCols) { + taosArrayDestroy(remainCols); + } + + return code; +} + +int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + SLRUCache *pCache = pTsdb->lruCache; + SArray *pCidList = pr->pCidList; + int num_keys = TARRAY_SIZE(pCidList); + + for (int i = 0; i < num_keys; ++i) { + SLastCol *pLastCol = NULL; + int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); + + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); + if (!h) { + taosThreadMutexLock(&pTsdb->lruMutex); + h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); + if (!h) { + pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype); + + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h, + TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + } + + taosThreadMutexUnlock(&pTsdb->lruMutex); + } + + pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); + + SLastCol lastCol = *pLastCol; + reallocVarData(&lastCol.colVal); + + if (h) { + taosLRUCacheRelease(pCache, h, false); + } + + taosArrayPush(pLastArray, &lastCol); + } + + return code; +} int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0; - // 1, fetch schema + // fetch schema STSchema *pTSchema = NULL; - int32_t sver = -1; + int sver = -1; code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); if (code != TSDB_CODE_SUCCESS) { terrno = code; return -1; } - // 3, build keys & multi get from rocks + // build keys & multi get from rocks int num_keys = pTSchema->numOfCols; char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); for (int i = 0; i < num_keys; ++i) { int16_t cid = pTSchema->columns[i].colId; - char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); - int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid); - if (last_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } - int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid); - if (lr_key_len >= ROCKS_KEY_LEN) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - } + size_t klen = ROCKS_KEY_LEN; + char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); + ((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + keys_list[i] = keys; - keys_list[num_keys + i] = keys + ROCKS_KEY_LEN; - keys_list_sizes[i] = last_key_len; - keys_list_sizes[num_keys + i] = lr_key_len; + keys_list[num_keys + i] = keys + sizeof(SLastKey); + keys_list_sizes[i] = klen; + keys_list_sizes[num_keys + i] = klen; } char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); taosThreadMutexLock(&pTsdb->rCache.rMutex); + rocksMayWrite(pTsdb, true); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, keys_list_sizes, values_list, values_list_sizes, errs); for (int i = 0; i < num_keys; ++i) { @@ -520,16 +949,20 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE for (int i = 0; i < num_keys; ++i) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid); - rocksdb_writebatch_delete(wb, key, klen); + SLastKey *key = &(SLastKey){.ltype = 1, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = ROCKS_KEY_LEN; + + rocksdb_writebatch_delete(wb, (char *)key, klen); + taosLRUCacheErase(pTsdb->lruCache, key, klen); } pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { - char key[ROCKS_KEY_LEN]; - size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pLastCol->colVal.cid); - rocksdb_writebatch_delete(wb, key, klen); + SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = pLastCol->colVal.cid}; + size_t klen = ROCKS_KEY_LEN; + + rocksdb_writebatch_delete(wb, (char *)key, klen); + taosLRUCacheErase(pTsdb->lruCache, key, klen); } rocksdb_free(values_list[i]); @@ -538,14 +971,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFree(values_list); taosMemoryFree(values_list_sizes); - char *err = NULL; - rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } + rocksMayWrite(pTsdb, true); taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - rocksdb_writebatch_clear(wb); _exit: taosMemoryFree(pTSchema); @@ -1111,7 +1538,7 @@ typedef struct { SMergeTree mergeTree; SMergeTree *pMergeTree; SSttBlockLoadInfo *pLoadInfo; - SLDataIter* pDataIter; + SLDataIter *pDataIter; int64_t lastTs; } SFSLastNextRowIter; @@ -1152,14 +1579,21 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa if (code) goto _err; } + int nTmpCols = nCols; + bool hasTs = false; + if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) { + --nTmpCols; + hasTs = true; + } for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) { - state->pLoadInfo[i].colIds = aCols; - state->pLoadInfo[i].numOfCols = nCols; + state->pLoadInfo[i].colIds = hasTs ? aCols + 1 : aCols; + state->pLoadInfo[i].numOfCols = nTmpCols; state->pLoadInfo[i].isLast = isLast; } tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid, &(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX}, - &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true, state->pDataIter); + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true, + state->pDataIter); state->pMergeTree = &state->mergeTree; state->state = SFSLASTNEXTROW_BLOCKROW; } @@ -1394,11 +1828,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie tBlockDataReset(state->pBlockData); TABLEID tid = {.suid = state->suid, .uid = state->uid}; int nTmpCols = nCols; - if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID && nCols == 1) { - nTmpCols = 0; + bool hasTs = false; + if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) { + --nTmpCols; skipBlock = false; + hasTs = true; } - code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, aCols, nTmpCols); + code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, hasTs ? aCols + 1 : aCols, nTmpCols); if (code) goto _err; code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData); @@ -1730,8 +2166,8 @@ typedef struct { } CacheNextRowIter; static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, - SSttBlockLoadInfo *pLoadInfo, SLDataIter* pLDataIter, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader, - SDataFReader **pDataFReaderLast, int64_t lastTs) { + SSttBlockLoadInfo *pLoadInfo, SLDataIter *pLDataIter, STsdbReadSnap *pReadSnap, + SDataFReader **pDataFReader, SDataFReader **pDataFReaderLast, int64_t lastTs) { int code = 0; STbData *pMem = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 61c0317e11..6c8b8a4d01 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -180,8 +180,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, } SVnodeCfg* pCfg = &((SVnode*)pVnode)->config; - - int32_t numOfStt = pCfg->sttTrigger; + int32_t numOfStt = pCfg->sttTrigger; p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt); if (p->pLoadInfo == NULL) { tsdbCacherowsReaderClose(p); @@ -215,7 +214,7 @@ void* tsdbCacherowsReaderClose(void* pReader) { taosMemoryFree(p->pSchema); } - taosMemoryFreeClear(p->pDataIter); + taosMemoryFree(p->pDataIter); taosMemoryFree(p->pCurrSchema); destroyLastBlockLoadInfo(p->pLoadInfo); @@ -306,23 +305,27 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 pr->pDataFReader = NULL; pr->pDataFReaderLast = NULL; - int32_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; + int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; // retrieve the only one last row of all tables in the uid list. if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { int64_t st = taosGetTimestampUs(); int64_t totalLastTs = INT64_MAX; + for (int32_t i = 0; i < pr->numOfTables; ++i) { STableKeyInfo* pKeyInfo = &pr->pTableList[i]; - tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); + tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); + // tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); if (TARRAY_SIZE(pRow) <= 0) { - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); continue; } - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + SLastCol* pColVal = taosArrayGet(pRow, 0); if (COL_VAL_IS_NONE(&pColVal->colVal)) { - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); continue; } @@ -373,13 +376,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } - if (taosArrayGetSize(pTableUidList) == 0) { + if (TARRAY_SIZE(pTableUidList) == 0) { taosArrayPush(pTableUidList, &pKeyInfo->uid); } else { taosArraySet(pTableUidList, 0, &pKeyInfo->uid); } - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); } if (hasRes) { @@ -387,25 +391,28 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { - STableKeyInfo* pKeyInfo = &pr->pTableList[i]; + tb_uid_t uid = pr->pTableList[i].uid; - tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); + tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype); if (TARRAY_SIZE(pRow) <= 0) { - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); continue; } SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); if (COL_VAL_IS_NONE(&pColVal->colVal)) { - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); continue; } saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr); - taosArrayClearEx(pRow, freeItem); + // taosArrayClearEx(pRow, freeItem); + taosArrayClear(pRow); - taosArrayPush(pTableUidList, &pKeyInfo->uid); + taosArrayPush(pTableUidList, &uid); - pr->tableIndex += 1; + ++pr->tableIndex; if (pResBlock->info.rows >= pResBlock->info.capacity) { goto _end; } @@ -429,7 +436,9 @@ _end: } taosMemoryFree(pRes); - taosArrayDestroyEx(pRow, freeItem); + // taosArrayDestroyEx(pRow, freeItem); + taosArrayDestroy(pRow); taosArrayDestroyEx(pLastCols, freeItem); + return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 97b648201c..80967a906f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -302,12 +302,12 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) { return rowsNum; } -void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj* pTableMap, int64_t *rowsNum) { +void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) { taosRLockLatch(&pMemTable->latch); for (int32_t i = 0; i < pMemTable->nBucket; ++i) { STbData *pTbData = pMemTable->aBucket[i]; while (pTbData) { - void* p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); + void *p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); if (p == NULL) { pTbData = pTbData->next; continue; @@ -673,7 +673,10 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, if (key.ts >= pTbData->maxKey) { pTbData->maxKey = key.ts; } - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + + if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { + tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + } // SMemTable pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); @@ -734,7 +737,9 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, if (key.ts >= pTbData->maxKey) { pTbData->maxKey = key.ts; } - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { + tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + } // SMemTable pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 74168591d2..77453fd894 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -439,8 +439,10 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { code = tsdbCommit(pVnode->pTsdb, pInfo); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbCacheCommit(pVnode->pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); + if (!TSDB_CACHE_NO(pVnode->config)) { + code = tsdbCacheCommit(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } if (VND_IS_RSMA(pVnode)) { code = smaCommit(pVnode->pSma, pInfo);