From 6a9826cf6e4ccef54c883ccbf33feb4c03b0f8a9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 11 Aug 2023 10:37:25 +0800 Subject: [PATCH] enh(stream): update the stream task epset. --- include/common/tmisce.h | 16 +++ include/common/tmsgdef.h | 2 +- include/libs/stream/tstream.h | 14 ++- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 5 +- source/dnode/mnode/impl/src/mndStream.c | 86 +++++++++++----- source/dnode/vnode/src/tq/tq.c | 100 ++++++++++--------- source/dnode/vnode/src/tq/tqRestore.c | 2 +- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 1 - source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/stream/src/streamDispatch.c | 16 +++ source/libs/stream/src/streamMeta.c | 1 + source/libs/stream/src/streamRecover.c | 3 + source/libs/stream/src/streamTask.c | 90 ++++++++++++----- source/libs/transport/src/transCli.c | 17 +--- 14 files changed, 233 insertions(+), 122 deletions(-) diff --git a/include/common/tmisce.h b/include/common/tmisce.h index bc6558900c..3d1afcd21f 100644 --- a/include/common/tmisce.h +++ b/include/common/tmisce.h @@ -28,6 +28,22 @@ typedef struct SCorEpSet { } SCorEpSet; #define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse])) + +#define EPSET_TO_STR(_eps, tbuf) \ + do { \ + int len = snprintf((tbuf), sizeof(tbuf), "epset:{"); \ + for (int _i = 0; _i < (_eps)->numOfEps; _i++) { \ + if (_i == (_eps)->numOfEps - 1) { \ + len += \ + snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \ + } else { \ + len += \ + snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d, ", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \ + } \ + } \ + len += snprintf((tbuf) + len, sizeof(tbuf) - len, "}, inUse:%d", (_eps)->inUse); \ + } while (0); + int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp); void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 19760ff2f0..c99daa7250 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -301,7 +301,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) +// TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 95dd720050..9f224a4e30 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -556,13 +556,13 @@ typedef struct { int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq); int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq); -typedef struct { +typedef struct SNodeUpdateInfo { int32_t nodeId; SEpSet prevEp; SEpSet newEp; } SNodeUpdateInfo; -typedef struct { +typedef struct SStreamTaskNodeUpdateMsg { int64_t streamId; int32_t taskId; SArray* pNodeList; // SArray @@ -571,6 +571,14 @@ typedef struct { int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg); int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg); +typedef struct SStreamTaskNodeUpdateRsp { + int64_t streamId; + int32_t taskId; +} SStreamTaskNodeUpdateRsp; + +int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg); +int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg); + typedef struct { int64_t streamId; int32_t downstreamTaskId; @@ -630,7 +638,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage); int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir); -int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet); +int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); int32_t streamTaskStop(SStreamTask* pTask); int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo, int32_t taskId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 6a04d15cc3..0e3dcd27df 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -730,8 +730,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; @@ -742,11 +742,12 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; +// if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6e6465f885..a96ebfff9e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -93,6 +93,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); @@ -1743,14 +1744,17 @@ typedef struct SVgroupChangeInfo { SArray* pUpdateNodeList; //SArray } SVgroupChangeInfo; -static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo) { +static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo, int64_t streamId, int32_t taskId) { + pMsg->streamId = streamId; + pMsg->taskId = taskId; pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); } -static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t nodeId, SVgroupChangeInfo* pInfo) { +static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, + int64_t streamId, int32_t taskId) { SStreamTaskNodeUpdateMsg req = {0}; - initNodeUpdateMsg(&req, pInfo); + initNodeUpdateMsg(&req, pInfo, streamId, taskId); int32_t code = 0; int32_t blen; @@ -1847,7 +1851,8 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr void *pBuf = NULL; int32_t len = 0; - doBuildStreamTaskUpdateMsg(&pBuf, &len, pTask->info.nodeId, pInfo); + streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); + doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId); STransAction action = {0}; initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet); @@ -1881,24 +1886,47 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr return TSDB_CODE_ACTION_IN_PROGRESS; } -// todo. 1. multiple change, 2. replica change problem -static SVgroupChangeInfo mndFindChangedVgroupInfo(SMnode *pMnode, const SArray *pPrevVgroupList, - const SArray *pVgroupList) { +static bool isNodeEpsetChanged(const SEpSet* pPrevEpset, const SEpSet* pCurrent) { + const SEp* pEp = GET_ACTIVE_EP(pPrevEpset); + + for(int32_t i = 0; i < pCurrent->numOfEps; ++i) { + const SEp* p = &(pCurrent->eps[i]); + if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { + return false; + } + } + + return true; +} + +// 1. increase the replica does not affect the stream process. +// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream +// tasks on the will be removed replica. +// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we will +// handle it as mentioned in 1 & 2 items. +static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList) { SVgroupChangeInfo info = { .pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)), .pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), }; - int32_t numOfVgroups = taosArrayGetSize(pPrevVgroupList); - for (int32_t i = 0; i < numOfVgroups; ++i) { - SNodeEntry *pPrevEntry = taosArrayGet(pPrevVgroupList, i); + int32_t numOfNodes = taosArrayGetSize(pPrevNodeList); + for (int32_t i = 0; i < numOfNodes; ++i) { + SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i); - int32_t num = taosArrayGetSize(pVgroupList); + int32_t num = taosArrayGetSize(pNodeList); for (int32_t j = 0; j < num; ++j) { - SNodeEntry *pCurrent = taosArrayGet(pVgroupList, j); + SNodeEntry *pCurrent = taosArrayGet(pNodeList, j); + if (pCurrent->nodeId == pPrevEntry->nodeId) { - // todo handle the replica change problem. - if (!isEpsetEqual(&pCurrent->epset, &pPrevEntry->epset)) { + if (isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) { + const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset); + + char buf[256] = {0}; + EPSET_TO_STR(&pCurrent->epset, buf); + mDebug("nodeId:%d epset changed detected, old:%s:%d -> new:%s", pCurrent->nodeId, pPrevEp->fqdn, + pPrevEp->port, buf); + SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId}; epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset); epsetAssign(&updateInfo.newEp, &pCurrent->epset); @@ -1908,6 +1936,7 @@ static SVgroupChangeInfo mndFindChangedVgroupInfo(SMnode *pMnode, const SArray * taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); mndReleaseVgroup(pMnode, pVgroup); } + break; } } @@ -1953,11 +1982,6 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) { break; } - // update the related upstream and downstream tasks, todo remove this, no need this function - // taosWLockLatch(&pStream->lock); - // streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); - // streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); - // taosWUnLockLatch(&pStream->lock); void* p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); void* p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); if (p == NULL && p1 == NULL) { @@ -1975,7 +1999,7 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) { return 0; } -static SArray* doExtractNodeList(SMnode *pMnode) { +static SArray* doExtractNodeListFromStream(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -2022,7 +2046,6 @@ static SArray* doExtractNodeList(SMnode *pMnode) { } static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { - return 0; int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); if (old != 0) { mDebug("still in checking node change"); @@ -2030,17 +2053,26 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } mDebug("start to do node change checking"); + int64_t ts = taosGetTimestampSec(); SMnode *pMnode = pMsg->info.node; - if (execNodeList.pNodeEntryList == NULL) { - execNodeList.pNodeEntryList = doExtractNodeList(pMnode); + if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) { + if (execNodeList.pNodeEntryList != NULL) { + execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); + } + execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode); + } + + if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { + mDebug("end to do stream task node change checking, no vgroup exists, do nothing"); + execNodeList.ts = ts; + atomic_store_32(&mndNodeCheckSentinel, 0); + return 0; } SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); - int64_t ts = taosGetTimestampSec(); - - SVgroupChangeInfo changeInfo = mndFindChangedVgroupInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); + SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { mndProcessVgroupChange(pMnode, &changeInfo); } @@ -2053,7 +2085,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { execNodeList.pNodeEntryList = pNodeSnapshot; execNodeList.ts = ts; - mDebug("end to do node change checking"); + mDebug("end to do stream task node change checking"); atomic_store_32(&mndNodeCheckSentinel, 0); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8d841ee01e..9299dbc9e5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -192,37 +192,40 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) { } void tqNotifyClose(STQ* pTq) { - if (pTq != NULL) { - SStreamMeta* pMeta = pTq->pStreamMeta; - taosWLockLatch(&pMeta->lock); - - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - tqDebug("vgId:%d s-task:%s set closing flag", pMeta->vgId, pTask->id.idStr); - streamTaskStop(pTask); - } - - taosWUnLockLatch(&pMeta->lock); - - tqDebug("vgId:%d start to check all tasks", pMeta->vgId); - - int64_t st = taosGetTimestampMs(); - - while (hasStreamTaskInTimer(pMeta)) { - tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - int64_t el = taosGetTimestampMs() - st; - tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", - pMeta->vgId, el); + if (pTq == NULL) { + return; } + + SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t vgId = pMeta->vgId; + + tqDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId); + taosWLockLatch(&pMeta->lock); + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + tqDebug("vgId:%d s-task:%s set closing flag", vgId, pTask->id.idStr); + streamTaskStop(pTask); + } + + taosWUnLockLatch(&pMeta->lock); + + tqDebug("vgId:%d start to check all tasks", vgId); + int64_t st = taosGetTimestampMs(); + + while (hasStreamTaskInTimer(pMeta)) { + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + int64_t el = taosGetTimestampMs() - st; + tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); } //static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, @@ -1100,8 +1103,8 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { - tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, - pTq->pStreamMeta->vgId); + tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed", + rsp.streamId, rsp.upstreamTaskId, pTq->pStreamMeta->vgId); terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; return -1; } @@ -1116,9 +1119,12 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms int32_t vgId = TD_VID(pTq->pVnode); if (tsDisableStream) { + tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId); return 0; } + tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId); + // 1.deserialize msg and build task SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { @@ -1141,8 +1147,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SStreamMeta* pStreamMeta = pTq->pStreamMeta; - // 2.save task, use the newest commit version as the initial start version of stream task. + // 2.save task, use the latest commit version as the initial start version of stream task. int32_t taskId = pTask->id.taskId; + int64_t streamId = pTask->id.streamId; bool added = false; taosWLockLatch(&pStreamMeta->lock); @@ -1151,21 +1158,26 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms taosWUnLockLatch(&pStreamMeta->lock); if (code < 0) { - tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks); + tqError("vgId:%d failed to add s-task:0x%x, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); tFreeStreamTask(pTask); return -1; } - // not added into meta store + // added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if + // it is added into the meta store if (added) { tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); - SStreamTask* p = streamMetaAcquireTask(pStreamMeta, pTask->id.streamId, taskId); - if (p != NULL) { // reset the downstreamReady flag. + SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId); + + bool restored = pTq->pVnode->restored; + if (p != NULL && restored) { // reset the downstreamReady flag. streamTaskCheckDownstreamTasks(p); + } else if (!restored) { + tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); } streamMetaReleaseTask(pStreamMeta, p); } else { - tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId); + tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId); tFreeStreamTask(pTask); } @@ -1840,17 +1852,15 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { return TSDB_CODE_SUCCESS; } - tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); -// streamTaskUpdateEpInfo(pTask); + tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); + streamTaskUpdateEpsetInfo(pTask, req.pNodeList); SStreamTask* pHistoryTask = NULL; if (pTask->historyTaskId.taskId != 0) { pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); if (pHistoryTask == NULL) { - tqError( - "vgId:%d failed to acquire fill-history task:0x%x when handling task update, it may have been dropped " - "already", - pMeta->vgId, pTask->historyTaskId.taskId); + tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped", + pMeta->vgId, pTask->historyTaskId.taskId); streamMetaReleaseTask(pMeta, pTask); @@ -1859,7 +1869,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr); -// streamTaskUpdateEpInfo(pHistoryTask); + streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList); } if (pHistoryTask != NULL) { diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index c51d49c689..c1f9c7a0f9 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -61,7 +61,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - tqDebug("vgId:%d start to check all (%d) stream tasks downstream status", vgId, numOfTasks); + tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 687e66b324..5a802447bd 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -81,7 +81,6 @@ _err: tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); *ppReader = NULL; return code; - return 0; } int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 88b2cc88f2..91aa009632 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -473,7 +473,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } break; case TDMT_STREAM_TASK_DEPLOY: { - if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) { + if (tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) { goto _err; } } break; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c3695e9a43..fc88bbc07b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1007,6 +1007,22 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* taosArrayPush(pMsg->pNodeList, &info); } + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; tEndDecode(pDecoder); return 0; } \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4b094e4e98..664c6359f4 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -410,6 +410,7 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { int32_t streamLoadTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; + qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId); if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno)); return -1; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index bc380390d5..484e5157db 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -67,6 +67,7 @@ const char* streamGetTaskStatusStr(int32_t status) { case TASK_STATUS__CK: return "check-point"; case TASK_STATUS__CK_READY: return "check-point-ready"; case TASK_STATUS__DROPPING: return "dropping"; + case TASK_STATUS__STOP: return "stop"; default:return ""; } } @@ -115,6 +116,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { .upstreamTaskId = pTask->id.taskId, .upstreamNodeId = pTask->info.nodeId, .childId = pTask->info.selfChildId, + .stage = pTask->status.stage, }; // serialize @@ -172,6 +174,7 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p .downstreamTaskId = pRsp->downstreamTaskId, .downstreamNodeId = pRsp->downstreamNodeId, .childId = pRsp->childId, + .stage = pTask->status.stage, }; qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId, diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index aee0bb33d7..7a77b98db6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -13,11 +13,12 @@ * along with this program. If not, see . */ -#include "streamInt.h" +#include "tmisce.h" #include "executor.h" +#include "streamInt.h" #include "tstream.h" -#include "wal.h" #include "ttimer.h" +#include "wal.h" static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); @@ -404,7 +405,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS for(int32_t i = 0; i < numOfUpstream; ++i) { SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i); if (pInfo->nodeId == nodeId) { - pInfo->epSet = *pEpSet; + epsetAssign(&pInfo->epSet, pEpSet); break; } } @@ -465,6 +466,12 @@ int32_t streamTaskStop(SStreamTask* pTask) { } int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) { + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + + qDebug("s-task:%s vgId:%d restart current task, stage:%d, status:%s, sched-status:%d", id, vgId, pTask->status.stage, + streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); + // 1. stop task streamTaskStop(pTask); @@ -477,39 +484,68 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) { pTask->status.downstreamReady = 0; pTask->status.stage += 1; - qDebug("s-task:%s reset downstream status and stage:%d, start to check downstream", pTask->id.idStr, - pTask->status.stage); + streamSetStatusNormal(pTask); + qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id, + pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus)); // 3. start to check the downstream status streamTaskCheckDownstreamTasks(pTask); return 0; } -int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet) { - int32_t numOfLevels = taosArrayGetSize(pTaskList); +// todo remove it +//int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet) { +// int32_t numOfLevels = taosArrayGetSize(pTaskList); +// +// for (int32_t j = 0; j < numOfLevels; ++j) { +// SArray *pLevel = taosArrayGetP(pTaskList, j); +// +// int32_t numOfTasks = taosArrayGetSize(pLevel); +// for (int32_t k = 0; k < numOfTasks; ++k) { +// SStreamTask *pTask = taosArrayGetP(pLevel, k); +// if (pTask->info.nodeId == nodeId) { +// pTask->info.epSet = *pEpSet; +// continue; +// } +// +// // check for the dispath info and the upstream task info +// int32_t level = pTask->info.taskLevel; +// if (level == TASK_LEVEL__SOURCE) { +// streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); +// } else if (level == TASK_LEVEL__AGG) { +// streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); +// streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); +// } else { // TASK_LEVEL__SINK +// streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); +// } +// } +// } +// return 0; +//} - for (int32_t j = 0; j < numOfLevels; ++j) { - SArray *pLevel = taosArrayGetP(pTaskList, j); +int32_t doUpdateEpsetInfo(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { + if (pTask->info.nodeId == nodeId) { // execution task should be moved away + epsetAssign(&pTask->info.epSet, pEpSet); + } - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t k = 0; k < numOfTasks; ++k) { - SStreamTask *pTask = taosArrayGetP(pLevel, k); - if (pTask->info.nodeId == nodeId) { - pTask->info.epSet = *pEpSet; - continue; - } + // check for the dispath info and the upstream task info + int32_t level = pTask->info.taskLevel; + if (level == TASK_LEVEL__SOURCE) { + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + } else if (level == TASK_LEVEL__AGG) { + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + } else { // TASK_LEVEL__SINK + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); + } - // check for the dispath info and the upstream task info - int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SOURCE) { - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); - } else if (level == TASK_LEVEL__AGG) { - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); - } else { // TASK_LEVEL__SINK - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); - } - } + return 0; +} + +int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { + for(int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { + SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i); + doUpdateEpsetInfo(pTask, pInfo->nodeId, &pInfo->newEp); } return 0; } \ No newline at end of file diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index cfdc5b5e8b..1ecc2e3506 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -13,6 +13,7 @@ */ #include "transComm.h" +#include "tmisce.h" typedef struct { int32_t numOfConn; @@ -308,18 +309,6 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); } \ } while (0) -#define EPSET_DEBUG_STR(epSet, tbuf) \ - do { \ - int len = snprintf(tbuf, sizeof(tbuf), "epset:{"); \ - for (int i = 0; i < (epSet)->numOfEps; i++) { \ - if (i == (epSet)->numOfEps - 1) { \ - len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \ - } else { \ - len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \ - } \ - } \ - len += snprintf(tbuf + len, sizeof(tbuf) - len, "}, inUse:%d", (epSet)->inUse); \ - } while (0); static void* cliWorkThread(void* arg); @@ -2167,7 +2156,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { if (rpcDebugFlag & DEBUG_DEBUG) { STraceId* trace = &pMsg->msg.info.traceId; char tbuf[256] = {0}; - EPSET_DEBUG_STR(&pCtx->epSet, tbuf); + EPSET_TO_STR(&pCtx->epSet, tbuf); tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf, pCtx->retryStep, pCtx->retryNextInterval); } @@ -2396,7 +2385,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (hasEpSet) { if (rpcDebugFlag & DEBUG_TRACE) { char tbuf[256] = {0}; - EPSET_DEBUG_STR(&pCtx->epSet, tbuf); + EPSET_TO_STR(&pCtx->epSet, tbuf); tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); } }