diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a027b4f9ba..d3717bc1e7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -436,33 +436,38 @@ typedef struct STaskStartInfo { int32_t elapsedTime; } STaskStartInfo; +typedef struct STaskUpdateInfo { + SHashObj* pTasks; /* records of the tasks already handled for the nodeEp-update trans identified by transId */ + int32_t transId; /* id of the most recent nodeEp-update trans received; pTasks is cleared when it changes */ +} STaskUpdateInfo; + // meta typedef struct SStreamMeta { - char* path; - TDB* db; - TTB* pTaskDb; - TTB* pCheckpointDb; - SHashObj* pTasksMap; - SArray* pTaskList; // SArray - void* ahandle; - TXN* txn; - FTaskExpand* expandFunc; - int32_t vgId; - int64_t stage; - int32_t role; - STaskStartInfo startInfo; - SRWLatch lock; - int32_t walScanCounter; - void* streamBackend; - int64_t streamBackendRid; - SHashObj* pTaskBackendUnique; - TdThreadMutex backendMutex; - SMetaHbInfo* pHbInfo; - SHashObj* pUpdateTaskSet; - int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta - int32_t numOfPausedTasks; - int32_t chkptNotReadyTasks; - int64_t rid; + char* path; + TDB* db; + TTB* pTaskDb; + TTB* pCheckpointDb; + SHashObj* pTasksMap; + SArray* pTaskList; // SArray + void* ahandle; + TXN* txn; + FTaskExpand* expandFunc; + int32_t vgId; + int64_t stage; + int32_t role; + STaskStartInfo startInfo; + SRWLatch lock; + int32_t walScanCounter; + void* streamBackend; + int64_t streamBackendRid; + SHashObj* pTaskBackendUnique; + TdThreadMutex backendMutex; + SMetaHbInfo* pHbInfo; + STaskUpdateInfo updateInfo; + int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta + int32_t numOfPausedTasks; + int32_t chkptNotReadyTasks; + int64_t rid; int64_t chkpId; SArray* chkpSaved; @@ -664,6 +669,7 @@ typedef struct SNodeUpdateInfo { } SNodeUpdateInfo; typedef struct SStreamTaskNodeUpdateMsg { + int32_t transId; // to identify the msg int64_t streamId; int32_t taskId; SArray* pNodeList; // SArray diff --git a/source/dnode/mnode/impl/src/mndStream.c 
b/source/dnode/mnode/impl/src/mndStream.c index 194c1021f6..a2aa56dd6e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1883,18 +1883,19 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, int64_t streamId, - int32_t taskId) { - pMsg->streamId = streamId; - pMsg->taskId = taskId; +static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId, + int32_t transId) { + pMsg->streamId = pId->streamId; + pMsg->taskId = pId->taskId; + pMsg->transId = transId; /* carried in the msg so the receiving vnode can tell which update trans it belongs to */ pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); } static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, - int64_t streamId, int32_t taskId) { + SStreamTaskId* pId, int32_t transId) { SStreamTaskNodeUpdateMsg req = {0}; - initNodeUpdateMsg(&req, pInfo, streamId, taskId); + initNodeUpdateMsg(&req, pInfo, pId, transId); int32_t code = 0; int32_t blen; @@ -1968,7 +1969,7 @@ void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_ // todo extract method: traverse stream tasks // build trans to update the epset static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans* pTrans) { - mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid); + mDebug("start to build stream:0x%" PRIx64 " tasks epset update", pStream->uid); taosWLockLatch(&pStream->lock); int32_t numOfLevels = taosArrayGetSize(pStream->tasks); @@ -1983,7 +1984,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p void *pBuf = NULL; int32_t len = 0; streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); - doBuildStreamTaskUpdateMsg(&pBuf, &len, 
pInfo, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId); + doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); /* pTrans->id is sent as the msg transId */ STransAction action = {0}; initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 24c31b95c8..eac6603e8b 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -125,7 +125,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t vgId); -//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId); int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId); // tqMeta @@ -133,7 +132,6 @@ int32_t tqMetaOpen(STQ* pTq); int32_t tqMetaClose(STQ* pTq); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); -//int32_t tqMetaRestoreHandle(STQ* pTq); int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen); int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaRestoreCheckInfo(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 711c9a52bc..907dc8d88a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -20,6 +20,12 @@ typedef struct { int8_t inited; } STqMgmt; +typedef struct STaskUpdateEntry { /* used by value as the hash key in updateInfo.pTasks: one (stream, task, trans) triple per handled nodeEp-update msg */ + int64_t streamId; + int32_t taskId; + int32_t transId; +} STaskUpdateEntry; + static STqMgmt tqMgmt = {0}; // 0: not init @@ -1869,7 +1875,26 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } SStreamTask* pTask = *ppTask; - tqDebug("s-task:%s receive nodeEp update msg from mnode", pTask->id.idStr); + + if 
(pMeta->updateInfo.transId != req.transId) { + pMeta->updateInfo.transId = req.transId; + tqDebug("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId); + // a new trans arrived: drop the handled-task records of the previous trans, which were kept until now + taosHashClear(pMeta->updateInfo.pTasks); + } else { + tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId); + } + + STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; + void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); + if (exist != NULL) { + tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, + req.transId); + rsp.code = TSDB_CODE_SUCCESS; + taosWUnLockLatch(&pMeta->lock); + taosArrayDestroy(req.pNodeList); + return rsp.code; + } streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskResetStatus(pTask); @@ -1899,12 +1924,14 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } streamTaskStop(pTask); - taosHashPut(pMeta->pUpdateTaskSet, &pTask->id, sizeof(pTask->id), NULL, 0); + + // record this task as handled for the current trans, so a repeated msg is discarded by the lookup above + taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0); if (ppHTask != NULL) { streamTaskStop(*ppHTask); tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); - taosHashPut(pMeta->pUpdateTaskSet, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); + taosHashPut(pMeta->updateInfo.pTasks, &(STaskUpdateEntry){.streamId = (*ppHTask)->id.streamId, .taskId = (*ppHTask)->id.taskId, .transId = req.transId}, sizeof(STaskUpdateEntry), NULL, 0); /* key the fill-history task with the same STaskUpdateEntry layout the lookup uses, so it is found and counted once */ } else { tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); } @@ -1913,7 +1940,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // possibly only handle the stream task. 
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet); + int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); /* number of records stored for the current update trans */ pMeta->startInfo.startedAfterNodeUpdate = 1; @@ -1922,8 +1949,6 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { updateTasks, (numOfTasks - updateTasks)); taosWUnLockLatch(&pMeta->lock); } else { - taosHashClear(pMeta->pUpdateTaskSet); - if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.startedAfterNodeUpdate = 0; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 750f5d6a43..cd69bc0d92 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1175,6 +1175,9 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; + // todo: this new field is encoded between taskId and the node list, which makes the msg incompatible with previous versions + if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1; + int32_t size = taosArrayGetSize(pMsg->pNodeList); if (tEncodeI32(pEncoder, size) < 0) return -1; @@ -1193,6 +1196,8 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1; /* must mirror the encode order: streamId, taskId, transId, then the node list size */ + int32_t size = 0; if (tDecodeI32(pDecoder, &size) < 0) return -1; pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo)); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f2dba090cf..b13e49beb4 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -143,8 +143,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* 
ahandle, FTaskExpand expandF goto _err; } - pMeta->pUpdateTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); - if (pMeta->pUpdateTaskSet == NULL) { + pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK); /* NOTE(review): this table is looked up with STaskUpdateEntry keys in tqProcessTaskUpdateReq; fp is defined outside this hunk - confirm it suits that key. updateInfo.transId is not set here - presumably pMeta is zero-initialised, TODO confirm */ + if (pMeta->updateInfo.pTasks == NULL) { goto _err; } @@ -219,7 +219,7 @@ _err: if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); - if (pMeta->pUpdateTaskSet) taosHashCleanup(pMeta->pUpdateTaskSet); + if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks); if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosMemoryFree(pMeta); @@ -340,7 +340,7 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTaskBackendUnique); - taosHashCleanup(pMeta->pUpdateTaskSet); + taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosMemoryFree(pMeta->pHbInfo);