fix(stream): add full unsupported event filtering.

This commit is contained in:
Haojun Liao 2023-11-02 09:43:48 +08:00
parent 05b3d51803
commit 95efa07e92
5 changed files with 89 additions and 61 deletions

View File

@ -807,6 +807,7 @@ int32_t* taosGetErrno();
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
#define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102)
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
// TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)

View File

@ -1064,6 +1064,47 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
return code;
}
static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
const char* id = pTask->id.idStr;
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
// if it's an source task, extract the last version in wal.
SVersionRange *pRange = &pTask->dataRange.range;
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
pTask->execInfo.step2Start = taosGetTimestampMs();
if (done) {
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
streamTaskPutTranstateIntoInputQ(pTask);
streamExecTask(pTask); // exec directly
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
", do secondary scan-history from WAL after halt the related stream task:%s",
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
int64_t dstVer = pTask->dataRange.range.minVer;
pTask->chkInfo.nextProcessVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
// now the fill-history task starts to scan data from wal files.
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
if (code == TSDB_CODE_SUCCESS) {
tqScanWalAsync(pTq, false);
}
}
}
// this function should be executed by only one thread
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
@ -1146,7 +1187,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) {
SVersionRange* pRange = NULL;
SStreamTask* pStreamTask = NULL;
// 1. get the related stream task
@ -1165,50 +1205,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
// if it's an source task, extract the last version in wal.
pRange = &pTask->dataRange.range;
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
pTask->execInfo.step2Start = taosGetTimestampMs();
if (done) {
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
streamTaskPutTranstateIntoInputQ(pTask);
// streamTaskRestoreStatus(pTask);
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// pTask->status.keepTaskStatus = TASK_STATUS__READY;
// qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id,
// streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus));
// }
streamExecTask(pTask); // exec directly
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code == TSDB_CODE_SUCCESS) {
doStartStep2(pTask, pStreamTask, pTq);
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
", do secondary scan-history from WAL after halt the related stream task:%s",
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
int64_t dstVer = pTask->dataRange.range.minVer;
pTask->chkInfo.nextProcessVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
// now the fill-history task starts to scan data from wal files.
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
if (code == TSDB_CODE_SUCCESS) {
tqScanWalAsync(pTq, false);
}
tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
}
streamMetaReleaseTask(pMeta, pStreamTask);

View File

@ -59,6 +59,7 @@ int32_t tqScanWal(STQ* pTq) {
}
int32_t tqStartStreamTask(STQ* pTq) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
@ -102,12 +103,16 @@ int32_t tqStartStreamTask(STQ* pTq) {
}
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event);
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
if (ret != TSDB_CODE_SUCCESS) {
code = ret;
}
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
return 0;
return code;
}
int32_t tqLaunchStreamTaskAsync(STQ* pTq) {

View File

@ -82,13 +82,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
return 0;
}
int32_t streamTaskSetReadyForWal(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr);
}
return TSDB_CODE_SUCCESS;
}
static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
stDebug("s-task:%s start to do checkpoint", pTask->id.idStr);
return 0;
@ -108,9 +101,31 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
}
// todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE
static bool isUnsupportedTransform(ETaskStatus state, const EStreamTaskEvent event) {
if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT) {
if (event == TASK_EVENT_SCANHIST_DONE || event == TASK_EVENT_CHECKPOINT_DONE || event == TASK_EVENT_GEN_CHECKPOINT) {
static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) {
if (event == TASK_EVENT_INIT_STREAM_SCANHIST || event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) {
return (state != TASK_STATUS__UNINIT);
}
if (event == TASK_EVENT_SCANHIST_DONE) {
return (state != TASK_STATUS__SCAN_HISTORY && state != TASK_STATUS__STREAM_SCAN_HISTORY);
}
if (event == TASK_EVENT_GEN_CHECKPOINT) {
return (state != TASK_STATUS__READY);
}
if (event == TASK_EVENT_CHECKPOINT_DONE) {
return (state != TASK_STATUS__CK);
}
// todo refactor later
if (event == TASK_EVENT_RESUME) {
return true;
}
if (event == TASK_EVENT_HALT) {
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT || state == TASK_STATUS__STOP ||
state == TASK_STATUS__SCAN_HISTORY) {
return true;
}
}
@ -128,7 +143,7 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
}
}
if (isUnsupportedTransform(state, event)) {
if (isInvalidStateTransfer(state, event)) {
return NULL;
} else {
ASSERT(0);
@ -219,7 +234,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
taosMsleep(100);
} else {
stDebug("s-task:%s is dropped or stopped already, not wait.", id);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
}
@ -260,7 +275,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA; // todo: set new error code// failed to handle the event.
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
if (pSM->pActiveTrans != NULL) {
@ -303,14 +318,14 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlockx", pTask->id.idStr);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
if (pTrans->event != event) {
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event));
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
keepPrevInfo(pSM);
@ -469,6 +484,10 @@ void doInitStateTransferTable(void) {
streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, &info, true);

View File

@ -670,6 +670,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state transfer")
// TDLite
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")