Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0
This commit is contained in:
commit
e9371a1eab
|
@ -786,8 +786,11 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
|
||||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt, bool metaLock);
|
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt, bool metaLock);
|
||||||
|
|
||||||
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
|
||||||
void streamTaskRestoreStatus(SStreamTask* pTask);
|
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
|
||||||
|
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
|
||||||
|
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
|
||||||
|
int32_t streamTaskRestoreStatus(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
|
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
|
||||||
SRpcHandleInfo* pRpcInfo, int32_t taskId);
|
SRpcHandleInfo* pRpcInfo, int32_t taskId);
|
||||||
|
|
|
@ -354,6 +354,7 @@ SRequestObj* acquireRequest(int64_t rid);
|
||||||
int32_t releaseRequest(int64_t rid);
|
int32_t releaseRequest(int64_t rid);
|
||||||
int32_t removeRequest(int64_t rid);
|
int32_t removeRequest(int64_t rid);
|
||||||
void doDestroyRequest(void* p);
|
void doDestroyRequest(void* p);
|
||||||
|
int64_t removeFromMostPrevReq(SRequestObj* pRequest);
|
||||||
|
|
||||||
char* getDbOfConnection(STscObj* pObj);
|
char* getDbOfConnection(STscObj* pObj);
|
||||||
void setConnectionDB(STscObj* pTscObj, const char* db);
|
void setConnectionDB(STscObj* pTscObj, const char* db);
|
||||||
|
|
|
@ -385,6 +385,33 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
|
||||||
|
|
||||||
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
|
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
|
||||||
|
|
||||||
|
/// return the most previous req ref id
|
||||||
|
int64_t removeFromMostPrevReq(SRequestObj* pRequest) {
|
||||||
|
int64_t mostPrevReqRefId = pRequest->self;
|
||||||
|
SRequestObj* pTmp = pRequest;
|
||||||
|
while (pTmp->relation.prevRefId) {
|
||||||
|
pTmp = acquireRequest(pTmp->relation.prevRefId);
|
||||||
|
if (pTmp) {
|
||||||
|
mostPrevReqRefId = pTmp->self;
|
||||||
|
releaseRequest(mostPrevReqRefId);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
removeRequest(mostPrevReqRefId);
|
||||||
|
return mostPrevReqRefId;
|
||||||
|
}
|
||||||
|
|
||||||
|
void destroyNextReq(int64_t nextRefId) {
|
||||||
|
if (nextRefId) {
|
||||||
|
SRequestObj* pObj = acquireRequest(nextRefId);
|
||||||
|
if (pObj) {
|
||||||
|
releaseRequest(nextRefId);
|
||||||
|
releaseRequest(nextRefId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void destroySubRequests(SRequestObj *pRequest) {
|
void destroySubRequests(SRequestObj *pRequest) {
|
||||||
int32_t reqIdx = -1;
|
int32_t reqIdx = -1;
|
||||||
SRequestObj *pReqList[16] = {NULL};
|
SRequestObj *pReqList[16] = {NULL};
|
||||||
|
@ -435,7 +462,7 @@ void doDestroyRequest(void *p) {
|
||||||
uint64_t reqId = pRequest->requestId;
|
uint64_t reqId = pRequest->requestId;
|
||||||
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
|
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
|
||||||
|
|
||||||
destroySubRequests(pRequest);
|
int64_t nextReqRefId = pRequest->relation.nextRefId;
|
||||||
|
|
||||||
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
|
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
|
||||||
|
|
||||||
|
@ -471,6 +498,7 @@ void doDestroyRequest(void *p) {
|
||||||
taosMemoryFreeClear(pRequest->sqlstr);
|
taosMemoryFreeClear(pRequest->sqlstr);
|
||||||
taosMemoryFree(pRequest);
|
taosMemoryFree(pRequest);
|
||||||
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
|
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
|
||||||
|
destroyNextReq(nextReqRefId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyRequest(SRequestObj *pRequest) {
|
void destroyRequest(SRequestObj *pRequest) {
|
||||||
|
@ -479,7 +507,7 @@ void destroyRequest(SRequestObj *pRequest) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_stop_query(pRequest);
|
taos_stop_query(pRequest);
|
||||||
removeRequest(pRequest->self);
|
removeFromMostPrevReq(pRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosStopQueryImpl(SRequestObj *pRequest) {
|
void taosStopQueryImpl(SRequestObj *pRequest) {
|
||||||
|
|
|
@ -1254,54 +1254,34 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
|
void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
|
||||||
int32_t reqIdx = 0;
|
tscInfo("restart request: %s p: %p", pRequest->sqlstr, pRequest);
|
||||||
SRequestObj *pReqList[16] = {NULL};
|
SRequestObj* pUserReq = pRequest;
|
||||||
SRequestObj *pUserReq = NULL;
|
acquireRequest(pRequest->self);
|
||||||
pReqList[0] = pRequest;
|
while (pUserReq) {
|
||||||
uint64_t tmpRefId = 0;
|
if (pUserReq->self == pUserReq->relation.userRefId || pUserReq->relation.userRefId == 0) {
|
||||||
SRequestObj *pTmp = pRequest;
|
|
||||||
while (pTmp->relation.prevRefId) {
|
|
||||||
tmpRefId = pTmp->relation.prevRefId;
|
|
||||||
pTmp = acquireRequest(tmpRefId);
|
|
||||||
if (pTmp) {
|
|
||||||
pReqList[++reqIdx] = pTmp;
|
|
||||||
releaseRequest(tmpRefId);
|
|
||||||
} else {
|
|
||||||
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
|
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tmpRefId = pRequest->relation.nextRefId;
|
|
||||||
while (tmpRefId) {
|
|
||||||
pTmp = acquireRequest(tmpRefId);
|
|
||||||
if (pTmp) {
|
|
||||||
tmpRefId = pTmp->relation.nextRefId;
|
|
||||||
removeRequest(pTmp->self);
|
|
||||||
releaseRequest(pTmp->self);
|
|
||||||
} else {
|
} else {
|
||||||
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
|
int64_t nextRefId = pUserReq->relation.nextRefId;
|
||||||
break;
|
releaseRequest(pUserReq->self);
|
||||||
|
if (nextRefId) {
|
||||||
|
pUserReq = acquireRequest(nextRefId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
bool hasSubRequest = pUserReq != pRequest || pRequest->relation.prevRefId != 0;
|
||||||
for (int32_t i = reqIdx; i >= 0; i--) {
|
|
||||||
destroyCtxInRequest(pReqList[i]);
|
|
||||||
if (pReqList[i]->relation.userRefId == pReqList[i]->self || 0 == pReqList[i]->relation.userRefId) {
|
|
||||||
pUserReq = pReqList[i];
|
|
||||||
} else {
|
|
||||||
removeRequest(pReqList[i]->self);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pUserReq) {
|
if (pUserReq) {
|
||||||
|
destroyCtxInRequest(pUserReq);
|
||||||
pUserReq->prevCode = code;
|
pUserReq->prevCode = code;
|
||||||
memset(&pUserReq->relation, 0, sizeof(pUserReq->relation));
|
memset(&pUserReq->relation, 0, sizeof(pUserReq->relation));
|
||||||
} else {
|
} else {
|
||||||
tscError("user req is missing");
|
tscError("User req is missing");
|
||||||
|
removeFromMostPrevReq(pRequest);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (hasSubRequest)
|
||||||
|
removeFromMostPrevReq(pRequest);
|
||||||
|
else
|
||||||
|
releaseRequest(pUserReq->self);
|
||||||
doAsyncQuery(pUserReq, true);
|
doAsyncQuery(pUserReq, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -721,6 +721,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add into buffer firstly
|
||||||
|
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
|
||||||
taosThreadMutexLock(&execInfo.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
mDebug("stream stream:%s tasks register into node list", createReq.name);
|
mDebug("stream stream:%s tasks register into node list", createReq.name);
|
||||||
saveStreamTasksInfo(&streamObj, &execInfo);
|
saveStreamTasksInfo(&streamObj, &execInfo);
|
||||||
|
|
|
@ -917,6 +917,22 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
|
||||||
|
STQ* pTq = param;
|
||||||
|
|
||||||
|
SStreamMeta* pMeta = pStreamTask->pMeta;
|
||||||
|
STaskId hId = pStreamTask->hTaskInfo.id;
|
||||||
|
SStreamTask* pTask = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId);
|
||||||
|
if (pTask == NULL) {
|
||||||
|
// todo handle error
|
||||||
|
}
|
||||||
|
|
||||||
|
doStartFillhistoryStep2(pTask, pStreamTask, pTq);
|
||||||
|
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// this function should be executed by only one thread, so we set an sentinel to protect this function
|
// this function should be executed by only one thread, so we set an sentinel to protect this function
|
||||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
|
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
|
||||||
|
@ -1007,37 +1023,27 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// the following procedure should be executed, no matter status is stop/pause or not
|
// the following procedure should be executed, no matter status is stop/pause or not
|
||||||
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
|
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
|
||||||
|
|
||||||
if (pTask->info.fillHistory) {
|
ASSERT(pTask->info.fillHistory == 1);
|
||||||
SStreamTask* pStreamTask = NULL;
|
|
||||||
|
|
||||||
// 1. get the related stream task
|
// 1. get the related stream task
|
||||||
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
|
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
|
||||||
pTask->streamTaskId.taskId, pTask->id.idStr);
|
pTask->streamTaskId.taskId, pTask->id.idStr);
|
||||||
|
|
||||||
tqDebug("s-task:%s fill-history task set status to be dropping and drop it", id);
|
tqDebug("s-task:%s fill-history task set status to be dropping and drop it", id);
|
||||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
|
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
|
||||||
|
|
||||||
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
|
||||||
|
|
||||||
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
|
||||||
doStartFillhistoryStep2(pTask, pStreamTask, pTq);
|
|
||||||
} else {
|
|
||||||
tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
|
||||||
|
|
||||||
|
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||||
|
|
||||||
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -28,8 +28,8 @@ static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDurat
|
||||||
|
|
||||||
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
|
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
|
||||||
int32_t tqScanWal(STQ* pTq) {
|
int32_t tqScanWal(STQ* pTq) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
int32_t vgId = pMeta->vgId;
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
|
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
|
||||||
|
|
|
@ -865,7 +865,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){
|
||||||
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
|
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
|
||||||
if (pHistoryTask == NULL) {
|
if (pHistoryTask == NULL) {
|
||||||
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
|
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
|
||||||
", it may have been dropped already",
|
", it may have been dropped already",
|
||||||
pMeta->vgId, pTask->hTaskInfo.id.taskId);
|
pMeta->vgId, pTask->hTaskInfo.id.taskId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
|
|
|
@ -26,21 +26,21 @@ extern "C" {
|
||||||
typedef int32_t (*__state_trans_fn)(SStreamTask*);
|
typedef int32_t (*__state_trans_fn)(SStreamTask*);
|
||||||
typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);
|
typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);
|
||||||
|
|
||||||
typedef struct SAttachedEventInfo {
|
typedef struct SFutureHandleEventInfo {
|
||||||
ETaskStatus status; // required status that this event can be handled
|
ETaskStatus status; // required status that this event can be handled
|
||||||
EStreamTaskEvent event; // the delayed handled event
|
EStreamTaskEvent event; // the delayed handled event
|
||||||
void* pParam;
|
void* pParam;
|
||||||
void* pFn;
|
__state_trans_user_fn callBackFn;
|
||||||
} SAttachedEventInfo;
|
} SFutureHandleEventInfo;
|
||||||
|
|
||||||
typedef struct STaskStateTrans {
|
typedef struct STaskStateTrans {
|
||||||
bool autoInvokeEndFn;
|
bool autoInvokeEndFn;
|
||||||
SStreamTaskState state;
|
SStreamTaskState state;
|
||||||
EStreamTaskEvent event;
|
EStreamTaskEvent event;
|
||||||
SStreamTaskState next;
|
SStreamTaskState next;
|
||||||
__state_trans_fn pAction;
|
__state_trans_fn pAction;
|
||||||
__state_trans_succ_fn pSuccAction;
|
__state_trans_succ_fn pSuccAction;
|
||||||
SAttachedEventInfo attachEvent;
|
SFutureHandleEventInfo attachEvent;
|
||||||
} STaskStateTrans;
|
} STaskStateTrans;
|
||||||
|
|
||||||
struct SStreamTaskSM {
|
struct SStreamTaskSM {
|
||||||
|
|
|
@ -410,6 +410,12 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t haltCallback(SStreamTask* pTask, void* param) {
|
||||||
|
streamTaskOpenAllUpstreamInput(pTask);
|
||||||
|
streamTaskSendCheckpointReq(pTask);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
@ -419,11 +425,12 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
int32_t level = pTask->info.taskLevel;
|
int32_t level = pTask->info.taskLevel;
|
||||||
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
|
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
|
||||||
code = streamDoTransferStateToStreamTask(pTask);
|
code = streamDoTransferStateToStreamTask(pTask);
|
||||||
} else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
|
} else {
|
||||||
|
// no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
|
||||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask != NULL) {
|
if (pStreamTask != NULL) {
|
||||||
// halt the related stream sink task
|
// halt the related stream sink task
|
||||||
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, haltCallback, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr,
|
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr,
|
||||||
pStreamTask->id.idStr, tstrerror(code));
|
pStreamTask->id.idStr, tstrerror(code));
|
||||||
|
@ -432,9 +439,6 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
|
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskOpenAllUpstreamInput(pStreamTask);
|
|
||||||
streamTaskSendCheckpointReq(pStreamTask);
|
|
||||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -669,6 +669,13 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
streamTaskSendCheckpointSourceRsp(pTask);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
|
|
||||||
|
@ -687,7 +694,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle the dropping event
|
// handle the dropping event
|
||||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING);
|
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
|
||||||
} else {
|
} else {
|
||||||
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
|
@ -385,7 +385,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
|
||||||
|
|
||||||
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
||||||
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
|
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
|
||||||
streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
|
streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
|
||||||
|
|
||||||
int64_t initTs = pTask->execInfo.init;
|
int64_t initTs = pTask->execInfo.init;
|
||||||
int64_t startTs = pTask->execInfo.start;
|
int64_t startTs = pTask->execInfo.start;
|
||||||
|
|
|
@ -647,8 +647,6 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
||||||
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
|
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
|
||||||
pDispatcher->taskId, nodeId, buf);
|
pDispatcher->taskId, nodeId, buf);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// do nothing
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -869,8 +867,8 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
|
||||||
pDst->chkpointTransId = pSrc->chkpointTransId;
|
pDst->chkpointTransId = pSrc->chkpointTransId;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
|
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
|
||||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||||
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
||||||
|
@ -882,24 +880,24 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
|
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
|
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTaskResume(SStreamTask* pTask) {
|
void streamTaskResume(SStreamTask* pTask) {
|
||||||
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
|
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
|
||||||
|
|
||||||
if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) {
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
streamTaskRestoreStatus(pTask);
|
int32_t code = streamTaskRestoreStatus(pTask);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
char* pNew = streamTaskGetStatus(pTask)->name;
|
char* pNew = streamTaskGetStatus(pTask)->name;
|
||||||
if (prevState.state == TASK_STATUS__PAUSE) {
|
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||||
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
|
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
|
||||||
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
|
|
||||||
} else {
|
|
||||||
stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name);
|
stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name, pMeta->numOfPausedTasks);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -59,20 +59,23 @@ static int32_t streamTaskInitStatus(SStreamTask* pTask);
|
||||||
static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
|
static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
|
||||||
static int32_t initStateTransferTable();
|
static int32_t initStateTransferTable();
|
||||||
static void doInitStateTransferTable(void);
|
static void doInitStateTransferTable(void);
|
||||||
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask);
|
|
||||||
|
|
||||||
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
|
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
|
||||||
__state_trans_fn fn, __state_trans_succ_fn succFn,
|
__state_trans_fn fn, __state_trans_succ_fn succFn,
|
||||||
SAttachedEventInfo* pEventInfo, bool autoInvoke);
|
SFutureHandleEventInfo* pEventInfo);
|
||||||
|
|
||||||
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; }
|
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; }
|
||||||
|
|
||||||
static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
|
static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) {
|
||||||
char* p = streamTaskGetStatus(pTask)->name;
|
char* p = streamTaskGetStatus(pTask)->name;
|
||||||
|
|
||||||
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
|
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
|
||||||
GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);
|
GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);
|
||||||
taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo);
|
|
||||||
|
SArray* pList = pTask->status.pSM->pWaitingEventList;
|
||||||
|
taosArrayPush(pList, pEvtInfo);
|
||||||
|
|
||||||
|
stDebug("s-task:%s add into waiting list, total waiting events:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pList));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,18 +88,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
|
|
||||||
stDebug("s-task:%s start to do checkpoint", pTask->id.idStr);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask) {
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
|
||||||
streamTaskSendCheckpointSourceRsp(pTask);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
|
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
|
||||||
if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
stError("s-task:%s no related fill-history task, since it may have been dropped already", pTask->id.idStr);
|
stError("s-task:%s no related fill-history task, since it may have been dropped already", pTask->id.idStr);
|
||||||
|
@ -170,9 +161,11 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
|
||||||
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
||||||
pEventName, el, pSM->prev.state.name, pSM->current.name);
|
pEventName, el, pSM->prev.state.name, pSM->current.name);
|
||||||
|
|
||||||
SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
ASSERT(taosArrayGetSize(pSM->pWaitingEventList) == 1);
|
||||||
|
|
||||||
// OK, let's handle the attached event, since the task has reached the required status now
|
SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
||||||
|
|
||||||
|
// OK, let's handle the waiting event, since the task has reached the required status now
|
||||||
if (pSM->current.state == pEvtInfo->status) {
|
if (pSM->current.state == pEvtInfo->status) {
|
||||||
stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
|
stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
|
||||||
GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
|
GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
|
||||||
|
@ -189,7 +182,7 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
|
||||||
|
|
||||||
code = pNextTrans->pAction(pSM->pTask);
|
code = pNextTrans->pAction(pSM->pTask);
|
||||||
if (pNextTrans->autoInvokeEndFn) {
|
if (pNextTrans->autoInvokeEndFn) {
|
||||||
return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event);
|
return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event, pEvtInfo->callBackFn, pEvtInfo->pParam);
|
||||||
} else {
|
} else {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -203,30 +196,61 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTaskRestoreStatus(SStreamTask* pTask) {
|
static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent event) {
|
||||||
SStreamTaskSM* pSM = pTask->status.pSM;
|
SStreamTaskSM* pSM = pTask->status.pSM;
|
||||||
|
|
||||||
|
bool removed = false;
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
ASSERT(pSM->pActiveTrans == NULL);
|
int32_t num = taosArrayGetSize(pSM->pWaitingEventList);
|
||||||
ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT);
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
|
SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i);
|
||||||
|
if (pInfo->event == event) {
|
||||||
|
taosArrayRemove(pSM->pWaitingEventList, i);
|
||||||
|
stDebug("s-task:%s pause event in waiting list not be handled yet, remove it from waiting list, remaining:%d",
|
||||||
|
pTask->id.idStr, pInfo->event);
|
||||||
|
removed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SStreamTaskState state = pSM->current;
|
if (!removed) {
|
||||||
pSM->current = pSM->prev.state;
|
stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, StreamTaskEventList[event].name);
|
||||||
|
|
||||||
pSM->prev.state = state;
|
|
||||||
pSM->prev.evt = 0;
|
|
||||||
|
|
||||||
pSM->startTs = taosGetTimestampMs();
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
|
||||||
stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
|
|
||||||
doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
|
|
||||||
} else {
|
|
||||||
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskRestoreStatus(SStreamTask* pTask) {
|
||||||
|
SStreamTaskSM* pSM = pTask->status.pSM;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
|
if (pSM->current.state == TASK_STATUS__PAUSE && pSM->pActiveTrans == NULL) {
|
||||||
|
SStreamTaskState state = pSM->current;
|
||||||
|
pSM->current = pSM->prev.state;
|
||||||
|
|
||||||
|
pSM->prev.state = state;
|
||||||
|
pSM->prev.evt = 0;
|
||||||
|
|
||||||
|
pSM->startTs = taosGetTimestampMs();
|
||||||
|
|
||||||
|
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
||||||
|
stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr,
|
||||||
|
pSM->prev.state.name, pSM->current.name);
|
||||||
|
doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
removeEventInWaitingList(pTask, TASK_EVENT_PAUSE);
|
||||||
|
code = -1; // failed to restore the status
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
|
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
|
||||||
|
@ -242,7 +266,7 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pSM->pTask = pTask;
|
pSM->pTask = pTask;
|
||||||
pSM->pWaitingEventList = taosArrayInit(4, sizeof(SAttachedEventInfo));
|
pSM->pWaitingEventList = taosArrayInit(4, sizeof(SFutureHandleEventInfo));
|
||||||
if (pSM->pWaitingEventList == NULL) {
|
if (pSM->pWaitingEventList == NULL) {
|
||||||
taosMemoryFree(pSM);
|
taosMemoryFree(pSM);
|
||||||
|
|
||||||
|
@ -273,7 +297,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
if (pTrans->attachEvent.event != 0) {
|
if (pTrans->attachEvent.event != 0) {
|
||||||
attachEvent(pTask, &pTrans->attachEvent);
|
attachWaitedEvent(pTask, &pTrans->attachEvent);
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -303,7 +327,32 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
||||||
// todo handle error code;
|
// todo handle error code;
|
||||||
|
|
||||||
if (pTrans->autoInvokeEndFn) {
|
if (pTrans->autoInvokeEndFn) {
|
||||||
streamTaskOnHandleEventSuccess(pSM, event);
|
streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans, __state_trans_user_fn callbackFn, void* param) {
|
||||||
|
SStreamTask* pTask = pSM->pTask;
|
||||||
|
if (pTrans->attachEvent.event != 0) {
|
||||||
|
SFutureHandleEventInfo info = pTrans->attachEvent;
|
||||||
|
info.pParam = param;
|
||||||
|
info.callBackFn = callbackFn;
|
||||||
|
|
||||||
|
attachWaitedEvent(pTask, &info);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
} else { // override current active trans
|
||||||
|
pSM->pActiveTrans = pTrans;
|
||||||
|
pSM->startTs = taosGetTimestampMs();
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
int32_t code = pTrans->pAction(pTask);
|
||||||
|
// todo handle error code;
|
||||||
|
|
||||||
|
if (pTrans->autoInvokeEndFn) {
|
||||||
|
streamTaskOnHandleEventSuccess(pSM, event, callbackFn, param);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,6 +398,45 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SStreamTask* pTask = pSM->pTask;
|
||||||
|
STaskStateTrans* pTrans = NULL;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
|
if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
|
||||||
|
EStreamTaskEvent evt = pSM->pActiveTrans->event;
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
|
||||||
|
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
|
||||||
|
taosMsleep(100);
|
||||||
|
} else {
|
||||||
|
// no active event trans exists, handle this event directly
|
||||||
|
pTrans = streamTaskFindTransform(pSM->current.state, event);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pSM->pActiveTrans != NULL) {
|
||||||
|
// currently in some state transfer procedure, not auto invoke transfer, quit from this procedure
|
||||||
|
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
|
||||||
|
pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
|
||||||
|
pSM->pActiveTrans->next.name, GET_EVT_NAME(event));
|
||||||
|
}
|
||||||
|
|
||||||
|
code = doHandleEventAsync(pSM, event, pTrans, callbackFn, param);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static void keepPrevInfo(SStreamTaskSM* pSM) {
|
static void keepPrevInfo(SStreamTaskSM* pSM) {
|
||||||
STaskStateTrans* pTrans = pSM->pActiveTrans;
|
STaskStateTrans* pTrans = pSM->pActiveTrans;
|
||||||
|
|
||||||
|
@ -356,8 +444,9 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
|
||||||
pSM->prev.evt = pTrans->event;
|
pSM->prev.evt = pTrans->event;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) {
|
||||||
SStreamTask* pTask = pSM->pTask;
|
SStreamTask* pTask = pSM->pTask;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
// do update the task status
|
// do update the task status
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
@ -369,16 +458,16 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY);
|
s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY);
|
||||||
|
|
||||||
// the pSM->prev.evt may be 0, so print string is not appropriate.
|
// the pSM->prev.evt may be 0, so print string is not appropriate.
|
||||||
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr,
|
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", id, GET_EVT_NAME(event),
|
||||||
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
|
pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTrans->event != event) {
|
if (pTrans->event != event) {
|
||||||
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
|
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", id, GET_EVT_NAME(event),
|
||||||
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event));
|
pSM->current.name, GET_EVT_NAME(pTrans->event));
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||||
}
|
}
|
||||||
|
@ -388,16 +477,31 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
pSM->current = pTrans->next;
|
pSM->current = pTrans->next;
|
||||||
pSM->pActiveTrans = NULL;
|
pSM->pActiveTrans = NULL;
|
||||||
|
|
||||||
|
// todo remove it
|
||||||
// on success callback, add into lock if necessary, or maybe we should add an option for this?
|
// on success callback, add into lock if necessary, or maybe we should add an option for this?
|
||||||
pTrans->pSuccAction(pTask);
|
pTrans->pSuccAction(pTask);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
// todo: add parameter to control lock
|
||||||
|
// after handling the callback function assigned by invoker, go on handling the waiting tasks
|
||||||
|
if (callbackFn != NULL) {
|
||||||
|
stDebug("s-task:%s start to handle user-specified callback fn for event:%s", id, GET_EVT_NAME(pTrans->event));
|
||||||
|
callbackFn(pSM->pTask, param);
|
||||||
|
|
||||||
|
stDebug("s-task:%s handle user-specified callback fn for event:%s completed", id, GET_EVT_NAME(pTrans->event));
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
|
// tasks in waiting list
|
||||||
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
||||||
doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask);
|
doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask);
|
||||||
} else {
|
} else {
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
int64_t el = (taosGetTimestampMs() - pSM->startTs);
|
int64_t el = (taosGetTimestampMs() - pSM->startTs);
|
||||||
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", id,
|
||||||
GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
|
GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -453,7 +557,7 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
|
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
|
||||||
__state_trans_succ_fn succFn, SAttachedEventInfo* pEventInfo, bool autoInvoke) {
|
__state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo) {
|
||||||
STaskStateTrans trans = {0};
|
STaskStateTrans trans = {0};
|
||||||
trans.state = StreamTaskStatusList[current];
|
trans.state = StreamTaskStatusList[current];
|
||||||
trans.next = StreamTaskStatusList[next];
|
trans.next = StreamTaskStatusList[next];
|
||||||
|
@ -468,7 +572,7 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr
|
||||||
|
|
||||||
trans.pAction = (fn != NULL) ? fn : dummyFn;
|
trans.pAction = (fn != NULL) ? fn : dummyFn;
|
||||||
trans.pSuccAction = (succFn != NULL) ? succFn : dummyFn;
|
trans.pSuccAction = (succFn != NULL) ? succFn : dummyFn;
|
||||||
trans.autoInvokeEndFn = autoInvoke;
|
trans.autoInvokeEndFn = (fn == NULL);
|
||||||
return trans;
|
return trans;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -482,93 +586,93 @@ void doInitStateTransferTable(void) {
|
||||||
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
|
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
|
||||||
|
|
||||||
// initialization event handle
|
// initialization event handle
|
||||||
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false);
|
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
// scan-history related event
|
// scan-history related event
|
||||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
// halt stream task, from other task status
|
// halt stream task, from other task status
|
||||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
|
SFutureHandleEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
|
||||||
|
|
||||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
// checkpoint related event
|
// checkpoint related event
|
||||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
// pause & resume related event handle
|
// pause & resume related event handle
|
||||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
|
info = (SFutureHandleEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
|
||||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
// resume is completed by restore status of state-machine
|
// resume is completed by restore status of state-machine
|
||||||
|
|
||||||
// stop related event
|
// stop related event
|
||||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
// dropping related event
|
// dropping related event
|
||||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
}
|
}
|
||||||
//clang-format on
|
//clang-format on
|
Loading…
Reference in New Issue