fix(stream): add unsupport trans.

This commit is contained in:
Haojun Liao 2023-10-31 23:14:02 +08:00
parent 294c59150c
commit 3e6c89d8eb
3 changed files with 19 additions and 7 deletions

View File

@ -1269,9 +1269,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
// now the fill-history task starts to scan data from wal files.
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
if (code == TSDB_CODE_SUCCESS) {
tqScanWalAsync(pTq, false);
}
}
streamMetaReleaseTask(pMeta, pStreamTask);
} else {

View File

@ -580,7 +580,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
// execute in the scan history complete call back msg, ready to process data from inputQ
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
streamTaskSetSchedStatusInactive(pTask);
taosWLockLatch(&pMeta->lock);

View File

@ -105,6 +105,17 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
// todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE
static bool isUnsupportedTransform(ETaskStatus state, const EStreamTaskEvent event) {
if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT) {
if (event == TASK_EVENT_SCANHIST_DONE || event == TASK_EVENT_CHECKPOINT_DONE || event == TASK_EVENT_GEN_CHECKPOINT) {
return true;
}
}
return false;
}
// todo optimize the perf of find the trans objs by using hash table
static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStreamTaskEvent event) {
int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans);
@ -115,10 +126,8 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
}
}
if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) {
} else if (event == TASK_EVENT_GEN_CHECKPOINT && state == TASK_STATUS__UNINIT) {
// the task is set to uninit due to nodeEpset update, during processing checkpoint-trigger block.
if (isUnsupportedTransform(state, event)) {
return NULL;
} else {
ASSERT(0);
}