Merge pull request #25690 from taosdata/fix/TD-29962

enh: refactor tsdbCache ser/des
This commit is contained in:
Hongze Cheng 2024-05-09 09:04:09 +08:00 committed by GitHub
commit e76d15e33f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 73 additions and 72 deletions

View File

@ -895,7 +895,6 @@ typedef enum {
} EExecMode; } EExecMode;
typedef struct { typedef struct {
int64_t version;
SRowKey rowKey; SRowKey rowKey;
int8_t dirty; int8_t dirty;
SColVal colVal; SColVal colVal;
@ -909,20 +908,20 @@ typedef struct {
uint32_t nData; uint32_t nData;
}; };
}; };
} SValueV1; } SValueV0;
typedef struct { typedef struct {
int16_t cid; int16_t cid;
int8_t type; int8_t type;
int8_t flag; int8_t flag;
SValueV1 value; SValueV0 value;
} SColValV1; } SColValV0;
typedef struct { typedef struct {
TSKEY ts; TSKEY ts;
int8_t dirty; int8_t dirty;
SColValV1 colVal; SColValV0 colVal;
} SLastColV1; } SLastColV0;
int32_t tsdbOpenCache(STsdb *pTsdb); int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb);

View File

@ -130,18 +130,19 @@ static void tsdbClosePgCache(STsdb *pTsdb) {
enum { enum {
LFLAG_LAST_ROW = 0, LFLAG_LAST_ROW = 0,
LFLAG_LAST = 1, LFLAG_LAST = 1,
LFLAG_VERSION = 1 << 2,
LFLAG_VERSION_BITS = (1 << 2 | 1 << 3),
LFLAG_PRIMARY_KEY = CACHESCAN_RETRIEVE_PK, LFLAG_PRIMARY_KEY = CACHESCAN_RETRIEVE_PK,
}; };
#define LAST_KEY_HAS_VERSION ((k).lflag & LFLAG_VERSION_BITS)
typedef struct { typedef struct {
tb_uid_t uid; tb_uid_t uid;
int16_t cid; int16_t cid;
int8_t lflag; int8_t lflag;
} SLastKey; } SLastKey;
#define LAST_COL_VERSION_BASE (((int64_t)(0x1)) << 63)
#define LAST_COL_VERSION (LAST_COL_VERSION_BASE + 2)
#define HAS_PRIMARY_KEY(k) (((k).lflag & LFLAG_PRIMARY_KEY) == LFLAG_PRIMARY_KEY) #define HAS_PRIMARY_KEY(k) (((k).lflag & LFLAG_PRIMARY_KEY) == LFLAG_PRIMARY_KEY)
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW) #define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
#define IS_LAST_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST) #define IS_LAST_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
@ -180,9 +181,9 @@ static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t
return 1; return 1;
} }
if (lhs->lflag < rhs->lflag) { if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
return -1; return -1;
} else if (lhs->lflag > rhs->lflag) { } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
return 1; return 1;
} }
@ -336,40 +337,39 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
} }
// note: new object do not own colVal's resource, just copy the pointer // note: new object do not own colVal's resource, just copy the pointer
static SLastCol *tsdbCacheConvertLastColV1(SLastColV1 *pLastColV1) { static SLastCol *tsdbCacheConvertLastColV0(SLastColV0 *pLastColV0) {
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
if (pLastCol == NULL) return NULL; if (pLastCol == NULL) return NULL;
pLastCol->version = LAST_COL_VERSION; pLastCol->rowKey.ts = pLastColV0->ts;
pLastCol->rowKey.ts = pLastColV1->ts;
pLastCol->rowKey.numOfPKs = 0; pLastCol->rowKey.numOfPKs = 0;
pLastCol->dirty = pLastColV1->dirty; pLastCol->dirty = pLastColV0->dirty;
pLastCol->colVal.cid = pLastColV1->colVal.cid; pLastCol->colVal.cid = pLastColV0->colVal.cid;
pLastCol->colVal.flag = pLastColV1->colVal.flag; pLastCol->colVal.flag = pLastColV0->colVal.flag;
pLastCol->colVal.value.type = pLastColV1->colVal.type; pLastCol->colVal.value.type = pLastColV0->colVal.type;
pLastCol->colVal.value.val = pLastColV1->colVal.value.val; pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
return pLastCol; return pLastCol;
} }
static SLastCol *tsdbCacheDeserializeV1(char const *value) { static SLastCol *tsdbCacheDeserializeV0(char const *value) {
if (!value) { if (!value) {
return NULL; return NULL;
} }
SLastColV1 *pLastColV1 = (SLastColV1 *)value; SLastColV0 *pLastColV0 = (SLastColV0 *)value;
SColValV1 *pColVal = &pLastColV1->colVal; SColValV0 *pColVal = &pLastColV0->colVal;
if (IS_VAR_DATA_TYPE(pColVal->type)) { if (IS_VAR_DATA_TYPE(pColVal->type)) {
if (pColVal->value.nData > 0) { if (pColVal->value.nData > 0) {
pColVal->value.pData = (char *)value + sizeof(*pLastColV1); pColVal->value.pData = (char *)value + sizeof(*pLastColV0);
} else { } else {
pColVal->value.pData = NULL; pColVal->value.pData = NULL;
} }
} }
return tsdbCacheConvertLastColV1(pLastColV1); return tsdbCacheConvertLastColV0(pLastColV0);
} }
static SLastCol *tsdbCacheDeserializeV2(char const *value) { static SLastCol *tsdbCacheDeserializeV1(char const *value) {
if (!value) { if (!value) {
return NULL; return NULL;
} }
@ -403,16 +403,26 @@ static SLastCol *tsdbCacheDeserializeV2(char const *value) {
return pLastCol; return pLastCol;
} }
static SLastCol *tsdbCacheDeserialize(char const *value) { static SLastCol *tsdbCacheDeserialize(char const *value, int8_t lflag) {
if (!value) { if (!value) {
return NULL; return NULL;
} }
bool hasVersion = ((*(int64_t *)value) & LAST_COL_VERSION_BASE) == LAST_COL_VERSION_BASE; int8_t version = lflag & LFLAG_VERSION_BITS;
if (!hasVersion) {
return tsdbCacheDeserializeV1(value); SLastCol *lastCol = NULL;
switch (version) {
case 0:
lastCol = tsdbCacheDeserializeV0(value);
break;
case LFLAG_VERSION:
lastCol = tsdbCacheDeserializeV1(value);
break;
defalut:
tsdbError("invalid last key version %" PRId8 " , lflag:%" PRId8, version, lflag);
break;
} }
return tsdbCacheDeserializeV2(value); return lastCol;
} }
static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) { static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) {
@ -451,7 +461,7 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
SValue *pToValue = &pToLastCol->rowKey.pks[i]; SValue *pToValue = &pToLastCol->rowKey.pks[i];
pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos; pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos;
currentPos += tsdbCacheCopyVarData(pFromValue, pToValue); currentPos += tsdbCacheCopyVarData(pFromValue, pToValue);
} }
} }
// copy var data value // copy var data value
@ -571,8 +581,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
SRowKey noneRowKey = {0}; SRowKey noneRowKey = {0};
noneRowKey.ts = TSKEY_MIN; noneRowKey.ts = TSKEY_MIN;
noneRowKey.numOfPKs = 0; noneRowKey.numOfPKs = 0;
SLastCol noneCol = { SLastCol noneCol = {.rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
.version = LAST_COL_VERSION, .rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
SLastCol *pLastCol = &noneCol; SLastCol *pLastCol = &noneCol;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
@ -594,7 +603,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
charge += pLastCol->colVal.value.nData; charge += pLastCol->colVal.value.nData;
} }
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid}; SLastKey *pLastKey = &(SLastKey){.lflag = lflag | LFLAG_VERSION, .uid = uid, .cid = cid};
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) { if (status != TAOS_LRU_STATUS_OK) {
@ -647,8 +656,8 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
int8_t lflag = hasPrimaryKey ? LFLAG_PRIMARY_KEY : 0; int8_t lflag = hasPrimaryKey ? LFLAG_PRIMARY_KEY : 0;
char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid}; ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST | LFLAG_VERSION, .uid = uid, .cid = cid};
((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid};
keys_list[0] = keys; keys_list[0] = keys;
keys_list[1] = keys + sizeof(SLastKey); keys_list[1] = keys + sizeof(SLastKey);
@ -672,13 +681,13 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
{ {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0], ((SLastKey*)keys_list[0])->lflag);
if (NULL != pLastCol) { if (NULL != pLastCol) {
rocksdb_writebatch_delete(wb, keys_list[0], klen); rocksdb_writebatch_delete(wb, keys_list[0], klen);
} }
taosMemoryFreeClear(pLastCol); taosMemoryFreeClear(pLastCol);
pLastCol = tsdbCacheDeserialize(values_list[1]); pLastCol = tsdbCacheDeserialize(values_list[1], ((SLastKey*)keys_list[1])->lflag);
if (NULL != pLastCol) { if (NULL != pLastCol) {
rocksdb_writebatch_delete(wb, keys_list[1], klen); rocksdb_writebatch_delete(wb, keys_list[1], klen);
} }
@ -935,7 +944,6 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal
int nData = 0; int nData = 0;
// update rowkey // update rowkey
pLastCol->version = LAST_COL_VERSION;
pLastCol->rowKey.ts = pRowKey->ts; pLastCol->rowKey.ts = pRowKey->ts;
pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs; pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs;
for (int8_t i = 0; i < pRowKey->numOfPKs; i++) { for (int8_t i = 0; i < pRowKey->numOfPKs; i++) {
@ -1023,7 +1031,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
int16_t cid = pColVal->cid; int16_t cid = pColVal->cid;
SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid};
size_t klen = ROCKS_KEY_LEN; size_t klen = ROCKS_KEY_LEN;
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) { if (h) {
@ -1041,7 +1049,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
} }
if (COL_VAL_IS_VALUE(pColVal)) { if (COL_VAL_IS_VALUE(pColVal)) {
key->lflag = lflag | LFLAG_LAST; key->lflag = lflag | LFLAG_LAST | LFLAG_VERSION;
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) { if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
@ -1079,9 +1087,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
rocksdb_free(errs[i]); rocksdb_free(errs[i]);
} }
taosMemoryFree(errs); taosMemoryFree(errs);
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(values_list_sizes);
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
@ -1089,7 +1094,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx;
// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx); // SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx);
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey*)keys_list[i])->lflag);
SLastCol *PToFree = pLastCol; SLastCol *PToFree = pLastCol;
if (IS_LAST_ROW_KEY(idxKey->key)) { if (IS_LAST_ROW_KEY(idxKey->key)) {
@ -1101,10 +1106,9 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.version = LAST_COL_VERSION, .rowKey = *pRowKey, .colVal = *pColVal}, &value, tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value,
&vlen); &vlen);
// SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
@ -1143,10 +1147,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) {
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.version = LAST_COL_VERSION, .rowKey = *pRowKey, .colVal = *pColVal}, &value, tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen);
&vlen);
// SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
@ -1189,7 +1191,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
rocksMayWrite(pTsdb, true, false, true); rocksMayWrite(pTsdb, true, false, true);
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
taosArrayDestroy(remainCols); taosArrayDestroy(remainCols);
} }
@ -1396,7 +1401,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
SIdxKey *idxKey = taosArrayGet(remainCols, 0); SIdxKey *idxKey = taosArrayGet(remainCols, 0);
if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) { if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; SLastKey *key = &(SLastKey){.lflag = ltype | LFLAG_VERSION, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key}); taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key});
} }
@ -1465,8 +1470,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
} }
// still null, then make up a none col value // still null, then make up a none col value
SLastCol noneCol = {.version = LAST_COL_VERSION, SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)};
if (!pLastCol) { if (!pLastCol) {
pLastCol = &noneCol; pLastCol = &noneCol;
@ -1563,14 +1567,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
rocksdb_free(errs[i]); rocksdb_free(errs[i]);
} }
} }
taosMemoryFree(key_list);
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(errs);
SLRUCache *pCache = pTsdb->lruCache; SLRUCache *pCache = pTsdb->lruCache;
for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey*)keys_list[i])->lflag);
SLastCol *PToFree = pLastCol; SLastCol *PToFree = pLastCol;
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j]; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
if (pLastCol) { if (pLastCol) {
@ -1612,6 +1612,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
} }
} }
taosMemoryFree(errs);
taosMemoryFree(key_list);
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes); taosMemoryFree(values_list_sizes);
@ -1633,7 +1637,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i]; int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid}; SLastKey key = {.lflag = ltype | LFLAG_VERSION, .uid = uid, .cid = cid};
// for select last_row, last case // for select last_row, last case
int32_t funcType = FUNCTION_TYPE_CACHE_LAST; int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) { if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
@ -1657,8 +1661,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
} else { } else {
SLastCol noneCol = {.version = LAST_COL_VERSION, SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
taosArrayPush(pLastArray, &noneCol); taosArrayPush(pLastArray, &noneCol);
@ -1732,8 +1735,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
int16_t cid = pTSchema->columns[i].colId; int16_t cid = pTSchema->columns[i].colId;
char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid}; ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST | LFLAG_VERSION, .uid = uid, .cid = cid};
((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid};
keys_list[i] = keys; keys_list[i] = keys;
keys_list[num_keys + i] = keys + sizeof(SLastKey); keys_list[num_keys + i] = keys + sizeof(SLastKey);
@ -1763,14 +1766,14 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey *)keys_list[i])->lflag);
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[i], klen); rocksdb_writebatch_delete(wb, keys_list[i], klen);
} }
taosMemoryFreeClear(pLastCol); taosMemoryFreeClear(pLastCol);
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); pLastCol = tsdbCacheDeserialize(values_list[i + num_keys], ((SLastKey *)keys_list[i + num_keys])->lflag);
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
} }
@ -3296,8 +3299,7 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray,
for (int32_t i = 0; i < nCols; ++i) { for (int32_t i = 0; i < nCols; ++i) {
int16_t slotId = slotIds[i]; int16_t slotId = slotIds[i];
SLastCol col = {.version = LAST_COL_VERSION, SLastCol col = {.rowKey.ts = 0,
.rowKey.ts = 0,
.colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
taosArrayPush(pColArray, &col); taosArrayPush(pColArray, &col);
} }
@ -3403,12 +3405,12 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
STColumn *pTColumn = &pTSchema->columns[0]; STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}); taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
continue; continue;
} }
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}; *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) { if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
@ -3458,7 +3460,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
SLastCol lastCol = {.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}; SLastCol lastCol = {.rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) { if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData); taosMemoryFree(pLastCol->colVal.value.pData);
@ -3582,12 +3584,12 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
STColumn *pTColumn = &pTSchema->columns[0]; STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}); taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
continue; continue;
} }
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}; *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) { if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);