From b44447e6f3b79785b0673a3a680cb46687c06f51 Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Wed, 14 Jun 2023 10:19:03 +0800
Subject: [PATCH] enh(stream): support restore from disk.

---
 include/libs/stream/tstream.h              |  13 +--
 source/dnode/mnode/impl/src/mndScheduler.c |   6 +-
 source/dnode/vnode/src/inc/tq.h            |  22 +---
 source/dnode/vnode/src/inc/vnodeInt.h      |   1 +
 source/dnode/vnode/src/tq/tq.c             |  91 ++++++----
 source/dnode/vnode/src/tq/tqRestore.c      | 107 ++++++++++++++++++-
 source/dnode/vnode/src/vnd/vnodeSync.c     |   2 +-
 source/libs/stream/src/streamDispatch.c    |   4 +-
 source/libs/stream/src/streamExec.c        |  10 +-
 source/libs/stream/src/streamMeta.c        |   8 +-
 source/libs/stream/src/streamRecover.c     | 115 +++++++++++++--------
 source/libs/stream/src/streamTask.c        |   1 +
 12 files changed, 237 insertions(+), 143 deletions(-)

diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 4c773ea30f..6ed97ac547 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -268,6 +268,7 @@ typedef struct SCheckpointInfo {
 
 typedef struct SStreamStatus {
   int8_t taskStatus;
+  int8_t checkDownstream;
   int8_t schedStatus;
   int8_t keepTaskStatus;
   bool   transferState;
@@ -528,11 +529,11 @@ typedef struct {
   SArray* checkpointVer;  // SArray
 } SStreamRecoverDownstreamRsp;
 
-int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
-int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
+int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
+int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
 
-int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
-int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
+int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
+int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
 
 int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
 int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
@@ -568,10 +569,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
 
 // recover and fill history
 int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask);
-int32_t streamTaskLaunchRecover(SStreamTask* pTask);
+int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
 int32_t streamTaskCheckStatus(SStreamTask* pTask);
 int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
-int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver);
+int32_t streamTaskStartHistoryTask(SStreamTask* pTask);
 int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
 
 // common
diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c
index b6274b57b8..24b2a60898 100644
--- a/source/dnode/mnode/impl/src/mndScheduler.c
+++ b/source/dnode/mnode/impl/src/mndScheduler.c
@@ -250,7 +250,8 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
     pTask->dataRange.window.skey = INT64_MIN;
     pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs();
 
-    mDebug("0x%x----------------window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
+    mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->dataRange.window.skey,
+           pTask->dataRange.window.ekey);
 
     // sink or dispatch
     if (hasExtraSink) {
@@ -323,7 +324,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
     (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
     (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
 
-    mDebug("s-task:0x%x related history task:0x%x", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId);
+    mDebug("s-task:0x%x related history task:0x%x, level:%d", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId,
+           (*pHTask)->info.taskLevel);
   }
 }
diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h
index 4ba8d6d69f..fc1cf8e6e2 100644
--- a/source/dnode/vnode/src/inc/tq.h
+++ b/source/dnode/vnode/src/inc/tq.h
@@ -45,27 +45,10 @@ extern "C" {
 typedef struct STqOffsetStore STqOffsetStore;
 
 // tqPush
-
-// typedef struct {
-//   // msg info
-//   int64_t consumerId;
-//   int64_t reqOffset;
-//   int64_t processedVer;
-//   int32_t epoch;
-//   // rpc info
-//   int64_t        reqId;
-//   SRpcHandleInfo rpcInfo;
-//   tmr_h          timerId;
-//   int8_t         tmrStopped;
-//   // exec
-//   int8_t       inputStatus;
-//   int8_t       execStatus;
-//   SStreamQueue inputQ;
-//   SRWLatch     lock;
-// } STqPushHandle;
+#define EXTRACT_DATA_FROM_WAL_ID    (-1)
+#define STREAM_TASK_STATUS_CHECK_ID (-2)
 
 // tqExec
-
 typedef struct {
   char* qmsg;  // SubPlanToString
 } STqExecCol;
@@ -181,6 +164,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
 // tqStream
 int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
 int32_t tqStreamTasksScanWal(STQ* pTq);
+int32_t tqStreamTasksStatusCheck(STQ* pTq);
 
 // tq util
 int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index cd97bd5753..f63bddb8b8 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -218,6 +218,7 @@ int     tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
 int     tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
 int     tqUnregisterPushHandle(STQ* pTq, void* pHandle);
 int     tqStartStreamTasks(STQ* pTq);  // restore all stream tasks after vnode launching completed.
+int     tqCheckforStreamStatus(STQ* pTq);
 int     tqCommit(STQ*);
 int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 1d29f245e2..dd36075eb1 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -18,8 +18,6 @@
 // 0: not init
 // 1: already inited
 // 2: wait to be inited or cleaup
-#define WAL_READ_TASKS_ID (-1)
-
 static int32_t tqInitialize(STQ* pTq);
 
 static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
@@ -819,9 +817,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
   pTask->dataRange.range.maxVer = ver;
   pTask->dataRange.range.minVer = ver;
 
-  // expand executor
-  pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
-
   if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
     pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
     if (pTask->pState == NULL) {
@@ -903,10 +898,11 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
 
   SDecoder decoder;
   tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
-  tDecodeSStreamTaskCheckReq(&decoder, &req);
+  tDecodeStreamTaskCheckReq(&decoder, &req);
   tDecoderClear(&decoder);
 
-  int32_t taskId = req.downstreamTaskId;
+  int32_t             taskId = req.downstreamTaskId;
+
   SStreamTaskCheckRsp rsp = {
       .reqId = req.reqId,
       .streamId = req.streamId,
@@ -924,7 +920,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
     streamMetaReleaseTask(pTq->pStreamMeta, pTask);
 
     tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
-            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
+            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
+            streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
   } else {
     rsp.status = 0;
     tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
@@ -935,7 +932,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
 
   int32_t code;
   int32_t len;
-  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
+  tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
   if (code < 0) {
     tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
     return -1;
@@ -946,7 +943,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
 
   void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
   tEncoderInit(&encoder, (uint8_t*)abuf, len);
-  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
+  tEncodeStreamTaskCheckRsp(&encoder, &rsp);
   tEncoderClear(&encoder);
 
   SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
@@ -961,7 +958,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
   SDecoder decoder;
 
   tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
-  code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
+  code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
   if (code < 0) {
     tDecoderClear(&decoder);
@@ -969,8 +966,8 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
   }
 
   tDecoderClear(&decoder);
-  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d (vgId:%d) check req from task:0x%x (vgId:%d), status %d",
-          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
+  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
+          rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
 
   SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
   if (pTask == NULL) {
@@ -1034,7 +1031,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
   // calculate the correct start time window, and start the handle the history data for the main task.
   if (pTask->historyTaskId.taskId != 0) {
     // launch the history fill stream task
-    streamTaskStartHistoryTask(pTask, sversion);
+    streamTaskStartHistoryTask(pTask);
 
     // launch current task
     SHistDataRange* pRange = &pTask->dataRange;
@@ -1056,11 +1053,12 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
             pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
   }
 
+  ASSERT(pTask->status.checkDownstream == 0);
   streamTaskCheckDownstreamTasks(pTask);
 
-  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, numOfTasks:%d", vgId, pTask->id.idStr,
-          pTask->status.taskStatus, numOfTasks);
+  tqDebug("vgId:%d s-task:%s is deployed and added into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
+          streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
   return 0;
 }
@@ -1120,8 +1118,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
 
     // wait for the stream task get ready for scan history data
-    while (pStreamTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM ||
-           pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
+    while (pStreamTask->status.checkDownstream == 0 || pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
       tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr,
               pStreamTask->info.taskLevel);
       taosMsleep(100);
@@ -1155,8 +1152,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     // 5. resume the related stream task.
     streamTryExec(pTask);
 
-    streamMetaReleaseTask(pMeta, pStreamTask);
+    pTask->status.taskStatus = TASK_STATUS__DROPPING;
+    tqDebug("s-task:%s set status to be dropping", pTask->id.idStr);
+
+    streamMetaSaveTask(pMeta, pTask);
     streamMetaReleaseTask(pMeta, pTask);
+//    streamMetaRemoveTask(pMeta, pTask->id.taskId);
+
+    streamMetaReleaseTask(pMeta, pStreamTask);
+    if (streamMetaCommit(pTask->pMeta) < 0) {
+
+    }
   } else {
     // todo update the chkInfo version for current task.
     // this task has an associated history stream task, so we need to scan wal from the end version of
@@ -1338,7 +1344,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
   int32_t taskId = pReq->taskId;
   int32_t vgId = TD_VID(pTq->pVnode);
 
-  if (taskId == WAL_READ_TASKS_ID) {  // all tasks are extracted submit data from the wal
+  if (taskId == STREAM_TASK_STATUS_CHECK_ID) {
+    tqStreamTasksStatusCheck(pTq);
+    return 0;
+  }
+
+  if (taskId == EXTRACT_DATA_FROM_WAL_ID) {  // all tasks extract submit data from the wal
     tqStreamTasksScanWal(pTq);
     return 0;
   }
@@ -1553,43 +1564,3 @@ FAIL:
 
 int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
 
-int32_t tqStartStreamTasks(STQ* pTq) {
-  int32_t      vgId = TD_VID(pTq->pVnode);
-  SStreamMeta* pMeta = pTq->pStreamMeta;
-
-  taosWLockLatch(&pMeta->lock);
-
-  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
-  if (numOfTasks == 0) {
-    tqInfo("vgId:%d no stream tasks exist", vgId);
-    taosWUnLockLatch(&pMeta->lock);
-    return 0;
-  }
-
-  pMeta->walScanCounter += 1;
-
-  if (pMeta->walScanCounter > 1) {
-    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
-    taosWUnLockLatch(&pMeta->lock);
-    return 0;
-  }
-
-  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
-  if (pRunReq == NULL) {
-    terrno = TSDB_CODE_OUT_OF_MEMORY;
-    tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
-    taosWUnLockLatch(&pMeta->lock);
-    return -1;
-  }
-
-  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
-  pRunReq->head.vgId = vgId;
-  pRunReq->streamId = 0;
-  pRunReq->taskId = WAL_READ_TASKS_ID;
-
-  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
-  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
-  taosWUnLockLatch(&pMeta->lock);
-
-  return 0;
-}
diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c
index 1a9a4ec612..6fbc4197ee 100644
--- a/source/dnode/vnode/src/tq/tqRestore.c
+++ b/source/dnode/vnode/src/tq/tqRestore.c
@@ -16,6 +16,7 @@
 #include "tq.h"
 
 static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
+static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId);
 
 // this function should be executed by stream threads.
 // extract submit block from WAL, and add them into the input queue for the sources tasks.
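
The two sentinel ids added in tq.h turn the existing TDMT_STREAM_TASK_RUN message into a small control channel: a negative SStreamTaskRunReq.taskId is a vnode-level command rather than a reference to a concrete task. A minimal sketch of the convention, assuming the dispatch stays in tqProcessTaskRunReq as shown above (the handleRunReq and runSingleTask names below are hypothetical, not part of this patch):

// Sketch only: negative taskIds are vnode-level commands; non-negative ids
// address one concrete stream task.
#define EXTRACT_DATA_FROM_WAL_ID    (-1)  // scan the WAL once for all source tasks
#define STREAM_TASK_STATUS_CHECK_ID (-2)  // re-check downstream readiness of all tasks

static int32_t handleRunReq(STQ* pTq, const SStreamTaskRunReq* pReq) {
  if (pReq->taskId == STREAM_TASK_STATUS_CHECK_ID) {
    return tqStreamTasksStatusCheck(pTq);  // walks every task in the meta store
  }
  if (pReq->taskId == EXTRACT_DATA_FROM_WAL_ID) {
    return tqStreamTasksScanWal(pTq);      // pulls submit blocks into input queues
  }
  return runSingleTask(pTq, pReq->taskId); // hypothetical: acquire and execute one task
}

Routing the commands through the same STREAM_QUEUE as ordinary task runs keeps them ordered with normal task execution; tqCheckforStreamStatus and tqStartStreamTasks below are the two producers of these commands.
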
@@ -57,7 +58,110 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
   return 0;
 }
 
-static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
+int32_t tqStreamTasksStatusCheck(STQ* pTq) {
+  int32_t      vgId = TD_VID(pTq->pVnode);
+  SStreamMeta* pMeta = pTq->pStreamMeta;
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+  tqDebug("vgId:%d start to check all (%d) stream tasks downstream status", vgId, numOfTasks);
+  if (numOfTasks == 0) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SArray* pTaskList = NULL;
+  taosWLockLatch(&pMeta->lock);
+  pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
+  taosWUnLockLatch(&pMeta->lock);
+
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    int32_t*     pTaskId = taosArrayGet(pTaskList, i);
+    SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId);
+    if (pTask == NULL) {
+      continue;
+    }
+
+    streamTaskCheckDownstreamTasks(pTask);
+    streamMetaReleaseTask(pMeta, pTask);
+  }
+
+  return 0;
+}
+
+int32_t tqCheckforStreamStatus(STQ* pTq) {
+  int32_t      vgId = TD_VID(pTq->pVnode);
+  SStreamMeta* pMeta = pTq->pStreamMeta;
+
+  taosWLockLatch(&pMeta->lock);
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+  if (numOfTasks == 0) {
+    tqInfo("vgId:%d no stream tasks exist", vgId);
+    taosWUnLockLatch(&pMeta->lock);
+    return 0;
+  }
+
+  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
+  if (pRunReq == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    tqError("vgId:%d failed to create msg to check status for stream tasks, code:%s", vgId, terrstr());
+    taosWUnLockLatch(&pMeta->lock);
+    return -1;
+  }
+
+  tqDebug("vgId:%d check for stream tasks status, numOfTasks:%d", vgId, numOfTasks);
+  pRunReq->head.vgId = vgId;
+  pRunReq->streamId = 0;
+  pRunReq->taskId = STREAM_TASK_STATUS_CHECK_ID;
+
+  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
+  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
+  taosWUnLockLatch(&pMeta->lock);
+
+  return 0;
+}
+
+int32_t tqStartStreamTasks(STQ* pTq) {
+  int32_t      vgId = TD_VID(pTq->pVnode);
+  SStreamMeta* pMeta = pTq->pStreamMeta;
+
+  taosWLockLatch(&pMeta->lock);
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+  if (numOfTasks == 0) {
+    tqInfo("vgId:%d no stream tasks exist", vgId);
+    taosWUnLockLatch(&pMeta->lock);
+    return 0;
+  }
+
+  pMeta->walScanCounter += 1;
+
+  if (pMeta->walScanCounter > 1) {
+    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
+    taosWUnLockLatch(&pMeta->lock);
+    return 0;
+  }
+
+  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
+  if (pRunReq == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
+    taosWUnLockLatch(&pMeta->lock);
+    return -1;
+  }
+
+  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
+  pRunReq->head.vgId = vgId;
+  pRunReq->streamId = 0;
+  pRunReq->taskId = EXTRACT_DATA_FROM_WAL_ID;
+
+  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
+  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
+  taosWUnLockLatch(&pMeta->lock);
+
+  return 0;
+}
+
+int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
   // seek the stored version and extract data from WAL
   int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
   if (pTask->chkInfo.currentVer < firstVer) {
@@ -192,3 +296,4 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
   taosArrayDestroy(pTaskList);
   return 0;
 }
+
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index 29f1ddc50f..045a7298c1 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -554,7 +554,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
     vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId);
   } else {
     vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
-    tqStartStreamTasks(pVnode->pTq);
+    tqCheckforStreamStatus(pVnode->pTq);
   }
 }
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index ae3f094d12..95f68cce2d 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -241,7 +241,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
   SRpcMsg msg = {0};
 
   int32_t tlen;
-  tEncodeSize(tEncodeSStreamTaskCheckReq, pReq, tlen, code);
+  tEncodeSize(tEncodeStreamTaskCheckReq, pReq, tlen, code);
   if (code < 0) {
     return -1;
   }
@@ -256,7 +256,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
 
   SEncoder encoder;
   tEncoderInit(&encoder, abuf, tlen);
-  if ((code = tEncodeSStreamTaskCheckReq(&encoder, pReq)) < 0) {
+  if ((code = tEncodeStreamTaskCheckReq(&encoder, pReq)) < 0) {
     rpcFreeCont(buf);
     return code;
   }
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 2cbe72e0be..aee3070d7f 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -389,8 +389,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     }
 
     if (pInput == NULL) {
-      qDebug("789 %s", pTask->id.idStr);
-
       if (pTask->info.fillHistory && pTask->status.transferState) {
         // todo transfer task state here
 
@@ -414,14 +412,18 @@ int32_t streamExecForAll(SStreamTask* pTask) {
 
           // OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
 
-          qDebug("s-task:%s no need to update time window", pStreamTask->id.idStr);
+          qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);
         }
 
         pTimeWindow->skey = INT64_MIN;
         streamSetStatusNormal(pStreamTask);
 
-        streamSchedExec(pStreamTask);
+        streamMetaSaveTask(pTask->pMeta, pStreamTask);
+        if (streamMetaCommit(pTask->pMeta)) {
+          // persist to disk
+        }
 
+        streamSchedExec(pStreamTask);
         streamMetaReleaseTask(pTask->pMeta, pStreamTask);
       }
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index aa3f1dbe07..b7bc35d13d 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -377,6 +377,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
       taosMemoryFree(pTask);
       continue;
     }
+
     if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) {
       tdbFree(pKey);
       tdbFree(pVal);
@@ -385,12 +386,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
       return -1;
     }
 
-    // todo handle the fill history task
-    ASSERT(0);
-    if (pTask->info.fillHistory) {
-      ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM);
-      streamTaskCheckDownstreamTasks(pTask);
-    }
+    ASSERT(pTask->status.checkDownstream == 0);
   }
 
   tdbFree(pKey);
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index 5e278cd7f6..82b9941c96 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -28,32 +28,45 @@ const char* streamGetTaskStatusStr(int32_t status) {
   }
 }
 
-int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
+static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
+  SVersionRange* pRange = &pTask->dataRange.range;
+
+  qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64,
+         pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus),
+         pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer);
+
+  streamSetParamForRecover(pTask);
+  streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window);
+
+  SStreamRecoverStep1Req req;
+  streamBuildSourceRecover1Req(pTask, &req);
+  int32_t len = sizeof(SStreamRecoverStep1Req);
+
+  void* serializedReq = rpcMallocCont(len);
+  if (serializedReq == NULL) {
+    return -1;
+  }
+
+  memcpy(serializedReq, &req, len);
+
+  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
+  if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
+    /*ASSERT(0);*/
+  }
+
+  return 0;
+}
+
+int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
   if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
-    SVersionRange* pRange = &pTask->dataRange.range;
-    qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64,
-           pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus),
-           pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer);
-
-    streamSetParamForRecover(pTask);
-    streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window);
-
-    SStreamRecoverStep1Req req;
-    streamBuildSourceRecover1Req(pTask, &req);
-    int32_t len = sizeof(SStreamRecoverStep1Req);
-
-    void* serializedReq = rpcMallocCont(len);
-    if (serializedReq == NULL) {
-      return -1;
+    if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
+      return doLaunchScanHistoryTask(pTask);
+    } else {
+      ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
+      qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
+             streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
+             walReaderGetCurrentVer(pTask->exec.pWalReader));
     }
-
-    memcpy(serializedReq, &req, len);
-
-    SRpcMsg rpcMsg = { .contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY };
-    if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
-      /*ASSERT(0);*/
-    }
-
   } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
     streamSetStatusNormal(pTask);
     streamSetParamForRecover(pTask);
@@ -68,9 +81,9 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
 
 // check status
 int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
-  qDebug("s-task:%s in fill history stage, ver:%" PRId64 "-%"PRId64" window:%" PRId64"-%"PRId64, pTask->id.idStr,
-         pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer, pTask->dataRange.window.skey,
-         pTask->dataRange.window.ekey);
+  SHistDataRange* pRange = &pTask->dataRange;
+  qDebug("s-task:%s check downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
+         pTask->id.idStr, pRange->range.minVer, pRange->range.maxVer, pRange->window.skey, pRange->window.ekey);
 
   SStreamTaskCheckReq req = {
       .streamId = pTask->id.streamId,
@@ -108,8 +121,10 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
       streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
     }
   } else {
-    qDebug("s-task:%s (vgId:%d) direct launch recover since no downstream", pTask->id.idStr, pTask->info.nodeId);
-    streamTaskLaunchRecover(pTask);
+    pTask->status.checkDownstream = 1;
+    qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s",
+           pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));
+    streamTaskLaunchScanHistory(pTask);
   }
 
   return 0;
@@ -178,8 +193,15 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
       taosArrayDestroy(pTask->checkReqIds);
       pTask->checkReqIds = NULL;
 
-      qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage", id, numOfReqs);
-      streamTaskLaunchRecover(pTask);
+      if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
+        qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
+               numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus));
+        streamTaskLaunchScanHistory(pTask);
+      } else {
+        ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
+        qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
+               streamGetTaskStatusStr(pTask->status.taskStatus));
+      }
     } else {
       qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, remain not ready:%d", id,
              pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, left);
@@ -189,13 +211,20 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
       return -1;
     }
 
+    ASSERT(pTask->status.checkDownstream == 0);
+
+    pTask->status.checkDownstream = 1;
     ASSERT(pTask->status.taskStatus != TASK_STATUS__HALT);
-    pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
-    qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
-           streamGetTaskStatusStr(pTask->status.taskStatus));
-
-    streamTaskLaunchRecover(pTask);
+    if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
+      qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
+             streamGetTaskStatusStr(pTask->status.taskStatus));
+      streamTaskLaunchScanHistory(pTask);
+    } else {
+      ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
+      qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
+             streamGetTaskStatusStr(pTask->status.taskStatus));
+    }
   } else {
     ASSERT(0);
   }
@@ -401,7 +430,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
 
   if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
     qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
-               " ver range:%" PRId64 " - %" PRId64,
+           " ver range:%" PRId64 " - %" PRId64,
            pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
            pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
   } else {
@@ -430,7 +459,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
 
 // todo fix the bug: 2. race condition
 // an fill history task needs to be started.
-int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver) {
+int32_t streamTaskStartHistoryTask(SStreamTask* pTask) {
   SStreamMeta* pMeta = pTask->pMeta;
   if (pTask->historyTaskId.taskId == 0) {
     return TSDB_CODE_SUCCESS;
@@ -479,11 +508,13 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
   /*code = */streamSetStatusNormal(pTask);
   atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
+
+  // todo check rsp
   streamMetaSaveTask(pMeta, pTask);
 
   return 0;
 }
 
-int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
+int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
   if (tStartEncode(pEncoder) < 0) return -1;
   if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
   if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@@ -496,7 +527,7 @@ int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq
   return pEncoder->pos;
 }
 
-int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
+int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
   if (tStartDecode(pDecoder) < 0) return -1;
   if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
   if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
@@ -509,7 +540,7 @@ int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq
   return 0;
 }
 
-int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
+int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
   if (tStartEncode(pEncoder) < 0) return -1;
   if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
   if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
@@ -523,7 +554,7 @@ int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp
   return pEncoder->pos;
 }
 
-int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
+int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
   if (tStartDecode(pDecoder) < 0) return -1;
   if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
   if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index 0ab096aea3..06da72188c 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -42,6 +42,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
   pTask->id.idStr = taosStrdup(buf);
 
   pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
+  pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
   pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
   pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
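
Taken together: the in-memory-only TASK_STATUS__WAIT_DOWNSTREAM state is gone. A task is now created (and reloaded from tdb by streamLoadTasks) in TASK_STATUS__SCAN_HISTORY with status.checkDownstream == 0, and the downstream check is re-driven after every restart from vnodeRestoreFinish via tqCheckforStreamStatus, which is what makes the restore-from-disk in the commit title work. A condensed sketch of that restart path, assuming the flow shown in the hunks above (the onXxx wrapper names are hypothetical; the calls inside them are the patch's functions, with error handling omitted):

// 1. raft restore finished: queue a status-check command, not a WAL scan.
void onVnodeRestoreFinish(SVnode* pVnode) {
  tqCheckforStreamStatus(pVnode->pTq);    // enqueues taskId = STREAM_TASK_STATUS_CHECK_ID
}

// 2. stream thread: every task reloaded from disk still has checkDownstream == 0,
//    so the downstream check is simply re-run for each of them.
void onStatusCheck(STQ* pTq, SStreamTask* pTask) {
  streamTaskCheckDownstreamTasks(pTask);  // sends check reqs, or marks checked directly
}

// 3. once downstream readiness is confirmed, the flag flips and the task either
//    fills history first or starts consuming new WAL data right away.
void onCheckConfirmed(SStreamTask* pTask) {
  pTask->status.checkDownstream = 1;
  if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
    streamTaskLaunchScanHistory(pTask);   // scan-history stage
  } else {
    // TASK_STATUS__NORMAL: ready for data from the WAL
  }
}

Persisting checkDownstream via streamMetaSaveTask/streamMetaCommit is what lets a restarted vnode distinguish a task that never confirmed its downstream from one that already finished the handshake.
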