From ac35ae98393775b64347011757a888bb770529ca Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 3 Jul 2023 19:29:46 +0800 Subject: [PATCH] feat: add dynamic query ctrl operator --- include/libs/nodes/plannodes.h | 9 ++ source/libs/executor/inc/dynqueryctrl.h | 30 +++++++ source/libs/executor/inc/groupcache.h | 21 ++--- .../libs/executor/src/dynqueryctrloperator.c | 82 +++++++++++++++++++ source/libs/executor/src/groupcacheoperator.c | 44 ++++------ source/libs/executor/src/hashjoinoperator.c | 5 ++ source/libs/nodes/src/nodesMsgFuncs.c | 9 +- source/libs/planner/src/planOptimizer.c | 1 + source/libs/planner/src/planPhysiCreater.c | 1 + source/libs/planner/src/planSpliter.c | 10 ++- 10 files changed, 166 insertions(+), 46 deletions(-) create mode 100755 source/libs/executor/inc/dynqueryctrl.h create mode 100755 source/libs/executor/src/dynqueryctrloperator.c diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 700b4e7be8..6ef8a92667 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -325,6 +325,7 @@ typedef struct SDataBlockDescNode { typedef struct SPhysiNode { ENodeType type; + bool dynamicOp; EOrder inputTsOrder; EOrder outputTsOrder; SDataBlockDescNode* pOutputDataBlockDesc; @@ -443,9 +444,17 @@ typedef struct SGroupCachePhysiNode { SNodeList* pGroupCols; } SGroupCachePhysiNode; +typedef struct SStbJoinDynCtrlInfo { + int32_t vgSlot[2]; + int32_t uidSlot[2]; +} SStbJoinDynCtrlInfo; + typedef struct SDynQueryCtrlPhysiNode { SPhysiNode node; EDynQueryType qType; + union { + SStbJoinDynCtrlInfo stbJoin; + }; } SDynQueryCtrlPhysiNode; typedef struct SAggPhysiNode { diff --git a/source/libs/executor/inc/dynqueryctrl.h b/source/libs/executor/inc/dynqueryctrl.h new file mode 100755 index 0000000000..d5cd52e004 --- /dev/null +++ b/source/libs/executor/inc/dynqueryctrl.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TDENGINE_DYNQUERYCTRL_H +#define TDENGINE_DYNQUERYCTRL_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SDynQueryCtrlOperatorInfo { + SStbJoinDynCtrlInfo ctrlInfo; +} SDynQueryCtrlOperatorInfo; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_DYNQUERYCTRL_H diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 9f74ec4e02..96aba1d7bc 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -21,17 +21,10 @@ extern "C" { #define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760 -typedef struct SGcSessionCtx { - SOperatorInfo* pDownstream; - bool cacheHit; - bool needCache; - SGcBlkBufInfo* pLastBlk; -} SGcSessionCtx; - typedef struct SGcOperatorParam { SOperatorBasicParam basic; int64_t sessionId; - int32_t downstreamKey; + int32_t downstreamIdx; bool needCache; void* pGroupValue; int32_t groupValueSize; @@ -71,18 +64,18 @@ typedef struct SGroupColsInfo { char* pData; } SGroupColsInfo; -typedef struct SGcDownstreamInfo { - SSHashObj* pKey2Idx; - SOperatorInfo** ppDownStream; - int32_t downStreamNum; -} SGcDownstreamInfo; +typedef struct SGcSessionCtx { + SOperatorInfo* pDownstream; + bool cacheHit; + bool needCache; + SGcBlkBufInfo* pLastBlk; +} SGcSessionCtx; typedef struct SGroupCacheOperatorInfo { SSHashObj* pSessionHash; SGroupColsInfo groupColsInfo; SArray* pBlkBufs; SSHashObj* pBlkHash; - SGcDownstreamInfo downstreamInfo; int64_t pCurrentId; SGcSessionCtx* pCurrent; } SGroupCacheOperatorInfo; diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c new file mode 100755 index 0000000000..e9ee6bb970 --- /dev/null +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executorInt.h" +#include "filter.h" +#include "function.h" +#include "operator.h" +#include "os.h" +#include "querynodes.h" +#include "querytask.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "thash.h" +#include "tmsg.h" +#include "ttypes.h" +#include "dynqueryctrl.h" + +static void destroyDynQueryCtrlOperator(void* param) { + SDynQueryCtrlOperatorInfo* pDynCtrlOperator = (SDynQueryCtrlOperatorInfo*)param; + + taosMemoryFreeClear(param); +} + +SSDataBlock* getBlockFromDynQueryCtrl(SOperatorInfo* pOperator) { + SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; + while (true) { + SSDataBlock* pBlock = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); + if (NULL == pBlock) { + break; + } + + addBlkToGroupCache(pOperator, pBlock, &pRes); + } +} + +SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, + SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) { + SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + + int32_t code = TSDB_CODE_SUCCESS; + if (pOperator == NULL || pInfo == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + code = appendDownstream(pOperator, pDownstream, numOfDownstream); + if (TSDB_CODE_SUCCESS != code) { + goto _error; + } + + memcpy(&pInfo->ctrlInfo, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); + + setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED, pInfo, pTaskInfo); + + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getBlockFromDynQueryCtrl, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn, NULL, NULL, NULL); + + return pOperator; + +_error: + if (pInfo != NULL) { + destroyDynQueryCtrlOperator(pInfo); + } + + taosMemoryFree(pOperator); + pTaskInfo->code = code; + return NULL; +} + + diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index de94a55606..4822f1960e 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -74,14 +74,12 @@ static void destroyGroupCacheOperator(void* param) { taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage); tSimpleHashCleanup(pGrpCacheOperator->pSessionHash); tSimpleHashCleanup(pGrpCacheOperator->pBlkHash); - tSimpleHashCleanup(pGrpCacheOperator->downstreamInfo.pKey2Idx); - taosMemoryFree(pGrpCacheOperator->downstreamInfo.ppDownStream); taosMemoryFreeClear(param); } static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) { - SBufPageInfo page; + SGcBufPageInfo page; page.pageSize = GROUP_CACHE_DEFAULT_PAGE_SIZE; page.offset = 0; page.data = taosMemoryMalloc(page.pageSize); @@ -94,7 +92,7 @@ static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) { } static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo* pBlkInfo) { - SBufPageInfo *pPage = taosArrayGet(pBlkBufs, pBlkInfo->pageId); + SGcBufPageInfo *pPage = taosArrayGet(pBlkBufs, pBlkInfo->pageId); return pPage->data + pBlkInfo->offset; } @@ -105,7 +103,7 @@ static FORCE_INLINE char* moveRetrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBuf SGcBlkBufInfo* pCurr = (*ppLastBlk)->next; *ppLastBlk = pCurr; if (pCurr) { - SBufPageInfo *pPage = taosArrayGet(pBlkBufs, pCurr->pageId); + SGcBufPageInfo *pPage = taosArrayGet(pBlkBufs, pCurr->pageId); return pPage->data + pCurr->offset; } @@ -114,7 +112,7 @@ static FORCE_INLINE char* moveRetrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBuf static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) { - pInfo->pBlkBufs = taosArrayInit(32, sizeof(SBufPageInfo)); + pInfo->pBlkBufs = taosArrayInit(32, sizeof(SGcBufPageInfo)); if (NULL == pInfo->pBlkBufs) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -128,47 +126,35 @@ static int32_t initGroupCacheDownstreamInfo(SGroupCachePhysiNode* pPhyciNode, SO return TSDB_CODE_OUT_OF_MEMORY; } memcpy(pInfo->ppDownStream, pDownstream, numOfDownstream * POINTER_BYTES); - pInfo->pKey2Idx = tSimpleHashInit(numOfDownstream, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); - if (NULL == pInfo->pKey2Idx) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - for (int32_t i = 0; i < numOfDownstream; ++i) { - int32_t keyValue = taosArrayGet(pPhyciNode, i); - tSimpleHashPut(pInfo->pKey2Idx, &keyValue, sizeof(keyValue), &i, sizeof(i)); - } return TSDB_CODE_SUCCESS; } -static int32_t initGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) { +static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) { SGcSessionCtx ctx = {0}; + SGroupCacheOperatorInfo* pGCache = pOperator->info; SGroupData* pGroup = tSimpleHashGet(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize); if (pGroup) { ctx.cacheHit = true; ctx.pLastBlk = pGroup->blks; } else { - int32_t* pIdx = tSimpleHashGet(pGCache->downstreamInfo.pKey2Idx, &pParam->downstreamKey, sizeof(pParam->downstreamKey)); - if (NULL == pIdx) { - qError("Invalid downstream key value: %d", pParam->downstreamKey); - return TSDB_CODE_INVALID_PARA; - } - ctx.pDownstream = pGCache->downstreamInfo.ppDownStream[*pIdx]; + ctx.pDownstream = pOperator->pDownstream[pParam->downstreamIdx]; ctx.needCache = pParam->needCache; } return TSDB_CODE_SUCCESS; } -static void getFromSessionCache(SExecTaskInfo* pTaskInfo, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) { +static void getFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pParam->basic.newExec) { - int32_t code = initGroupCacheSession(pGCache, pParam, ppSession); + int32_t code = initGroupCacheSession(pOperator, pParam, ppSession); if (TSDB_CODE_SUCCESS != code) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } if ((*ppSession)->pLastBlk) { - *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, (*ppSession)->pLastBlk); + *ppRes = (SSDataBlock*)retrieveBlkFromBlkBufs(pGCache->pBlkBufs, (*ppSession)->pLastBlk); } else { *ppRes = NULL; } @@ -184,7 +170,7 @@ static void getFromSessionCache(SExecTaskInfo* pTaskInfo, SGroupCacheOperatorInf *ppSession = pCtx; if (pCtx->cacheHit) { - *ppRes = moveRetrieveBlkFromBlkBufs(pGCache->pBlkBufs, &pCtx->pLastBlk); + *ppRes = (SSDataBlock*)moveRetrieveBlkFromBlkBufs(pGCache->pBlkBufs, &pCtx->pLastBlk); return; } @@ -223,7 +209,7 @@ static SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator, SOperator return NULL; } - getFromSessionCache(pTaskInfo, pGCache, pParam, &pRes, &pSession); + getFromSessionCache(pOperator, pGCache, pParam, &pRes, &pSession); pGCache->pCurrent = pSession; pGCache->pCurrentId = pParam->sessionId; @@ -283,8 +269,8 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } - code = initGroupCacheDownstreamInfo(pPhyciNode, pDownstream, numOfDownstream, &pInfo->downstreamInfo); - if (code) { + code = appendDownstream(pOperator, pDownstream, numOfDownstream); + if (TSDB_CODE_SUCCESS != code) { goto _error; } diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index aab81da801..b700eec374 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -810,6 +810,11 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n goto _error; } + code = appendDownstream(pOperator, pDownstream, numOfDownstream); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, NULL, NULL); return pOperator; diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index bd93046b9d..4a1a6281a2 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -1853,7 +1853,8 @@ enum { PHY_NODE_CODE_LIMIT, PHY_NODE_CODE_SLIMIT, PHY_NODE_CODE_INPUT_TS_ORDER, - PHY_NODE_CODE_OUTPUT_TS_ORDER + PHY_NODE_CODE_OUTPUT_TS_ORDER, + PHY_NODE_CODE_DYNAMIC_OP, }; static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -1878,6 +1879,9 @@ static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_OUTPUT_TS_ORDER, pNode->outputTsOrder); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_NODE_CODE_DYNAMIC_OP, pNode->dynamicOp); + } return code; } @@ -1910,6 +1914,9 @@ static int32_t msgToPhysiNode(STlvDecoder* pDecoder, void* pObj) { case PHY_NODE_CODE_OUTPUT_TS_ORDER: code = tlvDecodeEnum(pTlv, &pNode->outputTsOrder, sizeof(pNode->outputTsOrder)); break; + case PHY_NODE_CODE_DYNAMIC_OP: + code = tlvDecodeBool(pTlv, &pNode->dynamicOp); + break; default: break; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 31dad29cce..41cdf5a36f 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3210,6 +3210,7 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL break; } pScan->node.dynamicOp = true; + pScan->scanType = SCAN_TYPE_TABLE; } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index c3c69d7b39..72fc0af453 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -366,6 +366,7 @@ static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode TSWAP(pPhysiNode->pLimit, pLogicNode->pLimit); TSWAP(pPhysiNode->pSlimit, pLogicNode->pSlimit); + pPhysiNode->dynamicOp = pLogicNode->dynamicOp; pPhysiNode->inputTsOrder = pLogicNode->inputTsOrder; pPhysiNode->outputTsOrder = pLogicNode->outputTsOrder; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index c1b5f3ef18..0d7ca3dd6d 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1204,7 +1204,11 @@ static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubp SNode* pChild = NULL; FOREACH(pChild, pJoin->node.pChildren) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) { - code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, false); + if (pJoin->node.dynamicOp) { + code = TSDB_CODE_SUCCESS; + } else { + code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, false); + } } else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) { code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild); } else { @@ -1220,7 +1224,9 @@ static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubp static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode); if (TSDB_CODE_SUCCESS == code) { - pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + if (!pInfo->pSplitNode->dynamicOp) { + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + } SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT); } return code;