diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a082d33a02..a2bd9889fc 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -895,6 +895,7 @@ typedef enum { } EExecMode; typedef struct { + int64_t version; SRowKey rowKey; int8_t dirty; SColVal colVal; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3a178f7ade..eac0678f3e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -339,6 +339,7 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { static SLastCol *tsdbCacheConvertLastColV1(SLastColV1 *pLastColV1) { SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); if (pLastCol == NULL) return NULL; + pLastCol->version = LAST_COL_VERSION; pLastCol->rowKey.ts = pLastColV1->ts; pLastCol->rowKey.numOfPKs = 0; pLastCol->dirty = pLastColV1->dirty; @@ -408,7 +409,7 @@ static SLastCol *tsdbCacheDeserialize(char const *value) { if (!hasVersion) { return tsdbCacheDeserializeV1(value); } - return tsdbCacheDeserializeV2(value + sizeof(int64_t)); + return tsdbCacheDeserializeV2(value); } static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) { @@ -423,7 +424,7 @@ static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) { static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { SColVal *pColVal = &pLastCol->colVal; - size_t length = sizeof(int64_t) + sizeof(*pLastCol); + size_t length = sizeof(*pLastCol); for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) { length += pLastCol->rowKey.pks[i].nData; @@ -435,14 +436,11 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { // set version *value = taosMemoryMalloc(length); - char *currentPos = *value; - *(int64_t *)currentPos = LAST_COL_VERSION; - currentPos += sizeof(int64_t); // copy last col - SLastCol* pToLastCol = (SLastCol *)currentPos; + SLastCol* pToLastCol = (SLastCol *)(*value); *pToLastCol = *pLastCol; - currentPos += sizeof(*pLastCol); + char *currentPos = *value + sizeof(*pLastCol); // copy var data pks for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { @@ -535,18 +533,22 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { return code; } -static void reallocVarData(SColVal *pColVal) { - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - uint8_t *pVal = pColVal->value.pData; - if (pColVal->value.nData > 0) { - pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); +static void reallocVarDataVal(SValue *pValue) { + if (IS_VAR_DATA_TYPE(pValue->type)) { + uint8_t *pVal = pValue->pData; + if (pValue->nData > 0) { + pValue->pData = taosMemoryMalloc(pValue->nData); + memcpy(pValue->pData, pVal, pValue->nData); } else { - pColVal->value.pData = NULL; + pValue->pData = NULL; } } } +static void reallocVarData(SColVal *pColVal) { + reallocVarDataVal(&pColVal->value); +} + static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) { SLastCol *pLastCol = (SLastCol *)value; @@ -569,16 +571,26 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i SRowKey noneRowKey = {0}; noneRowKey.ts = TSKEY_MIN; noneRowKey.numOfPKs = 0; - SLastCol noneCol = {.rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; + SLastCol noneCol = { + .version = LAST_COL_VERSION, .rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; SLastCol *pLastCol = &noneCol; SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; - reallocVarData(&pLastCol->colVal); size_t charge = sizeof(*pLastCol); + + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + SValue *pValue = &pLastCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pValue->type)) { + reallocVarDataVal(pValue); + charge += pValue->nData; + } + } + if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { + reallocVarData(&pLastCol->colVal); charge += pLastCol->colVal.value.nData; } @@ -923,6 +935,7 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal int nData = 0; // update rowkey + pLastCol->version = LAST_COL_VERSION; pLastCol->rowKey.ts = pRowKey->ts; pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs; for (int8_t i = 0; i < pRowKey->numOfPKs; i++) { @@ -1015,7 +1028,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); } @@ -1032,7 +1044,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); } @@ -1096,9 +1107,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; - reallocVarData(&pLastCol->colVal); size_t charge = sizeof(*pLastCol); + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + SValue *pValue = &pLastCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pValue->type)) { + reallocVarDataVal(pValue); + charge += pValue->nData; + } + } + if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { + reallocVarData(&pLastCol->colVal); charge += pLastCol->colVal.value.nData; } @@ -1128,9 +1147,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; - reallocVarData(&pLastCol->colVal); size_t charge = sizeof(*pLastCol); + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + SValue *pValue = &pLastCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pValue->type)) { + reallocVarDataVal(pValue); + charge += pValue->nData; + } + } + if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { + reallocVarData(&pLastCol->colVal); charge += pLastCol->colVal.value.nData; } @@ -1428,7 +1455,8 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } // still null, then make up a none col value - SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, + SLastCol noneCol = {.version = LAST_COL_VERSION, + .rowKey.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; if (!pLastCol) { pLastCol = &noneCol; @@ -1446,9 +1474,16 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; - reallocVarData(&pLastCol->colVal); size_t charge = sizeof(*pLastCol); + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + SValue *pValue = &pLastCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pValue->type)) { + reallocVarDataVal(pValue); + charge += pValue->nData; + } + } if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { + reallocVarData(&pLastCol->colVal); charge += pLastCol->colVal.value.nData; } @@ -1530,9 +1565,16 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; - reallocVarData(&pLastCol->colVal); size_t charge = sizeof(*pLastCol); + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + SValue *pValue = &pLastCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pValue->type)) { + reallocVarDataVal(pValue); + charge += pValue->nData; + } + } if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { + reallocVarData(&pLastCol->colVal); charge += pLastCol->colVal.value.nData; } @@ -1543,6 +1585,9 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA } SLastCol lastCol = *pLastCol; + for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) { + reallocVarDataVal(&lastCol.rowKey.pks[i]); + } reallocVarData(&lastCol.colVal); taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArrayRemove(remainCols, j); @@ -1591,12 +1636,17 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol lastCol = *pLastCol; + for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) { + reallocVarDataVal(&lastCol.rowKey.pks[i]); + } reallocVarData(&lastCol.colVal); taosArrayPush(pLastArray, &lastCol); taosLRUCacheRelease(pCache, h, false); } else { - SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; + SLastCol noneCol = {.version = LAST_COL_VERSION, + .rowKey.ts = TSKEY_MIN, + .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; taosArrayPush(pLastArray, &noneCol); @@ -1616,6 +1666,9 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol lastCol = *pLastCol; + for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) { + reallocVarDataVal(&lastCol.rowKey.pks[i]); + } reallocVarData(&lastCol.colVal); taosArraySet(pLastArray, idxKey->idx, &lastCol); @@ -1702,7 +1755,6 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[i], klen); } - taosMemoryFreeClear(pLastCol); pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); @@ -3232,7 +3284,9 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, for (int32_t i = 0; i < nCols; ++i) { int16_t slotId = slotIds[i]; - SLastCol col = {.rowKey.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; + SLastCol col = {.version = LAST_COL_VERSION, + .rowKey.ts = 0, + .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; taosArrayPush(pColArray, &col); } *ppColArray = pColArray; @@ -3337,12 +3391,12 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC STColumn *pTColumn = &pTSchema->columns[0]; *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); - taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}); + taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}; + *pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); @@ -3392,7 +3446,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { - SLastCol lastCol = {.rowKey.ts = rowTs, .colVal = *pColVal}; + SLastCol lastCol = {.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) { SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); taosMemoryFree(pLastCol->colVal.value.pData); @@ -3516,12 +3570,12 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, STColumn *pTColumn = &pTSchema->columns[0]; *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); - taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}); + taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}; + *pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);