enh(stream): optimize the stream status when creating without fill-history option.

This commit is contained in:
Haojun Liao 2023-09-28 11:23:03 +08:00
parent db897fb03a
commit c759d90bf6
5 changed files with 58 additions and 49 deletions

View File

@ -450,8 +450,8 @@ typedef struct SStreamMeta {
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam,
SArray* pTaskList);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask);

View File

@ -27,8 +27,8 @@
#define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode;
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
SVgObj* pVgroup, SEpSet* pEpset, int32_t fillHistory);
static int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
SEpSet* pEpset, bool isFillhistory);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark) {
@ -207,8 +207,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
}
// create sink node for each vgroup.
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, SEpSet* pEpset,
int32_t fillHistory) {
int32_t doAddShuffleSinkTask(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, SEpSet* pEpset, bool fillHistory) {
SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL;
@ -224,17 +223,17 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea
continue;
}
mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, pEpset, fillHistory);
doAddSinkTask(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, pEpset, fillHistory);
sdbRelease(pSdb, pVgroup);
}
return 0;
}
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
SEpSet* pEpset, int32_t fillHistory) {
int64_t uid = (fillHistory == 0)? pStream->uid:pStream->hTaskUid;
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList);
int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
SEpSet* pEpset, bool isFillhistory) {
int64_t uid = (isFillhistory)? pStream->uid:pStream->hTaskUid;
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, isFillhistory, 0, pTaskList, pStream->conf.fillHistory);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@ -248,17 +247,16 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p
return 0;
}
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList,
SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset,
int8_t fillHistory, bool hasExtraSink, int64_t firstWindowSkey) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList);
static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList,
SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, bool fillHistory,
bool hasExtraSink, int64_t firstWindowSkey, bool hasFillHistory) {
SStreamTask* pTask =
tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillHistory);
if (pTask == NULL) {
return terrno;
}
epsetAssign(&pTask->info.mnodeEpset, pEpset);
// todo set the correct ts, which should be last key of queried table.
STimeWindow* pWindow = &pTask->dataRange.window;
pWindow->skey = INT64_MIN;
@ -345,8 +343,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
// new stream task
SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL);
int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset,
0, hasExtraSink, nextWindowSkey);
int32_t code = addSourceTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset,
false, hasExtraSink, nextWindowSkey, pStream->conf.fillHistory);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return -1;
@ -354,8 +352,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
if (pStream->conf.fillHistory) {
SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL);
code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid,
pEpset, 1, hasExtraSink, nextWindowSkey);
code = addSourceTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid,
pEpset, true, hasExtraSink, nextWindowSkey, false);
}
sdbRelease(pSdb, pVgroup);
@ -371,10 +369,10 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
return TSDB_CODE_SUCCESS;
}
static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask,
static int32_t doAddSourceTask(SArray* pTaskList, bool isFillhistory, int64_t uid, SStreamTask* pDownstreamTask,
SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, SEpSet* pEpset,
int64_t nextWindowSkey) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, 0, pTaskList);
int64_t nextWindowSkey, bool hasFillHistory) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, isFillhistory, 0, pTaskList, hasFillHistory);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@ -400,8 +398,8 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
}
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
SEpSet* pEpset, int32_t fillHistory, SStreamTask** pAggTask) {
*pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList);
SEpSet* pEpset, bool fillHistory, SStreamTask** pAggTask, bool hasFillhistory) {
*pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillhistory);
if (*pAggTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@ -432,7 +430,8 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
*pAggTask = NULL;
SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, pEpset, 0, pAggTask);
int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, pEpset, false, pAggTask,
pStream->conf.fillHistory);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
@ -461,7 +460,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
*pHAggTask = NULL;
code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pEpset, pStream->conf.fillHistory,
pHAggTask);
pHAggTask, false);
if (code != TSDB_CODE_SUCCESS) {
if (pSnode != NULL) {
sdbRelease(pSdb, pSnode);
@ -520,8 +519,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
continue;
}
int32_t code =
doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset, nextWindowSkey);
int32_t code = doAddSourceTask(pSourceTaskList, false, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset,
nextWindowSkey, pStream->conf.fillHistory);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
terrno = code;
@ -529,8 +528,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
}
if (pStream->conf.fillHistory) {
code = doAddSourceTask(pHSourceTaskList, 1, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup,
pEpset, nextWindowSkey);
code = doAddSourceTask(pHSourceTaskList, true, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup, pEpset,
nextWindowSkey, false);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return code;
@ -548,16 +547,16 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
}
static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList,
SEpSet* pEpset, int32_t fillHistory) {
SEpSet* pEpset, bool fillHistory) {
SArray* pSinkTaskList = addNewTaskList(pTasksList);
if (pStream->fixedSinkVgId == 0) {
if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, pEpset, fillHistory) < 0) {
if (doAddShuffleSinkTask(pMnode, pSinkTaskList, pStream, pEpset, fillHistory) < 0) {
// TODO free
return -1;
}
} else {
if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg,
pEpset, fillHistory) < 0) {
if (doAddSinkTask(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, pEpset,
fillHistory) < 0) {
// TODO free
return -1;
}

View File

@ -2320,8 +2320,9 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
}
SMStreamNodeCheckMsg *pMsg = rpcMallocCont(sizeof(SMStreamNodeCheckMsg));
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)};
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return 0;
}
@ -2383,7 +2384,6 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
// todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};

View File

@ -66,7 +66,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
pMeta->startInfo.startedAfterNodeUpdate = 0;
pMeta->startInfo.elapsedTime = pTask->execInfo.start - pMeta->startInfo.ts;
stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, total elapsed time:%.2f sec",
stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, total elapsed time:%.2fs",
vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pMeta->startInfo.elapsedTime / 1000.0);
}
taosWUnLockLatch(&pMeta->lock);
@ -580,15 +580,21 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
}
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
pHTask->dataRange.range.minVer = 0;
// the query version range should be limited to the already processed data
pHTask->dataRange.range.maxVer = pTask->chkInfo.nextProcessVer - 1;
SDataRange* pRange = &pHTask->dataRange;
pRange->range.minVer = 0;
// the query version range should be limited to the already processed data
pRange->range.maxVer = pTask->chkInfo.nextProcessVer - 1;
if (pRange->range.maxVer < pRange->range.minVer) {
pRange->range.maxVer = pRange->range.minVer;
}
pHTask->execInfo.init = taosGetTimestampMs();
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
" ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
" ver range:%" PRId64 " - %" PRId64", init:%"PRId64,
pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey,
pRange->range.minVer, pRange->range.maxVer, pHTask->execInfo.init);
} else {
stDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
}

View File

@ -27,8 +27,8 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
return 0;
}
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam,
SArray* pTaskList) {
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -46,10 +46,14 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
pTask->id.idStr = taosStrdup(buf);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
pTask->status.taskStatus = (fillHistory || hasFillhistory)? TASK_STATUS__SCAN_HISTORY:TASK_STATUS__NORMAL;
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
if (fillHistory) {
ASSERT(hasFillhistory);
}
addToTaskset(pTaskList, pTask);
return pTask;
}