Merge pull request #27575 from taosdata/fix/create_tb

refactor: remove void.
This commit is contained in:
Haojun Liao 2024-08-30 18:50:25 +08:00 committed by GitHub
commit 2fd5d5abc6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 59 additions and 19 deletions

View File

@ -585,12 +585,18 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i
} else { // TASK_DOWNSTREAM_NOT_READY
if (p->rspTs == 0) { // not response yet
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
(void)taosArrayPush(pTimeoutList, &p->taskId);
void* px = taosArrayPush(pTimeoutList, &p->taskId);
if (px == NULL) {
stError("s-task:%s failed to record time out task:0x%x", id, p->taskId);
}
} else { // el < CHECK_NOT_RSP_DURATION
(*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp
}
} else {
(void)taosArrayPush(pNotReadyList, &p->taskId);
void* px = taosArrayPush(pNotReadyList, &p->taskId);
if (px == NULL) {
stError("s-task:%s failed to record not ready task:0x%x", id, p->taskId);
}
}
}
}

View File

@ -973,7 +973,10 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
if (!recved) { // make sure the inputQ is opened for not recv upstream checkpoint-trigger message
streamTaskOpenUpstreamInput(pTask, pInfo->taskId);
(void)taosArrayPush(pNotSendList, pInfo);
void* px = taosArrayPush(pNotSendList, pInfo);
if (px == NULL) {
stError("s-task:%s failed to record not send info, code: out of memory", id);
}
}
}

View File

@ -474,7 +474,10 @@ static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t
streamMutexLock(&pMsgInfo->lock);
}
(void)taosArrayPush(pMsgInfo->pSendInfo, &entry);
void* p = taosArrayPush(pMsgInfo->pSendInfo, &entry);
if (p == NULL) {
stError("failed to add dispatch info");
}
if (lock) {
streamMutexUnlock(&pMsgInfo->lock);
@ -671,8 +674,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
// failed to put into name buffer, no need to do anything
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
(void)tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing
int32_t code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
}
}
@ -914,10 +917,14 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
continue;
}
(void)taosArrayPush(pNotRspList, &pInfo->upstreamTaskId);
void* p = taosArrayPush(pNotRspList, &pInfo->upstreamTaskId);
if (p == NULL) {
stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId);
} else {
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId,
pTask->info.taskLevel, pInfo->upstreamTaskId);
}
}
int32_t checkpointId = pActiveInfo->activeId;
@ -1100,8 +1107,17 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
payloadLen += sizeof(SRetrieveTableRsp);
(void)taosArrayPush(pReq->dataLen, &payloadLen);
(void)taosArrayPush(pReq->data, &buf);
void* px = taosArrayPush(pReq->dataLen, &payloadLen);
if (px == NULL) {
taosMemoryFree(buf);
return terrno;
}
px = taosArrayPush(pReq->data, &buf);
if (px == NULL) {
taosMemoryFree(buf);
return terrno;
}
pReq->totalLen += dataStrLen;
return 0;
@ -1221,8 +1237,12 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
pTask->id.idStr, pReady->checkpointId, pReady->transId, pReq->transId, pReq->checkpointId);
}
} else {
(void)taosArrayPush(pActiveInfo->pReadyMsgList, &info);
void* px = taosArrayPush(pActiveInfo->pReadyMsgList, &info);
if (px != NULL) {
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size + 1);
} else {
stError("s-task:%s failed to add readyMsg, code: out of memory", pTask->id.idStr);
}
}
streamMutexUnlock(&pActiveInfo->lock);
@ -1259,7 +1279,12 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
streamMutexLock(&pActiveInfo->lock);
(void)taosArrayPush(pActiveInfo->pReadyMsgList, &info);
void* px = taosArrayPush(pActiveInfo->pReadyMsgList, &info);
if (px == NULL) {
streamMutexUnlock(&pActiveInfo->lock);
stError("s-task:%s failed to add readyMsg info, code: out of memory", pTask->id.idStr);
return terrno;
}
int32_t numOfRecv = taosArrayGetSize(pActiveInfo->pReadyMsgList);
int32_t total = streamTaskGetNumOfUpstream(pTask);

View File

@ -845,7 +845,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
(void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
}
(void)taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
int32_t code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
(void)streamMetaRemoveTask(pMeta, &id);
@ -1013,7 +1013,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tFreeStreamTask(pTask);
STaskId id = streamTaskGetTaskId(pTask);
(void)taosArrayPush(pRecycleList, &id);
void* px = taosArrayPush(pRecycleList, &id);
if (px == NULL) {
stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
}
int32_t total = taosArrayGetSize(pRecycleList);
stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
@ -1034,7 +1037,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
continue;
}
(void)taosArrayPush(pMeta->pTaskList, &pTask->id);
void* px = taosArrayPush(pMeta->pTaskList, &pTask->id);
if (px == NULL) {
stFatal("s-task:0x%x failed to add into task list due to out of memory", pTask->id.taskId);
}
} else {
// todo this should replace the existed object put by replay creating stream task msg from mnode
stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
@ -1044,7 +1050,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) {
stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno));
(void)taosArrayPop(pMeta->pTaskList);
void* px = taosArrayPop(pMeta->pTaskList);
tFreeStreamTask(pTask);
continue;
}

View File

@ -191,7 +191,7 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
// remove it
(void) taosArrayPop(pSM->pWaitingEventList);
void* px = taosArrayPop(pSM->pWaitingEventList);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event);
ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);