From 39079ab64a35d62b8741fa16a6da1fe766e7c020 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 15 Aug 2023 15:52:06 +0800 Subject: [PATCH] fix(stream): load term from sync model to decide if the node is restart or not --- include/libs/stream/tstream.h | 4 +- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/vnode/src/inc/tq.h | 2 + source/dnode/vnode/src/inc/vnodeInt.h | 1 - source/dnode/vnode/src/tq/tq.c | 11 ++--- source/dnode/vnode/src/tq/tqRestore.c | 6 --- source/dnode/vnode/src/tq/tqUtil.c | 5 ++ source/dnode/vnode/src/vnd/vnodeOpen.c | 11 +++-- source/dnode/vnode/src/vnd/vnodeSync.c | 4 ++ source/libs/stream/src/streamDispatch.c | 65 +++++++++++++------------ source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamRecover.c | 2 +- 12 files changed, 60 insertions(+), 55 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 06222d4cb3..a9feeeeb95 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -417,10 +417,10 @@ typedef struct { } SStreamTaskRunReq; typedef struct { - int64_t streamId; - int32_t taskId; int32_t type; int64_t stage; //nodeId from upstream task + int64_t streamId; + int32_t taskId; int32_t srcVgId; int32_t upstreamTaskId; int32_t upstreamChildId; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 772db6c6b0..cb60725110 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -866,7 +866,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)}; - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); +// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 94993f99af..75421c2268 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -172,6 +172,8 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq); +void tqUpdateNodeStage(STQ* pTq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index c0c01c100d..d84cc26d9f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -225,7 +225,6 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckStreamStatus(STQ* pTq); -int64_t tqGetNodeStage(STQ* pTq); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9be90eb801..b75455de6c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -128,8 +128,7 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - int64_t stage = tqGetNodeStage(pTq); - pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, stage); + pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, -1); if (pTq->pStreamMeta == NULL) { return -1; } @@ -1531,7 +1530,8 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } else { tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId); - return TSDB_CODE_INVALID_MSG; + terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; + return terrno; } } @@ -1930,8 +1930,3 @@ int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) { tmsgSendRsp(&rsp); return 0; } - -int64_t tqGetNodeStage(STQ* pTq) { - SSyncState state = syncGetState(pTq->pVnode->sync); - return state.term; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 1c55607653..26fb5f4faf 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -26,12 +26,6 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int64_t st = taosGetTimestampMs(); - // update the cached nodeId when reading data from WAL files. - int64_t nodeStage = tqGetNodeStage(pTq); - if (pMeta->stage != nodeStage) { - pMeta->stage = nodeStage; - } - while (1) { int32_t scan = pMeta->walScanCounter; tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index f333326fce..2211a02193 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -35,6 +35,11 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) { return 0; } +void tqUpdateNodeStage(STQ* pTq) { + SSyncState state = syncGetState(pTq->pVnode->sync); + pTq->pStreamMeta->stage = state.term; +} + static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { pRsp->reqOffset = pReq->reqOffset; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 65fc552365..5140f172d9 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -412,11 +412,6 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC // open tq sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR); taosRealPath(tdir, NULL, sizeof(tdir)); - pVnode->pTq = tqOpen(tdir, pVnode); - if (pVnode->pTq == NULL) { - vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); - goto _err; - } // open sma if (smaOpen(pVnode, rollback)) { @@ -444,6 +439,12 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC goto _err; } + pVnode->pTq = tqOpen(tdir, pVnode); + if (pVnode->pTq == NULL) { + vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + if (rollback) { vnodeRollback(pVnode); } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index b95c604f54..3fe74989fb 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -14,6 +14,7 @@ */ #define _DEFAULT_SOURCE +#include "tq.h" #include "vnd.h" #define BATCH_ENABLE 0 @@ -588,6 +589,9 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) { static void vnodeBecomeLeader(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; + if (pVnode->pTq) { + tqUpdateNodeStage(pVnode->pTq); + } vDebug("vgId:%d, become leader", pVnode->config.vgId); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9955e02ff4..ae82c3dc45 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -47,14 +47,38 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->contLen = contLen; } +int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; + ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); + ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); + for (int32_t i = 0; i < pReq->blockNum; i++) { + int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); + void* data = taosArrayGetP(pReq->data, i); + if (tEncodeI32(pEncoder, len) < 0) return -1; + if (tEncodeBinary(pEncoder, data, len) < 0) return -1; + } + tEndEncode(pEncoder); + return pEncoder->pos; +} + int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; @@ -581,30 +605,6 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; - ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); - ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); - for (int32_t i = 0; i < pReq->blockNum; i++) { - int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); - void* data = taosArrayGetP(pReq->data, i); - if (tEncodeI32(pEncoder, len) < 0) return -1; - if (tEncodeBinary(pEncoder, data, len) < 0) return -1; - } - tEndEncode(pEncoder); - return pEncoder->pos; -} - int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); @@ -827,10 +827,15 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure // happened too fast. // todo handle the shuffle dispatch failure - qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, - pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); - int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); - if (ret != TSDB_CODE_SUCCESS) { + if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // no retry + qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, no retry", pTask->id.idStr, + pRsp->downstreamTaskId, tstrerror(code)); + } else { + qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, + pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); + int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); + if (ret != TSDB_CODE_SUCCESS) { + } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ce5cdc704c..c7fc7f72ea 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -124,7 +124,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); - qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64, vgId, chkpId); + qInfo("vgId:%d open stream meta successfully, latest checkpoint:%"PRId64", stage:%" PRId64, vgId, chkpId, stage); return pMeta; _err: diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f70da8264b..49ded65af2 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -212,7 +212,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ pTask->id.idStr, upstreamTaskId, vgId, stage, pInfo->stage); } - return ((pTask->status.downstreamReady == 1) && (pInfo->stage == upstreamTaskId))? 1:0; + return ((pTask->status.downstreamReady == 1) && (pInfo->stage == stage))? 1:0; } static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {