refactor: do some internal refactor

This commit is contained in:
Haojun Liao 2024-08-17 01:47:30 +08:00
parent 1f6cef26e8
commit f4bac23906
6 changed files with 39 additions and 36 deletions

View File

@ -26,12 +26,13 @@ static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode);
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId);
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo);
static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
static bool validateHbMsg(const SArray *pNodeList, int32_t vgId);
static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId);
static void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks);
void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
@ -52,7 +53,7 @@ void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
}
}
void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
int32_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
SFailedCheckpointInfo *p = taosArrayGet(pList, i);
@ -401,13 +402,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list, added into orphan task list", p->id.taskId);
SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
void* px = taosArrayPush(pOrphanTasks, &oTask);
if (px == NULL) {
mError("failed to put task into list, taskId:0x%" PRIx64, p->id.taskId);
}
checkforOrphanTask(pMnode, p, pOrphanTasks);
continue;
}
@ -423,7 +418,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SStreamObj *pStream = NULL;
code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
if (code) {
mError("stream obj not exist, failed to handle consensus checkpoint-info req, code:%s", tstrerror(code));
mError("stream:0x%" PRIx64 " not exist, failed to handle consensus checkpoint-info req for task:0x%x, code:%s",
p->id.streamId, (int32_t)p->id.taskId, tstrerror(code));
continue;
}
@ -434,7 +430,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (code == 0) {
mndAddConsensusTasks(pInfo, &cp);
} else {
mError("failed to get consensus checkpoint-info");
mError("failed to get consensus checkpoint-info for stream:0x%" PRIx64, p->id.streamId);
}
mndReleaseStream(pMnode, pStream);
@ -454,7 +450,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SFailedCheckpointInfo info = {
.transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
addIntoCheckpointList(pFailedChkpt, &info);
addIntoFailedChkptList(pFailedChkpt, &info);
// remove failed trans from pChkptStreams
code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId);
@ -516,6 +512,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (pMnode != NULL) { // make sure that the unit test case can work
code = mndStreamSendUpdateChkptInfoMsg(pMnode);
if (code) {
mError("failed to send update checkpointInfo msg, code:%s, try next time", tstrerror(code));
}
}
streamMutexUnlock(&execInfo.lock);
@ -554,3 +553,26 @@ void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_
tmsgSendRsp(&rsp);
pRpcInfo->handle = NULL; // disable auto rsp
}
void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks) {
SStreamObj *pStream = NULL;
int32_t code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
if (code) {
mError("stream:0x%" PRIx64 " not exists, s-task:0x%" PRIx64 " not found in task list, add into orphan list",
p->id.streamId, p->id.taskId);
SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
void *px = taosArrayPush(pOrphanTasks, &oTask);
if (px == NULL) {
mError("failed to put task into orphan list, taskId:0x%" PRIx64", code:%s", p->id.taskId, tstrerror(terrno));
}
} else {
if (pStream != NULL) {
mndReleaseStream(pMnode, pStream);
}
mError("s-task:0x%" PRIx64 " not found in task list but exists in mnode meta, data inconsistent, not drop yet",
p->id.taskId);
}
}

View File

@ -131,27 +131,6 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
}
int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
tqDebug("vgId:%d restore task:0x%" PRIx64 "-0x%x checkpointId", vgId, streamId, taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
if (pTask == NULL) {
tqError("failed to acquire task:0x%x when trying to restore checkpointId", taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
code = streamTaskSendRestoreChkptMsg(pTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
// this is to process request from transaction, always return true.
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
int32_t vgId = pMeta->vgId;

View File

@ -1354,7 +1354,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
return code;
}
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
streamMutexLock(&pTask->lock);

View File

@ -200,7 +200,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
if ((*pTask)->status.requireConsensusChkptId) {
entry.checkpointInfo.consensusChkptId = 1;
(*pTask)->status.requireConsensusChkptId = false;
stDebug("s-task:%s vgId:%d set the require consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId);
stDebug("s-task:%s vgId:%d set requiring consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId);
}
if ((*pTask)->exec.pWalReader != NULL) {

View File

@ -1424,7 +1424,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
}
// negotiate the consensus checkpoint id for current task
code = streamTaskSendRestoreChkptMsg(pTask);
code = streamTaskSendNegotiateChkptIdMsg(pTask);
// this task may has no checkpoint, but others tasks may generate checkpoint already?
streamMetaReleaseTask(pMeta, pTask);

View File

@ -1182,6 +1182,8 @@ const char* streamTaskGetExecType(int32_t type) {
return "resume-task-from-idle";
case STREAM_EXEC_T_ADD_FAILED_TASK:
return "record-start-failed-task";
case 0:
return "exec-all-tasks";
default:
return "invalid-exec-type";
}