op stream pause

pause data dispatch

opt stream pause
This commit is contained in:
liuyao 2023-08-30 11:55:00 +08:00
parent 646b339bde
commit 0a72e576bd
9 changed files with 57 additions and 19 deletions

View File

@ -409,6 +409,7 @@ typedef struct SStreamMeta {
SArray* chkpInUse;
int32_t chkpCap;
SRWLatch chkpDirLock;
int32_t pauseTaskNum;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -673,8 +674,8 @@ int32_t streamTaskGetInputQItems(const SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
void streamTaskPause(SStreamTask* pTask);
void streamTaskResume(SStreamTask* pTask);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskHalt(SStreamTask* pTask);
void streamTaskResumeFromHalt(SStreamTask* pTask);
void streamTaskDisablePause(SStreamTask* pTask);

View File

@ -1615,7 +1615,7 @@ int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray *tasks) {
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
if (mndPauseStreamTask(pTrans, pTask) < 0) {
return -1;
}
@ -1755,7 +1755,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1;
}

View File

@ -223,7 +223,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, tmsg_t msgType);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqStartStreamTasks(STQ* pTq, bool ckPause); // restore all stream tasks after vnode launching completed.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);

View File

@ -1210,7 +1210,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamSetStatusNormal(pTask);
}
tqStartStreamTasks(pTq);
tqStartStreamTasks(pTq, false);
}
streamMetaReleaseTask(pMeta, pTask);
@ -1376,7 +1376,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStartStreamTasks(pTq);
tqStartStreamTasks(pTq, false);
return 0;
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
// todo add one function to handle this
@ -1459,7 +1459,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
streamTaskPause(pTask);
streamTaskPause(pTask, pMeta);
SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
@ -1475,7 +1475,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskPause(pHistoryTask);
streamTaskPause(pHistoryTask, pMeta);
streamMetaReleaseTask(pMeta, pHistoryTask);
}
@ -1490,9 +1490,14 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
}
// todo: handle the case: resume from halt to pause/ from halt to normal/ from pause to normal
streamTaskResume(pTask);
streamTaskResume(pTask, pTq->pStreamMeta);
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) {
// no lock needs to secure the access of the version
@ -1511,7 +1516,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
tqStartStreamTasks(pTq);
tqStartStreamTasks(pTq, false);
} else {
streamSchedExec(pTask);
}

View File

@ -46,7 +46,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
// 2. the vnode should be the leader.
// 3. the stream is not suspended yet.
if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) {
tqStartStreamTasks(pTq);
tqStartStreamTasks(pTq, true);
}
return 0;

View File

@ -121,7 +121,7 @@ int32_t tqCheckStreamStatus(STQ* pTq) {
return 0;
}
int32_t tqStartStreamTasks(STQ* pTq) {
int32_t tqStartStreamTasks(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
@ -147,6 +147,13 @@ int32_t tqStartStreamTasks(STQ* pTq) {
return 0;
}
int32_t numOfPauseTasks = pTq->pStreamMeta->pauseTaskNum;
if (ckPause && numOfTasks == numOfPauseTasks) {
tqDebug("ignore all submit, all streams had been paused");
taosWUnLockLatch(&pMeta->lock);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -403,7 +403,11 @@ static void doRetryDispatchData(void* param, void* tmrId) {
if (!streamTaskShouldStop(&pTask->status)) {
qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
if (streamTaskShouldPause(&pTask->status)) {
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
} else {
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
}
} else {
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);

View File

@ -186,6 +186,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
pMeta->pauseTaskNum = 0;
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
stage);
return pMeta;
@ -469,6 +471,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask) {
pTask = *ppTask;
if (streamTaskShouldPause(&pTask->status)) {
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
}
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
} else {
qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
@ -668,8 +674,13 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
return -1;
}
if (streamTaskShouldPause(&pTask->status)) {
atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
}
ASSERT(pTask->status.downstreamReady == 0);
}
qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum);
tdbFree(pKey);
tdbFree(pVal);

View File

@ -811,9 +811,7 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
}
// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
void streamTaskPause(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
int64_t st = taosGetTimestampMs();
int8_t status = pTask->status.taskStatus;
@ -828,6 +826,12 @@ void streamTaskPause(SStreamTask* pTask) {
return;
}
if(pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
return;
}
while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) {
@ -857,6 +861,8 @@ void streamTaskPause(SStreamTask* pTask) {
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
taosWUnLockLatch(&pMeta->lock);
// in case of fill-history task, stop the tsdb file scan operation.
@ -870,12 +876,16 @@ void streamTaskPause(SStreamTask* pTask) {
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
}
void streamTaskResume(SStreamTask* pTask) {
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__PAUSE) {
pTask->status.taskStatus = pTask->status.keepTaskStatus;
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s resume from pause", pTask->id.idStr);
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s resume from pause. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s sink task.resume from pause. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
} else {
qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
}