fix(stream): add qualified status in assert

This commit is contained in:
Haojun Liao 2023-11-02 08:46:18 +08:00
parent dc8a90c864
commit 05b3d51803
1 changed files with 6 additions and 8 deletions

View File

@ -139,9 +139,10 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
void streamTaskRestoreStatus(SStreamTask* pTask) { void streamTaskRestoreStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM; SStreamTaskSM* pSM = pTask->status.pSM;
taosThreadMutexLock(&pTask->lock);
ASSERT(pSM->pActiveTrans == NULL);
taosThreadMutexLock(&pTask->lock);
ASSERT(pSM->pActiveTrans == NULL);
ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT); ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT);
SStreamTaskState state = pSM->current; SStreamTaskState state = pSM->current;
@ -289,13 +290,13 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
// do update the task status // do update the task status
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
stDebug("s-task:%s lock", pTask->id.idStr);
STaskStateTrans* pTrans = pSM->pActiveTrans; STaskStateTrans* pTrans = pSM->pActiveTrans;
if (pTrans == NULL) { if (pTrans == NULL) {
ETaskStatus s = pSM->current.state; ETaskStatus s = pSM->current.state;
ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP); ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP ||
s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY);
// the pSM->prev.evt may be 0, so print string is not appropriate. // the pSM->prev.evt may be 0, so print string is not appropriate.
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr, stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr,
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
@ -309,7 +310,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr, stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event)); GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event));
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlocky", pTask->id.idStr);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -342,7 +342,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
pSM->pActiveTrans = pNextTrans; pSM->pActiveTrans = pNextTrans;
pSM->startTs = taosGetTimestampMs(); pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlockf", pTask->id.idStr);
int32_t code = pNextTrans->pAction(pSM->pTask); int32_t code = pNextTrans->pAction(pSM->pTask);
if (pNextTrans->autoInvokeEndFn) { if (pNextTrans->autoInvokeEndFn) {
@ -358,7 +357,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
} }
} else { } else {
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlockz", pTask->id.idStr);
int64_t el = (taosGetTimestampMs() - pSM->startTs); int64_t el = (taosGetTimestampMs() - pSM->startTs);
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,