fix(stream): not transfer state if state is not appropriate.

This commit is contained in:
Haojun Liao 2023-11-02 19:12:36 +08:00
parent c4079332ff
commit 7a32b3a209
4 changed files with 14 additions and 10 deletions

View File

@ -808,6 +808,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
#define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102) #define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102)
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103) #define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104)
// TDLite // TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)

View File

@ -1167,10 +1167,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
if (pTask->info.fillHistory == 1) {
ASSERT(pTask->status.pauseAllowed == true);
}
streamScanHistoryData(pTask); streamScanHistoryData(pTask);
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;

View File

@ -285,13 +285,14 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
const char* id = pTask->id.idStr;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
stError( stError(
"s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related " "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
"fill-history task", "fill-history task",
pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId); id, (int32_t) pTask->streamTaskId.taskId);
// 1. free it and remove fill-history task from disk meta-store // 1. free it and remove fill-history task from disk meta-store
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
@ -304,7 +305,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} else { } else {
stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", id,
pStreamTask->id.idStr); pStreamTask->id.idStr);
} }
@ -318,7 +319,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
} else { } else {
ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id);
} }
// wait for the stream task to handle all in the inputQ, and to be idle // wait for the stream task to handle all in the inputQ, and to be idle
@ -328,7 +329,12 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure. // start the task state transfer procedure.
char* p = NULL; char* p = NULL;
streamTaskGetStatus(pStreamTask, &p); status = streamTaskGetStatus(pStreamTask, &p);
if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) {
stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p);
streamMetaReleaseTask(pMeta, pStreamTask);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task. // update the scan data range for source task.
@ -351,7 +357,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 3. resume the state of stream task, after this function, the stream task will run immediately. // 3. resume the state of stream task, after this function, the stream task will run immediately.
streamTaskResume(pStreamTask); streamTaskResume(pStreamTask);
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id);
// 4. free it and remove fill-history task from disk meta-store // 4. free it and remove fill-history task from disk meta-store
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);

View File

@ -670,7 +670,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled
// stream // stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state transfer") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS,"Invalid task status to proceed")
// TDLite // TDLite
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags") TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")