diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 7c134f07b0..59bb83ec9e 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -105,6 +105,15 @@ typedef struct SExchangeOpStopInfo { int64_t refId; } SExchangeOpStopInfo; +typedef struct SGcOperatorParam { + SOperatorParam* pChild; + int64_t sessionId; + int32_t downstreamIdx; + bool needCache; + void* pGroupValue; + int32_t groupValueSize; +} SGcOperatorParam; + typedef struct SExprSupp { SExprInfo* pExprInfo; int32_t numOfExprs; // the number of scalar expression in group operator diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index afd9f52823..d5e65be0ee 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -21,15 +21,6 @@ extern "C" { #define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760 -typedef struct SGcOperatorParam { - SOperatorParam* pChild; - int64_t sessionId; - int32_t downstreamIdx; - bool needCache; - void* pGroupValue; - int32_t groupValueSize; -} SGcOperatorParam; - #pragma pack(push, 1) typedef struct SGcBlkBufInfo { void* next; diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index ec27f38254..e0d37b9872 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -154,6 +154,8 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo); + // clang-format on SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 6aecce1a76..6342c02330 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -141,9 +141,9 @@ static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ SOperatorParam* pGcParam0 = NULL; SOperatorParam* pGcParam1 = NULL; - int32_t code = buildExchangeOperatorParam(&pExcParam0, pVg0->pData + pVg0->info.bytes * rowIdx, pUid0->pData + pUid0->info.bytes * rowIdx, NULL); + int32_t code = buildExchangeOperatorParam(&pExcParam0, (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx), (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx), NULL); if (TSDB_CODE_SUCCESS == code) { - code = buildExchangeOperatorParam(&pExcParam1, pVg1->pData + pVg1->info.bytes * rowIdx, pUid1->pData + pUid1->info.bytes * rowIdx, NULL); + code = buildExchangeOperatorParam(&pExcParam1, (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx), (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx), NULL); } if (TSDB_CODE_SUCCESS == code) { code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, pUid0->pData + pUid0->info.bytes * rowIdx, pUid0->info.bytes, pExcParam0); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6da37b6029..dd4bbdfcad 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -539,7 +539,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId); int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || NULL == *pTask) { qError("failed to createExecTaskInfo, code: %s", tstrerror(code)); goto _error; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index ad9238c833..f668a09aa0 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -70,8 +70,7 @@ typedef struct SMJoinOperatorInfo { static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); static void destroyMergeJoinOperator(void* param); -static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num, - SSortMergeJoinPhysiNode* pJoinNode, const char* idStr); +static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr); static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) { SNode* pPrimKeyCond = pJoinNode->pPrimKeyCond; @@ -203,8 +202,8 @@ SOperatorInfo** buildMergeJoinDownstreams(SMJoinOperatorInfo* pInfo, SOperatorIn if (p) { p[0] = pDownstream[0]; p[1] = pDownstream[0]; - pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0); - pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 1); + pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(p[0], 0); + pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(p[1], 1); } return p; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index eaf51bf238..91f579ada1 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -535,7 +535,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) { pOptr = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) { - //pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo); + pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo); } else { terrno = TSDB_CODE_INVALID_PARA; pTaskInfo->code = terrno;