Merge pull request #27536 from taosdata/fix/3_liaohj
fix(stream): enable follower update the checkpoint info.
This commit is contained in:
commit
63ec527e04
|
@ -272,24 +272,35 @@ typedef struct SCheckpointInfo {
|
||||||
int64_t processedVer;
|
int64_t processedVer;
|
||||||
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
||||||
int64_t msgVer;
|
int64_t msgVer;
|
||||||
int32_t consensusTransId; // consensus checkpoint id
|
|
||||||
SActiveCheckpointInfo* pActiveInfo;
|
SActiveCheckpointInfo* pActiveInfo;
|
||||||
} SCheckpointInfo;
|
} SCheckpointInfo;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TASK_CONSEN_CHKPT_REQ = 0x1,
|
||||||
|
TASK_CONSEN_CHKPT_SEND = 0x2,
|
||||||
|
TASK_CONSEN_CHKPT_RECV = 0x3,
|
||||||
|
} EConsenChkptStatus;
|
||||||
|
|
||||||
|
typedef struct SConsenChkptInfo {
|
||||||
|
// bool alreadySendChkptId;
|
||||||
|
EConsenChkptStatus status;
|
||||||
|
int64_t statusTs;
|
||||||
|
int32_t consenChkptTransId;
|
||||||
|
} SConsenChkptInfo;
|
||||||
|
|
||||||
typedef struct SStreamStatus {
|
typedef struct SStreamStatus {
|
||||||
SStreamTaskSM* pSM;
|
SStreamTaskSM* pSM;
|
||||||
int8_t taskStatus;
|
int8_t taskStatus;
|
||||||
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
|
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
|
||||||
int8_t schedStatus;
|
int8_t schedStatus;
|
||||||
int8_t statusBackup;
|
int8_t statusBackup;
|
||||||
int32_t schedIdleTime; // idle time before invoke again
|
int32_t schedIdleTime; // idle time before invoke again
|
||||||
int32_t timerActive; // timer is active
|
int32_t timerActive; // timer is active
|
||||||
int64_t lastExecTs; // last exec time stamp
|
int64_t lastExecTs; // last exec time stamp
|
||||||
int32_t inScanHistorySentinel;
|
int32_t inScanHistorySentinel;
|
||||||
bool appendTranstateBlock; // has append the transfer state data block already
|
bool appendTranstateBlock; // has append the transfer state data block already
|
||||||
bool removeBackendFiles; // remove backend files on disk when free stream tasks
|
bool removeBackendFiles; // remove backend files on disk when free stream tasks
|
||||||
bool sendConsensusChkptId;
|
SConsenChkptInfo consenChkptInfo;
|
||||||
bool requireConsensusChkptId;
|
|
||||||
} SStreamStatus;
|
} SStreamStatus;
|
||||||
|
|
||||||
typedef struct SDataRange {
|
typedef struct SDataRange {
|
||||||
|
@ -774,6 +785,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||||
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
|
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
|
||||||
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
|
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
|
||||||
|
int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts);
|
||||||
|
|
||||||
// timer
|
// timer
|
||||||
int32_t streamTimerGetInstance(tmr_h* pTmr);
|
int32_t streamTimerGetInstance(tmr_h* pTmr);
|
||||||
|
|
|
@ -969,7 +969,8 @@ int32_t taosGetErrSize();
|
||||||
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
|
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
|
||||||
#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104)
|
#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104)
|
||||||
#define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105)
|
#define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105)
|
||||||
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4106)
|
#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106)
|
||||||
|
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107)
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
||||||
|
|
|
@ -242,7 +242,6 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg);
|
|
||||||
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
||||||
int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);
|
int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);
|
||||||
|
|
|
@ -1291,6 +1291,3 @@ int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg);
|
return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|
||||||
return tqStreamProcessConsensusChkptRsp2(pTq->pStreamMeta, pMsg);
|
|
||||||
}
|
|
||||||
|
|
|
@ -1238,9 +1238,9 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
|
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
|
||||||
|
|
||||||
if (pTask->chkInfo.consensusTransId >= req.transId) {
|
if (pTask->status.consenChkptInfo.consenChkptTransId >= req.transId) {
|
||||||
tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
|
tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
|
||||||
pTask->chkInfo.consensusTransId, req.transId);
|
pTask->status.consenChkptInfo.consenChkptTransId, req.transId);
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1256,7 +1256,9 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
pTask->id.idStr, vgId, req.checkpointId, req.transId);
|
pTask->id.idStr, vgId, req.checkpointId, req.transId);
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->chkInfo.consensusTransId = req.transId;
|
pTask->status.consenChkptInfo.consenChkptTransId = req.transId;
|
||||||
|
pTask->status.consenChkptInfo.status = TASK_CONSEN_CHKPT_RECV;
|
||||||
|
pTask->status.consenChkptInfo.statusTs = taosGetTimestampMs();
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
if (pMeta->role == NODE_ROLE_LEADER) {
|
if (pMeta->role == NODE_ROLE_LEADER) {
|
||||||
|
|
|
@ -586,7 +586,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
|
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
|
||||||
pInfo->checkpointTime, pReq->checkpointTs);
|
pInfo->checkpointTime, pReq->checkpointTs);
|
||||||
} else { // not in restore status, must be in checkpoint status
|
} else { // not in restore status, must be in checkpoint status
|
||||||
if (pStatus.state == TASK_STATUS__CK) {
|
if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
|
||||||
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
|
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
|
||||||
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
|
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
|
||||||
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
|
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
|
||||||
|
@ -610,7 +610,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
}
|
}
|
||||||
|
|
||||||
// update only it is in checkpoint status, or during restore procedure.
|
// update only it is in checkpoint status, or during restore procedure.
|
||||||
if (pStatus.state == TASK_STATUS__CK || (!restored)) {
|
if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
|
||||||
pInfo->checkpointId = pReq->checkpointId;
|
pInfo->checkpointId = pReq->checkpointId;
|
||||||
pInfo->checkpointVer = pReq->checkpointVer;
|
pInfo->checkpointVer = pReq->checkpointVer;
|
||||||
pInfo->checkpointTime = pReq->checkpointTs;
|
pInfo->checkpointTime = pReq->checkpointTs;
|
||||||
|
@ -1371,19 +1371,19 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
|
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
|
||||||
|
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
ETaskStatus p = streamTaskGetStatus(pTask).state;
|
ETaskStatus p = streamTaskGetStatus(pTask).state;
|
||||||
|
// if (pInfo->alreadySendChkptId == true) {
|
||||||
if (pTask->status.sendConsensusChkptId == true) {
|
// stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id);
|
||||||
stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id);
|
// streamMutexUnlock(&pTask->lock);
|
||||||
streamMutexUnlock(&pTask->lock);
|
// return TSDB_CODE_SUCCESS;
|
||||||
return TSDB_CODE_SUCCESS;
|
// } else {
|
||||||
} else {
|
// pInfo->alreadySendChkptId = true;
|
||||||
pTask->status.sendConsensusChkptId = true;
|
// }
|
||||||
}
|
//
|
||||||
|
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
if (pTask->pBackend != NULL) {
|
if (pTask->pBackend != NULL) {
|
||||||
|
@ -1391,8 +1391,9 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
|
||||||
pTask->pBackend = NULL;
|
pTask->pBackend = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->status.requireConsensusChkptId = true;
|
pInfo->status = TASK_CONSEN_CHKPT_REQ;
|
||||||
stDebug("s-task:%s set the require consensus-checkpointId flag", id);
|
pInfo->statusTs = taosGetTimestampMs();
|
||||||
|
stDebug("s-task:%s set the require consensus-checkpointId flag, ts:%" PRId64, id, pInfo->statusTs);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -197,10 +197,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*pTask)->status.requireConsensusChkptId) {
|
entry.checkpointInfo.consensusChkptId = streamTaskSetReqConsensusChkptId(*pTask, pMsg->ts);
|
||||||
entry.checkpointInfo.consensusChkptId = 1;
|
if (entry.checkpointInfo.consensusChkptId) {
|
||||||
(*pTask)->status.requireConsensusChkptId = false;
|
entry.checkpointInfo.consensusTs = pMsg->ts;
|
||||||
stDebug("s-task:%s vgId:%d set requiring consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*pTask)->exec.pWalReader != NULL) {
|
if ((*pTask)->exec.pWalReader != NULL) {
|
||||||
|
|
|
@ -120,7 +120,8 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||||
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
||||||
code = ret;
|
code = ret;
|
||||||
|
|
||||||
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
|
// do no added into result hashmap if it is failed due to concurrently starting of this stream task.
|
||||||
|
if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -195,9 +196,9 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
||||||
}
|
}
|
||||||
|
|
||||||
// clear the send consensus-checkpointId flag
|
// clear the send consensus-checkpointId flag
|
||||||
streamMutexLock(&(*p)->lock);
|
// streamMutexLock(&(*p)->lock);
|
||||||
(*p)->status.sendConsensusChkptId = false;
|
// (*p)->status.sendConsensusChkptId = false;
|
||||||
streamMutexUnlock(&(*p)->lock);
|
// streamMutexUnlock(&(*p)->lock);
|
||||||
|
|
||||||
if (pStartInfo->startAllTasks != 1) {
|
if (pStartInfo->startAllTasks != 1) {
|
||||||
int64_t el = endTs - startTs;
|
int64_t el = endTs - startTs;
|
||||||
|
@ -385,11 +386,11 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT,
|
stError("s-task:%s vgId:%d failed to handle event:init-task, code:%s", pTask->id.idStr, pMeta->vgId,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
|
|
||||||
// do no added into result hashmap if it is failed due to concurrently starting of this stream task.
|
// do no added into result hashmap if it is failed due to concurrently starting of this stream task.
|
||||||
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
|
if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -443,4 +444,27 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) {
|
||||||
|
SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo;
|
||||||
|
|
||||||
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
|
if (pConChkptInfo->status == TASK_CONSEN_CHKPT_REQ) {
|
||||||
|
pConChkptInfo->status = TASK_CONSEN_CHKPT_SEND;
|
||||||
|
pConChkptInfo->statusTs = ts;
|
||||||
|
stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr,
|
||||||
|
vgId, pConChkptInfo->statusTs);
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && (ts - pConChkptInfo->statusTs) > 60 * 1000) {
|
||||||
|
pConChkptInfo->statusTs = ts;
|
||||||
|
|
||||||
|
stWarn("s-task:%s vgId:%d not recv consensus-chkptId for 60s, set requiring in Hb again, ts:%" PRId64,
|
||||||
|
pTask->id.idStr, vgId, pConChkptInfo->statusTs);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -941,7 +941,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
|
||||||
.checkpointInfo.latestSize = 0,
|
.checkpointInfo.latestSize = 0,
|
||||||
.checkpointInfo.remoteBackup = 0,
|
.checkpointInfo.remoteBackup = 0,
|
||||||
.checkpointInfo.consensusChkptId = 0,
|
.checkpointInfo.consensusChkptId = 0,
|
||||||
.checkpointInfo.consensusTs = taosGetTimestampMs(),
|
.checkpointInfo.consensusTs = 0,
|
||||||
.hTaskId = pTask->hTaskInfo.id.taskId,
|
.hTaskId = pTask->hTaskInfo.id.taskId,
|
||||||
.procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
|
.procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
|
||||||
.outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),
|
.outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),
|
||||||
|
|
|
@ -413,7 +413,8 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||||
// no active event trans exists, handle this event directly
|
// no active event trans exists, handle this event directly
|
||||||
pTrans = streamTaskFindTransform(pSM->current.state, event);
|
pTrans = streamTaskFindTransform(pSM->current.state, event);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
|
stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event),
|
||||||
|
pSM->current.name);
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||||
}
|
}
|
||||||
|
@ -423,7 +424,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||||
if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) {
|
if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) {
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr);
|
stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr);
|
||||||
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
return TSDB_CODE_STREAM_CONFLICT_EVENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
// currently in some state transfer procedure, not auto invoke transfer, abort it
|
// currently in some state transfer procedure, not auto invoke transfer, abort it
|
||||||
|
|
|
@ -812,7 +812,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exi
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status to proceed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status to proceed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
|
|
Loading…
Reference in New Issue