fix: (last) handle memory leaks in exception branch
This commit is contained in:
parent
995bb99a3e
commit
f8ffc4b08a
|
@ -533,14 +533,15 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
|
||||||
|
|
||||||
static int32_t reallocVarDataVal(SValue *pValue) {
|
static int32_t reallocVarDataVal(SValue *pValue) {
|
||||||
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
||||||
if (pValue->nData > 0) {
|
uint8_t *pVal = pValue->pData;
|
||||||
uint8_t *p = taosMemoryMalloc(pValue->nData);
|
uint32_t nData = pValue->nData;
|
||||||
|
if (nData > 0) {
|
||||||
|
uint8_t *p = taosMemoryMalloc(nData);
|
||||||
if (!p) {
|
if (!p) {
|
||||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
uint8_t *pVal = pValue->pData;
|
|
||||||
pValue->pData = p;
|
pValue->pData = p;
|
||||||
memcpy(pValue->pData, pVal, pValue->nData);
|
(void)memcpy(pValue->pData, pVal, nData);
|
||||||
} else {
|
} else {
|
||||||
pValue->pData = NULL;
|
pValue->pData = NULL;
|
||||||
}
|
}
|
||||||
|
@ -551,6 +552,54 @@ static int32_t reallocVarDataVal(SValue *pValue) {
|
||||||
|
|
||||||
static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); }
|
static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); }
|
||||||
|
|
||||||
|
// realloc pk data and col data.
|
||||||
|
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t* pCharge) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS, lino = 0;
|
||||||
|
size_t charge = sizeof(SLastCol);
|
||||||
|
|
||||||
|
int8_t i = 0;
|
||||||
|
for (; i < pCol->rowKey.numOfPKs; i++) {
|
||||||
|
SValue *pValue = &pCol->rowKey.pks[i];
|
||||||
|
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
||||||
|
TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
|
||||||
|
charge += pValue->nData;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
|
||||||
|
TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
|
||||||
|
charge += pCol->colVal.value.nData;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCharge) {
|
||||||
|
*pCharge = charge;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
for (int8_t j = 0; j < i; j++) {
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
|
||||||
|
taosMemoryFree(pCol->rowKey.pks[j].pData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
void tsdbCacheFreeSLastColItem(void* pItem) {
|
||||||
|
SLastCol* pCol = (SLastCol*)pItem;
|
||||||
|
for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
|
||||||
|
taosMemoryFree(pCol->rowKey.pks[i].pData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
|
||||||
|
taosMemoryFree(pCol->colVal.value.pData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
|
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
|
||||||
SLastCol *pLastCol = (SLastCol *)value;
|
SLastCol *pLastCol = (SLastCol *)value;
|
||||||
|
|
||||||
|
@ -573,36 +622,22 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
|
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
|
||||||
int32_t code = 0;
|
int32_t code = 0, lino = 0;
|
||||||
|
|
||||||
SLRUCache *pCache = pTsdb->lruCache;
|
SLRUCache *pCache = pTsdb->lruCache;
|
||||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||||
SRowKey emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0};
|
SRowKey emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0};
|
||||||
SLastCol emptyCol = {
|
SLastCol emptyCol = {
|
||||||
.rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
.rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||||
SLastCol *pLastCol = &emptyCol;
|
|
||||||
|
|
||||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||||
if (!pTmpLastCol) {
|
if (!pLastCol) {
|
||||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
*pTmpLastCol = *pLastCol;
|
|
||||||
pLastCol = pTmpLastCol;
|
|
||||||
|
|
||||||
size_t charge = sizeof(*pLastCol);
|
size_t charge = 0;
|
||||||
|
*pLastCol = emptyCol;
|
||||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLastCol, &charge));
|
||||||
SValue *pValue = &pLastCol->rowKey.pks[i];
|
|
||||||
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
|
||||||
TAOS_CHECK_RETURN(reallocVarDataVal(pValue));
|
|
||||||
charge += pValue->nData;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
|
||||||
TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal));
|
|
||||||
charge += pLastCol->colVal.value.nData;
|
|
||||||
}
|
|
||||||
|
|
||||||
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
|
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
|
||||||
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
||||||
|
@ -611,6 +646,11 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
|
||||||
// code = -1;
|
// code = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
taosMemoryFree(pLastCol);
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1062,21 +1102,9 @@ static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLa
|
||||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t charge = 0;
|
||||||
*pLRULastCol = *pLastCol;
|
*pLRULastCol = *pLastCol;
|
||||||
|
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
|
||||||
size_t charge = sizeof(*pLRULastCol);
|
|
||||||
for (int8_t i = 0; i < pLRULastCol->rowKey.numOfPKs; i++) {
|
|
||||||
SValue *pValue = &pLRULastCol->rowKey.pks[i];
|
|
||||||
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
|
||||||
TAOS_CHECK_GOTO(reallocVarDataVal(pValue), &lino, _exit);
|
|
||||||
charge += pValue->nData;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pLRULastCol->colVal.value.type)) {
|
|
||||||
TAOS_CHECK_GOTO(reallocVarData(&pLRULastCol->colVal), &lino, _exit);
|
|
||||||
charge += pLRULastCol->colVal.value.nData;
|
|
||||||
}
|
|
||||||
|
|
||||||
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
|
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
|
||||||
NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
||||||
|
@ -1507,7 +1535,6 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
||||||
.cacheStatus = TSDB_LAST_CACHE_VALID};
|
.cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||||
if (!pLastCol) {
|
if (!pLastCol) {
|
||||||
pLastCol = &noneCol;
|
pLastCol = &noneCol;
|
||||||
TAOS_CHECK_EXIT(reallocVarData(&pLastCol->colVal));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArraySet(pLastArray, idxKey->idx, pLastCol);
|
taosArraySet(pLastArray, idxKey->idx, pLastCol);
|
||||||
|
@ -1524,20 +1551,14 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
||||||
if (!pTmpLastCol) {
|
if (!pTmpLastCol) {
|
||||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t charge = 0;
|
||||||
*pTmpLastCol = *pLastCol;
|
*pTmpLastCol = *pLastCol;
|
||||||
pLastCol = pTmpLastCol;
|
pLastCol = pTmpLastCol;
|
||||||
|
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
|
||||||
size_t charge = sizeof(*pLastCol);
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
taosMemoryFree(pTmpLastCol);
|
||||||
SValue *pValue = &pLastCol->rowKey.pks[i];
|
TAOS_CHECK_EXIT(code);
|
||||||
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
|
||||||
TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
|
|
||||||
charge += pValue->nData;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
|
||||||
TAOS_CHECK_EXIT(reallocVarData(&pLastCol->colVal));
|
|
||||||
charge += pLastCol->colVal.value.nData;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
||||||
|
@ -1585,7 +1606,7 @@ _exit:
|
||||||
|
|
||||||
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
|
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
|
||||||
SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
|
SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
|
||||||
int32_t code = 0;
|
int32_t code = 0, lino = 0;
|
||||||
int num_keys = TARRAY_SIZE(remainCols);
|
int num_keys = TARRAY_SIZE(remainCols);
|
||||||
char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
|
char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
|
||||||
size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
|
size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
|
||||||
|
@ -1631,20 +1652,15 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t charge = 0;
|
||||||
*pTmpLastCol = *pLastCol;
|
*pTmpLastCol = *pLastCol;
|
||||||
pLastCol = pTmpLastCol;
|
pLastCol = pTmpLastCol;
|
||||||
|
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
|
||||||
size_t charge = sizeof(*pLastCol);
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
taosMemoryFree(pTmpLastCol);
|
||||||
SValue *pValue = &pLastCol->rowKey.pks[i];
|
taosMemoryFreeClear(PToFree);
|
||||||
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
goto _exit;
|
||||||
TAOS_CHECK_RETURN(reallocVarDataVal(pValue));
|
|
||||||
charge += pValue->nData;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
|
||||||
TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal));
|
|
||||||
charge += pLastCol->colVal.value.nData;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
|
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
|
||||||
|
@ -1654,10 +1670,8 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
||||||
}
|
}
|
||||||
|
|
||||||
SLastCol lastCol = *pLastCol;
|
SLastCol lastCol = *pLastCol;
|
||||||
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
|
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
|
||||||
TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[i]));
|
|
||||||
}
|
|
||||||
TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal));
|
|
||||||
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
||||||
taosArrayRemove(remainCols, j);
|
taosArrayRemove(remainCols, j);
|
||||||
taosArrayRemove(ignoreFromRocks, j);
|
taosArrayRemove(ignoreFromRocks, j);
|
||||||
|
@ -1715,10 +1729,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
||||||
SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
|
SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
|
||||||
if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
||||||
SLastCol lastCol = *pLastCol;
|
SLastCol lastCol = *pLastCol;
|
||||||
for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) {
|
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), NULL, _exit);
|
||||||
TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[j]));
|
|
||||||
}
|
|
||||||
TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal));
|
|
||||||
if (taosArrayPush(pLastArray, &lastCol) == NULL) {
|
if (taosArrayPush(pLastArray, &lastCol) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -1769,18 +1781,12 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
||||||
SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
|
SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
|
||||||
if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
||||||
SLastCol lastCol = *pLastCol;
|
SLastCol lastCol = *pLastCol;
|
||||||
for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) {
|
code = tsdbCacheReallocSLastCol(&lastCol, NULL);
|
||||||
code = reallocVarDataVal(&lastCol.rowKey.pks[j]);
|
|
||||||
if (code) {
|
|
||||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
|
||||||
TAOS_RETURN(code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
code = reallocVarData(&lastCol.colVal);
|
|
||||||
if (code) {
|
if (code) {
|
||||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
||||||
|
|
||||||
taosArrayRemove(remainCols, i);
|
taosArrayRemove(remainCols, i);
|
||||||
|
@ -3102,24 +3108,18 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
||||||
}
|
}
|
||||||
if (slotIds[iCol] == 0) {
|
if (slotIds[iCol] == 0) {
|
||||||
STColumn *pTColumn = &pTSchema->columns[0];
|
STColumn *pTColumn = &pTSchema->columns[0];
|
||||||
|
|
||||||
SRowKey key = rowKey.key;
|
|
||||||
for (int8_t i = 0; i < rowKey.key.numOfPKs; ++i) {
|
|
||||||
TAOS_CHECK_GOTO(reallocVarDataVal(&key.pks[i]), &lino, _err);
|
|
||||||
}
|
|
||||||
|
|
||||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
|
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
|
||||||
taosArraySet(pColArray, 0,
|
|
||||||
&(SLastCol){.rowKey = key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID});
|
SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||||
|
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
|
||||||
|
|
||||||
|
taosArraySet(pColArray, 0, &colTmp);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||||
|
|
||||||
*pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
*pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||||
for (int8_t i = 0; i < rowKey.key.numOfPKs; ++i) {
|
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
|
||||||
TAOS_CHECK_GOTO(reallocVarDataVal(&pCol->rowKey.pks[i]), &lino, _err);
|
|
||||||
}
|
|
||||||
TAOS_CHECK_GOTO(reallocVarData(&pCol->colVal), &lino, _err);
|
|
||||||
|
|
||||||
if (!COL_VAL_IS_VALUE(pColVal)) {
|
if (!COL_VAL_IS_VALUE(pColVal)) {
|
||||||
if (!setNoneCol) {
|
if (!setNoneCol) {
|
||||||
|
@ -3158,16 +3158,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
||||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||||
SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||||
for (int8_t i = 0; i < rowKey.key.numOfPKs; ++i) {
|
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);
|
||||||
TAOS_CHECK_GOTO(reallocVarDataVal(&lastCol.rowKey.pks[i]), &lino, _err);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) {
|
|
||||||
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
|
|
||||||
taosMemoryFree(pLastCol->colVal.value.pData);
|
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(reallocVarData(&lastCol.colVal), &lino, _err);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArraySet(pColArray, iCol, &lastCol);
|
taosArraySet(pColArray, iCol, &lastCol);
|
||||||
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
|
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
|
||||||
|
@ -3200,7 +3191,7 @@ _err:
|
||||||
nextRowIterClose(&iter);
|
nextRowIterClose(&iter);
|
||||||
// taosMemoryFreeClear(pTSchema);
|
// taosMemoryFreeClear(pTSchema);
|
||||||
*ppLastArray = NULL;
|
*ppLastArray = NULL;
|
||||||
taosArrayDestroy(pColArray);
|
taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
|
||||||
taosArrayDestroy(aColArray);
|
taosArrayDestroy(aColArray);
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -3276,24 +3267,18 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
||||||
}
|
}
|
||||||
if (slotIds[iCol] == 0) {
|
if (slotIds[iCol] == 0) {
|
||||||
STColumn *pTColumn = &pTSchema->columns[0];
|
STColumn *pTColumn = &pTSchema->columns[0];
|
||||||
|
|
||||||
SRowKey key = rowKey.key;
|
|
||||||
for (int8_t i = 0; i < rowKey.key.numOfPKs; ++i) {
|
|
||||||
TAOS_CHECK_GOTO(reallocVarDataVal(&key.pks[i]), &lino, _err);
|
|
||||||
}
|
|
||||||
|
|
||||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
|
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
|
||||||
taosArraySet(pColArray, 0,
|
|
||||||
&(SLastCol){.rowKey = key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID});
|
SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||||
|
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
|
||||||
|
|
||||||
|
taosArraySet(pColArray, 0, &colTmp);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||||
|
|
||||||
*pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
*pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||||
for (int8_t i = 0; i < rowKey.key.numOfPKs; ++i) {
|
TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
|
||||||
TAOS_CHECK_GOTO(reallocVarDataVal(&pCol->rowKey.pks[i]), &lino, _err);
|
|
||||||
}
|
|
||||||
TAOS_CHECK_GOTO(reallocVarData(&pCol->colVal), &lino, _err);
|
|
||||||
|
|
||||||
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
|
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
|
||||||
if (aColIndex >= 0) {
|
if (aColIndex >= 0) {
|
||||||
|
@ -3323,7 +3308,7 @@ _err:
|
||||||
nextRowIterClose(&iter);
|
nextRowIterClose(&iter);
|
||||||
|
|
||||||
*ppLastArray = NULL;
|
*ppLastArray = NULL;
|
||||||
taosArrayDestroy(pColArray);
|
taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
|
||||||
taosArrayDestroy(aColArray);
|
taosArrayDestroy(aColArray);
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -404,32 +404,6 @@ void tsdbCacherowsReaderClose(void* pReader) {
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void freeItemOfRow(void* pItem) {
|
|
||||||
SLastCol* pCol = (SLastCol*)pItem;
|
|
||||||
for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
|
|
||||||
taosMemoryFree(pCol->rowKey.pks[i].pData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
|
|
||||||
taosMemoryFree(pCol->colVal.value.pData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void freeItemWithPk(void* pItem) {
|
|
||||||
SLastCol* pCol = (SLastCol*)pItem;
|
|
||||||
for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
|
|
||||||
taosMemoryFree(pCol->rowKey.pks[i].pData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
|
|
||||||
taosMemoryFree(pCol->colVal.value.pData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
|
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCacheRowsReader* pReader = pQHandle;
|
SCacheRowsReader* pReader = pQHandle;
|
||||||
|
@ -565,7 +539,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
|
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
|
||||||
taosArrayClearEx(pRow, freeItemOfRow);
|
taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -648,7 +622,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayClearEx(pRow, freeItemOfRow);
|
taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasRes) {
|
if (hasRes) {
|
||||||
|
@ -658,7 +632,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroyEx(pLastCols, freeItemWithPk);
|
taosArrayDestroyEx(pLastCols, tsdbCacheFreeSLastColItem);
|
||||||
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
||||||
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
||||||
tb_uid_t uid = pTableList[i].uid;
|
tb_uid_t uid = pTableList[i].uid;
|
||||||
|
@ -672,7 +646,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
|
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
|
||||||
taosArrayClearEx(pRow, freeItemOfRow);
|
taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -681,7 +655,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayClearEx(pRow, freeItemOfRow);
|
taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
|
||||||
|
|
||||||
void* px = taosArrayPush(pTableUidList, &uid);
|
void* px = taosArrayPush(pTableUidList, &uid);
|
||||||
if (px == NULL) {
|
if (px == NULL) {
|
||||||
|
|
|
@ -398,6 +398,7 @@ typedef struct SCacheRowsReader {
|
||||||
} SCacheRowsReader;
|
} SCacheRowsReader;
|
||||||
|
|
||||||
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);
|
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);
|
||||||
|
void tsdbCacheFreeSLastColItem(void* pItem);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue