diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 74c3120d87..0ffe06048c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -166,6 +166,8 @@ typedef struct SGroupCacheLogicNode { typedef struct SDynQueryCtrlLogicNode { SLogicNode node; EDynQueryType qType; + SNodeList* pVgList; + SNodeList* pUidList; } SDynQueryCtrlLogicNode; typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index bd6f0ca689..1185ca96d4 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -292,8 +292,25 @@ int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows); } else { - return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, false); + int32_t colBytes = pColumnInfoData->info.bytes; + int32_t colOffset = currentRow * colBytes; + uint32_t num = 1; + + void* pStart = pColumnInfoData->pData + colOffset; + memcpy(pStart, pData, colBytes); + colOffset += num * colBytes; + + while (num < numOfRows) { + int32_t maxNum = num << 1; + int32_t tnum = maxNum > numOfRows ? (numOfRows - num) : num; + + memcpy(pColumnInfoData->pData + colOffset, pStart, tnum * colBytes); + colOffset += tnum * colBytes; + num += tnum; + } } + + return TSDB_CODE_SUCCESS; } static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource, @@ -2470,4 +2487,4 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock); -} \ No newline at end of file +} diff --git a/source/libs/executor/inc/dynqueryctrl.h b/source/libs/executor/inc/dynqueryctrl.h index 8816e15a2f..003adf3437 100755 --- a/source/libs/executor/inc/dynqueryctrl.h +++ b/source/libs/executor/inc/dynqueryctrl.h @@ -19,6 +19,13 @@ extern "C" { #endif +typedef struct SDynQueryCtrlExecInfo { + int64_t prevBlkNum; + int64_t prevBlkRows; + int64_t postBlkNum; + int64_t postBlkRows; +} SDynQueryCtrlExecInfo; + typedef struct SStbJoinPrevJoinCtx { SSDataBlock* pLastBlk; int32_t lastRow; @@ -39,7 +46,8 @@ typedef struct SStbJoinDynCtrlInfo { } SStbJoinDynCtrlInfo; typedef struct SDynQueryCtrlOperatorInfo { - EDynQueryType qType; + EDynQueryType qType; + SDynQueryCtrlExecInfo execInfo; union { SStbJoinDynCtrlInfo stbJoin; }; diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index 2207acccb2..751b76b0ef 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -94,6 +94,7 @@ typedef struct SHJoinOperatorInfo { SArray* pRowBufs; SNode* pCond; SSHashObj* pKeyHash; + bool keyHashBuilt; SHJoinCtx ctx; } SHJoinOperatorInfo; diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index e0d37b9872..44be65f74b 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -77,6 +77,7 @@ typedef struct SOperatorInfo { SOperatorParam** pDownstreamParams; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator + int32_t numOfRealDownstream; SOperatorFpSet fpSet; } SOperatorInfo; @@ -170,6 +171,7 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32 int32_t optrDefaultBufFn(SOperatorInfo* pOperator); SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam); SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx); +SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int32_t idx); int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx); SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 6342c02330..1cf91d8295 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -30,7 +30,9 @@ int64_t gSessionId = 0; static void destroyDynQueryCtrlOperator(void* param) { - SDynQueryCtrlOperatorInfo* pDynCtrlOperator = (SDynQueryCtrlOperatorInfo*)param; + SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param; + qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64, + pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows); taosMemoryFreeClear(param); } @@ -82,6 +84,7 @@ static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, i pExc->pChild = pChild; pExc->vgId = *pVgId; + pExc->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pExc->uidList = taosArrayInit(1, sizeof(int64_t)); if (NULL == pExc->uidList) { taosMemoryFree(pExc); @@ -203,10 +206,13 @@ static void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRe SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; if (pPost->isStarted) { + qDebug("%s dynQueryCtrl retrieve block from post op", GET_TASKID(pOperator->pTaskInfo)); *ppRes = getNextBlockFromDownstream(pOperator, 1); if (NULL == *ppRes) { pPost->isStarted = false; } else { + pInfo->execInfo.postBlkNum++; + pInfo->execInfo.postBlkRows += (*ppRes)->info.rows; return; } } @@ -232,6 +238,9 @@ SSDataBlock* getResFromStbJoin(SOperatorInfo* pOperator) { break; } + pInfo->execInfo.prevBlkNum++; + pInfo->execInfo.prevBlkRows += pBlock->info.rows; + pStbJoin->ctx.prev.pLastBlk = pBlock; pStbJoin->ctx.prev.lastRow = -1; @@ -264,7 +273,7 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32 pInfo->qType = pPhyciNode->qType; switch (pInfo->qType) { case DYN_QTYPE_STB_HASH: - memcpy(&pInfo->stbJoin, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); + memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); nextFp = getResFromStbJoin; break; default: diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 036023fc3f..1ef87875ae 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -161,7 +161,7 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - return NULL; + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); @@ -455,7 +455,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas } pDataInfo->status = EX_SOURCE_DATA_STARTED; - SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); + SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index); pDataInfo->startTime = taosGetTimestampUs(); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 06062faa20..47dd761a18 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -131,6 +131,11 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato ctx.downstreamIdx = pParam->downstreamIdx; ctx.needCache = pParam->needCache; } + + int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId), &ctx, sizeof(ctx)); + if (TSDB_CODE_SUCCESS == code) { + *ppSession = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId)); + } return TSDB_CODE_SUCCESS; } @@ -200,7 +205,7 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) { } while (true) { - SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, pSession->downstreamIdx); + SSDataBlock* pBlock = getNextBlockFromDownstreamOnce(pOperator, pSession->downstreamIdx); if (NULL == pBlock) { setCurrentGroupCacheDone(pOperator); break; diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index d9f96ae83f..9014f3683e 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -126,7 +126,7 @@ static int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { } } - pTable->valBitMapSize = colNum / sizeof(int8_t) + ((colNum % sizeof(int8_t)) ? 1 : 0); + pTable->valBitMapSize = BitmapLen(colNum); pTable->valBufSize += pTable->valBitMapSize; return TSDB_CODE_SUCCESS; @@ -330,7 +330,7 @@ static FORCE_INLINE int32_t copyHJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, i buildValIdx++; } buildIdx++; - } else if (0 == i) { + } else if (0 == r) { SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot); @@ -369,6 +369,9 @@ static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, pRes->info.rows = resNum; pCtx->rowRemains = pCtx->pBuildRow ? true : false; + if (!pCtx->rowRemains) { + pCtx->probeIdx++; + } } @@ -415,7 +418,9 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) { if (pJoin->ctx.pBuildRow) { appendHJoinResToBlock(pOperator, pRes); - return; + if (pRes->info.rows >= pRes->info.capacity) { + return; + } } for (int32_t i = pCtx->probeIdx; i < pCtx->pProbeData->info.rows; ++i) { @@ -425,10 +430,12 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) { pCtx->pBuildRow = pGroup->rows; appendHJoinResToBlock(pOperator, pRes); if (pRes->info.rows >= pRes->info.capacity) { - break; + return; } } } + + pCtx->rowRemains = false; } static int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { @@ -489,6 +496,7 @@ static FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t r char *pData = NULL; size_t bufLen = pTable->valBitMapSize; + memset(pTable->valData, 0, pTable->valBitMapSize); for (int32_t i = 0, m = 0; i < pTable->valNum; ++i) { if (pTable->valCols[i].keyCol) { continue; @@ -693,7 +701,9 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { return NULL; } - if (NULL == pJoin->pKeyHash) { + if (!pJoin->keyHashBuilt) { + pJoin->keyHashBuilt = true; + code = buildHJoinKeyHash(pOperator); if (code) { pTaskInfo->code = code; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index f668a09aa0..3ea347f233 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -299,6 +299,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t if (code != TSDB_CODE_SUCCESS) { goto _error; } + pOperator->numOfRealDownstream = newDownstreams ? 1 : 2; return pOperator; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 91f579ada1..b99562f879 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -66,6 +66,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES); p->numOfDownstream = num; + p->numOfRealDownstream = num; return TSDB_CODE_SUCCESS; } @@ -561,7 +562,7 @@ void destroyOperator(SOperatorInfo* pOperator) { } if (pOperator->pDownstream != NULL) { - for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { + for (int32_t i = 0; i < pOperator->numOfRealDownstream; ++i) { destroyOperator(pOperator->pDownstream[i]); } @@ -646,15 +647,29 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara return TSDB_CODE_SUCCESS; } -SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) { +SSDataBlock* getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam) { if (pOperator->pDownstreamParams && pOperator->pDownstreamParams[idx]) { - return pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamParams[idx]); + qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name); + SSDataBlock* pBlock = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamParams[idx]); + if (clearParam) { + pOperator->pDownstreamParams[idx] = NULL; + } + return pBlock; } return pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]); } +SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) { + return getNextBlockFromDownstreamImpl(pOperator, idx, false); +} + +SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int32_t idx) { + return getNextBlockFromDownstreamImpl(pOperator, idx, true); +} + + SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0); int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : pParam); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 824af67f36..60c81e81b4 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -361,6 +361,7 @@ static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) { COPY_SCALAR_FIELD(groupAction); COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(outputTsOrder); + COPY_SCALAR_FIELD(dynamicOp); return TSDB_CODE_SUCCESS; } @@ -544,6 +545,8 @@ static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCache static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQueryCtrlLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(qType); + CLONE_NODE_LIST_FIELD(pVgList); + CLONE_NODE_LIST_FIELD(pUidList); return TSDB_CODE_SUCCESS; } @@ -562,6 +565,7 @@ static int32_t physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) { CLONE_NODE_LIST_FIELD(pChildren); COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(outputTsOrder); + COPY_SCALAR_FIELD(dynamicOp); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 7e5b19a7fb..fd31de443c 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2941,6 +2941,10 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) { } static const char* jkDynQueryCtrlPhysiPlanQueryType = "QueryType"; +static const char* jkDynQueryCtrlPhysiPlanVgSlot0 = "VgSlot[0]"; +static const char* jkDynQueryCtrlPhysiPlanVgSlot1 = "VgSlot[1]"; +static const char* jkDynQueryCtrlPhysiPlanUidSlot0 = "UidSlot[0]"; +static const char* jkDynQueryCtrlPhysiPlanUidSlot1 = "UidSlot[1]"; static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) { const SDynQueryCtrlPhysiNode* pNode = (const SDynQueryCtrlPhysiNode*)pObj; @@ -2949,6 +2953,25 @@ static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType); } + if (TSDB_CODE_SUCCESS == code) { + switch (pNode->qType) { + case DYN_QTYPE_STB_HASH: { + code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0]); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot1, pNode->stbJoin.vgSlot[1]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanUidSlot0, pNode->stbJoin.uidSlot[0]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanUidSlot1, pNode->stbJoin.uidSlot[1]); + } + break; + } + default: + return TSDB_CODE_INVALID_PARA; + } + } return code; } @@ -2959,6 +2982,29 @@ static int32_t jsonToPhysiDynQueryCtrlNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType, code); } + if (TSDB_CODE_SUCCESS == code) { + switch (pNode->qType) { + case DYN_QTYPE_STB_HASH: { + tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType, code); + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0], code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanVgSlot1, pNode->stbJoin.vgSlot[1], code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanUidSlot0, pNode->stbJoin.uidSlot[0], code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanUidSlot1, pNode->stbJoin.uidSlot[1], code); + } + break; + } + default: + return TSDB_CODE_INVALID_PARA; + } + } + return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 4a1a6281a2..c0fad53690 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3570,7 +3570,11 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) { enum { PHY_DYN_QUERY_CTRL_CODE_BASE_NODE = 1, - PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE + PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE, + PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0, + PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1, + PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0, + PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1, }; static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -3580,6 +3584,25 @@ static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncode if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE, pNode->qType); } + if (TSDB_CODE_SUCCESS == code) { + switch (pNode->qType) { + case DYN_QTYPE_STB_HASH: { + code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0, pNode->stbJoin.vgSlot[0]); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1, pNode->stbJoin.vgSlot[1]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0, pNode->stbJoin.uidSlot[0]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1, pNode->stbJoin.uidSlot[1]); + } + break; + } + default: + return TSDB_CODE_INVALID_PARA; + } + } return code; } @@ -3596,6 +3619,18 @@ static int32_t msgToPhysiDynQueryCtrlNode(STlvDecoder* pDecoder, void* pObj) { case PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE: code = tlvDecodeEnum(pTlv, &pNode->qType, sizeof(pNode->qType)); break; + case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0: + code = tlvDecodeEnum(pTlv, &pNode->stbJoin.vgSlot[0], sizeof(pNode->stbJoin.vgSlot[0])); + break; + case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1: + code = tlvDecodeEnum(pTlv, &pNode->stbJoin.vgSlot[1], sizeof(pNode->stbJoin.vgSlot[1])); + break; + case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0: + code = tlvDecodeEnum(pTlv, &pNode->stbJoin.uidSlot[0], sizeof(pNode->stbJoin.uidSlot[0])); + break; + case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1: + code = tlvDecodeEnum(pTlv, &pNode->stbJoin.uidSlot[1], sizeof(pNode->stbJoin.uidSlot[1])); + break; default: break; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 41cdf5a36f..dd54cc9955 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3339,6 +3339,26 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p code = TSDB_CODE_OUT_OF_MEMORY; } } + + if (TSDB_CODE_SUCCESS == code) { + pDynCtrl->pVgList = nodesMakeList(); + if (NULL == pDynCtrl->pVgList) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code) { + pDynCtrl->pUidList = nodesMakeList(); + if (NULL == pDynCtrl->pUidList) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + SJoinLogicNode* pHJoin = (SJoinLogicNode*)pPrev; + nodesListStrictAppend(pDynCtrl->pUidList, nodesListGetNode(pHJoin->node.pTargets, 0)); + nodesListStrictAppend(pDynCtrl->pUidList, nodesListGetNode(pHJoin->node.pTargets, 2)); + nodesListStrictAppend(pDynCtrl->pVgList, nodesListGetNode(pHJoin->node.pTargets, 1)); + nodesListStrictAppend(pDynCtrl->pVgList, nodesListGetNode(pHJoin->node.pTargets, 3)); if (TSDB_CODE_SUCCESS == code) { nodesListStrictAppend(pDynCtrl->node.pChildren, (SNode*)pPrev); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 72fc0af453..e07fd2507b 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1009,11 +1009,33 @@ static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* p return TSDB_CODE_OUT_OF_MEMORY; } - pDynCtrl->qType = pLogicNode->qType; + SDataBlockDescNode* pPrevDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; + SNodeList* pVgList = NULL; + SNodeList* pUidList = NULL; + int32_t code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->pVgList, &pVgList); + if (TSDB_CODE_SUCCESS == code) { + code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->pUidList, &pUidList); + } + if (TSDB_CODE_SUCCESS == code) { + SNode* pNode = NULL; + int32_t i = 0; + FOREACH(pNode, pVgList) { + pDynCtrl->stbJoin.vgSlot[i] = ((SColumnNode*)pNode)->slotId; + ++i; + } + i = 0; + FOREACH(pNode, pUidList) { + pDynCtrl->stbJoin.uidSlot[i] = ((SColumnNode*)pNode)->slotId; + ++i; + } + } + if (TSDB_CODE_SUCCESS == code) { + pDynCtrl->qType = pLogicNode->qType; - *pPhyNode = (SPhysiNode*)pDynCtrl; + *pPhyNode = (SPhysiNode*)pDynCtrl; + } - return TSDB_CODE_SUCCESS; + return code; } typedef struct SRewritePrecalcExprsCxt {