fix(stream): record the start failure tasks.
This commit is contained in:
parent
69863febe6
commit
6b1889284b
|
@ -434,7 +434,8 @@ typedef struct STaskStartInfo {
|
|||
int64_t readyTs;
|
||||
int32_t tasksWillRestart;
|
||||
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
|
||||
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
||||
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
||||
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
|
||||
int32_t elapsedTime;
|
||||
} STaskStartInfo;
|
||||
|
||||
|
@ -812,7 +813,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
|
|||
void streamMetaStartHb(SStreamMeta* pMeta);
|
||||
void streamMetaInitForSnode(SStreamMeta* pMeta);
|
||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
||||
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask);
|
||||
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ);
|
||||
void streamMetaRLock(SStreamMeta* pMeta);
|
||||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||
void streamMetaWLock(SStreamMeta* pMeta);
|
||||
|
|
|
@ -73,6 +73,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
|
|||
streamMetaWLock(pMeta);
|
||||
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
||||
taosHashClear(pMeta->startInfo.pReadyTaskSet);
|
||||
taosHashClear(pMeta->startInfo.pFailedTaskSet);
|
||||
pMeta->startInfo.startTs = taosGetTimestampMs();
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
|
@ -97,7 +98,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
|
|||
streamLaunchFillHistoryTask(pTask);
|
||||
}
|
||||
|
||||
streamMetaUpdateTaskReadyInfo(pTask);
|
||||
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -150,6 +150,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
|
||||
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
|
||||
if (pMeta->startInfo.pReadyTaskSet == NULL) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
|
||||
if (pMeta->startInfo.pFailedTaskSet == NULL) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
|
||||
|
@ -221,6 +227,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
|
||||
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
|
||||
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
||||
if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
|
||||
taosMemoryFree(pMeta);
|
||||
|
||||
stError("failed to open stream meta");
|
||||
|
@ -300,6 +307,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
|
|||
|
||||
// the willrestart/starting flag can NOT be cleared
|
||||
taosHashClear(pMeta->startInfo.pReadyTaskSet);
|
||||
taosHashClear(pMeta->startInfo.pFailedTaskSet);
|
||||
pMeta->startInfo.readyTs = 0;
|
||||
}
|
||||
|
||||
|
@ -340,6 +348,7 @@ void streamMetaCloseImpl(void* arg) {
|
|||
taosHashCleanup(pMeta->pTaskBackendUnique);
|
||||
taosHashCleanup(pMeta->updateInfo.pTasks);
|
||||
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
||||
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
|
||||
|
||||
taosMemoryFree(pMeta->pHbInfo);
|
||||
taosMemoryFree(pMeta->path);
|
||||
|
@ -1091,6 +1100,7 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) {
|
|||
|
||||
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
|
||||
taosHashClear(pStartInfo->pReadyTaskSet);
|
||||
taosHashClear(pStartInfo->pFailedTaskSet);
|
||||
pStartInfo->tasksWillRestart = 0;
|
||||
pStartInfo->readyTs = 0;
|
||||
// reset the sentinel flag value to be 0
|
||||
|
|
|
@ -57,7 +57,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
|
|||
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
|
||||
pTask->id.idStr, numOfDowns, el, p);
|
||||
|
||||
streamMetaUpdateTaskReadyInfo(pTask);
|
||||
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -392,20 +392,22 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
doProcessDownstreamReadyRsp(pTask);
|
||||
}
|
||||
} else { // not ready, wait for 100ms and retry
|
||||
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
|
||||
stError(
|
||||
"s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, "
|
||||
"not check wait for downstream task nodeUpdate, and all tasks restart",
|
||||
id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
|
||||
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
|
||||
stError(
|
||||
"s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, "
|
||||
"not check wait for downstream task nodeUpdate, and all tasks restart",
|
||||
id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
|
||||
} else {
|
||||
stError(
|
||||
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
|
||||
"downstream again, nodeUpdate needed",
|
||||
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||
}
|
||||
|
||||
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
|
||||
} else if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||
stError(
|
||||
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
|
||||
"downstream again, nodeUpdate needed",
|
||||
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false);
|
||||
|
||||
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
|
||||
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
||||
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
|
||||
|
||||
|
@ -981,28 +983,59 @@ void streamTaskEnablePause(SStreamTask* pTask) {
|
|||
pTask->status.pauseAllowed = 1;
|
||||
}
|
||||
|
||||
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
|
||||
typedef struct STaskInitTs {
|
||||
int64_t start;
|
||||
int64_t end;
|
||||
bool success;
|
||||
} STaskInitTs;
|
||||
|
||||
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
void* pIter = NULL;
|
||||
size_t keyLen = 0;
|
||||
|
||||
stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet),
|
||||
succ ? "success" : "failed");
|
||||
|
||||
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
|
||||
STaskInitTs* pInfo = pIter;
|
||||
void* key = taosHashGetKey(pIter, &keyLen);
|
||||
|
||||
SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
|
||||
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
|
||||
(*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) {
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
STaskId id = streamTaskExtractKey(pTask);
|
||||
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
|
||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||
|
||||
SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet;
|
||||
|
||||
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
|
||||
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
|
||||
|
||||
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
|
||||
|
||||
if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) {
|
||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||
|
||||
if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) {
|
||||
pStartInfo->readyTs = pTask->execInfo.start;
|
||||
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
|
||||
|
||||
streamMetaResetStartInfo(pStartInfo);
|
||||
|
||||
stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64
|
||||
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64
|
||||
", readyTs:%" PRId64 " total elapsed time:%.2fs",
|
||||
pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs,
|
||||
pStartInfo->elapsedTime / 1000.0);
|
||||
|
||||
// print the initialization elapsed time and info
|
||||
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
|
||||
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
|
||||
|
||||
streamMetaResetStartInfo(pStartInfo);
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
|
Loading…
Reference in New Issue