enh: support single vnode join
This commit is contained in:
parent
a95295774a
commit
1aac643a9b
|
@ -172,6 +172,7 @@ typedef struct SDynQueryCtrlStbJoin {
|
|||
bool batchFetch;
|
||||
SNodeList* pVgList;
|
||||
SNodeList* pUidList;
|
||||
bool srcScan[2];
|
||||
} SDynQueryCtrlStbJoin;
|
||||
|
||||
typedef struct SDynQueryCtrlLogicNode {
|
||||
|
@ -465,6 +466,7 @@ typedef struct SStbJoinDynCtrlBasic {
|
|||
bool batchFetch;
|
||||
int32_t vgSlot[2];
|
||||
int32_t uidSlot[2];
|
||||
bool srcScan[2];
|
||||
} SStbJoinDynCtrlBasic;
|
||||
|
||||
typedef struct SDynQueryCtrlPhysiNode {
|
||||
|
|
|
@ -153,7 +153,7 @@ typedef struct SLimitInfo {
|
|||
} SLimitInfo;
|
||||
|
||||
typedef struct SSortMergeJoinOperatorParam {
|
||||
bool initParam;
|
||||
int32_t initDownstreamNum;
|
||||
} SSortMergeJoinOperatorParam;
|
||||
|
||||
typedef struct SExchangeOperatorBasicParam {
|
||||
|
@ -731,6 +731,7 @@ void streamOpReloadState(struct SOperatorInfo* pOperator);
|
|||
|
||||
void destroyOperatorParamValue(void* pValues);
|
||||
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc);
|
||||
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -220,7 +220,7 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes,
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pJoin->initParam = initParam;
|
||||
pJoin->initDownstreamNum = initParam ? 2 : 0;
|
||||
|
||||
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
|
||||
(*ppRes)->value = pJoin;
|
||||
|
@ -307,11 +307,49 @@ static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin)
|
|||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t vgNum = tSimpleHashGetSize(pVg);
|
||||
if (vgNum <= 0 || vgNum > 1) {
|
||||
qError("Invalid vgroup num %d to build table scan operator param", vgNum);
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
int32_t iter = 0;
|
||||
void* p = NULL;
|
||||
while (p = tSimpleHashIterate(pVg, p, &iter)) {
|
||||
int32_t* pVgId = tSimpleHashGetKey(p, NULL);
|
||||
SArray* pUidList = *(SArray**)p;
|
||||
|
||||
code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static FORCE_INLINE int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
|
||||
SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
|
||||
if (NULL == pUidList) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
taosArrayPush(pUidList, pUid);
|
||||
|
||||
int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
|
||||
int64_t rowIdx = pPrev->pListHead->readIdx;
|
||||
SOperatorParam* pExcParam0 = NULL;
|
||||
SOperatorParam* pExcParam1 = NULL;
|
||||
SOperatorParam* pSrcParam0 = NULL;
|
||||
SOperatorParam* pSrcParam1 = NULL;
|
||||
SOperatorParam* pGcParam0 = NULL;
|
||||
SOperatorParam* pGcParam1 = NULL;
|
||||
int32_t* leftVg = pPrev->pListHead->pLeftVg + rowIdx;
|
||||
|
@ -327,9 +365,9 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
|
|||
|
||||
if (pInfo->stbJoin.basic.batchFetch) {
|
||||
if (pPrev->leftHash) {
|
||||
code = buildBatchExchangeOperatorParam(&pExcParam0, 0, pPrev->leftHash);
|
||||
code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildBatchExchangeOperatorParam(&pExcParam1, 1, pPrev->rightHash);
|
||||
code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tSimpleHashCleanup(pPrev->leftHash);
|
||||
|
@ -339,20 +377,20 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
|
|||
}
|
||||
}
|
||||
} else {
|
||||
code = buildExchangeOperatorParam(&pExcParam0, 0, leftVg, leftUid);
|
||||
code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid);
|
||||
code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pExcParam0);
|
||||
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pExcParam1);
|
||||
code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildMergeJoinOperatorParam(ppParam, pExcParam0 ? true : false, pGcParam0, pGcParam1);
|
||||
code = buildMergeJoinOperatorParam(ppParam, pSrcParam0 ? true : false, pGcParam0, pGcParam1);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -747,6 +785,8 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pTaskInfo->dynamicTask = pPhyciNode->node.dynamicOp;
|
||||
|
||||
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _error;
|
||||
|
|
|
@ -97,13 +97,16 @@ static void destroyGroupCacheOperator(void* param) {
|
|||
}
|
||||
|
||||
static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* filename) {
|
||||
// TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL);
|
||||
TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE);
|
||||
TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL);
|
||||
//TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE);
|
||||
if (NULL == newFd) {
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
pFileFd->fd = newFd;
|
||||
taosThreadMutexInit(&pFileFd->mutex, NULL);
|
||||
|
||||
qTrace("file path %s created", filename);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -276,6 +279,22 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
|
|||
return code;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void chkRemoveVgroupCurrFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId) {
|
||||
SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId));
|
||||
if (0 == pFileInfo->groupNum) {
|
||||
removeGroupCacheFile(pFileInfo);
|
||||
|
||||
#if 0
|
||||
/* debug only */
|
||||
sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%d", pFileCtx->fileId);
|
||||
taosRemoveFile(pFileCtx->baseFilename);
|
||||
/* debug only */
|
||||
#endif
|
||||
|
||||
qTrace("FileId:%d-%d-%d removed", downstreamIdx, vgId, pFileCtx->fileId);
|
||||
//taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId, bool removeCheck) {
|
||||
if (pFileCtx->fileSize < GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) {
|
||||
|
@ -283,12 +302,7 @@ static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx, int3
|
|||
}
|
||||
|
||||
if (removeCheck) {
|
||||
SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId));
|
||||
if (0 == pFileInfo->groupNum) {
|
||||
removeGroupCacheFile(pFileInfo);
|
||||
qTrace("FileId:%d-%d-%d removed", downstreamIdx, vgId, pFileCtx->fileId);
|
||||
//taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
|
||||
}
|
||||
chkRemoveVgroupCurrFile(pFileCtx, downstreamIdx, vgId);
|
||||
}
|
||||
|
||||
pFileCtx->fileId++;
|
||||
|
@ -1087,7 +1101,7 @@ static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) {
|
|||
|
||||
static void freeRemoveGroupCacheData(void* p) {
|
||||
SGroupCacheData* pGroup = p;
|
||||
if (pGroup->vgId >= 0) {
|
||||
if (pGroup->vgId > 0) {
|
||||
SGcFileCacheCtx* pFileCtx = &pGroup->pVgCtx->fileCtx;
|
||||
if (pGroup->fileId >= 0) {
|
||||
SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
|
||||
|
@ -1097,6 +1111,14 @@ static void freeRemoveGroupCacheData(void* p) {
|
|||
|
||||
if (0 == remainNum && pGroup->fileId != pFileCtx->fileId) {
|
||||
removeGroupCacheFile(pFileInfo);
|
||||
|
||||
#if 0
|
||||
/* debug only */
|
||||
sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%d", pGroup->fileId);
|
||||
taosRemoveFile(pFileCtx->baseFilename);
|
||||
/* debug only */
|
||||
#endif
|
||||
|
||||
qTrace("FileId:%d-%d-%d removed", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId);
|
||||
//taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId));
|
||||
}
|
||||
|
|
|
@ -667,8 +667,9 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
|
|||
}
|
||||
|
||||
if (pJoinInfo->pLeft == NULL) {
|
||||
if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initParam) {
|
||||
if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum > 0) {
|
||||
leftEmpty = true;
|
||||
((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum--;
|
||||
} else {
|
||||
setMergeJoinDone(pOperator);
|
||||
return false;
|
||||
|
@ -680,6 +681,10 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
|
|||
if (!pJoinInfo->downstreamFetchDone[1]) {
|
||||
pJoinInfo->pRight = getNextBlockFromDownstream(pOperator, 1);
|
||||
|
||||
if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum > 0) {
|
||||
((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum--;
|
||||
}
|
||||
|
||||
pJoinInfo->rightPos = 0;
|
||||
qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0);
|
||||
} else {
|
||||
|
|
|
@ -553,6 +553,7 @@ static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQue
|
|||
COPY_SCALAR_FIELD(stbJoin.batchFetch);
|
||||
CLONE_NODE_LIST_FIELD(stbJoin.pVgList);
|
||||
CLONE_NODE_LIST_FIELD(stbJoin.pUidList);
|
||||
COPY_OBJECT_FIELD(stbJoin.srcScan, sizeof(pDst->stbJoin.srcScan));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -3010,6 +3010,8 @@ static const char* jkDynQueryCtrlPhysiPlanVgSlot0 = "VgSlot[0]";
|
|||
static const char* jkDynQueryCtrlPhysiPlanVgSlot1 = "VgSlot[1]";
|
||||
static const char* jkDynQueryCtrlPhysiPlanUidSlot0 = "UidSlot[0]";
|
||||
static const char* jkDynQueryCtrlPhysiPlanUidSlot1 = "UidSlot[1]";
|
||||
static const char* jkDynQueryCtrlPhysiPlanSrcScan0 = "SrcScan[0]";
|
||||
static const char* jkDynQueryCtrlPhysiPlanSrcScan1 = "SrcScan[1]";
|
||||
|
||||
static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SDynQueryCtrlPhysiNode* pNode = (const SDynQueryCtrlPhysiNode*)pObj;
|
||||
|
@ -3034,6 +3036,12 @@ static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanUidSlot1, pNode->stbJoin.uidSlot[1]);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlPhysiPlanSrcScan0, pNode->stbJoin.srcScan[0]);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlPhysiPlanSrcScan1, pNode->stbJoin.srcScan[1]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -3069,6 +3077,12 @@ static int32_t jsonToPhysiDynQueryCtrlNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanUidSlot1, pNode->stbJoin.uidSlot[1], code);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkDynQueryCtrlPhysiPlanSrcScan0, &pNode->stbJoin.srcScan[0]);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkDynQueryCtrlPhysiPlanSrcScan1, &pNode->stbJoin.srcScan[1]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
|
|
@ -3613,6 +3613,8 @@ enum {
|
|||
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1,
|
||||
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0,
|
||||
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1,
|
||||
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN0,
|
||||
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN1,
|
||||
};
|
||||
|
||||
static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
|
@ -3638,6 +3640,12 @@ static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncode
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1, pNode->stbJoin.uidSlot[1]);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN0, pNode->stbJoin.srcScan[0]);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN1, pNode->stbJoin.srcScan[1]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -3675,6 +3683,12 @@ static int32_t msgToPhysiDynQueryCtrlNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT1:
|
||||
code = tlvDecodeEnum(pTlv, &pNode->stbJoin.uidSlot[1], sizeof(pNode->stbJoin.uidSlot[1]));
|
||||
break;
|
||||
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN0:
|
||||
code = tlvDecodeBool(pTlv, &pNode->stbJoin.srcScan[0]);
|
||||
break;
|
||||
case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN1:
|
||||
code = tlvDecodeBool(pTlv, &pNode->stbJoin.srcScan[1]);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -3224,13 +3224,14 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppList) {
|
||||
static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppList, bool* srcScan) {
|
||||
SNodeList* pList = nodesCloneList(pJoin->pChildren);
|
||||
if (NULL == pList) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t i = 0;
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, pList) {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
|
||||
|
@ -3238,7 +3239,10 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL
|
|||
if (code) {
|
||||
break;
|
||||
}
|
||||
|
||||
pScan->node.dynamicOp = true;
|
||||
*(srcScan + i++) = pScan->pVgroupList->numOfVgroups <= 1;
|
||||
|
||||
pScan->scanType = SCAN_TYPE_TABLE;
|
||||
}
|
||||
|
||||
|
@ -3259,7 +3263,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode**
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pGrpCache->node.dynamicOp = true;
|
||||
//pGrpCache->node.dynamicOp = true;
|
||||
pGrpCache->grpColsMayBeNull = false;
|
||||
pGrpCache->grpByUid = true;
|
||||
pGrpCache->batchFetch = true;
|
||||
|
@ -3342,7 +3346,7 @@ static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChi
|
|||
}
|
||||
|
||||
pJoin->joinAlgo = JOIN_ALGO_MERGE;
|
||||
pJoin->node.dynamicOp = true;
|
||||
//pJoin->node.dynamicOp = true;
|
||||
|
||||
stbJoinOptRemoveTagEqCond(pJoin);
|
||||
NODES_DESTORY_NODE(pJoin->pTagEqCond);
|
||||
|
@ -3360,7 +3364,7 @@ static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChi
|
|||
}
|
||||
|
||||
|
||||
static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* pPost, SLogicNode** ppDynNode) {
|
||||
static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* pPost, bool* srcScan, SLogicNode** ppDynNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SDynQueryCtrlLogicNode* pDynCtrl = (SDynQueryCtrlLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL);
|
||||
if (NULL == pDynCtrl) {
|
||||
|
@ -3369,6 +3373,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p
|
|||
|
||||
pDynCtrl->qType = DYN_QTYPE_STB_HASH;
|
||||
pDynCtrl->stbJoin.batchFetch = true;
|
||||
memcpy(pDynCtrl->stbJoin.srcScan, srcScan, sizeof(pDynCtrl->stbJoin.srcScan));
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pDynCtrl->node.pChildren = nodesMakeList();
|
||||
|
@ -3423,12 +3428,14 @@ static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* p
|
|||
SLogicNode* pHJoinNode = NULL;
|
||||
SLogicNode* pMJoinNode = NULL;
|
||||
SLogicNode* pDynNode = NULL;
|
||||
bool srcScan[2] = {0};
|
||||
|
||||
int32_t code = stbJoinOptCreateTagScanNode(pJoin, &pTagScanNodes);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbJoinOptCreateTagHashJoinNode(pJoin, pTagScanNodes, &pHJoinNode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbJoinOptCreateTableScanNodes(pJoin, &pTbScanNodes);
|
||||
code = stbJoinOptCreateTableScanNodes(pJoin, &pTbScanNodes, srcScan);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbJoinOptCreateGroupCacheNode(pTbScanNodes, &pGrpCacheNode);
|
||||
|
@ -3437,7 +3444,7 @@ static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* p
|
|||
code = stbJoinOptCreateMergeJoinNode(pJoin, pGrpCacheNode, &pMJoinNode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbJoinOptCreateDynQueryCtrlNode(pHJoinNode, pMJoinNode, &pDynNode);
|
||||
code = stbJoinOptCreateDynQueryCtrlNode(pHJoinNode, pMJoinNode, srcScan, &pDynNode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = replaceLogicNode(pLogicSubplan, pJoin, (SLogicNode*)pDynNode);
|
||||
|
|
|
@ -1015,6 +1015,8 @@ static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList*
|
|||
code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pUidList, &pUidList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
memcpy(pDynCtrl->stbJoin.srcScan, pLogicNode->stbJoin.srcScan, sizeof(pDynCtrl->stbJoin.srcScan));
|
||||
|
||||
SNode* pNode = NULL;
|
||||
int32_t i = 0;
|
||||
FOREACH(pNode, pVgList) {
|
||||
|
|
|
@ -1152,11 +1152,11 @@ static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubp
|
|||
SNode* pChild = NULL;
|
||||
FOREACH(pChild, pJoin->node.pChildren) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
|
||||
if (pJoin->node.dynamicOp) {
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
//if (pJoin->node.dynamicOp) {
|
||||
// code = TSDB_CODE_SUCCESS;
|
||||
//} else {
|
||||
code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, false);
|
||||
}
|
||||
//}
|
||||
} else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) {
|
||||
code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild);
|
||||
} else {
|
||||
|
@ -1172,9 +1172,9 @@ static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubp
|
|||
static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (!pInfo->pSplitNode->dynamicOp) {
|
||||
//if (!pInfo->pSplitNode->dynamicOp) {
|
||||
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||
}
|
||||
//}
|
||||
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
|
||||
}
|
||||
return code;
|
||||
|
|
|
@ -684,14 +684,14 @@ _return:
|
|||
QW_SET_PHASE(ctx, phase);
|
||||
}
|
||||
|
||||
if (code) {
|
||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask);
|
||||
}
|
||||
|
||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||
qwReleaseTaskCtx(mgmt, ctx);
|
||||
}
|
||||
|
||||
if (code) {
|
||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask);
|
||||
}
|
||||
|
||||
QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
|
||||
|
||||
QW_RET(code);
|
||||
|
|
|
@ -234,7 +234,7 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
|
|||
hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
|
||||
qInfo("taosHashGet hb connection not exists, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
|
||||
SCH_ERR_RET(TSDB_CODE_APP_ERROR);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue