From 0dd933013cd53a5814a75e2ac415dec3aaf3abb5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Jun 2023 17:58:00 +0800 Subject: [PATCH] enh(stream): make history task for stream running. --- include/libs/stream/tstream.h | 6 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- source/dnode/mnode/impl/src/mndScheduler.c | 9 +-- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tq/tq.c | 29 +++------ source/dnode/vnode/src/vnd/vnodeSvr.c | 34 +++++++++++ source/libs/stream/inc/streamInc.h | 2 +- source/libs/stream/src/stream.c | 2 + source/libs/stream/src/streamMeta.c | 3 +- source/libs/stream/src/streamRecover.c | 66 ++++++++++++++++++++- 10 files changed, 120 insertions(+), 34 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5a5e930d42..5d2c0e4e33 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -552,10 +552,11 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); // recover and fill history -int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version); +int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskLaunchRecover(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask); -int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version); +int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); +int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver); // common int32_t streamSetParamForRecover(SStreamTask* pTask); @@ -570,7 +571,6 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask); // agg level int32_t streamAggRecoverPrepare(SStreamTask* pTask); -// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask); int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); void 
streamMetaInit(); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index e41c9e10d7..02c70b9eca 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -92,7 +92,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg); - int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); + int32_t code = vnodeProcessStreamMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType), diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 8ae3827936..8886687f01 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -238,10 +238,6 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p return 0; } -static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStream) { - return 0; -} - static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, SStreamObj* pStream, SSubplan* plan, uint64_t uid, int8_t fillHistory, bool hasExtraSink) { @@ -250,6 +246,11 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas return terrno; } + if (fillHistory) { // todo set the correct ts, which should be last key of queried table. 
+ pTask->dataRange.window.skey = INT64_MIN; + pTask->dataRange.window.ekey = taosGetTimestampMs(); + } + // sink or dispatch if (hasExtraSink) { mndAddDispatcherForInnerTask(pMnode, pStream, pSinkTaskList, pTask); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7e19425d56..0187a9ac6e 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -95,6 +95,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); +int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ce3362b677..4addda25b7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -979,7 +979,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32 return -1; } - code = streamProcessTaskCheckRsp(pTask, &rsp, sversion); + code = streamProcessCheckRsp(pTask, &rsp); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return code; } @@ -1032,25 +1032,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); } else { if (pTask->historyTaskId.taskId != 0) { - // todo fix the bug: 1. maybe failed to located the fill history task, since it is not built yet. 2. race condition - - // an fill history task needs to be started. 
- // Set the execute conditions, including the query time window and the version range - SStreamTask* pHTask = taosHashGet(pStreamMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId)); - - pHTask->dataRange.range.minVer = 0; - pHTask->dataRange.range.maxVer = sversion; - - pHTask->dataRange.window.skey = INT64_MIN; - pHTask->dataRange.window.ekey = 1000000; - - tqDebug("s-task:%s set the launch condition for fill history task:%s, window:%" PRId64 " - %" PRId64 - " verrange:%" PRId64 " - %" PRId64, - pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey, - pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer); - - // check if downstream tasks have been ready - streamTaskCheckDownstream(pHTask, sversion); + streamTaskStartHistoryTask(pTask, sversion); } } @@ -1091,8 +1073,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } double el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); + tqDebug("s-task:%s history scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); + // todo transfer the executor status, and then destroy this stream task + +#if 0 // build msg to launch next step SStreamRecoverStep2Req req; code = streamBuildSourceRecover2Req(pTask, &req); @@ -1123,6 +1108,8 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { SRpcMsg rpcMsg = { .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg); +#endif + return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b950437f23..28a5becfd4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -588,6 +588,40 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { } } +int32_t 
vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { + vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); + if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || + pMsg->msgType == TDMT_VND_BATCH_META) && + !syncIsReadyForRead(pVnode->sync)) { + vnodeRedirectRpcMsg(pVnode, pMsg, terrno); + return 0; + } + + switch (pMsg->msgType) { + case TDMT_STREAM_TASK_RUN: + return tqProcessTaskRunReq(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_DISPATCH: + return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); + case TDMT_STREAM_TASK_CHECK: + return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_DISPATCH_RSP: + return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_RETRIEVE: + return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); + case TDMT_STREAM_RETRIEVE_RSP: + return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); + case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE: + return tqProcessTaskRecover1Req(pVnode->pTq, pMsg); + case TDMT_STREAM_RECOVER_FINISH: + return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg); + case TDMT_STREAM_RECOVER_FINISH_RSP: + return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); + default: + vError("unknown msg type:%d in fetch queue", pMsg->msgType); + return TSDB_CODE_APP_ERROR; + } +} + // TODO: remove the function void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // TODO diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 2c1956998a..4fc4d3ccf3 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -31,7 +31,7 @@ typedef struct { void* timer; } SStreamGlobalEnv; -static SStreamGlobalEnv streamEnv; +extern SStreamGlobalEnv streamEnv; int32_t streamDispatchStreamBlock(SStreamTask* pTask); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index b64468a7f4..7d0a44d2b8 
100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -21,6 +21,8 @@ #define ONE_MB_F (1048576.0) #define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) +SStreamGlobalEnv streamEnv; + int32_t streamInit() { int8_t old; while (1) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c984bdba54..ef3ab2ae46 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -387,9 +387,10 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { } // todo handle the fill history task + ASSERT(0); if (pTask->fillHistory) { ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM); - streamTaskCheckDownstream(pTask, ver); + streamTaskCheckDownstreamTasks(pTask); } } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index d3472a6a87..5d366aca05 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -14,6 +14,7 @@ */ #include "streamInc.h" +#include "ttimer.h" int32_t streamTaskLaunchRecover(SStreamTask* pTask) { qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId); @@ -54,8 +55,9 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) { } // check status -int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t ver) { - qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, ver); +int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { + qDebug("s-task:%s in fill history stage, ver:%"PRId64" ekey:%"PRId64, pTask->id.idStr, pTask->dataRange.range.maxVer, + pTask->dataRange.window.ekey); SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -135,7 +137,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) { return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 
1:0; } -int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t ver) { +int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr, @@ -297,6 +299,64 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { return 0; } +static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { + pHTask->dataRange.range.minVer = 0; + pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; + + qDebug("s-task:%s set the launch condition for fill history task:%s, window:%" PRId64 " - %" PRId64 + " verrange:%" PRId64 " - %" PRId64, + pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey, + pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer); + + // check if downstream tasks have been ready + streamTaskCheckDownstreamTasks(pHTask); +} + +static void tryLaunchHistoryTask(void* param, void* tmrId) { + SStreamTask* pTask = param; + + SStreamMeta* pMeta = pTask->pMeta; + SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId)); + if (pHTask == NULL) { + qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, + pMeta->vgId, pTask->historyTaskId.taskId); + + taosTmrReset(tryLaunchHistoryTask, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); + return; + } + + doCheckDownstreamStatus(pTask, *pHTask); +} + +// todo fix the bug: 2. race condition +// an fill history task needs to be started. 
+int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver) { + SStreamMeta* pMeta = pTask->pMeta; + if (pTask->historyTaskId.taskId == 0) { + return TSDB_CODE_SUCCESS; + } + + // Set the execute conditions, including the query time window and the version range + SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId)); + if (pHTask == NULL) { + qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, + pMeta->vgId, pTask->historyTaskId.taskId); + + if (pTask->timer == NULL) { + pTask->timer = taosTmrStart(tryLaunchHistoryTask, 100, pTask, streamEnv.timer); + if (pTask->timer == NULL) { + // todo failed to create timer + } + } + + // try again in 100ms + return TSDB_CODE_SUCCESS; + } + + doCheckDownstreamStatus(pTask, *pHTask); + return TSDB_CODE_SUCCESS; +} + int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;