fix(query): refactor and fix some coverity issues.
This commit is contained in:
parent
b06870f70f
commit
319cd8af85
|
@ -1910,6 +1910,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
|
||||
if (minKey == k.ts) {
|
||||
if (init) {
|
||||
if (merge.pTSchema == NULL) {
|
||||
return code;
|
||||
}
|
||||
|
||||
tRowMerge(&merge, pRow);
|
||||
} else {
|
||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||
|
@ -1964,7 +1968,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
tRowMerge(&merge, &fRow1);
|
||||
} else {
|
||||
init = true;
|
||||
int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
||||
code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -1975,17 +1979,24 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
if (minKey == key) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
if (!init) {
|
||||
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
if (merge.pTSchema == NULL) {
|
||||
return code;
|
||||
}
|
||||
tRowMerge(&merge, &fRow);
|
||||
}
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
}
|
||||
}
|
||||
|
||||
if (merge.pTSchema == NULL) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tRowMergerGetRow(&merge, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -3232,7 +3243,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
|
|||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||
|
||||
int32_t code = tRowMergerInit(&merge, pRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -2560,57 +2560,6 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
|
|||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
|
||||
if (result == NULL) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
|
||||
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
|
||||
|
||||
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
|
||||
int32_t length = *(int32_t*)(result);
|
||||
int32_t offset = sizeof(int32_t);
|
||||
|
||||
int32_t count = *(int32_t*)(result + offset);
|
||||
offset += sizeof(int32_t);
|
||||
|
||||
while (count-- > 0 && length > offset) {
|
||||
int32_t keyLen = *(int32_t*)(result + offset);
|
||||
offset += sizeof(int32_t);
|
||||
|
||||
uint64_t tableGroupId = *(uint64_t*)(result + offset);
|
||||
SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
|
||||
if (!resultRow) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
// add a new result set for a new group
|
||||
SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
|
||||
tSimpleHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
|
||||
|
||||
offset += keyLen;
|
||||
int32_t valueLen = *(int32_t*)(result + offset);
|
||||
if (valueLen != pSup->resultRowSize) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
offset += sizeof(int32_t);
|
||||
int32_t pageId = resultRow->pageId;
|
||||
int32_t pOffset = resultRow->offset;
|
||||
memcpy(resultRow, result + offset, valueLen);
|
||||
resultRow->pageId = pageId;
|
||||
resultRow->offset = pOffset;
|
||||
offset += valueLen;
|
||||
|
||||
pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
|
||||
// releaseBufPage(pSup->pResultBuf, getBufPage(pSup->pResultBuf, pageId));
|
||||
}
|
||||
|
||||
if (offset != length) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
|
||||
if (pLimitInfo->remainGroupOffset > 0) {
|
||||
if (pLimitInfo->currentGroupId == 0) { // it is the first group
|
||||
|
@ -3098,9 +3047,11 @@ _error:
|
|||
destroyAggOperatorInfo(pInfo);
|
||||
}
|
||||
|
||||
cleanupExprSupp(&pOperator->exprSupp);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
if (pOperator != NULL) {
|
||||
cleanupExprSupp(&pOperator->exprSupp);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue