diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h new file mode 100644 index 0000000000..75dafcdbff --- /dev/null +++ b/include/dnode/vnode/tqCommon.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TQ_COMMON_H +#define TDENGINE_TQ_COMMON_H + +// message process +int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart); +int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored); +int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); +int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); +int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); +int32_t startStreamTasks(SStreamMeta* pMeta); +int32_t resetStreamTaskStatus(SStreamMeta* pMeta); + +#endif // TDENGINE_TQ_COMMON_H diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c187127fff..f8ac360c71 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -853,23 +853,6 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int8_t isSucceed); -// message process -int32_t streamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart); -int32_t streamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored); -int32_t streamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t streamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t streamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t streamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t streamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t streamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t streamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); -int32_t streamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t streamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); -int32_t streamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); -int32_t streamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); -int32_t startStreamTasks(SStreamMeta* pMeta); -int32_t resetStreamTaskStatus(SStreamMeta* pMeta); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c14d99ca36..0167c1a6ac 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -698,8 +698,17 @@ static int32_t mndPersistTaskDropReq(SMnode* pMnode, STrans *pTrans, SStreamTask STransAction action = {0}; SEpSet epset = {0}; if(pTask->info.nodeId == SNODE_HANDLE){ - SSnodeObj* pObj = mndAcquireSnode(pMnode, pTask->info.nodeId); - addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); + SSnodeObj *pObj = NULL; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); + sdbRelease(pMnode->pSdb, pObj); + } }else{ SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); epset = mndGetVgroupEpset(pMnode, pVgObj); diff --git a/source/dnode/snode/CMakeLists.txt b/source/dnode/snode/CMakeLists.txt index ebfe80ecab..4cd8e26e78 100644 --- a/source/dnode/snode/CMakeLists.txt +++ b/source/dnode/snode/CMakeLists.txt @@ -3,6 +3,7 @@ add_library(snode STATIC ${SNODE_SRC}) target_include_directories( snode PUBLIC "${TD_SOURCE_DIR}/include/dnode/snode" + PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode" private "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( @@ -16,4 +17,5 @@ target_link_libraries( PRIVATE stream PRIVATE wal PRIVATE index + PRIVATE tqCommon ) diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 8db2e783db..72ef6d14bd 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -16,7 +16,7 @@ #include "rsync.h" #include "executor.h" #include "sndInt.h" -#include "tstream.h" +#include "tqCommon.h" #include "tuuid.h" #define sndError(...) \ @@ -165,25 +165,25 @@ void sndClose(SSnode *pSnode) { int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_STREAM_TASK_RUN: - return streamTaskProcessRunReq(pSnode->pMeta, pMsg, true); + return tqStreamTaskProcessRunReq(pSnode->pMeta, pMsg, true); case TDMT_STREAM_TASK_DISPATCH: - return streamTaskProcessDispatchReq(pSnode->pMeta, pMsg); + return tqStreamTaskProcessDispatchReq(pSnode->pMeta, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: - return streamTaskProcessDispatchRsp(pSnode->pMeta, pMsg); + return tqStreamTaskProcessDispatchRsp(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE: - return streamTaskProcessRetrieveReq(pSnode->pMeta, pMsg); + return tqStreamTaskProcessRetrieveReq(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_RSP: // 1036 break; case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: - return streamTaskProcessScanHistoryFinishReq(pSnode->pMeta, pMsg); + return tqStreamTaskProcessScanHistoryFinishReq(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: - return streamTaskProcessScanHistoryFinishRsp(pSnode->pMeta, pMsg); + return tqStreamTaskProcessScanHistoryFinishRsp(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_TASK_CHECK: - return streamTaskProcessCheckReq(pSnode->pMeta, pMsg); + return tqStreamTaskProcessCheckReq(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: - return streamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true); + return tqStreamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true); case TDMT_STREAM_TASK_CHECKPOINT_READY: - return streamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg); + return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg); default: sndError("invalid snode msg:%d", pMsg->msgType); ASSERT(0); @@ -196,13 +196,13 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { case TDMT_STREAM_TASK_DEPLOY: { void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); - return streamTaskProcessDeployReq(pSnode->pMeta, -1, pReq, len, true, true); + return tqStreamTaskProcessDeployReq(pSnode->pMeta, -1, pReq, len, true, true); } case TDMT_STREAM_TASK_DROP: - return streamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); + return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); case TDMT_VND_STREAM_TASK_UPDATE: - return streamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); + return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); default: ASSERT(0); } diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index dc43da7fe7..9aeb14cd60 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -1,4 +1,5 @@ # vnode +add_subdirectory(src/tqCommon) add_library(vnode STATIC "") set( VNODE_SOURCE_FILES @@ -117,6 +118,7 @@ if (${BUILD_CONTRIB}) PUBLIC "inc" PUBLIC "src/inc" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" + PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode" PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include" ) else() @@ -125,6 +127,7 @@ else() PUBLIC "inc" PUBLIC "src/inc" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" + PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode" ) if (${TD_LINUX}) target_include_directories( @@ -158,6 +161,7 @@ target_link_libraries( PUBLIC transport PUBLIC stream PUBLIC index + PUBLIC tqCommon ) IF (TD_GRANT) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1145fa328d..5b04b31c94 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -15,6 +15,7 @@ #include "tq.h" #include "vnd.h" +#include "tqCommon.h" typedef struct { int8_t inited; @@ -890,15 +891,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { - return streamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); + return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { - return streamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); + return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); } int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - return streamTaskProcessDeployReq(pTq->pStreamMeta, sversion, msg, msgLen, vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored); + return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, sversion, msg, msgLen, vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored); } static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { @@ -1080,11 +1081,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // only the agg tasks and the sink tasks will receive this message from upstream tasks int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { - return streamTaskProcessScanHistoryFinishReq(pTq->pStreamMeta, pMsg); + return tqStreamTaskProcessScanHistoryFinishReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { - return streamTaskProcessScanHistoryFinishRsp(pTq->pStreamMeta, pMsg); + return tqStreamTaskProcessScanHistoryFinishRsp(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { @@ -1096,7 +1097,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { tqScanWal(pTq); return 0; } - int32_t code = streamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); + int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); if(code == 0 && taskId > 0){ tqScanWalAsync(pTq, false); } @@ -1104,15 +1105,15 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { - return streamTaskProcessDispatchReq(pTq->pStreamMeta, pMsg); + return tqStreamTaskProcessDispatchReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { - return streamTaskProcessDispatchRsp(pTq->pStreamMeta, pMsg); + return tqStreamTaskProcessDispatchRsp(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { - return streamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen); + return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen); } int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { @@ -1221,7 +1222,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { - return streamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg); + return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { @@ -1358,11 +1359,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { - return streamTaskProcessCheckpointReadyMsg(pTq->pStreamMeta, pMsg); + return tqStreamTaskProcessCheckpointReadyMsg(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { - return streamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored); + return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored); } int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tqCommon/CMakeLists.txt b/source/dnode/vnode/src/tqCommon/CMakeLists.txt new file mode 100644 index 0000000000..aea0e709e3 --- /dev/null +++ b/source/dnode/vnode/src/tqCommon/CMakeLists.txt @@ -0,0 +1,20 @@ +aux_source_directory(. TQ_SOURCE_FILES) +add_library(tqCommon STATIC ${TQ_SOURCE_FILES}) +target_include_directories( + tqCommon + PUBLIC "../inc" + PUBLIC "../../inc" +) + +target_link_libraries( + tqCommon + PRIVATE stream + PRIVATE common + PRIVATE transport + PRIVATE executor + PRIVATE index + PRIVATE qcom + PRIVATE qworker + PRIVATE sync + PRIVATE tfs +) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c new file mode 100644 index 0000000000..aee2aaa244 --- /dev/null +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -0,0 +1,809 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tstream.h" +#include "tmsgcb.h" +#include "tq.h" + +typedef struct STaskUpdateEntry { + int64_t streamId; + int32_t taskId; + int32_t transId; +} STaskUpdateEntry; + +int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { + int32_t vgId = pMeta->vgId; + + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + if (numOfTasks == 0) { + tqDebug("vgId:%d no stream tasks existed to run", vgId); + return 0; + } + + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); + return -1; + } + + tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); + pRunReq->head.vgId = vgId; + pRunReq->streamId = 0; + pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; + tmsgPutToQueue(cb, STREAM_QUEUE, &msg); + return 0; +} + +int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + + SStreamTaskNodeUpdateMsg req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) { + rsp.code = TSDB_CODE_MSG_DECODE_ERROR; + tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code)); + tDecoderClear(&decoder); + return rsp.code; + } + + tDecoderClear(&decoder); + + // update the nodeEpset when it exists + streamMetaWLock(pMeta); + + // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (ppTask == NULL || *ppTask == NULL) { + tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, + req.taskId); + rsp.code = TSDB_CODE_SUCCESS; + streamMetaWUnLock(pMeta); + + taosArrayDestroy(req.pNodeList); + return rsp.code; + } + + SStreamTask* pTask = *ppTask; + + if (pMeta->updateInfo.transId != req.transId) { + pMeta->updateInfo.transId = req.transId; + tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId); + // info needs to be kept till the new trans to update the nodeEp arrived. + taosHashClear(pMeta->updateInfo.pTasks); + } else { + tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId); + } + + STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; + void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); + if (exist != NULL) { + tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, + req.transId); + rsp.code = TSDB_CODE_SUCCESS; + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return rsp.code; + } + + streamMetaWUnLock(pMeta); + + // the following two functions should not be executed within the scope of meta lock to avoid deadlock + streamTaskUpdateEpsetInfo(pTask, req.pNodeList); + streamTaskResetStatus(pTask); + + // continue after lock the meta again + streamMetaWLock(pMeta); + + SStreamTask** ppHTask = NULL; + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); + if (ppHTask == NULL || *ppHTask == NULL) { + tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", + pMeta->vgId, req.taskId); + CLEAR_RELATED_FILLHISTORY_TASK(pTask); + } else { + tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); + streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); + } + } + + { + streamMetaSaveTask(pMeta, pTask); + if (ppHTask != NULL) { + streamMetaSaveTask(pMeta, *ppHTask); + } + + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + } + + streamTaskStop(pTask); + + // keep the already handled info + taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0); + + if (ppHTask != NULL) { + streamTaskStop(*ppHTask); + tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); + taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); + } else { + tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); + } + + rsp.code = 0; + + // possibly only handle the stream task. + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); + + pMeta->startInfo.tasksWillRestart = 1; + + if (updateTasks < numOfTasks) { + tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, + updateTasks, (numOfTasks - updateTasks)); + streamMetaWUnLock(pMeta); + } else { + if (!restored) { + tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); + pMeta->startInfo.tasksWillRestart = 0; + streamMetaWUnLock(pMeta); + } else { + tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks); +#if 1 + tqStreamTaskStartAsync(pMeta, cb, true); + streamMetaWUnLock(pMeta); +#else + streamMetaWUnLock(pMeta); + + // For debug purpose. + // the following procedure consume many CPU resource, result in the re-election of leader + // with high probability. So we employ it as a test case for the stream processing framework, with + // checkpoint/restart/nodeUpdate etc. + while (1) { + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); + if (startVal == 0) { + break; + } + + tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); + taosMsleep(500); + } + + while (streamMetaTaskInTimer(pMeta)) { + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + streamMetaWLock(pMeta); + + int32_t code = streamMetaReopen(pMeta); + if (code != 0) { + tqError("vgId:%d failed to reopen stream meta", vgId); + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return -1; + } + + streamMetaInitBackend(pMeta); + + if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { + tqError("vgId:%d failed to load stream tasks", vgId); + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return -1; + } + + if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { + tqInfo("vgId:%d start all stream tasks after all being updated", vgId); + resetStreamTaskStatus(pTq->pStreamMeta); + tqStartStreamTaskAsync(pTq, false); + } else { + tqInfo("vgId:%d, follower node not start stream tasks", vgId); + } + streamMetaWUnLock(pMeta); +#endif + } + } + + taosArrayDestroy(req.pNodeList); + return rsp.code; +} + +int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + SStreamDispatchReq req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); + if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { + tDecoderClear(&decoder); + return TSDB_CODE_MSG_DECODE_ERROR; + } + tDecoderClear(&decoder); + + tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); + if (pTask) { + SRpcMsg rsp = {.info = pMsg->info, .code = 0}; + if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0){ + return -1; + } + tDeleteStreamDispatchReq(&req); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } else { + tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already", + pMeta->vgId, req.taskId); + + SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); + if (pRspHead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId); + return -1; + } + + pRspHead->vgId = htonl(req.upstreamNodeId); + ASSERT(pRspHead->vgId != 0); + + SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead)); + pRsp->streamId = htobe64(req.streamId); + pRsp->upstreamTaskId = htonl(req.upstreamTaskId); + pRsp->upstreamNodeId = htonl(req.upstreamNodeId); + pRsp->downstreamNodeId = htonl(pMeta->vgId); + pRsp->downstreamTaskId = htonl(req.taskId); + pRsp->msgId = htonl(req.msgId); + pRsp->stage = htobe64(req.stage); + pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL; + + int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); + SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead}; + tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId); + + tmsgSendRsp(&rsp); + tDeleteStreamDispatchReq(&req); + + return 0; + } +} + +int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + + int32_t vgId = pMeta->vgId; + pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId); + pRsp->streamId = htobe64(pRsp->streamId); + pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId); + pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId); + pRsp->stage = htobe64(pRsp->stage); + pRsp->msgId = htonl(pRsp->msgId); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId); + if (pTask) { + streamProcessDispatchRsp(pTask, pRsp, pMsg->code); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; + } else { + tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId); + terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; + return terrno; + } +} + +int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder; + + SStreamRetrieveReq req; + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); + tDecodeStreamRetrieveReq(&decoder, &req); + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId); + if (pTask == NULL) { + tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, + req.dstTaskId); + return -1; + } + + SRpcMsg rsp = {.info = pMsg->info, .code = 0}; + streamProcessRetrieveReq(pTask, &req, &rsp); + + streamMetaReleaseTask(pMeta, pTask); + tDeleteStreamRetrieveReq(&req); + return 0; +} + +int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + // deserialize + SStreamScanHistoryFinishReq req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + tDecodeStreamScanHistoryFinishReq(&decoder, &req); + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId); + if (pTask == NULL) { + tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", + pMeta->vgId, req.downstreamTaskId); + return -1; + } + + tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId); + + int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info); + streamMetaReleaseTask(pMeta, pTask); + return code; +} + +int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + // deserialize + SStreamCompleteHistoryMsg req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + tDecodeCompleteHistoryDataMsg(&decoder, &req); + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); + if (pTask == NULL) { + tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", + pMeta->vgId, req.upstreamTaskId); + return -1; + } + + int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); + if (remain > 0) { + tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d", + pTask->id.idStr, req.downstreamId, remain); + } else { + tqDebug( + "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " + "completed msg", + pTask->id.idStr, req.downstreamId); + streamProcessScanHistoryFinishRsp(pTask); + } + + streamMetaReleaseTask(pMeta, pTask); + return 0; +} + +int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + SStreamTaskCheckReq req; + SDecoder decoder; + + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); + tDecodeStreamTaskCheckReq(&decoder, &req); + tDecoderClear(&decoder); + + int32_t taskId = req.downstreamTaskId; + + SStreamTaskCheckRsp rsp = { + .reqId = req.reqId, + .streamId = req.streamId, + .childId = req.childId, + .downstreamNodeId = req.downstreamNodeId, + .downstreamTaskId = req.downstreamTaskId, + .upstreamNodeId = req.upstreamNodeId, + .upstreamTaskId = req.upstreamTaskId, + }; + + // only the leader node handle the check request + if (pMeta->role == NODE_ROLE_FOLLOWER) { + tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", + taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); + rsp.status = TASK_DOWNSTREAM_NOT_LEADER; + } else { + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); + if (pTask != NULL) { + rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage); + streamMetaReleaseTask(pMeta, pTask); + + char* p = NULL; + streamTaskGetStatus(pTask, &p); + tqDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", + pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + } else { + rsp.status = TASK_DOWNSTREAM_NOT_READY; + tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 + ") from task:0x%x (vgId:%d), rsp check_status %d", + req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + } + } + + return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); +} + +int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { + char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t vgId = pMeta->vgId; + + int32_t code; + SStreamTaskCheckRsp rsp; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)pReq, len); + code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); + if (code < 0) { + terrno = TSDB_CODE_INVALID_MSG; + tDecoderClear(&decoder); + tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno)); + return -1; + } + + tDecoderClear(&decoder); + tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", + rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); + + if (!isLeader) { + tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, + rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); + return code; + } + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); + if (pTask == NULL) { + tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", + rsp.streamId, rsp.upstreamTaskId, vgId); + terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; + return -1; + } + + code = streamProcessCheckRsp(pTask, &rsp); + streamMetaReleaseTask(pMeta, pTask); + return code; +} + +int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; + + SStreamCheckpointReadyMsg req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + tDecoderClear(&decoder); + return code; + } + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); + if (pTask == NULL) { + tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId); + return code; + } + + tqDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); + + streamProcessCheckpointReadyMsg(pTask); + streamMetaReleaseTask(pMeta, pTask); + return code; +} + +int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored) { + int32_t code = 0; + int32_t vgId = pMeta->vgId; + + if (tsDisableStream) { + tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId); + return code; + } + + tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId); + + // 1.deserialize msg and build task + int32_t size = sizeof(SStreamTask); + SStreamTask* pTask = taosMemoryCalloc(1, size); + if (pTask == NULL) { + tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size); + return TSDB_CODE_OUT_OF_MEMORY; + } + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + code = tDecodeStreamTask(&decoder, pTask); + tDecoderClear(&decoder); + + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pTask); + return TSDB_CODE_INVALID_MSG; + } + + // 2.save task, use the latest commit version as the initial start version of stream task. + int32_t taskId = pTask->id.taskId; + int64_t streamId = pTask->id.streamId; + bool added = false; + + streamMetaWLock(pMeta); + code = streamMetaRegisterTask(pMeta, sversion, pTask, &added); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + streamMetaWUnLock(pMeta); + + if (code < 0) { + tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); + tFreeStreamTask(pTask); + return code; + } + + // added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if + // it is added into the meta store + if (added) { + // only handled in the leader node + if (isLeader) { + tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); + SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId); + + if (p != NULL && restored && p->info.fillHistory == 0) { + EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + streamTaskHandleEvent(p->status.pSM, event); + } else if (!restored) { + tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); + } + + if (p != NULL) { + streamMetaReleaseTask(pMeta, p); + } + } else { + tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId); + } + } else { + tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId); + tFreeStreamTask(pTask); + } + + return code; +} + +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { + SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; + + int32_t vgId = pMeta->vgId; + tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); + if (pTask != NULL) { + // drop the related fill-history task firstly + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pHTaskId = &pTask->hTaskInfo.id; + streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); + tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId); + } + streamMetaReleaseTask(pMeta, pTask); + } + + // drop the stream task now + streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); + + // commit the update + streamMetaWLock(pMeta); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); + + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + streamMetaWUnLock(pMeta); + + return 0; +} + +int32_t startStreamTasks(SStreamMeta* pMeta) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t vgId = pMeta->vgId; + + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); + if (numOfTasks == 0) { + return TSDB_CODE_SUCCESS; + } + + SArray* pTaskList = NULL; + streamMetaWLock(pMeta); + pTaskList = taosArrayDup(pMeta->pTaskList, NULL); + taosHashClear(pMeta->startInfo.pReadyTaskSet); + taosHashClear(pMeta->startInfo.pFailedTaskSet); + pMeta->startInfo.startTs = taosGetTimestampMs(); + streamMetaWUnLock(pMeta); + + // broadcast the check downstream tasks msg + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); + if (pTask == NULL) { + continue; + } + + // fill-history task can only be launched by related stream tasks. + if (pTask->info.fillHistory == 1) { + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + if (pTask->status.downstreamReady == 1) { + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", + pTask->id.idStr); + streamLaunchFillHistoryTask(pTask); + } + + streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); + if (ret != TSDB_CODE_SUCCESS) { + code = ret; + } + + streamMetaReleaseTask(pMeta, pTask); + } + + taosArrayDestroy(pTaskList); + return code; +} + +int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { + int32_t vgId = pMeta->vgId; + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + + tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); + if (numOfTasks == 0) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + + STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + streamTaskResetStatus(*pTask); + } + + return 0; +} + +static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { + int32_t vgId = pMeta->vgId; + int32_t code = 0; + int64_t st = taosGetTimestampMs(); + + while(1) { + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); + if (startVal == 0) { + break; + } + + tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); + taosMsleep(500); + } + + terrno = 0; + tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, + pMeta->updateInfo.transId); + + while (streamMetaTaskInTimer(pMeta)) { + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + streamMetaWLock(pMeta); + code = streamMetaReopen(pMeta); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d failed to reopen stream meta", vgId); + streamMetaWUnLock(pMeta); + code = terrno; + return code; + } + + streamMetaInitBackend(pMeta); + int64_t el = taosGetTimestampMs() - st; + + tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); + + code = streamMetaLoadAllTasks(pMeta); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); + streamMetaWUnLock(pMeta); + code = terrno; + return code; + } + + if (isLeader && !tsDisableStream) { + tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); + resetStreamTaskStatus(pMeta); + + streamMetaWUnLock(pMeta); + startStreamTasks(pMeta); + } else { + streamMetaResetStartInfo(&pMeta->startInfo); + streamMetaWUnLock(pMeta); + tqInfo("vgId:%d, follower node not start stream tasks", vgId); + } + + code = terrno; + return code; +} + +int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { + SStreamTaskRunReq* pReq = pMsg->pCont; + + int32_t taskId = pReq->taskId; + int32_t vgId = pMeta->vgId; + + if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) { + startStreamTasks(pMeta); + return 0; + } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) { + restartStreamTasks(pMeta, isLeader); + return 0; + } + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId); + if (pTask != NULL) { // even in halt status, the data in inputQ must be processed + char* p = NULL; + if (streamTaskReadyToRun(pTask, &p)) { + tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, + pTask->chkInfo.nextProcessVer); + streamExecTask(pTask); + } else { + int8_t status = streamTaskSetSchedStatusInactive(pTask); + tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, + pTask->id.idStr, p, status); + } + + streamMetaReleaseTask(pMeta, pTask); + return 0; + } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. + // todo add one function to handle this + tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId); + return -1; + } +} + + diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 1f0f8f7362..817d5124a2 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -18,6 +18,7 @@ #include "sync.h" #include "tsdb.h" #include "vnd.h" +#include "tqCommon.h" #define BATCH_ENABLE 0 @@ -570,7 +571,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } else { vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); resetStreamTaskStatus(pVnode->pTq->pStreamMeta); - streamTaskStartAsync(pMeta, &pVnode->msgCb, false); + tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); } } else { vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 60e64f31bb..4800e62109 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -52,12 +52,6 @@ extern "C" { #define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -typedef struct STaskUpdateEntry { - int64_t streamId; - int32_t taskId; - int32_t transId; -} STaskUpdateEntry; - typedef struct { int8_t type; SSDataBlock* pBlock; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 91d97a3068..69a8d1309a 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -784,785 +784,3 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->checkpointFailed = pSrc->checkpointFailed; } -int32_t streamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { - int32_t vgId = pMeta->vgId; - - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - if (numOfTasks == 0) { - stDebug("vgId:%d no stream tasks existed to run", vgId); - return 0; - } - - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - stError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); - return -1; - } - - stDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); - pRunReq->head.vgId = vgId; - pRunReq->streamId = 0; - pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(cb, STREAM_QUEUE, &msg); - return 0; -} - -int32_t streamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { - int32_t vgId = pMeta->vgId; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; - - SStreamTaskNodeUpdateMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) { - rsp.code = TSDB_CODE_MSG_DECODE_ERROR; - stError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code)); - tDecoderClear(&decoder); - return rsp.code; - } - - tDecoderClear(&decoder); - - // update the nodeEpset when it exists - streamMetaWLock(pMeta); - - // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. - STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask == NULL || *ppTask == NULL) { - stError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, - req.taskId); - rsp.code = TSDB_CODE_SUCCESS; - streamMetaWUnLock(pMeta); - - taosArrayDestroy(req.pNodeList); - return rsp.code; - } - - SStreamTask* pTask = *ppTask; - - if (pMeta->updateInfo.transId != req.transId) { - pMeta->updateInfo.transId = req.transId; - stInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId); - // info needs to be kept till the new trans to update the nodeEp arrived. - taosHashClear(pMeta->updateInfo.pTasks); - } else { - stDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId); - } - - STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; - void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); - if (exist != NULL) { - stDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, - req.transId); - rsp.code = TSDB_CODE_SUCCESS; - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return rsp.code; - } - - streamMetaWUnLock(pMeta); - - // the following two functions should not be executed within the scope of meta lock to avoid deadlock - streamTaskUpdateEpsetInfo(pTask, req.pNodeList); - streamTaskResetStatus(pTask); - - // continue after lock the meta again - streamMetaWLock(pMeta); - - SStreamTask** ppHTask = NULL; - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); - if (ppHTask == NULL || *ppHTask == NULL) { - stError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", - pMeta->vgId, req.taskId); - CLEAR_RELATED_FILLHISTORY_TASK(pTask); - } else { - stDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); - streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); - } - } - - { - streamMetaSaveTask(pMeta, pTask); - if (ppHTask != NULL) { - streamMetaSaveTask(pMeta, *ppHTask); - } - - if (streamMetaCommit(pMeta) < 0) { - // persist to disk - } - } - - streamTaskStop(pTask); - - // keep the already handled info - taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0); - - if (ppHTask != NULL) { - streamTaskStop(*ppHTask); - stDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); - taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); - } else { - stDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); - } - - rsp.code = 0; - - // possibly only handle the stream task. - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - - pMeta->startInfo.tasksWillRestart = 1; - - if (updateTasks < numOfTasks) { - stDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, - updateTasks, (numOfTasks - updateTasks)); - streamMetaWUnLock(pMeta); - } else { - if (!restored) { - stDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); - pMeta->startInfo.tasksWillRestart = 0; - streamMetaWUnLock(pMeta); - } else { - stDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks); -#if 1 - streamTaskStartAsync(pMeta, cb, true); - streamMetaWUnLock(pMeta); -#else - streamMetaWUnLock(pMeta); - - // For debug purpose. - // the following procedure consume many CPU resource, result in the re-election of leader - // with high probability. So we employ it as a test case for the stream processing framework, with - // checkpoint/restart/nodeUpdate etc. - while (1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); - if (startVal == 0) { - break; - } - - tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); - taosMsleep(500); - } - - while (streamMetaTaskInTimer(pMeta)) { - tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - streamMetaWLock(pMeta); - - int32_t code = streamMetaReopen(pMeta); - if (code != 0) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - streamMetaInitBackend(pMeta); - - if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { - tqError("vgId:%d failed to load stream tasks", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - tqInfo("vgId:%d start all stream tasks after all being updated", vgId); - resetStreamTaskStatus(pTq->pStreamMeta); - tqStartStreamTaskAsync(pTq, false); - } else { - tqInfo("vgId:%d, follower node not start stream tasks", vgId); - } - streamMetaWUnLock(pMeta); -#endif - } - } - - taosArrayDestroy(req.pNodeList); - return rsp.code; -} - -int32_t streamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - SStreamDispatchReq req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); - if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { - tDecoderClear(&decoder); - return TSDB_CODE_MSG_DECODE_ERROR; - } - tDecoderClear(&decoder); - - stDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); - if (pTask) { - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0){ - return -1; - } - tDeleteStreamDispatchReq(&req); - streamMetaReleaseTask(pMeta, pTask); - return 0; - } else { - stError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already", - pMeta->vgId, req.taskId); - - SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); - if (pRspHead == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - stError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId); - return -1; - } - - pRspHead->vgId = htonl(req.upstreamNodeId); - ASSERT(pRspHead->vgId != 0); - - SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead)); - pRsp->streamId = htobe64(req.streamId); - pRsp->upstreamTaskId = htonl(req.upstreamTaskId); - pRsp->upstreamNodeId = htonl(req.upstreamNodeId); - pRsp->downstreamNodeId = htonl(pMeta->vgId); - pRsp->downstreamTaskId = htonl(req.taskId); - pRsp->msgId = htonl(req.msgId); - pRsp->stage = htobe64(req.stage); - pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL; - - int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); - SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead}; - stError("s-task:0x%x send dispatch error rsp, no task", req.taskId); - - tmsgSendRsp(&rsp); - tDeleteStreamDispatchReq(&req); - - return 0; - } -} - -int32_t streamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - - int32_t vgId = pMeta->vgId; - pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId); - pRsp->streamId = htobe64(pRsp->streamId); - pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId); - pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId); - pRsp->stage = htobe64(pRsp->stage); - pRsp->msgId = htonl(pRsp->msgId); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId); - if (pTask) { - streamProcessDispatchRsp(pTask, pRsp, pMsg->code); - streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_SUCCESS; - } else { - stDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId); - terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - return terrno; - } -} - -int32_t streamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - SDecoder decoder; - - SStreamRetrieveReq req; - tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); - tDecodeStreamRetrieveReq(&decoder, &req); - tDecoderClear(&decoder); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId); - if (pTask == NULL) { - stError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, - req.dstTaskId); - return -1; - } - - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessRetrieveReq(pTask, &req, &rsp); - - streamMetaReleaseTask(pMeta, pTask); - tDeleteStreamRetrieveReq(&req); - return 0; -} - -int32_t streamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamScanHistoryFinishReq req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - tDecodeStreamScanHistoryFinishReq(&decoder, &req); - tDecoderClear(&decoder); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId); - if (pTask == NULL) { - stError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", - pMeta->vgId, req.downstreamTaskId); - return -1; - } - - stDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId); - - int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info); - streamMetaReleaseTask(pMeta, pTask); - return code; -} - -int32_t streamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamCompleteHistoryMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - tDecodeCompleteHistoryDataMsg(&decoder, &req); - tDecoderClear(&decoder); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); - if (pTask == NULL) { - stError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", - pMeta->vgId, req.upstreamTaskId); - return -1; - } - - int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); - if (remain > 0) { - stDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d", - pTask->id.idStr, req.downstreamId, remain); - } else { - stDebug( - "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " - "completed msg", - pTask->id.idStr, req.downstreamId); - streamProcessScanHistoryFinishRsp(pTask); - } - - streamMetaReleaseTask(pMeta, pTask); - return 0; -} - -int32_t streamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - SStreamTaskCheckReq req; - SDecoder decoder; - - tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); - tDecodeStreamTaskCheckReq(&decoder, &req); - tDecoderClear(&decoder); - - int32_t taskId = req.downstreamTaskId; - - SStreamTaskCheckRsp rsp = { - .reqId = req.reqId, - .streamId = req.streamId, - .childId = req.childId, - .downstreamNodeId = req.downstreamNodeId, - .downstreamTaskId = req.downstreamTaskId, - .upstreamNodeId = req.upstreamNodeId, - .upstreamTaskId = req.upstreamTaskId, - }; - - // only the leader node handle the check request - if (pMeta->role == NODE_ROLE_FOLLOWER) { - stError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", - taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); - rsp.status = TASK_DOWNSTREAM_NOT_LEADER; - } else { - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); - if (pTask != NULL) { - rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage); - streamMetaReleaseTask(pMeta, pTask); - - char* p = NULL; - streamTaskGetStatus(pTask, &p); - stDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", - pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); - } else { - rsp.status = TASK_DOWNSTREAM_NOT_READY; - stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 - ") from task:0x%x (vgId:%d), rsp check_status %d", - req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); - } - } - - return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); -} - -int32_t streamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { - char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t vgId = pMeta->vgId; - - int32_t code; - SStreamTaskCheckRsp rsp; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)pReq, len); - code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); - if (code < 0) { - terrno = TSDB_CODE_INVALID_MSG; - tDecoderClear(&decoder); - stError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno)); - return -1; - } - - tDecoderClear(&decoder); - stDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", - rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - - if (!isLeader) { - stError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, - rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); - return code; - } - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); - if (pTask == NULL) { - stError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, vgId); - terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - return -1; - } - - code = streamProcessCheckRsp(pTask, &rsp); - streamMetaReleaseTask(pMeta, pTask); - return code; -} - -int32_t streamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { - int32_t vgId = pMeta->vgId; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; - - SStreamCheckpointReadyMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) { - code = TSDB_CODE_MSG_DECODE_ERROR; - tDecoderClear(&decoder); - return code; - } - tDecoderClear(&decoder); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); - if (pTask == NULL) { - stError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId); - return code; - } - - stDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId, - pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); - - streamProcessCheckpointReadyMsg(pTask); - streamMetaReleaseTask(pMeta, pTask); - return code; -} - -int32_t streamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored) { - int32_t code = 0; - int32_t vgId = pMeta->vgId; - - if (tsDisableStream) { - stInfo("vgId:%d stream disabled, not deploy stream tasks", vgId); - return code; - } - - stDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId); - - // 1.deserialize msg and build task - int32_t size = sizeof(SStreamTask); - SStreamTask* pTask = taosMemoryCalloc(1, size); - if (pTask == NULL) { - stError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size); - return TSDB_CODE_OUT_OF_MEMORY; - } - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - code = tDecodeStreamTask(&decoder, pTask); - tDecoderClear(&decoder); - - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pTask); - return TSDB_CODE_INVALID_MSG; - } - - // 2.save task, use the latest commit version as the initial start version of stream task. - int32_t taskId = pTask->id.taskId; - int64_t streamId = pTask->id.streamId; - bool added = false; - - streamMetaWLock(pMeta); - code = streamMetaRegisterTask(pMeta, sversion, pTask, &added); - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - streamMetaWUnLock(pMeta); - - if (code < 0) { - stError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); - tFreeStreamTask(pTask); - return code; - } - - // added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if - // it is added into the meta store - if (added) { - // only handled in the leader node - if (isLeader) { - stDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); - SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId); - - if (p != NULL && restored && p->info.fillHistory == 0) { - EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; - streamTaskHandleEvent(p->status.pSM, event); - } else if (!restored) { - stWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); - } - - if (p != NULL) { - streamMetaReleaseTask(pMeta, p); - } - } else { - stDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId); - } - } else { - stWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId); - tFreeStreamTask(pTask); - } - - return code; -} - -int32_t streamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { - SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - - int32_t vgId = pMeta->vgId; - stDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); - if (pTask != NULL) { - // drop the related fill-history task firstly - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHTaskId = &pTask->hTaskInfo.id; - streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); - stDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId); - } - streamMetaReleaseTask(pMeta, pTask); - } - - // drop the stream task now - streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); - - // commit the update - streamMetaWLock(pMeta); - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); - - if (streamMetaCommit(pMeta) < 0) { - // persist to disk - } - streamMetaWUnLock(pMeta); - - return 0; -} - -int32_t startStreamTasks(SStreamMeta* pMeta) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t vgId = pMeta->vgId; - - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); - if (numOfTasks == 0) { - return TSDB_CODE_SUCCESS; - } - - SArray* pTaskList = NULL; - streamMetaWLock(pMeta); - pTaskList = taosArrayDup(pMeta->pTaskList, NULL); - taosHashClear(pMeta->startInfo.pReadyTaskSet); - taosHashClear(pMeta->startInfo.pFailedTaskSet); - pMeta->startInfo.startTs = taosGetTimestampMs(); - streamMetaWUnLock(pMeta); - - // broadcast the check downstream tasks msg - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); - if (pTask == NULL) { - continue; - } - - // fill-history task can only be launched by related stream tasks. - if (pTask->info.fillHistory == 1) { - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - if (pTask->status.downstreamReady == 1) { - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", - pTask->id.idStr); - streamLaunchFillHistoryTask(pTask); - } - - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); - if (ret != TSDB_CODE_SUCCESS) { - code = ret; - } - - streamMetaReleaseTask(pMeta, pTask); - } - - taosArrayDestroy(pTaskList); - return code; -} - -int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { - int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - - stDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); - if (numOfTasks == 0) { - return TSDB_CODE_SUCCESS; - } - - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - - STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - streamTaskResetStatus(*pTask); - } - - return 0; -} - -static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { - int32_t vgId = pMeta->vgId; - int32_t code = 0; - int64_t st = taosGetTimestampMs(); - - while(1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); - if (startVal == 0) { - break; - } - - stDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); - taosMsleep(500); - } - - terrno = 0; - stInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, - pMeta->updateInfo.transId); - - while (streamMetaTaskInTimer(pMeta)) { - stDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - streamMetaWLock(pMeta); - code = streamMetaReopen(pMeta); - if (code != TSDB_CODE_SUCCESS) { - stError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } - - streamMetaInitBackend(pMeta); - int64_t el = taosGetTimestampMs() - st; - - stInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); - - code = streamMetaLoadAllTasks(pMeta); - if (code != TSDB_CODE_SUCCESS) { - stError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } - - if (isLeader && !tsDisableStream) { - stInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); - resetStreamTaskStatus(pMeta); - - streamMetaWUnLock(pMeta); - startStreamTasks(pMeta); - } else { - streamMetaResetStartInfo(&pMeta->startInfo); - streamMetaWUnLock(pMeta); - stInfo("vgId:%d, follower node not start stream tasks", vgId); - } - - code = terrno; - return code; -} - -int32_t streamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { - SStreamTaskRunReq* pReq = pMsg->pCont; - - int32_t taskId = pReq->taskId; - int32_t vgId = pMeta->vgId; - - if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) { - startStreamTasks(pMeta); - return 0; - } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) { - restartStreamTasks(pMeta, isLeader); - return 0; - } - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId); - if (pTask != NULL) { // even in halt status, the data in inputQ must be processed - char* p = NULL; - if (streamTaskReadyToRun(pTask, &p)) { - stDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.nextProcessVer); - streamExecTask(pTask); - } else { - int8_t status = streamTaskSetSchedStatusInactive(pTask); - stDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, - pTask->id.idStr, p, status); - } - - streamMetaReleaseTask(pMeta, pTask); - return 0; - } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. - // todo add one function to handle this - stError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId); - return -1; - } -}