Merge pull request #25417 from taosdata/fix/3_liaohj

fix(stream): record the task check downstream failed info.
This commit is contained in:
Haojun Liao 2024-04-19 22:07:38 +08:00 committed by GitHub
commit 0acc5f485a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 77 additions and 59 deletions

View File

@ -41,5 +41,6 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode);
void tqSetRestoreVersionInfo(SStreamTask* pTask);
#endif // TDENGINE_TQ_COMMON_H

View File

@ -58,17 +58,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
streamSetupScheduleTrigger(pTask);
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pChkInfo->checkpointId != 0) {
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
pChkInfo->processedVer = pChkInfo->checkpointVer;
pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer;
pTask->execInfo.startCheckpointId = pChkInfo->checkpointId;
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
tqSetRestoreVersionInfo(pTask);
char* p = streamTaskGetStatus(pTask)->name;
if (pTask->info.fillHistory) {

View File

@ -760,16 +760,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pChkInfo->checkpointId != 0) {
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
pChkInfo->processedVer = pChkInfo->checkpointVer;
pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer;
pTask->execInfo.startCheckpointId = pChkInfo->checkpointId;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
tqSetRestoreVersionInfo(pTask);
char* p = streamTaskGetStatus(pTask)->name;
const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus);

View File

@ -86,6 +86,22 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode)
return TSDB_CODE_SUCCESS;
}
/*
 * Set up the version bookkeeping of a stream task from its checkpoint info.
 *
 * When the task carries a checkpoint (checkpointId != 0), the checkpoint
 * version is treated as already processed and processing continues from the
 * version right after it; the checkpoint id is recorded as the start
 * checkpoint id and the restore is logged.
 *
 * In every case execInfo.startCheckpointVer is set to the resulting
 * nextProcessVer, so it is also populated for tasks without a checkpoint.
 *
 * pTask: the stream task to update; dereferenced without a NULL check.
 */
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
  SCheckpointInfo* pCkpt = &pTask->chkInfo;

  // no checkpoint available: nothing to restore, only record the start version
  if (pCkpt->checkpointId == 0) {
    pTask->execInfo.startCheckpointVer = pCkpt->nextProcessVer;
    return;
  }

  // the checkpoint version is the kept (already handled) version, so the next
  // version to process is the one immediately following it
  pCkpt->processedVer = pCkpt->checkpointVer;
  pCkpt->nextProcessVer = pCkpt->checkpointVer + 1;

  pTask->execInfo.startCheckpointId = pCkpt->checkpointId;

  tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
         pCkpt->checkpointId, pCkpt->checkpointVer, pCkpt->nextProcessVer);

  pTask->execInfo.startCheckpointVer = pCkpt->nextProcessVer;
}
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);

View File

@ -108,6 +108,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
goto _error;
}
// todo: the pk information should come from the physical plan
for(int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i);
if (pItem->isPk) {
@ -223,8 +224,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pBufferedRes);
taosArrayClear(pInfo->pUidList);
int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds,
pInfo->pDstSlotIds, pInfo->pUidList);
int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes,
pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
@ -293,9 +294,11 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
if (NULL == pInfo->pLastrowReader) {
code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader,
pTaskInfo->id.str, pInfo->pFuncTypeList, &pInfo->pkCol, pInfo->numOfPks);
code = pInfo->readHandle.api.cacheFn.openReader(
pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, taosArrayGetSize(pInfo->matchInfo.pList),
pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pInfo->pFuncTypeList,
&pInfo->pkCol, pInfo->numOfPks);
if (code != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1;
taosArrayClear(pInfo->pUidList);

View File

@ -570,13 +570,14 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu
if (pScanLogicNode->pVgroupList) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
}
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
if (TSDB_CODE_SUCCESS == code && pScanLogicNode->pFuncTypes != NULL) {
pScan->pFuncTypes = taosArrayInit(taosArrayGetSize(pScanLogicNode->pFuncTypes), sizeof(int32_t));
if (NULL == pScan->pFuncTypes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pTargetNode = NULL;
int funcTypeIndex = 0;
FOREACH(pTargetNode, ((SScanPhysiNode*)pScan)->pScanCols) {

View File

@ -1288,7 +1288,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
}
SStreamTask* pTask = *(SStreamTask**)pIter;
stDebug("vgId:%d s-task:%s set closing flag", vgId, pTask->id.idStr);
stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr);
streamTaskStop(pTask);
}
@ -1649,6 +1649,13 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
return 0;
}
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) { // task does not exists in current vnode, not record the complete info
qError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
streamMetaWUnLock(pMeta);
return 0;
}
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};

View File

@ -993,16 +993,17 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p->taskId == taskId) {
ASSERT(reqId == p->reqId);
p->status = status;
p->rspTs = rspTs;
// count down one, since it is ready now
if (p->status == TASK_DOWNSTREAM_READY) {
if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
} else {
*pNotReady = pInfo->notReadyTasks;
}
p->status = status;
p->rspTs = rspTs;
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
@ -1026,7 +1027,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s set the in check procedure flag", id);
stDebug("s-task:%s set the in-check-procedure flag", id);
return 0;
}
@ -1038,7 +1039,7 @@ int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) {
}
int64_t el = taosGetTimestampMs() - pInfo->startTs;
stDebug("s-task:%s check downstream completed, elapsed time:%" PRId64 " ms", id, el);
stDebug("s-task:%s clear the in-check-procedure flag, elapsed time:%" PRId64 " ms", id, el);
pInfo->startTs = 0;
pInfo->inCheckProcess = 0;
@ -1098,27 +1099,36 @@ static void rspMonitorFn(void* param, void* tmrId) {
int64_t now = taosGetTimestampMs();
int64_t el = now - pInfo->startTs;
ETaskStatus state = pStat->state;
int32_t numOfReady = 0;
int32_t numOfFault = 0;
const char* id = pTask->id.idStr;
stDebug("s-task:%s start to do check downstream rsp check", pTask->id.idStr);
stDebug("s-task:%s start to do check downstream rsp check", id);
if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
if (state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor rsp tmr, ref:%d", pTask->id.idStr, pStat->name, vgId, ref);
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheck(pInfo, id);
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
return;
}
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheck(pInfo, id);
return;
}
taosThreadMutexLock(&pInfo->checkInfoLock);
if (pInfo->notReadyTasks == 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", pTask->id.idStr,
pStat->name, vgId, ref);
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name,
vgId, ref);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
streamTaskCompleteCheck(pInfo, id);
return;
}
@ -1131,13 +1141,12 @@ static void rspMonitorFn(void* param, void* tmrId) {
if (p->status == TASK_DOWNSTREAM_READY) {
numOfReady += 1;
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", pTask->id.idStr,
p->taskId);
stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", id, p->taskId);
numOfFault += 1;
} else { // TASK_DOWNSTREAM_NOT_READY
if (p->rspTs == 0) { // not response yet
} else { // TASK_DOWNSTREAM_NOT_READY
if (p->rspTs == 0) { // not response yet
ASSERT(p->status == -1);
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
taosArrayPush(pTimeoutList, &p->taskId);
} else { // el < CHECK_NOT_RSP_DURATION
// do nothing and continue waiting for their rsps
@ -1148,25 +1157,26 @@ static void rspMonitorFn(void* param, void* tmrId) {
}
}
} else { // unexpected status
stError("s-task:%s unexpected task status:%s during waiting for check rsp", pTask->id.idStr, pStat->name);
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
}
int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
// fault tasks detected, not try anymore
if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && (numOfFault > 0)) {
if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) &&
(numOfFault > 0)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
"detected, ref:%d",
pTask->id.idStr, pStat->name, vgId, ref);
"detected, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
streamTaskCompleteCheck(pInfo, id);
return;
}
@ -1176,14 +1186,14 @@ static void rspMonitorFn(void* param, void* tmrId) {
stDebug(
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, "
"timeout:%d, ready:%d ref:%d",
pTask->id.idStr, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, pInfo->startTs, now, false);
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
return;
}
@ -1205,7 +1215,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
}
}
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", pTask->id.idStr, numOfNotReady);
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", id, numOfNotReady);
}
if (numOfTimeout > 0) {
@ -1225,15 +1235,14 @@ static void rspMonitorFn(void* param, void* tmrId) {
}
}
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, pTask->id.idStr,
numOfTimeout, now);
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now);
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", pTask->id.idStr,
numOfNotReady, numOfFault, numOfTimeout, numOfReady);
stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady,
numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);