fix(stream): return error code.

This commit is contained in:
Haojun Liao 2023-11-02 10:22:03 +08:00
parent 95efa07e92
commit c48c801e19
1 changed files with 3 additions and 5 deletions

View File

@ -218,7 +218,6 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
if (pTrans->attachEvent.event != 0) { if (pTrans->attachEvent.event != 0) {
attachEvent(pTask, &pTrans->attachEvent); attachEvent(pTask, &pTrans->attachEvent);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlock1", pTask->id.idStr);
while (1) { while (1) {
// wait for the task to be here // wait for the task to be here
@ -242,7 +241,6 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
pSM->pActiveTrans = pTrans; pSM->pActiveTrans = pTrans;
pSM->startTs = taosGetTimestampMs(); pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlock2", pTask->id.idStr);
int32_t code = pTrans->pAction(pTask); int32_t code = pTrans->pAction(pTask);
// todo handle error code; // todo handle error code;
@ -256,12 +254,12 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
} }
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = pSM->pTask; SStreamTask* pTask = pSM->pTask;
STaskStateTrans* pTrans = NULL; STaskStateTrans* pTrans = NULL;
while (1) { while (1) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
stDebug("s-task:%s lock", pTask->id.idStr);
if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) { if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
EStreamTaskEvent evt = pSM->pActiveTrans->event; EStreamTaskEvent evt = pSM->pActiveTrans->event;
@ -285,12 +283,12 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
pSM->pActiveTrans->next.name, GET_EVT_NAME(event)); pSM->pActiveTrans->next.name, GET_EVT_NAME(event));
} }
doHandleEvent(pSM, event, pTrans); code = doHandleEvent(pSM, event, pTrans);
break; break;
} }
} }
return TSDB_CODE_SUCCESS; return code;
} }
static void keepPrevInfo(SStreamTaskSM* pSM) { static void keepPrevInfo(SStreamTaskSM* pSM) {