From 1aac643a9bae34595c3e62d574432941dabf09b6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 9 Aug 2023 10:30:35 +0800 Subject: [PATCH] enh: support single vnode join --- include/libs/nodes/plannodes.h | 2 + source/libs/executor/inc/executorInt.h | 3 +- .../libs/executor/src/dynqueryctrloperator.c | 60 +++++++++++++++---- source/libs/executor/src/groupcacheoperator.c | 42 +++++++++---- source/libs/executor/src/mergejoinoperator.c | 7 ++- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 14 +++++ source/libs/nodes/src/nodesMsgFuncs.c | 14 +++++ source/libs/planner/src/planOptimizer.c | 21 ++++--- source/libs/planner/src/planPhysiCreater.c | 2 + source/libs/planner/src/planSpliter.c | 12 ++-- source/libs/qworker/src/qworker.c | 8 +-- source/libs/scheduler/src/schUtil.c | 2 +- 13 files changed, 148 insertions(+), 40 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index e058024ed0..90072b6053 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -172,6 +172,7 @@ typedef struct SDynQueryCtrlStbJoin { bool batchFetch; SNodeList* pVgList; SNodeList* pUidList; + bool srcScan[2]; } SDynQueryCtrlStbJoin; typedef struct SDynQueryCtrlLogicNode { @@ -465,6 +466,7 @@ typedef struct SStbJoinDynCtrlBasic { bool batchFetch; int32_t vgSlot[2]; int32_t uidSlot[2]; + bool srcScan[2]; } SStbJoinDynCtrlBasic; typedef struct SDynQueryCtrlPhysiNode { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index a70fdfb71a..c412b3c807 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -153,7 +153,7 @@ typedef struct SLimitInfo { } SLimitInfo; typedef struct SSortMergeJoinOperatorParam { - bool initParam; + int32_t initDownstreamNum; } SSortMergeJoinOperatorParam; typedef struct SExchangeOperatorBasicParam { @@ -731,6 +731,7 @@ void streamOpReloadState(struct SOperatorInfo* pOperator); void destroyOperatorParamValue(void* pValues); int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc); +int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq); #ifdef __cplusplus } diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 150c3a73ff..404b0201e4 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -220,7 +220,7 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, return TSDB_CODE_OUT_OF_MEMORY; } - pJoin->initParam = initParam; + pJoin->initDownstreamNum = initParam ? 2 : 0; (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; (*ppRes)->value = pJoin; @@ -307,11 +307,49 @@ static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) } } +static FORCE_INLINE int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t vgNum = tSimpleHashGetSize(pVg); + if (vgNum <= 0 || vgNum > 1) { + qError("Invalid vgroup num %d to build table scan operator param", vgNum); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + int32_t iter = 0; + void* p = NULL; + while (p = tSimpleHashIterate(pVg, p, &iter)) { + int32_t* pVgId = tSimpleHashGetKey(p, NULL); + SArray* pUidList = *(SArray**)p; + + code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false); + if (code) { + return code; + } + } + + return TSDB_CODE_SUCCESS; +} + + +static FORCE_INLINE int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) { + SArray* pUidList = taosArrayInit(1, sizeof(int64_t)); + if (NULL == pUidList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArrayPush(pUidList, pUid); + + int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true); + if (code) { + return code; + } + + return TSDB_CODE_SUCCESS; +} static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) { int64_t rowIdx = pPrev->pListHead->readIdx; - SOperatorParam* pExcParam0 = NULL; - SOperatorParam* pExcParam1 = NULL; + SOperatorParam* pSrcParam0 = NULL; + SOperatorParam* pSrcParam1 = NULL; SOperatorParam* pGcParam0 = NULL; SOperatorParam* pGcParam1 = NULL; int32_t* leftVg = pPrev->pListHead->pLeftVg + rowIdx; @@ -327,9 +365,9 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS if (pInfo->stbJoin.basic.batchFetch) { if (pPrev->leftHash) { - code = buildBatchExchangeOperatorParam(&pExcParam0, 0, pPrev->leftHash); + code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash); if (TSDB_CODE_SUCCESS == code) { - code = buildBatchExchangeOperatorParam(&pExcParam1, 1, pPrev->rightHash); + code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash); } if (TSDB_CODE_SUCCESS == code) { tSimpleHashCleanup(pPrev->leftHash); @@ -339,20 +377,20 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS } } } else { - code = buildExchangeOperatorParam(&pExcParam0, 0, leftVg, leftUid); + code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid); if (TSDB_CODE_SUCCESS == code) { - code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid); + code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid); } } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pExcParam0); + code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0); } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pExcParam1); + code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1); } if (TSDB_CODE_SUCCESS == code) { - code = buildMergeJoinOperatorParam(ppParam, pExcParam0 ? true : false, pGcParam0, pGcParam1); + code = buildMergeJoinOperatorParam(ppParam, pSrcParam0 ? true : false, pGcParam0, pGcParam1); } return code; } @@ -747,6 +785,8 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32 goto _error; } + pTaskInfo->dynamicTask = pPhyciNode->node.dynamicOp; + code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (TSDB_CODE_SUCCESS != code) { goto _error; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 5b6a6abe4a..2651373010 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -97,13 +97,16 @@ static void destroyGroupCacheOperator(void* param) { } static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* filename) { -// TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); - TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE); + TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); + //TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE); if (NULL == newFd) { return TAOS_SYSTEM_ERROR(errno); } pFileFd->fd = newFd; taosThreadMutexInit(&pFileFd->mutex, NULL); + + qTrace("file path %s created", filename); + return TSDB_CODE_SUCCESS; } @@ -276,6 +279,22 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr return code; } +static FORCE_INLINE void chkRemoveVgroupCurrFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId) { + SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId)); + if (0 == pFileInfo->groupNum) { + removeGroupCacheFile(pFileInfo); + +#if 0 + /* debug only */ + sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%d", pFileCtx->fileId); + taosRemoveFile(pFileCtx->baseFilename); + /* debug only */ +#endif + + qTrace("FileId:%d-%d-%d removed", downstreamIdx, vgId, pFileCtx->fileId); + //taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId)); + } +} static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId, bool removeCheck) { if (pFileCtx->fileSize < GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) { @@ -283,12 +302,7 @@ static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx, int3 } if (removeCheck) { - SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId)); - if (0 == pFileInfo->groupNum) { - removeGroupCacheFile(pFileInfo); - qTrace("FileId:%d-%d-%d removed", downstreamIdx, vgId, pFileCtx->fileId); - //taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId)); - } + chkRemoveVgroupCurrFile(pFileCtx, downstreamIdx, vgId); } pFileCtx->fileId++; @@ -810,7 +824,7 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes handleGroupFetchDone(pNew->pGroup); } taosArrayClear(pVgCtx->pTbList); - } + } } taosHashClear(pCtx->pWaitSessions); @@ -1087,7 +1101,7 @@ static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) { static void freeRemoveGroupCacheData(void* p) { SGroupCacheData* pGroup = p; - if (pGroup->vgId >= 0) { + if (pGroup->vgId > 0) { SGcFileCacheCtx* pFileCtx = &pGroup->pVgCtx->fileCtx; if (pGroup->fileId >= 0) { SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId)); @@ -1097,6 +1111,14 @@ static void freeRemoveGroupCacheData(void* p) { if (0 == remainNum && pGroup->fileId != pFileCtx->fileId) { removeGroupCacheFile(pFileInfo); + +#if 0 + /* debug only */ + sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%d", pGroup->fileId); + taosRemoveFile(pFileCtx->baseFilename); + /* debug only */ +#endif + qTrace("FileId:%d-%d-%d removed", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId); //taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId)); } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index aa580895af..8cdc74f623 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -667,8 +667,9 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs } if (pJoinInfo->pLeft == NULL) { - if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initParam) { + if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum > 0) { leftEmpty = true; + ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum--; } else { setMergeJoinDone(pOperator); return false; @@ -680,6 +681,10 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs if (!pJoinInfo->downstreamFetchDone[1]) { pJoinInfo->pRight = getNextBlockFromDownstream(pOperator, 1); + if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum > 0) { + ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum--; + } + pJoinInfo->rightPos = 0; qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0); } else { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 800b747e68..1ade4fce67 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -553,6 +553,7 @@ static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQue COPY_SCALAR_FIELD(stbJoin.batchFetch); CLONE_NODE_LIST_FIELD(stbJoin.pVgList); CLONE_NODE_LIST_FIELD(stbJoin.pUidList); + COPY_OBJECT_FIELD(stbJoin.srcScan, sizeof(pDst->stbJoin.srcScan)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 970bd6fcc8..f7380be4ff 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -3010,6 +3010,8 @@ static const char* jkDynQueryCtrlPhysiPlanVgSlot0 = "VgSlot[0]"; static const char* jkDynQueryCtrlPhysiPlanVgSlot1 = "VgSlot[1]"; static const char* jkDynQueryCtrlPhysiPlanUidSlot0 = "UidSlot[0]"; static const char* jkDynQueryCtrlPhysiPlanUidSlot1 = "UidSlot[1]"; +static const char* jkDynQueryCtrlPhysiPlanSrcScan0 = "SrcScan[0]"; +static const char* jkDynQueryCtrlPhysiPlanSrcScan1 = "SrcScan[1]"; static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) { const SDynQueryCtrlPhysiNode* pNode = (const SDynQueryCtrlPhysiNode*)pObj; @@ -3034,6 +3036,12 @@ static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanUidSlot1, pNode->stbJoin.uidSlot[1]); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlPhysiPlanSrcScan0, pNode->stbJoin.srcScan[0]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlPhysiPlanSrcScan1, pNode->stbJoin.srcScan[1]); + } break; } default: @@ -3069,6 +3077,12 @@ static int32_t jsonToPhysiDynQueryCtrlNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanUidSlot1, pNode->stbJoin.uidSlot[1], code); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkDynQueryCtrlPhysiPlanSrcScan0, &pNode->stbJoin.srcScan[0]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkDynQueryCtrlPhysiPlanSrcScan1, &pNode->stbJoin.srcScan[1]); + } break; } default: diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index b869216b36..b3a425f2fb 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3613,6 +3613,8 @@ enum { PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1, + PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN0, + PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN1, }; static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -3638,6 +3640,12 @@ static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncode if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1, pNode->stbJoin.uidSlot[1]); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN0, pNode->stbJoin.srcScan[0]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN1, pNode->stbJoin.srcScan[1]); + } break; } default: @@ -3675,6 +3683,12 @@ static int32_t msgToPhysiDynQueryCtrlNode(STlvDecoder* pDecoder, void* pObj) { case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1: code = tlvDecodeEnum(pTlv, &pNode->stbJoin.uidSlot[1], sizeof(pNode->stbJoin.uidSlot[1])); break; + case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN0: + code = tlvDecodeBool(pTlv, &pNode->stbJoin.srcScan[0]); + break; + case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN1: + code = tlvDecodeBool(pTlv, &pNode->stbJoin.srcScan[1]); + break; default: break; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 45a1919f27..7fcf6d587b 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3224,13 +3224,14 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh return code; } -static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppList) { +static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppList, bool* srcScan) { SNodeList* pList = nodesCloneList(pJoin->pChildren); if (NULL == pList) { return TSDB_CODE_OUT_OF_MEMORY; } int32_t code = TSDB_CODE_SUCCESS; + int32_t i = 0; SNode* pNode = NULL; FOREACH(pNode, pList) { SScanLogicNode* pScan = (SScanLogicNode*)pNode; @@ -3238,7 +3239,10 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL if (code) { break; } + pScan->node.dynamicOp = true; + *(srcScan + i++) = pScan->pVgroupList->numOfVgroups <= 1; + pScan->scanType = SCAN_TYPE_TABLE; } @@ -3259,7 +3263,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode** return TSDB_CODE_OUT_OF_MEMORY; } - pGrpCache->node.dynamicOp = true; + //pGrpCache->node.dynamicOp = true; pGrpCache->grpColsMayBeNull = false; pGrpCache->grpByUid = true; pGrpCache->batchFetch = true; @@ -3342,7 +3346,7 @@ static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChi } pJoin->joinAlgo = JOIN_ALGO_MERGE; - pJoin->node.dynamicOp = true; + //pJoin->node.dynamicOp = true; stbJoinOptRemoveTagEqCond(pJoin); NODES_DESTORY_NODE(pJoin->pTagEqCond); @@ -3360,7 +3364,7 @@ static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChi } -static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* pPost, SLogicNode** ppDynNode) { +static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* pPost, bool* srcScan, SLogicNode** ppDynNode) { int32_t code = TSDB_CODE_SUCCESS; SDynQueryCtrlLogicNode* pDynCtrl = (SDynQueryCtrlLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL); if (NULL == pDynCtrl) { @@ -3369,6 +3373,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p pDynCtrl->qType = DYN_QTYPE_STB_HASH; pDynCtrl->stbJoin.batchFetch = true; + memcpy(pDynCtrl->stbJoin.srcScan, srcScan, sizeof(pDynCtrl->stbJoin.srcScan)); if (TSDB_CODE_SUCCESS == code) { pDynCtrl->node.pChildren = nodesMakeList(); @@ -3422,13 +3427,15 @@ static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* p SLogicNode* pGrpCacheNode = NULL; SLogicNode* pHJoinNode = NULL; SLogicNode* pMJoinNode = NULL; - SLogicNode* pDynNode = NULL; + SLogicNode* pDynNode = NULL; + bool srcScan[2] = {0}; + int32_t code = stbJoinOptCreateTagScanNode(pJoin, &pTagScanNodes); if (TSDB_CODE_SUCCESS == code) { code = stbJoinOptCreateTagHashJoinNode(pJoin, pTagScanNodes, &pHJoinNode); } if (TSDB_CODE_SUCCESS == code) { - code = stbJoinOptCreateTableScanNodes(pJoin, &pTbScanNodes); + code = stbJoinOptCreateTableScanNodes(pJoin, &pTbScanNodes, srcScan); } if (TSDB_CODE_SUCCESS == code) { code = stbJoinOptCreateGroupCacheNode(pTbScanNodes, &pGrpCacheNode); @@ -3437,7 +3444,7 @@ static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* p code = stbJoinOptCreateMergeJoinNode(pJoin, pGrpCacheNode, &pMJoinNode); } if (TSDB_CODE_SUCCESS == code) { - code = stbJoinOptCreateDynQueryCtrlNode(pHJoinNode, pMJoinNode, &pDynNode); + code = stbJoinOptCreateDynQueryCtrlNode(pHJoinNode, pMJoinNode, srcScan, &pDynNode); } if (TSDB_CODE_SUCCESS == code) { code = replaceLogicNode(pLogicSubplan, pJoin, (SLogicNode*)pDynNode); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 2948cbce7c..0a6ac902ce 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1015,6 +1015,8 @@ static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pUidList, &pUidList); } if (TSDB_CODE_SUCCESS == code) { + memcpy(pDynCtrl->stbJoin.srcScan, pLogicNode->stbJoin.srcScan, sizeof(pDynCtrl->stbJoin.srcScan)); + SNode* pNode = NULL; int32_t i = 0; FOREACH(pNode, pVgList) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 52a4b6bf9f..63d8112722 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1152,11 +1152,11 @@ static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubp SNode* pChild = NULL; FOREACH(pChild, pJoin->node.pChildren) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) { - if (pJoin->node.dynamicOp) { - code = TSDB_CODE_SUCCESS; - } else { + //if (pJoin->node.dynamicOp) { + // code = TSDB_CODE_SUCCESS; + //} else { code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, false); - } + //} } else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) { code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild); } else { @@ -1172,9 +1172,9 @@ static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubp static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode); if (TSDB_CODE_SUCCESS == code) { - if (!pInfo->pSplitNode->dynamicOp) { + //if (!pInfo->pSplitNode->dynamicOp) { pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; - } + //} SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT); } return code; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 000200988d..8e6291cfb5 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -684,14 +684,14 @@ _return: QW_SET_PHASE(ctx, phase); } + if (code) { + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); + } + QW_UNLOCK(QW_WRITE, &ctx->lock); qwReleaseTaskCtx(mgmt, ctx); } - if (code) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); - } - QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code)); QW_RET(code); diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 0582184730..df9cde6b7a 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -234,7 +234,7 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) { hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); if (NULL == hb) { SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); - qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); + qInfo("taosHashGet hb connection not exists, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); SCH_ERR_RET(TSDB_CODE_APP_ERROR); }