refactor: do not reload info from disk when trying to restart stream tasks.

This commit is contained in:
Haojun Liao 2024-01-08 14:04:36 +08:00
parent 275e2d6f94
commit 3bcd038b7d
1 changed file with 29 additions and 21 deletions

View File

@ -25,7 +25,6 @@ typedef struct STaskUpdateEntry {
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) { if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId); tqDebug("vgId:%d no stream tasks existed to run", vgId);
@ -131,7 +130,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || *ppTask == NULL) { if (ppTask == NULL || *ppTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", vgId,
req.taskId); req.taskId);
rsp.code = TSDB_CODE_SUCCESS; rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -141,21 +140,22 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
} }
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
const char* idstr = pTask->id.idStr;
if (pMeta->updateInfo.transId != req.transId) { if (pMeta->updateInfo.transId != req.transId) {
pMeta->updateInfo.transId = req.transId; pMeta->updateInfo.transId = req.transId;
tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId); tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", idstr, req.transId);
// info needs to be kept till the new trans to update the nodeEp arrived. // info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks); taosHashClear(pMeta->updateInfo.pTasks);
} else { } else {
tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId); tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId);
} }
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
if (exist != NULL) { void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, if (pReqTask != NULL) {
req.transId); tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", idstr, vgId, req.transId);
rsp.code = TSDB_CODE_SUCCESS; rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList); taosArrayDestroy(req.pNodeList);
@ -171,7 +171,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (ppHTask == NULL || *ppHTask == NULL) { if (ppHTask == NULL || *ppHTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
pMeta->vgId, req.taskId); vgId, req.taskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask); CLEAR_RELATED_FILLHISTORY_TASK(pTask);
} else { } else {
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
@ -180,7 +180,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
} }
if (restored) { if (restored) {
tqDebug("s-task:%s vgId:%d start to save task", pTask->id.idStr, vgId); tqDebug("s-task:%s vgId:%d start to save task", idstr, vgId);
streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pTask);
if (ppHTask != NULL) { if (ppHTask != NULL) {
streamMetaSaveTask(pMeta, *ppHTask); streamMetaSaveTask(pMeta, *ppHTask);
@ -192,10 +192,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
} }
#endif #endif
} else { } else {
tqDebug("s-task:%s vgId:%d not save since restore not finish", pTask->id.idStr, vgId); tqDebug("s-task:%s vgId:%d not save since restore not finish", idstr, vgId);
} }
tqDebug("s-task:%s vgId:%d start to stop task after save task", pTask->id.idStr, vgId); tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId);
streamTaskStop(pTask); streamTaskStop(pTask);
// keep the already handled info // keep the already handled info
@ -206,11 +206,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
tqDebug("s-task:%s vgId:%d task nodeEp update completed, streamTask/fill-history closed, elapsed:%" PRId64 " ms", tqDebug("s-task:%s vgId:%d task nodeEp update completed, streamTask/fill-history closed, elapsed:%" PRId64 " ms",
pTask->id.idStr, vgId, now - st); idstr, vgId, now - st);
taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
} else { } else {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
tqDebug("s-task:%s vgId:%d, task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", pTask->id.idStr, tqDebug("s-task:%s vgId:%d, task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", idstr,
vgId, now - st); vgId, now - st);
} }
@ -237,6 +237,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
// for test purpose, to trigger the leader election // for test purpose, to trigger the leader election
taosMSleep(5000); taosMSleep(5000);
#endif #endif
tqStreamTaskStartAsync(pMeta, cb, true); tqStreamTaskStartAsync(pMeta, cb, true);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
} }
@ -759,17 +760,24 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
} }
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
streamMetaClear(pMeta); // streamMetaClear(pMeta);
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.); tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);
code = streamMetaLoadAllTasks(pMeta); // code = streamMetaLoadAllTasks(pMeta);
if (code != TSDB_CODE_SUCCESS) { // if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); // tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
streamMetaWUnLock(pMeta); // streamMetaWUnLock(pMeta);
code = terrno; // code = terrno;
return code; // return code;
// }
{
STaskStartInfo* pStartInfo = &pMeta->startInfo;
taosHashClear(pStartInfo->pReadyTaskSet);
taosHashClear(pStartInfo->pFailedTaskSet);
pStartInfo->readyTs = 0;
} }
if (isLeader && !tsDisableStream) { if (isLeader && !tsDisableStream) {