diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e772a47e3d..dc20e6bd76 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2965,6 +2965,20 @@ typedef struct { int8_t reserved; } SVPauseStreamTaskRsp; +typedef struct { + SMsgHead head; + int32_t taskId; + int32_t nodeId; + SEpSet epset; +} SVStreamTaskUpdateReq; + +typedef struct { + int8_t reserved; +} SVStreamTaskUpdateRsp; + +int32_t tSerializeVTaskUpdateReq(void* buf, int32_t bufLen, const SVStreamTaskUpdateReq* pReq); +int32_t tDeserializeVTaskUpdateReq(void* buf, int32_t bufLen, SVStreamTaskUpdateReq* pReq); + typedef struct { char name[TSDB_STREAM_FNAME_LEN]; int8_t igNotExists; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 73a0bed48f..0aa7e1f955 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7919,6 +7919,24 @@ int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamR return 0; } +int32_t tSerializeVTaskUpdateReq(void *buf, int32_t bufLen, const SVStreamTaskUpdateReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; +// if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; +// if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; +// if (tEncodeI8(&encoder, pReq->igUntreated) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeVTaskUpdateReq(void* buf, int32_t bufLen, SVStreamTaskUpdateReq* pReq) { + return 0; +} + int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) { int32_t tlen = 0; tlen += taosEncodeString(buf, pTopicEp->topic); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8f10c00c9c..ed64c0ec36 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -66,6 +66,9 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq); 
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, int64_t streamId, int32_t taskId); +static int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans); +static void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset); + int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_STREAM, @@ -471,11 +474,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { STransAction action = {0}; action.mTraceId = pTrans->mTraceId; - memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); - action.pCont = buf; - action.contLen = tlen; - action.msgType = TDMT_STREAM_TASK_DEPLOY; - + initTransAction(&action, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); return -1; @@ -653,8 +652,6 @@ _OVER: } static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { - // vnode - /*if (pTask->info.nodeId > 0) {*/ SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -663,16 +660,13 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; + STransAction action = {0}; - memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); - action.pCont = pReq; - action.contLen = sizeof(SVDropStreamTaskReq); - action.msgType = TDMT_STREAM_TASK_DROP; + initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &pTask->info.epSet); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; } - /*}*/ return 0; } @@ -1043,11 +1037,8 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream } STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = buf; - 
action.contLen = tlen; - action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; - + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset); mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -1483,16 +1474,17 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); if (pReq == NULL) { + mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; + STransAction action = {0}; - memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); - action.pCont = pReq; - action.contLen = sizeof(SVPauseStreamTaskReq); - action.msgType = TDMT_STREAM_TASK_PAUSE; + initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &pTask->info.epSet); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1630,11 +1622,9 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; pReq->igUntreated = igUntreated; + STransAction action = {0}; - memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); - action.pCont = pReq; - action.contLen = sizeof(SVResumeStreamTaskReq); - action.msgType = TDMT_STREAM_TASK_RESUME; + initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &pTask->info.epSet); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1776,8 +1766,39 @@ static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t no return TSDB_CODE_SUCCESS; } +int32_t 
mndPersistTransLog(SStreamObj* pStream, STrans* pTrans) { + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); + if (pCommitRaw == NULL) { + mError("failed to encode stream since %s", terrstr()); + return -1; + } + + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + sdbFreeRaw(pCommitRaw); + mndTransDrop(pTrans); + return -1; + } + + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { + mError("stream trans:%d failed to set raw status since %s", pTrans->id, terrstr()); + sdbFreeRaw(pCommitRaw); + mndTransDrop(pTrans); + return -1; + } + + return 0; +} + +void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset) { + pAction->epSet = *pEpset; + pAction->contLen = contLen; + pAction->pCont = pCont; + pAction->msgType = msgType; +} + // build trans to update the epset -static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int32_t nodeId, SEpSet* pEpset) { +static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, int32_t nodeId, SEpSet *pEpset) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update"); if (pTrans == NULL) { mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); @@ -1787,7 +1808,7 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3 mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid); ASSERT(0); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { mError("failed to build stream:0x%" PRIx64 " task DAG update, code:%s", pStream->uid, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); @@ -1805,15 +1826,12 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3 
for (int32_t k = 0; k < numOfTasks; ++k) { SStreamTask *pTask = taosArrayGetP(pLevel, k); - void* pBuf = NULL; + void *pBuf = NULL; int32_t len = 0; doBuildStreamTaskUpdateMsg(&pBuf, &len, nodeId, pEpset); STransAction action = {0}; - action.epSet = pTask->info.epSet; - action.pCont = pBuf; - action.contLen = len; - action.msgType = TDMT_VND_STREAM_TASK_UPDATE; + initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pBuf); @@ -1824,28 +1842,12 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3 } taosWUnLockLatch(&pStream->lock); - - SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); - if (pCommitRaw == NULL) { - mError("failed to prepare trans rebalance since %s", terrstr()); - return -1; - } - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - sdbFreeRaw(pCommitRaw); - mError("failed to prepare trans rebalance since %s", terrstr()); - return -1; - } - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { - sdbFreeRaw(pCommitRaw); - mError("failed to prepare trans rebalance since %s", terrstr()); - return -1; - } - - return TSDB_CODE_SUCCESS; + return mndPersistTransLog(pStream, pTrans); } static int32_t updateTaskEpInfo(SStreamObj* pStream, int32_t nodeId, SEpSet* pEpSet) { - int32_t numOfLevels = 0; + int32_t numOfLevels = taosArrayGetSize(pStream->tasks); + for (int32_t j = 0; j < numOfLevels; ++j) { SArray *pLevel = taosArrayGetP(pStream->tasks, j); @@ -1870,10 +1872,9 @@ static int32_t updateTaskEpInfo(SStreamObj* pStream, int32_t nodeId, SEpSet* pEp } } return 0; - } -// todo: handle the database drop/stream drop case +// todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1891,6 +1892,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } int64_t now = taosGetTimestampSec(); + 
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); // timeout list bool nodeChanged = false; @@ -1899,8 +1901,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // record the timeout node for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); - if (now - pEntry->hbTimestamp > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next -// taosArrayPush(pList, &pEntry); + int64_t duration = now - pEntry->hbTimestamp; + if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next + taosArrayPush(pList, &pEntry); + mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration); + continue; } if (pEntry->nodeId != req.vgId) { @@ -1910,14 +1915,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { pEntry->hbTimestamp = now; // check epset to identify whether the node has been transferred to other dnodes. - // 1. node the epset is changed, which means the node transfer has occurred for this node. + // node the epset is changed, which means the node transfer has occurred for this node. if (!isEpsetEqual(&pEntry->epset, &req.epset)) { nodeChanged = true; break; } } - // todo handle the node timeout case. + // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode, + // to identify whether the dnode is truly offline or not. 
// handle the node changed case if (!nodeChanged) { @@ -1927,7 +1933,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t nodeId = req.vgId; SEpSet newEpSet = req.epset; - { // check all streams that involved this vnode should update the epset info + {// check all streams that involved this vnode should update the epset info SStreamObj *pStream = NULL; void *pIter = NULL; while (1) { @@ -1936,27 +1942,17 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { break; } - // update the related upstream and downstream tasks + // update the related upstream and downstream tasks, todo remove this, no need this function taosWLockLatch(&pStream->lock); updateTaskEpInfo(pStream, req.vgId, &req.epset); - // write down + taosWUnLockLatch(&pStream->lock); code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet); if (code != TSDB_CODE_SUCCESS) { - + // todo } - taosWUnLockLatch(&pStream->lock); } - } -// if (code == 0) { -// if (mndTransPrepare(pMnode, pTrans) != 0) { -// mError("failed to prepre trans rebalance since %s", terrstr()); -// } -// } -// mndTransDrop(pTrans); - - mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 609c2f9135..3e29c85e44 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -811,7 +811,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SMqRebVgReq req = {0}; SDecoder dc = {0}; - tDecoderInit(&dc, msg, msgLen); + tDecoderInit(&dc, (uint8_t*)msg, msgLen); // decode req if (tDecodeSMqRebVgReq(&dc, &req) < 0) { @@ -1810,3 +1810,44 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); return code; } + +int32_t tqProcessTaskUpdateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { +// SStreamTaskUpdateInfo* pReq = (SVPauseStreamTaskReq*)msg; +// +// SStreamMeta* pMeta 
= pTq->pStreamMeta; +// SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); +// if (pTask == NULL) { +// tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, +// pReq->taskId); +// +// // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active +// return TSDB_CODE_SUCCESS; +// } +// +// tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); +// streamTaskPause(pTask); +// +// SStreamTask* pHistoryTask = NULL; +// if (pTask->historyTaskId.taskId != 0) { +// pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); +// if (pHistoryTask == NULL) { +// tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success", +// pMeta->vgId, pTask->historyTaskId.taskId); +// +// streamMetaReleaseTask(pMeta, pTask); +// +// // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active +// return TSDB_CODE_SUCCESS; +// } +// +// tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); +// streamTaskPause(pHistoryTask); +// } +// +// streamMetaReleaseTask(pMeta, pTask); +// if (pHistoryTask != NULL) { +// streamMetaReleaseTask(pMeta, pHistoryTask); +// } +// + return TSDB_CODE_SUCCESS; +}