diff --git a/include/dnode/vnode/stream.h b/include/dnode/vnode/stream.h
new file mode 100644
index 0000000000..6d86847542
--- /dev/null
+++ b/include/dnode/vnode/stream.h
@@ -0,0 +1,18 @@
+//
+// Created by mingming wanng on 2023/11/15.
+//
+
+#ifndef TDENGINE_STREAM_H
+#define TDENGINE_STREAM_H
+
+#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
+#define STREAM_EXEC_START_ALL_TASKS_ID     (-2)
+#define STREAM_EXEC_RESTART_ALL_TASKS_ID   (-3)
+
+typedef struct STaskUpdateEntry {
+  int64_t streamId;
+  int32_t taskId;
+  int32_t transId;
+} STaskUpdateEntry;
+
+#endif  // TDENGINE_STREAM_H
diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c
index 509a7a6b7f..cd81b9873f 100644
--- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c
+++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c
@@ -73,6 +73,7 @@ SArray *smGetMsgHandles() {
   SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
   if (pArray == NULL) goto _OVER;
 
+  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
@@ -87,7 +88,6 @@ SArray *smGetMsgHandles() {
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
-
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
 
diff --git a/source/dnode/snode/CMakeLists.txt b/source/dnode/snode/CMakeLists.txt
index ebfe80ecab..2da1f9adac 100644
--- a/source/dnode/snode/CMakeLists.txt
+++ b/source/dnode/snode/CMakeLists.txt
@@ -3,6 +3,7 @@ add_library(snode STATIC ${SNODE_SRC})
 target_include_directories(
   snode
   PUBLIC "${TD_SOURCE_DIR}/include/dnode/snode"
+  PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
   private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
 )
 target_link_libraries(
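Note: `STaskUpdateEntry`, defined in the new `stream.h` above, is used later in this patch as a composite hash key so that one nodeEp-update transaction touches each task at most once. A minimal sketch of that dedup pattern, with a hypothetical helper name, assuming the `taosHashGet`/`taosHashPut` semantics used elsewhere in this patch:

```c
// Sketch only, not part of the patch: mirrors how sndProcessTaskUpdateReq
// (further down) dedupes nodeEp updates within a single transaction.
static bool sndUpdateAlreadyHandled(SHashObj* pTasks, int64_t streamId, int32_t taskId, int32_t transId) {
  STaskUpdateEntry entry = {.streamId = streamId, .taskId = taskId, .transId = transId};
  if (taosHashGet(pTasks, &entry, sizeof(entry)) != NULL) {
    return true;  // this task was already updated within this transaction
  }
  taosHashPut(pTasks, &entry, sizeof(entry), NULL, 0);  // remember it as handled
  return false;
}
```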
diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c
index 718f4e851b..25400220b8 100644
--- a/source/dnode/snode/src/snode.c
+++ b/source/dnode/snode/src/snode.c
@@ -18,6 +18,7 @@
 #include "sndInt.h"
 #include "tstream.h"
 #include "tuuid.h"
+#include "stream.h"
 
 #define sndError(...)                 \
   do {                                \
@@ -84,7 +85,16 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
 
     streamTaskOpenAllUpstreamInput(pTask);
 
-    pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
+    SStreamTask* pSateTask = pTask;
+    SStreamTask  task = {0};
+    if (pTask->info.fillHistory) {
+      task.id.streamId = pTask->streamTaskId.streamId;
+      task.id.taskId = pTask->streamTaskId.taskId;
+      task.pMeta = pTask->pMeta;
+      pSateTask = &task;
+    }
+
+    pTask->pState = streamStateOpen(pSnode->path, pSateTask, false, -1, -1);
     if (pTask->pState == NULL) {
       sndError("s-task:%s failed to open state for task", pTask->id.idStr);
       return -1;
@@ -92,12 +102,20 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
     sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
   }
 
-  int32_t numOfChildEp = taosArrayGetSize(pTask->upstreamInfo.pList);
-  SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
+  int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
+  SReadHandle handle = {
+      .checkpointId = pTask->chkInfo.checkpointId,
+      .vnode = NULL,
+      .numOfVgroups = numOfVgroups,
+      .pStateBackend = pTask->pState,
+      .fillHistory = pTask->info.fillHistory,
+      .winRange = pTask->dataRange.window,
+  };
   initStreamStateAPI(&handle.api);
 
-  pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId);
+  pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, SNODE_HANDLE, pTask->id.taskId);
   ASSERT(pTask->exec.pExecutor);
+  qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
 
   streamTaskResetUpstreamStageInfo(pTask);
   streamSetupScheduleTrigger(pTask);
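Note on the `pSateTask` indirection in the hunk above: a fill-history task deliberately opens its state under the identity of its related stream task, so both share one state backend. A hedged sketch of that identity resolution (`sndResolveStateId` is a hypothetical name, not in the patch):

```c
// Sketch only: which (streamId, taskId) a task's state is opened under.
// Fill-history tasks borrow their related stream task's identity, so
// streamStateOpen() maps both onto the same backend.
static SStreamTaskId sndResolveStateId(const SStreamTask* pTask) {
  if (pTask->info.fillHistory) {
    return (SStreamTaskId){.streamId = pTask->streamTaskId.streamId,
                           .taskId = pTask->streamTaskId.taskId};
  }
  return (SStreamTaskId){.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
}
```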
@@ -169,6 +187,167 @@ void sndClose(SSnode *pSnode) {
 
 int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }
 
+int32_t sndStartStreamTasks(SSnode* pSnode) {
+  int32_t      code = TSDB_CODE_SUCCESS;
+  int32_t      vgId = SNODE_HANDLE;
+  SStreamMeta* pMeta = pSnode->pMeta;
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+  sndDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
+  if (numOfTasks == 0) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SArray* pTaskList = NULL;
+  streamMetaWLock(pMeta);
+  pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
+  taosHashClear(pMeta->startInfo.pReadyTaskSet);
+  taosHashClear(pMeta->startInfo.pFailedTaskSet);
+  pMeta->startInfo.startTs = taosGetTimestampMs();
+  streamMetaWUnLock(pMeta);
+
+  // broadcast the check downstream tasks msg
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
+    SStreamTask*   pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
+    if (pTask == NULL) {
+      continue;
+    }
+
+    // fill-history task can only be launched by related stream tasks.
+    if (pTask->info.fillHistory == 1) {
+      streamMetaReleaseTask(pMeta, pTask);
+      continue;
+    }
+
+    if (pTask->status.downstreamReady == 1) {
+      if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
+        sndDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
+                 pTask->id.idStr);
+        streamLaunchFillHistoryTask(pTask);
+      }
+
+      streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
+      streamMetaReleaseTask(pMeta, pTask);
+      continue;
+    }
+
+    EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
+    int32_t          ret = streamTaskHandleEvent(pTask->status.pSM, event);
+    if (ret != TSDB_CODE_SUCCESS) {
+      code = ret;
+    }
+
+    streamMetaReleaseTask(pMeta, pTask);
+  }
+
+  taosArrayDestroy(pTaskList);
+  return code;
+}
+
+int32_t sndResetStreamTaskStatus(SSnode* pSnode) {
+  SStreamMeta* pMeta = pSnode->pMeta;
+  int32_t      vgId = pMeta->vgId;
+  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+
+  sndDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
+  if (numOfTasks == 0) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
+
+    STaskId       id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
+    SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
+    streamTaskResetStatus(*pTask);
+  }
+
+  return 0;
+}
+
+int32_t sndRestartStreamTasks(SSnode* pSnode) {
+  SStreamMeta* pMeta = pSnode->pMeta;
+  int32_t      vgId = pMeta->vgId;
+  int32_t      code = 0;
+  int64_t      st = taosGetTimestampMs();
+
+  while (1) {
+    int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
+    if (startVal == 0) {
+      break;
+    }
+
+    sndDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
+    taosMsleep(500);
+  }
+
+  terrno = 0;
+  sndInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
+          pMeta->updateInfo.transId);
+
+  while (streamMetaTaskInTimer(pMeta)) {
+    sndDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
+    taosMsleep(100);
+  }
+
+  streamMetaWLock(pMeta);
+
+  code = streamMetaReopen(pMeta);
+  if (code != TSDB_CODE_SUCCESS) {
+    sndError("vgId:%d failed to reopen stream meta", vgId);
+    streamMetaWUnLock(pMeta);
+    code = terrno;
+    return code;
+  }
+
+  int64_t el = taosGetTimestampMs() - st;
+
+  sndInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);
+
+  code = streamMetaLoadAllTasks(pSnode->pMeta);
+  if (code != TSDB_CODE_SUCCESS) {
+    sndError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
+    streamMetaWUnLock(pMeta);
+    code = terrno;
+    return code;
+  }
+  sndInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
+  sndResetStreamTaskStatus(pSnode);
+  sndStartStreamTasks(pSnode);
+
+  streamMetaWUnLock(pMeta);
+  code = terrno;
+  return code;
+}
+
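Note: the restart path above is driven through the stream queue. `sndStartStreamTaskAsync` (next) posts a `SStreamTaskRunReq` whose `taskId` is one of the negative sentinels from `stream.h`, and `sndProcessTaskRunReq` intercepts those before doing a normal task lookup. A condensed sketch of that contract (`sndDispatchRunReq` is a hypothetical name; the assumption is that real task IDs are positive, which is what keeps the sentinels collision-free):

```c
// Sketch only: the run-request dispatch contract introduced by this patch.
static int32_t sndDispatchRunReq(SSnode* pSnode, const SStreamTaskRunReq* pReq) {
  switch (pReq->taskId) {
    case STREAM_EXEC_START_ALL_TASKS_ID:
      return sndStartStreamTasks(pSnode);
    case STREAM_EXEC_RESTART_ALL_TASKS_ID:
      return sndRestartStreamTasks(pSnode);
    default:
      return 0;  // a real task id: fall through to streamMetaAcquireTask()
  }
}
```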
+int32_t sndStartStreamTaskAsync(SSnode* pSnode, bool restart) {
+  SStreamMeta* pMeta = pSnode->pMeta;
+  int32_t      vgId = pMeta->vgId;
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+  if (numOfTasks == 0) {
+    sndDebug("vgId:%d no stream tasks existed to run", vgId);
+    return 0;
+  }
+
+  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
+  if (pRunReq == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    sndError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
+    return -1;
+  }
+
+  sndDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
+  pRunReq->head.vgId = vgId;
+  pRunReq->streamId = 0;
+  pRunReq->taskId = restart ? STREAM_EXEC_RESTART_ALL_TASKS_ID : STREAM_EXEC_START_ALL_TASKS_ID;
+
+  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
+  tmsgPutToQueue(&pSnode->msgCb, STREAM_QUEUE, &msg);
+  return 0;
+}
+
 int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
   int32_t code;
 
@@ -235,6 +414,16 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
 
 int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
   SStreamTaskRunReq *pReq = pMsg->pCont;
+  int32_t taskId = pReq->taskId;
+
+  if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
+    sndStartStreamTasks(pSnode);
+    return 0;
+  } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
+    sndRestartStreamTasks(pSnode);
+    return 0;
+  }
+
   SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
   if (pTask) {
     streamExecTask(pTask);
@@ -312,22 +501,6 @@ int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
   return 0;
 }
 
-int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
-  switch (pMsg->msgType) {
-    case TDMT_STREAM_TASK_DEPLOY: {
-      void   *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
-      int32_t len = pMsg->contLen - sizeof(SMsgHead);
-      return sndProcessTaskDeployReq(pSnode, pReq, len);
-    }
-
-    case TDMT_STREAM_TASK_DROP:
-      return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen);
-    default:
-      ASSERT(0);
-  }
-  return 0;
-}
-
 int32_t sndProcessTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
   char   *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
   int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
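Note: with the old `sndProcessWriteMsg` removed here (it is re-added at the end of the file with a `TDMT_VND_STREAM_TASK_UPDATE` case), the overall nodeEp-update flow on the snode looks like this:

```c
// Sketch only: how a nodeEp update travels through the snode after this patch.
//   mnode -> TDMT_VND_STREAM_TASK_UPDATE -> mgmt/write queue (smHandle.c above)
//         -> sndProcessWriteMsg() -> sndProcessTaskUpdateReq() (next hunk)
//         -> once all tasks are updated: sndStartStreamTaskAsync(pSnode, true)
//         -> TDMT_STREAM_TASK_RUN with taskId = STREAM_EXEC_RESTART_ALL_TASKS_ID
//         -> stream queue -> sndProcessTaskRunReq() -> sndRestartStreamTasks()
```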
@@ -517,6 +690,181 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
   return code;
 }
 
+int32_t sndProcessTaskUpdateReq(SSnode* pSnode, SRpcMsg* pMsg) {
+  SStreamMeta* pMeta = pSnode->pMeta;
+  int32_t      vgId = SNODE_HANDLE;
+  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
+  SRpcMsg      rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
+
+  SStreamTaskNodeUpdateMsg req = {0};
+
+  SDecoder decoder;
+  tDecoderInit(&decoder, (uint8_t*)msg, len);
+  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
+    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
+    sndError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
+    tDecoderClear(&decoder);
+    return rsp.code;
+  }
+
+  tDecoderClear(&decoder);
+
+  // update the nodeEpset when it exists
+  streamMetaWLock(pMeta);
+
+  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
+  STaskId       id = {.streamId = req.streamId, .taskId = req.taskId};
+  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
+  if (ppTask == NULL || *ppTask == NULL) {
+    sndError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
+             req.taskId);
+    rsp.code = TSDB_CODE_SUCCESS;
+    streamMetaWUnLock(pMeta);
+
+    taosArrayDestroy(req.pNodeList);
+    return rsp.code;
+  }
+
+  SStreamTask* pTask = *ppTask;
+
+  if (pMeta->updateInfo.transId != req.transId) {
+    pMeta->updateInfo.transId = req.transId;
+    sndInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
+    // info needs to be kept till the new trans to update the nodeEp arrived.
+    taosHashClear(pMeta->updateInfo.pTasks);
+  } else {
+    sndDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId);
+  }
+
+  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
+  void*            exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
+  if (exist != NULL) {
+    sndDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
+             req.transId);
+    rsp.code = TSDB_CODE_SUCCESS;
+    streamMetaWUnLock(pMeta);
+    taosArrayDestroy(req.pNodeList);
+    return rsp.code;
+  }
+
+  streamMetaWUnLock(pMeta);
+
+  // the following two functions should not be executed within the scope of meta lock to avoid deadlock
+  streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
+  streamTaskResetStatus(pTask);
+
+  // continue after lock the meta again
+  streamMetaWLock(pMeta);
+
+  SStreamTask** ppHTask = NULL;
+  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
+    ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
+    if (ppHTask == NULL || *ppHTask == NULL) {
+      sndError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
+               pMeta->vgId, req.taskId);
+      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
+    } else {
+      sndDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
+      streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
+    }
+  }
+
+  {
+    streamMetaSaveTask(pMeta, pTask);
+    if (ppHTask != NULL) {
+      streamMetaSaveTask(pMeta, *ppHTask);
+    }
+
+    if (streamMetaCommit(pMeta) < 0) {
+      // persist to disk
+    }
+  }
+
+  streamTaskStop(pTask);
+
+  // keep the already handled info
+  taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
+
+  if (ppHTask != NULL) {
+    streamTaskStop(*ppHTask);
+    sndDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
+    taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
+  } else {
+    sndDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
+  }
+
+  rsp.code = 0;
+
+  // possibly only handle the stream task.
+  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
+  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
+
+  pMeta->startInfo.tasksWillRestart = 1;
+
+  if (updateTasks < numOfTasks) {
+    sndDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
+             updateTasks, (numOfTasks - updateTasks));
+    streamMetaWUnLock(pMeta);
+  } else {
+    sndDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
+#if 1
+    sndStartStreamTaskAsync(pSnode, true);
+    streamMetaWUnLock(pMeta);
+#else
+    streamMetaWUnLock(pMeta);
+
+    // For debug purpose.
+    // the following procedure consume many CPU resource, result in the re-election of leader
+    // with high probability. So we employ it as a test case for the stream processing framework, with
+    // checkpoint/restart/nodeUpdate etc.
+    while (1) {
+      int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
+      if (startVal == 0) {
+        break;
+      }
+
+      tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
+      taosMsleep(500);
+    }
+
+    while (streamMetaTaskInTimer(pMeta)) {
+      tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
+      taosMsleep(100);
+    }
+
+    streamMetaWLock(pMeta);
+
+    int32_t code = streamMetaReopen(pMeta);
+    if (code != 0) {
+      tqError("vgId:%d failed to reopen stream meta", vgId);
+      streamMetaWUnLock(pMeta);
+      taosArrayDestroy(req.pNodeList);
+      return -1;
+    }
+
+    if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
+      tqError("vgId:%d failed to load stream tasks", vgId);
+      streamMetaWUnLock(pMeta);
+      taosArrayDestroy(req.pNodeList);
+      return -1;
+    }
+
+    if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
+      tqInfo("vgId:%d start all stream tasks after all being updated", vgId);
+      tqResetStreamTaskStatus(pTq);
+      tqStartStreamTaskAsync(pTq, false);
+    } else {
+      tqInfo("vgId:%d, follower node not start stream tasks", vgId);
+    }
+    streamMetaWUnLock(pMeta);
+#endif
+  }
+
+  taosArrayDestroy(req.pNodeList);
+  return rsp.code;
+}
+
 int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
   switch (pMsg->msgType) {
     case TDMT_STREAM_TASK_RUN:
@@ -544,3 +892,22 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
   }
   return 0;
 }
+
+int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
+  switch (pMsg->msgType) {
+    case TDMT_STREAM_TASK_DEPLOY: {
+      void   *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+      int32_t len = pMsg->contLen - sizeof(SMsgHead);
+      return sndProcessTaskDeployReq(pSnode, pReq, len);
+    }
+
+    case TDMT_STREAM_TASK_DROP:
+      return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen);
+    case TDMT_VND_STREAM_TASK_UPDATE:
+      sndProcessTaskUpdateReq(pSnode, pMsg);
+      break;
+    default:
+      ASSERT(0);
+  }
+  return 0;
+}
\ No newline at end of file
diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt
index dc43da7fe7..635c15aa41 100644
--- a/source/dnode/vnode/CMakeLists.txt
+++ b/source/dnode/vnode/CMakeLists.txt
@@ -138,6 +138,11 @@ else()
   endif()
 endif()
 
+target_include_directories(
+  vnode
+  PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
+)
+
 target_link_libraries(
   vnode
   PUBLIC os
diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h
index fdd449bf36..b3f8317add 100644
--- a/source/dnode/vnode/src/inc/tq.h
+++ b/source/dnode/vnode/src/inc/tq.h
@@ -43,9 +43,6 @@ extern "C" {
 
 typedef struct STqOffsetStore STqOffsetStore;
 
-#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
-#define STREAM_EXEC_START_ALL_TASKS_ID     (-2)
-#define STREAM_EXEC_RESTART_ALL_TASKS_ID   (-3)
 #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
 
 // tqExec
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 3ae0eb1ddf..521686d023 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -15,17 +15,12 @@
 
 #include "tq.h"
 #include "vnd.h"
+#include "stream.h"
 
 typedef struct {
   int8_t inited;
 } STqMgmt;
 
-typedef struct STaskUpdateEntry {
-  int64_t streamId;
-  int32_t taskId;
-  int32_t transId;
-} STaskUpdateEntry;
-
 static STqMgmt tqMgmt = {0};
 
 // 0: not init
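Note: the `STREAM_EXEC_*` IDs and `STaskUpdateEntry` deleted from `tq.h`/`tq.c` above now come from the shared `stream.h`, so the vnode and snode paths cannot drift apart. One design quirk worth flagging: the new `sndProcessWriteMsg` swallows `sndProcessTaskUpdateReq`'s return value. A stricter variant of that case arm would propagate it (fragment of the switch above, sketch only):

```c
    case TDMT_VND_STREAM_TASK_UPDATE:
      return sndProcessTaskUpdateReq(pSnode, pMsg);  // propagate decode/update errors
```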
"vnd.h" +#include "stream.h" #define MAX_REPEAT_SCAN_THRESHOLD 3 #define SCAN_WAL_IDLE_DURATION 100 diff --git a/tests/system-test/8-stream/snodeRestart.py b/tests/system-test/8-stream/snodeRestart.py index 65ca090df9..6adf874ecd 100644 --- a/tests/system-test/8-stream/snodeRestart.py +++ b/tests/system-test/8-stream/snodeRestart.py @@ -27,7 +27,7 @@ class TDTestCase: time.sleep(1) tdSql.query("use test") tdSql.query("create snode on dnode 4") - tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select sum(voltage) from meters partition by groupid interval(4s)") + tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart, sum(voltage) from meters partition by groupid interval(4s)") tdLog.debug("create stream use snode and insert data") time.sleep(10)