From 3d50df7191157fde34651dd9dbf570889dec619c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 12 Sep 2024 16:05:07 +0800 Subject: [PATCH] refactor: remove void. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 + source/libs/stream/src/streamCheckStatus.c | 2 +- source/libs/stream/src/streamHb.c | 2 +- source/libs/stream/src/streamMeta.c | 202 +++++++++++++++----- source/libs/stream/src/streamStartHistory.c | 5 +- source/libs/stream/src/streamStartTask.c | 38 ++-- source/libs/stream/src/streamTask.c | 5 +- 7 files changed, 192 insertions(+), 66 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c00d9a93bb..bde7e23318 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1235,6 +1235,10 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { pMeta->vgId, req.taskId); // ignore this code to avoid error code over write int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); + if (ret) { + tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret)); + } + return code; } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 41124d8543..48ca2b32e4 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -316,7 +316,7 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) { pInfo->pList = NULL; if (pInfo->checkRspTmr != NULL) { - (void)taosTmrStop(pInfo->checkRspTmr); + streamTmrStop(pInfo->checkRspTmr); pInfo->checkRspTmr = NULL; } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 4e35cb718c..703e6a3256 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -331,7 +331,7 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) { tCleanupStreamHbMsg(&pInfo->hbMsg); if (pInfo->hbTmr != NULL) { - (void) taosTmrStop(pInfo->hbTmr); + streamTmrStop(pInfo->hbTmr); pInfo->hbTmr = NULL; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index bc902ccf29..abeb03bb1c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -62,7 +62,12 @@ static void streamMetaEnvInit() { } } -void streamMetaInit() { (void)taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } +void streamMetaInit() { + int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); + if (code) { + stError("failed to init stream Meta model, code:%s", tstrerror(code)); + } +} void streamMetaCleanup() { taosCloseRef(streamBackendId); @@ -114,13 +119,17 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { p = taosHashGet(gMetaRefMgt.pTable, &vgId, sizeof(vgId)); if (p == NULL) { - SArray* list = taosArrayInit(8, sizeof(void*)); - p = taosArrayPush(list, &rid); + SArray* pList = taosArrayInit(8, POINTER_BYTES); + if (pList == NULL) { + return terrno; + } + + p = taosArrayPush(pList, &rid); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*)); + code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &pList, sizeof(void*)); if (code) { stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t)vgId, *rid); return code; @@ -180,8 +189,13 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { code = tdbTbcMoveToFirst(pCur); if (code) { - (void)tdbTbcClose(pCur); - stError("vgId:%d failed to open stream meta file cursor, not perform compatible check", pMeta->vgId); + stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId, + tstrerror(code)); + code = tdbTbcClose(pCur); + if (code) { + stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(code)); + } + return ret; } @@ -209,7 +223,10 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { tdbFree(pKey); tdbFree(pVal); - (void)tdbTbcClose(pCur); + code = tdbTbcClose(pCur); + if (code != 0) { + stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(code)); + } return ret; } @@ -351,8 +368,8 @@ void streamMetaRemoveDB(void* arg, char* key) { int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId, int64_t stage, startComplete_fn_t fn, SStreamMeta** p) { - *p = NULL; int32_t code = 0; + QRY_PARAM_CHECK(p); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { @@ -484,9 +501,26 @@ _err: taosMemoryFree(pMeta->path); if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); - if (pMeta->pTaskDb) (void)tdbTbClose(pMeta->pTaskDb); - if (pMeta->pCheckpointDb) (void)tdbTbClose(pMeta->pCheckpointDb); - if (pMeta->db) (void)tdbClose(pMeta->db); + if (pMeta->pTaskDb) { + int32_t ret = tdbTbClose(pMeta->pTaskDb); + if (ret) { + stError("vgId:%d tdb failed close task db, code:%s", pMeta->vgId, tstrerror(ret)); + } + pMeta->pTaskDb = NULL; + } + if (pMeta->pCheckpointDb) { + int32_t ret = tdbTbClose(pMeta->pCheckpointDb); + if (ret) { + stError("vgId:%d tdb failed close task checkpointDb, code:%s", pMeta->vgId, tstrerror(ret)); + } + } + if (pMeta->db) { + int32_t ret = tdbClose(pMeta->db); + if (ret) { + stError("vgId:%d tdb failed close meta db, code:%s", pMeta->vgId, tstrerror(ret)); + } + } + if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks); if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); @@ -532,7 +566,7 @@ void streamMetaClear(SStreamMeta* pMeta) { // release the ref by timer if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt); - (void)taosTmrStop(p->schedInfo.pDelayTimer); + streamTmrStop(p->schedInfo.pDelayTimer); p->info.delaySchedParam = 0; streamMetaReleaseTask(pMeta, p); } @@ -567,7 +601,10 @@ void streamMetaClose(SStreamMeta* pMeta) { if (pMeta == NULL) { return; } - (void)taosRemoveRef(streamMetaId, pMeta->rid); + int32_t code = taosRemoveRef(streamMetaId, pMeta->rid); + if (code) { + stError("vgId:%d failed to remove ref:%d, code:%s", pMeta->vgId, pMeta->rid, tstrerror(code)); + } } void streamMetaCloseImpl(void* arg) { @@ -576,6 +613,7 @@ void streamMetaCloseImpl(void* arg) { return; } + int32_t code = 0; int32_t vgId = pMeta->vgId; stDebug("vgId:%d start to do-close stream meta", vgId); @@ -584,10 +622,22 @@ void streamMetaCloseImpl(void* arg) { streamMetaWUnLock(pMeta); // already log the error, ignore here - (void)tdbAbort(pMeta->db, pMeta->txn); - (void)tdbTbClose(pMeta->pTaskDb); - (void)tdbTbClose(pMeta->pCheckpointDb); - (void)tdbClose(pMeta->db); + code = tdbAbort(pMeta->db, pMeta->txn); + if (code) { + stError("vgId:%d failed to jump of trans for tdb, code:%s", vgId, tstrerror(code)); + } + code = tdbTbClose(pMeta->pTaskDb); + if (code) { + stError("vgId:%d failed to close taskDb, code:%s", vgId, tstrerror(code)); + } + code = tdbTbClose(pMeta->pCheckpointDb); + if (code) { + stError("vgId:%d failed to close checkpointDb, code:%s", vgId, tstrerror(code)); + } + code = tdbClose(pMeta->db); + if (code) { + stError("vgId:%d failed to close db, code:%s", vgId, tstrerror(code)); + } taosArrayDestroy(pMeta->pTaskList); taosArrayDestroy(pMeta->chkpSaved); @@ -611,7 +661,10 @@ void streamMetaCloseImpl(void* arg) { bkdMgtDestroy(pMeta->bkdChkptMgt); pMeta->role = NODE_ROLE_UNINIT; - (void)taosThreadRwlockDestroy(&pMeta->lock); + code = taosThreadRwlockDestroy(&pMeta->lock); + if (code) { + stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code)); + } taosMemoryFree(pMeta); stDebug("vgId:%d end to close stream meta", vgId); @@ -711,7 +764,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if (pTask->info.fillHistory == 0) { - (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); + int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); } *pAdded = true; @@ -786,20 +839,26 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id } static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { + int32_t code = 0; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - (void)streamTaskSendCheckpointSourceRsp(pTask); + code = streamTaskSendCheckpointSourceRsp(pTask); + if (code) { + stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, pTask->pMeta->vgId, + tstrerror(code)); + } } - return 0; + return code; } int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; int32_t vgId = pMeta->vgId; + int32_t code = 0; + STaskId id = {.streamId = streamId, .taskId = taskId}; // pre-delete operation streamMetaWLock(pMeta); - STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; @@ -811,12 +870,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } // handle the dropping event - (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); + code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); + if (code) { + stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code)); + } } else { stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", vgId, taskId); streamMetaWUnLock(pMeta); return 0; } + streamMetaWUnLock(pMeta); stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId); @@ -850,12 +913,15 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t pTask = *ppTask; // it is an fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { - (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); + int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } - int32_t code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); + code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); - (void)streamMetaRemoveTask(pMeta, &id); + code = streamMetaRemoveTask(pMeta, &id); + if (code) { + stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code)); + } int32_t size = (int32_t) taosHashGetSize(pMeta->pTasksMap); int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList); @@ -871,7 +937,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); - (void)taosTmrStop(pTask->schedInfo.pDelayTimer); + streamTmrStop(pTask->schedInfo.pDelayTimer); pTask->info.delaySchedParam = 0; streamMetaReleaseTask(pMeta, pTask); } @@ -936,8 +1002,11 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { code = tdbTbcMoveToFirst(pCur); if (code) { - (void)tdbTbcClose(pCur); - stError("failed to open stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId); + stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId); + int32_t ret = tdbTbcClose(pCur); + if (ret != 0) { + stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(ret)); + } return checkpointId; } @@ -960,7 +1029,11 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { tdbFree(pKey); tdbFree(pVal); - (void)tdbTbcClose(pCur); + int32_t ret = tdbTbcClose(pCur); + if (ret != 0) { + stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(ret)); + } + return checkpointId; } @@ -981,6 +1054,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { } pRecycleList = taosArrayInit(4, sizeof(STaskId)); + if (pRecycleList == NULL) { + stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId); + return; + } vgId = pMeta->vgId; stInfo("vgId:%d load stream tasks from meta files", vgId); @@ -996,7 +1073,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (code) { stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno)); taosArrayDestroy(pRecycleList); - (void)tdbTbcClose(pCur); + int32_t ret = tdbTbcClose(pCur); + if (ret != 0) { + stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(ret)); + } return; } @@ -1072,11 +1152,11 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { } if (pTask->info.fillHistory == 0) { - (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); + int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); } if (streamTaskShouldPause(pTask)) { - (void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); + int32_t val = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); } } @@ -1090,7 +1170,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosArrayGetSize(pRecycleList) > 0) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { STaskId* pId = taosArrayGet(pRecycleList, i); - (void)streamMetaRemoveTask(pMeta, pId); + code = streamMetaRemoveTask(pMeta, pId); + if (code) { + stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code)); + } } } @@ -1099,8 +1182,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); - - (void)streamMetaCommit(pMeta); + code = streamMetaCommit(pMeta); + if (code) { + stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code)); + } } bool streamMetaTaskInTimer(SStreamMeta* pMeta) { @@ -1117,7 +1202,10 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->status.timerActive >= 1) { stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId); - (void)streamTaskStop(pTask); + int32_t code = streamTaskStop(pTask); + if (code) { + stError("s-task:%s failed to stop task, code:%s", pTask->id.idStr, tstrerror(code)); + } inTimer = true; } } @@ -1150,7 +1238,10 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { SStreamTask* pTask = *(SStreamTask**)pIter; stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr); - (void)streamTaskStop(pTask); + int32_t code = streamTaskStop(pTask); + if (code) { + stError("vgId:%d failed to stop task:0x%x, code:%s", vgId, pTask->id.taskId, tstrerror(code)); + } } streamMetaWUnLock(pMeta); @@ -1168,7 +1259,6 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { SArray* pTaskList = NULL; int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList); if (code != TSDB_CODE_SUCCESS) { -// return code; } streamMetaRUnLock(pMeta); @@ -1199,14 +1289,17 @@ void streamMetaStartHb(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-rlock", pMeta->vgId); - (void)taosThreadRwlockRdlock(&pMeta->lock); + int32_t code = taosThreadRwlockRdlock(&pMeta->lock); + if (code) { + stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code)); + } } void streamMetaRUnLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-runlock", pMeta->vgId); int32_t code = taosThreadRwlockUnlock(&pMeta->lock); if (code != TSDB_CODE_SUCCESS) { - stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code); + stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code)); } else { // stTrace("vgId:%d meta-runlock completed", pMeta->vgId); } @@ -1214,13 +1307,18 @@ void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-wlock", pMeta->vgId); - (void)taosThreadRwlockWrlock(&pMeta->lock); - // stTrace("vgId:%d meta-wlock completed", pMeta->vgId); + int32_t code = taosThreadRwlockWrlock(&pMeta->lock); + if (code) { + stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code)); + } } void streamMetaWUnLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-wunlock", pMeta->vgId); - (void)taosThreadRwlockUnlock(&pMeta->lock); + int32_t code = taosThreadRwlockUnlock(&pMeta->lock); + if (code) { + stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code)); + } } int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { @@ -1258,7 +1356,7 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { streamMetaReleaseTask(pMeta, pTask); } - (void)streamMetaSendHbHelper(pMeta); + code = streamMetaSendHbHelper(pMeta); pMeta->sendMsgBeforeClosing = false; return TSDB_CODE_SUCCESS; // always return true } @@ -1348,9 +1446,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta streamMetaRUnLock(pMeta); // add the failed task info, along with the related fill-history task info into tasks list. - (void)streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); + code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); if (hasFillhistoryTask) { - (void)streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); + code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); } } else { streamMetaRUnLock(pMeta); @@ -1365,12 +1463,18 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { int32_t startTs = pTask->execInfo.checkTs; - (void)streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); + int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); + if (code) { + stError("s-task:%s failed to add self task failed to start", pTask->id.idStr, tstrerror(code)); + } // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - (void)streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); + code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); + if (code) { + stError("s-task:0x%" PRIx64 " failed to add self task failed to start", pId->taskId, tstrerror(code)); + } } } diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 941b8f5145..ca2da9ef8e 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -459,7 +459,10 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { int32_t code = createHTaskLaunchInfo(pMeta, &id, hStreamId, hTaskId, &pInfo); if (code) { stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr); - (void)streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + int32_t ret = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + if (ret) { + stError("s-task:%s add task check downstream result failed, code:%s", idStr, tstrerror(ret)); + } return code; } diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index c2e8a523e5..168a8adc2d 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -62,9 +62,12 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); - if (pTask == NULL) { - stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + if ((pTask == NULL) || (code != 0)) { + stError("vgId:%d failed to acquire task:0x%x during start task, it may be dropped", pMeta->vgId, pTaskId->taskId); + int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + if (ret) { + stError("s-task:0x%x add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret)); + } continue; } @@ -85,9 +88,13 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); - if (pTask == NULL) { + if ((pTask == NULL )|| (code != 0)) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + if (ret) { + stError("s-task:0x%x failed add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret)); + } + continue; } @@ -105,11 +112,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", pTask->id.idStr); - (void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? + code = streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? } - (void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, - true); + code = streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, + true); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -333,9 +340,13 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); - if (pTask == NULL) { + if ((pTask == NULL) || (code != 0)) { stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId); - (void)streamMetaAddFailedTask(pMeta, streamId, taskId); + int32_t ret = streamMetaAddFailedTask(pMeta, streamId, taskId); + if (ret) { + stError("s-task:0x%x add check downstream failed, core:%s", taskId, tstrerror(ret)); + } + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } @@ -431,7 +442,10 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { continue; } - (void)streamTaskStop(pTask); + int32_t ret = streamTaskStop(pTask); + if (ret) { + stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret)); + } streamMetaReleaseTask(pMeta, pTask); } @@ -441,7 +455,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el); streamMetaRUnLock(pMeta); - return 0; + return code; } int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5d6cf39e40..79c87f942c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -687,13 +687,14 @@ int32_t streamTaskStop(SStreamTask* pTask) { int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP); if (code) { - stError("failed to handle STOP event, s-task:%s", id); + stError("failed to handle STOP event, s-task:%s, code:%s", id, tstrerror(code)); + return code; } if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s failed to kill task related query handle", id); + stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code)); } }