fix(query): fix memory leak.
This commit is contained in:
parent
5f23ba709a
commit
c118df0ec4
|
@ -27,11 +27,8 @@ typedef struct SOperatorCostInfo {
|
||||||
|
|
||||||
struct SOperatorInfo;
|
struct SOperatorInfo;
|
||||||
|
|
||||||
//typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
|
|
||||||
//typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result);
|
|
||||||
|
|
||||||
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
|
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
|
||||||
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
|
typedef int32_t (*__optr_fn_t)(struct SOperatorInfo* pOptr, SSDataBlock** pResBlock);
|
||||||
typedef void (*__optr_close_fn_t)(void* param);
|
typedef void (*__optr_close_fn_t)(void* param);
|
||||||
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
|
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
|
||||||
typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr);
|
typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr);
|
||||||
|
|
|
@ -56,9 +56,8 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
|
||||||
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
|
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
|
||||||
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
|
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
|
||||||
|
|
||||||
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
|
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
|
||||||
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator);
|
static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
|
||||||
|
|
||||||
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
||||||
const char* pKey);
|
const char* pKey);
|
||||||
|
|
||||||
|
@ -123,7 +122,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
|
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
|
||||||
!pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
|
!pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResult, NULL, destroyAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
|
|
|
@ -50,7 +50,7 @@ typedef struct SCacheRowsScanInfo {
|
||||||
SColumnInfo pkCol;
|
SColumnInfo pkCol;
|
||||||
} SCacheRowsScanInfo;
|
} SCacheRowsScanInfo;
|
||||||
|
|
||||||
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
|
static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
|
||||||
static void destroyCacheScanOperator(void* param);
|
static void destroyCacheScanOperator(void* param);
|
||||||
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
|
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
|
||||||
int32_t** pDstSlotIds);
|
int32_t** pDstSlotIds);
|
||||||
|
@ -235,7 +235,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doScanCacheNext, NULL, destroyCacheScanOperator, optrDefaultBufFn,
|
||||||
NULL, optrDefaultGetNextExtFn, NULL);
|
NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
|
@ -259,7 +259,7 @@ _error:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
@ -445,12 +445,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doScanCacheNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
void destroyCacheScanOperator(void* param) {
|
void destroyCacheScanOperator(void* param) {
|
||||||
SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
|
SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
|
||||||
blockDataDestroy(pInfo->pRes);
|
blockDataDestroy(pInfo->pRes);
|
||||||
|
|
|
@ -64,6 +64,8 @@ void destroyCountWindowOperatorInfo(void* param) {
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
|
||||||
|
|
||||||
static void clearWinStateBuff(SCountWindowResult* pBuff) { pBuff->winRows = 0; }
|
static void clearWinStateBuff(SCountWindowResult* pBuff) { pBuff->winRows = 0; }
|
||||||
|
|
||||||
static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) {
|
static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) {
|
||||||
|
@ -227,7 +229,6 @@ static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
|
||||||
SExprSupp* pExprSup = &pOperator->exprSupp;
|
SExprSupp* pExprSup = &pOperator->exprSupp;
|
||||||
int32_t order = pInfo->binfo.inputTsOrder;
|
int32_t order = pInfo->binfo.inputTsOrder;
|
||||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
||||||
|
|
||||||
blockDataCleanup(pRes);
|
blockDataCleanup(pRes);
|
||||||
|
|
||||||
|
@ -292,12 +293,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = countWindowAggregateNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
|
int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
|
||||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||||
QRY_OPTR_CHECK(pOptrInfo);
|
QRY_OPTR_CHECK(pOptrInfo);
|
||||||
|
@ -374,7 +369,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "CountWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, true, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "CountWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, true, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, countWindowAggregate, NULL, destroyCountWindowOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, countWindowAggregateNext, NULL, destroyCountWindowOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
|
@ -878,20 +878,18 @@ static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock**
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE SSDataBlock* seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
|
static FORCE_INLINE void seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
|
||||||
pBlock->info.id.blockId = pStbJoin->outputBlkId;
|
pBlock->info.id.blockId = pStbJoin->outputBlkId;
|
||||||
return pBlock;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
|
||||||
SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) {
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
|
||||||
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
|
|
||||||
|
QRY_OPTR_CHECK(pRes);
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return pRes;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = 0;
|
int64_t st = 0;
|
||||||
|
@ -907,25 +905,24 @@ SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, &pRes));
|
QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
|
||||||
if (pRes) {
|
if (pRes) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, &pRes));
|
QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
if (pOperator->cost.openCost == 0) {
|
if (pOperator->cost.openCost == 0) {
|
||||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
pOperator->pTaskInfo->code = code;
|
pOperator->pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
|
} else {
|
||||||
|
seqStableJoinComposeRes(pStbJoin, *pRes);
|
||||||
}
|
}
|
||||||
|
return code;
|
||||||
return pRes ? seqStableJoinComposeRes(pStbJoin, pRes) : NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
|
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
|
||||||
|
|
|
@ -39,9 +39,9 @@ typedef struct SEventWindowOperatorInfo {
|
||||||
SSDataBlock* pPreDataBlock;
|
SSDataBlock* pPreDataBlock;
|
||||||
} SEventWindowOperatorInfo;
|
} SEventWindowOperatorInfo;
|
||||||
|
|
||||||
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator);
|
static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** pRes);
|
||||||
static void destroyEWindowOperatorInfo(void* param);
|
static void destroyEWindowOperatorInfo(void* param);
|
||||||
static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
|
static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
|
||||||
|
|
||||||
// todo : move to util
|
// todo : move to util
|
||||||
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
|
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
|
||||||
|
@ -131,7 +131,7 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregateNext, NULL, destroyEWindowOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
@ -250,12 +250,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = eventWindowAggregateNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
|
static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
|
||||||
SExprSupp* pExprSup, SAggSupporter* pAggSup) {
|
SExprSupp* pExprSup, SAggSupporter* pAggSup) {
|
||||||
if (*pResult == NULL) {
|
if (*pResult == NULL) {
|
||||||
|
|
|
@ -305,12 +305,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = loadRemoteDataNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
|
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
|
||||||
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
|
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
|
||||||
if (pInfo->pSourceDataInfo == NULL) {
|
if (pInfo->pSourceDataInfo == NULL) {
|
||||||
|
@ -447,7 +441,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
|
||||||
code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
*pOptrInfo = pOperator;
|
*pOptrInfo = pOperator;
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -692,9 +692,11 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
||||||
pTaskInfo->paramSet = true;
|
pTaskInfo->paramSet = true;
|
||||||
code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
|
code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
|
||||||
} else {
|
} else {
|
||||||
pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
|
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pRes == NULL) {
|
if (pRes == NULL) {
|
||||||
st = taosGetTimestampUs();
|
st = taosGetTimestampUs();
|
||||||
}
|
}
|
||||||
|
@ -718,6 +720,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
||||||
} else {
|
} else {
|
||||||
void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
|
void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
p = *(SSDataBlock**)tmp;
|
p = *(SSDataBlock**)tmp;
|
||||||
code = copyDataBlock(p, pRes);
|
code = copyDataBlock(p, pRes);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
@ -735,8 +738,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
|
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTaskInfo->pSubplan->dynamicRowThreshold) {
|
if (pTaskInfo->pSubplan->dynamicRowThreshold) {
|
||||||
pTaskInfo->pSubplan->rowsThreshold -= current;
|
pTaskInfo->pSubplan->rowsThreshold -= current;
|
||||||
}
|
}
|
||||||
|
@ -750,7 +755,6 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
|
||||||
(void)cleanUpUdfs();
|
(void)cleanUpUdfs();
|
||||||
|
|
||||||
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
||||||
|
@ -758,6 +762,11 @@ _end:
|
||||||
GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);
|
GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);
|
||||||
|
|
||||||
atomic_store_64(&pTaskInfo->owner, 0);
|
atomic_store_64(&pTaskInfo->owner, 0);
|
||||||
|
if (code) {
|
||||||
|
pTaskInfo->code = code;
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
return pTaskInfo->code;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -778,9 +787,10 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
|
||||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
int64_t threadId = taosGetSelfPthreadId();
|
int64_t threadId = taosGetSelfPthreadId();
|
||||||
|
int32_t lino = 0;
|
||||||
|
int64_t curOwner = 0;
|
||||||
|
|
||||||
*pRes = NULL;
|
*pRes = NULL;
|
||||||
int64_t curOwner = 0;
|
|
||||||
|
|
||||||
// todo extract method
|
// todo extract method
|
||||||
taosRLockLatch(&pTaskInfo->lock);
|
taosRLockLatch(&pTaskInfo->lock);
|
||||||
|
@ -822,7 +832,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
|
int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
|
||||||
|
if (code) {
|
||||||
|
pTaskInfo->code = code;
|
||||||
|
qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t el = (taosGetTimestampUs() - st);
|
uint64_t el = (taosGetTimestampUs() - st);
|
||||||
|
|
||||||
pTaskInfo->cost.elapsedTime += el;
|
pTaskInfo->cost.elapsedTime += el;
|
||||||
|
@ -830,8 +845,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||||
*useconds = pTaskInfo->cost.elapsedTime;
|
*useconds = pTaskInfo->cost.elapsedTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmpRes = cleanUpUdfs();
|
(void) cleanUpUdfs();
|
||||||
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
|
|
||||||
|
|
||||||
int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
|
int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
|
||||||
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
||||||
|
|
|
@ -1291,12 +1291,8 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pResBlock = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]);
|
code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
|
||||||
if (*pResBlock == NULL && terrno != 0) {
|
return code;
|
||||||
return terrno;
|
|
||||||
} else {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool compareVal(const char* v, const SStateKeys* pKey) {
|
bool compareVal(const char* v, const SStateKeys* pKey) {
|
||||||
|
|
|
@ -318,12 +318,6 @@ static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doFillNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
void destroyFillOperatorInfo(void* param) {
|
void destroyFillOperatorInfo(void* param) {
|
||||||
SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
|
SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
|
||||||
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
||||||
|
@ -513,7 +507,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFillNext, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL,
|
||||||
optrDefaultGetNextExtFn, NULL);
|
optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
|
@ -692,39 +692,48 @@ _return:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SSDataBlock** ppRes) {
|
static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx,
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
SSDataBlock** ppRes) {
|
||||||
SOperatorParam* pDownstreamParam = NULL;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SSDataBlock* pBlock = NULL;
|
SOperatorParam* pDownstreamParam = NULL;
|
||||||
|
SSDataBlock* pBlock = NULL;
|
||||||
SGroupCacheOperatorInfo* pGCache = pOperator->info;
|
SGroupCacheOperatorInfo* pGCache = pOperator->info;
|
||||||
|
|
||||||
code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam);
|
code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* pDownstream = pOperator->pDownstream[downstreamIdx];
|
||||||
if (pDownstreamParam) {
|
if (pDownstreamParam) {
|
||||||
code = pOperator->pDownstream[downstreamIdx]->fpSet.getNextExtFn(pOperator->pDownstream[downstreamIdx], pDownstreamParam, &pBlock);
|
code = pDownstream->fpSet.getNextExtFn(pDownstream, pDownstreamParam, &pBlock);
|
||||||
} else {
|
} else {
|
||||||
pBlock = pOperator->pDownstream[downstreamIdx]->fpSet.getNextFn(pOperator->pDownstream[downstreamIdx]);
|
code = pDownstream->fpSet.getNextFn(pDownstream, &pBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code) {
|
||||||
|
qError("failed to get block from downstream, code:%s %s", tstrerror(code), GET_TASKID(pOperator->pTaskInfo));
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlock) {
|
if (pBlock) {
|
||||||
qDebug("%s blk retrieved from group %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId);
|
qDebug("%s res block retrieved from group %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId);
|
||||||
|
|
||||||
pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;
|
pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;
|
||||||
if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) {
|
if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) {
|
||||||
code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, pBlock);
|
code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, pBlock);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
if (NULL == taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock, &pGCache->pDownstreams[downstreamIdx].pBaseBlock)) {
|
|
||||||
|
if (NULL == taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock,
|
||||||
|
&pGCache->pDownstreams[downstreamIdx].pBaseBlock)) {
|
||||||
QRY_ERR_RET(terrno);
|
QRY_ERR_RET(terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
*ppRes = pBlock;
|
*ppRes = pBlock;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -531,12 +531,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = hashGroupbyAggregateNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
|
int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
|
||||||
SOperatorInfo** pOptrInfo) {
|
SOperatorInfo** pOptrInfo) {
|
||||||
QRY_OPTR_CHECK(pOptrInfo);
|
QRY_OPTR_CHECK(pOptrInfo);
|
||||||
|
@ -600,7 +594,7 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
|
||||||
pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
|
pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
|
||||||
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
|
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregateNext, NULL, destroyGroupOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
@ -1103,12 +1097,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = hashPartitionNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void destroyPartitionOperatorInfo(void* param) {
|
static void destroyPartitionOperatorInfo(void* param) {
|
||||||
SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
|
SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
|
@ -1232,7 +1220,7 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
||||||
setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartitionNext, NULL, destroyPartitionOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
@ -1583,12 +1571,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doStreamHashPartitionNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void destroyStreamPartitionOperatorInfo(void* param) {
|
static void destroyStreamPartitionOperatorInfo(void* param) {
|
||||||
SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)param;
|
SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)param;
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
|
@ -1785,7 +1767,7 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo,
|
createOperatorFpSet(optrDummyOpenFn, doStreamHashPartitionNext, NULL, destroyStreamPartitionOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
|
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
|
||||||
|
|
||||||
|
|
|
@ -989,13 +989,14 @@ void hJoinSetDone(struct SOperatorInfo* pOperator) {
|
||||||
qDebug("hash Join done");
|
qDebug("hash Join done");
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) {
|
static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
SHJoinOperatorInfo* pJoin = pOperator->info;
|
SHJoinOperatorInfo* pJoin = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SSDataBlock* pRes = pJoin->finBlk;
|
SSDataBlock* pRes = pJoin->finBlk;
|
||||||
int64_t st = 0;
|
int64_t st = 0;
|
||||||
|
|
||||||
|
QRY_OPTR_CHECK(pResBlock);
|
||||||
if (pOperator->cost.openCost == 0) {
|
if (pOperator->cost.openCost == 0) {
|
||||||
st = taosGetTimestampUs();
|
st = taosGetTimestampUs();
|
||||||
}
|
}
|
||||||
|
@ -1004,7 +1005,7 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) {
|
||||||
pRes->info.rows = 0;
|
pRes->info.rows = 0;
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pJoin->keyHashBuilt) {
|
if (!pJoin->keyHashBuilt) {
|
||||||
pJoin->keyHashBuilt = true;
|
pJoin->keyHashBuilt = true;
|
||||||
|
|
||||||
|
@ -1012,7 +1013,7 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) {
|
||||||
code = hJoinBuildHash(pOperator, &queryDone);
|
code = hJoinBuildHash(pOperator, &queryDone);
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (queryDone) {
|
if (queryDone) {
|
||||||
|
@ -1026,18 +1027,20 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) {
|
||||||
code = (*pJoin->joinFp)(pOperator);
|
code = (*pJoin->joinFp)(pOperator);
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
|
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
|
||||||
code = doFilter(pRes, pJoin->pFinFilter, NULL);
|
code = doFilter(pRes, pJoin->pFinFilter, NULL);
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRes->info.rows > 0) {
|
if (pRes->info.rows > 0) {
|
||||||
return pRes;
|
*pResBlock = pRes;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1050,39 +1053,41 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pJoin->execInfo.probeBlkNum++;
|
pJoin->execInfo.probeBlkNum++;
|
||||||
pJoin->execInfo.probeBlkRows += pBlock->info.rows;
|
pJoin->execInfo.probeBlkRows += pBlock->info.rows;
|
||||||
|
|
||||||
code = hJoinPrepareStart(pOperator, pBlock);
|
code = hJoinPrepareStart(pOperator, pBlock);
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!hJoinBlkReachThreshold(pJoin, pRes->info.rows)) {
|
if (!hJoinBlkReachThreshold(pJoin, pRes->info.rows)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
|
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
|
||||||
code = doFilter(pRes, pJoin->pFinFilter, NULL);
|
code = doFilter(pRes, pJoin->pFinFilter, NULL);
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRes->info.rows > 0) {
|
if (pRes->info.rows > 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
if (pOperator->cost.openCost == 0) {
|
if (pOperator->cost.openCost == 0) {
|
||||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (pRes->info.rows > 0) ? pRes : NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if (pRes->info.rows > 0) {
|
||||||
|
*pResBlock = pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static void destroyHashJoinOperator(void* param) {
|
static void destroyHashJoinOperator(void* param) {
|
||||||
SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param;
|
SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param;
|
||||||
|
|
|
@ -1691,13 +1691,13 @@ void mJoinResetOperator(struct SOperatorInfo* pOperator) {
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
|
int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
SMJoinOperatorInfo* pJoin = pOperator->info;
|
SMJoinOperatorInfo* pJoin = pOperator->info;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) {
|
if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) {
|
||||||
qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo));
|
qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo));
|
||||||
return NULL;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
mJoinResetOperator(pOperator);
|
mJoinResetOperator(pOperator);
|
||||||
qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo));
|
qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo));
|
||||||
|
@ -1739,7 +1739,10 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pJoin->execInfo.resRows += pBlock ? pBlock->info.rows : 0;
|
pJoin->execInfo.resRows += pBlock ? pBlock->info.rows : 0;
|
||||||
return (pBlock && pBlock->info.rows > 0) ? pBlock : NULL;
|
if (pBlock && pBlock->info.rows > 0) {
|
||||||
|
*pResBlock = pBlock;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyGrpArray(void* ppArray) {
|
void destroyGrpArray(void* ppArray) {
|
||||||
|
|
|
@ -57,18 +57,16 @@ typedef struct SMultiwayMergeOperatorInfo {
|
||||||
bool inputWithGroupId;
|
bool inputWithGroupId;
|
||||||
} SMultiwayMergeOperatorInfo;
|
} SMultiwayMergeOperatorInfo;
|
||||||
|
|
||||||
static SSDataBlock* doSortMerge1(SOperatorInfo* pOperator);
|
static int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
||||||
static int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
static int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
||||||
static SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator);
|
|
||||||
static int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
static int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
||||||
static SSDataBlock* doNonSortMerge1(SOperatorInfo* pOperator);
|
|
||||||
static SSDataBlock* doColsMerge1(SOperatorInfo* pOperator);
|
|
||||||
static int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
static int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
||||||
|
static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock);
|
||||||
|
|
||||||
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
||||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||||
*ppBlock = pOperator->fpSet.getNextFn(pOperator);
|
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t openSortMergeOperator(SOperatorInfo* pOperator) {
|
int32_t openSortMergeOperator(SOperatorInfo* pOperator) {
|
||||||
|
@ -185,12 +183,6 @@ static int32_t doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHand
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* doSortMerge1(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pBlock = NULL;
|
|
||||||
pOperator->pTaskInfo->code = doSortMerge(pOperator, &pBlock);
|
|
||||||
return pBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||||
|
@ -343,12 +335,6 @@ int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* doNonSortMerge1(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pBlock = NULL;
|
|
||||||
pOperator->pTaskInfo->code = doNonSortMerge(pOperator, &pBlock);
|
|
||||||
return pBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
QRY_OPTR_CHECK(pResBlock);
|
QRY_OPTR_CHECK(pResBlock);
|
||||||
|
|
||||||
|
@ -432,12 +418,6 @@ int32_t copyColumnsValue(SNodeList* pNodeList, uint64_t targetBlkId, SSDataBlock
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* doColsMerge1(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pBlock = NULL;
|
|
||||||
pOperator->pTaskInfo->code = doColsMerge(pOperator, &pBlock);
|
|
||||||
return pBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
QRY_OPTR_CHECK(pResBlock);
|
QRY_OPTR_CHECK(pResBlock);
|
||||||
|
|
||||||
|
@ -486,9 +466,9 @@ int32_t getColsMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u
|
||||||
|
|
||||||
SOperatorFpSet gMultiwayMergeFps[MERGE_TYPE_MAX_VALUE] = {
|
SOperatorFpSet gMultiwayMergeFps[MERGE_TYPE_MAX_VALUE] = {
|
||||||
{0},
|
{0},
|
||||||
{._openFn = openSortMergeOperator, .getNextFn = doSortMerge1, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo},
|
{._openFn = openSortMergeOperator, .getNextFn = doSortMerge, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo},
|
||||||
{._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge1, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo},
|
{._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo},
|
||||||
{._openFn = openColsMergeOperator, .getNextFn = doColsMerge1, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo},
|
{._openFn = openColsMergeOperator, .getNextFn = doColsMerge, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -518,9 +498,11 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
|
int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
|
QRY_OPTR_CHECK(pResBlock);
|
||||||
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = NULL;
|
SSDataBlock* pBlock = NULL;
|
||||||
|
@ -529,19 +511,25 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
int32_t code = pOperator->fpSet._openFn(pOperator);
|
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
pTaskInfo->code = code;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) {
|
if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) {
|
||||||
pBlock = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator);
|
code = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator, &pBlock);
|
||||||
|
if (code) {
|
||||||
|
pTaskInfo->code = code;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
} else {
|
} else {
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pBlock;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyMultiwayMergeOperatorInfo(void* param) {
|
void destroyMultiwayMergeOperatorInfo(void* param) {
|
||||||
|
|
|
@ -862,10 +862,13 @@ int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam*
|
||||||
int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
|
int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
pOperator->pTaskInfo->code = code;
|
pOperator->pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
|
} else {
|
||||||
|
code = pOperator->fpSet.getNextFn(pOperator, pRes);
|
||||||
|
if (code) {
|
||||||
|
pOperator->pTaskInfo->code = code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
*pRes = pOperator->fpSet.getNextFn(pOperator);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -165,7 +165,7 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation1, NULL, destroyProjectOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);
|
setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);
|
||||||
|
|
||||||
|
@ -445,16 +445,6 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* doProjectOperation1(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doProjectOperation(pOperator, &pRes);
|
|
||||||
if (code && pOperator->pTaskInfo->code == 0) {
|
|
||||||
pOperator->pTaskInfo->code = code;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
|
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
|
||||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||||
QRY_OPTR_CHECK(pOptrInfo);
|
QRY_OPTR_CHECK(pOptrInfo);
|
||||||
|
@ -526,7 +516,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction1, NULL, destroyIndefinitOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
|
@ -68,6 +68,7 @@ typedef struct STableCountScanOperatorInfo {
|
||||||
} STableCountScanOperatorInfo;
|
} STableCountScanOperatorInfo;
|
||||||
|
|
||||||
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
||||||
|
static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
|
||||||
|
|
||||||
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -1001,30 +1002,30 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
static int32_t doGroupedTableScan(SOperatorInfo* pOperator, SSDataBlock** pBlock) {
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doTableScanImplNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
|
QRY_OPTR_CHECK(pBlock);
|
||||||
|
|
||||||
// The read handle is not initialized yet, since no qualified tables exists
|
// The read handle is not initialized yet, since no qualified tables exists
|
||||||
if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
|
if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// do the ascending order traverse in the first place.
|
// do the ascending order traverse in the first place.
|
||||||
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
SSDataBlock* p = NULL;
|
||||||
|
code = doTableScanImplNext(pOperator, &p);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
markGroupProcessed(pTableScanInfo, p->info.id.groupId);
|
markGroupProcessed(pTableScanInfo, p->info.id.groupId);
|
||||||
return p;
|
*pBlock = p;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableScanInfo->scanTimes += 1;
|
pTableScanInfo->scanTimes += 1;
|
||||||
|
@ -1052,10 +1053,14 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
while (pTableScanInfo->scanTimes < total) {
|
while (pTableScanInfo->scanTimes < total) {
|
||||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
SSDataBlock* p = NULL;
|
||||||
|
code = doTableScanImplNext(pOperator, &p);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
markGroupProcessed(pTableScanInfo, p->info.id.groupId);
|
markGroupProcessed(pTableScanInfo, p->info.id.groupId);
|
||||||
return p;
|
*pBlock = p;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableScanInfo->scanTimes += 1;
|
pTableScanInfo->scanTimes += 1;
|
||||||
|
@ -1079,8 +1084,13 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
||||||
pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED;
|
pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED;
|
||||||
STableKeyInfo* pStart =
|
STableKeyInfo* pStart =
|
||||||
(STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex);
|
(STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex);
|
||||||
if (NULL == pStart) return NULL;
|
|
||||||
return getBlockForEmptyTable(pOperator, pStart);
|
if (NULL == pStart) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pBlock = getBlockForEmptyTable(pOperator, pStart);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
} else { // group by tag + no sort
|
} else { // group by tag + no sort
|
||||||
int32_t numOfTables = 0;
|
int32_t numOfTables = 0;
|
||||||
|
@ -1096,7 +1106,9 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
||||||
STableKeyInfo info = {.uid = *(uint64_t*)pIte, .groupId = *pGroupId};
|
STableKeyInfo info = {.uid = *(uint64_t*)pIte, .groupId = *pGroupId};
|
||||||
taosHashCancelIterate(pTableListInfo->remainGroups, pIte);
|
taosHashCancelIterate(pTableListInfo->remainGroups, pIte);
|
||||||
markGroupProcessed(pTableScanInfo, *pGroupId);
|
markGroupProcessed(pTableScanInfo, *pGroupId);
|
||||||
return getBlockForEmptyTable(pOperator, &info);
|
*pBlock = getBlockForEmptyTable(pOperator, &info);
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1107,9 +1119,9 @@ _end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
|
||||||
}
|
}
|
||||||
return NULL;
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
|
static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
|
||||||
|
@ -1169,13 +1181,16 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
|
static int32_t startNextGroupScan(SOperatorInfo* pOperator, SSDataBlock** pResult) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
STableScanInfo* pInfo = pOperator->info;
|
STableScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
int32_t numOfTables = 0;
|
int32_t numOfTables = 0;
|
||||||
|
|
||||||
|
QRY_OPTR_CHECK(pResult);
|
||||||
|
|
||||||
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
@ -1185,7 +1200,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
|
||||||
taosArrayClear(pInfo->base.pTableListInfo->pTableList);
|
taosArrayClear(pInfo->base.pTableListInfo->pTableList);
|
||||||
taosHashClear(pInfo->base.pTableListInfo->map);
|
taosHashClear(pInfo->base.pTableListInfo->map);
|
||||||
}
|
}
|
||||||
return NULL;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset value for the next group data output
|
// reset value for the next group data output
|
||||||
|
@ -1204,21 +1219,22 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
|
|
||||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
code = doGroupedTableScan(pOperator, pResult);
|
||||||
if (result != NULL) {
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
if (*pResult != NULL) {
|
||||||
if (pOperator->dynamicTask) {
|
if (pOperator->dynamicTask) {
|
||||||
result->info.id.groupId = result->info.id.uid;
|
(*pResult)->info.id.groupId = (*pResult)->info.id.uid;
|
||||||
}
|
}
|
||||||
return result;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
|
||||||
}
|
}
|
||||||
return NULL;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
||||||
|
@ -1257,7 +1273,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result = doGroupedTableScan(pOperator);
|
result = NULL;
|
||||||
|
code = doGroupedTableScan(pOperator, &result);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (result != NULL) {
|
if (result != NULL) {
|
||||||
if (pOperator->dynamicTask) {
|
if (pOperator->dynamicTask) {
|
||||||
result->info.id.groupId = result->info.id.uid;
|
result->info.id.groupId = result->info.id.uid;
|
||||||
|
@ -1266,7 +1285,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
result = startNextGroupScan(pOperator);
|
code = startNextGroupScan(pOperator, &result);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (result || pOperator->status == OP_EXEC_DONE) {
|
if (result || pOperator->status == OP_EXEC_DONE) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -1287,6 +1308,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
STableScanInfo* pInfo = pOperator->info;
|
STableScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
QRY_OPTR_CHECK(ppRes);
|
||||||
|
|
||||||
if (pOperator->pOperatorGetParam) {
|
if (pOperator->pOperatorGetParam) {
|
||||||
pOperator->dynamicTask = true;
|
pOperator->dynamicTask = true;
|
||||||
|
@ -1299,8 +1321,11 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
pInfo->currentGroupId = -1;
|
pInfo->currentGroupId = -1;
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
SSDataBlock* result = NULL;
|
SSDataBlock* result = NULL;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
result = startNextGroupScan(pOperator);
|
code = startNextGroupScan(pOperator, &result);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (result || pOperator->status == OP_EXEC_DONE) {
|
if (result || pOperator->status == OP_EXEC_DONE) {
|
||||||
(*ppRes) = result;
|
(*ppRes) = result;
|
||||||
return code;
|
return code;
|
||||||
|
@ -1316,7 +1341,10 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
pInfo->countState = TABLE_COUNT_STATE_END;
|
pInfo->countState = TABLE_COUNT_STATE_END;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
SSDataBlock* result = NULL;
|
||||||
|
code = doGroupedTableScan(pOperator, &result);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
|
if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
|
||||||
(*ppRes) = result;
|
(*ppRes) = result;
|
||||||
return code;
|
return code;
|
||||||
|
@ -1340,6 +1368,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
(*ppRes) = NULL;
|
(*ppRes) = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
|
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||||
|
@ -1368,16 +1397,9 @@ _end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
|
||||||
}
|
}
|
||||||
(*ppRes) = NULL;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
return code;
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doTableScanNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||||
|
@ -1497,7 +1519,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
|
||||||
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
|
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
|
||||||
|
|
||||||
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
|
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanNext, NULL, destroyTableScanOperatorInfo,
|
||||||
optrDefaultBufFn, getTableScannerExecInfo, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, getTableScannerExecInfo, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
// for non-blocking operator, the open cost is always 0
|
// for non-blocking operator, the open cost is always 0
|
||||||
|
@ -1535,7 +1557,7 @@ int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskIn
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImplNext, NULL, NULL, optrDefaultBufFn, NULL,
|
||||||
optrDefaultGetNextExtFn, NULL);
|
optrDefaultGetNextExtFn, NULL);
|
||||||
*pOptrInfo = pOperator;
|
*pOptrInfo = pOperator;
|
||||||
return code;
|
return code;
|
||||||
|
@ -1832,18 +1854,23 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t
|
||||||
static int32_t doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex,
|
static int32_t doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex,
|
||||||
SSDataBlock** ppRes) {
|
SSDataBlock** ppRes) {
|
||||||
qDebug("do stream range scan. windows index:%d", *pRowIndex);
|
qDebug("do stream range scan. windows index:%d", *pRowIndex);
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
bool prepareRes = true;
|
bool prepareRes = true;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pResult = NULL;
|
SSDataBlock* pResult = NULL;
|
||||||
pResult = doTableScan(pInfo->pTableScanOp);
|
code = doTableScanNext(pInfo->pTableScanOp, &pResult);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (!pResult) {
|
if (!pResult) {
|
||||||
prepareRangeScan(pInfo, pSDB, pRowIndex, &prepareRes);
|
prepareRangeScan(pInfo, pSDB, pRowIndex, &prepareRes);
|
||||||
// scan next window data
|
// scan next window data
|
||||||
pResult = doTableScan(pInfo->pTableScanOp);
|
code = doTableScanNext(pInfo->pTableScanOp, &pResult);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pResult) {
|
if (!pResult) {
|
||||||
if (prepareRes) {
|
if (prepareRes) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -2891,7 +2918,9 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
|
||||||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
SSDataBlock* pResult = NULL;
|
||||||
|
code = doTableScanNext(pInfo->pTableScanOp, &pResult);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pResult && pResult->info.rows > 0) {
|
if (pResult && pResult->info.rows > 0) {
|
||||||
bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader);
|
bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader);
|
||||||
|
@ -3231,7 +3260,9 @@ static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
|
code = doTableScanNext(pInfo->pTableScanOp, &pInfo->pRecoverRes);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pInfo->pRecoverRes != NULL) {
|
if (pInfo->pRecoverRes != NULL) {
|
||||||
code = calBlockTbName(pInfo, pInfo->pRecoverRes, 0);
|
code = calBlockTbName(pInfo, pInfo->pRecoverRes, 0);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
@ -3784,12 +3815,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doRawScanNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void destroyRawScanOperatorInfo(void* param) {
|
static void destroyRawScanOperatorInfo(void* param) {
|
||||||
SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
|
SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
|
||||||
pRawScan->pAPI->tsdReader.tsdReaderClose(pRawScan->dataReader);
|
pRawScan->pAPI->tsdReader.tsdReaderClose(pRawScan->dataReader);
|
||||||
|
@ -3828,7 +3853,7 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo
|
||||||
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL,
|
pOperator->fpSet = createOperatorFpSet(NULL, doRawScanNext, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL,
|
||||||
optrDefaultGetNextExtFn, NULL);
|
optrDefaultGetNextExtFn, NULL);
|
||||||
*pOptrInfo = pOperator;
|
*pOptrInfo = pOperator;
|
||||||
return code;
|
return code;
|
||||||
|
@ -4207,7 +4232,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
|
|
||||||
__optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
|
__optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScanNext : doQueueScanNext;
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn,
|
||||||
NULL, optrDefaultGetNextExtFn, NULL);
|
NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamScanReleaseState, streamScanReloadState);
|
setOperatorStreamStateFn(pOperator, streamScanReleaseState, streamScanReloadState);
|
||||||
|
@ -4779,7 +4804,7 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//TODO wjm check pInfo->filterCtx.code
|
//TODO wjm check pInfo->filterCtx.code
|
||||||
__optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry;
|
__optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdxNext : doTagScanFromMetaEntryNext;
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
*pOptrInfo = pOperator;
|
*pOptrInfo = pOperator;
|
||||||
|
@ -6044,7 +6069,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(
|
pOperator->fpSet = createOperatorFpSet(
|
||||||
optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTables : doTableMergeScan, NULL,
|
optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTablesNext : doTableMergeScanNext, NULL,
|
||||||
destroyTableMergeScanOperatorInfo, optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn,
|
destroyTableMergeScanOperatorInfo, optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn,
|
||||||
NULL);
|
NULL);
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
|
@ -6070,7 +6095,6 @@ _error:
|
||||||
|
|
||||||
// ====================================================================================================================
|
// ====================================================================================================================
|
||||||
// TableCountScanOperator
|
// TableCountScanOperator
|
||||||
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
|
|
||||||
static void destoryTableCountScanOperator(void* param);
|
static void destoryTableCountScanOperator(void* param);
|
||||||
static int32_t buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
static int32_t buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
||||||
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
|
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
|
||||||
|
@ -6212,7 +6236,7 @@ int32_t createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountSca
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScanNext, NULL, destoryTableCountScanOperator,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
*pOptrInfo = pOperator;
|
*pOptrInfo = pOperator;
|
||||||
return code;
|
return code;
|
||||||
|
@ -6381,12 +6405,6 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doTableCountScanNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||||
STableCountScanSupp* pSupp, SSDataBlock* pRes) {
|
STableCountScanSupp* pSupp, SSDataBlock* pRes) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -43,11 +43,9 @@ typedef struct SSortOperatorInfo {
|
||||||
SSortOpGroupIdCalc* pGroupIdCalc;
|
SSortOpGroupIdCalc* pGroupIdCalc;
|
||||||
} SSortOperatorInfo;
|
} SSortOperatorInfo;
|
||||||
|
|
||||||
static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
||||||
static SSDataBlock* doSort1(SOperatorInfo* pOperator);
|
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
|
||||||
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
|
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
|
||||||
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
|
|
||||||
static SSDataBlock* doGroupSort1(SOperatorInfo* pOperator);
|
|
||||||
static int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
static int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
||||||
|
|
||||||
static void destroySortOperatorInfo(void* param);
|
static void destroySortOperatorInfo(void* param);
|
||||||
|
@ -149,7 +147,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
|
||||||
// TODO dynamic set the available sort buffer
|
// TODO dynamic set the available sort buffer
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(doOpenSortOperator, doSort1, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL);
|
createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -342,8 +340,7 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
|
||||||
|
|
||||||
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
||||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||||
*ppBlock = pOperator->fpSet.getNextFn(pOperator);
|
return pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo refactor: merged with fetch fp
|
// todo refactor: merged with fetch fp
|
||||||
|
@ -404,12 +401,6 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* doSort1(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pBlock = NULL;
|
|
||||||
pOperator->pTaskInfo->code = doSort(pOperator, &pBlock);
|
|
||||||
return pBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
QRY_OPTR_CHECK(pResBlock);
|
QRY_OPTR_CHECK(pResBlock);
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
@ -613,17 +604,23 @@ typedef struct SGroupSortSourceParam {
|
||||||
} SGroupSortSourceParam;
|
} SGroupSortSourceParam;
|
||||||
|
|
||||||
int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
|
int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
|
||||||
*ppBlock = NULL;
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
SGroupSortSourceParam* source = param;
|
SGroupSortSourceParam* source = param;
|
||||||
SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
|
SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
|
||||||
|
SSDataBlock* block = NULL;
|
||||||
|
|
||||||
|
QRY_OPTR_CHECK(ppBlock);
|
||||||
|
|
||||||
if (grpSortOpInfo->prefetchedSortInput) {
|
if (grpSortOpInfo->prefetchedSortInput) {
|
||||||
SSDataBlock* block = grpSortOpInfo->prefetchedSortInput;
|
block = grpSortOpInfo->prefetchedSortInput;
|
||||||
grpSortOpInfo->prefetchedSortInput = NULL;
|
grpSortOpInfo->prefetchedSortInput = NULL;
|
||||||
*ppBlock = block;
|
*ppBlock = block;
|
||||||
} else {
|
} else {
|
||||||
SOperatorInfo* childOp = source->childOpInfo;
|
SOperatorInfo* childOp = source->childOpInfo;
|
||||||
SSDataBlock* block = childOp->fpSet.getNextFn(childOp);
|
code = childOp->fpSet.getNextFn(childOp, &block);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (block != NULL) {
|
if (block != NULL) {
|
||||||
if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
|
if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
|
||||||
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
|
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
|
||||||
|
@ -637,7 +634,12 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
|
_end:
|
||||||
|
if (code != 0) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t beginSortGroup(SOperatorInfo* pOperator) {
|
int32_t beginSortGroup(SOperatorInfo* pOperator) {
|
||||||
|
@ -695,12 +697,6 @@ int32_t finishSortGroup(SOperatorInfo* pOperator) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* doGroupSort1(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pBlock = NULL;
|
|
||||||
pOperator->pTaskInfo->code = doGroupSort(pOperator, &pBlock);
|
|
||||||
return pBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
QRY_OPTR_CHECK(pResBlock);
|
QRY_OPTR_CHECK(pResBlock);
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -836,7 +832,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
|
||||||
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
|
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
|
||||||
setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort1, NULL, destroyGroupSortOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
|
||||||
optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
|
@ -658,10 +658,14 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
||||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = NULL;
|
||||||
|
code = downstream->fpSet.getNextFn(downstream, &pBlock);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
|
||||||
|
@ -745,12 +749,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doStreamCountAggNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamCountReleaseState(SOperatorInfo* pOperator) {
|
void streamCountReleaseState(SOperatorInfo* pOperator) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -908,7 +906,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
}
|
}
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAgg, NULL, destroyStreamCountAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAggNext, NULL, destroyStreamCountAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
|
setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
|
||||||
|
|
||||||
|
|
|
@ -629,10 +629,15 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
||||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = NULL;
|
||||||
|
|
||||||
|
code = downstream->fpSet.getNextFn(downstream, &pBlock);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
|
||||||
|
@ -734,12 +739,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doStreamEventAggNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamEventReleaseState(SOperatorInfo* pOperator) {
|
void streamEventReleaseState(SOperatorInfo* pOperator) {
|
||||||
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||||
|
@ -966,7 +965,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAggNext, NULL, destroyStreamEventOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
|
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
|
||||||
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
|
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
|
||||||
|
|
|
@ -1164,12 +1164,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doStreamFillNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t initResultBuf(SStreamFillSupporter* pFillSup) {
|
static int32_t initResultBuf(SStreamFillSupporter* pFillSup) {
|
||||||
pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols;
|
pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols;
|
||||||
for (int i = 0; i < pFillSup->numOfAllCols; i++) {
|
for (int i = 0; i < pFillSup->numOfAllCols; i++) {
|
||||||
|
@ -1444,7 +1438,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
|
||||||
pInfo->srcRowIndex = -1;
|
pInfo->srcRowIndex = -1;
|
||||||
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
|
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,7 @@ typedef struct SPullWindowInfo {
|
||||||
STimeWindow calWin;
|
STimeWindow calWin;
|
||||||
} SPullWindowInfo;
|
} SPullWindowInfo;
|
||||||
|
|
||||||
static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator);
|
static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
|
||||||
|
|
||||||
typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index);
|
typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index);
|
||||||
|
|
||||||
|
@ -1603,7 +1603,10 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = NULL;
|
||||||
|
code = downstream->fpSet.getNextFn(downstream, &pBlock);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
|
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
|
||||||
|
@ -1611,6 +1614,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
|
||||||
pInfo->numOfDatapack = 0;
|
pInfo->numOfDatapack = 0;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->numOfDatapack++;
|
pInfo->numOfDatapack++;
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
@ -1765,12 +1769,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doStreamFinalIntervalAggNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) {
|
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) {
|
||||||
if (pWinPhyNode->deleteMark <= 0) {
|
if (pWinPhyNode->deleteMark <= 0) {
|
||||||
return DEAULT_DELETE_MARK;
|
return DEAULT_DELETE_MARK;
|
||||||
|
@ -1997,10 +1995,10 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
||||||
pOperator->fpSet = createOperatorFpSet(NULL, doStreamMidIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(NULL, doStreamMidIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
} else {
|
} else {
|
||||||
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
}
|
}
|
||||||
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
||||||
|
@ -3397,7 +3395,10 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
|
||||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = NULL;
|
||||||
|
code = downstream->fpSet.getNextFn(downstream, &pBlock);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -3840,7 +3841,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAggNext, NULL, destroyStreamSessionAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
||||||
|
|
||||||
|
@ -3966,7 +3967,10 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
||||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = NULL;
|
||||||
|
code = downstream->fpSet.getNextFn(downstream, &pBlock);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
break;
|
break;
|
||||||
|
@ -4076,7 +4080,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
|
|
||||||
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAggNext, NULL, destroyStreamSessionAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState);
|
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState);
|
||||||
}
|
}
|
||||||
|
@ -4693,7 +4697,10 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
||||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = NULL;
|
||||||
|
code = downstream->fpSet.getNextFn(downstream, &pBlock);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -5014,7 +5021,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAggNext, NULL, destroyStreamStateOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState);
|
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState);
|
||||||
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
|
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
|
||||||
|
@ -5108,7 +5115,10 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = NULL;
|
||||||
|
code = downstream->fpSet.getNextFn(downstream, &pBlock);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
|
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
|
||||||
pInfo->numOfDatapack);
|
pInfo->numOfDatapack);
|
||||||
|
@ -5222,12 +5232,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doStreamIntervalAggNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
||||||
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
||||||
QRY_OPTR_CHECK(pOptrInfo);
|
QRY_OPTR_CHECK(pOptrInfo);
|
||||||
|
@ -5338,7 +5342,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
|
createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
||||||
|
|
||||||
|
@ -5603,7 +5607,10 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = NULL;
|
||||||
|
code = downstream->fpSet.getNextFn(downstream, &pBlock);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
|
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
|
||||||
|
|
|
@ -2053,15 +2053,6 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doSysTableScanNext(pOperator, &pRes);
|
|
||||||
if (code) {
|
|
||||||
terrno = code;
|
|
||||||
}
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
|
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
|
||||||
SSDataBlock* pBlock) {
|
SSDataBlock* pBlock) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -2252,7 +2243,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo
|
||||||
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScanNext, NULL, destroySysScanOperator,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
*pOptrInfo = pOperator;
|
*pOptrInfo = pOperator;
|
||||||
return code;
|
return code;
|
||||||
|
@ -2868,7 +2859,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
|
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
|
||||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScanNext, NULL, destroyBlockDistScanOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
*pOptrInfo = pOperator;
|
*pOptrInfo = pOperator;
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -1085,12 +1085,6 @@ _finished:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doTimesliceNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn* pPkColumn) {
|
static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn* pPkColumn) {
|
||||||
SNode* pNode;
|
SNode* pNode;
|
||||||
FOREACH(pNode, pFuncs) {
|
FOREACH(pNode, pFuncs) {
|
||||||
|
@ -1189,7 +1183,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTimesliceNext, NULL, destroyTimeSliceOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
|
@ -1161,12 +1161,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doStateWindowAggNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1211,12 +1205,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doBuildIntervalResultNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void destroyStateWindowOperatorInfo(void* param) {
|
static void destroyStateWindowOperatorInfo(void* param) {
|
||||||
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
|
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
|
@ -1431,7 +1419,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
|
||||||
setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
@ -1631,12 +1619,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doSessionWindowAggNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo make this as an non-blocking operator
|
// todo make this as an non-blocking operator
|
||||||
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
|
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
|
||||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||||
|
@ -1712,7 +1694,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
@ -1816,7 +1798,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
@ -2061,12 +2043,6 @@ static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = mergeAlignedIntervalAggNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
|
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
|
||||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||||
QRY_OPTR_CHECK(pOptrInfo);
|
QRY_OPTR_CHECK(pOptrInfo);
|
||||||
|
@ -2139,7 +2115,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
||||||
setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
|
setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
|
||||||
false, OP_NOT_OPENED, miaInfo, pTaskInfo);
|
false, OP_NOT_OPENED, miaInfo, pTaskInfo);
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
@ -2411,12 +2387,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doMergeIntervalAggNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
|
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
|
||||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||||
QRY_OPTR_CHECK(pOptrInfo);
|
QRY_OPTR_CHECK(pOptrInfo);
|
||||||
|
@ -2483,7 +2453,7 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
|
||||||
initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
|
initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
|
||||||
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
|
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
|
||||||
OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
|
OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
Loading…
Reference in New Issue