Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
This commit is contained in:
commit
b53dc5602e
|
@ -173,6 +173,7 @@ typedef enum EStreamType {
|
|||
STREAM_CHECKPOINT,
|
||||
STREAM_CREATE_CHILD_TABLE,
|
||||
STREAM_TRANS_STATE,
|
||||
STREAM_MID_RETRIEVE,
|
||||
} EStreamType;
|
||||
|
||||
#pragma pack(push, 1)
|
||||
|
|
|
@ -211,6 +211,7 @@ extern int32_t tsUptimeInterval;
|
|||
|
||||
extern bool tsDisableStream;
|
||||
extern int64_t tsStreamBufferSize;
|
||||
extern int tsStreamAggCnt;
|
||||
extern bool tsFilterScalarMode;
|
||||
extern int32_t tsMaxStreamBackendCache;
|
||||
extern int32_t tsPQSortMemThreshold;
|
||||
|
|
|
@ -435,6 +435,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL,
|
||||
} ENodeType;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -244,7 +244,7 @@ bool fmIsSkipScanCheckFunc(int32_t funcId);
|
|||
void getLastCacheDataType(SDataType* pType);
|
||||
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList);
|
||||
|
||||
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
|
||||
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc);
|
||||
|
||||
typedef enum EFuncDataRequired {
|
||||
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
|
||||
|
|
|
@ -258,6 +258,7 @@ typedef enum EWindowAlgorithm {
|
|||
SESSION_ALGO_STREAM_FINAL,
|
||||
SESSION_ALGO_STREAM_SINGLE,
|
||||
SESSION_ALGO_MERGE,
|
||||
INTERVAL_ALGO_STREAM_MID,
|
||||
} EWindowAlgorithm;
|
||||
|
||||
typedef struct SWindowLogicNode {
|
||||
|
@ -590,6 +591,7 @@ typedef SIntervalPhysiNode SMergeAlignedIntervalPhysiNode;
|
|||
typedef SIntervalPhysiNode SStreamIntervalPhysiNode;
|
||||
typedef SIntervalPhysiNode SStreamFinalIntervalPhysiNode;
|
||||
typedef SIntervalPhysiNode SStreamSemiIntervalPhysiNode;
|
||||
typedef SIntervalPhysiNode SStreamMidIntervalPhysiNode;
|
||||
|
||||
typedef struct SFillPhysiNode {
|
||||
SPhysiNode node;
|
||||
|
|
|
@ -747,7 +747,6 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
|
|||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||
|
||||
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
||||
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
|
||||
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
|
||||
|
||||
typedef struct SStreamTaskCheckpointReq {
|
||||
|
@ -764,7 +763,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
|
|||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
|
||||
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq);
|
||||
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
|
||||
|
||||
void streamTaskInputFail(SStreamTask* pTask);
|
||||
|
@ -890,6 +889,9 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
|
|||
|
||||
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
|
||||
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
|
||||
|
||||
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req);
|
||||
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -681,8 +681,9 @@ void taos_init_imp(void) {
|
|||
snprintf(logDirName, 64, "taoslog");
|
||||
#endif
|
||||
if (taosCreateLog(logDirName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
||||
// ignore create log failed, only print
|
||||
printf(" WARING: Create %s failed:%s. configDir=%s\n", logDirName, strerror(errno), configDir);
|
||||
tscInitRes = -1;
|
||||
return;
|
||||
}
|
||||
|
||||
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
||||
|
@ -750,8 +751,11 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
tstrncpy(configDir, str, PATH_MAX);
|
||||
tscInfo("set cfg:%s to %s", configDir, str);
|
||||
return 0;
|
||||
} else {
|
||||
taos_init(); // initialize global config
|
||||
}
|
||||
|
||||
// initialize global config
|
||||
if (taos_init() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SConfig *pCfg = taosGetCfg();
|
||||
|
|
|
@ -265,6 +265,7 @@ bool tsDisableStream = false;
|
|||
int64_t tsStreamBufferSize = 128 * 1024 * 1024;
|
||||
bool tsFilterScalarMode = false;
|
||||
int tsResolveFQDNRetryTime = 100; // seconds
|
||||
int tsStreamAggCnt = 1000;
|
||||
|
||||
char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>";
|
||||
char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>";
|
||||
|
@ -750,6 +751,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 2, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_ENT_SERVER) != 0)
|
||||
|
@ -1213,6 +1216,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
|
||||
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
|
||||
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
|
||||
tsStreamAggCnt = cfgGetItem(pCfg, "streamAggCnt")->i32;
|
||||
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
|
||||
tsStreamCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i32;
|
||||
tsSinkDataRate = cfgGetItem(pCfg, "streamSinkDataRate")->fval;
|
||||
|
||||
|
|
|
@ -697,6 +697,8 @@ typedef struct {
|
|||
|
||||
// 3.0.5.
|
||||
int64_t checkpointId;
|
||||
|
||||
int32_t indexForMultiAggBalance;
|
||||
char reserve[256];
|
||||
|
||||
} SStreamObj;
|
||||
|
|
|
@ -27,9 +27,6 @@
|
|||
#define SINK_NODE_LEVEL (0)
|
||||
extern bool tsDeployOnSnode;
|
||||
|
||||
static int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
|
||||
SEpSet* pEpset, bool isFillhistory);
|
||||
|
||||
static bool hasCountWindowNode(SPhysiNode* pNode) {
|
||||
if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
|
||||
return true;
|
||||
|
@ -111,6 +108,8 @@ END:
|
|||
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
|
||||
STaskOutputInfo* pInfo = &pTask->outputInfo;
|
||||
|
||||
mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr);
|
||||
|
||||
if (pStream->smaId != 0) {
|
||||
pInfo->type = TASK_OUTPUT__SMA;
|
||||
pInfo->smaSink.smaId = pStream->smaId;
|
||||
|
@ -179,12 +178,7 @@ int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan
|
|||
|
||||
plan->execNode.nodeId = pTask->info.nodeId;
|
||||
plan->execNode.epSet = pTask->info.epSet;
|
||||
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
|
||||
}
|
||||
|
||||
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
|
||||
|
@ -206,32 +200,79 @@ int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan*
|
|||
plan->execNode.epSet = pTask->info.epSet;
|
||||
mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE);
|
||||
|
||||
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
|
||||
}
|
||||
|
||||
// todo random choose a node to do compute
|
||||
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
|
||||
// random choose a node to do compute
|
||||
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
|
||||
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb);
|
||||
if (pDbObj == NULL) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if(pStream->indexForMultiAggBalance == -1){
|
||||
taosSeedRand(taosSafeRand());
|
||||
pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
|
||||
}
|
||||
|
||||
int32_t index = 0;
|
||||
void* pIter = NULL;
|
||||
SVgObj* pVgroup = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != dbUid) {
|
||||
if (pVgroup->dbUid != pStream->sourceDbUid) {
|
||||
sdbRelease(pMnode->pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
return pVgroup;
|
||||
if (index++ == pStream->indexForMultiAggBalance){
|
||||
pStream->indexForMultiAggBalance++;
|
||||
pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
break;
|
||||
}
|
||||
sdbRelease(pMnode->pSdb, pVgroup);
|
||||
}
|
||||
sdbRelease(pMnode->pSdb, pDbObj);
|
||||
|
||||
return pVgroup;
|
||||
}
|
||||
|
||||
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup,
|
||||
SEpSet* pEpset, bool isFillhistory) {
|
||||
int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
||||
|
||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory);
|
||||
if (pTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
mDebug("doAddSinkTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
|
||||
|
||||
pTask->info.nodeId = pVgroup->vgId;
|
||||
pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
return mndSetSinkTaskInfo(pStream, pTask);
|
||||
}
|
||||
|
||||
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj){
|
||||
int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
if(pStream->conf.fillHistory){
|
||||
code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// create sink node for each vgroup.
|
||||
int32_t doAddShuffleSinkTask(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, SEpSet* pEpset, bool fillHistory) {
|
||||
static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
|
||||
SSdb* pSdb = pMnode->pSdb;
|
||||
void* pIter = NULL;
|
||||
|
||||
|
@ -247,27 +288,16 @@ int32_t doAddShuffleSinkTask(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStr
|
|||
continue;
|
||||
}
|
||||
|
||||
doAddSinkTask(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, pEpset, fillHistory);
|
||||
int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
|
||||
if(code != 0){
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return code;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
|
||||
SEpSet* pEpset, bool isFillhistory) {
|
||||
int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||
SStreamTask* pTask =
|
||||
tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, pTaskList, pStream->conf.fillHistory);
|
||||
if (pTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTask->info.nodeId = vgId;
|
||||
pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
mndSetSinkTaskInfo(pStream, pTask);
|
||||
return 0;
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
|
||||
|
@ -312,52 +342,46 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList,
|
||||
SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, int64_t skey,
|
||||
SArray* pVerList, bool fillHistory, bool hasExtraSink, bool hasFillHistory) {
|
||||
int64_t t = pStream->conf.triggerParam;
|
||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, fillHistory, t, pTaskList, hasFillHistory);
|
||||
if (pTask == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
|
||||
|
||||
// sink or dispatch
|
||||
if (hasExtraSink) {
|
||||
mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, pTask);
|
||||
} else {
|
||||
mndSetSinkTaskInfo(pStream, pTask);
|
||||
}
|
||||
|
||||
if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
bool hasCountWindowNode = countWindowStreamTask(plan);
|
||||
if (hasCountWindowNode && (!fillHistory) && hasFillHistory) {
|
||||
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan) {
|
||||
bool hasCountWindowNode = countWindowStreamTask(pPlan);
|
||||
bool isRelStreamTask = (pTask->hTaskInfo.id.taskId != 0);
|
||||
if (hasCountWindowNode && isRelStreamTask) {
|
||||
SStreamStatus* pStatus = &pTask->status;
|
||||
mDebug("s-task:0x%x status is set to %s from %s for count window agg task with fill-history option set",
|
||||
pTask->id.taskId, streamTaskGetStatusStr(pStatus->taskStatus), streamTaskGetStatusStr(TASK_STATUS__HALT));
|
||||
pStatus->taskStatus = TASK_STATUS__HALT;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
|
||||
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
|
||||
streamTaskSetUpstreamInfo(pSinkTask, pTask);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SArray* addNewTaskList(SArray* pTasksList) {
|
||||
static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset,
|
||||
bool isFillhistory, bool useTriggerParam) {
|
||||
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
||||
|
||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset,
|
||||
isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
|
||||
*pTaskList, pStream->conf.fillHistory);
|
||||
if (pTask == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pTask;
|
||||
}
|
||||
|
||||
static void addNewTaskList(SStreamObj* pStream){
|
||||
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
|
||||
taosArrayPush(pTasksList, &pTaskList);
|
||||
return pTaskList;
|
||||
taosArrayPush(pStream->tasks, &pTaskList);
|
||||
if (pStream->conf.fillHistory) {
|
||||
pTaskList = taosArrayInit(0, POINTER_BYTES);
|
||||
taosArrayPush(pStream->pHTasksList, &pTaskList);
|
||||
}
|
||||
}
|
||||
|
||||
// set the history task id
|
||||
static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
|
||||
static void setHTasksId(SStreamObj* pStream) {
|
||||
SArray* pTaskList = *(SArray**)taosArrayGetLast(pStream->tasks);
|
||||
SArray* pHTaskList = *(SArray**)taosArrayGetLast(pStream->pHTasksList);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
|
||||
SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
|
||||
SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
|
||||
|
@ -373,30 +397,65 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream,
|
||||
SEpSet* pEpset, bool hasExtraSink, int64_t skey, SArray* pVerList) {
|
||||
// create exec stream task, since only one level, the exec task is also the source task
|
||||
SArray* pTaskList = addNewTaskList(pStream->tasks);
|
||||
SSdb* pSdb = pMnode->pSdb;
|
||||
|
||||
SArray* pHTaskList = NULL;
|
||||
if (pStream->conf.fillHistory) {
|
||||
pHTaskList = addNewTaskList(pStream->pHTasksList);
|
||||
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
|
||||
int64_t skey, SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam ){
|
||||
// new stream task
|
||||
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
||||
if(pTask == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
|
||||
|
||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
||||
haltInitialTaskStatus(pTask, plan);
|
||||
|
||||
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
|
||||
|
||||
int32_t code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
|
||||
if(code != 0){
|
||||
terrno = code;
|
||||
return terrno;
|
||||
}
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan){
|
||||
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
|
||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
|
||||
if (LIST_LENGTH(inner->pNodeList) != 1) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||
if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
return NULL;
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
|
||||
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){
|
||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
|
||||
if (LIST_LENGTH(inner->pNodeList) != 1) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||
if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return NULL;
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
|
||||
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream,
|
||||
SEpSet* pEpset, int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
|
||||
addNewTaskList(pStream);
|
||||
|
||||
void* pIter = NULL;
|
||||
SSdb* pSdb = pMnode->pSdb;
|
||||
while (1) {
|
||||
SVgObj* pVgroup;
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||
|
@ -409,187 +468,15 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
|
|||
continue;
|
||||
}
|
||||
|
||||
// new stream task
|
||||
SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL);
|
||||
int32_t code = addSourceTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset, skey,
|
||||
pVerList, false, hasExtraSink, pStream->conf.fillHistory);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
|
||||
if(code != 0){
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pStream->conf.fillHistory) {
|
||||
SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL);
|
||||
code = addSourceTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, pEpset, skey,
|
||||
pVerList, true, hasExtraSink, true);
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pStream->conf.fillHistory) {
|
||||
setHTasksId(pTaskList, pHTaskList);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t addSourceTaskForMultiLevelStream(SArray* pTaskList, bool isFillhistory, int64_t uid,
|
||||
SStreamTask* pDownstreamTask, SMnode* pMnode, SSubplan* pPlan,
|
||||
SVgObj* pVgroup, SEpSet* pEpset, int64_t skey, SArray* pVerList,
|
||||
bool hasFillHistory) {
|
||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, 0, pTaskList, hasFillHistory);
|
||||
if (pTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
|
||||
|
||||
// all the source tasks dispatch result to a single agg node.
|
||||
streamTaskSetFixedDownstreamInfo(pTask, pDownstreamTask);
|
||||
if (mndAssignStreamTaskToVgroup(pMnode, pTask, pPlan, pVgroup) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return streamTaskSetUpstreamInfo(pDownstreamTask, pTask);
|
||||
}
|
||||
|
||||
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
|
||||
SEpSet* pEpset, bool fillHistory, SStreamTask** pAggTask, bool hasFillhistory) {
|
||||
*pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillhistory);
|
||||
if (*pAggTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// dispatch
|
||||
if (mndAddDispatcherForInternalTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset,
|
||||
SStreamTask** pAggTask, SStreamTask** pHAggTask) {
|
||||
SArray* pAggTaskList = addNewTaskList(pStream->tasks);
|
||||
SSdb* pSdb = pMnode->pSdb;
|
||||
|
||||
SNodeListNode* pInnerNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
||||
SSubplan* plan = (SSubplan*)nodesListGetNode(pInnerNode->pNodeList, 0);
|
||||
if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
*pAggTask = NULL;
|
||||
SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
|
||||
|
||||
int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, pEpset, false, pAggTask,
|
||||
pStream->conf.fillHistory);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SVgObj* pVgroup = NULL;
|
||||
SSnodeObj* pSnode = NULL;
|
||||
|
||||
if (tsDeployOnSnode) {
|
||||
pSnode = mndSchedFetchOneSnode(pMnode);
|
||||
if (pSnode == NULL) {
|
||||
pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
||||
}
|
||||
} else {
|
||||
pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
||||
}
|
||||
|
||||
if (pSnode != NULL) {
|
||||
code = mndAssignStreamTaskToSnode(pMnode, *pAggTask, plan, pSnode);
|
||||
} else {
|
||||
code = mndAssignStreamTaskToVgroup(pMnode, *pAggTask, plan, pVgroup);
|
||||
}
|
||||
|
||||
if (pStream->conf.fillHistory) {
|
||||
SArray* pHAggTaskList = addNewTaskList(pStream->pHTasksList);
|
||||
SArray* pHSinkNodeList = taosArrayGetP(pStream->pHTasksList, SINK_NODE_LEVEL);
|
||||
|
||||
*pHAggTask = NULL;
|
||||
code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pEpset, pStream->conf.fillHistory,
|
||||
pHAggTask, pStream->conf.fillHistory);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (pSnode != NULL) {
|
||||
sdbRelease(pSdb, pSnode);
|
||||
} else {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pSnode != NULL) {
|
||||
code = mndAssignStreamTaskToSnode(pMnode, *pHAggTask, plan, pSnode);
|
||||
} else {
|
||||
code = mndAssignStreamTaskToVgroup(pMnode, *pHAggTask, plan, pVgroup);
|
||||
}
|
||||
|
||||
setHTasksId(pAggTaskList, pHAggTaskList);
|
||||
}
|
||||
|
||||
if (pSnode != NULL) {
|
||||
sdbRelease(pSdb, pSnode);
|
||||
} else {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPlan, SStreamObj* pStream,
|
||||
SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask,
|
||||
SEpSet* pEpset, int64_t skey, SArray* pVerList) {
|
||||
SArray* pSourceTaskList = addNewTaskList(pStream->tasks);
|
||||
|
||||
SArray* pHSourceTaskList = NULL;
|
||||
if (pStream->conf.fillHistory) {
|
||||
pHSourceTaskList = addNewTaskList(pStream->pHTasksList);
|
||||
}
|
||||
|
||||
SSdb* pSdb = pMnode->pSdb;
|
||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
|
||||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||
if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
SVgObj* pVgroup;
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t code = addSourceTaskForMultiLevelStream(pSourceTaskList, false, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset,
|
||||
skey, pVerList, pStream->conf.fillHistory);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
terrno = code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pStream->conf.fillHistory) {
|
||||
code = addSourceTaskForMultiLevelStream(pHSourceTaskList, true, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup, pEpset,
|
||||
skey, pVerList, pStream->conf.fillHistory);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam);
|
||||
if(code != 0){
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return code;
|
||||
}
|
||||
|
@ -599,49 +486,160 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
|
|||
}
|
||||
|
||||
if (pStream->conf.fillHistory) {
|
||||
setHTasksId(pSourceTaskList, pHSourceTaskList);
|
||||
setHTasksId(pStream);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList,
|
||||
SEpSet* pEpset, bool fillHistory) {
|
||||
SArray* pSinkTaskList = addNewTaskList(pTasksList);
|
||||
if (pStream->fixedSinkVgId == 0) {
|
||||
if (doAddShuffleSinkTask(pMnode, pSinkTaskList, pStream, pEpset, fillHistory) < 0) {
|
||||
// TODO free
|
||||
return -1;
|
||||
static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam) {
|
||||
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
||||
|
||||
SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory,
|
||||
useTriggerParam ? pStream->conf.triggerParam : 0,
|
||||
*pTaskList, pStream->conf.fillHistory);
|
||||
if (pAggTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pAggTask;
|
||||
}
|
||||
|
||||
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset,
|
||||
SVgObj* pVgroup, SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam){
|
||||
int32_t code = 0;
|
||||
SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
||||
if (pTask == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
if (pSnode != NULL) {
|
||||
code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
|
||||
mDebug("doAddAggTask taskId:%s, snode id:%d, isFillHistory:%d", pTask->id.idStr, pSnode->id, isFillhistory);
|
||||
|
||||
} else {
|
||||
code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
|
||||
mDebug("doAddAggTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam){
|
||||
SVgObj* pVgroup = NULL;
|
||||
SSnodeObj* pSnode = NULL;
|
||||
int32_t code = 0;
|
||||
if (tsDeployOnSnode) {
|
||||
pSnode = mndSchedFetchOneSnode(pMnode);
|
||||
if (pSnode == NULL) {
|
||||
pVgroup = mndSchedFetchOneVg(pMnode, pStream);
|
||||
}
|
||||
} else {
|
||||
if (doAddSinkTask(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, pEpset,
|
||||
fillHistory) < 0) {
|
||||
// TODO free
|
||||
return -1;
|
||||
pVgroup = mndSchedFetchOneVg(pMnode, pStream);
|
||||
}
|
||||
|
||||
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam);
|
||||
if(code != 0){
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (pStream->conf.fillHistory) {
|
||||
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam);
|
||||
if(code != 0){
|
||||
goto END;
|
||||
}
|
||||
|
||||
setHTasksId(pStream);
|
||||
}
|
||||
|
||||
END:
|
||||
if (pSnode != NULL) {
|
||||
sdbRelease(pMnode->pSdb, pSnode);
|
||||
} else {
|
||||
sdbRelease(pMnode->pSdb, pVgroup);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){
|
||||
int32_t code = 0;
|
||||
addNewTaskList(pStream);
|
||||
|
||||
if (pStream->fixedSinkVgId == 0) {
|
||||
code = doAddShuffleSinkTask(pMnode, pStream, pEpset);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
code = doAddSinkTaskToVg(pMnode, pStream, pEpset, &pStream->fixedSinkVg);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
*pCreatedTaskList = pSinkTaskList;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
if (pStream->conf.fillHistory) {
|
||||
setHTasksId(pStream);
|
||||
}
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) {
|
||||
if (taosArrayGetSize(pTasksList) < SINK_NODE_LEVEL || pUpstreamTask == NULL) {
|
||||
return;
|
||||
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task){
|
||||
mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task);
|
||||
for(int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
|
||||
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);
|
||||
streamTaskSetUpstreamInfo(pSinkTask, task);
|
||||
}
|
||||
mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
|
||||
}
|
||||
|
||||
SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL);
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
|
||||
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
|
||||
streamTaskSetUpstreamInfo(pSinkTask, pUpstreamTask);
|
||||
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
|
||||
SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
|
||||
SArray** pAggTaskList = taosArrayGetLast(tasks);
|
||||
|
||||
for(int i = 0; i < taosArrayGetSize(*pAggTaskList); i++){
|
||||
SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i);
|
||||
bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask);
|
||||
mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr);
|
||||
}
|
||||
}
|
||||
|
||||
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
|
||||
SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
|
||||
SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);
|
||||
|
||||
for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){
|
||||
SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i);
|
||||
mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr);
|
||||
|
||||
if (hasExtraSink) {
|
||||
bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask);
|
||||
} else {
|
||||
mndSetSinkTaskInfo(pStream, pSourceTask);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
|
||||
size_t size = taosArrayGetSize(tasks);
|
||||
ASSERT(size >= 2);
|
||||
SArray* pDownTaskList = taosArrayGetP(tasks, size - 1);
|
||||
SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);
|
||||
|
||||
SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
|
||||
end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList): end;
|
||||
for(int i = begin; i < end; i++){
|
||||
SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
|
||||
pUpTask->info.selfChildId = i - begin;
|
||||
streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
|
||||
streamTaskSetUpstreamInfo(*pDownTask, pUpTask);
|
||||
}
|
||||
mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr);
|
||||
}
|
||||
|
||||
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey,
|
||||
SArray* pVerList) {
|
||||
SSdb* pSdb = pMnode->pSdb;
|
||||
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
|
||||
|
||||
bool hasExtraSink = false;
|
||||
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
|
||||
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
|
||||
|
@ -653,54 +651,90 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
|||
bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
|
||||
sdbRelease(pSdb, pDbObj);
|
||||
|
||||
mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s",
|
||||
numOfPlanLevel, externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
|
||||
pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
|
||||
pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
|
||||
|
||||
if (numOfPlanLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
|
||||
if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
|
||||
// add extra sink
|
||||
hasExtraSink = true;
|
||||
|
||||
SArray* pSinkTaskList = NULL;
|
||||
int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList, pEpset, 0);
|
||||
int32_t code = addSinkTask(pMnode, pStream, pEpset);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// check for fill history
|
||||
if (pStream->conf.fillHistory) {
|
||||
SArray* pHSinkTaskList = NULL;
|
||||
code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, pEpset, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
setHTasksId(pSinkTaskList, pHSinkTaskList);
|
||||
}
|
||||
}
|
||||
|
||||
pStream->totalLevel = numOfPlanLevel + hasExtraSink;
|
||||
|
||||
if (numOfPlanLevel > 1) {
|
||||
SStreamTask* pAggTask = NULL;
|
||||
SStreamTask* pHAggTask = NULL;
|
||||
|
||||
int32_t code = addAggTask(pStream, pMnode, pPlan, pEpset, &pAggTask, &pHAggTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
setSinkTaskUpstreamInfo(pStream->tasks, pAggTask);
|
||||
if (pHAggTask != NULL) {
|
||||
setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask);
|
||||
}
|
||||
|
||||
// source level
|
||||
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, pEpset, skey, pVerList);
|
||||
} else if (numOfPlanLevel == 1) {
|
||||
return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, pEpset, hasExtraSink, skey, pVerList);
|
||||
SSubplan* plan = getScanSubPlan(pPlan); // source plan
|
||||
if (plan == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, numOfPlanLevel == 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return 0;
|
||||
if (numOfPlanLevel == 1) {
|
||||
bindSourceSink(pStream, pMnode, pStream->tasks, hasExtraSink);
|
||||
if (pStream->conf.fillHistory) {
|
||||
bindSourceSink(pStream, pMnode, pStream->pHTasksList, hasExtraSink);
|
||||
}
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if(numOfPlanLevel == 3){
|
||||
plan = getAggSubPlan(pPlan, 1); // middle agg plan
|
||||
if (plan == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
do{
|
||||
SArray** list = taosArrayGetLast(pStream->tasks);
|
||||
float size = (float)taosArrayGetSize(*list);
|
||||
size_t cnt = (size_t)ceil(size/tsStreamAggCnt);
|
||||
if(cnt <= 1) break;
|
||||
|
||||
mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
|
||||
addNewTaskList(pStream);
|
||||
|
||||
for(int j = 0; j < cnt; j++){
|
||||
code = addAggTask(pStream, pMnode, plan, pEpset, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
bindTwoLevel(pStream->tasks, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt);
|
||||
if (pStream->conf.fillHistory) {
|
||||
bindTwoLevel(pStream->pHTasksList, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt);
|
||||
}
|
||||
}
|
||||
}while(1);
|
||||
}
|
||||
|
||||
plan = getAggSubPlan(pPlan, 0);
|
||||
if (plan == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
mDebug("doScheduleStream add final agg");
|
||||
SArray** list = taosArrayGetLast(pStream->tasks);
|
||||
size_t size = taosArrayGetSize(*list);
|
||||
addNewTaskList(pStream);
|
||||
code = addAggTask(pStream, pMnode, plan, pEpset, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
bindTwoLevel(pStream->tasks, 0, size);
|
||||
if (pStream->conf.fillHistory) {
|
||||
bindTwoLevel(pStream->pHTasksList, 0, size);
|
||||
}
|
||||
|
||||
bindAggSink(pStream, pMnode, pStream->tasks);
|
||||
if (pStream->conf.fillHistory) {
|
||||
bindAggSink(pStream, pMnode, pStream->pHTasksList);
|
||||
}
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) {
|
||||
|
|
|
@ -566,6 +566,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
|
||||
streamObj.conf.triggerParam = pCreate->maxDelay;
|
||||
streamObj.ast = taosStrdup(smaObj.ast);
|
||||
streamObj.indexForMultiAggBalance = -1;
|
||||
|
||||
// check the maxDelay
|
||||
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
|
||||
|
|
|
@ -297,6 +297,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
pObj->updateTime = pObj->createTime;
|
||||
pObj->version = 1;
|
||||
pObj->smaId = 0;
|
||||
pObj->indexForMultiAggBalance = -1;
|
||||
|
||||
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ aux_source_directory(. MNODE_STREAM_TEST_SRC)
|
|||
add_executable(streamTest ${MNODE_STREAM_TEST_SRC})
|
||||
target_link_libraries(
|
||||
streamTest
|
||||
PRIVATE dnode gtest
|
||||
PRIVATE dnode nodes planner gtest qcom
|
||||
)
|
||||
|
||||
add_test(
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -317,15 +317,25 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
if (pTask == NULL) {
|
||||
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
||||
req.dstTaskId);
|
||||
taosMemoryFree(req.pRetrieve);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
if(pTask->info.taskLevel == TASK_LEVEL__SOURCE){
|
||||
code = streamProcessRetrieveReq(pTask, &req);
|
||||
}else{
|
||||
req.srcNodeId = pTask->info.nodeId;
|
||||
req.srcTaskId = pTask->id.taskId;
|
||||
code = broadcastRetrieveMsg(pTask, &req);
|
||||
}
|
||||
|
||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||
streamProcessRetrieveReq(pTask, &req, &rsp);
|
||||
sendRetrieveRsp(&req, &rsp);
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
tDeleteStreamRetrieveReq(&req);
|
||||
return 0;
|
||||
taosMemoryFree(req.pRetrieve);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
|
|
|
@ -558,6 +558,11 @@ typedef struct SStreamIntervalOperatorInfo {
|
|||
bool reCkBlock;
|
||||
SSDataBlock* pCheckpointRes;
|
||||
struct SUpdateInfo* pUpdateInfo;
|
||||
bool recvRetrive;
|
||||
SSDataBlock* pMidRetriveRes;
|
||||
bool recvPullover;
|
||||
SSDataBlock* pMidPulloverRes;
|
||||
bool clearState;
|
||||
} SStreamIntervalOperatorInfo;
|
||||
|
||||
typedef struct SDataGroupInfo {
|
||||
|
|
|
@ -2229,6 +2229,8 @@ char* getStreamOpName(uint16_t opType) {
|
|||
return "interval final";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
return "interval semi";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
|
||||
return "interval mid";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
|
||||
return "stream fill";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
|
||||
|
|
|
@ -947,7 +947,7 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
|
|||
while (1) {
|
||||
int32_t type = pOperator->operatorType;
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
|
||||
|
||||
|
@ -1035,7 +1035,7 @@ int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
|
|||
while (1) {
|
||||
uint16_t type = pOperator->operatorType;
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
|
||||
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
|
||||
|
|
|
@ -489,6 +489,9 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
|
||||
int32_t children = 0;
|
||||
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL == type) {
|
||||
int32_t children = pHandle->numOfVgroups;
|
||||
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
|
||||
int32_t children = pHandle->numOfVgroups;
|
||||
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle);
|
||||
|
|
|
@ -1272,6 +1272,7 @@ static bool isStateWindow(SStreamScanInfo* pInfo) {
|
|||
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
|
||||
return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
|
||||
pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
||||
pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL ||
|
||||
pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "ttime.h"
|
||||
|
||||
#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
|
||||
#define IS_MID_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL)
|
||||
#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
|
||||
#define DEAULT_DELETE_MARK INT64_MAX
|
||||
#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState"
|
||||
|
@ -48,6 +49,8 @@ typedef struct SPullWindowInfo {
|
|||
STimeWindow calWin;
|
||||
} SPullWindowInfo;
|
||||
|
||||
static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator);
|
||||
|
||||
typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index);
|
||||
|
||||
static int32_t binarySearchCom(void* keyList, int num, void* pKey, int order, __compare_fn_t comparefn) {
|
||||
|
@ -235,7 +238,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
|
|||
dumyInfo.cur.pageId = -1;
|
||||
|
||||
STimeWindow win = {0};
|
||||
if (IS_FINAL_INTERVAL_OP(pOperator)) {
|
||||
if (IS_FINAL_INTERVAL_OP(pOperator) || IS_MID_INTERVAL_OP(pOperator)) {
|
||||
win.skey = startTsCols[i];
|
||||
win.ekey = endTsCols[i];
|
||||
} else {
|
||||
|
@ -407,6 +410,8 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
|||
blockDataDestroy(pInfo->pPullDataRes);
|
||||
taosArrayDestroy(pInfo->pDelWins);
|
||||
blockDataDestroy(pInfo->pDelRes);
|
||||
blockDataDestroy(pInfo->pMidRetriveRes);
|
||||
blockDataDestroy(pInfo->pMidPulloverRes);
|
||||
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
|
||||
|
||||
if (pInfo->pState->dump == 1) {
|
||||
|
@ -599,7 +604,7 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
|
|||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
}
|
||||
|
||||
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins,
|
||||
static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins,
|
||||
int32_t numOfCh, SOperatorInfo* pOperator) {
|
||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||
TSKEY* tsData = (TSKEY*)pStartCol->pData;
|
||||
|
@ -608,6 +613,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S
|
|||
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
|
||||
int32_t chId = getChildIndex(pBlock);
|
||||
bool res = false;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
TSKEY winTs = tsData[i];
|
||||
while (winTs <= tsEndData[i]) {
|
||||
|
@ -623,6 +629,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S
|
|||
// pull data is over
|
||||
taosArrayDestroy(chArray);
|
||||
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
|
||||
res =true;
|
||||
qDebug("===stream===retrive pull data over.window %" PRId64, winRes.ts);
|
||||
|
||||
void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey));
|
||||
|
@ -646,6 +653,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S
|
|||
winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, int32_t childId) {
|
||||
|
@ -1182,6 +1190,12 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
|
|||
printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
if (pInfo->recvPullover) {
|
||||
pInfo->recvPullover = false;
|
||||
printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pMidPulloverRes;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1236,11 +1250,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
} else {
|
||||
if (!IS_FINAL_INTERVAL_OP(pOperator)) {
|
||||
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
if (resBlock != NULL) {
|
||||
return resBlock;
|
||||
}
|
||||
|
||||
if (pInfo->recvRetrive) {
|
||||
pInfo->recvRetrive = false;
|
||||
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pMidRetriveRes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1314,9 +1332,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
pInfo->recvGetAll = true;
|
||||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_INTERVAL_OP(pOperator)) {
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
|
||||
if (taosArrayGetSize(pInfo->pUpdated) > 0) {
|
||||
} else if (pBlock->info.type == STREAM_RETRIEVE) {
|
||||
if(!IS_FINAL_INTERVAL_OP(pOperator)) {
|
||||
pInfo->recvRetrive = true;
|
||||
copyDataBlock(pInfo->pMidRetriveRes, pBlock);
|
||||
pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE;
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
|
@ -1331,6 +1352,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
doStreamIntervalSaveCheckpoint(pOperator);
|
||||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||
continue;
|
||||
} else if (IS_FINAL_INTERVAL_OP(pOperator) && pBlock->info.type == STREAM_MID_RETRIEVE) {
|
||||
continue;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
@ -1359,7 +1382,18 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
pInfo->pUpdated = NULL;
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
return buildIntervalResult(pOperator);
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
if (resBlock != NULL) {
|
||||
return resBlock;
|
||||
}
|
||||
|
||||
if (pInfo->recvRetrive) {
|
||||
pInfo->recvRetrive = false;
|
||||
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pMidRetriveRes;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) {
|
||||
|
@ -1400,7 +1434,8 @@ static int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
|
|||
}
|
||||
|
||||
static void streamIntervalReleaseState(SOperatorInfo* pOperator) {
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL &&
|
||||
pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
int32_t resSize = sizeof(TSKEY);
|
||||
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
|
||||
|
@ -1417,7 +1452,8 @@ static void streamIntervalReleaseState(SOperatorInfo* pOperator) {
|
|||
|
||||
void streamIntervalReloadState(SOperatorInfo* pOperator) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL &&
|
||||
pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
||||
int32_t size = 0;
|
||||
void* pBuf = NULL;
|
||||
int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
|
||||
|
@ -1442,6 +1478,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
int32_t code = 0;
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -1471,7 +1508,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
if (pIntervalPhyNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
|
||||
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -1490,7 +1527,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||
|
||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
||||
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -1526,6 +1563,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
pInfo->recvGetAll = false;
|
||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||
pInfo->recvRetrive = false;
|
||||
pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
|
||||
pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
|
||||
pInfo->clearState = false;
|
||||
|
||||
pOperator->operatorType = pPhyNode->type;
|
||||
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
|
||||
|
@ -1536,10 +1577,16 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
||||
pOperator->fpSet = createOperatorFpSet(NULL, doStreamMidIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
} else {
|
||||
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
}
|
||||
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
||||
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
||||
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
||||
pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
||||
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
|
||||
}
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
@ -4114,6 +4161,285 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pUpdatedMap) {
|
||||
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
|
||||
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
|
||||
|
||||
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
int32_t numOfOutput = pSup->numOfExprs;
|
||||
int32_t step = 1;
|
||||
SRowBuffPos* pResPos = NULL;
|
||||
SResultRow* pResult = NULL;
|
||||
int32_t forwardRows = 1;
|
||||
uint64_t groupId = pSDataBlock->info.id.groupId;
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
TSKEY* tsCol = (int64_t*)pColDataInfo->pData;
|
||||
|
||||
int32_t startPos = 0;
|
||||
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCol);
|
||||
STimeWindow nextWin = getFinalTimeWindow(ts, &pInfo->interval);
|
||||
|
||||
while (1) {
|
||||
SWinKey key = {
|
||||
.ts = nextWin.skey,
|
||||
.groupId = groupId,
|
||||
};
|
||||
void* chIds = taosHashGet(pInfo->pPullDataMap, &key, sizeof(SWinKey));
|
||||
int32_t index = -1;
|
||||
SArray* chArray = NULL;
|
||||
int32_t chId = 0;
|
||||
if (chIds) {
|
||||
chArray = *(void**)chIds;
|
||||
chId = getChildIndex(pSDataBlock);
|
||||
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
|
||||
}
|
||||
if (!(index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA)) {
|
||||
startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCol, startPos);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
|
||||
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCol, startPos, nextWin.ekey, &nextWin);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput,
|
||||
pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->stateStore);
|
||||
pResult = (SResultRow*)pResPos->pRowBuff;
|
||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||
saveWinResult(&key, pResPos, pUpdatedMap);
|
||||
}
|
||||
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES);
|
||||
}
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
|
||||
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||
pSDataBlock->info.rows, numOfOutput);
|
||||
key.ts = nextWin.skey;
|
||||
|
||||
if (pInfo->delKey.ts > key.ts) {
|
||||
pInfo->delKey = key;
|
||||
}
|
||||
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
|
||||
if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
|
||||
qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64
|
||||
",maxKey %" PRId64,
|
||||
pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
|
||||
blockDataUpdateTsWindow(pSDataBlock, 0);
|
||||
|
||||
// timestamp of the data is incorrect
|
||||
if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
|
||||
qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64,
|
||||
pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
|
||||
}
|
||||
}
|
||||
startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCol, prevEndPos);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void addMidRetriveWindow(SArray* wins, SHashObj* pMidPullMap, int32_t numOfChild) {
|
||||
int32_t size = taosArrayGetSize(wins);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SWinKey* winKey = taosArrayGet(wins, i);
|
||||
void* chIds = taosHashGet(pMidPullMap, winKey, sizeof(SWinKey));
|
||||
if (!chIds) {
|
||||
addPullWindow(pMidPullMap, winKey, numOfChild);
|
||||
qDebug("===stream===prepare mid operator retrive for delete %" PRId64 ", size:%d", winKey->ts, numOfChild);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
if (resBlock != NULL) {
|
||||
return resBlock;
|
||||
}
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
clearFunctionContext(&pOperator->exprSupp);
|
||||
clearStreamIntervalOperator(pInfo);
|
||||
qDebug("stask:%s ===stream===%s clear", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType));
|
||||
return NULL;
|
||||
} else {
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
if (resBlock != NULL) {
|
||||
return resBlock;
|
||||
}
|
||||
|
||||
if (pInfo->recvRetrive) {
|
||||
pInfo->recvRetrive = false;
|
||||
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pMidRetriveRes;
|
||||
}
|
||||
|
||||
if (pInfo->clearState) {
|
||||
pInfo->clearState = false;
|
||||
clearFunctionContext(&pOperator->exprSupp);
|
||||
clearStreamIntervalOperator(pInfo);
|
||||
}
|
||||
}
|
||||
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
|
||||
}
|
||||
if (!pInfo->pUpdatedMap) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
if (pInfo->pUpdated != NULL) {
|
||||
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
||||
}
|
||||
|
||||
if (pInfo->pUpdatedMap != NULL) {
|
||||
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
}
|
||||
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
|
||||
pInfo->numOfDatapack);
|
||||
pInfo->numOfDatapack = 0;
|
||||
break;
|
||||
}
|
||||
pInfo->numOfDatapack++;
|
||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||
|
||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
||||
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
|
||||
removeResults(delWins, pInfo->pUpdatedMap);
|
||||
taosArrayAddAll(pInfo->pDelWins, delWins);
|
||||
taosArrayDestroy(delWins);
|
||||
|
||||
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
if (pBlock->info.type == STREAM_CLEAR) {
|
||||
pInfo->pDelRes->info.type = STREAM_CLEAR;
|
||||
} else {
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||
}
|
||||
ASSERT(taosArrayGetSize(pInfo->pUpdated) == 0);
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
return pBlock;
|
||||
} else if (pBlock->info.type == STREAM_PULL_OVER) {
|
||||
pInfo->recvPullover = processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval,
|
||||
pInfo->pPullWins, pInfo->numOfChild, pOperator);
|
||||
if (pInfo->recvPullover) {
|
||||
copyDataBlock(pInfo->pMidPulloverRes, pBlock);
|
||||
pInfo->clearState = true;
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
||||
doStreamIntervalSaveCheckpoint(pOperator);
|
||||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_MID_RETRIEVE) {
|
||||
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
|
||||
addMidRetriveWindow(delWins, pInfo->pPullDataMap, pInfo->numOfChild);
|
||||
taosArrayDestroy(delWins);
|
||||
pInfo->recvRetrive = true;
|
||||
copyDataBlock(pInfo->pMidRetriveRes, pBlock);
|
||||
pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE;
|
||||
pInfo->clearState = true;
|
||||
break;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
}
|
||||
|
||||
if (pInfo->scalarSupp.pExprInfo != NULL) {
|
||||
SExprSupp* pExprSup = &pInfo->scalarSupp;
|
||||
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||
}
|
||||
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||
doStreamMidIntervalAggImpl(pOperator, pBlock, pInfo->pUpdatedMap);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
|
||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
|
||||
}
|
||||
|
||||
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
|
||||
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
|
||||
|
||||
void* pIte = NULL;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
|
||||
taosArrayPush(pInfo->pUpdated, pIte);
|
||||
}
|
||||
|
||||
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
taosArraySort(pInfo->pUpdated, winPosCmprImpl);
|
||||
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||
pInfo->pUpdated = NULL;
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
if (resBlock != NULL) {
|
||||
return resBlock;
|
||||
}
|
||||
|
||||
if (pInfo->recvRetrive) {
|
||||
pInfo->recvRetrive = false;
|
||||
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pMidRetriveRes;
|
||||
}
|
||||
|
||||
if (pInfo->clearState) {
|
||||
pInfo->clearState = false;
|
||||
clearFunctionContext(&pOperator->exprSupp);
|
||||
clearStreamIntervalOperator(pInfo);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void setStreamOperatorCompleted(SOperatorInfo* pOperator) {
|
||||
setOperatorCompleted(pOperator);
|
||||
qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
|
||||
|
|
|
@ -45,6 +45,7 @@ typedef struct SBuiltinFuncDefinition {
|
|||
#endif
|
||||
FExecCombine combineFunc;
|
||||
const char* pPartialFunc;
|
||||
const char* pMiddleFunc;
|
||||
const char* pMergeFunc;
|
||||
FCreateMergeFuncParameters createMergeParaFuc;
|
||||
FEstimateReturnRows estimateReturnRowsFunc;
|
||||
|
|
|
@ -433,6 +433,20 @@ static int32_t translateAvgPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateAvgMiddle(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (TSDB_DATA_TYPE_BINARY != paraType) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType){.bytes = getAvgInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateAvgMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -2515,6 +2529,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
#endif
|
||||
.combineFunc = avgCombine,
|
||||
.pPartialFunc = "_avg_partial",
|
||||
.pMiddleFunc = "_avg_middle",
|
||||
.pMergeFunc = "_avg_merge"
|
||||
},
|
||||
{
|
||||
|
@ -3775,6 +3790,21 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.sprocessFunc = toCharFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "_avg_middle",
|
||||
.type = FUNCTION_TYPE_AVG_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.translateFunc = translateAvgMiddle,
|
||||
.dataRequiredFunc = statisDataRequired,
|
||||
.getEnvFunc = getAvgFuncEnv,
|
||||
.initFunc = avgFunctionSetup,
|
||||
.processFunc = avgFunctionMerge,
|
||||
.finalizeFunc = avgPartialFinalize,
|
||||
#ifdef BUILD_NO_CALL
|
||||
.invertFunc = avgInvertFunction,
|
||||
#endif
|
||||
.combineFunc = avgCombine,
|
||||
},
|
||||
{
|
||||
.name = "_vgver",
|
||||
.type = FUNCTION_TYPE_VGVER,
|
||||
|
@ -3785,7 +3815,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.sprocessFunc = qPseudoTagFunction,
|
||||
.finalizeFunc = NULL
|
||||
}
|
||||
|
||||
};
|
||||
// clang-format on
|
||||
|
||||
|
|
|
@ -424,6 +424,35 @@ static int32_t createMergeFuncPara(const SFunctionNode* pSrcFunc, const SFunctio
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
|
||||
SFunctionNode** pMidFunc) {
|
||||
SNodeList* pParameterList = NULL;
|
||||
SFunctionNode* pFunc = NULL;
|
||||
|
||||
int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc != NULL){
|
||||
pFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc, pParameterList);
|
||||
}else{
|
||||
pFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList);
|
||||
}
|
||||
if (NULL == pFunc) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
strcpy(pFunc->node.aliasName, pPartialFunc->node.aliasName);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pMidFunc = pFunc;
|
||||
} else {
|
||||
nodesDestroyList(pParameterList);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
|
||||
SFunctionNode** pMergeFunc) {
|
||||
SNodeList* pParameterList = NULL;
|
||||
|
@ -453,18 +482,22 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc) {
|
||||
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc) {
|
||||
if (!fmIsDistExecFunc(pFunc->funcId)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t code = createPartialFunction(pFunc, pPartialFunc);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createMidFunction(pFunc, *pPartialFunc, pMidFunc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createMergeFunction(pFunc, *pPartialFunc, pMergeFunc);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode((SNode*)*pPartialFunc);
|
||||
nodesDestroyNode((SNode*)*pMidFunc);
|
||||
nodesDestroyNode((SNode*)*pMergeFunc);
|
||||
}
|
||||
|
||||
|
|
|
@ -971,6 +971,7 @@ SNode* nodesCloneNode(const SNode* pNode) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
|
||||
code = physiIntervalCopy((const SIntervalPhysiNode*)pNode, (SIntervalPhysiNode*)pDst);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
|
||||
|
|
|
@ -375,6 +375,8 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "PhysiStreamFinalInterval";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
return "PhysiStreamSemiInterval";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
|
||||
return "PhysiStreamMidInterval";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
|
||||
return "PhysiFill";
|
||||
|
@ -7274,6 +7276,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
|
||||
return physiIntervalNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
|
||||
|
@ -7618,6 +7621,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
|
||||
return jsonToPhysiIntervalNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
|
||||
|
|
|
@ -4219,6 +4219,7 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
|
||||
code = physiIntervalNodeToMsg(pObj, pEncoder);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
|
@ -4377,6 +4378,7 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
|
||||
code = msgToPhysiIntervalNode(pDecoder, pObj);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
|
|
|
@ -566,6 +566,8 @@ SNode* nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SStreamFinalIntervalPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
return makeNode(type, sizeof(SStreamSemiIntervalPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
|
||||
return makeNode(type, sizeof(SStreamMidIntervalPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
|
||||
return makeNode(type, sizeof(SFillPhysiNode));
|
||||
|
@ -1402,6 +1404,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
|
||||
destroyWinodwPhysiNode((SWindowPhysiNode*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
|
|
|
@ -1617,6 +1617,8 @@ static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
|
|||
return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
|
||||
case INTERVAL_ALGO_STREAM_SEMI:
|
||||
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
|
||||
case INTERVAL_ALGO_STREAM_MID:
|
||||
return QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL;
|
||||
case INTERVAL_ALGO_STREAM_SINGLE:
|
||||
return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
|
||||
case SESSION_ALGO_STREAM_FINAL:
|
||||
|
|
|
@ -344,11 +344,12 @@ static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SL
|
|||
return false;
|
||||
}
|
||||
|
||||
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) {
|
||||
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMidFuncs, SNodeList** pMergeFuncs) {
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, pFuncs) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
||||
SFunctionNode* pPartFunc = NULL;
|
||||
SFunctionNode* pMidFunc = NULL;
|
||||
SFunctionNode* pMergeFunc = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) {
|
||||
|
@ -359,18 +360,33 @@ static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFu
|
|||
nodesDestroyNode((SNode*)pMergeFunc);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if(pMidFuncs != NULL){
|
||||
pMidFunc = (SFunctionNode*)nodesCloneNode(pNode);
|
||||
if (NULL == pMidFunc) {
|
||||
nodesDestroyNode((SNode*)pMidFunc);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
code = fmGetDistMethod(pFunc, &pPartFunc, &pMergeFunc);
|
||||
code = fmGetDistMethod(pFunc, &pPartFunc, &pMidFunc, &pMergeFunc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if(pMidFuncs != NULL){
|
||||
code = nodesListMakeStrictAppend(pMidFuncs, (SNode*)pMidFunc);
|
||||
}else{
|
||||
nodesDestroyNode((SNode*)pMidFunc);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyList(*pPartialFuncs);
|
||||
nodesDestroyList(*pMergeFuncs);
|
||||
nodesDestroyNode((SNode*)pPartFunc);
|
||||
nodesDestroyNode((SNode*)pMidFunc);
|
||||
nodesDestroyNode((SNode*)pMergeFunc);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
@ -463,7 +479,7 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
|
|||
splSetParent((SLogicNode*)pPartWin);
|
||||
|
||||
int32_t index = 0;
|
||||
int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs);
|
||||
int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, NULL, &pMergeWindow->pFuncs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
|
||||
}
|
||||
|
@ -488,6 +504,85 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow, SLogicNode** pMidWindow) {
|
||||
SNodeList* pFunc = pMergeWindow->pFuncs;
|
||||
pMergeWindow->pFuncs = NULL;
|
||||
SNodeList* pTargets = pMergeWindow->node.pTargets;
|
||||
pMergeWindow->node.pTargets = NULL;
|
||||
SNodeList* pChildren = pMergeWindow->node.pChildren;
|
||||
pMergeWindow->node.pChildren = NULL;
|
||||
SNode* pConditions = pMergeWindow->node.pConditions;
|
||||
pMergeWindow->node.pConditions = NULL;
|
||||
|
||||
SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow);
|
||||
if (NULL == pPartWin) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SWindowLogicNode* pMidWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow);
|
||||
if (NULL == pMidWin) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pPartWin->node.groupAction = GROUP_ACTION_KEEP;
|
||||
pMidWin->node.groupAction = GROUP_ACTION_KEEP;
|
||||
pMergeWindow->node.pTargets = pTargets;
|
||||
pMergeWindow->node.pConditions = pConditions;
|
||||
|
||||
pPartWin->node.pChildren = pChildren;
|
||||
splSetParent((SLogicNode*)pPartWin);
|
||||
|
||||
SNodeList* pFuncPart = NULL;
|
||||
SNodeList* pFuncMid = NULL;
|
||||
SNodeList* pFuncMerge = NULL;
|
||||
int32_t code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMid, &pFuncMerge);
|
||||
pPartWin->pFuncs = pFuncPart;
|
||||
pMidWin->pFuncs = pFuncMid;
|
||||
pMergeWindow->pFuncs = pFuncMerge;
|
||||
|
||||
int32_t index = 0;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
nodesDestroyNode(pMidWin->pTspk);
|
||||
pMidWin->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
|
||||
if (NULL == pMidWin->pTspk) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplAppendWStart(pMidWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByRewriteExprs(pMidWin->pFuncs, &pMidWin->node.pTargets);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
nodesDestroyNode(pMergeWindow->pTspk);
|
||||
pMergeWindow->pTspk = nodesCloneNode(nodesListGetNode(pMidWin->node.pTargets, index));
|
||||
if (NULL == pMergeWindow->pTspk) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
nodesDestroyList(pFunc);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pPartWindow = (SLogicNode*)pPartWin;
|
||||
*pMidWindow = (SLogicNode*)pMidWin;
|
||||
} else {
|
||||
nodesDestroyNode((SNode*)pPartWin);
|
||||
nodesDestroyNode((SNode*)pMidWin);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups;
|
||||
|
@ -635,18 +730,31 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
static int32_t stbSplSplitIntervalForStreamMultiAgg(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
SLogicNode* pPartWindow = NULL;
|
||||
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
|
||||
SLogicNode* pMidWindow = NULL;
|
||||
int32_t code = stbSplCreatePartMidWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow, &pMidWindow);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
|
||||
((SWindowLogicNode*)pMidWindow)->windowAlgo = INTERVAL_ALGO_STREAM_MID;
|
||||
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
|
||||
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
|
||||
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
|
||||
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pMidWindow);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateExchangeNode(pCxt, pMidWindow, pPartWindow);
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||
SNode* subPlan = (SNode*)splCreateSubplan(pCxt, pMidWindow);
|
||||
((SLogicSubplan*)subPlan)->subplanType = SUBPLAN_TYPE_MERGE;
|
||||
|
||||
code = nodesListMakeStrictAppend(&((SLogicSubplan*)subPlan)->pChildren,
|
||||
(SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, subPlan);
|
||||
}
|
||||
}
|
||||
|
||||
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||
++(pCxt->groupId);
|
||||
return code;
|
||||
|
@ -654,7 +762,7 @@ static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInf
|
|||
|
||||
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
if (pCxt->pPlanCxt->streamQuery) {
|
||||
return stbSplSplitIntervalForStream(pCxt, pInfo);
|
||||
return stbSplSplitIntervalForStreamMultiAgg(pCxt, pInfo);
|
||||
} else {
|
||||
return stbSplSplitIntervalForBatch(pCxt, pInfo);
|
||||
}
|
||||
|
@ -874,7 +982,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO
|
|||
pPartAgg->node.pChildren = pChildren;
|
||||
splSetParent((SLogicNode*)pPartAgg);
|
||||
|
||||
code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
|
||||
code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, NULL, &pMergeAgg->pAggFuncs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
|
||||
|
|
|
@ -95,6 +95,7 @@ int32_t doValidatePhysiNode(SValidatePlanContext* pCxt, SNode* pNode) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
|
||||
|
|
|
@ -107,7 +107,7 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
|
|||
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
|
||||
|
||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
||||
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock);
|
||||
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock);
|
||||
|
||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
||||
|
||||
|
|
|
@ -181,7 +181,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
|
|||
return status;
|
||||
}
|
||||
|
||||
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock));
|
||||
int8_t status = TASK_INPUT_STATUS__NORMAL;
|
||||
|
||||
|
@ -203,17 +203,6 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
|||
/*status = TASK_INPUT_STATUS__FAILED;*/
|
||||
}
|
||||
|
||||
// rsp by input status
|
||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
|
||||
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
|
||||
SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
pCont->streamId = pReq->streamId;
|
||||
pCont->rspToTaskId = pReq->srcTaskId;
|
||||
pCont->rspFromTaskId = pReq->dstTaskId;
|
||||
pRsp->pCont = buf;
|
||||
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
|
||||
tmsgSendRsp(pRsp);
|
||||
|
||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||
}
|
||||
|
||||
|
@ -295,11 +284,12 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
|
||||
ASSERT(pTask->info.taskLevel != TASK_LEVEL__SINK);
|
||||
streamSchedExec(pTask);
|
||||
return 0;
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
||||
int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
|
||||
if(code != 0){
|
||||
return code;
|
||||
}
|
||||
return streamSchedExec(pTask);
|
||||
}
|
||||
|
||||
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
|
||||
|
|
|
@ -162,16 +162,71 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
|
||||
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
|
||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
|
||||
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
|
||||
SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
pCont->streamId = pReq->streamId;
|
||||
pCont->rspToTaskId = pReq->srcTaskId;
|
||||
pCont->rspFromTaskId = pReq->dstTaskId;
|
||||
pRsp->pCont = buf;
|
||||
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
|
||||
tmsgSendRsp(pRsp);
|
||||
}
|
||||
|
||||
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) {
|
||||
int32_t code = -1;
|
||||
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req){
|
||||
int32_t code = 0;
|
||||
void* buf = NULL;
|
||||
int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
ASSERT(sz > 0);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
req->reqId = tGenIdPI64();
|
||||
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
req->dstNodeId = pEpInfo->nodeId;
|
||||
req->dstTaskId = pEpInfo->taskId;
|
||||
int32_t len;
|
||||
tEncodeSize(tEncodeStreamRetrieveReq, req, len, code);
|
||||
if (code != 0) {
|
||||
ASSERT(0);
|
||||
return code;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + len);
|
||||
if (buf == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeStreamRetrieveReq(&encoder, req);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, buf, len + sizeof(SMsgHead));
|
||||
|
||||
code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg);
|
||||
if (code != 0) {
|
||||
ASSERT(0);
|
||||
rpcFreeCont(buf);
|
||||
return code;
|
||||
}
|
||||
|
||||
buf = NULL;
|
||||
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
|
||||
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){
|
||||
SRetrieveTableRsp* pRetrieve = NULL;
|
||||
void* buf = NULL;
|
||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
||||
|
||||
pRetrieve = taosMemoryCalloc(1, dataStrLen);
|
||||
if (pRetrieve == NULL) return -1;
|
||||
if (pRetrieve == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
pRetrieve->useconds = 0;
|
||||
|
@ -187,57 +242,24 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
|||
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
|
||||
|
||||
SStreamRetrieveReq req = {
|
||||
.streamId = pTask->id.streamId,
|
||||
.srcNodeId = pTask->info.nodeId,
|
||||
.srcTaskId = pTask->id.taskId,
|
||||
.pRetrieve = pRetrieve,
|
||||
.retrieveLen = dataStrLen,
|
||||
};
|
||||
req->streamId = pTask->id.streamId;
|
||||
req->srcNodeId = pTask->info.nodeId;
|
||||
req->srcTaskId = pTask->id.taskId;
|
||||
req->pRetrieve = pRetrieve;
|
||||
req->retrieveLen = dataStrLen;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
ASSERT(sz > 0);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
req.reqId = tGenIdPI64();
|
||||
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
req.dstNodeId = pEpInfo->nodeId;
|
||||
req.dstTaskId = pEpInfo->taskId;
|
||||
int32_t len;
|
||||
tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + len);
|
||||
if (buf == NULL) {
|
||||
goto CLEAR;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeStreamRetrieveReq(&encoder, &req);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, buf, len + sizeof(SMsgHead));
|
||||
|
||||
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
|
||||
ASSERT(0);
|
||||
goto CLEAR;
|
||||
}
|
||||
|
||||
buf = NULL;
|
||||
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
|
||||
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
|
||||
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock) {
|
||||
SStreamRetrieveReq req;
|
||||
int32_t code = buildStreamRetrieveReq(pTask, pBlock, &req);
|
||||
if(code != 0){
|
||||
return code;
|
||||
}
|
||||
code = 0;
|
||||
|
||||
CLEAR:
|
||||
taosMemoryFree(pRetrieve);
|
||||
rpcFreeCont(buf);
|
||||
code = broadcastRetrieveMsg(pTask, &req);
|
||||
taosMemoryFree(req.pRetrieve);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -138,7 +138,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
|||
}
|
||||
|
||||
if (output->info.type == STREAM_RETRIEVE) {
|
||||
if (streamBroadcastToChildren(pTask, output) < 0) {
|
||||
if (streamBroadcastToUpTasks(pTask, output) < 0) {
|
||||
// TODO
|
||||
}
|
||||
continue;
|
||||
|
|
|
@ -104,7 +104,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
|
|||
}
|
||||
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);
|
||||
sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
|
||||
|
||||
pTask->id.idStr = taosStrdup(buf);
|
||||
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
|
|
|
@ -21,10 +21,10 @@
|
|||
,,n,army,python3 ./test.py -f community/cmdline/fullopt.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f community/storage/oneStageComp.py -N 3 -L 3 -D 1
|
||||
|
||||
|
||||
#
|
||||
# system test
|
||||
#
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_multi_agg.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_basic.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval.py
|
||||
|
|
|
@ -475,6 +475,8 @@ sql create table ts1 using st tags(1,1,1);
|
|||
sql create table ts2 using st tags(2,2,2);
|
||||
sql create stream stream_t3 trigger at_once into streamtST3 as select ts, min(a) c6, a, b, c, ta, tb, tc from st interval(10s) ;
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into ts1 values(1648791211000,1,2,3);
|
||||
sleep 50
|
||||
sql insert into ts1 values(1648791222001,2,2,3);
|
||||
|
@ -488,6 +490,9 @@ $loop_count = 0
|
|||
loop3:
|
||||
sql select * from streamtST3;
|
||||
|
||||
print $data00 $data01 $data02 $data03
|
||||
print $data10 $data11 $data12 $data13
|
||||
|
||||
sleep 1000
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 30 then
|
||||
|
|
|
@ -243,6 +243,8 @@ sql create table ts1 using st tags(1,1,1);
|
|||
sql create table ts2 using st tags(2,2,2);
|
||||
sql create stream stream_t3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST3 as select ts, min(a) c6, a, b, c, ta, tb, tc from st interval(10s) ;
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into ts1 values(1648791211000,1,2,3);
|
||||
sleep 50
|
||||
sql insert into ts1 values(1648791222001,2,2,3);
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.common import *
|
||||
from util.sqlset import *
|
||||
from util.autogen import *
|
||||
|
||||
import random
|
||||
import time
|
||||
import traceback
|
||||
import os
|
||||
from os import path
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
updatecfgDict = {'debugFlag': 135, 'asynclog': 0, 'streamAggCnt': 2}
|
||||
# init
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
def case1(self):
|
||||
tdLog.debug("========case1 start========")
|
||||
|
||||
os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 5 > /dev/null 2>&1 &")
|
||||
time.sleep(10)
|
||||
tdSql.query("use test")
|
||||
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
|
||||
tdLog.debug("========create stream and insert data ok========")
|
||||
time.sleep(15)
|
||||
|
||||
tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
|
||||
rowCnt = tdSql.getRows()
|
||||
results = []
|
||||
for i in range(rowCnt):
|
||||
results.append(tdSql.getData(i,1))
|
||||
|
||||
tdSql.query("select * from st1 order by groupid,_wstart")
|
||||
tdSql.checkRows(rowCnt)
|
||||
for i in range(rowCnt):
|
||||
data1 = tdSql.getData(i,1)
|
||||
data2 = results[i]
|
||||
if data1 != data2:
|
||||
tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2))
|
||||
tdLog.exit("check data error!")
|
||||
|
||||
tdLog.debug("case1 end")
|
||||
|
||||
def case2(self):
|
||||
tdLog.debug("========case2 start========")
|
||||
|
||||
os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y > /dev/null 2>&1")
|
||||
# create stream
|
||||
tdSql.execute("use db")
|
||||
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
|
||||
time.sleep(5)
|
||||
|
||||
sql = "select count(*) from sta"
|
||||
# loop wait max 60s to check count is ok
|
||||
tdLog.info("loop wait result ...")
|
||||
tdSql.checkDataLoop(0, 0, 100, sql, loopCount=10, waitTime=0.5)
|
||||
|
||||
# check all data is correct
|
||||
sql = "select * from sta where cnt != 200;"
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(0)
|
||||
|
||||
# check ts interval is correct
|
||||
sql = "select * from ( select diff(_wstart) as tsdif from sta ) where tsdif != 10;"
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(0)
|
||||
tdLog.debug("case2 end")
|
||||
|
||||
# run
|
||||
def run(self):
|
||||
self.case1()
|
||||
self.case2()
|
||||
|
||||
# stop
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -28,7 +28,7 @@ void shellCrashHandler(int signum, void *sigInfo, void *context) {
|
|||
|
||||
#if !defined(WINDOWS)
|
||||
taosIgnSignal(SIGBUS);
|
||||
#endif
|
||||
#endif
|
||||
taosIgnSignal(SIGABRT);
|
||||
taosIgnSignal(SIGFPE);
|
||||
taosIgnSignal(SIGSEGV);
|
||||
|
@ -82,7 +82,9 @@ int main(int argc, char *argv[]) {
|
|||
#ifdef WEBSOCKET
|
||||
shellCheckConnectMode();
|
||||
#endif
|
||||
taos_init();
|
||||
if (taos_init() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// kill heart-beat thread when quit
|
||||
taos_set_hb_quit(1);
|
||||
|
@ -105,7 +107,7 @@ int main(int argc, char *argv[]) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// support port feature
|
||||
// support port feature
|
||||
shellAutoInit();
|
||||
int32_t ret = shellExecute();
|
||||
shellAutoExit();
|
||||
|
|
Loading…
Reference in New Issue