fix:send update msg to vnodes if snode restart

This commit is contained in:
wangmm0220 2023-11-15 18:13:35 +08:00
parent 8a9ded764d
commit 45782f278b
9 changed files with 415 additions and 31 deletions

View File

@ -0,0 +1,18 @@
//
// Created by mingming wanng on 2023/11/15.
//
#ifndef TDENGINE_STREAM_H
#define TDENGINE_STREAM_H
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
typedef struct STaskUpdateEntry {
int64_t streamId;
int32_t taskId;
int32_t transId;
} STaskUpdateEntry;
#endif // TDENGINE_STREAM_H

View File

@ -73,6 +73,7 @@ SArray *smGetMsgHandles() {
SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
if (pArray == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
@ -87,7 +88,6 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -3,6 +3,7 @@ add_library(snode STATIC ${SNODE_SRC})
target_include_directories(
snode
PUBLIC "${TD_SOURCE_DIR}/include/dnode/snode"
PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(

View File

@ -18,6 +18,7 @@
#include "sndInt.h"
#include "tstream.h"
#include "tuuid.h"
#include "stream.h"
#define sndError(...) \
do { \
@ -84,7 +85,16 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
streamTaskOpenAllUpstreamInput(pTask);
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
SStreamTask* pSateTask = pTask;
SStreamTask task = {0};
if (pTask->info.fillHistory) {
task.id.streamId = pTask->streamTaskId.streamId;
task.id.taskId = pTask->streamTaskId.taskId;
task.pMeta = pTask->pMeta;
pSateTask = &task;
}
pTask->pState = streamStateOpen(pSnode->path, pSateTask, false, -1, -1);
if (pTask->pState == NULL) {
sndError("s-task:%s failed to open state for task", pTask->id.idStr);
return -1;
@ -92,12 +102,20 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
int32_t numOfChildEp = taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.vnode = NULL,
.numOfVgroups = numOfVgroups,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
initStreamStateAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, SNODE_HANDLE, pTask->id.taskId);
ASSERT(pTask->exec.pExecutor);
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
@ -169,6 +187,167 @@ void sndClose(SSnode *pSnode) {
int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }
int32_t sndStartStreamTasks(SSnode* pSnode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = SNODE_HANDLE;
SStreamMeta* pMeta = pSnode->pMeta;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
sndDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaWUnLock(pMeta);
// broadcast the check downstream tasks msg
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
}
// fill-history task can only be launched by related stream tasks.
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
if (pTask->status.downstreamReady == 1) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
sndDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
}
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
if (ret != TSDB_CODE_SUCCESS) {
code = ret;
}
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
return code;
}
int32_t sndResetStreamTaskStatus(SSnode* pSnode) {
SStreamMeta* pMeta = pSnode->pMeta;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
sndDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
streamTaskResetStatus(*pTask);
}
return 0;
}
int32_t sndRestartStreamTasks(SSnode* pSnode) {
SStreamMeta* pMeta = pSnode->pMeta;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int64_t st = taosGetTimestampMs();
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
sndDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
terrno = 0;
sndInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
pMeta->updateInfo.transId);
while (streamMetaTaskInTimer(pMeta)) {
sndDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) {
sndError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
int64_t el = taosGetTimestampMs() - st;
sndInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.);
code = streamMetaLoadAllTasks(pSnode->pMeta);
if (code != TSDB_CODE_SUCCESS) {
sndError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
sndInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
sndResetStreamTaskStatus(pSnode);
sndStartStreamTasks(pSnode);
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
int32_t sndStartStreamTaskAsync(SSnode* pSnode, bool restart) {
SStreamMeta* pMeta = pSnode->pMeta;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
sndDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sndError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
return -1;
}
sndDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pSnode->msgCb, STREAM_QUEUE, &msg);
return 0;
}
int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
int32_t code;
@ -235,6 +414,16 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTaskRunReq *pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
sndStartStreamTasks(pSnode);
return 0;
} else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
sndRestartStreamTasks(pSnode);
return 0;
}
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
if (pTask) {
streamExecTask(pTask);
@ -312,22 +501,6 @@ int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
return 0;
}
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_DEPLOY: {
void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
return sndProcessTaskDeployReq(pSnode, pReq, len);
}
case TDMT_STREAM_TASK_DROP:
return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen);
default:
ASSERT(0);
}
return 0;
}
int32_t sndProcessTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@ -517,6 +690,181 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
return code;
}
int32_t sndProcessTaskUpdateReq(SSnode* pSnode, SRpcMsg* pMsg) {
SStreamMeta* pMeta = pSnode->pMeta;
int32_t vgId = SNODE_HANDLE;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
SStreamTaskNodeUpdateMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
sndError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
tDecoderClear(&decoder);
return rsp.code;
}
tDecoderClear(&decoder);
// update the nodeEpset when it exists
streamMetaWLock(pMeta);
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || *ppTask == NULL) {
sndError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
req.taskId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
SStreamTask* pTask = *ppTask;
if (pMeta->updateInfo.transId != req.transId) {
pMeta->updateInfo.transId = req.transId;
sndInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
// info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks);
} else {
sndDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId);
}
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
if (exist != NULL) {
sndDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
req.transId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
streamMetaWUnLock(pMeta);
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask);
// continue after lock the meta again
streamMetaWLock(pMeta);
SStreamTask** ppHTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (ppHTask == NULL || *ppHTask == NULL) {
sndError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
pMeta->vgId, req.taskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
} else {
sndDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
}
}
{
streamMetaSaveTask(pMeta, pTask);
if (ppHTask != NULL) {
streamMetaSaveTask(pMeta, *ppHTask);
}
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
}
streamTaskStop(pTask);
// keep the already handled info
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
if (ppHTask != NULL) {
streamTaskStop(*ppHTask);
sndDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
} else {
sndDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
}
rsp.code = 0;
// possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
pMeta->startInfo.tasksWillRestart = 1;
if (updateTasks < numOfTasks) {
sndDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks));
streamMetaWUnLock(pMeta);
} else {
sndDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
#if 1
sndStartStreamTaskAsync(pSnode, true);
streamMetaWUnLock(pMeta);
#else
streamMetaWUnLock(pMeta);
// For debug purpose.
// the following procedure consume many CPU resource, result in the re-election of leader
// with high probability. So we employ it as a test case for the stream processing framework, with
// checkpoint/restart/nodeUpdate etc.
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d start all stream tasks after all being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqStartStreamTaskAsync(pTq, false);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta);
#endif
}
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_RUN:
@ -544,3 +892,22 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
}
return 0;
}
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_DEPLOY: {
void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
return sndProcessTaskDeployReq(pSnode, pReq, len);
}
case TDMT_STREAM_TASK_DROP:
return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen);
case TDMT_VND_STREAM_TASK_UPDATE:
sndProcessTaskUpdateReq(pSnode, pMsg);
break;
default:
ASSERT(0);
}
return 0;
}

View File

@ -138,6 +138,11 @@ else()
endif()
endif()
target_include_directories(
vnode
PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
)
target_link_libraries(
vnode
PUBLIC os

View File

@ -43,9 +43,6 @@ extern "C" {
typedef struct STqOffsetStore STqOffsetStore;
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
// tqExec

View File

@ -15,17 +15,12 @@
#include "tq.h"
#include "vnd.h"
#include "stream.h"
typedef struct {
int8_t inited;
} STqMgmt;
typedef struct STaskUpdateEntry {
int64_t streamId;
int32_t taskId;
int32_t transId;
} STaskUpdateEntry;
static STqMgmt tqMgmt = {0};
// 0: not init

View File

@ -15,6 +15,7 @@
#include "tq.h"
#include "vnd.h"
#include "stream.h"
#define MAX_REPEAT_SCAN_THRESHOLD 3
#define SCAN_WAL_IDLE_DURATION 100

View File

@ -27,7 +27,7 @@ class TDTestCase:
time.sleep(1)
tdSql.query("use test")
tdSql.query("create snode on dnode 4")
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select sum(voltage) from meters partition by groupid interval(4s)")
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart, sum(voltage) from meters partition by groupid interval(4s)")
tdLog.debug("create stream use snode and insert data")
time.sleep(10)