fix(stream): fix error in stream.

This commit is contained in:
Haojun Liao 2023-10-19 16:20:27 +08:00
parent 402aefd95d
commit c1cebae6ba
8 changed files with 64 additions and 56 deletions

View File

@ -75,7 +75,7 @@ typedef enum ETaskStatus {
TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
TASK_STATUS_STREAM_SCAN_HISTORY,
TASK_STATUS__STREAM_SCAN_HISTORY,
} ETaskStatus;
enum {
@ -720,6 +720,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr);
const char* streamTaskGetStatusStr(ETaskStatus status);
void streamTaskResetStatus(SStreamTask* pTask);
void streamTaskSetStatusReady(SStreamTask* pTask);
@ -755,8 +756,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusUnint(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResumeFromHalt(SStreamTask* pTask);

View File

@ -1183,7 +1183,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
if (pEntry->status != TASK_STATUS__READY) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status));
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamTaskGetStatusStr(pEntry->status));
ready = false;
break;
}
@ -1567,7 +1567,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
continue;
}
const char* pStatus = streamGetTaskStatusStr(pe->status);
const char* pStatus = streamTaskGetStatusStr(pe->status);
STR_TO_VARSTR(status, pStatus);
// status
@ -2615,7 +2615,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
pEntry->status = p->status;
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status));
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
}
}

View File

@ -1446,7 +1446,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask != NULL) {
// drop the related fill-history task firstly
if (pTask->hTaskInfo.id.taskId != 0) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHTaskId = &pTask->hTaskInfo.id;
streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId);
@ -1486,7 +1486,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
streamTaskPause(pTask, pMeta);
SStreamTask* pHistoryTask = NULL;
if (pTask->hTaskInfo.id.taskId != 0) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
@ -1868,7 +1868,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
streamTaskResetStatus(pTask);
SStreamTask** ppHTask = NULL;
if (pTask->hTaskInfo.id.taskId != 0) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (ppHTask == NULL || *ppHTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",

View File

@ -308,7 +308,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pStreamTask->id.idStr);
}
int8_t status = streamTaskGetStatus(pStreamTask, NULL);
ETaskStatus status = streamTaskGetStatus(pStreamTask, NULL);
ASSERT(((status == TASK_STATUS__DROPPING) || (pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) &&
pTask->status.appendTranstateBlock == true);
@ -352,7 +352,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
// pause, since the pause allowed attribute is not set yet.
streamTaskResumeFromHalt(pStreamTask);
streamTaskResumeFromHalt(pStreamTask); // todo refactor: use streamTaskResume.
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);

View File

@ -348,6 +348,7 @@ void streamMetaCloseImpl(void* arg) {
stDebug("end to close stream meta");
}
// todo let's check the status for each task
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL;
int32_t len;

View File

@ -42,7 +42,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
if ((status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY) &&
pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p);
@ -92,16 +93,20 @@ static int32_t doStartScanHistoryTask(SStreamTask* pTask) {
}
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
ASSERT(pTask->status.downstreamReady == 1 && streamTaskGetStatus(pTask, NULL) == TASK_STATUS__SCAN_HISTORY);
int32_t level = pTask->info.taskLevel;
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(pTask->status.downstreamReady == 1 &&
((status == TASK_STATUS__SCAN_HISTORY) || (status == TASK_STATUS__STREAM_SCAN_HISTORY)));
if (level == TASK_LEVEL__SOURCE) {
return doStartScanHistoryTask(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
} else if (level == TASK_LEVEL__AGG) {
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
streamTaskEnablePause(pTask);
}
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
} else if (level == TASK_LEVEL__SINK) {
stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
}
@ -292,13 +297,13 @@ int32_t onScanhistoryTaskReady(SStreamTask* pTask) {
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
ASSERT(status == TASK_STATUS__SCAN_HISTORY);
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY);
stDebug("s-task:%s enter into scan-history data stage, status:%s", id, p);
streamTaskStartScanHistory(pTask);
// start the related fill-history task, when current task is ready
if (pTask->hTaskInfo.id.taskId != 0) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
streamLaunchFillHistoryTask(pTask);
}
@ -516,7 +521,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status != TASK_STATUS__SCAN_HISTORY) {
if (status != TASK_STATUS__SCAN_HISTORY && status != TASK_STATUS__STREAM_SCAN_HISTORY) {
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly",
id, p, pReq->upstreamTaskId);
@ -571,7 +576,9 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
}
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
ASSERT(/*pTask->status.taskStatus*/ streamTaskGetStatus(pTask, NULL) == TASK_STATUS__SCAN_HISTORY);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY);
SStreamMeta* pMeta = pTask->pMeta;
// execute in the scan history complete call back msg, ready to process data from inputQ
@ -939,9 +946,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
doCheckDownstreamStatus(pTask);
}
// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
int64_t st = taosGetTimestampMs();
#if 0
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) {
@ -1013,9 +1018,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
qKillTask(pExecutor, TSDB_CODE_SUCCESS);
}
int64_t el = taosGetTimestampMs() - st;
stDebug("vgId:%d s-task:%s set pause flag, prev:%s, pause elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
}
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {

View File

@ -732,20 +732,6 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
pInfo->retryTimes += 1;
}
const char* streamGetTaskStatusStr(int32_t status) {
switch(status) {
case TASK_STATUS__READY: return "normal";
case TASK_STATUS__SCAN_HISTORY: return "scan-history";
case TASK_STATUS__HALT: return "halt";
case TASK_STATUS__PAUSE: return "paused";
case TASK_STATUS__CK: return "check-point";
case TASK_STATUS__DROPPING: return "dropping";
case TASK_STATUS__STOP: return "stop";
case TASK_STATUS__UNINIT: return "uninitialized";
default:return "";
}
}
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
pEntry->id.streamId = pTask->id.streamId;
pEntry->id.taskId = pTask->id.taskId;

View File

@ -29,10 +29,10 @@ SStreamTaskState StreamTaskStatusList[9] = {
{.state = TASK_STATUS__HALT, .name = "halt"},
{.state = TASK_STATUS__PAUSE, .name = "paused"},
{.state = TASK_STATUS__CK, .name = "checkpoint"},
{.state = TASK_STATUS_STREAM_SCAN_HISTORY, .name = "stream-scan-history"},
{.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"},
};
SStreamEventInfo StreamTaskEventList[10] = {
SStreamEventInfo StreamTaskEventList[11] = {
{}, // dummy event, place holder
{.event = TASK_EVENT_INIT, .name = "initialize"},
{.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-initialize"},
@ -43,6 +43,7 @@ SStreamEventInfo StreamTaskEventList[10] = {
{.event = TASK_EVENT_PAUSE, .name = "pausing"},
{.event = TASK_EVENT_RESUME, .name = "resuming"},
{.event = TASK_EVENT_HALT, .name = "halting"},
{.event = TASK_EVENT_DROPPING, .name = "dropping"},
};
static TdThreadOnce streamTaskStateMachineInit = PTHREAD_ONCE_INIT;
@ -173,35 +174,45 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) {
}
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
taosThreadMutexLock(&pSM->pTask->lock);
SStreamTask* pTask = pSM->pTask;
taosThreadMutexLock(&pTask->lock);
STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event);
stDebug("s-task:%s start to handle event:%s, state:%s", pSM->pTask->id.idStr, StreamTaskEventList[event].name,
stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name,
pSM->current.name);
if (pTrans->attachEvent.event != 0) {
attachEvent(pSM->pTask, &pTrans->attachEvent);
taosThreadMutexUnlock(&pSM->pTask->lock);
attachEvent(pTask, &pTrans->attachEvent);
taosThreadMutexUnlock(&pTask->lock);
while (1) {
// wait for the task to be here
ETaskStatus s = streamTaskGetStatus(pSM->pTask, NULL);
taosThreadMutexLock(&pTask->lock);
ETaskStatus s = streamTaskGetStatus(pTask, NULL);
taosThreadMutexUnlock(&pTask->lock);
if (s == pTrans->attachEvent.status) {
return TSDB_CODE_SUCCESS;
} else {// this event has been handled already
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pSM->pTask->id.idStr,
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pTask->id.idStr,
StreamTaskEventList[event].name);
taosMsleep(100);
}
}
} else {
ASSERT(pSM->pActiveTrans == NULL);
if (pSM->pActiveTrans != NULL) {
ASSERT(!pSM->pActiveTrans->autoInvokeEndFn);
stWarn("s-task:%s status:%s handle event:%s is interrupted, handle the new event:%s", pTask->id.idStr,
pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name, StreamTaskEventList[event].name);
}
pSM->pActiveTrans = pTrans;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pSM->pTask->lock);
taosThreadMutexUnlock(&pTask->lock);
int32_t code = pTrans->pAction(pSM->pTask);
int32_t code = pTrans->pAction(pTask);
// todo handle error code;
if (pTrans->autoInvokeEndFn) {
@ -273,6 +284,10 @@ ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr) {
return s.state;
}
const char* streamTaskGetStatusStr(ETaskStatus status) {
return StreamTaskStatusList[status].name;
}
void streamTaskResetStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
@ -304,6 +319,9 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr
if (pEventInfo != NULL) {
trans.attachEvent = *pEventInfo;
} else {
trans.attachEvent.event = 0;
trans.attachEvent.status = 0;
}
trans.pAction = (fn != NULL) ? fn : dummyFn;
@ -329,15 +347,16 @@ void doInitStateTransferTable(void) {
streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS_STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST,
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST,
streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE,
streamTaskSetReadyForWal, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE,
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE,
streamTaskSetReadyForWal, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
@ -347,7 +366,7 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans);
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
@ -375,7 +394,7 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans);
info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
@ -410,7 +429,7 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// dropping related event
@ -430,6 +449,6 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
}