fix(stream): kill task when pause the stream execution.
This commit is contained in:
parent
ca2183acfd
commit
d43cb3fcdf
|
@ -604,13 +604,10 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq*
|
||||||
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
|
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
|
||||||
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
||||||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
||||||
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
|
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
|
||||||
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
||||||
|
|
||||||
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);
|
|
||||||
|
|
||||||
// common
|
// common
|
||||||
int32_t streamSetParamForScanHistory(SStreamTask* pTask);
|
|
||||||
int32_t streamRestoreParam(SStreamTask* pTask);
|
int32_t streamRestoreParam(SStreamTask* pTask);
|
||||||
int32_t streamSetStatusNormal(SStreamTask* pTask);
|
int32_t streamSetStatusNormal(SStreamTask* pTask);
|
||||||
const char* streamGetTaskStatusStr(int32_t status);
|
const char* streamGetTaskStatusStr(int32_t status);
|
||||||
|
@ -624,7 +621,6 @@ void streamTaskEnablePause(SStreamTask* pTask);
|
||||||
// source level
|
// source level
|
||||||
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||||
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||||
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
|
|
||||||
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
|
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
|
||||||
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
||||||
|
|
||||||
|
|
|
@ -1567,9 +1567,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
||||||
pReq->taskId);
|
pReq->taskId);
|
||||||
|
|
||||||
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1581,9 +1580,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
if (pTask->historyTaskId.taskId != 0) {
|
if (pTask->historyTaskId.taskId != 0) {
|
||||||
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
|
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
|
||||||
if (pHistoryTask == NULL) {
|
if (pHistoryTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success",
|
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
|
||||||
pMeta->vgId, pTask->historyTaskId.taskId);
|
pMeta->vgId, pTask->historyTaskId.taskId);
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
||||||
|
@ -1591,14 +1589,12 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
|
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
|
||||||
streamTaskPause(pHistoryTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamTaskPause(pHistoryTask);
|
||||||
if (pHistoryTask != NULL) {
|
|
||||||
streamMetaReleaseTask(pMeta, pHistoryTask);
|
streamMetaReleaseTask(pMeta, pHistoryTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1627,7 +1623,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||||
streamStartRecoverTask(pTask, igUntreated);
|
streamStartScanHistoryAsync(pTask, igUntreated);
|
||||||
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
|
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
|
||||||
tqStartStreamTasks(pTq);
|
tqStartStreamTasks(pTq);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -582,12 +582,9 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
// todo the task should be commit here
|
// todo the task should be commit here
|
||||||
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
||||||
// fill-history WAL scan has completed
|
// fill-history WAL scan has completed
|
||||||
if (pTask->status.transferState) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) {
|
||||||
code = streamTransferStateToStreamTask(pTask);
|
streamTaskFillHistoryFinished(pTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
streamTaskEndScanWAL(pTask);
|
||||||
return code;
|
|
||||||
}
|
|
||||||
streamSchedExec(pTask);
|
|
||||||
} else {
|
} else {
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
|
|
|
@ -17,23 +17,30 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
|
typedef struct SStreamTaskRetryInfo {
|
||||||
|
SStreamMeta* pMeta;
|
||||||
|
int32_t taskId;
|
||||||
|
} SStreamTaskRetryInfo;
|
||||||
|
|
||||||
|
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
|
||||||
static void launchFillHistoryTask(SStreamTask* pTask);
|
static void launchFillHistoryTask(SStreamTask* pTask);
|
||||||
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
|
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
|
||||||
|
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
|
||||||
|
|
||||||
static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
|
static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
pTask->status.downstreamReady = 1;
|
pTask->status.downstreamReady = 1;
|
||||||
int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
|
|
||||||
|
|
||||||
|
int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
|
||||||
qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
|
qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
|
||||||
pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
|
pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
|
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
|
||||||
SStreamScanHistoryReq req;
|
SStreamScanHistoryReq req;
|
||||||
streamBuildSourceRecover1Req(pTask, &req, igUntreated);
|
initScanHistoryReq(pTask, &req, igUntreated);
|
||||||
int32_t len = sizeof(SStreamScanHistoryReq);
|
|
||||||
|
|
||||||
|
int32_t len = sizeof(SStreamScanHistoryReq);
|
||||||
void* serializedReq = rpcMallocCont(len);
|
void* serializedReq = rpcMallocCont(len);
|
||||||
if (serializedReq == NULL) {
|
if (serializedReq == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -65,9 +72,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
streamSetParamForScanHistory(pTask);
|
streamSetParamForScanHistory(pTask);
|
||||||
}
|
}
|
||||||
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
|
|
||||||
|
|
||||||
int32_t code = streamStartRecoverTask(pTask, 0);
|
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
|
||||||
|
int32_t code = streamStartScanHistoryAsync(pTask, 0);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,7 +149,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
||||||
|
|
||||||
streamTaskSetForReady(pTask, 0);
|
streamTaskSetReady(pTask, 0);
|
||||||
streamTaskSetRangeStreamCalc(pTask);
|
streamTaskSetRangeStreamCalc(pTask);
|
||||||
streamTaskLaunchScanHistory(pTask);
|
streamTaskLaunchScanHistory(pTask);
|
||||||
|
|
||||||
|
@ -188,7 +195,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
|
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
|
||||||
streamTaskSetForReady(pTask, numOfReqs);
|
streamTaskSetReady(pTask, numOfReqs);
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
int8_t status = pTask->status.taskStatus;
|
int8_t status = pTask->status.taskStatus;
|
||||||
|
@ -319,7 +326,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *p
|
||||||
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
|
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
|
int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
|
||||||
pReq->msgHead.vgId = pTask->info.nodeId;
|
pReq->msgHead.vgId = pTask->info.nodeId;
|
||||||
pReq->streamId = pTask->id.streamId;
|
pReq->streamId = pTask->id.streamId;
|
||||||
pReq->taskId = pTask->id.taskId;
|
pReq->taskId = pTask->id.taskId;
|
||||||
|
@ -532,11 +539,6 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
|
||||||
streamTaskDoCheckDownstreamTasks(pHTask);
|
streamTaskDoCheckDownstreamTasks(pHTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SStreamTaskRetryInfo {
|
|
||||||
SStreamMeta* pMeta;
|
|
||||||
int32_t taskId;
|
|
||||||
} SStreamTaskRetryInfo;
|
|
||||||
|
|
||||||
static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
SStreamTaskRetryInfo* pInfo = param;
|
SStreamTaskRetryInfo* pInfo = param;
|
||||||
SStreamMeta* pMeta = pInfo->pMeta;
|
SStreamMeta* pMeta = pInfo->pMeta;
|
||||||
|
@ -646,7 +648,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatch recover finish req to all related downstream task
|
// dispatch scan-history finish req to all related downstream task
|
||||||
code = streamDispatchScanHistoryFinishMsg(pTask);
|
code = streamDispatchScanHistoryFinishMsg(pTask);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -655,7 +657,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
|
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
|
||||||
void* exec = pTask->exec.pExecutor;
|
void* exec = pTask->exec.pExecutor;
|
||||||
return qStreamInfoResetTimewindowFilter(exec);
|
return qStreamInfoResetTimewindowFilter(exec);
|
||||||
}
|
}
|
||||||
|
@ -667,7 +669,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
||||||
int64_t nextStartVer = pRange->maxVer + 1;
|
int64_t nextStartVer = pRange->maxVer + 1;
|
||||||
if (nextStartVer > latestVer - 1) {
|
if (nextStartVer > latestVer - 1) {
|
||||||
// no input data yet. no need to execute the secondardy scan while stream task halt
|
// no input data yet. no need to execute the secondardy scan while stream task halt
|
||||||
streamTaskRecoverSetAllStepFinished(pTask);
|
streamTaskFillHistoryFinished(pTask);
|
||||||
qDebug(
|
qDebug(
|
||||||
"s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
|
"s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
|
||||||
"related stream task currentVer:%" PRId64,
|
"related stream task currentVer:%" PRId64,
|
||||||
|
@ -682,7 +684,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
|
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
||||||
|
@ -855,7 +856,7 @@ void streamTaskPause(SStreamTask* pTask) {
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: use the lock of the task.
|
// todo: use the task lock, stead of meta lock
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
status = pTask->status.taskStatus;
|
status = pTask->status.taskStatus;
|
||||||
|
@ -869,6 +870,12 @@ void streamTaskPause(SStreamTask* pTask) {
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
|
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
|
// in case of fill-history task, stop the tsdb file scan operation.
|
||||||
|
if (pTask->info.fillHistory == 1) {
|
||||||
|
void* pExecutor = pTask->exec.pExecutor;
|
||||||
|
qKillTask(pExecutor, TSDB_CODE_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t el = taosGetTimestampMs() - st;
|
int64_t el = taosGetTimestampMs() - st;
|
||||||
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
|
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
|
||||||
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
|
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
|
||||||
|
|
Loading…
Reference in New Issue