diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index ae74ed204e..fd248861e3 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -161,8 +161,7 @@ extern int32_t streamMetaId; int32_t streamTimerInit(); void streamTimerCleanUp(); - -void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); +void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); @@ -202,24 +201,24 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIt int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); const char* streamQueueItemGetTypeStr(int32_t type); -SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); +int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem, SStreamQueueItem** pRes); int32_t streamTransferStatePrepare(SStreamTask* pTask); -SStreamQueue* streamQueueOpen(int64_t cap); -void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); -void streamQueueProcessSuccess(SStreamQueue* queue); -void streamQueueProcessFail(SStreamQueue* queue); -void* streamQueueNextItem(SStreamQueue* pQueue); -void streamFreeQitem(SStreamQueueItem* data); -int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); +int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ); +void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); +void streamQueueProcessSuccess(SStreamQueue* queue); +void streamQueueProcessFail(SStreamQueue* queue); +void* streamQueueNextItem(SStreamQueue* pQueue); +void streamFreeQitem(SStreamQueueItem* data); +int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); -void streamMetaRemoveDB(void* arg, char* key); -void streamMetaHbToMnode(void* param, void* tmrId); -int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes); -void destroyMetaHbInfo(SMetaHbInfo* pInfo); -void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta); -void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount); -int32_t streamMetaSendHbHelper(SStreamMeta* pMeta); +void streamMetaRemoveDB(void* arg, char* key); +void streamMetaHbToMnode(void* param, void* tmrId); +int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes); +void destroyMetaHbInfo(SMetaHbInfo* pInfo); +void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta); +void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount); +int32_t streamMetaSendHbHelper(SStreamMeta* pMeta); ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 4235b6c1bb..af4946cf81 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -185,8 +185,8 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm } // todo handle memory error -SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { - terrno = 0; +int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem, SStreamQueueItem** pRes) { + *pRes = NULL; if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; @@ -196,7 +196,8 @@ SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueI streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem)); taosFreeQitem(pElem); - return dst; + *pRes = dst; + return TSDB_CODE_SUCCESS; } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem; @@ -204,12 +205,13 @@ SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueI streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem)); taosFreeQitem(pElem); - return dst; + *pRes = dst; + *pRes = dst; + return TSDB_CODE_SUCCESS; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); if (pMerged == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } streamQueueItemIncSize((SStreamQueueItem*)pMerged, streamQueueItemGetSize(pElem)); @@ -219,11 +221,13 @@ SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueI taosFreeQitem(dst); taosFreeQitem(pElem); - return (SStreamQueueItem*)pMerged; + + *pRes = (SStreamQueueItem*)pMerged; + return TSDB_CODE_SUCCESS; } else { stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type), dst->type); - return NULL; + return TSDB_CODE_FAILED; } } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 7bc50417bd..b8cdcd4cf5 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -41,34 +41,34 @@ static void streamQueueCleanup(SStreamQueue* pQueue) { static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } -SStreamQueue* streamQueueOpen(int64_t cap) { - int32_t code; +int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) { + *pQ = NULL; + int32_t code = 0; SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); if (pQueue == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } code = taosOpenQueue(&pQueue->pQueue); if (code) { taosMemoryFreeClear(pQueue); - terrno = code; - return NULL; + return code; } code = taosAllocateQall(&pQueue->qall); if (code) { taosCloseQueue(pQueue->pQueue); taosMemoryFree(pQueue); - terrno = code; - return NULL; + return code; } pQueue->status = STREAM_QUEUE__SUCESS; taosSetQueueCapacity(pQueue->pQueue, cap); taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024); - return pQueue; + + *pQ = pQueue; + return code; } void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { @@ -227,12 +227,11 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte *pInput = qItem; } else { // merge current block failed, let's handle the already merged blocks. - void* newRet = streamQueueMergeQueueItem(*pInput, qItem); - if (newRet == NULL) { - if (terrno != 0) { - stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, - tstrerror(terrno)); - } + void* newRet = NULL; + int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, + tstrerror(terrno)); *blockSize = streamQueueItemGetSize(*pInput); if (taskLevel == TASK_LEVEL__SINK) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 11eea98e98..39e12a9da7 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -366,9 +366,9 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; - pTask->inputq.queue = streamQueueOpen(512 << 10); - pTask->outputq.queue = streamQueueOpen(512 << 10); - if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) { + int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue); + int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue); + if (code1 || code2) { stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr); return TSDB_CODE_OUT_OF_MEMORY; } @@ -379,8 +379,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i int32_t code = streamCreateStateMachine(pTask); if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr, - tstrerror(terrno)); - return terrno; + tstrerror(code)); + return code; } pTask->execInfo.created = taosGetTimestampMs(); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 1af47cf43f..7e47857a39 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -20,7 +20,17 @@ #include "ttimer.h" #include "wal.h" +static int32_t initRes = 0; + #define GET_EVT_NAME(_ev) (StreamTaskEventList[(_ev)].name) +#define CHECK_RET_VAL(_exec) \ + do { \ + void* p = (_exec); \ + if (p == NULL) { \ + initRes = TSDB_CODE_OUT_OF_MEMORY; \ + return; \ + } \ + } while (0); SStreamTaskState StreamTaskStatusList[9] = { {.state = TASK_STATUS__READY, .name = "ready"}, @@ -73,10 +83,17 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name); SArray* pList = pTask->status.pSM->pWaitingEventList; - taosArrayPush(pList, pEvtInfo); - stDebug("s-task:%s add into waiting list, total waiting events:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pList)); - return 0; + void* px = taosArrayPush(pList, pEvtInfo); + if (px == NULL) { + stError("s-task:%s failed to add into waiting list, total waiting events:%d, code: out of memory", pTask->id.idStr, + (int32_t)taosArrayGetSize(pList)); + return TSDB_CODE_OUT_OF_MEMORY; + } else { + stDebug("s-task:%s add into waiting list, total waiting events:%d", pTask->id.idStr, + (int32_t)taosArrayGetSize(pList)); + return TSDB_CODE_SUCCESS; + } } static int32_t stopTaskSuccFn(SStreamTask* pTask) { @@ -177,14 +194,14 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, GET_EVT_NAME(pEvtInfo->event), pSM->current.name); // remove it - taosArrayPop(pSM->pWaitingEventList); + (void) taosArrayPop(pSM->pWaitingEventList); STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event); ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL); pSM->pActiveTrans = pNextTrans; pSM->startTs = taosGetTimestampMs(); - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); code = pNextTrans->pAction(pSM->pTask); if (pNextTrans->autoInvokeEndFn) { @@ -193,7 +210,7 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, return code; } } else { - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr, pSM->current.name, GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name); @@ -228,9 +245,9 @@ static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent eve int32_t streamTaskRestoreStatus(SStreamTask* pTask) { SStreamTaskSM* pSM = pTask->status.pSM; - int32_t code = 0; + int32_t code = 0; - taosThreadMutexLock(&pTask->lock); + (void) taosThreadMutexLock(&pTask->lock); if (pSM->current.state == TASK_STATUS__PAUSE && pSM->pActiveTrans == NULL) { SStreamTaskState state = pSM->current; @@ -244,21 +261,25 @@ int32_t streamTaskRestoreStatus(SStreamTask* pTask) { if (taosArrayGetSize(pSM->pWaitingEventList) > 0) { stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); - doHandleWaitingEvent(pSM, "restore-pause/halt", pTask); + code = doHandleWaitingEvent(pSM, "restore-pause/halt", pTask); } else { stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); } } else { - removeEventInWaitingList(pTask, TASK_EVENT_PAUSE); - code = -1; // failed to restore the status + (void)removeEventInWaitingList(pTask, TASK_EVENT_PAUSE); // ignore the return value, + code = TSDB_CODE_FAILED; // failed to restore the status, since it is not in pause status } - taosThreadMutexUnlock(&pTask->lock); + (void)taosThreadMutexUnlock(&pTask->lock); return code; } int32_t streamCreateStateMachine(SStreamTask* pTask) { - initStateTransferTable(); + int32_t code = initStateTransferTable(); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + const char* id = pTask->id.idStr; SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM)); @@ -297,16 +318,21 @@ void streamDestroyStateMachine(SStreamTaskSM* pSM) { static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans) { SStreamTask* pTask = pSM->pTask; const char* id = pTask->id.idStr; + int32_t code = 0; if (pTrans->attachEvent.event != 0) { - attachWaitedEvent(pTask, &pTrans->attachEvent); - taosThreadMutexUnlock(&pTask->lock); + code = attachWaitedEvent(pTask, &pTrans->attachEvent); + if (code) { + return code; + } + + (void) taosThreadMutexUnlock(&pTask->lock); while (1) { // wait for the task to be here - taosThreadMutexLock(&pTask->lock); + (void) taosThreadMutexLock(&pTask->lock); ETaskStatus s = streamTaskGetStatus(pTask).state; - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event)); @@ -323,42 +349,49 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt } else { // override current active trans pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); - int32_t code = pTrans->pAction(pTask); - // todo handle error code; + code = pTrans->pAction(pTask); if (pTrans->autoInvokeEndFn) { - streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL); + int32_t c = streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL); + if (code == TSDB_CODE_SUCCESS) { + code = c; + } } } - return TSDB_CODE_SUCCESS; + return code; } static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans, __state_trans_user_fn callbackFn, void* param) { SStreamTask* pTask = pSM->pTask; + int32_t code = 0; + if (pTrans->attachEvent.event != 0) { SFutureHandleEventInfo info = pTrans->attachEvent; info.pParam = param; info.callBackFn = callbackFn; - attachWaitedEvent(pTask, &info); - taosThreadMutexUnlock(&pTask->lock); + code = attachWaitedEvent(pTask, &info); + (void) taosThreadMutexUnlock(&pTask->lock); } else { // override current active trans pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); - int32_t code = pTrans->pAction(pTask); + code = pTrans->pAction(pTask); // todo handle error code; if (pTrans->autoInvokeEndFn) { - streamTaskOnHandleEventSuccess(pSM, event, callbackFn, param); + int32_t c = streamTaskOnHandleEventSuccess(pSM, event, callbackFn, param); + if (code == TSDB_CODE_SUCCESS) { + code = c; + } } } - return TSDB_CODE_SUCCESS; + return code; } int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { @@ -367,11 +400,11 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { STaskStateTrans* pTrans = NULL; while (1) { - taosThreadMutexLock(&pTask->lock); + (void) taosThreadMutexLock(&pTask->lock); if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) { EStreamTaskEvent evt = pSM->pActiveTrans->event; - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed", pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt)); @@ -381,7 +414,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { pTrans = streamTaskFindTransform(pSM->current.state, event); if (pTrans == NULL) { stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_INVALID_STATETRANS; } @@ -406,11 +439,11 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _ STaskStateTrans* pTrans = NULL; while (1) { - taosThreadMutexLock(&pTask->lock); + (void) taosThreadMutexLock(&pTask->lock); if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) { EStreamTaskEvent evt = pSM->pActiveTrans->event; - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed", pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt)); @@ -420,7 +453,7 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _ pTrans = streamTaskFindTransform(pSM->current.state, event); if (pTrans == NULL) { stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name); - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_INVALID_STATETRANS; } @@ -449,9 +482,10 @@ static void keepPrevInfo(SStreamTaskSM* pSM) { int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) { SStreamTask* pTask = pSM->pTask; const char* id = pTask->id.idStr; + int32_t code = 0; // do update the task status - taosThreadMutexLock(&pTask->lock); + (void) taosThreadMutexLock(&pTask->lock); STaskStateTrans* pTrans = pSM->pActiveTrans; if (pTrans == NULL) { @@ -463,14 +497,14 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", id, GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_INVALID_STATETRANS; } if (pTrans->event != event) { stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", id, GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event)); - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_INVALID_STATETRANS; } @@ -480,34 +514,38 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even pSM->pActiveTrans = NULL; // todo remove it + // todo: handle the error code // on success callback, add into lock if necessary, or maybe we should add an option for this? - pTrans->pSuccAction(pTask); + code = pTrans->pSuccAction(pTask); - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); // todo: add parameter to control lock // after handling the callback function assigned by invoker, go on handling the waiting tasks if (callbackFn != NULL) { stDebug("s-task:%s start to handle user-specified callback fn for event:%s", id, GET_EVT_NAME(pTrans->event)); - callbackFn(pSM->pTask, param); + int32_t ret = callbackFn(pSM->pTask, param); + if (ret != TSDB_CODE_SUCCESS) { + // todo handle error + } stDebug("s-task:%s handle user-specified callback fn for event:%s completed", id, GET_EVT_NAME(pTrans->event)); } - taosThreadMutexLock(&pTask->lock); + (void) taosThreadMutexLock(&pTask->lock); // tasks in waiting list if (taosArrayGetSize(pSM->pWaitingEventList) > 0) { - doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask); + code = doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask); } else { - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); int64_t el = (taosGetTimestampMs() - pSM->startTs); stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", id, GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name); } - return TSDB_CODE_SUCCESS; + return code; } SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask) { @@ -525,14 +563,14 @@ const char* streamTaskGetStatusStr(ETaskStatus status) { void streamTaskResetStatus(SStreamTask* pTask) { SStreamTaskSM* pSM = pTask->status.pSM; - taosThreadMutexLock(&pTask->lock); + (void) taosThreadMutexLock(&pTask->lock); stDebug("s-task:%s level:%d fill-history:%d vgId:%d set uninit, prev status:%s", pTask->id.idStr, pTask->info.taskLevel, pTask->info.fillHistory, pTask->pMeta->vgId, pSM->current.name); pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; pSM->pActiveTrans = NULL; taosArrayClear(pSM->pWaitingEventList); - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); // clear the downstream ready status pTask->status.downstreamReady = 0; @@ -575,8 +613,7 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr } int32_t initStateTransferTable() { - taosThreadOnce(&streamTaskStateMachineInit, doInitStateTransferTable); - return TSDB_CODE_SUCCESS; + return taosThreadOnce(&streamTaskStateMachineInit, doInitStateTransferTable); } //clang-format off @@ -585,92 +622,93 @@ void doInitStateTransferTable(void) { // initialization event handle STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanHistoryTaskReady, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); // scan-history related event trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); // halt stream task, from other task status trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); SFutureHandleEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); // checkpoint related event trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); // pause & resume related event handle trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); info = (SFutureHandleEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE}; trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); // resume is completed by restore status of state-machine // stop related event trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); // dropping related event trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL); - taosArrayPush(streamTaskSMTrans, &trans); + CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); } //clang-format on