diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index daffc0624a..59840b5a1e 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -278,11 +278,11 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_DELETE, QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, - QUERY_NODE_PHYSICAL_SUBPLAN, - QUERY_NODE_PHYSICAL_PLAN, QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT, - QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT + QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, + QUERY_NODE_PHYSICAL_SUBPLAN, + QUERY_NODE_PHYSICAL_PLAN } ENodeType; /** diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index eecfae0812..79add06cfd 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -159,6 +159,7 @@ typedef struct SInterpFuncLogicNode { typedef struct SGroupCacheLogicNode { SLogicNode node; + bool grpColsMayBeNull; SNodeList* pGroupCols; } SGroupCacheLogicNode; @@ -437,6 +438,7 @@ typedef struct SHashJoinPhysiNode { typedef struct SGroupCachePhysiNode { SPhysiNode node; + bool grpColsMayBeNull; SNodeList* pGroupCols; } SGroupCachePhysiNode; diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h new file mode 100755 index 0000000000..b1d45cc0f5 --- /dev/null +++ b/source/libs/executor/inc/groupcache.h @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TDENGINE_GROUPCACHE_H +#define TDENGINE_GROUPCACHE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760 + +typedef struct SGcSessionRes { + +} SGcSessionRes; + +typedef struct SGcOperatorParam { + int64_t sessionId; + bool newFetch; + void* pGroupValue; + int32_t groupValueSize; +} SGcOperatorParam; + +#pragma pack(push, 1) +typedef struct SGcBlkBufInfo { + void* next; + uint16_t pageId; + int32_t offset; +} SGcBlkBufInfo; +#pragma pack(pop) + +typedef struct SGcBufPageInfo { + int32_t pageSize; + int32_t offset; + char* data; +} SGcBufPageInfo; + +typedef struct SGroupData { + SGcBlkBufInfo* blks; +} SGroupData; + +typedef struct SGroupColInfo { + int32_t slot; + bool vardata; + int32_t bytes; +} SGroupColInfo; + +typedef struct SGroupColsInfo { + int32_t colNum; + bool withNull; + SGroupColInfo* pColsInfo; + int32_t bitMapSize; + int32_t bufSize; + char* pBuf; + char* pData; +} SGroupColsInfo; + +typedef struct SGroupCacheOperatorInfo { + SSHashObj* pSessionHash; + SGroupColsInfo groupColsInfo; + SArray* pBlkBufs; + SSHashObj* pBlkHash; + SOperatorInfo** ppDownStream; + int32_t downStreamNum; +} SGroupCacheOperatorInfo; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_GROUPCACHE_H diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index c18edd870f..8281f6484f 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -27,6 +27,11 @@ typedef struct SOperatorCostInfo { struct SOperatorInfo; +typedef struct SOperatorParam { + int32_t opType; + void* value; +} SOperatorParam; + typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result); @@ -35,6 +40,8 @@ typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); typedef void (*__optr_close_fn_t)(void* param); typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr); +typedef SSDataBlock* (*__optr_get_ext_fn_t)(struct SOperatorInfo* pOptr, SOperatorParam* param); +typedef SSDataBlock* (*__optr_notify_fn_t)(struct SOperatorInfo* pOptr, SOperatorParam* param); typedef struct SOperatorFpSet { __optr_open_fn_t _openFn; // DO NOT invoke this function directly @@ -45,6 +52,8 @@ typedef struct SOperatorFpSet { __optr_encode_fn_t encodeResultRow; __optr_decode_fn_t decodeResultRow; __optr_explain_fn_t getExplainFn; + __optr_get_ext_fn_t getNextExtFn; + __optr_notify_fn_t notifyFn; } SOperatorFpSet; enum { @@ -144,7 +153,8 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo // clang-format on SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, - __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain); + __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, + __optr_explain_fn_t explain, __optr_get_ext_fn_t nextExtFn, __optr_notify_fn_t notifyFn); int32_t optrDummyOpenFn(SOperatorInfo* pOperator); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); void setOperatorCompleted(SOperatorInfo* pOperator); @@ -160,6 +170,7 @@ SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, con int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SStorageAPI* pAPI); int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList); +void * getOperatorParam(int32_t opType, SOperatorParam* param); #ifdef __cplusplus } diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index be0ad1c239..5e28d73084 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -114,7 +114,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index ce39ebab59..0732cdaa3d 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -134,7 +134,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL); + createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL, NULL, NULL); pOperator->cost.openCost = 0; return pOperator; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index bbdc50183e..918c51d5d4 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -133,7 +133,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 4fbe6785a3..b3aa41a1a5 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -316,7 +316,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode } pOperator->fpSet = - createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL); + createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); return pOperator; _error: diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 92152924f9..64e2bd4020 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -444,7 +444,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = pInfo->numOfExpr; pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL); + createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -1561,7 +1561,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL); + createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c new file mode 100755 index 0000000000..400416dc20 --- /dev/null +++ b/source/libs/executor/src/groupcacheoperator.c @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executorInt.h" +#include "filter.h" +#include "function.h" +#include "operator.h" +#include "os.h" +#include "querynodes.h" +#include "querytask.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "thash.h" +#include "tmsg.h" +#include "ttypes.h" +#include "groupcache.h" + +static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, SNodeList* pList) { + pCols->colNum = LIST_LENGTH(pList); + pCols->withNull = grpColsMayBeNull; + pCols->pColsInfo = taosMemoryMalloc(pCols->colNum * sizeof(SGroupColInfo)); + if (NULL == pCols->pColsInfo) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t i = 0; + SNode* pNode = NULL; + FOREACH(pNode, pList) { + SColumnNode* pColNode = (SColumnNode*)pNode; + pCols->pColsInfo[i].slot = pColNode->slotId; + pCols->pColsInfo[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); + pCols->pColsInfo[i].bytes = pColNode->node.resType.bytes; + pCols->bufSize += pColNode->node.resType.bytes; + ++i; + } + + if (pCols->withNull) { + pCols->bitMapSize = pCols->colNum / sizeof(int8_t) + ((pCols->colNum % sizeof(int8_t)) ? 1 : 0); + pCols->bufSize += pCols->bitMapSize; + } + + if (pCols->colNum > 1) { + pCols->pBuf = taosMemoryMalloc(pCols->bufSize); + if (NULL == pCols->pBuf) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} + +static void freeGroupCacheBufPage(void* param) { + SGcBufPageInfo* pInfo = (SGcBufPageInfo*)param; + taosMemoryFree(pInfo->data); +} + +static void destroyGroupCacheOperator(void* param) { + SGroupCacheOperatorInfo* pGrpCacheOperator = (SGroupCacheOperatorInfo*)param; + + taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo); + taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf); + taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage); + taosMemoryFree(pGrpCacheOperator->ppDownStream); + + taosMemoryFreeClear(param); +} + +static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) { + SBufPageInfo page; + page.pageSize = GROUP_CACHE_DEFAULT_PAGE_SIZE; + page.offset = 0; + page.data = taosMemoryMalloc(page.pageSize); + if (NULL == page.data) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pBlkBufs, &page); + return TSDB_CODE_SUCCESS; +} + +static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) { + pInfo->pBlkBufs = taosArrayInit(32, sizeof(SBufPageInfo)); + if (NULL == pInfo->pBlkBufs) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return addPageToGroupCacheBuf(pInfo->pBlkBufs); +} + +static int32_t getFromSessionCache(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes) { + +} + +static SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator, SOperatorParam* param) { + SGroupCacheOperatorInfo* pGCache = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pRes = NULL; + SGcOperatorParam* pParam = getOperatorParam(pOperator->operatorType, param) + if (NULL == pParam || pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + code = getFromSessionCache(pGCache, pParam, &pRes, ); + + while (true) { + SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream); + if (NULL == pBlock) { + setHJoinDone(pOperator); + break; + } + + code = launchBlockHashJoin(pOperator, pBlock); + if (code) { + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } + + if (pRes->info.rows < pOperator->resultInfo.threshold) { + continue; + } + + if (pOperator->exprSupp.pFilterInfo != NULL) { + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + } + if (pRes->info.rows > 0) { + break; + } + } + + return (pRes->info.rows > 0) ? pRes : NULL; +} + + + +SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, + SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) { + SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + + int32_t code = TSDB_CODE_SUCCESS; + if (pOperator == NULL || pInfo == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo); + + code = initGroupColsInfo(&pInfo->groupColsInfo, pPhyciNode->grpColsMayBeNull, pPhyciNode->pGroupCols); + if (code) { + goto _error; + } + + code = initGroupCacheBufPages(pInfo); + if (code) { + goto _error; + } + + pInfo->pBlkHash = tSimpleHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); + if (pInfo->pBlkHash == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + pInfo->pSessionHash = tSimpleHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (pInfo->pSessionHash == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + pInfo->ppDownStream = taosMemoryMalloc(numOfDownstream * POINTER_BYTES); + if (NULL == pInfo->ppDownStream) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + memcpy(pInfo->ppDownStream, pDownstream, numOfDownstream * POINTER_BYTES); + + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, getFromGroupCache, NULL); + + return pOperator; + +_error: + if (pInfo != NULL) { + destroyGroupCacheOperator(pInfo); + } + + taosMemoryFree(pOperator); + pTaskInfo->code = code; + return NULL; +} + + diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 1489e3fbdb..d072b45e4e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -480,7 +480,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -906,7 +906,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL); + createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -1323,7 +1323,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, - destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL); + destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 311a2d1172..aab81da801 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -810,7 +810,7 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n goto _error; } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, NULL, NULL); return pOperator; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 7b526a95d8..1e010f5ae0 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -271,7 +271,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->rightBuildTable = tSimpleHashInit(256, hashFn); } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 730252c7ee..0ffddbd3f6 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -30,7 +30,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, - __optr_explain_fn_t explain) { + __optr_explain_fn_t explain, __optr_get_ext_fn_t nextExtFn, __optr_notify_fn_t notifyFn) { SOperatorFpSet fpSet = { ._openFn = openFn, .getNextFn = nextFn, @@ -38,6 +38,8 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, .closeFn = closeFn, .reqBufFn = reqBufFn, .getExplainFn = explain, + .getNextExtFn = nextExtFn, + .notifyFn = notifyFn }; return fpSet; @@ -422,6 +424,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); } else { terrno = TSDB_CODE_INVALID_PARA; + pTaskInfo->code = terrno; return NULL; } @@ -435,6 +438,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR size_t size = LIST_LENGTH(pPhyNode->pChildren); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); if (ops == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = terrno; return NULL; } @@ -503,6 +508,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN == type) { + pOptr = createHashJoinOperatorInfo(ops, size, (SHashJoinPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) { @@ -513,8 +520,13 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) { pOptr = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) { + pOptr = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) { + pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo); } else { terrno = TSDB_CODE_INVALID_PARA; + pTaskInfo->code = terrno; taosMemoryFree(ops); return NULL; } @@ -579,3 +591,11 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf return TSDB_CODE_SUCCESS; } + +void *getOperatorParam(int32_t opType, SOperatorParam* param) { + if (NULL == param || opType != param->opType) { + return NULL; + } + return param->value; +} + diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index cd450c5bb7..e2dfebe818 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -135,7 +135,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -416,7 +416,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5fd52d3831..5dd0383fe9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -981,7 +981,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, - optrDefaultBufFn, getTableScannerExecInfo); + optrDefaultBufFn, getTableScannerExecInfo, NULL, NULL); // for non-blocking operator, the open cost is always 0 pOperator->cost.openCost = 0; @@ -1006,7 +1006,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL, NULL, NULL); return pOperator; } @@ -2252,7 +2252,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); return pOperator; _end: @@ -2474,7 +2474,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan; pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); + createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); return pOperator; @@ -2639,7 +2639,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); + createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); return pOperator; @@ -3094,7 +3094,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pOperator->exprSupp.numOfExprs = numOfCols; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo, - optrDefaultBufFn, getTableMergeScanExplainExecInfo); + optrDefaultBufFn, getTableMergeScanExplainExecInfo, NULL, NULL); pOperator->cost.openCost = 0; return pOperator; @@ -3244,7 +3244,7 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); return pOperator; _error: diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 20fb588a02..8a5ec610f8 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -87,7 +87,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* // TODO dynamic set the available sort buffer pOperator->fpSet = - createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo); + createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -552,7 +552,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo, - optrDefaultBufFn, getGroupSortExplainExecInfo); + optrDefaultBufFn, getGroupSortExplainExecInfo, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -841,7 +841,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, - destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo); + destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo, NULL, NULL); code = appendDownstream(pOperator, downStreams, numStreams); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 70fe42595e..cf238266b3 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1783,7 +1783,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL); + createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL, NULL, NULL); return pOperator; _error: @@ -2320,7 +2320,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); return pOperator; _error: diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 022440b2ad..06daa2c201 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -998,7 +998,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL); + createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 78d1e97554..3e1b19ca1c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1684,7 +1684,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -1907,7 +1907,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -1983,7 +1983,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -2834,7 +2834,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { initIntervalDownStream(downstream, pPhyNode->type, pInfo); } @@ -3670,7 +3670,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); if (downstream) { initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); @@ -3826,7 +3826,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); blockDataEnsureCapacity(pInfo->pUpdateRes, 128); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, - destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); + destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); } setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -4203,7 +4203,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4484,7 +4484,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, false, OP_NOT_OPENED, miaInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4771,7 +4771,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, - optrDefaultBufFn, NULL); + optrDefaultBufFn, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -5027,7 +5027,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, - destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); + destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); pInfo->statestore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 97df998aa3..7e5b19a7fb 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1182,12 +1182,16 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkGroupCacheLogicPlanGrpColsMayBeNull = "GroupColsMayBeNull"; static const char* jkGroupCacheLogicPlanGroupCols = "GroupCols"; static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) { const SGroupCacheLogicNode* pNode = (const SGroupCacheLogicNode*)pObj; int32_t code = logicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanGrpColsMayBeNull, pNode->grpColsMayBeNull); + } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkGroupCacheLogicPlanGroupCols, pNode->pGroupCols); } @@ -1199,6 +1203,9 @@ static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) { SGroupCacheLogicNode* pNode = (SGroupCacheLogicNode*)pObj; int32_t code = jsonToLogicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanGrpColsMayBeNull, &pNode->grpColsMayBeNull); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkGroupCacheLogicPlanGroupCols, &pNode->pGroupCols); } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index a9de5ce309..31dad29cce 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3230,6 +3230,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode** } pGrpCache->node.dynamicOp = true; + pGrpCache->grpColsMayBeNull = false; pGrpCache->node.pChildren = pChildren; pGrpCache->node.pTargets = nodesMakeList(); if (NULL == pGrpCache->node.pTargets) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 31bf048917..c3c69d7b39 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -984,6 +984,7 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh return TSDB_CODE_OUT_OF_MEMORY; } + pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull; SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; int32_t code = TSDB_CODE_SUCCESS; if (TSDB_CODE_SUCCESS == code) {