Merge pull request #27404 from taosdata/fix/TD-31504-3.0

fix: (pk) memleak in tsdbRetrieveCacheRows
This commit is contained in:
Hongze Cheng 2024-08-23 13:31:31 +08:00 committed by GitHub
commit 5d1efa9478
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 106 additions and 150 deletions

View File

@ -534,13 +534,14 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
static int32_t reallocVarDataVal(SValue *pValue) {
if (IS_VAR_DATA_TYPE(pValue->type)) {
uint8_t *pVal = pValue->pData;
if (pValue->nData > 0) {
uint8_t *p = taosMemoryMalloc(pValue->nData);
uint32_t nData = pValue->nData;
if (nData > 0) {
uint8_t *p = taosMemoryMalloc(nData);
if (!p) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
pValue->pData = p;
memcpy(pValue->pData, pVal, pValue->nData);
(void)memcpy(pValue->pData, pVal, nData);
} else {
pValue->pData = NULL;
}
@ -551,6 +552,54 @@ static int32_t reallocVarDataVal(SValue *pValue) {
static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); }
// realloc pk data and col data.
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t* pCharge) {
int32_t code = TSDB_CODE_SUCCESS, lino = 0;
size_t charge = sizeof(SLastCol);
int8_t i = 0;
for (; i < pCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
charge += pCol->colVal.value.nData;
}
if (pCharge) {
*pCharge = charge;
}
_exit:
if (TSDB_CODE_SUCCESS != code) {
for (int8_t j = 0; j < i; j++) {
if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
taosMemoryFree(pCol->rowKey.pks[j].pData);
}
}
}
TAOS_RETURN(code);
}
void tsdbCacheFreeSLastColItem(void* pItem) {
SLastCol* pCol = (SLastCol*)pItem;
for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
taosMemoryFree(pCol->rowKey.pks[i].pData);
}
}
if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
taosMemoryFree(pCol->colVal.value.pData);
}
}
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
SLastCol *pLastCol = (SLastCol *)value;
@ -573,36 +622,22 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
}
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
int32_t code = 0;
int32_t code = 0, lino = 0;
SLRUCache *pCache = pTsdb->lruCache;
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
SRowKey emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0};
SLastCol emptyCol = {
.rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
SLastCol *pLastCol = &emptyCol;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
if (!pTmpLastCol) {
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
if (!pLastCol) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
TAOS_CHECK_RETURN(reallocVarDataVal(pValue));
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal));
charge += pLastCol->colVal.value.nData;
}
size_t charge = 0;
*pLastCol = emptyCol;
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLastCol, &charge));
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
@ -611,6 +646,11 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
// code = -1;
}
_exit:
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(pLastCol);
}
TAOS_RETURN(code);
}
@ -1062,21 +1102,9 @@ static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLa
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
size_t charge = 0;
*pLRULastCol = *pLastCol;
size_t charge = sizeof(*pLRULastCol);
for (int8_t i = 0; i < pLRULastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLRULastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
TAOS_CHECK_GOTO(reallocVarDataVal(pValue), &lino, _exit);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLRULastCol->colVal.value.type)) {
TAOS_CHECK_GOTO(reallocVarData(&pLRULastCol->colVal), &lino, _exit);
charge += pLRULastCol->colVal.value.nData;
}
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
@ -1507,7 +1535,6 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
.cacheStatus = TSDB_LAST_CACHE_VALID};
if (!pLastCol) {
pLastCol = &noneCol;
TAOS_CHECK_EXIT(reallocVarData(&pLastCol->colVal));
}
taosArraySet(pLastArray, idxKey->idx, pLastCol);
@ -1524,20 +1551,14 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
if (!pTmpLastCol) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
size_t charge = 0;
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
TAOS_CHECK_EXIT(reallocVarData(&pLastCol->colVal));
charge += pLastCol->colVal.value.nData;
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(pTmpLastCol);
TAOS_CHECK_EXIT(code);
}
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
@ -1585,7 +1606,7 @@ _exit:
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
int32_t code = 0;
int32_t code = 0, lino = 0;
int num_keys = TARRAY_SIZE(remainCols);
char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
@ -1631,20 +1652,15 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
size_t charge = 0;
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
TAOS_CHECK_RETURN(reallocVarDataVal(pValue));
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal));
charge += pLastCol->colVal.value.nData;
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(pTmpLastCol);
taosMemoryFreeClear(PToFree);
goto _exit;
}
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
@ -1654,10 +1670,8 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
}
SLastCol lastCol = *pLastCol;
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[i]));
}
TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal));
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
taosArraySet(pLastArray, idxKey->idx, &lastCol);
taosArrayRemove(remainCols, j);
taosArrayRemove(ignoreFromRocks, j);
@ -1715,10 +1729,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
SLastCol lastCol = *pLastCol;
for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) {
TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[j]));
}
TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal));
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), NULL, _exit);
if (taosArrayPush(pLastArray, &lastCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
@ -1769,18 +1781,12 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
SLastCol lastCol = *pLastCol;
for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) {
code = reallocVarDataVal(&lastCol.rowKey.pks[j]);
if (code) {
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
TAOS_RETURN(code);
}
}
code = reallocVarData(&lastCol.colVal);
code = tsdbCacheReallocSLastCol(&lastCol, NULL);
if (code) {
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
TAOS_RETURN(code);
}
taosArraySet(pLastArray, idxKey->idx, &lastCol);
taosArrayRemove(remainCols, i);
@ -3104,26 +3110,18 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
}
if (slotIds[iCol] == 0) {
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
taosArraySet(pColArray, 0,
&(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID});
SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
taosArraySet(pColArray, 0, &colTmp);
continue;
}
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
if (pCol->colVal.value.pData == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
}
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
} else {
pCol->colVal.value.pData = NULL;
}
}
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
if (!COL_VAL_IS_VALUE(pColVal)) {
if (!setNoneCol) {
@ -3162,20 +3160,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
if (COL_VAL_IS_VALUE(pColVal)) {
SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
if (pColVal->value.nData > 0) {
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
} else {
lastCol.colVal.value.pData = NULL;
}
}
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);
taosArraySet(pColArray, iCol, &lastCol);
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
@ -3208,7 +3193,7 @@ _err:
nextRowIterClose(&iter);
// taosMemoryFreeClear(pTSchema);
*ppLastArray = NULL;
taosArrayDestroy(pColArray);
taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
taosArrayDestroy(aColArray);
if (code) {
@ -3284,28 +3269,18 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
}
if (slotIds[iCol] == 0) {
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
taosArraySet(pColArray, 0,
&(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID});
SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
taosArraySet(pColArray, 0, &colTmp);
continue;
}
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
if (pCol->colVal.value.pData == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
}
if (pColVal->value.nData > 0) {
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
} else {
pCol->colVal.value.pData = NULL;
}
}
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
if (aColIndex >= 0) {
@ -3335,7 +3310,7 @@ _err:
nextRowIterClose(&iter);
*ppLastArray = NULL;
taosArrayDestroy(pColArray);
taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
taosArrayDestroy(aColArray);
if (code) {

View File

@ -404,26 +404,6 @@ void tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree(pReader);
}
static void freeItemOfRow(void* pItem) {
SLastCol* pCol = (SLastCol*)pItem;
if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
taosMemoryFree(pCol->colVal.value.pData);
}
}
static void freeItemWithPk(void* pItem) {
SLastCol* pCol = (SLastCol*)pItem;
for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
taosMemoryFree(pCol->rowKey.pks[i].pData);
}
}
if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
taosMemoryFree(pCol->colVal.value.pData);
}
}
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
int32_t code = 0;
SCacheRowsReader* pReader = pQHandle;
@ -559,7 +539,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
taosArrayClearEx(pRow, freeItemOfRow);
taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
continue;
}
@ -642,7 +622,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
}
taosArrayClearEx(pRow, freeItemOfRow);
taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
}
if (hasRes) {
@ -652,7 +632,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
}
taosArrayDestroyEx(pLastCols, freeItemWithPk);
taosArrayDestroyEx(pLastCols, tsdbCacheFreeSLastColItem);
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
tb_uid_t uid = pTableList[i].uid;
@ -666,7 +646,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
taosArrayClearEx(pRow, freeItemOfRow);
taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
continue;
}
@ -675,7 +655,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
goto _end;
}
taosArrayClearEx(pRow, freeItemOfRow);
taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
void* px = taosArrayPush(pTableUidList, &uid);
if (px == NULL) {

View File

@ -398,6 +398,7 @@ typedef struct SCacheRowsReader {
} SCacheRowsReader;
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);
void tsdbCacheFreeSLastColItem(void* pItem);
#ifdef __cplusplus
}