Merge pull request #25442 from taosdata/fix/3_liaohj

fix(stream): save the epset if task epset updated.
This commit is contained in:
Haojun Liao 2024-04-23 09:10:45 +08:00 committed by GitHub
commit d7a08909dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 50 additions and 34 deletions

View File

@ -806,7 +806,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
void streamTaskCheckDownstream(SStreamTask* pTask); void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask); bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
bool streamTaskSetSchedStatusWait(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask);

View File

@ -161,6 +161,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
bool updated = false;
SStreamTaskNodeUpdateMsg req = {0}; SStreamTaskNodeUpdateMsg req = {0};
@ -214,7 +215,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
return rsp.code; return rsp.code;
} }
streamTaskUpdateEpsetInfo(pTask, req.pNodeList); updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask); streamTaskResetStatus(pTask);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
@ -230,21 +231,22 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
CLEAR_RELATED_FILLHISTORY_TASK(pTask); CLEAR_RELATED_FILLHISTORY_TASK(pTask);
} else { } else {
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); bool updateEpSet = streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
if (!updated) {
updated = updateEpSet;
}
streamTaskResetStatus(*ppHTask); streamTaskResetStatus(*ppHTask);
streamTaskStopMonitorCheckRsp(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); streamTaskStopMonitorCheckRsp(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr);
} }
} }
if (restored) { if (updated) {
tqDebug("s-task:%s vgId:%d start to save task", idstr, vgId); tqDebug("s-task:%s vgId:%d save task after update epset", idstr, vgId);
streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pTask);
if (ppHTask != NULL) { if (ppHTask != NULL) {
streamMetaSaveTask(pMeta, *ppHTask); streamMetaSaveTask(pMeta, *ppHTask);
} }
} else {
tqDebug("s-task:%s vgId:%d not save since restore not finish", idstr, vgId);
} }
tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId); tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId);
@ -253,15 +255,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
// keep the already updated info // keep the already updated info
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0); taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
int64_t now = taosGetTimestampMs();
if (ppHTask != NULL) { if (ppHTask != NULL) {
streamTaskStop(*ppHTask); streamTaskStop(*ppHTask);
int64_t now = taosGetTimestampMs();
tqDebug("s-task:%s vgId:%d task nodeEp update completed, streamTask/fill-history closed, elapsed:%" PRId64 " ms", tqDebug("s-task:%s vgId:%d task nodeEp update completed, streamTask/fill-history closed, elapsed:%" PRId64 " ms",
idstr, vgId, now - st); idstr, vgId, now - st);
taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
} else { } else {
int64_t now = taosGetTimestampMs();
tqDebug("s-task:%s vgId:%d, task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", idstr, tqDebug("s-task:%s vgId:%d, task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", idstr,
vgId, now - st); vgId, now - st);
} }
@ -277,7 +277,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
if (updateTasks < numOfTasks) { if (updateTasks < numOfTasks) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks)); updateTasks, (numOfTasks - updateTasks));
streamMetaWUnLock(pMeta);
} else { } else {
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
@ -286,7 +285,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
if (!restored) { if (!restored) {
tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId); tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.tasksWillRestart = 0; pMeta->startInfo.tasksWillRestart = 0;
streamMetaWUnLock(pMeta);
} else { } else {
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId); tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
#if 0 #if 0
@ -295,10 +293,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
#endif #endif
tqStreamTaskStartAsync(pMeta, cb, true); tqStreamTaskStartAsync(pMeta, cb, true);
streamMetaWUnLock(pMeta);
} }
} }
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList); taosArrayDestroy(req.pNodeList);
return rsp.code; return rsp.code;
} }

View File

@ -24,8 +24,8 @@
#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec #define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray); int32_t childId = taosArrayGetSize(pArray);
@ -34,10 +34,14 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
return 0; return 0;
} }
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0}; char buf[512] = {0};
if (pTask->info.nodeId == nodeId) { // execution task should be moved away if (pTask->info.nodeId == nodeId) { // execution task should be moved away
if (!(*pUpdated)) {
*pUpdated = isEpsetEqual(&pTask->info.epSet, pEpSet);
}
epsetAssign(&pTask->info.epSet, pEpSet); epsetAssign(&pTask->info.epSet, pEpSet);
epsetToStr(pEpSet, buf, tListLen(buf)); epsetToStr(pEpSet, buf, tListLen(buf));
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
@ -46,12 +50,12 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
// check for the dispatch info and the upstream task info // check for the dispatch info and the upstream task info
int32_t level = pTask->info.taskLevel; int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
} else if (level == TASK_LEVEL__AGG) { } else if (level == TASK_LEVEL__AGG) {
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
} else { // TASK_LEVEL__SINK } else { // TASK_LEVEL__SINK
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
} }
return 0; return 0;
@ -608,7 +612,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0}; char buf[512] = {0};
epsetToStr(pEpSet, buf, tListLen(buf)); epsetToStr(pEpSet, buf, tListLen(buf));
@ -616,6 +620,10 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
for (int32_t i = 0; i < numOfUpstream; ++i) { for (int32_t i = 0; i < numOfUpstream; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (pInfo->nodeId == nodeId) { if (pInfo->nodeId == nodeId) {
if (!(*pUpdated)) {
*pUpdated = isEpsetEqual(&pInfo->epSet, pEpSet);
}
epsetAssign(&pInfo->epSet, pEpSet); epsetAssign(&pInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, pInfo->taskId, stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, pInfo->taskId,
nodeId, buf); nodeId, buf);
@ -642,12 +650,14 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
} }
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool *pUpdated) {
char buf[512] = {0}; char buf[512] = {0};
epsetToStr(pEpSet, buf, tListLen(buf)); epsetToStr(pEpSet, buf, tListLen(buf));
int32_t id = pTask->id.taskId; *pUpdated = false;
int32_t id = pTask->id.taskId;
int8_t type = pTask->outputInfo.type;
int8_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
@ -656,18 +666,24 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
if (pVgInfo->vgId == nodeId) { if (pVgInfo->vgId == nodeId) {
if (!(*pUpdated)) {
(*pUpdated) = isEpsetEqual(&pVgInfo->epSet, pEpSet);
}
epsetAssign(&pVgInfo->epSet, pEpSet); epsetAssign(&pVgInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId, stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId, buf);
buf);
break; break;
} }
} }
} else if (type == TASK_OUTPUT__FIXED_DISPATCH) { } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher; STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
if (pDispatcher->nodeId == nodeId) { if (pDispatcher->nodeId == nodeId) {
if (!(*pUpdated)) {
*pUpdated = isEpsetEqual(&pDispatcher->epSet, pEpSet);
}
epsetAssign(&pDispatcher->epSet, pEpSet); epsetAssign(&pDispatcher->epSet, pEpSet);
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId, stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId, buf);
buf);
} }
} }
} }
@ -690,7 +706,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
return 0; return 0;
} }
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
STaskExecStatisInfo* p = &pTask->execInfo; STaskExecStatisInfo* p = &pTask->execInfo;
int32_t numOfNodes = taosArrayGetSize(pNodeList); int32_t numOfNodes = taosArrayGetSize(pNodeList);
@ -701,11 +717,13 @@ int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId, stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
numOfNodes, p->updateCount, prevTs); numOfNodes, p->updateCount, prevTs);
bool updated = false;
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i); SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp); doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
} }
return 0;
return updated;
} }
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
@ -1033,7 +1051,7 @@ static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id)
stWarn("s-task:%s already not in-check-procedure", id); stWarn("s-task:%s already not in-check-procedure", id);
} }
int64_t el = taosGetTimestampMs() - pInfo->startTs; int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el);
pInfo->startTs = 0; pInfo->startTs = 0;
@ -1075,7 +1093,7 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
req.downstreamNodeId = pVgInfo->vgId; req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId; req.downstreamTaskId = pVgInfo->taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d",
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
break; break;