refactor: check return value for each function.

This commit is contained in:
Haojun Liao 2024-07-17 16:56:36 +08:00
parent 30186f466b
commit 2aa4d6028b
4 changed files with 22 additions and 15 deletions

View File

@ -639,7 +639,7 @@ bool streamTaskShouldPause(const SStreamTask* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask); bool streamTaskIsIdle(const SStreamTask* pTask);
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus); bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId);
SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask); SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask);
const char* streamTaskGetStatusStr(ETaskStatus status); const char* streamTaskGetStatusStr(ETaskStatus status);
void streamTaskResetStatus(SStreamTask* pTask); void streamTaskResetStatus(SStreamTask* pTask);
@ -758,7 +758,7 @@ void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta);
void streamMetaWUnLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta);
void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId); void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId);
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pTaskList);
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
void streamMetaLoadAllTasks(SStreamMeta* pMeta); void streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);

View File

@ -65,11 +65,6 @@ struct SActiveCheckpointInfo {
tmr_h pSendReadyMsgTmr; tmr_h pSendReadyMsgTmr;
}; };
struct SConsensusCheckpoint {
int8_t inProcess;
};
typedef struct { typedef struct {
int8_t type; int8_t type;
SSDataBlock* pBlock; SSDataBlock* pBlock;

View File

@ -1104,13 +1104,14 @@ int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, voi
return taosScheduleTask(pMeta->qHandle, &schedMsg); return taosScheduleTask(pMeta->qHandle, &schedMsg);
} }
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
*pList = NULL;
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL); SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
bool sendMsg = pMeta->sendMsgBeforeClosing; bool sendMsg = pMeta->sendMsgBeforeClosing;
if (!sendMsg) { if (!sendMsg) {
stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId); stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
return pTaskList; return TSDB_CODE_SUCCESS;
} }
stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId); stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);
@ -1141,7 +1142,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
streamMetaSendHbHelper(pMeta); streamMetaSendHbHelper(pMeta);
pMeta->sendMsgBeforeClosing = false; pMeta->sendMsgBeforeClosing = false;
return pTaskList; return TSDB_CODE_SUCCESS;
} }
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) { void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) {
@ -1311,14 +1312,19 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
// send hb msg to mnode before closing all tasks. // send hb msg to mnode before closing all tasks.
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta); SArray* pTaskList = NULL;
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t numOfTasks = taosArrayGetSize(pTaskList); int32_t numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
continue; continue;
} }

View File

@ -359,7 +359,7 @@ static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
} }
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); (void) createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
pTask->refCnt = 1; pTask->refCnt = 1;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
@ -997,10 +997,16 @@ SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
return NULL; return NULL;
} }
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
char buf[128] = {0}; char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId); sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
return taosStrdup(buf); *pId = taosStrdup(buf);
if (*pId == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_SUCCESS;
}
} }
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) { static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {