feat:add multi level agg operator for stream task

This commit is contained in:
wangmm0220 2023-12-08 18:48:34 +08:00
parent c3cbc1fbce
commit 2a506b3e5e
1 changed files with 102 additions and 78 deletions

View File

@ -24,6 +24,9 @@
#include "tname.h" #include "tname.h"
#include "tuuid.h" #include "tuuid.h"
#define SINK_NODE_LEVEL (0)
#define SOURCE_NODE_LEVEL (0)
#define SINK_NODE_LEVEL (0)
#define SINK_NODE_LEVEL (0) #define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode; extern bool tsDeployOnSnode;
@ -311,8 +314,9 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan, int planIndex){ static SSubplan* getScanSubPlan(const SQueryPlan* pPlan){
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, planIndex); int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
if (LIST_LENGTH(inner->pNodeList) != 1) { if (LIST_LENGTH(inner->pNodeList) != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return NULL; return NULL;
@ -326,8 +330,8 @@ static SSubplan* getScanSubPlan(const SQueryPlan* pPlan, int planIndex){
return plan; return plan;
} }
static SSubplan* getFinalAggSubPlan(const SQueryPlan* pPlan){ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
if (LIST_LENGTH(inner->pNodeList) != 1) { if (LIST_LENGTH(inner->pNodeList) != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return NULL; return NULL;
@ -402,9 +406,7 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil
return pAggTask; return pAggTask;
} }
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset) { static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset){
SArray* pAggTaskList = addNewTaskList(pStream->tasks);
SStreamTask* pTask = buildAggTask(pStream, pEpset, false); SStreamTask* pTask = buildAggTask(pStream, pEpset, false);
if (pTask == NULL) { if (pTask == NULL) {
return terrno; return terrno;
@ -428,15 +430,13 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S
code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
} }
if(code != 0){ if(code != 0){
terrno = code;
goto END; goto END;
} }
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
SArray* pHAggTaskList = addNewTaskList(pStream->pHTasksList);
pTask = buildAggTask(pStream, pEpset, true); pTask = buildAggTask(pStream, pEpset, true);
if (pTask == NULL) { if (pTask == NULL) {
code = terrno;
goto END; goto END;
} }
@ -446,21 +446,29 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S
code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
} }
if(code != 0){ if(code != 0){
terrno = code;
goto END; goto END;
} }
setHTasksId(pAggTaskList, pHAggTaskList); SArray** pAggTaskList = taosArrayGetLast(pStream->tasks);
SArray** pHAggTaskList = taosArrayGetLast(pStream->pHTasksList);
setHTasksId(*pAggTaskList, *pHAggTaskList);
} }
END: END:
if (pSnode != NULL) { if (pSnode != NULL) {
sdbRelease(pMnode->pSdb, pSnode); sdbRelease(pMnode->pSdb, pSnode);
} else { } else {
sdbRelease(pMnode->pSdb, pVgroup); sdbRelease(pMnode->pSdb, pVgroup);
} }
return code;
}
return terrno; static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset) {
addNewTaskList(pStream->tasks);
if (pStream->conf.fillHistory) {
addNewTaskList(pStream->pHTasksList);
}
return doAddAggTask(pStream, pMnode, plan, pEpset);
} }
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){ static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){
@ -498,7 +506,7 @@ static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task){ static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task){
mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task); mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task);
for(int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); ++k) { for(int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k); SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);
streamTaskSetUpstreamInfo(pSinkTask, task); streamTaskSetUpstreamInfo(pSinkTask, task);
} }
@ -506,17 +514,17 @@ static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSin
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) { static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
SArray* pAggTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 1); SArray** pAggTaskList = taosArrayGetLast(tasks);
for(int i = 0; i < taosArrayGetSize(pAggTaskList); i++){ for(int i = 0; i < taosArrayGetSize(*pAggTaskList); i++){
SStreamTask* pAggTask = taosArrayGetP(pAggTaskList, i); SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i);
bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask); bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask);
} }
} }
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) { static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
SArray* pSourceTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 1); SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);
for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){ for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){
SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i); SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i);
@ -528,23 +536,27 @@ static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, b
} }
} }
static void bindSourceAgg(SArray* tasks) { static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
SArray* pAggTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 1); size_t size = taosArrayGetSize(tasks);
SArray* pSourceTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 2); ASSERT(size >= 2);
SArray* pDownTaskList = taosArrayGetP(tasks, size - 1);
SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);
SStreamTask* pAggTask = taosArrayGetP(pAggTaskList, 0); SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){ for(int i = begin; i < end; i++){
SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i); SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
streamTaskSetFixedDownstreamInfo(pSourceTask, pAggTask); if(pUpTask == NULL) { // out of range
streamTaskSetUpstreamInfo(pAggTask, pSourceTask); break;
}
streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
streamTaskSetUpstreamInfo(*pDownTask, pUpTask);
} }
} }
//#define AGGNUM 2 #define AGGNUM 2
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) { static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
bool hasExtraSink = false; bool hasExtraSink = false;
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
@ -570,62 +582,74 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
pStream->totalLevel = numOfPlanLevel + hasExtraSink; pStream->totalLevel = numOfPlanLevel + hasExtraSink;
// if(numOfPlanLevel == 3){ SSubplan* plan = getScanSubPlan(pPlan); // source plan
// pDbObj = mndAcquireDb(pMnode, pStream->sourceDb); if (plan == NULL) {
// if (pDbObj == NULL) { return terrno;
// terrno = TSDB_CODE_QRY_INVALID_INPUT; }
// return -1; int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey);
// } if (code != TSDB_CODE_SUCCESS) {
// return code;
// if(pDbObj->cfg.numOfVgroups >= AGGNUM){ }
//
// }
// sdbRelease(pSdb, pDbObj);
// }else
if (numOfPlanLevel > 1) {
SSubplan *plan = getFinalAggSubPlan(pPlan);
if(plan == NULL){
return terrno;
}
int32_t code = addAggTask(pStream, pMnode, plan, pEpset);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
bindAggSink(pStream, pMnode, pStream->tasks);
if (pStream->conf.fillHistory) {
bindAggSink(pStream, pMnode, pStream->pHTasksList);
}
plan = getScanSubPlan(pPlan, numOfPlanLevel - 1);
if(plan == NULL){
return terrno;
}
code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
bindSourceAgg(pStream->tasks);
if (pStream->conf.fillHistory) {
bindSourceAgg(pStream->pHTasksList);
}
} else if (numOfPlanLevel == 1) {
SSubplan *plan = getScanSubPlan(pPlan, 0);
if(plan == NULL){
return terrno;
}
int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (numOfPlanLevel == 1) {
bindSourceSink(pStream, pMnode, pStream->tasks, hasExtraSink); bindSourceSink(pStream, pMnode, pStream->tasks, hasExtraSink);
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
bindSourceSink(pStream, pMnode, pStream->pHTasksList, hasExtraSink); bindSourceSink(pStream, pMnode, pStream->pHTasksList, hasExtraSink);
} }
return TDB_CODE_SUCCESS;
} }
return 0; if(numOfPlanLevel == 3){
plan = getAggSubPlan(pPlan, 1); // middle agg plan
if (plan == NULL) {
return terrno;
}
do{
SArray** list = taosArrayGetLast(pStream->tasks);
float size = (float)taosArrayGetSize(*list);
size_t cnt = (int)(size/AGGNUM + 0.5);
if(cnt <= 1) break;
addNewTaskList(pStream->tasks);
if (pStream->conf.fillHistory) {
addNewTaskList(pStream->pHTasksList);
}
for(int j = 0; j < cnt; j++){
code = doAddAggTask(pStream, pMnode, plan, pEpset);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
bindTwoLevel(pStream->tasks, j*AGGNUM, (j+1)*AGGNUM);
if (pStream->conf.fillHistory) {
bindTwoLevel(pStream->pHTasksList, j*AGGNUM, (j+1)*AGGNUM);
}
}
}while(1);
}
plan = getAggSubPlan(pPlan, 0);
if (plan == NULL) {
return terrno;
}
SArray** list = taosArrayGetLast(pStream->tasks);
size_t size = taosArrayGetSize(*list);
code = addAggTask(pStream, pMnode, plan, pEpset);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
bindTwoLevel(pStream->tasks, 0, size);
if (pStream->conf.fillHistory) {
bindTwoLevel(pStream->pHTasksList, 0, size);
}
bindAggSink(pStream, pMnode, pStream->tasks);
if (pStream->conf.fillHistory) {
bindAggSink(pStream, pMnode, pStream->pHTasksList);
}
return TDB_CODE_SUCCESS;
} }
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey) { int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey) {