fix(tsdb): fix memory error.

This commit is contained in:
Haojun Liao 2024-04-10 11:18:08 +08:00
parent bd16bd12c3
commit 980382d433
3 changed files with 25 additions and 22 deletions

View File

@ -57,7 +57,6 @@ static void saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, c
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
const int32_t* dstSlotIds, void** pRes, const char* idStr) { const int32_t* dstSlotIds, void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
// bool allNullRow = true;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
uint64_t ts = TSKEY_MIN; uint64_t ts = TSKEY_MIN;
@ -108,11 +107,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
} }
} }
// pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to subtract it
p->hasResult = true; p->hasResult = true;
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE); varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false); colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false);
} }
for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) { for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
if (idx < funcTypeBlockArray->size) { if (idx < funcTypeBlockArray->size) {
@ -233,6 +233,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
if (IS_VAR_DATA_TYPE(pPkCol->type)) { if (IS_VAR_DATA_TYPE(pPkCol->type)) {
p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes); p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes);
} }
p->pkColumn = *pPkCol;
} }
if (numOfTables == 0) { if (numOfTables == 0) {
@ -366,15 +368,15 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
goto _end; goto _end;
} }
for (int32_t j = 0; j < pr->numOfCols; ++j) { int32_t pkBufLen = 0;
int32_t bytes; if (pr->rowKey.numOfPKs > 0) {
if (slotIds[j] == -1) { pkBufLen = pr->pkColumn.bytes;
bytes = 1;
} else {
bytes = pr->pSchema->columns[slotIds[j]].bytes;
} }
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE); for (int32_t j = 0; j < pr->numOfCols; ++j) {
int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes;
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + pkBufLen + VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
p->ts = INT64_MIN; p->ts = INT64_MIN;
} }

View File

@ -382,6 +382,7 @@ typedef struct SCacheRowsReader {
SArray* pFuncTypeList; SArray* pFuncTypeList;
__compar_fn_t pkComparFn; __compar_fn_t pkComparFn;
SRowKey rowKey; SRowKey rowKey;
SColumnInfo pkColumn;
} SCacheRowsReader; } SCacheRowsReader;
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype); int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);

View File

@ -40,7 +40,7 @@ typedef struct SCacheRowsScanInfo {
SExprSupp pseudoExprSup; SExprSupp pseudoExprSup;
int32_t retrieveType; int32_t retrieveType;
int32_t currentGroupIndex; int32_t currentGroupIndex;
SSDataBlock* pBufferredRes; SSDataBlock* pBufferedRes;
SArray* pUidList; SArray* pUidList;
SArray* pCidList; SArray* pCidList;
int32_t indexOfBufferedRes; int32_t indexOfBufferedRes;
@ -160,9 +160,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
capacity = TMIN(totalTables, 4096); capacity = TMIN(totalTables, 4096);
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); pInfo->pBufferedRes = createOneDataBlock(pInfo->pRes, false);
setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode); setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
blockDataEnsureCapacity(pInfo->pBufferredRes, capacity); blockDataEnsureCapacity(pInfo->pBufferedRes, capacity);
} else { // by tags } else { // by tags
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull); pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
capacity = 1; // only one row output capacity = 1; // only one row output
@ -219,18 +219,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
} }
if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) { if (pInfo->indexOfBufferedRes >= pInfo->pBufferedRes->info.rows) {
blockDataCleanup(pInfo->pBufferredRes); blockDataCleanup(pInfo->pBufferedRes);
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds,
pInfo->pDstSlotIds, pInfo->pUidList); pInfo->pDstSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
// check for tag values // check for tag values
int32_t resultRows = pInfo->pBufferredRes->info.rows; int32_t resultRows = pInfo->pBufferedRes->info.rows;
// the results may be null, if last values are all null // the results may be null, if last values are all null
ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList)); ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList));
@ -239,12 +239,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { if (pInfo->indexOfBufferedRes < pInfo->pBufferedRes->info.rows) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferedRes->pDataBlock); ++i) {
SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
int32_t slotId = pCol->info.slotId; int32_t slotId = pCol->info.slotId;
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferedRes->pDataBlock, slotId);
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) { if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
@ -350,7 +350,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
void destroyCacheScanOperator(void* param) { void destroyCacheScanOperator(void* param) {
SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param; SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pRes);
blockDataDestroy(pInfo->pBufferredRes); blockDataDestroy(pInfo->pBufferedRes);
taosMemoryFree(pInfo->pSlotIds); taosMemoryFree(pInfo->pSlotIds);
taosMemoryFree(pInfo->pDstSlotIds); taosMemoryFree(pInfo->pDstSlotIds);
taosArrayDestroy(pInfo->pCidList); taosArrayDestroy(pInfo->pCidList);