diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index efd7ec41dd..06222d4cb3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -119,13 +119,12 @@ typedef struct { } SStreamMergedSubmit; typedef struct { - int8_t type; - + int8_t type; + int64_t nodeId; // nodeId, from SStreamMeta int32_t srcVgId; int32_t srcTaskId; int64_t sourceVer; int64_t reqId; - SArray* blocks; // SArray } SStreamDataBlock; @@ -250,6 +249,7 @@ typedef struct SStreamChildEpInfo { int32_t taskId; SEpSet epSet; bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it + int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer } SStreamChildEpInfo; typedef struct SStreamId { @@ -272,7 +272,6 @@ typedef struct SStreamStatus { bool transferState; int8_t timerActive; // timer is active int8_t pauseAllowed; // allowed task status to be set to be paused - int32_t stage; // rollback will increase this attribute one for each time } SStreamStatus; typedef struct SHistDataRange { @@ -379,6 +378,7 @@ typedef struct SStreamMeta { TXN* txn; FTaskExpand* expandFunc; int32_t vgId; + int64_t stage; SRWLatch lock; int32_t walScanCounter; void* streamBackend; @@ -420,6 +420,7 @@ typedef struct { int64_t streamId; int32_t taskId; int32_t type; + int64_t stage; //nodeId from upstream task int32_t srcVgId; int32_t upstreamTaskId; int32_t upstreamChildId; @@ -459,13 +460,13 @@ typedef struct { typedef struct { int64_t reqId; + int64_t stage; int64_t streamId; int32_t upstreamNodeId; int32_t upstreamTaskId; int32_t downstreamNodeId; int32_t downstreamTaskId; int32_t childId; - int32_t stage; } SStreamTaskCheckReq; typedef struct { @@ -476,7 +477,7 @@ typedef struct { int32_t downstreamNodeId; int32_t downstreamTaskId; int32_t childId; - int32_t stage; + int32_t oldStage; int8_t status; } SStreamTaskCheckRsp; @@ -626,17 +627,15 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); int32_t streamTaskEndScanWAL(SStreamTask* pTask); -SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); -int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); +int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); // recover and fill history void streamTaskCheckDownstreamTasks(SStreamTask* pTask); -int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); -int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage); +int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); int32_t streamTaskStop(SStreamTask* pTask); @@ -681,7 +680,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); // stream task meta void streamMetaInit(); void streamMetaCleanup(); -SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage); void streamMetaClose(SStreamMeta* streamMeta); // save to stream meta store diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 5fc266adf9..ab4834352e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -95,7 +95,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { } pSnode->msgCb = pOption->msgCb; - pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE); + pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, -1); if (pSnode->pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; @@ -325,7 +325,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId); if (pTask != NULL) { - rsp.status = streamTaskCheckStatus(pTask, req.stage); + rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); streamMetaReleaseTask(pSnode->pMeta, pTask); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d84cc26d9f..c0c01c100d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -225,6 +225,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckStreamStatus(STQ* pTq); +int64_t tqGetNodeStage(STQ* pTq); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c33492bfa1..9be90eb801 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -128,7 +128,8 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId); + int64_t stage = tqGetNodeStage(pTq); + pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, stage); if (pTq->pStreamMeta == NULL) { return -1; } @@ -1064,16 +1065,14 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); if (pTask != NULL) { - rsp.status = streamTaskCheckStatus(pTask, req.stage); - rsp.stage = pTask->status.stage; + rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); streamMetaReleaseTask(pTq->pStreamMeta, pTask); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", - pTask->id.idStr, pStatus, rsp.stage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; - rsp.stage = 0; tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); @@ -1928,8 +1927,11 @@ int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) { } streamMetaReleaseTask(pMeta, pTask); - -// tDecoderClear(&decoder); tmsgSendRsp(&rsp); return 0; +} + +int64_t tqGetNodeStage(STQ* pTq) { + SSyncState state = syncGetState(pTq->pVnode->sync); + return state.term; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index da88ae34b8..1c55607653 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -26,6 +26,12 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int64_t st = taosGetTimestampMs(); + // update the cached nodeId when reading data from WAL files. + int64_t nodeStage = tqGetNodeStage(pTq); + if (pMeta->stage != nodeStage) { + pMeta->stage = nodeStage; + } + while (1) { int32_t scan = pMeta->walScanCounter; tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan); @@ -80,13 +86,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { continue; } - if (pTask->info.fillHistory == 1) { - tqDebug("s-task:%s fill-history task, wait for related stream task:0x%x to launch it", pTask->id.idStr, - pTask->streamTaskId.taskId); - continue; - } - - streamTaskDoCheckDownstreamTasks(pTask); + streamTaskCheckDownstreamTasks(pTask); streamMetaReleaseTask(pMeta, pTask); } @@ -285,6 +285,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } + // downstream task has blocked the output, stopped for a while if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 8baf4d1ece..65221a1e33 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -241,23 +241,31 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); - int32_t status = 0; SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); ASSERT(pInfo != NULL); - if (!pInfo->dataAllowed) { - qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, pReq->upstreamTaskId); + // upstream task has restarted/leader-follower switch/transferred to other dnodes + if (pReq->stage > pInfo->stage) { + qError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64 + ", current:%" PRId64 " dispatch msg rejected", + pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage); status = TASK_INPUT_STATUS__BLOCKED; } else { - // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked - if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); - qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId); - } + if (!pInfo->dataAllowed) { + qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, + pReq->upstreamTaskId); + status = TASK_INPUT_STATUS__BLOCKED; + } else { + // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked + if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId); + } - status = streamTaskAppendInputBlocks(pTask, pReq); + status = streamTaskAppendInputBlocks(pTask, pReq); + } } { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index fc88bbc07b..9955e02ff4 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -49,6 +49,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; @@ -81,6 +82,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas int32_t numOfBlocks, int64_t dstTaskId, int32_t type) { pReq->streamId = pTask->id.streamId; pReq->srcVgId = vgId; + pReq->stage = pTask->pMeta->stage; pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamNodeId = pTask->info.nodeId; @@ -457,27 +459,27 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); + const char* id = pTask->id.idStr; int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->queue); if (numOfElems > 0) { - qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", pTask->id.idStr, - numOfElems); + qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", id, numOfElems); } // to make sure only one dispatch is running int8_t old = atomic_val_compare_exchange_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); if (old != TASK_OUTPUT_STATUS__NORMAL) { - qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old); + qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", id, old); return 0; } ASSERT(pTask->msgInfo.pData == NULL); - qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputInfo.status); + qDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputInfo.status); SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputInfo.queue); if (pBlock == NULL) { atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); - qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pTask->outputInfo.status); + qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputInfo.status); return 0; } @@ -492,7 +494,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { break; } - qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr, + qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", id, tstrerror(terrno), pTask->outputInfo.status, retryCount); // todo deal with only partially success dispatch case @@ -581,11 +583,12 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a6c15d997a..ce5cdc704c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "executor.h" #include "streamBackendRocksdb.h" #include "streamInt.h" @@ -38,7 +39,7 @@ void streamMetaCleanup() { taosCloseRef(streamBackendCfWrapperId); } -SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { @@ -92,6 +93,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; + pMeta->stage = stage; // send heartbeat every 20sec. pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 484e5157db..ef8a96b3df 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -13,10 +13,11 @@ * along with this program. If not, see . */ +#include #include "streamInt.h" +#include "trpc.h" #include "ttimer.h" #include "wal.h" -#include "trpc.h" typedef struct SStreamTaskRetryInfo { SStreamMeta* pMeta; @@ -116,7 +117,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { .upstreamTaskId = pTask->id.taskId, .upstreamNodeId = pTask->info.nodeId, .childId = pTask->info.selfChildId, - .stage = pTask->status.stage, + .stage = pTask->pMeta->stage, }; // serialize @@ -174,7 +175,7 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p .downstreamTaskId = pRsp->downstreamTaskId, .downstreamNodeId = pRsp->downstreamNodeId, .childId = pRsp->childId, - .stage = pTask->status.stage, + .stage = pTask->pMeta->stage, }; qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId, @@ -197,8 +198,21 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p return 0; } -int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage) { - return ((pTask->status.downstreamReady == 1) && (pTask->status.stage == stage))? 1:0; +int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage) { + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); + ASSERT(pInfo != NULL && pInfo->stage <= stage); + + if (pInfo->stage == -1) { + pInfo->stage = stage; + qDebug("s-task:%s receive msg from upstream task:0x%x, init stage value:%"PRId64, pTask->id.idStr, upstreamTaskId, stage); + } + + if (pInfo->stage < stage) { + qError("s-task:%s receive msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64, + pTask->id.idStr, vgId, stage, pInfo->stage); + } + + return ((pTask->status.downstreamReady == 1) && (pInfo->stage == upstreamTaskId))? 1:0; } static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { @@ -266,7 +280,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } } else { // not ready, wait for 100ms and retry qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, wait for 100ms and retry", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->stage); + pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage); taosMsleep(100); streamRecheckDownstream(pTask, pRsp); } @@ -664,7 +678,7 @@ int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->stage) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } @@ -678,7 +692,7 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->stage) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; tEndDecode(pDecoder); return 0; } @@ -692,7 +706,7 @@ int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->stage) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->oldStage) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -707,7 +721,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->stage) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->oldStage) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; tEndDecode(pDecoder); return 0; @@ -785,15 +799,13 @@ void launchFillHistoryTask(SStreamTask* pTask) { streamLaunchFillHistoryTask(pTask); } +// only the downstream tasks are ready, set the task to be ready to work. void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { if (pTask->info.fillHistory) { qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); return; } - ASSERT(pTask->status.downstreamReady == 0); - - // check downstream tasks for itself streamTaskDoCheckDownstreamTasks(pTask); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index bd1ffcfbb3..b6291c4c02 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -13,9 +13,11 @@ * along with this program. If not, see . */ -#include "tmisce.h" +#include "sync.h" +#include "tstream.h" #include "executor.h" #include "streamInt.h" +#include "tmisce.h" #include "tstream.h" #include "ttimer.h" #include "wal.h" @@ -60,6 +62,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1; /*if (tEncodeI64(pEncoder, pInfo->processedVer) < 0) return -1;*/ if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1; + if (tEncodeI64(pEncoder, pInfo->stage) < 0) return -1; return 0; } @@ -69,6 +72,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1; /*if (tDecodeI64(pDecoder, &pInfo->processedVer) < 0) return -1;*/ if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1; + if (tDecodeI64(pDecoder, &pInfo->stage) < 0) return -1; return 0; } @@ -382,6 +386,7 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { pEpInfo->epSet = pTask->info.epSet; pEpInfo->nodeId = pTask->info.nodeId; pEpInfo->taskId = pTask->id.taskId; + pEpInfo->stage = -1; return pEpInfo; } @@ -467,9 +472,10 @@ int32_t streamTaskStop(SStreamTask* pTask) { int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) { const char* id = pTask->id.idStr; - int32_t vgId = pTask->pMeta->vgId; + int64_t stage = pTask->pMeta->stage; + int32_t vgId = pTask->pMeta->vgId; - qDebug("s-task:%s vgId:%d restart current task, stage:%d, status:%s, sched-status:%d", id, vgId, pTask->status.stage, + qDebug("s-task:%s vgId:%d restart current task, stage:%"PRId64", status:%s, sched-status:%d", id, vgId, stage, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); // 1. stop task @@ -482,8 +488,6 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) taosArrayClear(pTask->pRspMsgList); pTask->status.downstreamReady = 0; - pTask->status.stage += 1; - streamSetStatusNormal(pTask); taosWLockLatch(&pTask->pMeta->lock); @@ -491,8 +495,9 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) streamMetaCommit(pTask->pMeta); taosWUnLockLatch(&pTask->pMeta->lock); - qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id, - pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus)); + ASSERT(0); +// qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id, +// pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus)); // 3. start to check the downstream status if (startTask) {