fix(stream): add full unsupported event filtering.
This commit is contained in:
parent
e14d42c01e
commit
351a31302f
|
@ -803,6 +803,7 @@ int32_t* taosGetErrno();
|
||||||
// stream
|
// stream
|
||||||
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
||||||
#define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102)
|
#define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102)
|
||||||
|
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
||||||
|
|
|
@ -1061,6 +1061,47 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
|
||||||
|
|
||||||
|
// if it's an source task, extract the last version in wal.
|
||||||
|
SVersionRange *pRange = &pTask->dataRange.range;
|
||||||
|
|
||||||
|
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
|
||||||
|
pTask->execInfo.step2Start = taosGetTimestampMs();
|
||||||
|
|
||||||
|
if (done) {
|
||||||
|
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
|
||||||
|
streamTaskPutTranstateIntoInputQ(pTask);
|
||||||
|
streamExecTask(pTask); // exec directly
|
||||||
|
} else {
|
||||||
|
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||||
|
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
||||||
|
", do secondary scan-history from WAL after halt the related stream task:%s",
|
||||||
|
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
|
||||||
|
pStreamTask->id.idStr);
|
||||||
|
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||||
|
|
||||||
|
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
|
||||||
|
|
||||||
|
int64_t dstVer = pTask->dataRange.range.minVer;
|
||||||
|
pTask->chkInfo.nextProcessVer = dstVer;
|
||||||
|
|
||||||
|
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
|
||||||
|
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
|
||||||
|
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
|
||||||
|
|
||||||
|
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
|
||||||
|
|
||||||
|
// now the fill-history task starts to scan data from wal files.
|
||||||
|
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
tqScanWalAsync(pTq, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// this function should be executed by only one thread
|
// this function should be executed by only one thread
|
||||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
|
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
|
||||||
|
@ -1143,7 +1184,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el);
|
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el);
|
||||||
|
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
SVersionRange* pRange = NULL;
|
|
||||||
SStreamTask* pStreamTask = NULL;
|
SStreamTask* pStreamTask = NULL;
|
||||||
|
|
||||||
// 1. get the related stream task
|
// 1. get the related stream task
|
||||||
|
@ -1162,50 +1202,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
|
||||||
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
||||||
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
doStartStep2(pTask, pStreamTask, pTq);
|
||||||
// if it's an source task, extract the last version in wal.
|
|
||||||
pRange = &pTask->dataRange.range;
|
|
||||||
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
|
|
||||||
pTask->execInfo.step2Start = taosGetTimestampMs();
|
|
||||||
|
|
||||||
if (done) {
|
|
||||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
|
|
||||||
streamTaskPutTranstateIntoInputQ(pTask);
|
|
||||||
// streamTaskRestoreStatus(pTask);
|
|
||||||
|
|
||||||
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
|
|
||||||
// pTask->status.keepTaskStatus = TASK_STATUS__READY;
|
|
||||||
// qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id,
|
|
||||||
// streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus));
|
|
||||||
// }
|
|
||||||
|
|
||||||
streamExecTask(pTask); // exec directly
|
|
||||||
} else {
|
} else {
|
||||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
|
||||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
|
||||||
", do secondary scan-history from WAL after halt the related stream task:%s",
|
|
||||||
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
|
|
||||||
pStreamTask->id.idStr);
|
|
||||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
|
||||||
|
|
||||||
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
|
|
||||||
|
|
||||||
int64_t dstVer = pTask->dataRange.range.minVer;
|
|
||||||
pTask->chkInfo.nextProcessVer = dstVer;
|
|
||||||
|
|
||||||
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
|
|
||||||
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
|
|
||||||
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
|
|
||||||
|
|
||||||
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
|
|
||||||
|
|
||||||
// now the fill-history task starts to scan data from wal files.
|
|
||||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
|
||||||
tqScanWalAsync(pTq, false);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||||
|
|
|
@ -59,6 +59,7 @@ int32_t tqScanWal(STQ* pTq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStartStreamTask(STQ* pTq) {
|
int32_t tqStartStreamTask(STQ* pTq) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
|
@ -102,12 +103,16 @@ int32_t tqStartStreamTask(STQ* pTq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
|
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
|
||||||
streamTaskHandleEvent(pTask->status.pSM, event);
|
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
code = ret;
|
||||||
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pTaskList);
|
taosArrayDestroy(pTaskList);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
|
int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
|
||||||
|
|
|
@ -82,13 +82,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskSetReadyForWal(SStreamTask* pTask) {
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
|
||||||
stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr);
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
|
static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
|
||||||
stDebug("s-task:%s start to do checkpoint", pTask->id.idStr);
|
stDebug("s-task:%s start to do checkpoint", pTask->id.idStr);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -108,9 +101,31 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE
|
// todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE
|
||||||
static bool isUnsupportedTransform(ETaskStatus state, const EStreamTaskEvent event) {
|
static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) {
|
||||||
if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT) {
|
if (event == TASK_EVENT_INIT_STREAM_SCANHIST || event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) {
|
||||||
if (event == TASK_EVENT_SCANHIST_DONE || event == TASK_EVENT_CHECKPOINT_DONE || event == TASK_EVENT_GEN_CHECKPOINT) {
|
return (state != TASK_STATUS__UNINIT);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event == TASK_EVENT_SCANHIST_DONE) {
|
||||||
|
return (state != TASK_STATUS__SCAN_HISTORY && state != TASK_STATUS__STREAM_SCAN_HISTORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event == TASK_EVENT_GEN_CHECKPOINT) {
|
||||||
|
return (state != TASK_STATUS__READY);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event == TASK_EVENT_CHECKPOINT_DONE) {
|
||||||
|
return (state != TASK_STATUS__CK);
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo refactor later
|
||||||
|
if (event == TASK_EVENT_RESUME) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event == TASK_EVENT_HALT) {
|
||||||
|
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT || state == TASK_STATUS__STOP ||
|
||||||
|
state == TASK_STATUS__SCAN_HISTORY) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -128,7 +143,7 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isUnsupportedTransform(state, event)) {
|
if (isInvalidStateTransfer(state, event)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -219,7 +234,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s is dropped or stopped already, not wait.", id);
|
stDebug("s-task:%s is dropped or stopped already, not wait.", id);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,7 +275,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
|
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
return TSDB_CODE_INVALID_PARA; // todo: set new error code// failed to handle the event.
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSM->pActiveTrans != NULL) {
|
if (pSM->pActiveTrans != NULL) {
|
||||||
|
@ -303,14 +318,14 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
stDebug("s-task:%s unlockx", pTask->id.idStr);
|
stDebug("s-task:%s unlockx", pTask->id.idStr);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTrans->event != event) {
|
if (pTrans->event != event) {
|
||||||
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
|
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
|
||||||
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event));
|
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event));
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||||
}
|
}
|
||||||
|
|
||||||
keepPrevInfo(pSM);
|
keepPrevInfo(pSM);
|
||||||
|
@ -469,6 +484,10 @@ void doInitStateTransferTable(void) {
|
||||||
streamTaskKeepCurrentVerInWal, NULL, true);
|
streamTaskKeepCurrentVerInWal, NULL, true);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
|
||||||
|
streamTaskKeepCurrentVerInWal, NULL, true);
|
||||||
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
|
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
|
||||||
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
|
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
|
||||||
streamTaskKeepCurrentVerInWal, &info, true);
|
streamTaskKeepCurrentVerInWal, &info, true);
|
||||||
|
|
|
@ -665,6 +665,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu
|
||||||
// stream
|
// stream
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state transfer")
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")
|
||||||
|
|
Loading…
Reference in New Issue