refactor: check return value for each function.

This commit is contained in:
Haojun Liao 2024-07-17 18:55:04 +08:00
parent 2b00d6549d
commit b35d107bf9
5 changed files with 167 additions and 127 deletions

View File

@ -161,8 +161,7 @@ extern int32_t streamMetaId;
int32_t streamTimerInit();
void streamTimerCleanUp();
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
@ -202,24 +201,24 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIt
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
const char* streamQueueItemGetTypeStr(int32_t type);
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem, SStreamQueueItem** pRes);
int32_t streamTransferStatePrepare(SStreamTask* pTask);
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue);
void streamQueueProcessFail(SStreamQueue* queue);
void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue);
void streamQueueProcessFail(SStreamQueue* queue);
void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
void streamMetaRemoveDB(void* arg, char* key);
void streamMetaHbToMnode(void* param, void* tmrId);
int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes);
void destroyMetaHbInfo(SMetaHbInfo* pInfo);
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta);
void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount);
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta);
void streamMetaRemoveDB(void* arg, char* key);
void streamMetaHbToMnode(void* param, void* tmrId);
int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes);
void destroyMetaHbInfo(SMetaHbInfo* pInfo);
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta);
void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount);
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta);
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();

View File

@ -185,8 +185,8 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm
}
// todo handle memory error
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
terrno = 0;
int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem, SStreamQueueItem** pRes) {
*pRes = NULL;
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
@ -196,7 +196,8 @@ SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueI
streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem));
taosFreeQitem(pElem);
return dst;
*pRes = dst;
return TSDB_CODE_SUCCESS;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem;
@ -204,12 +205,13 @@ SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueI
streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem));
taosFreeQitem(pElem);
return dst;
*pRes = dst;
*pRes = dst;
return TSDB_CODE_SUCCESS;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
if (pMerged == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
streamQueueItemIncSize((SStreamQueueItem*)pMerged, streamQueueItemGetSize(pElem));
@ -219,11 +221,13 @@ SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueI
taosFreeQitem(dst);
taosFreeQitem(pElem);
return (SStreamQueueItem*)pMerged;
*pRes = (SStreamQueueItem*)pMerged;
return TSDB_CODE_SUCCESS;
} else {
stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type),
dst->type);
return NULL;
return TSDB_CODE_FAILED;
}
}

View File

@ -41,34 +41,34 @@ static void streamQueueCleanup(SStreamQueue* pQueue) {
static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }
SStreamQueue* streamQueueOpen(int64_t cap) {
int32_t code;
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
*pQ = NULL;
int32_t code = 0;
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
if (pQueue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
code = taosOpenQueue(&pQueue->pQueue);
if (code) {
taosMemoryFreeClear(pQueue);
terrno = code;
return NULL;
return code;
}
code = taosAllocateQall(&pQueue->qall);
if (code) {
taosCloseQueue(pQueue->pQueue);
taosMemoryFree(pQueue);
terrno = code;
return NULL;
return code;
}
pQueue->status = STREAM_QUEUE__SUCESS;
taosSetQueueCapacity(pQueue->pQueue, cap);
taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
return pQueue;
*pQ = pQueue;
return code;
}
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
@ -227,12 +227,11 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
*pInput = qItem;
} else {
// merge current block failed, let's handle the already merged blocks.
void* newRet = streamQueueMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
if (terrno != 0) {
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
tstrerror(terrno));
}
void* newRet = NULL;
int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
tstrerror(terrno));
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {

View File

@ -366,9 +366,9 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->inputq.queue = streamQueueOpen(512 << 10);
pTask->outputq.queue = streamQueueOpen(512 << 10);
if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) {
int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue);
int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue);
if (code1 || code2) {
stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -379,8 +379,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
int32_t code = streamCreateStateMachine(pTask);
if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
tstrerror(terrno));
return terrno;
tstrerror(code));
return code;
}
pTask->execInfo.created = taosGetTimestampMs();

View File

@ -20,7 +20,17 @@
#include "ttimer.h"
#include "wal.h"
static int32_t initRes = 0;
#define GET_EVT_NAME(_ev) (StreamTaskEventList[(_ev)].name)
#define CHECK_RET_VAL(_exec) \
do { \
void* p = (_exec); \
if (p == NULL) { \
initRes = TSDB_CODE_OUT_OF_MEMORY; \
return; \
} \
} while (0);
SStreamTaskState StreamTaskStatusList[9] = {
{.state = TASK_STATUS__READY, .name = "ready"},
@ -73,10 +83,17 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv
GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);
SArray* pList = pTask->status.pSM->pWaitingEventList;
taosArrayPush(pList, pEvtInfo);
stDebug("s-task:%s add into waiting list, total waiting events:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pList));
return 0;
void* px = taosArrayPush(pList, pEvtInfo);
if (px == NULL) {
stError("s-task:%s failed to add into waiting list, total waiting events:%d, code: out of memory", pTask->id.idStr,
(int32_t)taosArrayGetSize(pList));
return TSDB_CODE_OUT_OF_MEMORY;
} else {
stDebug("s-task:%s add into waiting list, total waiting events:%d", pTask->id.idStr,
(int32_t)taosArrayGetSize(pList));
return TSDB_CODE_SUCCESS;
}
}
static int32_t stopTaskSuccFn(SStreamTask* pTask) {
@ -177,14 +194,14 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
// remove it
taosArrayPop(pSM->pWaitingEventList);
(void) taosArrayPop(pSM->pWaitingEventList);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event);
ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);
pSM->pActiveTrans = pNextTrans;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
code = pNextTrans->pAction(pSM->pTask);
if (pNextTrans->autoInvokeEndFn) {
@ -193,7 +210,7 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
return code;
}
} else {
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr,
pSM->current.name, GET_EVT_NAME(pEvtInfo->event),
StreamTaskStatusList[pEvtInfo->status].name);
@ -228,9 +245,9 @@ static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent eve
int32_t streamTaskRestoreStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
int32_t code = 0;
int32_t code = 0;
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
if (pSM->current.state == TASK_STATUS__PAUSE && pSM->pActiveTrans == NULL) {
SStreamTaskState state = pSM->current;
@ -244,21 +261,25 @@ int32_t streamTaskRestoreStatus(SStreamTask* pTask) {
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr,
pSM->prev.state.name, pSM->current.name);
doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
code = doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
} else {
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
}
} else {
removeEventInWaitingList(pTask, TASK_EVENT_PAUSE);
code = -1; // failed to restore the status
(void)removeEventInWaitingList(pTask, TASK_EVENT_PAUSE); // ignore the return value,
code = TSDB_CODE_FAILED; // failed to restore the status, since it is not in pause status
}
taosThreadMutexUnlock(&pTask->lock);
(void)taosThreadMutexUnlock(&pTask->lock);
return code;
}
int32_t streamCreateStateMachine(SStreamTask* pTask) {
initStateTransferTable();
int32_t code = initStateTransferTable();
if (code != TSDB_CODE_SUCCESS) {
return code;
}
const char* id = pTask->id.idStr;
SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM));
@ -297,16 +318,21 @@ void streamDestroyStateMachine(SStreamTaskSM* pSM) {
static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans) {
SStreamTask* pTask = pSM->pTask;
const char* id = pTask->id.idStr;
int32_t code = 0;
if (pTrans->attachEvent.event != 0) {
attachWaitedEvent(pTask, &pTrans->attachEvent);
taosThreadMutexUnlock(&pTask->lock);
code = attachWaitedEvent(pTask, &pTrans->attachEvent);
if (code) {
return code;
}
(void) taosThreadMutexUnlock(&pTask->lock);
while (1) {
// wait for the task to be here
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
ETaskStatus s = streamTaskGetStatus(pTask).state;
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already
stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event));
@ -323,42 +349,49 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
} else { // override current active trans
pSM->pActiveTrans = pTrans;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
int32_t code = pTrans->pAction(pTask);
// todo handle error code;
code = pTrans->pAction(pTask);
if (pTrans->autoInvokeEndFn) {
streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL);
int32_t c = streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL);
if (code == TSDB_CODE_SUCCESS) {
code = c;
}
}
}
return TSDB_CODE_SUCCESS;
return code;
}
static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans, __state_trans_user_fn callbackFn, void* param) {
SStreamTask* pTask = pSM->pTask;
int32_t code = 0;
if (pTrans->attachEvent.event != 0) {
SFutureHandleEventInfo info = pTrans->attachEvent;
info.pParam = param;
info.callBackFn = callbackFn;
attachWaitedEvent(pTask, &info);
taosThreadMutexUnlock(&pTask->lock);
code = attachWaitedEvent(pTask, &info);
(void) taosThreadMutexUnlock(&pTask->lock);
} else { // override current active trans
pSM->pActiveTrans = pTrans;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
int32_t code = pTrans->pAction(pTask);
code = pTrans->pAction(pTask);
// todo handle error code;
if (pTrans->autoInvokeEndFn) {
streamTaskOnHandleEventSuccess(pSM, event, callbackFn, param);
int32_t c = streamTaskOnHandleEventSuccess(pSM, event, callbackFn, param);
if (code == TSDB_CODE_SUCCESS) {
code = c;
}
}
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
@ -367,11 +400,11 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
STaskStateTrans* pTrans = NULL;
while (1) {
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
EStreamTaskEvent evt = pSM->pActiveTrans->event;
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
@ -381,7 +414,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
pTrans = streamTaskFindTransform(pSM->current.state, event);
if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
@ -406,11 +439,11 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _
STaskStateTrans* pTrans = NULL;
while (1) {
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
EStreamTaskEvent evt = pSM->pActiveTrans->event;
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
@ -420,7 +453,7 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _
pTrans = streamTaskFindTransform(pSM->current.state, event);
if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name);
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
@ -449,9 +482,10 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) {
SStreamTask* pTask = pSM->pTask;
const char* id = pTask->id.idStr;
int32_t code = 0;
// do update the task status
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
STaskStateTrans* pTrans = pSM->pActiveTrans;
if (pTrans == NULL) {
@ -463,14 +497,14 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", id, GET_EVT_NAME(event),
pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
if (pTrans->event != event) {
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", id, GET_EVT_NAME(event),
pSM->current.name, GET_EVT_NAME(pTrans->event));
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
@ -480,34 +514,38 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
pSM->pActiveTrans = NULL;
// todo remove it
// todo: handle the error code
// on success callback, add into lock if necessary, or maybe we should add an option for this?
pTrans->pSuccAction(pTask);
code = pTrans->pSuccAction(pTask);
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
// todo: add parameter to control lock
// after handling the callback function assigned by invoker, go on handling the waiting tasks
if (callbackFn != NULL) {
stDebug("s-task:%s start to handle user-specified callback fn for event:%s", id, GET_EVT_NAME(pTrans->event));
callbackFn(pSM->pTask, param);
int32_t ret = callbackFn(pSM->pTask, param);
if (ret != TSDB_CODE_SUCCESS) {
// todo handle error
}
stDebug("s-task:%s handle user-specified callback fn for event:%s completed", id, GET_EVT_NAME(pTrans->event));
}
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
// tasks in waiting list
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask);
code = doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask);
} else {
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
int64_t el = (taosGetTimestampMs() - pSM->startTs);
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", id,
GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
}
return TSDB_CODE_SUCCESS;
return code;
}
SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask) {
@ -525,14 +563,14 @@ const char* streamTaskGetStatusStr(ETaskStatus status) {
void streamTaskResetStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
stDebug("s-task:%s level:%d fill-history:%d vgId:%d set uninit, prev status:%s", pTask->id.idStr,
pTask->info.taskLevel, pTask->info.fillHistory, pTask->pMeta->vgId, pSM->current.name);
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
pSM->pActiveTrans = NULL;
taosArrayClear(pSM->pWaitingEventList);
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
// clear the downstream ready status
pTask->status.downstreamReady = 0;
@ -575,8 +613,7 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr
}
int32_t initStateTransferTable() {
taosThreadOnce(&streamTaskStateMachineInit, doInitStateTransferTable);
return TSDB_CODE_SUCCESS;
return taosThreadOnce(&streamTaskStateMachineInit, doInitStateTransferTable);
}
//clang-format off
@ -585,92 +622,93 @@ void doInitStateTransferTable(void) {
// initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanHistoryTaskReady, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
// scan-history related event
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
// halt stream task, from other task status
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
SFutureHandleEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
// checkpoint related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
// pause & resume related event handle
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
info = (SFutureHandleEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
// resume is completed by restore status of state-machine
// stop related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
// dropping related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
}
//clang-format on