refactor: check return value for each function.

This commit is contained in:
Haojun Liao 2024-07-18 09:30:58 +08:00
parent 80a7b7da5f
commit 438921e9cc
4 changed files with 78 additions and 30 deletions

View File

@ -771,6 +771,8 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask);
// timer
tmr_h streamTimerGetInstance();
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
const char* pMsg);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -54,7 +54,7 @@ static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
SStreamMeta* pMeta = pTask->pMeta;
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
for (int j = 0; j < num; ++j) {
@ -62,14 +62,18 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
bool exist = existInHbMsg(pMsg, pTaskEpset);
if (!exist) {
taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId);
void* p = taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId);
if (p == NULL) {
stError("failed to set the updateNode info in hbMsg, vgId:%d", pMeta->vgId);
}
stDebug("vgId:%d nodeId:%d added into hbMsg update list, total:%d", pMeta->vgId, pTaskEpset->nodeId,
(int32_t)taosArrayGetSize(pMsg->pUpdateNodes));
}
}
taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList);
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
}
static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) {
@ -101,9 +105,7 @@ static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* p
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
tmsgSendReq(pEpset, &msg);
return TSDB_CODE_SUCCESS;
return tmsgSendReq(pEpset, &msg);
}
// NOTE: this task should be executed within the SStreamMeta lock region.
@ -112,6 +114,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
bool hasMnodeEpset = false;
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SMetaHbInfo* pInfo = pMeta->pHbInfo;
int32_t code = 0;
// not recv the hb msg rsp yet, send current hb msg again
if (pInfo->msgSendTs > 0) {
@ -135,8 +138,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
}
pInfo->msgSendTs = taosGetTimestampMs();
doSendHbMsgInfo(&pInfo->hbMsg, pMeta, &epset);
return TSDB_CODE_SUCCESS;
return doSendHbMsgInfo(&pInfo->hbMsg, pMeta, &epset);
}
SStreamHbMsg* pMsg = &pInfo->hbMsg;
@ -168,9 +170,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
continue;
}
taosThreadMutexLock(&(*pTask)->lock);
(void) taosThreadMutexLock(&(*pTask)->lock);
STaskStatusEntry entry = streamTaskGetStatusEntry(*pTask);
taosThreadMutexUnlock(&(*pTask)->lock);
(void) taosThreadMutexUnlock(&(*pTask)->lock);
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
@ -188,9 +190,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d, clear the active checkpointInfo",
(*pTask)->id.idStr, p->transId);
taosThreadMutexLock(&(*pTask)->lock);
(void) taosThreadMutexLock(&(*pTask)->lock);
streamTaskClearCheckInfo((*pTask), true);
taosThreadMutexUnlock(&(*pTask)->lock);
(void) taosThreadMutexUnlock(&(*pTask)->lock);
}
}
@ -210,7 +212,11 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
}
addUpdateNodeIntoHbMsg(*pTask, pMsg);
taosArrayPush(pMsg->pTaskStatus, &entry);
p = taosArrayPush(pMsg->pTaskStatus, &entry);
if (p == NULL) {
stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", (*pTask)->id.taskId, pMeta->vgId);
}
if (!hasMnodeEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasMnodeEpset = true;
@ -221,18 +227,19 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
if (hasMnodeEpset) {
pInfo->msgSendTs = taosGetTimestampMs();
doSendHbMsgInfo(pMsg, pMeta, &epset);
code = doSendHbMsgInfo(pMsg, pMeta, &epset);
} else {
stDebug("vgId:%d no tasks or no mnd epset, not send stream hb to mnode", pMeta->vgId);
tCleanupStreamHbMsg(&pInfo->hbMsg);
pInfo->msgSendTs = -1;
}
return TSDB_CODE_SUCCESS;
return code;
}
void streamMetaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param;
int32_t code = 0;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) {
@ -243,15 +250,25 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
// need to stop, stop now
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
code = taosReleaseRef(streamMetaId, rid);
if (code == TSDB_CODE_SUCCESS) {
stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
} else {
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%d", pMeta->vgId, rid);
}
return;
}
// not leader not send msg
if (pMeta->role != NODE_ROLE_LEADER) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role);
taosReleaseRef(streamMetaId, rid);
code = taosReleaseRef(streamMetaId, rid);
if (code == TSDB_CODE_SUCCESS) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role);
} else {
stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%d", pMeta->vgId,
pMeta->role, rid);
}
pMeta->pHbInfo->hbStart = 0;
return;
}
@ -262,17 +279,30 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
}
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
taosTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
if (code) {
stError("vgId:%d in meta timer, failed to release the meta rid:%d", pMeta->vgId, rid);
}
return;
}
streamMetaRLock(pMeta);
streamMetaSendHbHelper(pMeta);
streamMetaRUnLock(pMeta);
code = streamMetaSendHbHelper(pMeta);
if (code) {
stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, strerror(code));
}
taosTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
streamMetaRUnLock(pMeta);
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
if (code) {
stError("vgId:%d in meta timer, failed to release the meta rid:%d", pMeta->vgId, rid);
}
}
int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) {
@ -297,7 +327,7 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) {
tCleanupStreamHbMsg(&pInfo->hbMsg);
if (pInfo->hbTmr != NULL) {
taosTmrStop(pInfo->hbTmr);
(void) taosTmrStop(pInfo->hbTmr);
pInfo->hbTmr = NULL;
}

View File

@ -1106,7 +1106,12 @@ int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, voi
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
*pList = NULL;
int32_t code = 0;
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
if (pTaskList == NULL) {
stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
return TSDB_CODE_OUT_OF_MEMORY;
}
bool sendMsg = pMeta->sendMsgBeforeClosing;
if (!sendMsg) {
@ -1122,8 +1127,8 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) {
code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) { // this error is ignored
continue;
}
@ -1140,9 +1145,9 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
streamMetaReleaseTask(pMeta, pTask);
}
streamMetaSendHbHelper(pMeta);
code = streamMetaSendHbHelper(pMeta);
pMeta->sendMsgBeforeClosing = false;
return TSDB_CODE_SUCCESS;
return code;
}
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) {

View File

@ -38,3 +38,14 @@ void streamTimerCleanUp() {
tmr_h streamTimerGetInstance() {
return streamTimer;
}
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
const char* pMsg) {
while (1) {
bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId);
if (ret) {
break;
}
stError("vgId:%d failed to reset %s, try again", vgId, pMsg);
}
}