fix(stream): fix the pause failure bug.

This commit is contained in:
Haojun Liao 2023-10-09 14:48:11 +08:00
parent 74ff0c5b50
commit d87db2f829
3 changed files with 23 additions and 10 deletions

View File

@ -697,7 +697,7 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
// recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskStartScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);

View File

@ -60,10 +60,17 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
pStartInfo->readyTs = pTask->execInfo.start;
pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs;
if (pStartInfo->startTs != 0) {
pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs;
} else {
pStartInfo->elapsedTime = 0;
}
streamMetaResetStartInfo(pStartInfo);
stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64
@ -71,6 +78,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
}
taosWUnLockLatch(&pMeta->lock);
}
@ -94,7 +102,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return 0;
}
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
static int32_t doStartScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
@ -105,15 +113,18 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
return code;
}
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
ASSERT(pTask->status.downstreamReady == 1);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
return doLaunchScanHistoryTask(pTask);
return doStartScanHistoryTask(pTask);
} else {
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
stDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
walReaderGetCurrentVer(pTask->exec.pWalReader));
stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
walReaderGetCurrentVer(pTask->exec.pWalReader));
streamTaskEnablePause(pTask);
}
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
if (pTask->info.fillHistory) {
@ -179,7 +190,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
streamTaskSetReady(pTask, 0);
streamTaskSetRangeStreamCalc(pTask);
streamTaskLaunchScanHistory(pTask);
streamTaskStartScanHistory(pTask);
streamLaunchFillHistoryTask(pTask);
}
@ -289,8 +300,9 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
if (status == TASK_STATUS__SCAN_HISTORY) {
stDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
streamTaskLaunchScanHistory(pTask);
streamTaskStartScanHistory(pTask);
} else {
// fill-history tasks are not allowed to reach here.
if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
pTask->status.taskStatus = TASK_STATUS__DROPPING;

View File

@ -137,6 +137,7 @@ echo "idxDebugFlag 143" >> $TAOS_CFG
echo "udfDebugFlag 143" >> $TAOS_CFG
echo "smaDebugFlag 143" >> $TAOS_CFG
echo "metaDebugFlag 143" >> $TAOS_CFG
echo "stDebugFlag 143" >> $TAOS_CFG
echo "numOfLogLines 20000000" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG
echo "locale en_US.UTF-8" >> $TAOS_CFG