Merge pull request #22764 from taosdata/fix/TD-26132
fix: fix coverity issues
This commit is contained in:
commit
e4f52bae5b
|
@ -380,8 +380,7 @@ void destroySubRequests(SRequestObj *pRequest) {
|
|||
pReqList[++reqIdx] = pTmp;
|
||||
releaseRequest(tmpRefId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
|
||||
pTmp->requestId);
|
||||
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -398,7 +397,7 @@ void destroySubRequests(SRequestObj *pRequest) {
|
|||
removeRequest(pTmp->self);
|
||||
releaseRequest(pTmp->self);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " is not there", tmpRefId);
|
||||
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -492,8 +491,7 @@ void stopAllQueries(SRequestObj *pRequest) {
|
|||
pReqList[++reqIdx] = pTmp;
|
||||
releaseRequest(tmpRefId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
|
||||
pTmp->requestId);
|
||||
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -512,7 +510,7 @@ void stopAllQueries(SRequestObj *pRequest) {
|
|||
taosStopQueryImpl(pTmp);
|
||||
releaseRequest(pTmp->self);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " is not there", tmpRefId);
|
||||
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -874,8 +874,13 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = cloneCatalogReq(&pNewWrapper->pCatalogReq, pWrapper->pCatalogReq);
|
||||
}
|
||||
doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code);
|
||||
nodesDestroyNode(pRoot);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code);
|
||||
nodesDestroyNode(pRoot);
|
||||
} else {
|
||||
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) {
|
||||
|
@ -1148,8 +1153,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
|
|||
pReqList[++reqIdx] = pTmp;
|
||||
releaseRequest(tmpRefId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
|
||||
pTmp->requestId);
|
||||
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -1162,7 +1166,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
|
|||
removeRequest(pTmp->self);
|
||||
releaseRequest(pTmp->self);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " is not there", tmpRefId);
|
||||
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -308,10 +308,11 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch
|
|||
|
||||
if (retentions) {
|
||||
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
|
||||
taosMemoryFree(retentions);
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(retentions);
|
||||
|
||||
(varDataLen(buf2)) = len;
|
||||
|
||||
colDataSetVal(pCol2, 0, buf2, false);
|
||||
|
|
|
@ -446,7 +446,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
|
|||
taosThreadMutexInit(&inserter->mutex, NULL);
|
||||
if (NULL == inserter->pDataBlocks) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
inserter->fullOrderColList = pInserterNode->pCols->length == inserter->pSchema->numOfCols;
|
||||
|
|
|
@ -151,14 +151,21 @@ static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin)
|
|||
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
|
||||
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
|
||||
if (NULL == *ppRes) {
|
||||
freeOperatorParam(pChild, OP_GET_PARAM);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (pChild) {
|
||||
(*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
|
||||
if (NULL == *ppRes) {
|
||||
if (NULL == (*ppRes)->pChildren) {
|
||||
freeOperatorParam(pChild, OP_GET_PARAM);
|
||||
freeOperatorParam(*ppRes, OP_GET_PARAM);
|
||||
*ppRes = NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
|
||||
freeOperatorParam(pChild, OP_GET_PARAM);
|
||||
freeOperatorParam(*ppRes, OP_GET_PARAM);
|
||||
*ppRes = NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
} else {
|
||||
|
@ -167,6 +174,8 @@ static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t down
|
|||
|
||||
SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
|
||||
if (NULL == pGc) {
|
||||
freeOperatorParam(*ppRes, OP_GET_PARAM);
|
||||
*ppRes = NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -193,6 +202,7 @@ static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_
|
|||
|
||||
SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
|
||||
if (NULL == pGc) {
|
||||
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -248,6 +258,7 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d
|
|||
|
||||
SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
|
||||
if (NULL == pExc) {
|
||||
taosMemoryFreeClear(*ppRes);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -255,6 +266,7 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d
|
|||
pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
|
||||
if (NULL == pExc->pBatchs) {
|
||||
taosMemoryFree(pExc);
|
||||
taosMemoryFreeClear(*ppRes);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
|
||||
|
@ -288,21 +300,36 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d
|
|||
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam* pChild0, SOperatorParam* pChild1) {
|
||||
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
|
||||
if (NULL == *ppRes) {
|
||||
freeOperatorParam(pChild0, OP_GET_PARAM);
|
||||
freeOperatorParam(pChild1, OP_GET_PARAM);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
(*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
|
||||
if (NULL == *ppRes) {
|
||||
freeOperatorParam(pChild0, OP_GET_PARAM);
|
||||
freeOperatorParam(pChild1, OP_GET_PARAM);
|
||||
freeOperatorParam(*ppRes, OP_GET_PARAM);
|
||||
*ppRes = NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
|
||||
freeOperatorParam(pChild0, OP_GET_PARAM);
|
||||
freeOperatorParam(pChild1, OP_GET_PARAM);
|
||||
freeOperatorParam(*ppRes, OP_GET_PARAM);
|
||||
*ppRes = NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
|
||||
freeOperatorParam(pChild1, OP_GET_PARAM);
|
||||
freeOperatorParam(*ppRes, OP_GET_PARAM);
|
||||
*ppRes = NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
|
||||
if (NULL == pJoin) {
|
||||
freeOperatorParam(*ppRes, OP_GET_PARAM);
|
||||
*ppRes = NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -318,16 +345,28 @@ static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initPara
|
|||
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
|
||||
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
|
||||
if (NULL == *ppRes) {
|
||||
freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
|
||||
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
(*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
|
||||
if (NULL == *ppRes) {
|
||||
taosMemoryFreeClear(*ppRes);
|
||||
freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
|
||||
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
|
||||
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
|
||||
freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
|
||||
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
|
||||
*ppRes = NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
|
||||
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
|
||||
freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
|
||||
*ppRes = NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -420,13 +459,34 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
|
|||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
|
||||
pSrcParam0 = NULL;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
|
||||
pSrcParam1 = NULL;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildMergeJoinOperatorParam(ppParam, pSrcParam0 ? true : false, pGcParam0, pGcParam1);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
if (pSrcParam0) {
|
||||
freeOperatorParam(pSrcParam0, OP_GET_PARAM);
|
||||
}
|
||||
if (pSrcParam1) {
|
||||
freeOperatorParam(pSrcParam1, OP_GET_PARAM);
|
||||
}
|
||||
if (pGcParam0) {
|
||||
freeOperatorParam(pGcParam0, OP_GET_PARAM);
|
||||
}
|
||||
if (pGcParam1) {
|
||||
freeOperatorParam(pGcParam1, OP_GET_PARAM);
|
||||
}
|
||||
if (*ppParam) {
|
||||
freeOperatorParam(*ppParam, OP_GET_PARAM);
|
||||
*ppParam = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -488,7 +548,7 @@ static void handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCt
|
|||
|
||||
if (pPost->leftNeedCache) {
|
||||
uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
|
||||
if (--(*num) <= 0) {
|
||||
if (num && --(*num) <= 0) {
|
||||
tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
|
||||
notifySeqJoinTableCacheEnd(pOperator, pPost, true);
|
||||
}
|
||||
|
|
|
@ -277,7 +277,7 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
|
|||
SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
|
||||
code = filterSetDataFromSlotId(pInfo->pEndCondInfo, ¶m2);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
int32_t status2 = 0;
|
||||
|
@ -331,10 +331,12 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
|
|||
}
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
colDataDestroy(ps);
|
||||
taosMemoryFree(ps);
|
||||
colDataDestroy(pe);
|
||||
taosMemoryFree(pe);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -223,6 +223,9 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S
|
|||
SGroupCacheFileInfo newFile = {0};
|
||||
taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile));
|
||||
pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
|
||||
if (NULL == pTmp) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (pTmp->deleted) {
|
||||
|
@ -287,7 +290,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
|
|||
|
||||
if (deleted) {
|
||||
qTrace("FileId:%d-%d-%d already be deleted, skip write",
|
||||
pCtx->id, pGroup->vgId, pHead->basic.fileId);
|
||||
pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId);
|
||||
|
||||
int64_t blkId = pHead->basic.blkId;
|
||||
pHead = pHead->next;
|
||||
|
@ -337,7 +340,9 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId));
|
||||
|
||||
if (NULL == pBufInfo) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SGcBlkBufInfo* pWriteHead = NULL;
|
||||
|
||||
|
@ -378,6 +383,10 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
|
|||
|
||||
static FORCE_INLINE void chkRemoveVgroupCurrFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId) {
|
||||
SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId));
|
||||
if (NULL == pFileInfo) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (0 == pFileInfo->groupNum) {
|
||||
removeGroupCacheFile(pFileInfo);
|
||||
|
||||
|
@ -711,6 +720,9 @@ static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int
|
|||
newFile.groupNum = 1;
|
||||
taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile));
|
||||
pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
|
||||
if (NULL == pTmp) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
} else {
|
||||
pTmp->groupNum++;
|
||||
}
|
||||
|
@ -786,6 +798,9 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
|
|||
}
|
||||
|
||||
*ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid));
|
||||
if (NULL == *ppGrp) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
initNewGroupData(pCtx, *ppGrp, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache);
|
||||
|
||||
qError("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache);
|
||||
|
|
|
@ -636,12 +636,14 @@ static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, S
|
|||
|
||||
int32_t code = getValBufFromPages(pJoin->pRowBufs, getHJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow);
|
||||
if (code) {
|
||||
taosMemoryFree(pRow);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (NULL == pGroup) {
|
||||
pRow->next = NULL;
|
||||
if (tSimpleHashPut(pJoin->pKeyHash, pTable->keyData, keyLen, &group, sizeof(group))) {
|
||||
taosMemoryFree(pRow);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -711,6 +711,11 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == pJoinInfo->pLeft || NULL == pJoinInfo->pRight) {
|
||||
setMergeJoinDone(pOperator);
|
||||
return false;
|
||||
}
|
||||
|
||||
// only the timestamp match support for ordinary table
|
||||
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
|
||||
|
|
|
@ -2883,7 +2883,7 @@ static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) {
|
|||
}
|
||||
|
||||
|
||||
static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI, STagScanInfo* pInfo) {
|
||||
static int32_t tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI, STagScanInfo* pInfo) {
|
||||
int32_t code = 0;
|
||||
int32_t numOfTables = taosArrayGetSize(aUidTags);
|
||||
|
||||
|
@ -2894,9 +2894,15 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aF
|
|||
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
||||
|
||||
SScalarParam output = {0};
|
||||
tagScanCreateResultData(&type, numOfTables, &output);
|
||||
code = tagScanCreateResultData(&type, numOfTables, &output);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
scalarCalculate(pTagCond, pBlockList, &output);
|
||||
code = scalarCalculate(pTagCond, pBlockList, &output);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
bool* result = (bool*)output.columnData->pData;
|
||||
for (int32_t i = 0 ; i < numOfTables; ++i) {
|
||||
|
@ -2911,7 +2917,7 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aF
|
|||
blockDataDestroy(pResBlock);
|
||||
taosArrayDestroy(pBlockList);
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void tagScanFillOneCellWithTag(SOperatorInfo* pOperator, const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) {
|
||||
|
@ -3024,7 +3030,11 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
|||
bool ignoreFilterIdx = true;
|
||||
if (pInfo->pTagCond != NULL) {
|
||||
ignoreFilterIdx = false;
|
||||
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI, pInfo);
|
||||
int32_t code = tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI, pInfo);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
pOperator->pTaskInfo->code = code;
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||
}
|
||||
} else {
|
||||
ignoreFilterIdx = true;
|
||||
}
|
||||
|
|
|
@ -904,6 +904,7 @@ static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdx
|
|||
}
|
||||
|
||||
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
|
||||
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
|
||||
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
|
||||
|
@ -930,7 +931,13 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
|||
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
|
||||
|
||||
SMultiwayMergeTreeInfo* pTree = NULL;
|
||||
tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
|
||||
code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosMemoryFree(sup.aRowIdx);
|
||||
taosMemoryFree(sup.aTs);
|
||||
|
||||
return code;
|
||||
}
|
||||
int32_t nRows = 0;
|
||||
int32_t nMergedRows = 0;
|
||||
bool mergeLimitReached = false;
|
||||
|
@ -1054,7 +1061,14 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
tSimpleHashClear(mUidBlk);
|
||||
|
||||
int64_t p = taosGetTimestampUs();
|
||||
sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
|
||||
code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tSimpleHashCleanup(mUidBlk);
|
||||
taosArrayDestroy(aBlkSort);
|
||||
taosArrayDestroy(aExtSrc);
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
|
||||
|
|
Loading…
Reference in New Issue