diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f6f86850a4..8fae1b9290 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -343,7 +343,6 @@ typedef struct { rocksdb_readoptions_t *readoptions; rocksdb_writebatch_t *writebatch; rocksdb_writebatch_t *rwritebatch; - TdThreadMutex rMutex; STSchema *pTSchema; } SRocksCache; @@ -915,12 +914,21 @@ typedef enum { READER_EXEC_ROWS = 0x2, } EExecMode; -#define LAST_COL_VERSION (0x1) +#define LAST_COL_VERSION_1 (0x1) // add primary key, version +#define LAST_COL_VERSION_2 (0x2) // add cache status +#define LAST_COL_VERSION LAST_COL_VERSION_2 + +typedef enum { + TSDB_LAST_CACHE_VALID = 0, // last_cache has valid data + TSDB_LAST_CACHE_EMPTY, // neither last_cache nor tsdb has data + TSDB_LAST_CACHE_NO_CACHE, // last_cache has no data, but tsdb may have data +} ELastCacheStatus; typedef struct { - SRowKey rowKey; - int8_t dirty; - SColVal colVal; + SRowKey rowKey; + int8_t dirty; + SColVal colVal; + ELastCacheStatus cacheStatus; } SLastCol; typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 1216f0da81..dc3bf00c80 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -224,9 +224,6 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { pTsdb->rCache.readoptions = readoptions; pTsdb->rCache.flushoptions = flushoptions; pTsdb->rCache.db = db; - - (void)taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL); - pTsdb->rCache.pTSchema = NULL; TAOS_RETURN(code); @@ -258,22 +255,11 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions); rocksdb_cache_destroy(pTsdb->rCache.blockcache); rocksdb_comparator_destroy(pTsdb->rCache.my_comparator); - (void)taosThreadMutexDestroy(&pTsdb->rCache.rMutex); taosMemoryFree(pTsdb->rCache.pTSchema); } -static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { - rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - if (read) { - if (lock) { - (void)taosThreadMutexLock(&pTsdb->lruMutex); - } - wb = pTsdb->rCache.rwritebatch; - } else { - if (lock) { - (void)taosThreadMutexLock(&pTsdb->rCache.rMutex); - } - } +static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) { + rocksdb_writebatch_t *wb = read ? pTsdb->rCache.rwritebatch : pTsdb->rCache.writebatch; int count = rocksdb_writebatch_count(wb); if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) { @@ -289,14 +275,6 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { rocksdb_writebatch_clear(wb); } - - if (lock) { - if (read) { - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - } else { - (void)taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - } - } } typedef struct { @@ -326,6 +304,8 @@ static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) { pLastCol->colVal.flag = pLastColV0->colVal.flag; pLastCol->colVal.value.type = pLastColV0->colVal.type; + pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID; + if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData; pLastCol->colVal.value.pData = NULL; @@ -383,6 +363,10 @@ static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **p } } + if (version >= LAST_COL_VERSION_2) { + pLastCol->cacheStatus = *(uint8_t *)(value + offset); + } + if (offset > size) { taosMemoryFreeClear(pLastCol); @@ -434,7 +418,7 @@ static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { *size += pLastCol->colVal.value.nData; } - *size += sizeof(uint8_t) + sizeof(uint8_t); // version + numOfPKs + *size += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t); // version + numOfPKs + cacheStatus for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { *size += sizeof(SValue); @@ -470,6 +454,8 @@ static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size } } + ((uint8_t *)(*value + offset))[0] = pLastCol->cacheStatus; + TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -487,8 +473,6 @@ static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, return; } - (void)taosThreadMutexLock(&rCache->rMutex); - rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen); taosMemoryFree(rocks_value); @@ -507,8 +491,6 @@ static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, state->flush_count = 0; } - - (void)taosThreadMutexUnlock(&rCache->rMutex); } int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { @@ -534,8 +516,8 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); - rocksMayWrite(pTsdb, true, false, false); - rocksMayWrite(pTsdb, true, true, false); + rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -595,11 +577,10 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i SLRUCache *pCache = pTsdb->lruCache; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - SRowKey noneRowKey = {0}; - noneRowKey.ts = TSKEY_MIN; - noneRowKey.numOfPKs = 0; - SLastCol noneCol = {.rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; - SLastCol *pLastCol = &noneCol; + SRowKey emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0}; + SLastCol emptyCol = { + .rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_EMPTY}; + SLastCol *pLastCol = &emptyCol; SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); if (!pTmpLastCol) { @@ -642,8 +623,8 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) { taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); - rocksMayWrite(pTsdb, true, false, false); - rocksMayWrite(pTsdb, true, true, false); + rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); if (NULL != err) { @@ -655,6 +636,29 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) { TAOS_RETURN(code); } +static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList, + size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) { + char **valuesList = taosMemoryCalloc(numKeys, sizeof(char *)); + size_t *valuesListSizes = taosMemoryCalloc(numKeys, sizeof(size_t)); + char **errs = taosMemoryCalloc(numKeys, sizeof(char *)); + if (!valuesList || !valuesListSizes || !errs) { + taosMemoryFreeClear(valuesList); + taosMemoryFreeClear(valuesListSizes); + taosMemoryFreeClear(errs); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } + rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, valuesList, + valuesListSizes, errs); + for (size_t i = 0; i < numKeys; ++i) { + rocksdb_free(errs[i]); + } + taosMemoryFreeClear(errs); + + *pppValuesList = valuesList; + *ppValuesListSizes = valuesListSizes; + TAOS_RETURN(TSDB_CODE_SUCCESS); +} + static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) { int32_t code = 0; @@ -684,41 +688,11 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, keys_list_sizes[0] = klen; keys_list_sizes[1] = klen; - char **values_list = taosMemoryCalloc(2, sizeof(char *)); - if (!values_list) { - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - taosMemoryFree(keys_list[0]); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); - } - size_t *values_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); - if (!values_list_sizes) { - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - taosMemoryFree(keys_list[0]); - taosMemoryFree(values_list); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); - } - char **errs = taosMemoryCalloc(2, sizeof(char *)); - if (!errs) { - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - taosMemoryFree(keys_list[0]); - taosMemoryFree(values_list); - taosMemoryFree(values_list_sizes); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); - } - - // rocksMayWrite(pTsdb, true, false, false); - rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, 2, (const char *const *)keys_list, keys_list_sizes, - values_list, values_list_sizes, errs); - - for (int i = 0; i < 2; ++i) { - if (errs[i]) { - rocksdb_free(errs[i]); - } - } - taosMemoryFree(errs); + char **values_list = NULL; + size_t *values_list_sizes = NULL; + TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list, + &values_list_sizes), + NULL, _exit); rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; { @@ -760,6 +734,7 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, } } +_exit: taosMemoryFree(keys_list[0]); taosMemoryFree(keys_list); @@ -851,7 +826,7 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra taosMemoryFree(pTSchema); } - rocksMayWrite(pTsdb, true, false, false); + rocksMayWrite(pTsdb, true, false); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -892,7 +867,7 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { taosMemoryFree(pTSchema); - rocksMayWrite(pTsdb, true, false, false); + rocksMayWrite(pTsdb, true, false); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -923,7 +898,7 @@ int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool h (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); - rocksMayWrite(pTsdb, true, false, true); + rocksMayWrite(pTsdb, true, false); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -962,7 +937,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } - rocksMayWrite(pTsdb, true, false, true); + rocksMayWrite(pTsdb, true, false); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -1029,6 +1004,96 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal } } +static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) { + // update rowkey + pLastCol->rowKey.ts = TSKEY_MIN; + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + SValue *pPKValue = &pLastCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pPKValue->type) && pPKValue->nData > 0) { + taosMemoryFreeClear(pPKValue->pData); + pPKValue->nData = 0; + } else { + pPKValue->val = 0; + } + } + pLastCol->rowKey.numOfPKs = 0; + + // update colval + if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) && pLastCol->colVal.value.nData > 0) { + taosMemoryFreeClear(pLastCol->colVal.value.pData); + pLastCol->colVal.value.nData = 0; + } else { + pLastCol->colVal.value.val = 0; + } + + pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type); + + if (!pLastCol->dirty) { + pLastCol->dirty = 1; + } + + pLastCol->cacheStatus = cacheStatus; +} + +static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) { + int32_t code = 0; + char *rocks_value = NULL; + size_t vlen = 0; + + code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen); + if (code) { + tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + TAOS_RETURN(code); + } + + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, rocks_value, vlen); + + taosMemoryFree(rocks_value); + + TAOS_RETURN(code); +} + +static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) { + int32_t code = 0, lino = 0; + + SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + if (!pLRULastCol) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } + + *pLRULastCol = *pLastCol; + + size_t charge = sizeof(*pLRULastCol); + for (int8_t i = 0; i < pLRULastCol->rowKey.numOfPKs; i++) { + SValue *pValue = &pLRULastCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pValue->type)) { + TAOS_CHECK_GOTO(reallocVarDataVal(pValue), &lino, _exit); + charge += pValue->nData; + } + } + + if (IS_VAR_DATA_TYPE(pLRULastCol->colVal.value.type)) { + TAOS_CHECK_GOTO(reallocVarData(&pLRULastCol->colVal), &lino, _exit); + charge += pLRULastCol->colVal.value.nData; + } + + LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter, + NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); + if (TAOS_LRU_STATUS_OK != status) { + tsdbError("tsdb/cache/putlru: vgId:%d, failed to insert status %d.", TD_VID(pTsdb->pVnode), status); + code = TSDB_CODE_INVALID_DATA_FMT; + } + +_exit: + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFree(pLRULastCol); + tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code)); + } + + TAOS_RETURN(code); +} + static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) { if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) { TAOS_RETURN(TSDB_CODE_SUCCESS); @@ -1057,10 +1122,14 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); - if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { - tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); + if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { + int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); + if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { + tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); + pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID; + } } + (void)taosLRUCacheRelease(pCache, h, false); } else { if (!remainCols) { @@ -1092,23 +1161,14 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray keys_list[i] = (char *)&idxKey->key; keys_list_sizes[i] = ROCKS_KEY_LEN; } - values_list = taosMemoryCalloc(num_keys, sizeof(char *)); - values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); - errs = taosMemoryCalloc(num_keys, sizeof(char *)); - if (!values_list || !values_list_sizes || !errs) { + + code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list, + &values_list_sizes); + if (code) { taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); - taosMemoryFree(values_list); - taosMemoryFree(values_list_sizes); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + goto _exit; } - rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, - keys_list_sizes, values_list, values_list_sizes, errs); - for (int i = 0; i < num_keys; ++i) { - rocksdb_free(errs[i]); - } - taosMemoryFree(errs); rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { @@ -1126,9 +1186,21 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray */ SLastCol *PToFree = pLastCol; + if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) { + if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol)) != TSDB_CODE_SUCCESS) { + tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, + tstrerror(code)); + taosMemoryFreeClear(PToFree); + break; + } + + // cache invalid => skip update + taosMemoryFreeClear(PToFree); + continue; + } + if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) { taosMemoryFreeClear(PToFree); - rocksdb_free(values_list[i]); continue; } @@ -1138,74 +1210,40 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray } if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { - char *value = NULL; - size_t vlen = 0; - SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; - code = tsdbCacheSerialize(&lastColTmp, &value, &vlen); - if (code) { - tsdbError("tsdb/cache: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); - } else { - (void)taosThreadMutexLock(&pTsdb->rCache.rMutex); - - rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); - - (void)taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}; + if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) { + tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, + tstrerror(code)); + taosMemoryFreeClear(PToFree); + break; } - - pLastCol = &lastColTmp; - SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); - if (!pTmpLastCol) { - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - taosMemoryFree(values_list); - taosMemoryFree(values_list_sizes); - - taosArrayDestroy(remainCols); - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) { + tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, + tstrerror(code)); + taosMemoryFreeClear(PToFree); + break; } - *pTmpLastCol = *pLastCol; - pLastCol = pTmpLastCol; - - size_t charge = sizeof(*pLastCol); - for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - SValue *pValue = &pLastCol->rowKey.pks[i]; - if (IS_VAR_DATA_TYPE(pValue->type)) { - TAOS_CHECK_GOTO(reallocVarDataVal(pValue), &lino, _exit); - charge += pValue->nData; - } - } - - if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - TAOS_CHECK_GOTO(reallocVarData(&pLastCol->colVal), &lino, _exit); - charge += pLastCol->colVal.value.nData; - } - - LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, - tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; - } - - taosMemoryFree(value); } taosMemoryFreeClear(PToFree); - rocksdb_free(values_list[i]); } - rocksMayWrite(pTsdb, true, false, true); + rocksMayWrite(pTsdb, true, false); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); - taosMemoryFree(values_list); + if (values_list) { + for (int i = 0; i < num_keys; ++i) { + rocksdb_free(values_list[i]); + } + taosMemoryFree(values_list); + } taosMemoryFree(values_list_sizes); - - taosArrayDestroy(remainCols); } _exit: (void)taosThreadMutexUnlock(&pTsdb->lruMutex); + taosArrayDestroy(remainCols); if (code) { tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code)); @@ -1374,8 +1412,11 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr SIdxKey *idxKey = taosArrayGet(remainCols, 0); if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) { - SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; + // ignore 'ts' loaded from cache and load it from tsdb + SLastCol* pLastCol = taosArrayGet(pLastArray, 0); + tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE); + SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; (void)taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key}); } @@ -1472,7 +1513,8 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr // still null, then make up a none col value SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, - .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; + .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type), + .cacheStatus = TSDB_LAST_CACHE_VALID}; if (!pLastCol) { pLastCol = &noneCol; TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal)); @@ -1535,7 +1577,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } if (wb) { - rocksMayWrite(pTsdb, false, true, false); + rocksMayWrite(pTsdb, false, true); } taosArrayDestroy(lastrowTmpIndexArray); @@ -1556,7 +1598,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, - SCacheRowsReader *pr, int8_t ltype) { + SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) { int32_t code = 0; int num_keys = TARRAY_SIZE(remainCols); char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *)); @@ -1567,49 +1609,41 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA taosMemoryFree(keys_list_sizes); TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } + char **values_list = NULL; + size_t *values_list_sizes = NULL; for (int i = 0; i < num_keys; ++i) { - int16_t cid = *(int16_t *)taosArrayGet(remainCols, i); - memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN); keys_list[i] = key_list + i * ROCKS_KEY_LEN; keys_list_sizes[i] = ROCKS_KEY_LEN; } - char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); - size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); - char **errs = taosMemoryMalloc(num_keys * sizeof(char *)); - if (!values_list || !values_list_sizes || !errs) { + code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list, + &values_list_sizes); + if (code) { + taosMemoryFree(key_list); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); - taosMemoryFree(values_list); - taosMemoryFree(values_list_sizes); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + TAOS_RETURN(code); } - rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, - keys_list_sizes, values_list, values_list_sizes, errs); - for (int i = 0; i < num_keys; ++i) { - if (errs[i]) { - tsdbError("vgId:%d, %s failed at line %d since %s, index:%d", TD_VID(pTsdb->pVnode), __func__, __LINE__, errs[i], - i); - rocksdb_free(errs[i]); - } - } - taosMemoryFree(errs); SLRUCache *pCache = pTsdb->lruCache; for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { SLastCol *pLastCol = NULL; + bool ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[i]; + if (ignore) { + ++j; + continue; + } + (void)tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); SLastCol *PToFree = pLastCol; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j]; - if (pLastCol) { + if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); if (!pTmpLastCol) { - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - taosMemoryFree(values_list); - taosMemoryFree(values_list_sizes); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + taosMemoryFreeClear(PToFree); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; @@ -1640,36 +1674,44 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal)); taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArrayRemove(remainCols, j); + taosArrayRemove(ignoreFromRocks, j); - taosMemoryFreeClear(PToFree); - taosMemoryFree(values_list[i]); } else { ++j; } - } - taosMemoryFree(key_list); - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - taosMemoryFree(values_list); - taosMemoryFree(values_list_sizes); + taosMemoryFreeClear(PToFree); + } if (TARRAY_SIZE(remainCols) > 0) { // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid); code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype); } +_exit: + taosMemoryFree(key_list); + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + if (values_list) { + for (int i = 0; i < num_keys; ++i) { + rocksdb_free(values_list[i]); + } + taosMemoryFree(values_list); + } + taosMemoryFree(values_list_sizes); + TAOS_RETURN(code); } int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { int32_t code = 0; SArray *remainCols = NULL; + SArray *ignoreFromRocks = NULL; SLRUCache *pCache = pTsdb->lruCache; SArray *pCidList = pr->pCidList; - int num_keys = TARRAY_SIZE(pCidList); + int numKeys = TARRAY_SIZE(pCidList); - for (int i = 0; i < num_keys; ++i) { + for (int i = 0; i < numKeys; ++i) { int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i]; SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid}; @@ -1684,33 +1726,52 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache } LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN); - if (h) { - SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - + SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL; + if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { SLastCol lastCol = *pLastCol; for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) { TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[j])); } TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal)); - (void)taosArrayPush(pLastArray, &lastCol); - - (void)taosLRUCacheRelease(pCache, h, false); + if (taosArrayPush(pLastArray, &lastCol) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } } else { + // no cache or cache is invalid SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; - (void)taosArrayPush(pLastArray, &noneCol); + if (taosArrayPush(pLastArray, &noneCol) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } if (!remainCols) { - remainCols = taosArrayInit(num_keys, sizeof(SIdxKey)); - if (!remainCols) { - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } } - if (NULL == taosArrayPush(remainCols, &(SIdxKey){i, key})) { - taosArrayDestroy(remainCols); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); - }; + if (!ignoreFromRocks) { + if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + if (taosArrayPush(remainCols, &(SIdxKey){i, key}) ==NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false; + if (taosArrayPush(ignoreFromRocks, &ignoreRocks) ==NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + + if (h) { + (void)taosLRUCacheRelease(pCache, h, false); } } @@ -1719,9 +1780,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache for (int i = 0; i < TARRAY_SIZE(remainCols);) { SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN); - if (h) { - SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - + SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL; + if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { SLastCol lastCol = *pLastCol; for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) { code = reallocVarDataVal(&lastCol.rowKey.pks[j]); @@ -1737,23 +1797,31 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache } taosArraySet(pLastArray, idxKey->idx, &lastCol); - (void)taosLRUCacheRelease(pCache, h, false); - taosArrayRemove(remainCols, i); + taosArrayRemove(ignoreFromRocks, i); } else { + // no cache or cache is invalid ++i; } + + if (h) { + (void)taosLRUCacheRelease(pCache, h, false); + } } // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid); - code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, pr, ltype); + code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); + } +_exit: if (remainCols) { taosArrayDestroy(remainCols); } - } + if (ignoreFromRocks) { + taosArrayDestroy(ignoreFromRocks); + } TAOS_RETURN(code); } @@ -1767,131 +1835,113 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema)); // build keys & multi get from rocks - int num_keys = pTSchema->numOfCols; - char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); - size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); - if (!keys_list || !keys_list_sizes) { - taosMemoryFree(keys_list); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); - } - const size_t klen = ROCKS_KEY_LEN; - - for (int i = 0; i < num_keys; ++i) { - int16_t cid = pTSchema->columns[i].colId; - - char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); - if (!keys) { - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); - } - ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid}; - ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; - - keys_list[i] = keys; - keys_list[num_keys + i] = keys + sizeof(SLastKey); - keys_list_sizes[i] = klen; - keys_list_sizes[num_keys + i] = klen; - } - char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); - size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); - char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); - if (!values_list_sizes || !values_list) { - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - taosMemoryFree(values_list); - for (int i = 0; i < num_keys; ++i) { - taosMemoryFree(keys_list[i]); - } - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); - } + int numCols = pTSchema->numOfCols; + int numKeys = 0; + SArray *remainCols = NULL; (void)tsdbCacheCommit(pTsdb); (void)taosThreadMutexLock(&pTsdb->lruMutex); - (void)taosThreadMutexLock(&pTsdb->rCache.rMutex); - // rocksMayWrite(pTsdb, true, false, false); - rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, - keys_list_sizes, values_list, values_list_sizes, errs); - (void)taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - - for (int i = 0; i < num_keys * 2; ++i) { - if (errs[i]) { - rocksdb_free(errs[i]); + for (int i = 0; i < numCols; ++i) { + int16_t cid = pTSchema->columns[i].colId; + for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) { + SLastKey lastKey = {.lflag = lflag, .uid = uid, .cid = cid}; + LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); + if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) { + tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE); + } + (void)taosLRUCacheRelease(pTsdb->lruCache, h, false); + } else { + if (!remainCols) { + remainCols = taosArrayInit(numCols * 2, sizeof(SLastKey)); + } + (void)taosArrayPush(remainCols, &lastKey); + } } } - taosMemoryFree(errs); + + if (remainCols) { + numKeys = TARRAY_SIZE(remainCols); + } + + char **keys_list = taosMemoryCalloc(numKeys, sizeof(char *)); + size_t *keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t)); + char **values_list = NULL; + size_t *values_list_sizes = NULL; + + if (!keys_list || !keys_list_sizes) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + const size_t klen = ROCKS_KEY_LEN; + + for (int i = 0; i < numKeys; ++i) { + char *key = taosMemoryCalloc(1, sizeof(SLastKey)); + if (!key) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + ((SLastKey *)key)[0] = *(SLastKey *)taosArrayGet(remainCols, i); + + keys_list[i] = key; + keys_list_sizes[i] = klen; + } + + TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes, + &values_list, &values_list_sizes), + NULL, _exit); rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - for (int i = 0; i < num_keys; ++i) { + for (int i = 0; i < numKeys; ++i) { SLastCol *pLastCol = NULL; (void)tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); - (void)taosThreadMutexLock(&pTsdb->rCache.rMutex); + SLastKey *pLastKey = (SLastKey *)keys_list[i]; if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { - rocksdb_writebatch_delete(wb, keys_list[i], klen); + SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN, + .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[i].type), + .cacheStatus = TSDB_LAST_CACHE_NO_CACHE}; + + if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pLastCol); + tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + goto _exit; + } + if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pLastCol); + tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + goto _exit; + } } + + if (pLastCol == NULL) { + tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%d ,cid:%" PRId64 ", lflag:%d.", TD_VID(pTsdb->pVnode), + pLastKey->cid, pLastKey->uid, pLastKey->lflag); + } + taosMemoryFreeClear(pLastCol); - - pLastCol = NULL; - (void)tsdbCacheDeserialize(values_list[i + num_keys], values_list_sizes[i + num_keys], &pLastCol); - if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { - rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); - } - (void)taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - taosMemoryFreeClear(pLastCol); - - rocksdb_free(values_list[i]); - rocksdb_free(values_list[i + num_keys]); - - // taosThreadMutexLock(&pTsdb->lruMutex); - - bool erase = false; - LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen); - if (h) { - SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); - if (pLastCol->dirty) { - pLastCol->dirty = 0; - } - if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) { - erase = true; - } - (void)taosLRUCacheRelease(pTsdb->lruCache, h, erase); - } - if (erase) { - taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen); - } - - erase = false; - h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[num_keys + i], klen); - if (h) { - SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); - if (pLastCol->dirty) { - pLastCol->dirty = 0; - } - if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) { - erase = true; - } - (void)taosLRUCacheRelease(pTsdb->lruCache, h, erase); - } - if (erase) { - taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen); - } - // taosThreadMutexUnlock(&pTsdb->lruMutex); } - for (int i = 0; i < num_keys; ++i) { + + rocksMayWrite(pTsdb, true, false); + +_exit: + (void)taosThreadMutexUnlock(&pTsdb->lruMutex); + + for (int i = 0; i < numKeys; ++i) { taosMemoryFree(keys_list[i]); } taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); - taosMemoryFree(values_list); + if (values_list) { + for (int i = 0; i < numKeys; ++i) { + rocksdb_free(values_list[i]); + } + taosMemoryFree(values_list); + } taosMemoryFree(values_list_sizes); - - rocksMayWrite(pTsdb, true, false, true); - - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - -_exit: + taosArrayDestroy(remainCols); taosMemoryFree(pTSchema); TAOS_RETURN(code); @@ -3060,12 +3110,13 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC STColumn *pTColumn = &pTSchema->columns[0]; *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts})); - taosArraySet(pColArray, 0, &(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal}); + taosArraySet(pColArray, 0, + &(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal}; + *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); @@ -3114,7 +3165,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); if (COL_VAL_IS_VALUE(pColVal)) { - SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal}; + SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) { SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); taosMemoryFree(pLastCol->colVal.value.pData); @@ -3237,12 +3288,13 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, STColumn *pTColumn = &pTSchema->columns[0]; *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts})); - taosArraySet(pColArray, 0, &(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal}); + taosArraySet(pColArray, 0, + &(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal}; + *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);