From 7ae44158901c1348043bc1b7478192f5342db26b Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 9 May 2024 18:57:45 +0800 Subject: [PATCH 1/4] fix: refactor lastcache ser/des for compatibility --- include/libs/executor/storageapi.h | 1 - source/dnode/vnode/src/inc/tsdb.h | 5 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 274 ++++++++++--------- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 3 - source/libs/executor/src/cachescanoperator.c | 3 - 5 files changed, 151 insertions(+), 135 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index fcb2e4d405..ec92bd56dd 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -35,7 +35,6 @@ extern "C" { #define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2 #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 -#define CACHESCAN_RETRIEVE_PK 0x10 #define META_READER_LOCK 0x0 #define META_READER_NOLOCK 0x1 diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c578b95c1a..64dc5bc31a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -894,13 +894,16 @@ typedef enum { READER_EXEC_ROWS = 0x2, } EExecMode; +#define LAST_COL_VERSION (0x1) + typedef struct { SRowKey rowKey; int8_t dirty; SColVal colVal; + int8_t version; } SLastCol; -typedef struct { +typedef struct { union { int64_t val; struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 7556f993aa..161be8904a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -130,20 +130,14 @@ static void tsdbClosePgCache(STsdb *pTsdb) { enum { LFLAG_LAST_ROW = 0, LFLAG_LAST = 1, - LFLAG_VERSION = 1 << 2, - LFLAG_VERSION_BITS = (1 << 2 | 1 << 3), - LFLAG_PRIMARY_KEY = CACHESCAN_RETRIEVE_PK, }; -#define LAST_KEY_HAS_VERSION ((k).lflag & LFLAG_VERSION_BITS) - typedef struct { tb_uid_t uid; int16_t cid; int8_t lflag; } SLastKey; -#define HAS_PRIMARY_KEY(k) (((k).lflag & LFLAG_PRIMARY_KEY) == LFLAG_PRIMARY_KEY) #define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW) #define IS_LAST_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST) @@ -336,93 +330,100 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { } } -// note: new object do not own colVal's resource, just copy the pointer -static SLastCol *tsdbCacheConvertLastColV0(SLastColV0 *pLastColV0) { - SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); - if (pLastCol == NULL) return NULL; - pLastCol->rowKey.ts = pLastColV0->ts; - pLastCol->rowKey.numOfPKs = 0; - pLastCol->dirty = pLastColV0->dirty; - pLastCol->colVal.cid = pLastColV0->colVal.cid; - pLastCol->colVal.flag = pLastColV0->colVal.flag; - pLastCol->colVal.value.type = pLastColV0->colVal.type; - pLastCol->colVal.value.val = pLastColV0->colVal.value.val; +static SLastColV0 *tsdbCacheDeserializeV0(char const *value, size_t *inOutOffset) { + SLastColV0 *pLastColV0 = NULL; + size_t localOffset = 0; - return pLastCol; + if (!value) { + goto _OUT; + } + + pLastColV0 = taosMemoryMalloc(sizeof(SLastColV0)); + *pLastColV0 = *(SLastColV0 *)(value); + + localOffset = sizeof(*pLastColV0); + + SColValV0 *pColValV0 = &pLastColV0->colVal; + if (IS_VAR_DATA_TYPE(pColValV0->type)) { + if (pColValV0->value.nData > 0) { + pColValV0->value.pData = (char *)value + localOffset; + localOffset += pColValV0->value.nData; + } else { + pColValV0->value.pData = NULL; + } + } + +_OUT: + *inOutOffset += localOffset; + return pLastColV0; } -static SLastCol *tsdbCacheDeserializeV0(char const *value) { +static SLastCol *tsdbCacheDeserialize(char const *value, size_t size) { if (!value) { return NULL; } - SLastColV0 *pLastColV0 = (SLastColV0 *)value; - SColValV0 *pColVal = &pLastColV0->colVal; - if (IS_VAR_DATA_TYPE(pColVal->type)) { - if (pColVal->value.nData > 0) { - pColVal->value.pData = (char *)value + sizeof(*pLastColV0); - } else { - pColVal->value.pData = NULL; - } - } + size_t offset = 0; - return tsdbCacheConvertLastColV0(pLastColV0); -} - -static SLastCol *tsdbCacheDeserializeV1(char const *value) { - if (!value) { + SLastColV0 *pLastColV0 = tsdbCacheDeserializeV0(value, &offset); + if (NULL == pLastColV0) { return NULL; } SLastCol *pLastCol = taosMemoryMalloc(sizeof(SLastCol)); - *pLastCol = *(SLastCol *)(value); + pLastCol->rowKey.ts = pLastColV0->ts; + pLastCol->dirty = pLastColV0->dirty; + pLastCol->colVal.cid = pLastColV0->colVal.cid; + pLastCol->colVal.flag = pLastColV0->colVal.flag; + pLastCol->colVal.value.type = pLastColV0->colVal.type; + if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { + pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData; + pLastCol->colVal.value.pData = pLastColV0->colVal.value.pData; + } else { + pLastCol->colVal.value.val = pLastColV0->colVal.value.val; + } + + taosMemoryFreeClear(pLastColV0); + + if (offset == size) { + // version 0 + pLastCol->version = LAST_COL_VERSION; + pLastCol->rowKey.numOfPKs = 0; + memset(pLastCol->rowKey.pks, 0, sizeof(pLastCol->rowKey.pks)); + return pLastCol; + } + + pLastCol->version = *(int8_t *)(value + offset); + offset += sizeof(int8_t); + + pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset); + offset += sizeof(uint8_t); - char *currentPos = (char *)value + sizeof(*pLastCol); for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { SValue *pValue = &pLastCol->rowKey.pks[i]; + *pValue = *(SValue *)(value + offset); + offset += sizeof(SValue); + if (IS_VAR_DATA_TYPE(pValue->type)) { if (pValue->nData > 0) { - pValue->pData = currentPos; - currentPos += pValue->nData; + pValue->pData = (char *)value + offset; + offset += pValue->nData; } else { pValue->pData = NULL; } - } - } - - SColVal *pColVal = &pLastCol->colVal; - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - if (pColVal->value.nData > 0) { - pColVal->value.pData = currentPos; - currentPos += pColVal->value.nData; } else { - pColVal->value.pData = NULL; + pValue->val = *(int64_t *)(value + offset); + offset += sizeof(int64_t); } } - return pLastCol; -} - -static SLastCol *tsdbCacheDeserialize(char const *value, int8_t lflag) { - if (!value) { + if (size < offset) { + terrno = TSDB_CODE_INVALID_DATA_FMT; + taosMemoryFreeClear(pLastCol); return NULL; } - int8_t version = lflag & LFLAG_VERSION_BITS; - - SLastCol *lastCol = NULL; - switch (version) { - case 0: - lastCol = tsdbCacheDeserializeV0(value); - break; - case LFLAG_VERSION: - lastCol = tsdbCacheDeserializeV1(value); - break; - defalut: - tsdbError("invalid last key version %" PRId8 " , lflag:%" PRId8, version, lflag); - break; - } - return lastCol; + return pLastCol; } static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) { @@ -435,42 +436,82 @@ static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) { return from->nData; } +static uint32_t tsdbCacheCopyVarDataToV0(SValue *from, SValueV0 *to) { + ASSERT(from->nData >= 0); + if (from->nData > 0) { + memcpy(to->pData, from->pData, from->nData); + } + to->nData = from->nData; + return from->nData; +} + +/* +typedef struct { + SLastColV0 lastColV0; + char colData[]; + int8_t version; + uint8_t numOfPKs; + SValue pks[0]; + char pk0Data[]; + SValue pks[1]; + char pk1Data[]; + ... +} SLastColDisk; +*/ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { SColVal *pColVal = &pLastCol->colVal; - size_t length = sizeof(*pLastCol); + size_t length = sizeof(SLastColV0); + if (IS_VAR_DATA_TYPE(pColVal->value.type)) { + length += pColVal->value.nData; + } + + uint8_t numOfPKs = pLastCol->rowKey.numOfPKs; + + length += sizeof(int8_t) + sizeof(uint8_t) + (sizeof(SValue) * numOfPKs); // version + numOfPKs + pks + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) { length += pLastCol->rowKey.pks[i].nData; } } - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - length += pColVal->value.nData; - } *value = taosMemoryMalloc(length); // copy last col - SLastCol *pToLastCol = (SLastCol *)(*value); - *pToLastCol = *pLastCol; - char *currentPos = *value + sizeof(*pLastCol); + SLastColV0 *pToLastColV0 = (SLastColV0 *)(*value); + pToLastColV0->ts = pLastCol->rowKey.ts; + pToLastColV0->dirty = pLastCol->dirty; + pToLastColV0->colVal.cid = pColVal->cid; + pToLastColV0->colVal.flag = pColVal->flag; + pToLastColV0->colVal.type = pColVal->value.type; - // copy var data pks - for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - SValue *pFromValue = &pLastCol->rowKey.pks[i]; - if (IS_VAR_DATA_TYPE(pFromValue->type)) { - SValue *pToValue = &pToLastCol->rowKey.pks[i]; - pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos; - currentPos += tsdbCacheCopyVarData(pFromValue, pToValue); - } - } + char *currentPos = *value + sizeof(*pToLastColV0); // copy var data value if (IS_VAR_DATA_TYPE(pColVal->value.type)) { SValue *pFromValue = &pColVal->value; - SValue *pToValue = &pToLastCol->colVal.value; + SValueV0 *pToValue = &pToLastColV0->colVal.value; pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos; - currentPos += tsdbCacheCopyVarData(pFromValue, pToValue); + currentPos += tsdbCacheCopyVarDataToV0(pFromValue, pToValue); + } else { + pToLastColV0->colVal.value.val = pColVal->value.val; } + + *(int8_t *)currentPos = LAST_COL_VERSION; + currentPos += sizeof(int8_t); + + // copy var data pks + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + SValue *pFromValue = &pLastCol->rowKey.pks[i]; + SValue *pToValue = (SValue *)currentPos; + *pToValue = *pFromValue; + currentPos += sizeof(SValue); + if (IS_VAR_DATA_TYPE(pFromValue->type)) { + pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos; + currentPos += tsdbCacheCopyVarData(pFromValue, pToValue); + } + } + *size = length; } @@ -603,7 +644,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i charge += pLastCol->colVal.value.nData; } - SLastKey *pLastKey = &(SLastKey){.lflag = lflag | LFLAG_VERSION, .uid = uid, .cid = cid}; + SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid}; LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); if (status != TAOS_LRU_STATUS_OK) { @@ -653,11 +694,9 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); const size_t klen = ROCKS_KEY_LEN; - int8_t lflag = hasPrimaryKey ? LFLAG_PRIMARY_KEY : 0; - char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); - ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST | LFLAG_VERSION, .uid = uid, .cid = cid}; - ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; keys_list[0] = keys; keys_list[1] = keys + sizeof(SLastKey); @@ -681,13 +720,13 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0], ((SLastKey*)keys_list[0])->lflag); + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0], values_list_sizes[0]); if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[0], klen); } taosMemoryFreeClear(pLastCol); - pLastCol = tsdbCacheDeserialize(values_list[1], ((SLastKey*)keys_list[1])->lflag); + pLastCol = tsdbCacheDeserialize(values_list[1], values_list_sizes[1]); if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[1], klen); } @@ -733,18 +772,12 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap taosThreadMutexLock(&pTsdb->lruMutex); if (suid < 0) { - int8_t lflag = 0; - int nCols = pSchemaRow->nCols; - if (nCols >= 2) { - lflag = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0; - } - - for (int i = 0; i < nCols; ++i) { + for (int i = 0; i < pSchemaRow->nCols; ++i) { int16_t cid = pSchemaRow->pSchema[i].colId; int8_t col_type = pSchemaRow->pSchema[i].type; - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW); - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST); } } else { STSchema *pTSchema = NULL; @@ -754,18 +787,12 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap return -1; } - int8_t lflag = 0; - int nCols = pTSchema->numOfCols; - if (nCols >= 2) { - lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0; - } - - for (int i = 0; i < nCols; ++i) { + for (int i = 0; i < pTSchema->numOfCols; ++i) { int16_t cid = pTSchema->columns[i].colId; int8_t col_type = pTSchema->columns[i].type; - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW); - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST); } taosMemoryFree(pTSchema); @@ -1024,14 +1051,13 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow STsdbRowKey tsdbRowKey = {0}; tsdbRowGetKey(pRow, &tsdbRowKey); SRowKey *pRowKey = &tsdbRowKey.key; - int8_t lflag = (pRowKey->numOfPKs != 0) ? LFLAG_PRIMARY_KEY : 0; taosThreadMutexLock(&pTsdb->lruMutex); for (int i = 0; i < num_keys; ++i) { SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); int16_t cid = pColVal->cid; - SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid}; + SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; size_t klen = ROCKS_KEY_LEN; LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { @@ -1049,7 +1075,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow } if (COL_VAL_IS_VALUE(pColVal)) { - key->lflag = lflag | LFLAG_LAST | LFLAG_VERSION; + key->lflag = LFLAG_LAST; LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); @@ -1094,7 +1120,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; // SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx); - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey*)keys_list[i])->lflag); + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); SLastCol *PToFree = pLastCol; if (IS_LAST_ROW_KEY(idxKey->key)) { @@ -1106,8 +1132,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, - &vlen); + tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen); taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -1401,7 +1426,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr SIdxKey *idxKey = taosArrayGet(remainCols, 0); if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) { - SLastKey *key = &(SLastKey){.lflag = ltype | LFLAG_VERSION, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; + SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key}); } @@ -1570,7 +1595,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA SLRUCache *pCache = pTsdb->lruCache; for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey*)keys_list[i])->lflag); + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); SLastCol *PToFree = pLastCol; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j]; if (pLastCol) { @@ -1637,7 +1662,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache for (int i = 0; i < num_keys; ++i) { int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i]; - SLastKey key = {.lflag = ltype | LFLAG_VERSION, .uid = uid, .cid = cid}; + SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid}; // for select last_row, last case int32_t funcType = FUNCTION_TYPE_CACHE_LAST; if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) { @@ -1726,17 +1751,12 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); const size_t klen = ROCKS_KEY_LEN; - int8_t lflag = 0; - if (num_keys >= 2) { - lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0; - } - for (int i = 0; i < num_keys; ++i) { int16_t cid = pTSchema->columns[i].colId; char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); - ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST | LFLAG_VERSION, .uid = uid, .cid = cid}; - ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; keys_list[i] = keys; keys_list[num_keys + i] = keys + sizeof(SLastKey); @@ -1766,14 +1786,14 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey *)keys_list[i])->lflag); + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); taosThreadMutexLock(&pTsdb->rCache.rMutex); if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[i], klen); } taosMemoryFreeClear(pLastCol); - pLastCol = tsdbCacheDeserialize(values_list[i + num_keys], ((SLastKey *)keys_list[i + num_keys])->lflag); + pLastCol = tsdbCacheDeserialize(values_list[i + num_keys], values_list_sizes[i + num_keys]); if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); } diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 52aabd4061..4d6ebf721e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -385,9 +385,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; - if (pr->rowKey.numOfPKs > 0) { - ltype |= CACHESCAN_RETRIEVE_PK; - } STableKeyInfo* pTableList = pr->pTableList; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 0d0870911e..ad7d089da9 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -160,9 +160,6 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe // partition by tbname if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull); - if (pInfo->numOfPks > 0) { - pInfo->retrieveType |= CACHESCAN_RETRIEVE_PK; - } STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); From 457b98fee854a6bb0ddb043554c7b437b2c63d1b Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 9 May 2024 21:52:43 +0800 Subject: [PATCH 2/4] fix: adjust SValueV0 data order --- source/dnode/vnode/src/inc/tsdb.h | 24 --- source/dnode/vnode/src/tsdb/tsdbCache.c | 222 ++++++++++-------------- 2 files changed, 96 insertions(+), 150 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 64dc5bc31a..01db196479 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -900,32 +900,8 @@ typedef struct { SRowKey rowKey; int8_t dirty; SColVal colVal; - int8_t version; } SLastCol; -typedef struct { - union { - int64_t val; - struct { - uint8_t *pData; - uint32_t nData; - }; - }; -} SValueV0; - -typedef struct { - int16_t cid; - int8_t type; - int8_t flag; - SValueV0 value; -} SColValV0; - -typedef struct { - TSKEY ts; - int8_t dirty; - SColValV0 colVal; -} SLastColV0; - int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 161be8904a..208487b6f5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -330,32 +330,41 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { } } -static SLastColV0 *tsdbCacheDeserializeV0(char const *value, size_t *inOutOffset) { - SLastColV0 *pLastColV0 = NULL; - size_t localOffset = 0; +typedef struct { + TSKEY ts; + int8_t dirty; + struct { + int16_t cid; + int8_t type; + int8_t flag; + union { + int64_t val; + struct { + uint32_t nData; + uint8_t *pData; + }; + } value; + } colVal; +} SLastColV0; - if (!value) { - goto _OUT; +static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) { + SLastColV0 *pLastColV0 = (SLastColV0 *)value; + + pLastCol->rowKey.ts = pLastColV0->ts; + pLastCol->rowKey.numOfPKs = 0; + pLastCol->dirty = pLastColV0->dirty; + pLastCol->colVal.cid = pLastColV0->colVal.cid; + pLastCol->colVal.flag = pLastColV0->colVal.flag; + pLastCol->colVal.value.type = pLastColV0->colVal.type; + + if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { + pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData; + pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]); + return sizeof(SLastColV0) + pLastColV0->colVal.value.nData; + } else { + pLastCol->colVal.value.val = pLastColV0->colVal.value.val; + return sizeof(SLastColV0); } - - pLastColV0 = taosMemoryMalloc(sizeof(SLastColV0)); - *pLastColV0 = *(SLastColV0 *)(value); - - localOffset = sizeof(*pLastColV0); - - SColValV0 *pColValV0 = &pLastColV0->colVal; - if (IS_VAR_DATA_TYPE(pColValV0->type)) { - if (pColValV0->value.nData > 0) { - pColValV0->value.pData = (char *)value + localOffset; - localOffset += pColValV0->value.nData; - } else { - pColValV0->value.pData = NULL; - } - } - -_OUT: - *inOutOffset += localOffset; - return pLastColV0; } static SLastCol *tsdbCacheDeserialize(char const *value, size_t size) { @@ -363,61 +372,41 @@ static SLastCol *tsdbCacheDeserialize(char const *value, size_t size) { return NULL; } - size_t offset = 0; - - SLastColV0 *pLastColV0 = tsdbCacheDeserializeV0(value, &offset); - if (NULL == pLastColV0) { + SLastCol* pLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + if (NULL == pLastCol) { return NULL; } - SLastCol *pLastCol = taosMemoryMalloc(sizeof(SLastCol)); - pLastCol->rowKey.ts = pLastColV0->ts; - pLastCol->dirty = pLastColV0->dirty; - pLastCol->colVal.cid = pLastColV0->colVal.cid; - pLastCol->colVal.flag = pLastColV0->colVal.flag; - pLastCol->colVal.value.type = pLastColV0->colVal.type; - if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData; - pLastCol->colVal.value.pData = pLastColV0->colVal.value.pData; - } else { - pLastCol->colVal.value.val = pLastColV0->colVal.value.val; - } - - taosMemoryFreeClear(pLastColV0); - + int32_t offset = tsdbCacheDeserializeV0(value, pLastCol); if (offset == size) { // version 0 - pLastCol->version = LAST_COL_VERSION; - pLastCol->rowKey.numOfPKs = 0; - memset(pLastCol->rowKey.pks, 0, sizeof(pLastCol->rowKey.pks)); return pLastCol; + } else if (offset > size) { + terrno = TSDB_CODE_INVALID_DATA_FMT; + taosMemoryFreeClear(pLastCol); + return NULL; } - pLastCol->version = *(int8_t *)(value + offset); + // version + int8_t version = *(int8_t *)(value + offset); offset += sizeof(int8_t); + // numOfPKs pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset); offset += sizeof(uint8_t); - for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - SValue *pValue = &pLastCol->rowKey.pks[i]; - *pValue = *(SValue *)(value + offset); + // pks + for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + pLastCol->rowKey.pks[i] = *(SValue *)(value + offset); offset += sizeof(SValue); - if (IS_VAR_DATA_TYPE(pValue->type)) { - if (pValue->nData > 0) { - pValue->pData = (char *)value + offset; - offset += pValue->nData; - } else { - pValue->pData = NULL; - } - } else { - pValue->val = *(int64_t *)(value + offset); - offset += sizeof(int64_t); + if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) { + pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset; + offset += pLastCol->rowKey.pks[i].nData; } } - if (size < offset) { + if (offset > size) { terrno = TSDB_CODE_INVALID_DATA_FMT; taosMemoryFreeClear(pLastCol); return NULL; @@ -426,25 +415,6 @@ static SLastCol *tsdbCacheDeserialize(char const *value, size_t size) { return pLastCol; } -static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) { - ASSERT(from->nData >= 0); - if (from->nData > 0) { - memcpy(to->pData, from->pData, from->nData); - } - to->type = from->type; - to->nData = from->nData; - return from->nData; -} - -static uint32_t tsdbCacheCopyVarDataToV0(SValue *from, SValueV0 *to) { - ASSERT(from->nData >= 0); - if (from->nData > 0) { - memcpy(to->pData, from->pData, from->nData); - } - to->nData = from->nData; - return from->nData; -} - /* typedef struct { SLastColV0 lastColV0; @@ -458,61 +428,61 @@ typedef struct { ... } SLastColDisk; */ -static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { - SColVal *pColVal = &pLastCol->colVal; - size_t length = sizeof(SLastColV0); - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - length += pColVal->value.nData; - } +static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) { + SLastColV0 *pLastColV0 = (SLastColV0 *)value; - uint8_t numOfPKs = pLastCol->rowKey.numOfPKs; - - length += sizeof(int8_t) + sizeof(uint8_t) + (sizeof(SValue) * numOfPKs); // version + numOfPKs + pks - - for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) { - length += pLastCol->rowKey.pks[i].nData; - } - } - - *value = taosMemoryMalloc(length); - - // copy last col - SLastColV0 *pToLastColV0 = (SLastColV0 *)(*value); - pToLastColV0->ts = pLastCol->rowKey.ts; - pToLastColV0->dirty = pLastCol->dirty; - pToLastColV0->colVal.cid = pColVal->cid; - pToLastColV0->colVal.flag = pColVal->flag; - pToLastColV0->colVal.type = pColVal->value.type; - - char *currentPos = *value + sizeof(*pToLastColV0); - - // copy var data value - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - SValue *pFromValue = &pColVal->value; - SValueV0 *pToValue = &pToLastColV0->colVal.value; - pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos; - currentPos += tsdbCacheCopyVarDataToV0(pFromValue, pToValue); + pLastColV0->ts = pLastCol->rowKey.ts; + pLastColV0->dirty = pLastCol->dirty; + pLastColV0->colVal.cid = pLastCol->colVal.cid; + pLastColV0->colVal.flag = pLastCol->colVal.flag; + pLastColV0->colVal.type = pLastCol->colVal.value.type; + if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { + pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData; + memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData); + return sizeof(SLastColV0) + pLastCol->colVal.value.nData; } else { - pToLastColV0->colVal.value.val = pColVal->value.val; + pLastColV0->colVal.value.val = pLastCol->colVal.value.val; + return sizeof(SLastColV0); } - *(int8_t *)currentPos = LAST_COL_VERSION; - currentPos += sizeof(int8_t); + return 0; +} + +static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { + *size = sizeof(SLastColV0); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { + *size += pLastCol->colVal.value.nData; + } + *size += sizeof(uint8_t) + sizeof(uint8_t); // version + numOfPKs - // copy var data pks for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - SValue *pFromValue = &pLastCol->rowKey.pks[i]; - SValue *pToValue = (SValue *)currentPos; - *pToValue = *pFromValue; - currentPos += sizeof(SValue); - if (IS_VAR_DATA_TYPE(pFromValue->type)) { - pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos; - currentPos += tsdbCacheCopyVarData(pFromValue, pToValue); + *size += sizeof(SValue); + if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) { + *size += pLastCol->rowKey.pks[i].nData; } } - *size = length; + *value = taosMemoryMalloc(*size); + + int32_t offset = tsdbCacheSerializeV0(*value, pLastCol); + + // version + ((uint8_t *)(*value + offset))[0] = LAST_COL_VERSION; + offset++; + + // numOfPKs + ((uint8_t *)(*value + offset))[0] = pLastCol->rowKey.numOfPKs; + offset++; + + // pks + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + ((SValue *)(*value + offset))[0] = pLastCol->rowKey.pks[i]; + offset += sizeof(SValue); + if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) { + memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData); + offset += pLastCol->rowKey.pks[i].nData; + } + } } static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, SCacheFlushState *state) { From 544999c01bcc14924f0cdb4c3dde34594927153b Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 9 May 2024 22:54:10 +0800 Subject: [PATCH 3/4] fix: tsdbCacheSerialize copy from null ptr --- source/dnode/vnode/src/tsdb/tsdbCache.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 208487b6f5..4fc393ee17 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -438,7 +438,9 @@ static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) { pLastColV0->colVal.type = pLastCol->colVal.value.type; if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData; - memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData); + if (pLastCol->colVal.value.nData > 0) { + memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData); + } return sizeof(SLastColV0) + pLastCol->colVal.value.nData; } else { pLastColV0->colVal.value.val = pLastCol->colVal.value.val; @@ -479,7 +481,9 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { ((SValue *)(*value + offset))[0] = pLastCol->rowKey.pks[i]; offset += sizeof(SValue); if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) { - memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData); + if (pLastCol->rowKey.pks[i].nData > 0) { + memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData); + } offset += pLastCol->rowKey.pks[i].nData; } } From 0ac5d393e68b3d5a66aec6f5861c3347ab4c01ca Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 9 May 2024 23:34:03 +0800 Subject: [PATCH 4/4] fix: tsdbcache last disk data format change --- source/dnode/vnode/src/tsdb/tsdbCache.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 4fc393ee17..6f26e3a63f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1106,7 +1106,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen); + SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; + tsdbCacheSerialize(&lastColTmp, &value, &vlen); taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -1114,7 +1115,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - pLastCol = (SLastCol *)value; + pLastCol = &lastColTmp; SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; @@ -1146,7 +1147,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen); + SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; + tsdbCacheSerialize(&lastColTmp, &value, &vlen); taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -1154,7 +1156,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - pLastCol = (SLastCol *)value; + pLastCol = &lastColTmp; SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol;