From fcc706c45ce3cbe6ac00520d5e20835f7bf6326b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Jun 2023 15:59:01 +0800 Subject: [PATCH] enh(stream): refactor the fill history task. --- include/libs/executor/executor.h | 2 +- include/libs/stream/tstream.h | 4 ++-- source/dnode/mnode/impl/src/mndScheduler.c | 12 +++++----- source/dnode/mnode/impl/src/mndStream.c | 5 ++++ source/dnode/vnode/src/tq/tq.c | 27 ++++++++++++++++++++-- source/libs/executor/inc/querytask.h | 1 + source/libs/executor/src/executor.c | 3 ++- source/libs/executor/src/scanoperator.c | 10 ++++---- source/libs/stream/src/streamMeta.c | 1 + source/libs/stream/src/streamRecover.c | 24 +++++++++---------- 10 files changed, 60 insertions(+), 29 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 3f53976c67..2e4fb23207 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -221,7 +221,7 @@ void* qExtractReaderFromStreamScanner(void* scanner); int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo); -int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver); +int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver, int64_t ekey); int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRestoreParam(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 579a4a65cb..5a5e930d42 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -553,7 +553,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); // recover and fill history int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version); -int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version); +int32_t streamTaskLaunchRecover(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version); @@ -562,7 +562,7 @@ int32_t streamSetParamForRecover(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); // source level -int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver); +int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver, int64_t ekey); int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq); int32_t streamSourceRecoverScanStep1(SStreamTask* pTask); int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index e52ef25725..8ae3827936 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -312,11 +312,11 @@ static SArray* addNewTaskList(SArray* pTasksList) { // set the history task id static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { for(int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) { - SStreamTask* pStreamTask = taosArrayGet(pTaskList, i); - SStreamTask* pHTask = taosArrayGet(pHTaskList, i); + SStreamTask** pStreamTask = taosArrayGet(pTaskList, i); + SStreamTask** pHTask = taosArrayGet(pHTaskList, i); - pStreamTask->historyTaskId.taskId = pHTask->id.taskId; - pStreamTask->historyTaskId.streamId = pHTask->id.streamId; + (*pStreamTask)->historyTaskId.taskId = (*pHTask)->id.taskId; + (*pStreamTask)->historyTaskId.streamId = (*pHTask)->id.streamId; } } @@ -359,7 +359,7 @@ static int32_t addSourceTasksForSingleLevelStream(SMnode* pMnode, const SQueryPl // new stream task SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, - pStream->conf.fillHistory, hasExtraSink); + 0, hasExtraSink); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); return -1; @@ -367,7 +367,7 @@ static int32_t addSourceTasksForSingleLevelStream(SMnode* pMnode, const SQueryPl if (pStream->conf.fillHistory) { SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL); - code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, 0, + code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, pStream->conf.fillHistory, hasExtraSink); setHTasksId(pTaskList, pHTaskList); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1ce4ce2b7e..d4c1e033dc 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -299,6 +299,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->smaId = 0; pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name)); + + char p[TSDB_STREAM_FNAME_LEN + 32] = {0}; + snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory"); + + pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name)); pObj->status = 0; pObj->conf.igExpired = pCreate->igExpired; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5a8abe2bec..ce3362b677 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1027,13 +1027,36 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms taosWUnLockLatch(&pStreamMeta->lock); - // 3.go through recover steps to fill history + // 3. for fill history task, do nothing. wait for the main task to start it if (pTask->fillHistory) { - streamTaskCheckDownstream(pTask, sversion); + tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); + } else { + if (pTask->historyTaskId.taskId != 0) { + // todo fix the bug: 1. maybe failed to located the fill history task, since it is not built yet. 2. race condition + + // an fill history task needs to be started. + // Set the execute conditions, including the query time window and the version range + SStreamTask* pHTask = taosHashGet(pStreamMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId)); + + pHTask->dataRange.range.minVer = 0; + pHTask->dataRange.range.maxVer = sversion; + + pHTask->dataRange.window.skey = INT64_MIN; + pHTask->dataRange.window.ekey = 1000000; + + tqDebug("s-task:%s set the launch condition for fill history task:%s, window:%" PRId64 " - %" PRId64 + " verrange:%" PRId64 " - %" PRId64, + pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey, + pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer); + + // check if downstream tasks have been ready + streamTaskCheckDownstream(pHTask, sversion); + } } tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr, pTask->status.taskStatus, numOfTasks); + return 0; } diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 6497bd90b4..2b9256f08e 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -65,6 +65,7 @@ typedef struct { int8_t recoverScanFinished; SQueryTableDataCond tableCond; int64_t fillHistoryVer1; + int64_t fillHisotryeKey1; int64_t fillHistoryVer2; SStreamState* pState; int64_t dataVersion; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c8b66836d5..0ea6ae7144 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -869,10 +869,11 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { } } -int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) { +int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver, int64_t ekey) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); pTaskInfo->streamInfo.fillHistoryVer1 = ver; + pTaskInfo->streamInfo.fillHisotryeKey1 = ekey; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1; return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c93b9f4c73..4b53a9918d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1779,7 +1779,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStreamScanInfo* pInfo = pOperator->info; - qDebug("stream scan started, %s", GET_TASKID(pTaskInfo)); + qDebug("stream scan started, %s", id); if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { @@ -1788,14 +1788,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) { pTSInfo->base.cond.startVersion = 0; pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; - qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion, - pTSInfo->base.cond.endVersion); + qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64 ", %s", pTSInfo->base.cond.startVersion, + pTSInfo->base.cond.endVersion, id); pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1; } else { pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1; pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; - qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion, - pTSInfo->base.cond.endVersion); + qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion, + pTSInfo->base.cond.endVersion, id); pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8c26052fdb..c984bdba54 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -386,6 +386,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } + // todo handle the fill history task if (pTask->fillHistory) { ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM); streamTaskCheckDownstream(pTask, ver); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index eb2535782e..e288694887 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -15,7 +15,7 @@ #include "streamInc.h" -int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { +int32_t streamTaskLaunchRecover(SStreamTask* pTask) { qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId); if (pTask->taskLevel == TASK_LEVEL__SOURCE) { @@ -23,7 +23,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { qDebug("s-task:%s set task status:%d and start to recover", pTask->id.idStr, pTask->status.taskStatus); streamSetParamForRecover(pTask); - streamSourceRecoverPrepareStep1(pTask, version); + streamSourceRecoverPrepareStep1(pTask, pTask->dataRange.range.maxVer, pTask->dataRange.window.ekey); SStreamRecoverStep1Req req; streamBuildSourceRecover1Req(pTask, &req); @@ -54,8 +54,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { } // checkstatus -int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { - qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version); +int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t ver) { + qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, ver); SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -88,13 +88,13 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { - qDebug("s-task:%s at node %d direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId); - streamTaskLaunchRecover(pTask, version); + qDebug("s-task:%s (vgId:%d) direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId); + streamTaskLaunchRecover(pTask); } return 0; @@ -135,7 +135,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) { return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 1:0; } -int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { +int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t ver) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr, @@ -166,14 +166,14 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pTask->checkReqIds = NULL; qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); - streamTaskLaunchRecover(pTask, version); + streamTaskLaunchRecover(pTask); } } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pRsp->reqId != pTask->checkReqId) { return -1; } - streamTaskLaunchRecover(pTask, version); + streamTaskLaunchRecover(pTask); } else { ASSERT(0); } @@ -204,9 +204,9 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) { } // source -int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver) { +int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver, int64_t ekey) { void* exec = pTask->exec.pExecutor; - return qStreamSourceRecoverStep1(exec, ver); + return qStreamSourceRecoverStep1(exec, ver, ekey); } int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) {