fix(stream): do some internal refactor.
This commit is contained in:
parent
bd12ae88e1
commit
385e699cc2
|
@ -20,6 +20,8 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
|
#define GET_EVT_NAME(_ev) (StreamTaskEventList[(_ev)].name)
|
||||||
|
|
||||||
SStreamTaskState StreamTaskStatusList[9] = {
|
SStreamTaskState StreamTaskStatusList[9] = {
|
||||||
{.state = TASK_STATUS__READY, .name = "ready"},
|
{.state = TASK_STATUS__READY, .name = "ready"},
|
||||||
{.state = TASK_STATUS__DROPPING, .name = "dropped"},
|
{.state = TASK_STATUS__DROPPING, .name = "dropped"},
|
||||||
|
@ -66,7 +68,7 @@ static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
|
||||||
streamTaskGetStatus(pTask, &p);
|
streamTaskGetStatus(pTask, &p);
|
||||||
|
|
||||||
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
|
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
|
||||||
StreamTaskEventList[pEvtInfo->event].name, StreamTaskStatusList[pEvtInfo->status].name);
|
GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);
|
||||||
taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo);
|
taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -209,10 +211,10 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {
|
if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {
|
||||||
stDebug("s-task:%s attached event:%s handled", id, StreamTaskEventList[pTrans->event].name);
|
stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP) { // this event has been handled already
|
} else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP) { // this event has been handled already
|
||||||
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, StreamTaskEventList[event].name);
|
stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event));
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s is dropped or stopped already, not wait.", id);
|
stDebug("s-task:%s is dropped or stopped already, not wait.", id);
|
||||||
|
@ -249,11 +251,11 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
|
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
|
||||||
pTask->id.idStr, pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name);
|
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(pSM->pActiveTrans->event));
|
||||||
} else {
|
} else {
|
||||||
pTrans = streamTaskFindTransform(pSM->current.state, event);
|
pTrans = streamTaskFindTransform(pSM->current.state, event);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name);
|
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
return TSDB_CODE_INVALID_PARA; // todo: set new error code// failed to handle the event.
|
return TSDB_CODE_INVALID_PARA; // todo: set new error code// failed to handle the event.
|
||||||
}
|
}
|
||||||
|
@ -261,8 +263,8 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||||
if (pSM->pActiveTrans != NULL) {
|
if (pSM->pActiveTrans != NULL) {
|
||||||
// currently in some state transfer procedure, not auto invoke transfer, abort it
|
// currently in some state transfer procedure, not auto invoke transfer, abort it
|
||||||
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
|
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
|
||||||
pTask->id.idStr, StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name,
|
pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
|
||||||
pSM->pActiveTrans->next.name, StreamTaskEventList[event].name);
|
pSM->pActiveTrans->next.name, GET_EVT_NAME(event));
|
||||||
}
|
}
|
||||||
|
|
||||||
doHandleEvent(pSM, event, pTrans);
|
doHandleEvent(pSM, event, pTrans);
|
||||||
|
@ -294,7 +296,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP);
|
ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP);
|
||||||
// the pSM->prev.evt may be 0, so print string is not appropriate.
|
// the pSM->prev.evt may be 0, so print string is not appropriate.
|
||||||
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr,
|
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr,
|
||||||
StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name);
|
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
stDebug("s-task:%s unlockx", pTask->id.idStr);
|
stDebug("s-task:%s unlockx", pTask->id.idStr);
|
||||||
|
@ -303,7 +305,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
|
|
||||||
if (pTrans->event != event) {
|
if (pTrans->event != event) {
|
||||||
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
|
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
|
||||||
StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name);
|
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event));
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
stDebug("s-task:%s unlocky", pTask->id.idStr);
|
stDebug("s-task:%s unlocky", pTask->id.idStr);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
@ -320,14 +322,14 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
||||||
int64_t el = (taosGetTimestampMs() - pSM->startTs);
|
int64_t el = (taosGetTimestampMs() - pSM->startTs);
|
||||||
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
||||||
StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name);
|
GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
|
||||||
|
|
||||||
SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
||||||
|
|
||||||
// OK, let's handle the attached event, since the task has reached the required status now
|
// OK, let's handle the attached event, since the task has reached the required status now
|
||||||
if (pSM->current.state == pEvtInfo->status) {
|
if (pSM->current.state == pEvtInfo->status) {
|
||||||
stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
|
stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
|
||||||
StreamTaskEventList[pEvtInfo->event].name, pSM->current.name);
|
GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
|
||||||
|
|
||||||
// remove it
|
// remove it
|
||||||
taosArrayPop(pSM->pWaitingEventList);
|
taosArrayPop(pSM->pWaitingEventList);
|
||||||
|
@ -349,7 +351,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
} else {
|
} else {
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr,
|
stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr,
|
||||||
pSM->current.name, StreamTaskEventList[pEvtInfo->event].name,
|
pSM->current.name, GET_EVT_NAME(pEvtInfo->event),
|
||||||
StreamTaskStatusList[pEvtInfo->status].name);
|
StreamTaskStatusList[pEvtInfo->status].name);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -358,7 +360,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
|
|
||||||
int64_t el = (taosGetTimestampMs() - pSM->startTs);
|
int64_t el = (taosGetTimestampMs() - pSM->startTs);
|
||||||
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
||||||
StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name);
|
GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue