feat: add dynamic query ctrl operator

This commit is contained in:
dapan1121 2023-07-05 11:07:08 +08:00
parent ac35ae9839
commit 7b9d73c77a
25 changed files with 460 additions and 121 deletions

View File

@ -1848,6 +1848,7 @@ typedef struct {
uint64_t queryId;
uint64_t taskId;
int32_t execId;
void* opParam;
} SResFetchReq;
int32_t tSerializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq);

View File

@ -444,16 +444,16 @@ typedef struct SGroupCachePhysiNode {
SNodeList* pGroupCols;
} SGroupCachePhysiNode;
typedef struct SStbJoinDynCtrlInfo {
typedef struct SStbJoinDynCtrlBasic {
int32_t vgSlot[2];
int32_t uidSlot[2];
} SStbJoinDynCtrlInfo;
} SStbJoinDynCtrlBasic;
typedef struct SDynQueryCtrlPhysiNode {
SPhysiNode node;
EDynQueryType qType;
union {
SStbJoinDynCtrlInfo stbJoin;
SStbJoinDynCtrlBasic stbJoin;
};
} SDynQueryCtrlPhysiNode;

View File

@ -19,8 +19,30 @@
extern "C" {
#endif
typedef struct SStbJoinPrevJoinCtx {
SSDataBlock* pLastBlk;
int32_t lastRow;
} SStbJoinPrevJoinCtx;
typedef struct SStbJoinPostJoinCtx {
bool isStarted;
} SStbJoinPostJoinCtx;
typedef struct SStbJoinDynCtrlCtx {
SStbJoinPrevJoinCtx prev;
SStbJoinPostJoinCtx post;
} SStbJoinDynCtrlCtx;
typedef struct SStbJoinDynCtrlInfo {
SStbJoinDynCtrlBasic basic;
SStbJoinDynCtrlCtx ctx;
} SStbJoinDynCtrlInfo;
typedef struct SDynQueryCtrlOperatorInfo {
SStbJoinDynCtrlInfo ctrlInfo;
EDynQueryType qType;
union {
SStbJoinDynCtrlInfo stbJoin;
};
} SDynQueryCtrlOperatorInfo;
#ifdef __cplusplus

View File

@ -115,8 +115,9 @@ typedef struct SExprSupp {
typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
EX_SOURCE_DATA_STARTED,
EX_SOURCE_DATA_READY,
EX_SOURCE_DATA_EXHAUSTED,
} EX_SOURCE_STATUS;
#define COL_MATCH_FROM_COL_ID 0x1
@ -138,8 +139,19 @@ typedef struct SLimitInfo {
int64_t numOfOutputRows;
} SLimitInfo;
typedef struct SSortMergeJoinOperatorParam {
SOperatorParam* pChild;
} SSortMergeJoinOperatorParam;
typedef struct SExchangeOperatorParam {
SOperatorParam* pChild;
int32_t vgId;
SArray* uidList;
} SExchangeOperatorParam;
typedef struct SExchangeInfo {
SArray* pSources;
SSHashObj* pHashSources;
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
@ -150,6 +162,7 @@ typedef struct SExchangeInfo {
SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default
bool dynamicOp;
int32_t current;
SLoadRemoteDataInfo loadInfo;
uint64_t self;

View File

@ -22,7 +22,7 @@ extern "C" {
#define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760
typedef struct SGcOperatorParam {
SOperatorBasicParam basic;
SOperatorParam* pChild;
int64_t sessionId;
int32_t downstreamIdx;
bool needCache;

View File

@ -64,6 +64,7 @@ typedef struct SGroupData {
} SGroupData;
typedef struct SHJoinTableInfo {
int32_t downStreamIdx;
SOperatorInfo* downStream;
int32_t blkId;
SQueryStat inputStat;

View File

@ -27,19 +27,17 @@ typedef struct SOperatorCostInfo {
struct SOperatorInfo;
typedef struct SOperatorBasicParam {
bool newExec;
} SOperatorBasicParam;
typedef struct SOperatorSpecParam {
int32_t opType;
void* value;
} SOperatorSpecParam;
typedef struct SOperatorBaseParam {
SOperatorParam* pChild;
} SOperatorBaseParam;
typedef struct SOperatorParam {
SOperatorBasicParam basic;
int32_t opNum;
SOperatorSpecParam* pOpParams;
SArray* pOpParams; //SArray<SOperatorSpecParam>
} SOperatorParam;
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
@ -84,6 +82,8 @@ typedef struct SOperatorInfo {
SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost;
SResultInfo resultInfo;
SOperatorBaseParam* pOperatorParam;
SOperatorParam** pDownstreamParams;
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
SOperatorFpSet fpSet;
@ -174,6 +174,7 @@ void setOperatorCompleted(SOperatorInfo* pOperator);
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
void* pInfo, SExecTaskInfo* pTaskInfo);
int32_t optrDefaultBufFn(SOperatorInfo* pOperator);
SSDataBlock* optrDefaultGetExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam);
SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
SNode* pTagIndexCond, const char* pUser, const char* dbname);
@ -183,7 +184,7 @@ SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, con
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);
int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SStorageAPI* pAPI);
int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList);
void * getOperatorParam(int32_t opType, SOperatorParam* param);
void * getOperatorParam(int32_t opType, SOperatorParam* param, int32_t idx);
#ifdef __cplusplus
}

View File

@ -114,7 +114,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = downstream->info;
@ -172,7 +172,7 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
while (1) {
bool blockAllocated = false;
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
if (!hasValidBlock) {
createDataBlockForEmptyInput(pOperator, &pBlock);

View File

@ -134,7 +134,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL, NULL, NULL);
createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
pOperator->cost.openCost = 0;
return pOperator;

View File

@ -27,28 +27,227 @@
#include "ttypes.h"
#include "dynqueryctrl.h"
int64_t gSessionId = 0;
static void destroyDynQueryCtrlOperator(void* param) {
SDynQueryCtrlOperatorInfo* pDynCtrlOperator = (SDynQueryCtrlOperatorInfo*)param;
taosMemoryFreeClear(param);
}
SSDataBlock* getBlockFromDynQueryCtrl(SOperatorInfo* pOperator) {
static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, bool needCache, void* pGrpValue, int32_t grpValSize, SOperatorParam* pChild) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
if (NULL == pGc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
pGc->downstreamIdx = downstreamIdx;
pGc->needCache = needCache;
pGc->pGroupValue = pGrpValue;
pGc->groupValueSize = grpValSize;
SOperatorSpecParam specParam;
specParam.opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
specParam.value = pGc;
taosArrayPush((*ppRes)->pOpParams, &specParam);
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t* pVgId, int64_t* pUid, SOperatorParam* pChild) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
if (NULL == pExc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExc->pChild = pChild;
pExc->vgId = *pVgId;
pExc->uidList = taosArrayInit(1, sizeof(int64_t));
if (NULL == pExc->uidList) {
taosMemoryFree(pExc);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pExc->uidList, pUid);
SOperatorSpecParam specParam;
specParam.opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
specParam.value = pExc;
taosArrayPush((*ppRes)->pOpParams, &specParam);
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pOpParams = taosArrayInit(2, sizeof(SOperatorSpecParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSortMergeJoinOperatorParam* pJoin0 = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
SSortMergeJoinOperatorParam* pJoin1 = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
if (NULL == pJoin0 || NULL == pJoin1) {
taosMemoryFree(pJoin0);
taosMemoryFree(pJoin1);
return TSDB_CODE_OUT_OF_MEMORY;
}
pJoin0->pChild = pChild0;
pJoin1->pChild = pChild1;
SOperatorSpecParam specParam;
specParam.opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
specParam.value = pJoin0;
taosArrayPush((*ppRes)->pOpParams, &specParam);
specParam.value = pJoin1;
taosArrayPush((*ppRes)->pOpParams, &specParam);
return TSDB_CODE_SUCCESS;
}
static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) {
int32_t rowIdx = pPrev->lastRow + 1;
SColumnInfoData* pVg0 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[0]);
SColumnInfoData* pVg1 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[1]);
SColumnInfoData* pUid0 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.uidSlot[0]);
SColumnInfoData* pUid1 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.uidSlot[1]);
SOperatorParam* pExcParam0 = NULL;
SOperatorParam* pExcParam1 = NULL;
SOperatorParam* pGcParam0 = NULL;
SOperatorParam* pGcParam1 = NULL;
int32_t code = buildExchangeOperatorParam(&pExcParam0, pVg0->pData + pVg0->info.bytes * rowIdx, pUid0->pData + pUid0->info.bytes * rowIdx, NULL);
if (TSDB_CODE_SUCCESS == code) {
code = buildExchangeOperatorParam(&pExcParam1, pVg1->pData + pVg1->info.bytes * rowIdx, pUid1->pData + pUid1->info.bytes * rowIdx, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, pUid0->pData + pUid0->info.bytes * rowIdx, pUid0->info.bytes, pExcParam0);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, pUid1->pData + pUid1->info.bytes * rowIdx, pUid1->info.bytes, pExcParam1);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1);
}
return code;
}
static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
SOperatorParam* pParam = NULL;
int32_t code = buildStbJoinOperatorParam(pInfo, pPrev, &pParam);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
*ppRes = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam);
if (*ppRes) {
pPost->isStarted = true;
}
}
static void seqJoinWithSeqRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
while (true) {
SSDataBlock* pBlock = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]);
if ((pPrev->lastRow + 1) >= pPrev->pLastBlk->info.rows) {
*ppRes = NULL;
pPrev->pLastBlk = NULL;
return;
}
seqJoinLaunchPostJoin(pOperator, ppRes);
pPrev->lastRow++;
if (*ppRes) {
break;
}
}
}
static void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
if (pPost->isStarted) {
*ppRes = getNextBlockFromDownstream(pOperator, 1);
if (NULL == *ppRes) {
pPost->isStarted = false;
} else {
return;
}
}
if (pStbJoin->ctx.prev.pLastBlk) {
seqJoinWithSeqRetrieve(pOperator, ppRes);
}
}
SSDataBlock* getResFromStbJoin(SOperatorInfo* pOperator) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SSDataBlock* pRes = NULL;
seqJoinContinueRetrieve(pOperator, &pRes);
if (pRes) {
return pRes;
}
while (true) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (NULL == pBlock) {
break;
}
addBlkToGroupCache(pOperator, pBlock, &pRes);
pStbJoin->ctx.prev.pLastBlk = pBlock;
pStbJoin->ctx.prev.lastRow = -1;
seqJoinWithSeqRetrieve(pOperator, &pRes);
if (pRes) {
break;
}
}
return pRes;
}
SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
__optr_fn_t nextFp = NULL;
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) {
@ -61,11 +260,21 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32
goto _error;
}
memcpy(&pInfo->ctrlInfo, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
pInfo->qType = pPhyciNode->qType;
switch (pInfo->qType) {
case DYN_QTYPE_STB_HASH:
memcpy(&pInfo->stbJoin, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
nextFp = getResFromStbJoin;
break;
default:
qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
code = TSDB_CODE_INVALID_PARA;
goto _error;
}
setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getBlockFromDynQueryCtrl, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn, NULL, NULL, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;

View File

@ -133,7 +133,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -193,7 +193,7 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}

View File

@ -40,6 +40,7 @@ typedef struct SSourceDataInfo {
int32_t code;
EX_SOURCE_STATUS status;
const char* taskId;
SArray* pUidList;
} SSourceDataInfo;
static void destroyExchangeOperatorInfo(void* param);
@ -244,6 +245,10 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pInfo->dynamicOp) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
@ -272,9 +277,17 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pExNode->node.dynamicOp) {
pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pInfo->pHashSources) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
for (int32_t i = 0; i < numOfSources; ++i) {
SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
taosArrayPush(pInfo->pSources, pNode);
tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &i, sizeof(i));
}
initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
@ -290,6 +303,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
goto _error;
}
pInfo->dynamicOp = pExNode->node.dynamicOp;
int32_t code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -316,7 +330,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
}
pOperator->fpSet =
createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;
_error:
@ -403,13 +417,15 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
}
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
pDataInfo->startTime = taosGetTimestampUs();
if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
return TSDB_CODE_SUCCESS;
}
ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);
pDataInfo->status = EX_SOURCE_DATA_STARTED;
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
pDataInfo->startTime = taosGetTimestampUs();
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
pWrapper->exchangeId = pExchangeInfo->self;
@ -429,6 +445,9 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
req.taskId = pSource->taskId;
req.queryId = pTaskInfo->id.queryId;
req.execId = pSource->execId;
if (pDataInfo->pUidList) {
req.opParam = buildTableScanOperatorParam(pDataInfo->pUidList);
}
int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
if (msgSize < 0) {
@ -560,7 +579,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
int64_t startTs = taosGetTimestampUs();
// Asynchronously send all fetch requests to all sources.
@ -693,14 +712,41 @@ _error:
return code;
}
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam;
int32_t* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pParam->vgId, sizeof(pParam->vgId));
if (NULL == pIdx) {
qError("No exchange source for vgId: %d", pParam->vgId);
pOperator->pTaskInfo->code = TSDB_CODE_INVALID_PARA;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo);
dataInfo.index = *pIdx;
dataInfo.pUidList = taosArrayDup(pParam->uidList, NULL);
taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
}
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
SExchangeInfo* pExchangeInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS;
if (OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) {
return TSDB_CODE_SUCCESS;
}
if (pExchangeInfo->dynamicOp) {
code = addDynamicExchangeSource(pOperator);
if (code) {
return code;
}
}
int64_t st = taosGetTimestampUs();
SExchangeInfo* pExchangeInfo = pOperator->info;
if (!pExchangeInfo->seqLoadData) {
int32_t code = prepareConcurrentlyLoad(pOperator);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -197,7 +197,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
}
while (1) {
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
if (pInfo->totalInputRows == 0 &&
(pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) {
@ -444,7 +444,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
@ -1304,7 +1304,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
while (1) {
if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) {
// If there are delete datablocks, we receive them first.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN;
pInfo->pFillInfo->preRowKey = INT64_MIN;
@ -1561,7 +1561,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -147,7 +147,8 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato
static void getFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pParam->basic.newExec) {
SGcSessionCtx* pCtx = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId));
if (NULL == pCtx) {
int32_t code = initGroupCacheSession(pOperator, pParam, ppSession);
if (TSDB_CODE_SUCCESS != code) {
pTaskInfo->code = code;
@ -161,12 +162,6 @@ static void getFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOper
return;
}
SGcSessionCtx* pCtx = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId));
if (NULL == pCtx) {
qError("session %" PRIx64 " not exists", pParam->sessionId);
pTaskInfo->code = TSDB_CODE_INVALID_PARA;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
*ppSession = pCtx;
if (pCtx->cacheHit) {
@ -198,16 +193,13 @@ static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBl
*ppRes = pBlock;
}
static SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator, SOperatorParam* param) {
SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGcOperatorParam* pParam = (SGcOperatorParam*)pOperator->pOperatorParam;
SGcSessionCtx* pSession = NULL;
SSDataBlock* pRes = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SGcOperatorParam* pParam = getOperatorParam(pOperator->operatorType, param);
if (NULL == pParam) {
return NULL;
}
getFromSessionCache(pOperator, pGCache, pParam, &pRes, &pSession);
pGCache->pCurrent = pSession;
@ -274,7 +266,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error;
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, getFromGroupCache, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getFromGroupCache, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;

View File

@ -383,7 +383,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}
@ -480,7 +480,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -754,7 +754,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}
@ -906,7 +906,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -1110,7 +1110,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
{
pInfo->pInputDataBlock = NULL;
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
setOperatorCompleted(pOperator);
return NULL;
@ -1323,7 +1323,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL,
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
code = appendDownstream(pOperator, &downstream, 1);

View File

@ -188,6 +188,9 @@ static void setHJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysi
pInfo->pBuild = &pInfo->tbs[buildIdx];
pInfo->pProbe = &pInfo->tbs[probeIdx];
pInfo->pBuild->downStreamIdx = buildIdx;
pInfo->pProbe->downStreamIdx = probeIdx;
}
static int32_t buildHJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) {
@ -630,12 +633,13 @@ static int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin
return code;
}
static int32_t buildHJoinKeyHash(SHJoinOperatorInfo* pJoin) {
static int32_t buildHJoinKeyHash(struct SOperatorInfo* pOperator) {
SHJoinOperatorInfo* pJoin = pOperator->info;
SSDataBlock* pBlock = NULL;
int32_t code = TSDB_CODE_SUCCESS;
while (true) {
pBlock = pJoin->pBuild->downStream->fpSet.getNextFn(pJoin->pBuild->downStream);
pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx)
if (NULL == pBlock) {
break;
}
@ -690,7 +694,7 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
}
if (NULL == pJoin->pKeyHash) {
code = buildHJoinKeyHash(pJoin);
code = buildHJoinKeyHash(pOperator);
if (code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
@ -714,7 +718,7 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
}
while (true) {
SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, pJoin->pProbe->downStreamIdx);
if (NULL == pBlock) {
setHJoinDone(pOperator);
break;
@ -815,7 +819,7 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
goto _error;
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, NULL, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;

View File

@ -271,7 +271,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->rightBuildTable = tSimpleHashInit(256, hashFn);
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, NULL, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -416,7 +416,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks);
while (endPos == dataBlock->info.rows) {
SOperatorInfo* ds = pOperator->pDownstream[whichChild];
dataBlock = ds->fpSet.getNextFn(ds);
dataBlock = getNextBlockFromDownstream(pOperator, whichChild);
if (whichChild == 0) {
pJoinInfo->leftPos = 0;
pJoinInfo->pLeft = dataBlock;
@ -616,8 +616,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
SOperatorInfo* ds1 = pOperator->pDownstream[0];
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0);
pJoinInfo->leftPos = 0;
if (pJoinInfo->pLeft == NULL) {
@ -627,8 +626,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
}
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
SOperatorInfo* ds2 = pOperator->pDownstream[1];
pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2);
pJoinInfo->pRight = getNextBlockFromDownstream(pOperator, 1);
pJoinInfo->rightPos = 0;
if (pJoinInfo->pRight == NULL) {
@ -715,3 +713,6 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
}
return (pRes->info.rows > 0) ? pRes : NULL;
}

View File

@ -87,6 +87,16 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
}
}
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0);
int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : NULL);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
return pOperator->fpSet.getNextFn(pOperator);
}
static int64_t getQuerySupportBufSize(size_t numOfTables) {
size_t s1 = sizeof(STableQueryInfo);
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
@ -592,16 +602,54 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
return TSDB_CODE_SUCCESS;
}
void *getOperatorParam(int32_t opType, SOperatorParam* param) {
void *getOperatorParam(int32_t opType, SOperatorParam* param, int32_t idx) {
if (NULL == param) {
return NULL;
}
for (int32_t i = 0; i < param->opNum; ++i) {
if (opType == param->pOpParams[i].opType) {
memcpy(&param->pOpParams[i], param, sizeof(param->basic));
return &param->pOpParams[i];
int32_t opNum = taosArrayGetSize(param->pOpParams);
if (idx >= opNum) {
return NULL;
}
SOperatorSpecParam *pSpecParam = taosArrayGet(param->pOpParams, idx);
if (opType == pSpecParam->opType) {
return pSpecParam->value;
}
return NULL;
}
int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
if (NULL == pParam) {
taosMemoryFreeClear(pOperator->pDownstreamParams);
return TSDB_CODE_SUCCESS;
}
if (NULL == pOperator->pDownstreamParams) {
pOperator->pDownstreamParams = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES);
if (NULL == pOperator->pDownstreamParams) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SOperatorBaseParam* pBaseParam = NULL;
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
pBaseParam = getOperatorParam(pOperator->operatorType, pParam, i);
if (pBaseParam) {
pOperator->pDownstreamParams[i] = pBaseParam->pChild;
} else {
pOperator->pDownstreamParams[i] = pParam;
}
}
return TSDB_CODE_SUCCESS;
}
FORCE_INLINE SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
if (pOperator->pDownstreamParams && pOperator->pDownstreamParams[idx]) {
return pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamParams[idx]);
}
return pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]);
}

View File

@ -135,7 +135,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -260,7 +260,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
blockDataCleanup(pRes);
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
setOperatorCompleted(pOperator);
@ -416,7 +416,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -501,7 +501,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
while (1) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
setOperatorCompleted(pOperator);
break;

View File

@ -981,7 +981,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
optrDefaultBufFn, getTableScannerExecInfo, NULL, NULL);
optrDefaultBufFn, getTableScannerExecInfo, optrDefaultGetNextExtFn, NULL);
// for non-blocking operator, the open cost is always 0
pOperator->cost.openCost = 0;
@ -1006,7 +1006,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL, NULL, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;
}
@ -2252,7 +2252,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;
_end:
@ -2474,7 +2474,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
__optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;
@ -2639,7 +2639,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;
@ -3094,7 +3094,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
optrDefaultBufFn, getTableMergeScanExplainExecInfo, NULL, NULL);
optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL);
pOperator->cost.openCost = 0;
return pOperator;
@ -3244,7 +3244,7 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;
_error:

View File

@ -87,7 +87,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
// TODO dynamic set the available sort buffer
pOperator->fpSet =
createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, NULL, NULL);
createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -457,7 +457,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]);
pInfo->prefetchedSortInput = getNextBlockFromDownstream(pOperator, 0);
if (pInfo->prefetchedSortInput == NULL) {
setOperatorCompleted(pOperator);
return NULL;
@ -552,7 +552,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
optrDefaultBufFn, getGroupSortExplainExecInfo, NULL, NULL);
optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -841,7 +841,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL,
destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo, NULL, NULL);
destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, downStreams, numStreams);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -1783,7 +1783,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL, NULL, NULL);
createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;
_error:
@ -2320,7 +2320,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;
_error:

View File

@ -887,7 +887,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
while (1) {
SSDataBlock* pBlock = pSliceInfo->pRemainRes ? pSliceInfo->pRemainRes : downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = pSliceInfo->pRemainRes ? pSliceInfo->pRemainRes : getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
setOperatorCompleted(pOperator);
break;
@ -998,7 +998,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

View File

@ -1026,7 +1026,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
int64_t st = taosGetTimestampUs();
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}
@ -1162,7 +1162,7 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}
@ -1684,7 +1684,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -1807,7 +1807,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}
@ -1907,7 +1907,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -1983,7 +1983,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -2572,7 +2572,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN;
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
@ -2834,7 +2834,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
}
@ -3509,7 +3509,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
}
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}
@ -3670,7 +3670,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
@ -3735,7 +3735,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
}
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
clearSpecialDataBlock(pInfo->pUpdateRes);
pOperator->status = OP_RES_TO_RETURN;
@ -3826,7 +3826,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
}
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
@ -4072,7 +4072,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
}
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
break;
}
@ -4203,7 +4203,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -4335,7 +4335,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
SSDataBlock* pBlock = NULL;
if (pMiaInfo->prefetchedBlock == NULL) {
pBlock = downstream->fpSet.getNextFn(downstream);
pBlock = getNextBlockFromDownstream(pOperator, 0);
} else {
pBlock = pMiaInfo->prefetchedBlock;
pMiaInfo->prefetchedBlock = NULL;
@ -4484,7 +4484,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
false, OP_NOT_OPENED, miaInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -4665,7 +4665,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
SSDataBlock* pBlock = NULL;
if (miaInfo->prefetchedBlock == NULL) {
pBlock = downstream->fpSet.getNextFn(downstream);
pBlock = getNextBlockFromDownstream(pOperator, 0);
} else {
pBlock = miaInfo->prefetchedBlock;
miaInfo->groupId = pBlock->info.id.groupId;
@ -4771,7 +4771,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo,
optrDefaultBufFn, NULL, NULL, NULL);
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -4842,7 +4842,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
pInfo->numOfDatapack = 0;
@ -5027,7 +5027,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
pInfo->statestore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false;

View File

@ -108,6 +108,7 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
pExchange->srcStartGroupId = pCxt->groupId;
pExchange->srcEndGroupId = pCxt->groupId;
pExchange->node.precision = pChild->precision;
pExchange->node.dynamicOp = pChild->dynamicOp;
pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
if (NULL == pExchange->node.pTargets) {
return TSDB_CODE_OUT_OF_MEMORY;