cache/read: fix pRow, pLastCols memory issues

This commit is contained in:
Minglei Jin 2023-04-26 08:17:08 +08:00
parent c8cbabe419
commit 5758aa4d80
1 changed files with 12 additions and 14 deletions

View File

@ -39,9 +39,9 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
if (!p->isNull) {
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
varDataSetLen(p->buf, pColVal->colVal.value.nData);
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size
taosMemoryFree(pColVal->colVal.value.pData);
} else {
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
p->bytes = pReader->pSchema->columns[slotId].bytes;
@ -72,10 +72,9 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
colDataSetNULL(pColInfoData, numOfRows);
} else {
varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
// tsdbError("buf:%p len:%d index:%d", pReader->transferBuf[slotId], pVal->value.nData, slotId);
memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
taosMemoryFree(pVal->value.pData);
}
} else {
colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
@ -83,7 +82,6 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
}
pBlock->info.rows += allNullRow ? 0 : 1;
// pBlock->info.rows += 1;
} else {
tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
return TSDB_CODE_INVALID_PARA;
@ -162,7 +160,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
// tsdbError("buf:%p len:%d index:%d", p->transferBuf[i], p->pSchema->columns[i].bytes, i);
if (p->transferBuf[i] == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
@ -269,8 +266,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
goto _end;
}
for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
struct STColumn* pCol = &pr->pSchema->columns[i];
for (int32_t i = 0; i < pr->numOfCols; ++i) {
int32_t slotId = slotIds[i];
struct STColumn* pCol = &pr->pSchema->columns[slotId];
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
if (IS_VAR_DATA_TYPE(pCol->type)) {
@ -298,12 +296,12 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, lstring);
if (TARRAY_SIZE(pRow) <= 0) {
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
continue;
}
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
continue;
}
@ -360,7 +358,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
}
if (hasRes) {
@ -372,17 +370,17 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, lstring);
if (TARRAY_SIZE(pRow) <= 0) {
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
continue;
}
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
continue;
}
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
taosArrayPush(pTableUidList, &pKeyInfo->uid);
@ -410,7 +408,7 @@ _end:
}
taosMemoryFree(pRes);
taosArrayDestroy(pRow);
taosArrayDestroyEx(pRow, freeItem);
taosArrayDestroyEx(pLastCols, freeItem);
return code;
}