fix(stream): disallow pause if a task is uninitialized.

This commit is contained in:
Haojun Liao 2024-06-21 14:14:44 +08:00
parent aea4254d40
commit e7105edaa4
6 changed files with 76 additions and 18 deletions

View File

@ -1075,7 +1075,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
mWarn("not all vnodes ready, quit from vnodes status check");
taosArrayDestroy(pNodeSnapshot);
taosThreadMutexUnlock(&execInfo.lock);
return 0;
return true;
}
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
@ -1911,9 +1911,51 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mError("tasks are not ready for pause, node update detected");
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
{ // check for tasks, if tasks are not ready, not allowed to pause
bool found = false;
bool readyToPause = true;
taosThreadMutexLock(&execInfo.lock);
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
}
if (pEntry->id.streamId != pStream->uid) {
continue;
}
if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%x in checkpoint/uninit status, not ready for pause",
pStream->name, pStream->uid, pEntry->nodeId, pEntry->id.taskId);
readyToPause = false;
}
found = true;
}
taosThreadMutexUnlock(&execInfo.lock);
if (!found) {
mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
if (!readyToPause) {
mError("stream:%s task not ready for pause yet", pauseReq.name);
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
}
STrans *pTrans =
doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream");
if (pTrans == NULL) {

View File

@ -997,11 +997,19 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
ETaskStatus status = streamTaskGetStatus(pTask)->state;
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK && pTask->info.fillHistory == 0) {
if (status == TASK_STATUS__UNINIT) {
tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr);
tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId);
}
if (level == TASK_LEVEL__SINK) {
ASSERT (status != TASK_STATUS__UNINIT); /*{
// tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr);
//
// if (pTask->pBackend == NULL) { // TODO: add test cases for this
// int32_t code = pMeta->expandTaskFn(pTask);
// if (code != TSDB_CODE_SUCCESS) {
// tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId);
// streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
// }
// }
// int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
}*/
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
@ -1027,11 +1035,21 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} else {
streamTrySchedExec(pTask);
}
} else if (status == TASK_STATUS__UNINIT) { // todo: fill-history task init ?
if (pTask->info.fillHistory == 0) {
tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr);
tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId);
}
} else {
ASSERT (status != TASK_STATUS__UNINIT);// { // todo: fill-history task init ?
// if (pTask->info.fillHistory == 0) {
// tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr);
//
// if (pTask->pBackend == NULL) { // TODO: add test cases for this
// int32_t code = pMeta->expandTaskFn(pTask);
// if (code != TSDB_CODE_SUCCESS) {
// tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId);
// streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
// }
// }
// int32_t ret = */streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
// }
// }
}
streamMetaReleaseTask(pMeta, pTask);

View File

@ -58,8 +58,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
}
if (pInfo->stage < stage) {
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
", prev:%" PRId64,
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64,
id, upstreamTaskId, vgId, stage, pInfo->stage);
// record the checkpoint failure id and sent to mnode
taosThreadMutexLock(&pTask->lock);
@ -170,13 +169,13 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
if (pTask != NULL) {
pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);
streamMetaReleaseTask(pMeta, pTask);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
pRsp->status);
streamMetaReleaseTask(pMeta, pTask);
} else {
pRsp->status = TASK_DOWNSTREAM_NOT_READY;
stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64

View File

@ -240,7 +240,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
if (code == TSDB_CODE_SUCCESS) {
checkFillhistoryTaskStatus(pTask, pHisTask);
}
}
streamMetaReleaseTask(pMeta, pHisTask);

View File

@ -836,7 +836,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
// in case of fill-history task, stop the tsdb file scan operation.
if (pTask->info.fillHistory == 1) {

View File

@ -623,9 +623,9 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL);