Merge pull request #27729 from taosdata/fix/3_liaohj
fix(query): fix memory leak and do some internal refactor.
This commit is contained in:
commit
46246c47bb
|
@ -41,7 +41,7 @@ typedef struct SBlockOrderInfo {
|
|||
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
|
||||
#define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_))))
|
||||
|
||||
#define QRY_OPTR_CHECK(_o) \
|
||||
#define QRY_PARAM_CHECK(_o) \
|
||||
do { \
|
||||
if ((_o) == NULL) { \
|
||||
return TSDB_CODE_INVALID_PARA; \
|
||||
|
|
|
@ -849,7 +849,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
|
|||
|
||||
int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock) {
|
||||
int32_t code = 0;
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
@ -1263,18 +1263,19 @@ static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBloc
|
|||
}
|
||||
|
||||
static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) {
|
||||
*ppCols = NULL;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t rows = pDataBlock->info.capacity;
|
||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
int32_t i = 0;
|
||||
|
||||
*ppCols = NULL;
|
||||
|
||||
SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
|
||||
if (pCols == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
for (i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
if (pColInfoData == NULL) {
|
||||
continue;
|
||||
|
@ -1309,6 +1310,10 @@ static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoD
|
|||
return code;
|
||||
|
||||
_error:
|
||||
for(int32_t j = 0; j < i; ++j) {
|
||||
colDataDestroy(&pCols[j]);
|
||||
}
|
||||
|
||||
taosMemoryFree(pCols);
|
||||
return code;
|
||||
}
|
||||
|
@ -1753,7 +1758,7 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
|
|||
}
|
||||
|
||||
int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) {
|
||||
QRY_OPTR_CHECK(pBlock);
|
||||
QRY_PARAM_CHECK(pBlock);
|
||||
|
||||
int32_t code = 0;
|
||||
SSDataBlock* p = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
|
@ -1846,7 +1851,7 @@ _err:
|
|||
}
|
||||
|
||||
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
if (pDataBlock == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
@ -1946,7 +1951,7 @@ _end:
|
|||
}
|
||||
|
||||
int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
if (pDataBlock == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
@ -2029,7 +2034,7 @@ int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataB
|
|||
}
|
||||
|
||||
int32_t createDataBlock(SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
if (pBlock == NULL) {
|
||||
return terrno;
|
||||
|
@ -2080,7 +2085,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId)
|
|||
|
||||
int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData) {
|
||||
int32_t code = 0;
|
||||
QRY_OPTR_CHECK(pColInfoData);
|
||||
QRY_PARAM_CHECK(pColInfoData);
|
||||
|
||||
if (index >= taosArrayGetSize(pBlock->pDataBlock)) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
@ -2871,7 +2876,7 @@ bool alreadyAddGroupId(char* ctbName, int64_t groupId) {
|
|||
}
|
||||
|
||||
int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** pName) {
|
||||
QRY_OPTR_CHECK(pName);
|
||||
QRY_PARAM_CHECK(pName);
|
||||
|
||||
char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
|
||||
if (!pBuf) {
|
||||
|
|
|
@ -408,9 +408,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
|
||||
for (int32_t k = startIndex; k < endIndex; ++k) {
|
||||
code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
int32_t i = 0;
|
||||
int32_t rows = block.numOfRecords;
|
||||
|
@ -536,12 +534,15 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
}
|
||||
|
||||
_end:
|
||||
(void)tStatisBlockDestroy(&block);
|
||||
|
||||
(void) tStatisBlockDestroy(&block);
|
||||
if (code != 0) {
|
||||
tsdbError("%s error happens at:%s line number: %d, code:%s", id, __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
pBlockLoadInfo->cost.statisElapsedTime += el;
|
||||
|
||||
tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -953,6 +954,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
|
|||
pMTree->pIter = NULL;
|
||||
pMTree->backward = pConf->backward;
|
||||
pMTree->idStr = pConf->idstr;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (!pMTree->backward) { // asc
|
||||
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
||||
|
@ -1027,9 +1029,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
|
|||
// let's record the time window for current table of uid in the stt files
|
||||
if (pSttDataInfo != NULL && numOfRows > 0) {
|
||||
void *px = taosArrayPush(pSttDataInfo->pKeyRangeList, &range);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
QUERY_CHECK_NULL(px, code, lino, _end, terrno);
|
||||
|
||||
pSttDataInfo->numOfRows += numOfRows;
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -403,7 +403,7 @@ static void initReaderStatus(SReaderStatus* pStatus) {
|
|||
}
|
||||
|
||||
static int32_t createResBlock(SQueryTableDataCond* pCond, int32_t capacity, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
int32_t code = createDataBlock(&pBlock);
|
||||
|
|
|
@ -78,7 +78,7 @@ static int32_t getSchemaBytes(const SSchema* pSchema) {
|
|||
}
|
||||
|
||||
static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) {
|
||||
QRY_OPTR_CHECK(pOutput);
|
||||
QRY_PARAM_CHECK(pOutput);
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
int32_t code = createDataBlock(&pBlock);
|
||||
|
@ -236,7 +236,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp**
|
|||
static int32_t execResetQueryCache() { return catalogClearCache(); }
|
||||
|
||||
static int32_t buildCreateDBResultDataBlock(SSDataBlock** pOutput) {
|
||||
QRY_OPTR_CHECK(pOutput);
|
||||
QRY_PARAM_CHECK(pOutput);
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
int32_t code = createDataBlock(&pBlock);
|
||||
|
@ -475,7 +475,7 @@ static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveT
|
|||
}
|
||||
|
||||
static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) {
|
||||
QRY_OPTR_CHECK(pOutput);
|
||||
QRY_PARAM_CHECK(pOutput);
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
int32_t code = createDataBlock(&pBlock);
|
||||
|
@ -499,7 +499,7 @@ static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) {
|
|||
}
|
||||
|
||||
static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) {
|
||||
QRY_OPTR_CHECK(pOutput);
|
||||
QRY_PARAM_CHECK(pOutput);
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
int32_t code = createDataBlock(&pBlock);
|
||||
|
@ -929,7 +929,7 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
|
|||
}
|
||||
|
||||
static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) {
|
||||
QRY_OPTR_CHECK(pOutput);
|
||||
QRY_PARAM_CHECK(pOutput);
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
int32_t code = createDataBlock(&pBlock);
|
||||
|
|
|
@ -70,7 +70,7 @@ static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus
|
|||
|
||||
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t lino = 0;
|
||||
int32_t code = 0;
|
||||
|
|
|
@ -97,7 +97,7 @@ _end:
|
|||
int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
|
||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -295,7 +295,7 @@ _end:
|
|||
|
||||
int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -889,7 +889,7 @@ int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
|
|||
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
||||
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
||||
|
||||
QRY_OPTR_CHECK(pRes);
|
||||
QRY_PARAM_CHECK(pRes);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return code;
|
||||
}
|
||||
|
@ -958,7 +958,7 @@ int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
|
|||
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
||||
SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
__optr_fn_t nextFp = NULL;
|
||||
|
|
|
@ -61,7 +61,7 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
|
|||
|
||||
int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -401,7 +401,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
|
|||
|
||||
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -1924,7 +1924,7 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
|
|||
}
|
||||
|
||||
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
|
||||
QRY_OPTR_CHECK(pExprInfo);
|
||||
QRY_PARAM_CHECK(pExprInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t numOfFuncs = LIST_LENGTH(pNodeList);
|
||||
|
|
|
@ -1280,7 +1280,7 @@ void freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType
|
|||
|
||||
FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam,
|
||||
SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
int32_t code = 0;
|
||||
if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) {
|
||||
|
|
|
@ -423,7 +423,7 @@ static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiN
|
|||
|
||||
int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
|
|
|
@ -1444,7 +1444,7 @@ static int32_t groupCacheTableCacheEnd(SOperatorInfo* pOperator, SOperatorParam*
|
|||
int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
||||
SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo));
|
||||
|
|
|
@ -353,38 +353,6 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (!hasRemainResults(&pInfo->groupResInfo)) {
|
||||
setOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pRes->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pOperator->resultInfo.totalRows += pRes->info.rows;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return (pRes->info.rows == 0) ? NULL : pRes;
|
||||
}
|
||||
|
||||
bool hasRemainResultByHash(SOperatorInfo* pOperator) {
|
||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
SSHashObj* pHashmap = pInfo->aggSup.pResultRowHashTable;
|
||||
|
@ -463,25 +431,23 @@ _end:
|
|||
}
|
||||
|
||||
static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
(*ppRes) = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo;
|
||||
int32_t order = pInfo->binfo.inputTsOrder;
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
QRY_PARAM_CHECK(ppRes);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
(*ppRes) = buildGroupResultDataBlockByHash(pOperator);
|
||||
return code;
|
||||
}
|
||||
SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo;
|
||||
|
||||
int32_t order = pInfo->binfo.inputTsOrder;
|
||||
int64_t st = taosGetTimestampUs();
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
|
||||
|
@ -511,10 +477,12 @@ static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
|
|||
if (pGroupResInfo->pRows != NULL) {
|
||||
taosArrayDestroy(pGroupResInfo->pRows);
|
||||
}
|
||||
|
||||
if (pGroupResInfo->pBuf) {
|
||||
taosMemoryFree(pGroupResInfo->pBuf);
|
||||
pGroupResInfo->pBuf = NULL;
|
||||
}
|
||||
|
||||
pGroupResInfo->index = 0;
|
||||
pGroupResInfo->iter = 0;
|
||||
pGroupResInfo->dataPos = NULL;
|
||||
|
@ -525,15 +493,16 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
} else {
|
||||
(*ppRes) = buildGroupResultDataBlockByHash(pOperator);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -1127,7 +1096,7 @@ static void destroyPartitionOperatorInfo(void* param) {
|
|||
|
||||
int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -1668,7 +1637,7 @@ void freePartItem(void* ptr) {
|
|||
|
||||
int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -996,7 +996,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
|
|||
SSDataBlock* pRes = pJoin->finBlk;
|
||||
int64_t st = 0;
|
||||
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
if (pOperator->cost.openCost == 0) {
|
||||
st = taosGetTimestampUs();
|
||||
}
|
||||
|
@ -1182,7 +1182,7 @@ int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinN
|
|||
|
||||
int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
||||
SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo));
|
||||
|
|
|
@ -1867,7 +1867,7 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
|
|||
|
||||
int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
||||
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t oldNum = numOfDownstream;
|
||||
bool newDownstreams = false;
|
||||
|
|
|
@ -336,7 +336,7 @@ int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||
|
@ -419,7 +419,7 @@ int32_t copyColumnsValue(SNodeList* pNodeList, uint64_t targetBlkId, SSDataBlock
|
|||
}
|
||||
|
||||
int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||
|
@ -499,7 +499,7 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return 0;
|
||||
|
@ -556,7 +556,7 @@ int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplai
|
|||
|
||||
int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, SMergePhysiNode* pMergePhyNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -180,7 +180,7 @@ ERetType extractOperatorInfo(SOperatorInfo* pOperator, STraverParam* pParam, con
|
|||
|
||||
// QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
|
||||
int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
if (pOperator == NULL) {
|
||||
qError("invalid operator, failed to find tableScanOperator %s", id);
|
||||
|
@ -282,7 +282,7 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SSto
|
|||
|
||||
int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
|
||||
SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t type = nodeType(pPhyNode);
|
||||
|
@ -878,7 +878,7 @@ SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, i
|
|||
}
|
||||
|
||||
int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
|
||||
QRY_OPTR_CHECK(pRes);
|
||||
QRY_PARAM_CHECK(pRes);
|
||||
|
||||
int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
|
|
|
@ -93,7 +93,7 @@ void streamOperatorReloadState(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
|
||||
|
@ -262,7 +262,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
|
|||
}
|
||||
|
||||
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
SProjectOperatorInfo* pProjectInfo = pOperator->info;
|
||||
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
|
||||
|
@ -441,7 +441,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
|
||||
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t numOfRows = 4096;
|
||||
|
@ -572,7 +572,7 @@ SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
||||
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
||||
|
@ -717,7 +717,7 @@ int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
|
|||
}
|
||||
|
||||
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
|
||||
QRY_OPTR_CHECK(pResList);
|
||||
QRY_PARAM_CHECK(pResList);
|
||||
SArray* pList = taosArrayInit(4, sizeof(int32_t));
|
||||
if (pList == NULL) {
|
||||
return terrno;
|
||||
|
|
|
@ -937,13 +937,13 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
|
|||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||
bool hasNext = false;
|
||||
pBlock->info.dataLoad = false;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
QRY_PARAM_CHECK(ppRes);
|
||||
pBlock->info.dataLoad = false;
|
||||
|
||||
while (true) {
|
||||
code = pAPI->tsdReader.tsdNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -957,7 +957,7 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
|
|||
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -993,6 +993,7 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
|
|||
|
||||
pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;
|
||||
pBlock->info.scanFlag = pTableScanInfo->base.scanFlag;
|
||||
|
||||
(*ppRes) = pBlock;
|
||||
return code;
|
||||
}
|
||||
|
@ -1001,9 +1002,7 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1014,7 +1013,7 @@ static int32_t doGroupedTableScan(SOperatorInfo* pOperator, SSDataBlock** pBlock
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
QRY_OPTR_CHECK(pBlock);
|
||||
QRY_PARAM_CHECK(pBlock);
|
||||
|
||||
// The read handle is not initialized yet, since no qualified tables exists
|
||||
if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -1194,7 +1193,7 @@ static int32_t startNextGroupScan(SOperatorInfo* pOperator, SSDataBlock** pResul
|
|||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
int32_t numOfTables = 0;
|
||||
|
||||
QRY_OPTR_CHECK(pResult);
|
||||
QRY_PARAM_CHECK(pResult);
|
||||
|
||||
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -1242,7 +1241,7 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
||||
static int32_t groupSeqTableScan(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
STableScanInfo* pInfo = pOperator->info;
|
||||
|
@ -1250,12 +1249,14 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
|||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
int32_t num = 0;
|
||||
STableKeyInfo* pList = NULL;
|
||||
SSDataBlock* result = NULL;
|
||||
SSDataBlock* pResult = NULL;
|
||||
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
if (pInfo->currentGroupId == -1) {
|
||||
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
taosRLockLatch(&pTaskInfo->lock);
|
||||
|
@ -1273,28 +1274,32 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
|||
if (pInfo->filesetDelimited) {
|
||||
pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader);
|
||||
}
|
||||
|
||||
if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
|
||||
pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
|
||||
}
|
||||
}
|
||||
|
||||
result = NULL;
|
||||
code = doGroupedTableScan(pOperator, &result);
|
||||
pResult = NULL;
|
||||
code = doGroupedTableScan(pOperator, &pResult);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (result != NULL) {
|
||||
if (pResult != NULL) {
|
||||
if (pOperator->dynamicTask) {
|
||||
result->info.id.groupId = result->info.id.uid;
|
||||
pResult->info.id.groupId = pResult->info.id.uid;
|
||||
}
|
||||
return result;
|
||||
|
||||
*pResBlock = pResult;
|
||||
return code;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
code = startNextGroupScan(pOperator, &result);
|
||||
code = startNextGroupScan(pOperator, &pResult);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (result || pOperator->status == OP_EXEC_DONE) {
|
||||
return result;
|
||||
if (pResult || pOperator->status == OP_EXEC_DONE) {
|
||||
*pResBlock = pResult;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1302,9 +1307,9 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return result;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
|
@ -1313,7 +1318,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
STableScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
QRY_OPTR_CHECK(ppRes);
|
||||
QRY_PARAM_CHECK(ppRes);
|
||||
|
||||
if (pOperator->pOperatorGetParam) {
|
||||
pOperator->dynamicTask = true;
|
||||
|
@ -1394,8 +1399,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
pInfo->scanTimes = 0;
|
||||
}
|
||||
} else { // scan table group by group sequentially
|
||||
(*ppRes) = groupSeqTableScan(pOperator);
|
||||
return code;
|
||||
code = groupSeqTableScan(pOperator, ppRes);
|
||||
}
|
||||
|
||||
_end:
|
||||
|
@ -1447,7 +1451,7 @@ static void destroyTableScanOperatorInfo(void* param) {
|
|||
int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -1547,7 +1551,7 @@ _error:
|
|||
}
|
||||
|
||||
int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||
|
@ -3704,7 +3708,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
if (pInfo->dataReader && hasNext) {
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
return code;
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
|
@ -3806,6 +3810,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
}
|
||||
}
|
||||
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
@ -3814,8 +3819,8 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
@ -3837,7 +3842,7 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo
|
|||
// create meta reader
|
||||
// create tq reader
|
||||
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
||||
|
@ -4029,7 +4034,7 @@ _end:
|
|||
int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
|
||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -4262,7 +4267,7 @@ _error:
|
|||
return code;
|
||||
}
|
||||
|
||||
static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
|
||||
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
|
||||
SStorageAPI* pAPI) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -4276,7 +4281,7 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
|
|||
GET_TASKID(pTaskInfo));
|
||||
tDecoderClear(&(*mr).coder);
|
||||
pAPI->metaReaderFn.clearReader(mr);
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
goto _end;
|
||||
}
|
||||
|
||||
code = pAPI->metaReaderFn.getTableEntryByUid(mr, item->uid);
|
||||
|
@ -4285,7 +4290,7 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
|
|||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
|
||||
GET_TASKID(pTaskInfo));
|
||||
pAPI->metaReaderFn.clearReader(mr);
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
goto _end;
|
||||
}
|
||||
|
||||
char str[512];
|
||||
|
@ -4314,12 +4319,13 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
|
|||
} else {
|
||||
data = (char*)p;
|
||||
}
|
||||
|
||||
code = colDataSetVal(pDst, (count), data,
|
||||
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
|
||||
data != NULL) {
|
||||
if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
|
||||
(data != NULL)) {
|
||||
taosMemoryFree(data);
|
||||
}
|
||||
}
|
||||
|
@ -4329,8 +4335,9 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static void tagScanFreeUidTag(void* p) {
|
||||
|
@ -4558,10 +4565,6 @@ _end:
|
|||
}
|
||||
|
||||
static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
(*ppRes) = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -4569,6 +4572,12 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p
|
|||
|
||||
STagScanInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
|
||||
QRY_PARAM_CHECK(ppRes);
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
blockDataCleanup(pRes);
|
||||
|
||||
if (pInfo->pCtbCursor == NULL) {
|
||||
|
@ -4635,28 +4644,23 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p
|
|||
pInfo->pCtbCursor = NULL;
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
pRes->info.rows = count;
|
||||
|
||||
pRes->info.rows = count;
|
||||
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
|
||||
if (bLimitReached) {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
pOperator->resultInfo.totalRows += pRes->info.rows;
|
||||
(*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
pOperator->resultInfo.totalRows += pRes->info.rows;
|
||||
(*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
return code;
|
||||
}
|
||||
|
||||
static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doTagScanFromCtbIdxNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
|
@ -4687,18 +4691,18 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
return code;
|
||||
}
|
||||
|
||||
char str[512] = {0};
|
||||
int32_t count = 0;
|
||||
SMetaReader mr = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
|
||||
|
||||
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
|
||||
doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
|
||||
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
|
||||
++count;
|
||||
if (++pInfo->curPos >= size) {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
}
|
||||
|
||||
pRes->info.rows = count;
|
||||
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
|
@ -4706,6 +4710,7 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
if (bLimitReached) {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||
|
@ -4745,7 +4750,7 @@ static void destroyTagScanOperatorInfo(void* param) {
|
|||
int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode,
|
||||
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -4913,7 +4918,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
|||
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, pInput->pKeyInfo, 1, pInput->pReaderBlock,
|
||||
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
|
||||
if (code != 0) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4921,18 +4926,20 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
|||
|
||||
while (true) {
|
||||
bool hasNext = false;
|
||||
int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
|
||||
code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
|
||||
if (code != 0) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
|
||||
pInfo->base.dataReader = NULL;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (!hasNext || isTaskKilled(pTaskInfo)) {
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
|
||||
pInfo->base.dataReader = NULL;
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
return code;
|
||||
}
|
||||
|
||||
*pSubTableHasBlock = false;
|
||||
break;
|
||||
}
|
||||
|
@ -4947,8 +4954,9 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
|||
code = loadDataBlock(pOperator, &pInfo->base, pInput->pReaderBlock, &status);
|
||||
if (code != 0) {
|
||||
pInfo->base.dataReader = NULL;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
|
||||
*pSubTableHasBlock = false;
|
||||
break;
|
||||
|
@ -5215,49 +5223,56 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static SSDataBlock* getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pResBlock, int32_t capacity) {
|
||||
static int32_t getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t capacity, SSDataBlock** pResBlock) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
bool finished = false;
|
||||
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
blockDataCleanup(pBlock);
|
||||
|
||||
while (true) {
|
||||
while (true) {
|
||||
while (1) {
|
||||
if (pSubTblsInfo->numSubTablesCompleted >= pSubTblsInfo->numSubTables) {
|
||||
finished = true;
|
||||
break;
|
||||
}
|
||||
|
||||
code = appendChosenRowToDataBlock(pSubTblsInfo, pResBlock);
|
||||
code = appendChosenRowToDataBlock(pSubTblsInfo, pBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
code = adjustSubTableForNextRow(pOperator, pSubTblsInfo);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pResBlock->info.rows >= capacity) {
|
||||
if (pBlock->info.rows >= capacity) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, pTaskInfo->code);
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
|
||||
if (finished || limitReached || pResBlock->info.rows > 0) {
|
||||
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
|
||||
if (finished || limitReached || pBlock->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlock->info.rows > 0) {
|
||||
*pResBlock = pBlock;
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) {
|
||||
|
@ -5307,23 +5322,22 @@ static void stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
|
|||
}
|
||||
|
||||
int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
(*ppRes) = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
QRY_PARAM_CHECK(ppRes);
|
||||
|
||||
int32_t lino = 0;
|
||||
int32_t tableListSize = 0;
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
|
||||
int32_t lino = 0;
|
||||
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
int32_t tableListSize = 0;
|
||||
code = tableListGetSize(pInfo->base.pTableListInfo, &tableListSize);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -5335,6 +5349,7 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
pInfo->tableStartIndex = 0;
|
||||
STableKeyInfo* pTmpGpId = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno);
|
||||
|
@ -5346,15 +5361,19 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
SSDataBlock* pBlock = NULL;
|
||||
while (pInfo->tableStartIndex < tableListSize) {
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
break;
|
||||
}
|
||||
|
||||
pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||
code = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity, &pBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
|
||||
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno);
|
||||
|
||||
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
|
||||
}
|
||||
|
||||
if (pBlock != NULL) {
|
||||
pBlock->info.id.groupId = pInfo->groupId;
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
|
@ -5385,16 +5404,11 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
} else {
|
||||
(*ppRes) = pBlock;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doTableMergeScanParaSubTablesNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
return code;
|
||||
}
|
||||
|
||||
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeOpInfo) {
|
||||
|
@ -5426,7 +5440,7 @@ _end:
|
|||
}
|
||||
}
|
||||
|
||||
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
|
||||
static int32_t doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
|
||||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
@ -5439,7 +5453,8 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
|
|||
if (code != 0) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||
qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (!hasNext || isTaskKilled(pTaskInfo)) {
|
||||
|
@ -5448,7 +5463,7 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
|
|||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||
}
|
||||
*pFinished = true;
|
||||
return;
|
||||
return code;
|
||||
}
|
||||
|
||||
uint32_t status = 0;
|
||||
|
@ -5456,21 +5471,22 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
|
|||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
|
||||
*pFinished = true;
|
||||
return;
|
||||
return code;
|
||||
}
|
||||
|
||||
// current block is filter out according to filter condition, continue load the next block
|
||||
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
||||
*pSkipped = true;
|
||||
return;
|
||||
return code;
|
||||
}
|
||||
|
||||
return;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
|
||||
|
@ -5505,7 +5521,12 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
|
|||
} else {
|
||||
bool bFinished = false;
|
||||
bool bSkipped = false;
|
||||
doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
|
||||
|
||||
code = doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pBlock = pInfo->pReaderBlock;
|
||||
qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d",
|
||||
GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent);
|
||||
|
@ -5518,7 +5539,6 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
|
|||
if (!bSkipped) {
|
||||
code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]);
|
||||
if (code) {
|
||||
terrno = code;
|
||||
*ppBlock = NULL;
|
||||
return code;
|
||||
}
|
||||
|
@ -5657,7 +5677,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
param->pOperator = pOperator;
|
||||
|
||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
if (ps == NULL) {
|
||||
taosMemoryFree(param);
|
||||
QUERY_CHECK_NULL(ps, code, lino, _end, terrno);
|
||||
}
|
||||
|
||||
ps->param = param;
|
||||
ps->onlyRef = false;
|
||||
code = tsortAddSource(pInfo->pSortHandle, ps);
|
||||
|
@ -5852,7 +5876,7 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
SSDataBlock* pBlock = NULL;
|
||||
while (pInfo->tableStartIndex < tableListSize) {
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
goto _end;
|
||||
}
|
||||
|
||||
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
|
||||
|
@ -5896,9 +5920,10 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
} else {
|
||||
(*ppRes) = pBlock;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -5971,7 +5996,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
|
|||
int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -6106,11 +6131,11 @@ static int32_t buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo*
|
|||
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
|
||||
static int32_t buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
||||
SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI);
|
||||
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
|
||||
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
|
||||
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
static int32_t buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes);
|
||||
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
|
||||
|
@ -6213,7 +6238,7 @@ int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList*
|
|||
|
||||
int32_t createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -6396,22 +6421,28 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
STableCountScanOperatorInfo* pInfo = pOperator->info;
|
||||
STableCountScanSupp* pSupp = &pInfo->supp;
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
|
||||
blockDataCleanup(pRes);
|
||||
QRY_PARAM_CHECK(ppRes);
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pInfo->readHandle.mnd != NULL) {
|
||||
(*ppRes) = buildSysDbTableCount(pOperator, pInfo);
|
||||
return code;
|
||||
}
|
||||
|
||||
(*ppRes) = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
|
||||
code = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
|
||||
if ((pRes->info.rows > 0) && (code == 0)) {
|
||||
*ppRes = pRes;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
static int32_t buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -6424,27 +6455,29 @@ static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCount
|
|||
// get dbname
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
|
||||
SName sn = {0};
|
||||
|
||||
code = tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
code = tNameGetDbName(&sn, dbName);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pSupp->groupByDbName || pSupp->groupByStbName) {
|
||||
buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
|
||||
code = buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
|
||||
} else {
|
||||
buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
|
||||
code = buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return pRes->info.rows > 0 ? pRes : NULL;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -6458,6 +6491,7 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca
|
|||
code = pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, TSDB_SUPER_TABLE, pInfo->stbUidList);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
|
||||
tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
|
||||
code = buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid, pAPI);
|
||||
|
@ -6487,11 +6521,11 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -6527,9 +6561,10 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
||||
|
|
|
@ -55,7 +55,7 @@ static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc);
|
|||
|
||||
// todo add limit/offset impl
|
||||
int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
@ -252,7 +252,7 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
|
|||
|
||||
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||
SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
blockDataCleanup(pDataBlock);
|
||||
int32_t lino = 0;
|
||||
int32_t code = 0;
|
||||
|
@ -355,25 +355,26 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
|||
pInfo->startTs = taosGetTimestampUs();
|
||||
// pInfo->binfo.pRes is not equalled to the input datablock.
|
||||
pInfo->pSortHandle = NULL;
|
||||
int32_t code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str,
|
||||
pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle);
|
||||
int32_t code =
|
||||
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, pInfo->maxRows,
|
||||
pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
|
||||
|
||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
if (ps == NULL) {
|
||||
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
if (pSource == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
ps->param = pOperator->pDownstream[0];
|
||||
ps->onlyRef = true;
|
||||
pSource->param = pOperator->pDownstream[0];
|
||||
pSource->onlyRef = true;
|
||||
|
||||
code = tsortAddSource(pInfo->pSortHandle, ps);
|
||||
code = tsortAddSource(pInfo->pSortHandle, pSource);
|
||||
if (code) {
|
||||
taosMemoryFree(ps);
|
||||
taosMemoryFree(pSource);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -390,7 +391,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -400,7 +401,7 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
|
||||
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
return code;
|
||||
}
|
||||
|
||||
// multi-group case not handle here
|
||||
|
@ -408,7 +409,7 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
while (1) {
|
||||
if (tsortIsClosed(pInfo->pSortHandle)) {
|
||||
code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
|
||||
|
@ -516,7 +517,7 @@ typedef struct SGroupSortOperatorInfo {
|
|||
|
||||
int32_t getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||
SGroupSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
blockDataCleanup(pDataBlock);
|
||||
int32_t code = blockDataEnsureCapacity(pDataBlock, capacity);
|
||||
|
@ -598,7 +599,7 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
|
|||
SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
|
||||
SSDataBlock* block = NULL;
|
||||
|
||||
QRY_OPTR_CHECK(ppBlock);
|
||||
QRY_PARAM_CHECK(ppBlock);
|
||||
|
||||
if (grpSortOpInfo->prefetchedSortInput) {
|
||||
block = grpSortOpInfo->prefetchedSortInput;
|
||||
|
@ -648,23 +649,22 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
|
|||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
|
||||
if (ps == NULL || param == NULL) {
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
taosMemoryFree(ps);
|
||||
taosMemoryFree(param);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
param->childOpInfo = pOperator->pDownstream[0];
|
||||
param->grpSortOpInfo = pInfo;
|
||||
|
||||
ps->param = param;
|
||||
ps->onlyRef = false;
|
||||
code = tsortAddSource(pInfo->pCurrSortHandle, ps);
|
||||
if (code) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tsortOpen(pInfo->pCurrSortHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -686,7 +686,7 @@ int32_t finishSortGroup(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
QRY_OPTR_CHECK(pResBlock);
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SGroupSortOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
|
@ -696,7 +696,7 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
|
||||
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (!pInfo->hasGroupId) {
|
||||
|
@ -720,15 +720,14 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
while (pInfo->pCurrSortHandle != NULL) {
|
||||
if (tsortIsClosed(pInfo->pCurrSortHandle)) {
|
||||
code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
// beginSortGroup would fetch all child blocks of pInfo->currGroupId;
|
||||
if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) {
|
||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
pOperator->pTaskInfo->code = code;
|
||||
pTaskInfo->code = code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
|
||||
|
@ -777,7 +776,7 @@ void destroyGroupSortOperatorInfo(void* param) {
|
|||
|
||||
int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
|
|
|
@ -807,7 +807,7 @@ _end:
|
|||
|
||||
int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
SCountWinodwPhysiNode* pCountNode = (SCountWinodwPhysiNode*)pPhyNode;
|
||||
int32_t numOfCols = 0;
|
||||
|
|
|
@ -858,7 +858,7 @@ _end:
|
|||
|
||||
int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
||||
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
SStreamEventWinodwPhysiNode* pEventNode = (SStreamEventWinodwPhysiNode*)pPhyNode;
|
||||
int32_t tsSlotId = ((SColumnNode*)pEventNode->window.pTspk)->slotId;
|
||||
|
|
|
@ -1349,7 +1349,7 @@ _end:
|
|||
|
||||
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -1886,7 +1886,7 @@ _end:
|
|||
int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
|
||||
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -3754,7 +3754,7 @@ _end:
|
|||
|
||||
int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||
int32_t numOfCols = 0;
|
||||
|
@ -4081,7 +4081,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|||
int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
|
||||
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -4931,7 +4931,7 @@ _end:
|
|||
|
||||
int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
||||
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
|
@ -5248,7 +5248,7 @@ _end:
|
|||
|
||||
int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
||||
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -2233,7 +2233,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
|
|||
|
||||
int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -2864,7 +2864,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
|
|||
int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
|
||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -1104,7 +1104,7 @@ static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn
|
|||
}
|
||||
|
||||
int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -1320,7 +1320,7 @@ _end:
|
|||
|
||||
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
||||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -1616,7 +1616,7 @@ _end:
|
|||
// todo make this as an non-blocking operator
|
||||
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -1727,7 +1727,7 @@ void destroySWindowOperatorInfo(void* param) {
|
|||
|
||||
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -2027,7 +2027,7 @@ static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock
|
|||
|
||||
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -2366,7 +2366,7 @@ _end:
|
|||
|
||||
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -79,7 +79,6 @@ struct SSortHandle {
|
|||
bool forceUsePQSort;
|
||||
BoundedQueue* pBoundedQueue;
|
||||
uint32_t tmpRowIdx;
|
||||
|
||||
int64_t mergeLimit;
|
||||
int64_t currMergeLimitTs;
|
||||
|
||||
|
@ -288,7 +287,7 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize,
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
QRY_OPTR_CHECK(pHandle);
|
||||
QRY_PARAM_CHECK(pHandle);
|
||||
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
|
||||
QUERY_CHECK_NULL(pSortHandle, code, lino, _err, terrno);
|
||||
|
||||
|
@ -367,7 +366,7 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
|
||||
void tsortClearOrderedSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
|
||||
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
|
||||
SSortSource** pSource = taosArrayGet(pOrderedSource, i);
|
||||
if (NULL == *pSource) {
|
||||
|
@ -413,10 +412,11 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
|
|||
destroyDiskbasedBuf(pSortHandle->pBuf);
|
||||
taosMemoryFreeClear(pSortHandle->idStr);
|
||||
blockDataDestroy(pSortHandle->pDataBlock);
|
||||
|
||||
if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);
|
||||
|
||||
int64_t fetchUs = 0, fetchNum = 0;
|
||||
tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
|
||||
tsortClearOrderedSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
|
||||
qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);
|
||||
|
||||
taosArrayDestroy(pSortHandle->pOrderedSource);
|
||||
|
@ -465,7 +465,13 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource
|
|||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
|
||||
int32_t code = blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
|
||||
if (code != 0) {
|
||||
qError("sort failed at: %s:%d", __func__, __LINE__);
|
||||
taosArrayDestroy(pPageIdList);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
||||
|
@ -935,9 +941,99 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doSortForEachGroup(SSortHandle* pHandle, int32_t sortTimes, int32_t numOfSorted,
|
||||
int32_t numOfInputSources, SArray* pResList, int32_t sortGroup, int32_t numOfRows) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SArray* pPageIdList = NULL;
|
||||
|
||||
for (int32_t i = 0; i < sortGroup; ++i) {
|
||||
qDebug("internal merge sort pass %d group %d. num input sources %d ", sortTimes, i, numOfInputSources);
|
||||
pHandle->sourceId += 1;
|
||||
|
||||
int32_t end = (i + 1) * numOfInputSources - 1;
|
||||
if (end > numOfSorted - 1) {
|
||||
end = numOfSorted - 1;
|
||||
}
|
||||
|
||||
pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;
|
||||
|
||||
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
|
||||
QUERY_CHECK_CODE(code, lino, _err);
|
||||
|
||||
code =
|
||||
tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
|
||||
QUERY_CHECK_CODE(code, lino, _err);
|
||||
|
||||
int32_t nMergedRows = 0;
|
||||
pPageIdList = taosArrayInit(4, sizeof(int32_t));
|
||||
QUERY_CHECK_NULL(pPageIdList, code, lino, _err, terrno);
|
||||
|
||||
while (1) {
|
||||
if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) {
|
||||
code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
code = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows, &pDataBlock);
|
||||
if (pDataBlock == NULL || code != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t pageId = -1;
|
||||
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||
QUERY_CHECK_NULL(pPage, code, lino, _err, terrno);
|
||||
|
||||
void* px = taosArrayPush(pPageIdList, &pageId);
|
||||
QUERY_CHECK_NULL(px, code, lino, _err, terrno);
|
||||
|
||||
int32_t size =
|
||||
blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
|
||||
if (size > getBufPageSize(pHandle->pBuf)) {
|
||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = blockDataToBuf(pPage, pDataBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _err);
|
||||
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
nMergedRows += pDataBlock->info.rows;
|
||||
|
||||
blockDataCleanup(pDataBlock);
|
||||
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
code = sortComparCleanup(&pHandle->cmpParam);
|
||||
QUERY_CHECK_CODE(code, lino, _err);
|
||||
|
||||
tMergeTreeDestroy(&pHandle->pMergeTree);
|
||||
pHandle->numOfCompletedSources = 0;
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _err);
|
||||
|
||||
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
|
||||
QUERY_CHECK_CODE(code, lino, _err);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
taosArrayDestroy(pPageIdList);
|
||||
qError("%s error happens:%s line:%d, code:%s", pHandle->idStr, __func__, lino, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||
size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
|
||||
if (numOfSources == 0) {
|
||||
|
@ -959,8 +1055,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
|||
pHandle->numOfPages);
|
||||
}
|
||||
|
||||
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize,
|
||||
blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock)));
|
||||
int32_t size = (int32_t) blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock));
|
||||
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, size);
|
||||
if (numOfRows < 0) {
|
||||
return terrno;
|
||||
}
|
||||
|
@ -985,117 +1081,22 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
|||
int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;
|
||||
|
||||
// Only *numOfInputSources* can be loaded into buffer to perform the external sort.
|
||||
for (int32_t i = 0; i < sortGroup; ++i) {
|
||||
qDebug("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources);
|
||||
pHandle->sourceId += 1;
|
||||
|
||||
int32_t end = (i + 1) * numOfInputSources - 1;
|
||||
if (end > numOfSorted - 1) {
|
||||
end = numOfSorted - 1;
|
||||
}
|
||||
|
||||
pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;
|
||||
|
||||
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
code = doSortForEachGroup(pHandle, t, numOfSorted, numOfInputSources, pResList, sortGroup, numOfRows);
|
||||
if (code != 0) {
|
||||
tsortClearOrderedSource(pResList, NULL, NULL);
|
||||
taosArrayDestroy(pResList);
|
||||
return code;
|
||||
}
|
||||
|
||||
code =
|
||||
tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(pResList);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t nMergedRows = 0;
|
||||
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
|
||||
if (pPageIdList == NULL) {
|
||||
taosArrayDestroy(pResList);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) {
|
||||
code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
return code;
|
||||
}
|
||||
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
code = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows, &pDataBlock);
|
||||
if (pDataBlock == NULL || code != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t pageId = -1;
|
||||
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||
if (pPage == NULL) {
|
||||
taosArrayDestroy(pResList);
|
||||
taosArrayDestroy(pPageIdList);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
void* px = taosArrayPush(pPageIdList, &pageId);
|
||||
if (px == NULL) {
|
||||
taosArrayDestroy(pResList);
|
||||
taosArrayDestroy(pPageIdList);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t size =
|
||||
blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
|
||||
if (size > getBufPageSize(pHandle->pBuf)) {
|
||||
qError("sort failed at: %s:%d", __func__, __LINE__);
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
code= blockDataToBuf(pPage, pDataBlock);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
nMergedRows += pDataBlock->info.rows;
|
||||
|
||||
blockDataCleanup(pDataBlock);
|
||||
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
code = sortComparCleanup(&pHandle->cmpParam);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
tMergeTreeDestroy(&pHandle->pMergeTree);
|
||||
pHandle->numOfCompletedSources = 0;
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
|
||||
code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock);
|
||||
if (code) {
|
||||
taosArrayDestroy(pResList);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(pResList);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
void* px = taosArrayAddAll(pHandle->pOrderedSource, pResList);
|
||||
if (px == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
tsortClearOrderedSource(pResList, NULL, NULL);
|
||||
taosArrayDestroy(pResList);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pResList);
|
||||
|
||||
numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
|
||||
|
||||
int64_t el = taosGetTimestampUs() - st;
|
||||
|
@ -2346,7 +2347,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
}
|
||||
}
|
||||
|
||||
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
if (!tsortIsClosed(pHandle)) {
|
||||
void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
|
||||
QUERY_CHECK_NULL(px, code, lino, _err, terrno);
|
||||
|
@ -2378,37 +2379,44 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static void freeSSortSource(SSortSource* source) {
|
||||
if (NULL == source) {
|
||||
static void freeSortSource(SSortSource* pSource) {
|
||||
if (NULL == pSource) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (source->param && !source->onlyRef) {
|
||||
taosMemoryFree(source->param);
|
||||
if (!pSource->onlyRef && pSource->param) {
|
||||
taosMemoryFree(pSource->param);
|
||||
}
|
||||
if (!source->onlyRef && source->src.pBlock) {
|
||||
blockDataDestroy(source->src.pBlock);
|
||||
source->src.pBlock = NULL;
|
||||
|
||||
if (!pSource->onlyRef && pSource->src.pBlock) {
|
||||
blockDataDestroy(pSource->src.pBlock);
|
||||
pSource->src.pBlock = NULL;
|
||||
}
|
||||
taosMemoryFree(source);
|
||||
|
||||
taosMemoryFree(pSource);
|
||||
}
|
||||
|
||||
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
||||
int32_t code = 0;
|
||||
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
|
||||
if (pSource == NULL) {
|
||||
SSortSource** p = taosArrayGet(pHandle->pOrderedSource, 0);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SSortSource* source = *pSource;
|
||||
*pSource = NULL;
|
||||
SSortSource* pSource = *p;
|
||||
|
||||
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
taosArrayRemove(pHandle->pOrderedSource, 0);
|
||||
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = NULL;
|
||||
TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock));
|
||||
code = pHandle->fetchfp(pSource->param, &pBlock);
|
||||
if (code != 0) {
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -2422,7 +2430,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
|
||||
if (code) {
|
||||
freeSSortSource(source);
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
@ -2433,47 +2441,45 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
|
||||
code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
freeSSortSource(source);
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
|
||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||
if (size > sortBufSize) {
|
||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t p = taosGetTimestampUs();
|
||||
int64_t st = taosGetTimestampUs();
|
||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
if (code != 0) {
|
||||
freeSSortSource(source);
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
pHandle->sortElapsed += (taosGetTimestampUs() - st);
|
||||
|
||||
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
freeSSortSource(source);
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
freeSSortSource(source);
|
||||
freeSortSource(pSource);
|
||||
|
||||
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
|
||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||
|
||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t p = taosGetTimestampUs();
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
pHandle->sortElapsed += (taosGetTimestampUs() - st);
|
||||
|
||||
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
|
||||
if (size <= sortBufSize && pHandle->pBuf == NULL) {
|
||||
|
@ -2488,6 +2494,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -2500,7 +2507,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
code = createBlocksMergeSortInitialSources(pHandle);
|
||||
}
|
||||
|
||||
qDebug("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource));
|
||||
qDebug("%s %zu sources created", pHandle->idStr, taosArrayGetSize(pHandle->pOrderedSource));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1224,7 +1224,7 @@ void streamMetaWUnLock(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
|
||||
QRY_OPTR_CHECK(pList);
|
||||
QRY_PARAM_CHECK(pList);
|
||||
|
||||
int32_t code = 0;
|
||||
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
||||
|
|
Loading…
Reference in New Issue