feat: add group cache operator

This commit is contained in:
dapan1121 2023-06-30 19:20:36 +08:00
parent 7dddfd0a5a
commit 60290b98e4
23 changed files with 369 additions and 41 deletions

View File

@ -278,11 +278,11 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_DELETE,
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE,
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN,
QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT
QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN
} ENodeType;
/**

View File

@ -159,6 +159,7 @@ typedef struct SInterpFuncLogicNode {
typedef struct SGroupCacheLogicNode {
SLogicNode node;
bool grpColsMayBeNull;
SNodeList* pGroupCols;
} SGroupCacheLogicNode;
@ -437,6 +438,7 @@ typedef struct SHashJoinPhysiNode {
typedef struct SGroupCachePhysiNode {
SPhysiNode node;
bool grpColsMayBeNull;
SNodeList* pGroupCols;
} SGroupCachePhysiNode;

View File

@ -0,0 +1,82 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_GROUPCACHE_H
#define TDENGINE_GROUPCACHE_H
#ifdef __cplusplus
extern "C" {
#endif
#define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760
typedef struct SGcSessionRes {
} SGcSessionRes;
typedef struct SGcOperatorParam {
int64_t sessionId;
bool newFetch;
void* pGroupValue;
int32_t groupValueSize;
} SGcOperatorParam;
#pragma pack(push, 1)
typedef struct SGcBlkBufInfo {
void* next;
uint16_t pageId;
int32_t offset;
} SGcBlkBufInfo;
#pragma pack(pop)
typedef struct SGcBufPageInfo {
int32_t pageSize;
int32_t offset;
char* data;
} SGcBufPageInfo;
typedef struct SGroupData {
SGcBlkBufInfo* blks;
} SGroupData;
typedef struct SGroupColInfo {
int32_t slot;
bool vardata;
int32_t bytes;
} SGroupColInfo;
typedef struct SGroupColsInfo {
int32_t colNum;
bool withNull;
SGroupColInfo* pColsInfo;
int32_t bitMapSize;
int32_t bufSize;
char* pBuf;
char* pData;
} SGroupColsInfo;
typedef struct SGroupCacheOperatorInfo {
SSHashObj* pSessionHash;
SGroupColsInfo groupColsInfo;
SArray* pBlkBufs;
SSHashObj* pBlkHash;
SOperatorInfo** ppDownStream;
int32_t downStreamNum;
} SGroupCacheOperatorInfo;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_GROUPCACHE_H

View File

@ -27,6 +27,11 @@ typedef struct SOperatorCostInfo {
struct SOperatorInfo;
typedef struct SOperatorParam {
int32_t opType;
void* value;
} SOperatorParam;
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result);
@ -35,6 +40,8 @@ typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
typedef void (*__optr_close_fn_t)(void* param);
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_get_ext_fn_t)(struct SOperatorInfo* pOptr, SOperatorParam* param);
typedef SSDataBlock* (*__optr_notify_fn_t)(struct SOperatorInfo* pOptr, SOperatorParam* param);
typedef struct SOperatorFpSet {
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
@ -45,6 +52,8 @@ typedef struct SOperatorFpSet {
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_explain_fn_t getExplainFn;
__optr_get_ext_fn_t getNextExtFn;
__optr_notify_fn_t notifyFn;
} SOperatorFpSet;
enum {
@ -144,7 +153,8 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
// clang-format on
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain);
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
__optr_explain_fn_t explain, __optr_get_ext_fn_t nextExtFn, __optr_notify_fn_t notifyFn);
int32_t optrDummyOpenFn(SOperatorInfo* pOperator);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
void setOperatorCompleted(SOperatorInfo* pOperator);
@ -160,6 +170,7 @@ SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, con
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);
int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SStorageAPI* pAPI);
int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList);
void * getOperatorParam(int32_t opType, SOperatorParam* param);
#ifdef __cplusplus
}

View File

@ -114,7 +114,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = downstream->info;

View File

@ -134,7 +134,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL);
createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL, NULL, NULL);
pOperator->cost.openCost = 0;
return pOperator;

View File

@ -133,7 +133,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -316,7 +316,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
}
pOperator->fpSet =
createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL);
createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
return pOperator;
_error:

View File

@ -444,7 +444,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL);
createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
@ -1561,7 +1561,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL);
createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -0,0 +1,204 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "operator.h"
#include "os.h"
#include "querynodes.h"
#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
#include "groupcache.h"
static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, SNodeList* pList) {
pCols->colNum = LIST_LENGTH(pList);
pCols->withNull = grpColsMayBeNull;
pCols->pColsInfo = taosMemoryMalloc(pCols->colNum * sizeof(SGroupColInfo));
if (NULL == pCols->pColsInfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t i = 0;
SNode* pNode = NULL;
FOREACH(pNode, pList) {
SColumnNode* pColNode = (SColumnNode*)pNode;
pCols->pColsInfo[i].slot = pColNode->slotId;
pCols->pColsInfo[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
pCols->pColsInfo[i].bytes = pColNode->node.resType.bytes;
pCols->bufSize += pColNode->node.resType.bytes;
++i;
}
if (pCols->withNull) {
pCols->bitMapSize = pCols->colNum / sizeof(int8_t) + ((pCols->colNum % sizeof(int8_t)) ? 1 : 0);
pCols->bufSize += pCols->bitMapSize;
}
if (pCols->colNum > 1) {
pCols->pBuf = taosMemoryMalloc(pCols->bufSize);
if (NULL == pCols->pBuf) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static void freeGroupCacheBufPage(void* param) {
SGcBufPageInfo* pInfo = (SGcBufPageInfo*)param;
taosMemoryFree(pInfo->data);
}
static void destroyGroupCacheOperator(void* param) {
SGroupCacheOperatorInfo* pGrpCacheOperator = (SGroupCacheOperatorInfo*)param;
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo);
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf);
taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage);
taosMemoryFree(pGrpCacheOperator->ppDownStream);
taosMemoryFreeClear(param);
}
static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) {
SBufPageInfo page;
page.pageSize = GROUP_CACHE_DEFAULT_PAGE_SIZE;
page.offset = 0;
page.data = taosMemoryMalloc(page.pageSize);
if (NULL == page.data) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBlkBufs, &page);
return TSDB_CODE_SUCCESS;
}
static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) {
pInfo->pBlkBufs = taosArrayInit(32, sizeof(SBufPageInfo));
if (NULL == pInfo->pBlkBufs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return addPageToGroupCacheBuf(pInfo->pBlkBufs);
}
static int32_t getFromSessionCache(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes) {
}
static SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator, SOperatorParam* param) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pRes = NULL;
SGcOperatorParam* pParam = getOperatorParam(pOperator->operatorType, param)
if (NULL == pParam || pOperator->status == OP_EXEC_DONE) {
return NULL;
}
code = getFromSessionCache(pGCache, pParam, &pRes, );
while (true) {
SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream);
if (NULL == pBlock) {
setHJoinDone(pOperator);
break;
}
code = launchBlockHashJoin(pOperator, pBlock);
if (code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
}
if (pOperator->exprSupp.pFilterInfo != NULL) {
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
}
if (pRes->info.rows > 0) {
break;
}
}
return (pRes->info.rows > 0) ? pRes : NULL;
}
SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
code = initGroupColsInfo(&pInfo->groupColsInfo, pPhyciNode->grpColsMayBeNull, pPhyciNode->pGroupCols);
if (code) {
goto _error;
}
code = initGroupCacheBufPages(pInfo);
if (code) {
goto _error;
}
pInfo->pBlkHash = tSimpleHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (pInfo->pBlkHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
pInfo->pSessionHash = tSimpleHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (pInfo->pSessionHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
pInfo->ppDownStream = taosMemoryMalloc(numOfDownstream * POINTER_BYTES);
if (NULL == pInfo->ppDownStream) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
memcpy(pInfo->ppDownStream, pDownstream, numOfDownstream * POINTER_BYTES);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, getFromGroupCache, NULL);
return pOperator;
_error:
if (pInfo != NULL) {
destroyGroupCacheOperator(pInfo);
}
taosMemoryFree(pOperator);
pTaskInfo->code = code;
return NULL;
}

View File

@ -480,7 +480,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -906,7 +906,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL);
createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -1323,7 +1323,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL,
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL);
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
code = appendDownstream(pOperator, &downstream, 1);

View File

@ -810,7 +810,7 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
goto _error;
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, NULL, NULL);
return pOperator;

View File

@ -271,7 +271,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->rightBuildTable = tSimpleHashInit(256, hashFn);
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
goto _error;

View File

@ -30,7 +30,7 @@
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
__optr_explain_fn_t explain) {
__optr_explain_fn_t explain, __optr_get_ext_fn_t nextExtFn, __optr_notify_fn_t notifyFn) {
SOperatorFpSet fpSet = {
._openFn = openFn,
.getNextFn = nextFn,
@ -38,6 +38,8 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
.closeFn = closeFn,
.reqBufFn = reqBufFn,
.getExplainFn = explain,
.getNextExtFn = nextExtFn,
.notifyFn = notifyFn
};
return fpSet;
@ -422,6 +424,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
} else {
terrno = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = terrno;
return NULL;
}
@ -435,6 +438,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
size_t size = LIST_LENGTH(pPhyNode->pChildren);
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
if (ops == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
pTaskInfo->code = terrno;
return NULL;
}
@ -503,6 +508,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN == type) {
pOptr = createHashJoinOperatorInfo(ops, size, (SHashJoinPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
@ -513,8 +520,13 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) {
pOptr = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) {
pOptr = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) {
pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo);
} else {
terrno = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = terrno;
taosMemoryFree(ops);
return NULL;
}
@ -579,3 +591,11 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
return TSDB_CODE_SUCCESS;
}
void *getOperatorParam(int32_t opType, SOperatorParam* param) {
if (NULL == param || opType != param->opType) {
return NULL;
}
return param->value;
}

View File

@ -135,7 +135,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -416,7 +416,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -981,7 +981,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
optrDefaultBufFn, getTableScannerExecInfo);
optrDefaultBufFn, getTableScannerExecInfo, NULL, NULL);
// for non-blocking operator, the open cost is always 0
pOperator->cost.openCost = 0;
@ -1006,7 +1006,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL, NULL, NULL);
return pOperator;
}
@ -2252,7 +2252,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
return pOperator;
_end:
@ -2474,7 +2474,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
__optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
return pOperator;
@ -2639,7 +2639,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
return pOperator;
@ -3094,7 +3094,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
optrDefaultBufFn, getTableMergeScanExplainExecInfo);
optrDefaultBufFn, getTableMergeScanExplainExecInfo, NULL, NULL);
pOperator->cost.openCost = 0;
return pOperator;
@ -3244,7 +3244,7 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
return pOperator;
_error:

View File

@ -87,7 +87,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
// TODO dynamic set the available sort buffer
pOperator->fpSet =
createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo);
createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -552,7 +552,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
optrDefaultBufFn, getGroupSortExplainExecInfo);
optrDefaultBufFn, getGroupSortExplainExecInfo, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -841,7 +841,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL,
destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo);
destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo, NULL, NULL);
code = appendDownstream(pOperator, downStreams, numStreams);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -1783,7 +1783,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL);
createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL, NULL, NULL);
return pOperator;
_error:
@ -2320,7 +2320,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
return pOperator;
_error:

View File

@ -998,7 +998,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL);
createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

View File

@ -1684,7 +1684,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -1907,7 +1907,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -1983,7 +1983,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -2834,7 +2834,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
}
@ -3670,7 +3670,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
@ -3826,7 +3826,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
}
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
@ -4203,7 +4203,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -4484,7 +4484,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
false, OP_NOT_OPENED, miaInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -4771,7 +4771,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo,
optrDefaultBufFn, NULL);
optrDefaultBufFn, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -5027,7 +5027,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL);
pInfo->statestore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false;

View File

@ -1182,12 +1182,16 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkGroupCacheLogicPlanGrpColsMayBeNull = "GroupColsMayBeNull";
static const char* jkGroupCacheLogicPlanGroupCols = "GroupCols";
static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) {
const SGroupCacheLogicNode* pNode = (const SGroupCacheLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanGrpColsMayBeNull, pNode->grpColsMayBeNull);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkGroupCacheLogicPlanGroupCols, pNode->pGroupCols);
}
@ -1199,6 +1203,9 @@ static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) {
SGroupCacheLogicNode* pNode = (SGroupCacheLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanGrpColsMayBeNull, &pNode->grpColsMayBeNull);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkGroupCacheLogicPlanGroupCols, &pNode->pGroupCols);
}

View File

@ -3230,6 +3230,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode**
}
pGrpCache->node.dynamicOp = true;
pGrpCache->grpColsMayBeNull = false;
pGrpCache->node.pChildren = pChildren;
pGrpCache->node.pTargets = nodesMakeList();
if (NULL == pGrpCache->node.pTargets) {

View File

@ -984,6 +984,7 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
return TSDB_CODE_OUT_OF_MEMORY;
}
pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull;
SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) {