diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index b967130c43..06fa4a05ad 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -533,14 +533,15 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { static int32_t reallocVarDataVal(SValue *pValue) { if (IS_VAR_DATA_TYPE(pValue->type)) { - if (pValue->nData > 0) { - uint8_t *p = taosMemoryMalloc(pValue->nData); + uint8_t *pVal = pValue->pData; + uint32_t nData = pValue->nData; + if (nData > 0) { + uint8_t *p = taosMemoryMalloc(nData); if (!p) { TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } - uint8_t *pVal = pValue->pData; pValue->pData = p; - memcpy(pValue->pData, pVal, pValue->nData); + (void)memcpy(pValue->pData, pVal, nData); } else { pValue->pData = NULL; } @@ -551,6 +552,54 @@ static int32_t reallocVarDataVal(SValue *pValue) { static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); } +// realloc pk data and col data. +static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t* pCharge) { + int32_t code = TSDB_CODE_SUCCESS, lino = 0; + size_t charge = sizeof(SLastCol); + + int8_t i = 0; + for (; i < pCol->rowKey.numOfPKs; i++) { + SValue *pValue = &pCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pValue->type)) { + TAOS_CHECK_EXIT(reallocVarDataVal(pValue)); + charge += pValue->nData; + } + } + + if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) { + TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal)); + charge += pCol->colVal.value.nData; + } + + if (pCharge) { + *pCharge = charge; + } + +_exit: + if (TSDB_CODE_SUCCESS != code) { + for (int8_t j = 0; j < i; j++) { + if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) { + taosMemoryFree(pCol->rowKey.pks[j].pData); + } + } + } + + TAOS_RETURN(code); +} + +void tsdbCacheFreeSLastColItem(void* pItem) { + SLastCol* pCol = (SLastCol*)pItem; + for (int i = 0; i < pCol->rowKey.numOfPKs; i++) { + if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) { + taosMemoryFree(pCol->rowKey.pks[i].pData); + } + } + + if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) { + taosMemoryFree(pCol->colVal.value.pData); + } +} + static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) { SLastCol *pLastCol = (SLastCol *)value; @@ -573,36 +622,22 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud } static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) { - int32_t code = 0; + int32_t code = 0, lino = 0; SLRUCache *pCache = pTsdb->lruCache; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; SRowKey emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0}; SLastCol emptyCol = { .rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID}; - SLastCol *pLastCol = &emptyCol; - SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); - if (!pTmpLastCol) { + SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + if (!pLastCol) { TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } - *pTmpLastCol = *pLastCol; - pLastCol = pTmpLastCol; - size_t charge = sizeof(*pLastCol); - - for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - SValue *pValue = &pLastCol->rowKey.pks[i]; - if (IS_VAR_DATA_TYPE(pValue->type)) { - TAOS_CHECK_RETURN(reallocVarDataVal(pValue)); - charge += pValue->nData; - } - } - - if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal)); - charge += pLastCol->colVal.value.nData; - } + size_t charge = 0; + *pLastCol = emptyCol; + TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLastCol, &charge)); SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid}; LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, @@ -611,6 +646,11 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i // code = -1; } +_exit: + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFree(pLastCol); + } + TAOS_RETURN(code); } @@ -1062,21 +1102,9 @@ static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLa TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } + size_t charge = 0; *pLRULastCol = *pLastCol; - - size_t charge = sizeof(*pLRULastCol); - for (int8_t i = 0; i < pLRULastCol->rowKey.numOfPKs; i++) { - SValue *pValue = &pLRULastCol->rowKey.pks[i]; - if (IS_VAR_DATA_TYPE(pValue->type)) { - TAOS_CHECK_GOTO(reallocVarDataVal(pValue), &lino, _exit); - charge += pValue->nData; - } - } - - if (IS_VAR_DATA_TYPE(pLRULastCol->colVal.value.type)) { - TAOS_CHECK_GOTO(reallocVarData(&pLRULastCol->colVal), &lino, _exit); - charge += pLRULastCol->colVal.value.nData; - } + TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge)); LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); @@ -1507,7 +1535,6 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr .cacheStatus = TSDB_LAST_CACHE_VALID}; if (!pLastCol) { pLastCol = &noneCol; - TAOS_CHECK_EXIT(reallocVarData(&pLastCol->colVal)); } taosArraySet(pLastArray, idxKey->idx, pLastCol); @@ -1524,20 +1551,14 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr if (!pTmpLastCol) { TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } + + size_t charge = 0; *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; - - size_t charge = sizeof(*pLastCol); - for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - SValue *pValue = &pLastCol->rowKey.pks[i]; - if (IS_VAR_DATA_TYPE(pValue->type)) { - TAOS_CHECK_EXIT(reallocVarDataVal(pValue)); - charge += pValue->nData; - } - } - if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - TAOS_CHECK_EXIT(reallocVarData(&pLastCol->colVal)); - charge += pLastCol->colVal.value.nData; + code = tsdbCacheReallocSLastCol(pLastCol, &charge); + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFree(pTmpLastCol); + TAOS_CHECK_EXIT(code); } LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, @@ -1585,7 +1606,7 @@ _exit: static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) { - int32_t code = 0; + int32_t code = 0, lino = 0; int num_keys = TARRAY_SIZE(remainCols); char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *)); size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t)); @@ -1631,20 +1652,15 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } + + size_t charge = 0; *pTmpLastCol = *pLastCol; pLastCol = pTmpLastCol; - - size_t charge = sizeof(*pLastCol); - for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - SValue *pValue = &pLastCol->rowKey.pks[i]; - if (IS_VAR_DATA_TYPE(pValue->type)) { - TAOS_CHECK_RETURN(reallocVarDataVal(pValue)); - charge += pValue->nData; - } - } - if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal)); - charge += pLastCol->colVal.value.nData; + code = tsdbCacheReallocSLastCol(pLastCol, &charge); + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFree(pTmpLastCol); + taosMemoryFreeClear(PToFree); + goto _exit; } LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, @@ -1654,10 +1670,8 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA } SLastCol lastCol = *pLastCol; - for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) { - TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[i])); - } - TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal)); + TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL)); + taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArrayRemove(remainCols, j); taosArrayRemove(ignoreFromRocks, j); @@ -1715,10 +1729,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL; if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { SLastCol lastCol = *pLastCol; - for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) { - TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[j])); - } - TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal)); + TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), NULL, _exit); + if (taosArrayPush(pLastArray, &lastCol) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -1769,18 +1781,12 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL; if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { SLastCol lastCol = *pLastCol; - for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) { - code = reallocVarDataVal(&lastCol.rowKey.pks[j]); - if (code) { - (void)taosThreadMutexUnlock(&pTsdb->lruMutex); - TAOS_RETURN(code); - } - } - code = reallocVarData(&lastCol.colVal); + code = tsdbCacheReallocSLastCol(&lastCol, NULL); if (code) { (void)taosThreadMutexUnlock(&pTsdb->lruMutex); TAOS_RETURN(code); } + taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArrayRemove(remainCols, i); @@ -3102,24 +3108,18 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC } if (slotIds[iCol] == 0) { STColumn *pTColumn = &pTSchema->columns[0]; - - SRowKey key = rowKey.key; - for (int8_t i = 0; i < rowKey.key.numOfPKs; ++i) { - TAOS_CHECK_GOTO(reallocVarDataVal(&key.pks[i]), &lino, _err); - } - *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts})); - taosArraySet(pColArray, 0, - &(SLastCol){.rowKey = key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}); + + SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}; + TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err); + + taosArraySet(pColArray, 0, &colTmp); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}; - for (int8_t i = 0; i < rowKey.key.numOfPKs; ++i) { - TAOS_CHECK_GOTO(reallocVarDataVal(&pCol->rowKey.pks[i]), &lino, _err); - } - TAOS_CHECK_GOTO(reallocVarData(&pCol->colVal), &lino, _err); + TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err); if (!COL_VAL_IS_VALUE(pColVal)) { if (!setNoneCol) { @@ -3158,16 +3158,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); if (COL_VAL_IS_VALUE(pColVal)) { SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}; - for (int8_t i = 0; i < rowKey.key.numOfPKs; ++i) { - TAOS_CHECK_GOTO(reallocVarDataVal(&lastCol.rowKey.pks[i]), &lino, _err); - } - - if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) { - SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); - taosMemoryFree(pLastCol->colVal.value.pData); - - TAOS_CHECK_GOTO(reallocVarData(&lastCol.colVal), &lino, _err); - } + TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err); taosArraySet(pColArray, iCol, &lastCol); int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ); @@ -3200,7 +3191,7 @@ _err: nextRowIterClose(&iter); // taosMemoryFreeClear(pTSchema); *ppLastArray = NULL; - taosArrayDestroy(pColArray); + taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem); taosArrayDestroy(aColArray); if (code) { @@ -3276,24 +3267,18 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, } if (slotIds[iCol] == 0) { STColumn *pTColumn = &pTSchema->columns[0]; - - SRowKey key = rowKey.key; - for (int8_t i = 0; i < rowKey.key.numOfPKs; ++i) { - TAOS_CHECK_GOTO(reallocVarDataVal(&key.pks[i]), &lino, _err); - } - *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts})); - taosArraySet(pColArray, 0, - &(SLastCol){.rowKey = key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}); + + SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}; + TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err); + + taosArraySet(pColArray, 0, &colTmp); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID}; - for (int8_t i = 0; i < rowKey.key.numOfPKs; ++i) { - TAOS_CHECK_GOTO(reallocVarDataVal(&pCol->rowKey.pks[i]), &lino, _err); - } - TAOS_CHECK_GOTO(reallocVarData(&pCol->colVal), &lino, _err); + TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err); int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ); if (aColIndex >= 0) { @@ -3323,7 +3308,7 @@ _err: nextRowIterClose(&iter); *ppLastArray = NULL; - taosArrayDestroy(pColArray); + taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem); taosArrayDestroy(aColArray); if (code) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 5ad846bd7b..842b2cfa90 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -404,32 +404,6 @@ void tsdbCacherowsReaderClose(void* pReader) { taosMemoryFree(pReader); } -static void freeItemOfRow(void* pItem) { - SLastCol* pCol = (SLastCol*)pItem; - for (int i = 0; i < pCol->rowKey.numOfPKs; i++) { - if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) { - taosMemoryFree(pCol->rowKey.pks[i].pData); - } - } - - if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) { - taosMemoryFree(pCol->colVal.value.pData); - } -} - -static void freeItemWithPk(void* pItem) { - SLastCol* pCol = (SLastCol*)pItem; - for (int i = 0; i < pCol->rowKey.numOfPKs; i++) { - if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) { - taosMemoryFree(pCol->rowKey.pks[i].pData); - } - } - - if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) { - taosMemoryFree(pCol->colVal.value.pData); - } -} - static int32_t tsdbCacheQueryReseek(void* pQHandle) { int32_t code = 0; SCacheRowsReader* pReader = pQHandle; @@ -565,7 +539,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) { - taosArrayClearEx(pRow, freeItemOfRow); + taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem); continue; } @@ -648,7 +622,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } - taosArrayClearEx(pRow, freeItemOfRow); + taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem); } if (hasRes) { @@ -658,7 +632,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } - taosArrayDestroyEx(pLastCols, freeItemWithPk); + taosArrayDestroyEx(pLastCols, tsdbCacheFreeSLastColItem); } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { tb_uid_t uid = pTableList[i].uid; @@ -672,7 +646,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) { - taosArrayClearEx(pRow, freeItemOfRow); + taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem); continue; } @@ -681,7 +655,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 goto _end; } - taosArrayClearEx(pRow, freeItemOfRow); + taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem); void* px = taosArrayPush(pTableUidList, &uid); if (px == NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 865e8e2d41..ed895b7d27 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -398,6 +398,7 @@ typedef struct SCacheRowsReader { } SCacheRowsReader; int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype); +void tsdbCacheFreeSLastColItem(void* pItem); #ifdef __cplusplus }