fix(stream): send msg to mnode before closing all tasks.
This commit is contained in:
parent
71f6d9f06f
commit
2acc8423c5
|
@ -494,6 +494,7 @@ typedef struct SStreamMeta {
|
|||
int32_t vgId;
|
||||
int64_t stage;
|
||||
int32_t role;
|
||||
bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower.
|
||||
STaskStartInfo startInfo;
|
||||
SRWLatch lock;
|
||||
int32_t walScanCounter;
|
||||
|
@ -782,6 +783,8 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
|
|||
int64_t* oldStage);
|
||||
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
||||
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
||||
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
|
||||
|
||||
bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
|
||||
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
||||
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
||||
|
|
|
@ -126,17 +126,16 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
|||
int32_t tqStopStreamTasks(STQ* pTq) {
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||
|
||||
tqDebug("vgId:%d stop all %d stream task(s)", vgId, numOfTasks);
|
||||
if (numOfTasks == 0) {
|
||||
tqDebug("vgId:%d stop all %d stream task(s)", vgId, num);
|
||||
if (num == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SArray* pTaskList = NULL;
|
||||
streamMetaWLock(pMeta);
|
||||
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
||||
streamMetaWUnLock(pMeta);
|
||||
// send hb msg to mnode before closing all tasks.
|
||||
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta);
|
||||
int32_t numOfTasks = taosArrayGetSize(pTaskList);
|
||||
|
||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||
|
|
|
@ -40,6 +40,12 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
|
|||
int64_t stage = pMeta->stage;
|
||||
|
||||
pMeta->stage = state.term;
|
||||
|
||||
// mark the sign to send msg before close all tasks
|
||||
if ((!isLeader) && (pMeta->role == NODE_ROLE_LEADER)) {
|
||||
pMeta->sendMsgBeforeClosing = true;
|
||||
}
|
||||
|
||||
pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER;
|
||||
if (isLeader) {
|
||||
tqInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId,
|
||||
|
|
|
@ -1051,43 +1051,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
|
|||
taosThreadMutexUnlock(&pTask->lock);
|
||||
}
|
||||
|
||||
void metaHbToMnode(void* param, void* tmrId) {
|
||||
int64_t rid = *(int64_t*)param;
|
||||
|
||||
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
|
||||
if (pMeta == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// need to stop, stop now
|
||||
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) {
|
||||
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
|
||||
stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
|
||||
// not leader not send msg
|
||||
if (pMeta->role != NODE_ROLE_LEADER) {
|
||||
stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
pMeta->pHbInfo->hbStart = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
// set the hb start time
|
||||
if (pMeta->pHbInfo->hbStart == 0) {
|
||||
pMeta->pHbInfo->hbStart = taosGetTimestampMs();
|
||||
}
|
||||
|
||||
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
|
||||
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
|
||||
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
|
||||
|
||||
static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||
SStreamHbMsg hbMsg = {0};
|
||||
SEpSet epset = {0};
|
||||
bool hasMnodeEpset = false;
|
||||
|
@ -1181,23 +1145,62 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg msg = {
|
||||
.info.noResp = 1,
|
||||
};
|
||||
SRpcMsg msg = {.info.noResp = 1};
|
||||
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
|
||||
|
||||
pMeta->pHbInfo->hbCount += 1;
|
||||
|
||||
stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
|
||||
pMeta->pHbInfo->hbCount);
|
||||
|
||||
tmsgSendReq(&epset, &msg);
|
||||
} else {
|
||||
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
|
||||
}
|
||||
|
||||
_end:
|
||||
_end:
|
||||
streamMetaClearHbMsg(&hbMsg);
|
||||
taosArrayDestroy(pIdList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void metaHbToMnode(void* param, void* tmrId) {
|
||||
int64_t rid = *(int64_t*)param;
|
||||
|
||||
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
|
||||
if (pMeta == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// need to stop, stop now
|
||||
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) {
|
||||
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
|
||||
stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
|
||||
// not leader not send msg
|
||||
if (pMeta->role != NODE_ROLE_LEADER) {
|
||||
stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
pMeta->pHbInfo->hbStart = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
// set the hb start time
|
||||
if (pMeta->pHbInfo->hbStart == 0) {
|
||||
pMeta->pHbInfo->hbStart = taosGetTimestampMs();
|
||||
}
|
||||
|
||||
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
|
||||
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
|
||||
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
|
||||
metaHeartbeatToMnodeImpl(pMeta);
|
||||
|
||||
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
}
|
||||
|
@ -1320,3 +1323,44 @@ int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, voi
|
|||
schedMsg.msg = code;
|
||||
return taosScheduleTask(pMeta->qHandle, &schedMsg);
|
||||
}
|
||||
|
||||
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
|
||||
SArray* pTaskList = NULL;
|
||||
bool sendMsg = false;
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
||||
sendMsg = pMeta->sendMsgBeforeClosing;
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
if (!sendMsg) {
|
||||
stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
|
||||
return pTaskList;
|
||||
}
|
||||
|
||||
stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);
|
||||
|
||||
// send hb msg to mnode before closing all tasks.
|
||||
int32_t numOfTasks = taosArrayGetSize(pTaskList);
|
||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
if (pTask == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
ETaskStatus s = streamTaskGetStatus(pTask, NULL);
|
||||
if (s == TASK_STATUS__CK) {
|
||||
streamTaskSetCheckpointFailedId(pTask);
|
||||
stDebug("s-task:%s mark the checkpoint:%"PRId64" failed", pTask->id.idStr, pTask->chkInfo.checkpointingId);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
||||
metaHeartbeatToMnodeImpl(pMeta);
|
||||
pMeta->sendMsgBeforeClosing = false;
|
||||
return pTaskList;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue