From 8e1f4d1d6a16130bd0f5d4a7f3849ffcc977c320 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 7 Jul 2023 19:31:56 +0800 Subject: [PATCH] fix: operator param issues --- source/common/src/tmsg.c | 3 +++ .../libs/executor/src/dynqueryctrloperator.c | 20 +++++++++++++++---- source/libs/executor/src/exchangeoperator.c | 15 ++++++++------ source/libs/executor/src/mergejoinoperator.c | 6 ++++++ source/libs/executor/src/operator.c | 2 ++ source/libs/executor/src/scanoperator.c | 3 ++- 6 files changed, 38 insertions(+), 11 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e1a06f515b..a841a66162 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5501,6 +5501,7 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) { int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) { if (tEncodeI32(pEncoder, pOpParam->opType) < 0) return -1; + if (tEncodeI32(pEncoder, pOpParam->downstreamIdx) < 0) return -1; switch (pOpParam->opType) { case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { STableScanOperatorParam* pScan = (STableScanOperatorParam*)pOpParam->value; @@ -5528,6 +5529,7 @@ int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) { int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam) { if (tDecodeI32(pDecoder, &pOpParam->opType) < 0) return -1; + if (tDecodeI32(pDecoder, &pOpParam->downstreamIdx) < 0) return -1; switch (pOpParam->opType) { case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam)); @@ -5545,6 +5547,7 @@ int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam) } else { pScan->pUidList = NULL; } + pOpParam->value = pScan; break; } default: diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 842b21097e..1980f77fe0 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -61,6 +61,7 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, pGc->groupValueSize = grpValSize; (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE; + (*ppRes)->downstreamIdx = downstreamIdx; (*ppRes)->value = pGc; return TSDB_CODE_SUCCESS; @@ -132,16 +133,22 @@ static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ SOperatorParam* pExcParam1 = NULL; SOperatorParam* pGcParam0 = NULL; SOperatorParam* pGcParam1 = NULL; + int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx); + int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx); + int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx); + int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx); + + qDebug("start stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, *leftVg, *leftUid, *rightVg, *rightUid); - int32_t code = buildExchangeOperatorParam(&pExcParam0, 0, (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx), (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx), NULL); + int32_t code = buildExchangeOperatorParam(&pExcParam0, 0, leftVg, leftUid, NULL); if (TSDB_CODE_SUCCESS == code) { - code = buildExchangeOperatorParam(&pExcParam1, 1, (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx), (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx), NULL); + code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid, NULL); } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, pUid0->pData + pUid0->info.bytes * rowIdx, pUid0->info.bytes, pExcParam0); + code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, leftUid, pUid0->info.bytes, pExcParam0); } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, pUid1->pData + pUid1->info.bytes * rowIdx, pUid1->info.bytes, pExcParam1); + code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, rightUid, pUid1->info.bytes, pExcParam1); } if (TSDB_CODE_SUCCESS == code) { code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1); @@ -165,6 +172,11 @@ static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) *ppRes = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam); if (*ppRes) { pPost->isStarted = true; + pInfo->execInfo.postBlkNum++; + pInfo->execInfo.postBlkRows += (*ppRes)->info.rows; + qDebug("join res block retrieved"); + } else { + qDebug("Empty join res block retrieved"); } } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 978216914b..7e6fa6830c 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -159,11 +159,6 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - pTaskInfo->code = pOperator->fpSet._openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); - } - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; @@ -206,6 +201,11 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -434,6 +434,7 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in } (*ppRes)->opType = srcOpType; + (*ppRes)->downstreamIdx = 0; (*ppRes)->value = pScan; (*ppRes)->pChildren = NULL; @@ -762,6 +763,8 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { dataInfo.srcOpType = pParam->srcOpType; taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo); + pOperator->pOperatorParam = NULL; + return TSDB_CODE_SUCCESS; } @@ -769,7 +772,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; int32_t code = TSDB_CODE_SUCCESS; - if (OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) { + if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) || (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorParam)) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 3ea347f233..694b054ed9 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -647,8 +647,11 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs pJoinInfo->leftPos = 0; if (pJoinInfo->pLeft == NULL) { + qDebug("merge join left got empty block"); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); return false; + } else { + qDebug("merge join left got block"); } } @@ -657,8 +660,11 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs pJoinInfo->rightPos = 0; if (pJoinInfo->pRight == NULL) { + qDebug("merge join right got empty block"); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); return false; + } else { + qDebug("merge join right got block"); } } // only the timestamp match support for ordinary table diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 9f0659cb2e..6521583053 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -640,6 +640,8 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara pOperator->pDownstreamParams[pChild->downstreamIdx] = pChild; } + taosArrayClear(pOperator->pOperatorParam->pChildren); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e7cefb8cd6..5d4b5e453c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -815,7 +815,6 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { qTrace("add dynamic table scan uid:%" PRIu64 ", %s", info.uid, GET_TASKID(pTaskInfo)); } - pOperator->pOperatorParam = NULL; return code; } @@ -852,12 +851,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { if (pOperator->pOperatorParam) { int32_t code = createTableListInfoFromParam(pOperator); + pOperator->pOperatorParam = NULL; if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } if (pInfo->currentGroupId != -1) { + pInfo->currentGroupId = 0; return startNextGroupScan(pOperator); } }