diff --git a/include/util/tqueue.h b/include/util/tqueue.h index f7eaf794b0..5ae642b69f 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -79,7 +79,7 @@ void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item); void taosFreeQitem(void *pItem); int32_t taosWriteQitem(STaosQueue *queue, void *pItem); -int32_t taosReadQitem(STaosQueue *queue, void **ppItem); +void taosReadQitem(STaosQueue *queue, void **ppItem); bool taosQueueEmpty(STaosQueue *queue); void taosUpdateItemSize(STaosQueue *queue, int32_t items); int32_t taosQueueItemSize(STaosQueue *queue); diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 9ed6512352..6904ea850c 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -730,7 +730,7 @@ static void* monitorThreadFunc(void* param) { } MonitorSlowLogData* slowLogData = NULL; - (void)taosReadQitem(monitorQueue, (void**)&slowLogData); + taosReadQitem(monitorQueue, (void**)&slowLogData); if (slowLogData != NULL) { if (slowLogData->type == SLOW_LOG_READ_BEGINNIG && quitCnt == 0) { if (slowLogData->pFile != NULL) { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 0b0cb6dc91..43c12f8164 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -706,7 +706,9 @@ void ctgProcessTimerEvent(void *param, void *tmrId) { int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false); if (code) { qError("clear cache enqueue failed, error:%s", tstrerror(code)); - (void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) { + qError("reset catalog cache monitor timer error, timer stoppped"); + } } goto _return; @@ -714,7 +716,9 @@ void ctgProcessTimerEvent(void *param, void *tmrId) { } qTrace("reset catalog timer"); - (void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) { + qError("reset catalog cache monitor timer error, timer stoppped"); + } _return: @@ -1517,10 +1521,16 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SC _return: if (pJob) { - (void)taosReleaseRef(gCtgMgmt.jobPool, pJob->refId); + int32_t code2 = taosReleaseRef(gCtgMgmt.jobPool, pJob->refId); + if (TSDB_CODE_SUCCESS) { + qError("release catalog job refId %" PRId64 "falied, error:%s", pJob->refId, tstrerror(code2)); + } if (code) { - (void)taosRemoveRef(gCtgMgmt.jobPool, pJob->refId); + code2 = taosRemoveRef(gCtgMgmt.jobPool, pJob->refId); + if (TSDB_CODE_SUCCESS) { + qError("remove catalog job refId %" PRId64 "falied, error:%s", pJob->refId, tstrerror(code2)); + } } } @@ -1967,7 +1977,9 @@ void catalogDestroy(void) { } if (gCtgMgmt.cacheTimer) { - (void)taosTmrStop(gCtgMgmt.cacheTimer); + if (taosTmrStop(gCtgMgmt.cacheTimer)) { + qTrace("stop catalog cache timer may failed"); + } gCtgMgmt.cacheTimer = NULL; taosTmrCleanUp(gCtgMgmt.timer); gCtgMgmt.timer = NULL; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index e35aaeb0b1..9940474891 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -1007,7 +1007,11 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const CTG_ERR_JRET(terrno); } - (void)taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); + void* p = taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); + if (NULL == p) { + ctgError("acquire job from ref failed, refId:%" PRId64 ", error: %s", pJob->refId, tstrerror(terrno)); + CTG_ERR_JRET(terrno); + } double el = (taosGetTimestampUs() - st) / 1000.0; qDebug("qid:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d, elapsed time:%.2f ms", @@ -1406,7 +1410,11 @@ int32_t ctgCallUserCb(void* param) { qDebug("qid:0x%" PRIx64 " ctg end to call user cb", pJob->queryId); - (void)taosRemoveRef(gCtgMgmt.jobPool, pJob->refId); + int64_t refId = pJob->refId; + int32_t code = taosRemoveRef(gCtgMgmt.jobPool, refId); + if (code) { + qError("qid:0x%" PRIx64 " remove ctg job %" PRId64 " from jobPool failed, error:%s", pJob->queryId, refId, tstrerror(code)); + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 01201a2480..0e54ac77a2 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -2915,21 +2915,27 @@ void ctgClearMetaCache(SCtgCacheOperation *operation) { if (CTG_CACHE_LOW(remainSize, cacheMaxSize)) { qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize); - (void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) { + qError("reset catalog cache monitor timer error, timer stoppped"); + } return; } if (!roundDone) { qDebug("catalog all meta cleared, remainSize:%" PRId64 ", cacheMaxSize:%dMB, to clear handle", remainSize, cacheMaxSize); ctgClearFreeCache(operation); - (void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) { + qError("reset catalog cache monitor timer error, timer stoppped"); + } return; } int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false); if (code) { qError("clear cache enqueue failed, error:%s", tstrerror(code)); - (void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer); + if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) { + qError("reset catalog cache monitor timer error, timer stoppped"); + } } } @@ -2993,7 +2999,10 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) { pCtgCache->pTsmas = NULL; ctgDebug("all tsmas for table dropped: %s.%s", msg->dbFName, msg->tbName); - (void)taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN); + code = taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN); + if (TSDB_CODE_SUCCESS != code) { + ctgError("remove table %s.%s from tsmaCache failed, error:%s", msg->dbFName, msg->tbName, tstrerror(code)); + } CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock); } else { @@ -3191,6 +3200,7 @@ void ctgCleanupCacheQueue(void) { SCtgQNode *nodeNext = NULL; SCtgCacheOperation *op = NULL; bool stopQueue = false; + int32_t code = 0; while (true) { node = gCtgMgmt.queue.head->next; @@ -3209,7 +3219,10 @@ void ctgCleanupCacheQueue(void) { } if (op->syncOp) { - (void)tsem_post(&op->rspSem); + code = tsem_post(&op->rspSem); + if (code) { + qError("tsem_post failed when cleanup cache queue, error:%s", tstrerror(code)); + } } else { taosMemoryFree(op); } @@ -3234,12 +3247,13 @@ void ctgCleanupCacheQueue(void) { void *ctgUpdateThreadFunc(void *param) { setThreadName("catalog"); + int32_t code = 0; qInfo("catalog update thread started"); while (true) { if (tsem_wait(&gCtgMgmt.queue.reqSem)) { - qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); + qError("ctg tsem_wait failed, error:%s", tstrerror(terrno)); } if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) { @@ -3256,7 +3270,10 @@ void *ctgUpdateThreadFunc(void *param) { (void)(*gCtgCacheOperation[operation->opId].func)(operation); // ignore any error if (operation->syncOp) { - (void)tsem_post(&operation->rspSem); + code = tsem_post(&operation->rspSem); + if (code) { + ctgError("tsem_post failed for syncOp update, error:%s", tstrerror(code)); + } } else { taosMemoryFreeClear(operation); } diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 00b1d7ad79..c9114ce90e 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -470,7 +470,10 @@ _return: taosMemoryFree(pMsg->pEpSet); if (pJob) { - (void)taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId); + int32_t code2 = taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId); + if (code2) { + qError("release ctg job refId:%" PRId64 " failed, error:%s", cbParam->refId, tstrerror(code2)); + } } CTG_API_LEAVE(code); @@ -1087,11 +1090,16 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n int32_t reqType = TDMT_MND_GET_TABLE_INDEX; void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; char tbFName[TSDB_TABLE_FNAME_LEN]; - (void)tNameExtractFullName(name, tbFName); ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp); + int32_t code = tNameExtractFullName(name, tbFName); + if (code) { + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, name->dbname, name->tname); + CTG_ERR_RET(code); + } + + code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); CTG_ERR_RET(code); @@ -1403,17 +1411,23 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S int32_t msgLen = 0; int32_t reqType = TDMT_VND_TABLE_CFG; char tbFName[TSDB_TABLE_FNAME_LEN]; - (void)tNameExtractFullName(pTableName, tbFName); + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; char dbFName[TSDB_DB_FNAME_LEN]; (void)tNameGetFullDbName(pTableName, dbFName); SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; + int32_t code = tNameExtractFullName(pTableName, tbFName); + if (code) { + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname); + CTG_ERR_RET(code); + } + SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); + code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); CTG_ERR_RET(code); @@ -1471,15 +1485,20 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S int32_t msgLen = 0; int32_t reqType = TDMT_MND_TABLE_CFG; char tbFName[TSDB_TABLE_FNAME_LEN]; - (void)tNameExtractFullName(pTableName, tbFName); void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; char dbFName[TSDB_DB_FNAME_LEN]; (void)tNameGetFullDbName(pTableName, dbFName); SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; + int32_t code = tNameExtractFullName(pTableName, tbFName); + if (code) { + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname); + CTG_ERR_RET(code); + } + ctgDebug("try to get table cfg from mnode, tbFName:%s", tbFName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); + code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); CTG_ERR_RET(code); @@ -1583,11 +1602,15 @@ int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* SCtgTask* pTask = tReq ? tReq->pTask : NULL; void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; char fullName[TSDB_TABLE_FNAME_LEN]; - (void)tNameExtractFullName(pName, fullName); + int32_t code = tNameExtractFullName(pName, fullName); + if (code) { + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pName->type, pName->dbname, pName->tname); + CTG_ERR_RET(code); + } ctgDebug("try to get view info from mnode, viewFName:%s", fullName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](fullName, &msg, 0, &msgLen, mallocFp); + code = queryBuildMsg[TMSG_INDEX(reqType)](fullName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build view-meta msg failed, code:%x, viewFName:%s", code, fullName); CTG_ERR_RET(code); @@ -1640,11 +1663,15 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa SCtgTask* pTask = tReq ? tReq->pTask : NULL; void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; char tbFName[TSDB_TABLE_FNAME_LEN]; - (void)tNameExtractFullName(name, tbFName); + int32_t code = tNameExtractFullName(name, tbFName); + if (code) { + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, name->dbname, name->tname); + CTG_ERR_RET(code); + } ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp); + code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); CTG_ERR_RET(code); @@ -1697,7 +1724,12 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c int32_t msgLen = 0; int32_t reqType = TDMT_VND_GET_STREAM_PROGRESS; char tbFName[TSDB_TABLE_FNAME_LEN]; - (void)tNameExtractFullName(pTbName, tbFName); + int32_t code = tNameExtractFullName(pTbName, tbFName); + if (code) { + ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTbName->type, pTbName->dbname, pTbName->tname); + CTG_ERR_RET(code); + } + SCtgTask* pTask = tReq ? tReq->pTask : NULL; void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; @@ -1705,7 +1737,7 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c ctgDebug("try to get stream progress from vnode, vgId:%d, ep num:%d, ep %s:%d, target:%s", vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](bInput, &msg, 0, &msgLen, mallocFp); + code = queryBuildMsg[TMSG_INDEX(reqType)](bInput, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get stream progress failed, code:%s, tbFName:%s", tstrerror(code), tbFName); CTG_ERR_RET(code); diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index daa2199421..545e3e1371 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -433,6 +433,7 @@ void ctgFreeHandle(SCatalog* pCtg) { void ctgClearHandleMeta(SCatalog* pCtg, int64_t* pClearedSize, int64_t* pCleardNum, bool* roundDone) { int64_t cacheSize = 0; + int32_t code = 0; void* pIter = taosHashIterate(pCtg->dbCache, NULL); while (pIter) { SCtgDBCache* dbCache = pIter; @@ -447,7 +448,10 @@ void ctgClearHandleMeta(SCatalog* pCtg, int64_t* pClearedSize, int64_t* pCleardN continue; } - (void)taosHashRemove(dbCache->tbCache, key, len); + code = taosHashRemove(dbCache->tbCache, key, len); + if (code) { + qError("taosHashRemove table cache failed, key:%s, len:%d, error:%s", (char*)key, (int32_t)len, tstrerror(code)); + } cacheSize = len + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(pCache->pMeta) + ctgGetTbIndexCacheSize(pCache->pIndex); @@ -1201,8 +1205,12 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SEpSet* pMgmtEps, SDBVgInfo* d SVgroupInfo* vgInfo = NULL; char tbFullName[TSDB_TABLE_FNAME_LEN]; - (void)tNameExtractFullName(pTableName, tbFullName); - + code = tNameExtractFullName(pTableName, tbFullName); + if (code) { + ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname); + CTG_ERR_RET(code); + } + uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod, dbInfo->hashPrefix, dbInfo->hashSuffix); @@ -1965,7 +1973,12 @@ int32_t ctgChkSetTbAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) { char tbFName[TSDB_TABLE_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN]; - (void)tNameExtractFullName(&req->pRawReq->tbName, tbFName); + code = tNameExtractFullName(&req->pRawReq->tbName, tbFName); + if (code) { + ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname); + CTG_ERR_RET(code); + } + (void)tNameGetFullDbName(&req->pRawReq->tbName, dbFName); while (true) { @@ -2151,7 +2164,11 @@ int32_t ctgChkSetViewAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) if (IS_SYS_DBNAME(req->pRawReq->tbName.dbname)) { (void)snprintf(viewFName, sizeof(viewFName), "%s.%s", req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname); } else { - (void)tNameExtractFullName(&req->pRawReq->tbName, viewFName); + code = tNameExtractFullName(&req->pRawReq->tbName, viewFName); + if (code) { + ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname); + CTG_ERR_RET(code); + } } int32_t len = strlen(viewFName) + 1; diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 60bfb58ef5..57f4289ebf 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -190,7 +190,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRaw } SDataDeleterBuf* pBuf = NULL; - (void)taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); + taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); if (pBuf != NULL) { TAOS_MEMCPY(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf)); taosFreeQitem(pBuf); @@ -248,7 +248,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { taosMemoryFree(pDeleter->pParam); while (!taosQueueEmpty(pDeleter->pDataBlocks)) { SDataDeleterBuf* pBuf = NULL; - (void)taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); + taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); if (pBuf != NULL) { taosMemoryFreeClear(pBuf->pData); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 9bbc5a94eb..8acd569358 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -233,7 +233,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRow } SDataDispatchBuf* pBuf = NULL; - (void)taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); + taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); if (pBuf != NULL) { TAOS_MEMCPY(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); @@ -291,7 +291,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { SDataDispatchBuf* pBuf = NULL; - (void)taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); + taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); if (pBuf != NULL) { taosMemoryFreeClear(pBuf->pData); taosFreeQitem(pBuf); diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 5ed97c8db6..b29bef1f1e 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -57,6 +57,7 @@ typedef struct SSubmitRspParam { int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) { SSubmitRspParam* pParam = (SSubmitRspParam*)param; SDataInserterHandle* pInserter = pParam->pInserter; + int32_t code2 = 0; if (code) { pInserter->submitRes.code = code; @@ -106,7 +107,14 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) { _return: - (void)tsem_post(&pInserter->ready); + code2 = tsem_post(&pInserter->ready); + if (code2 < 0) { + qError("tsem_post inserter ready failed, error:%s", tstrerror(code2)); + if (TSDB_CODE_SUCCESS == code) { + pInserter->submitRes.code = code2; + } + } + taosMemoryFree(pMsg->pData); return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 44e8a3cb8a..3d23130e13 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -561,7 +561,8 @@ static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPost static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) { SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; - + int32_t code = 0; + pPost->isStarted = false; if (pStbJoin->basic.batchFetch) { @@ -571,7 +572,11 @@ static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDy if (pPost->leftNeedCache) { uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); if (num && --(*num) <= 0) { - (void)tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); + code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); + if (code) { + qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code)); + QRY_ERR_RET(code); + } QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true)); } } @@ -579,7 +584,11 @@ static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDy if (!pPost->rightNeedCache) { void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid)); if (NULL != v) { - (void)tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid)); + code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid)); + if (code) { + qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code)); + QRY_ERR_RET(code); + } QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false)); } } @@ -660,7 +669,11 @@ static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnc break; default: if (1 == (*pNum)) { - (void)tSimpleHashRemove(pOnceHash, pKey, keySize); + code = tSimpleHashRemove(pOnceHash, pKey, keySize); + if (code) { + qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code)); + QRY_ERR_RET(code); + } } (*pNum)++; break; @@ -811,8 +824,12 @@ static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) { uint64_t* pUid = NULL; int32_t iter = 0; + int32_t code = 0; while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) { - (void)tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid)); + code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid)); + if (code) { + qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code)); + } } pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache); diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 1796aa7b64..2540579413 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -30,7 +30,9 @@ static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) { if (pFileInfo->fd.fd) { - (void)taosCloseFile(&pFileInfo->fd.fd); + if (taosCloseFile(&pFileInfo->fd.fd) < 0) { + qError("close group cache file failed, fd:%p, error:%s", pFileInfo->fd.fd, tstrerror(terrno)); + } pFileInfo->fd.fd = NULL; (void)taosThreadMutexDestroy(&pFileInfo->fd.mutex); } @@ -92,7 +94,9 @@ static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) { static void freeSGcSessionCtx(void* p) { SGcSessionCtx* pSession = p; if (pSession->semInit) { - (void)tsem_destroy(&pSession->waitSem); + if (tsem_destroy(&pSession->waitSem) < 0) { + qError("tsem_destroy session waitSem failed, error:%s", tstrerror(terrno)); + } } } @@ -262,6 +266,9 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S } static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) { + if (NULL == pFd) { + return; + } (void)taosThreadMutexUnlock(&pFd->mutex); } @@ -274,6 +281,8 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC SGroupCacheData* pGroup = NULL; while (NULL != pHead) { + pFd = NULL; + if (pGCache->batchFetch) { pFileCtx = &pHead->pCtx->fileCtx; } else { @@ -290,7 +299,11 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC int64_t blkId = pHead->basic.blkId; pHead = pHead->next; - (void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + if (code) { + qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code)); + goto _return; + } continue; } @@ -304,13 +317,19 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC } if (deleted) { + releaseFdToFileCtx(pFd); + qTrace("FileId:%d-%d-%d already be deleted, skip write", pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId); int64_t blkId = pHead->basic.blkId; pHead = pHead->next; - (void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + if (code) { + qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code)); + goto _return; + } continue; } @@ -336,7 +355,11 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC int64_t blkId = pHead->basic.blkId; pHead = pHead->next; - (void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + if (code) { + qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code)); + goto _return; + } } _return: @@ -1036,7 +1059,11 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator } SGcSessionCtx* pWaitCtx = *ppWaitCtx; pWaitCtx->newFetch = true; - (void)taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId)); + code = taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId)); + if (code) { + qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", pSessionId, tstrerror(code)); + return code; + } QRY_ERR_RET(tsem_post(&pWaitCtx->waitSem)); return code; @@ -1125,14 +1152,22 @@ static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstr QRY_ERR_JRET(taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES)); - (void)tsem_wait(&pSession->waitSem); + code = tsem_wait(&pSession->waitSem); + if (code) { + qError("tsem_wait failed, error:%s", tstrerror(code)); + QRY_ERR_JRET(code); + } if (pSession->newFetch) { pSession->newFetch = false; return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes); } - (void)taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId)); + code = taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId)); + if (code) { + qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", sessionId, tstrerror(code)); + QRY_ERR_JRET(code); + } bool got = false; return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got); @@ -1278,14 +1313,22 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); if (ppBlock) { QRY_ERR_RET(releaseBaseBlockToList(pCtx, *ppBlock)); - (void)taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + code = taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + if (code) { + qError("taosHashRemove session %" PRId64 " from pReadBlk failed, error: %s", pGcParam->sessionId, tstrerror(code)); + QRY_ERR_RET(code); + } } } code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes); if (NULL == *ppRes) { qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows); - (void)taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + code = taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + if (code) { + qError("taosHashRemove session %" PRId64 " from pSessions failed, error: %s", pGcParam->sessionId, tstrerror(code)); + QRY_ERR_RET(code); + } } else { pSession->resRows += (*ppRes)->info.rows; qDebug("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows); diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 302dd31788..61809b99e0 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -2413,7 +2413,10 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) { pGrp->readIdx = 0; //pGrp->endIdx = pGrp->blk->info.rows - 1; } else { - (void)taosArrayPop(pCache->grps); + if (NULL == taosArrayPop(pCache->grps)) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + pGrp = taosArrayGet(pCache->grps, 0); if (NULL == pGrp) { MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 90c2248c12..d0354c12d5 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1375,7 +1375,9 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol _return: if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) { - (void)taosArrayPop(pTable->eqGrps); + if (NULL == taosArrayPop(pTable->eqGrps)) { + code = terrno; + } } else { pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1; } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 4b9067a191..aef348827a 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -581,7 +581,10 @@ void qwDestroyImpl(void *pMgmt) { int32_t schStatusCount = 0; qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); - (void)taosTmrStop(mgmt->hbTimer); //ignore error + if (taosTmrStop(mgmt->hbTimer)) { + qTrace("stop qworker hb timer may failed"); + } + mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 1a9d3e7ba9..041347791e 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1187,7 +1187,9 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { int32_t schNum = taosHashGetSize(mgmt->schHash); if (schNum <= 0) { QW_UNLOCK(QW_READ, &mgmt->schLock); - (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error + if (taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer)) { + qError("reset qworker hb timer error, timer stoppped"); + } (void)qwRelease(refId); // ignore error return; } @@ -1199,7 +1201,9 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { taosMemoryFree(rspList); taosArrayDestroy(pExpiredSch); QW_ELOG("calloc %d SQWHbInfo failed, code:%x", schNum, terrno); - (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error + if (taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer)) { + qError("reset qworker hb timer error, timer stoppped"); + } (void)qwRelease(refId); // ignore error return; } @@ -1255,7 +1259,10 @@ _return: taosMemoryFreeClear(rspList); taosArrayDestroy(pExpiredSch); - (void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error + if (taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer)) { + qError("reset qworker hb timer error, timer stoppped"); + } + (void)qwRelease(refId); // ignore error } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 3a25f37895..a3be94d4c8 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -414,7 +414,9 @@ extern SSchedulerMgmt schMgmt; #define SCH_LOG_TASK_START_TS(_task) \ do { \ int64_t us = taosGetTimestampUs(); \ - (void)taosArrayPush((_task)->profile.execTime, &us); \ + if (NULL == taosArrayPush((_task)->profile.execTime, &us)) { \ + qError("taosArrayPush task execTime failed, error:%s", tstrerror(terrno)); \ + } \ if (0 == (_task)->execId) { \ (_task)->profile.startTs = us; \ } \ diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 8e2fbb878d..57a21643ef 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -512,6 +512,7 @@ int32_t schNotifyUserFetchRes(SSchJob *pJob) { } void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) { + int32_t code = 0; SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock); if (SCH_OP_NULL == pJob->opStatus.op) { @@ -526,7 +527,10 @@ void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) { if (SCH_JOB_IN_SYNC_OP(pJob)) { SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); - (void)tsem_post(&pJob->rspSem); // ignore error + code = tsem_post(&pJob->rspSem); + if (code) { + ctgError("tsem_post failed for syncOp, error:%s", tstrerror(code)); + } } else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) { SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); (void)schNotifyUserExecRes(pJob); // ignore error @@ -771,7 +775,10 @@ void schFreeJobImpl(void *job) { taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->fetchRes); taosMemoryFreeClear(pJob->sql); - (void)tsem_destroy(&pJob->rspSem); // ignore error + int32_t code = tsem_destroy(&pJob->rspSem); + if (code) { + qError("tsem_destroy failed, error:%s", tstrerror(code)); + } taosMemoryFree(pJob); int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1); @@ -790,7 +797,12 @@ int32_t schJobFetchRows(SSchJob *pJob) { if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) { SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); - (void)tsem_wait(&pJob->rspSem); // ignore error + code = tsem_wait(&pJob->rspSem); + if (code) { + qError("tsem_wait for fetch rspSem failed, error:%s", tstrerror(code)); + SCH_RET(code); + } + SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes)); } } else { @@ -895,7 +907,10 @@ _return: } else if (pJob->refId < 0) { schFreeJobImpl(pJob); } else { - (void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error + code = taosRemoveRef(schMgmt.jobRef, pJob->refId); + if (code) { + SCH_JOB_DLOG("taosRemoveRef job refId:0x%" PRIx64 " from jobRef, error:%s", pJob->refId, tstrerror(code)); + } } SCH_RET(code); @@ -909,7 +924,11 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) { if (pReq->syncReq) { SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); - (void)tsem_wait(&pJob->rspSem); // ignore error + code = tsem_wait(&pJob->rspSem); + if (code) { + qError("qid:0x%" PRIx64 " tsem_wait sync rspSem failed, error:%s", pReq->pDag->queryId, tstrerror(code)); + SCH_ERR_RET(code); + } } SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 59b8954a48..8e9d46de31 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -422,7 +422,9 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { schDropTaskOnExecNode(pJob, pTask); if (pTask->delayTimer) { - (void)taosTmrStopA(&pTask->delayTimer); // ignore error + if (!taosTmrStopA(&pTask->delayTimer)) { + SCH_TASK_WLOG("stop task delayTimer failed, may stopped, status:%d", pTask->status); + } } taosHashClear(pTask->execNodes); (void)schRemoveTaskFromExecList(pJob, pTask); // ignore error @@ -1319,7 +1321,9 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - (void)taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer); + if (taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer)) { + SCH_TASK_ELOG("taosTmrReset delayExec timer failed, handle:%p", schMgmt.timer); + } return TSDB_CODE_SUCCESS; } @@ -1350,7 +1354,9 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { SCH_LOCK_TASK(pTask); if (pTask->delayTimer) { - (void)taosTmrStopA(&pTask->delayTimer); + if (!taosTmrStopA(&pTask->delayTimer)) { + SCH_TASK_WLOG("stop delayTimer failed, status:%d", pTask->delayTimer); + } } schDropTaskOnExecNode(pJob, pTask); SCH_UNLOCK_TASK(pTask); diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 3f610ed387..612486c806 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -86,6 +86,7 @@ void schFreeHbTrans(SSchHbTrans *pTrans) { } void schCleanClusterHb(void *pTrans) { + int32_t code = 0; SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL); @@ -93,7 +94,10 @@ void schCleanClusterHb(void *pTrans) { if (hb->trans.pTrans == pTrans) { SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL); schFreeHbTrans(hb); - (void)taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId)); + code = taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId)); + if (code) { + qError("taosHashRemove hb connection failed, error:%s", tstrerror(code)); + } } hb = taosHashIterate(schMgmt.hbConnections, hb); @@ -116,7 +120,10 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep int64_t taskNum = atomic_load_64(&hb->taskNum); if (taskNum <= 0) { schFreeHbTrans(hb); - (void)taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); + code = taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); + if (code) { + SCH_TASK_WLOG("taosHashRemove hb connection failed, error:%s", tstrerror(code)); + } } SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3ce5cd5714..a62c59e547 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -184,6 +184,7 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) { } void schedulerDestroy(void) { + int32_t code = 0; atomic_store_8((int8_t *)&schMgmt.exit, 1); if (schMgmt.jobRef >= 0) { @@ -195,7 +196,10 @@ void schedulerDestroy(void) { if (refId == 0) { break; } - (void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error + code = taosRemoveRef(schMgmt.jobRef, pJob->refId); + if (code) { + qWarn("taosRemoveRef job refId:%" PRId64 " failed, error:%s", pJob->refId, tstrerror(code)); + } pJob = taosIterateRef(schMgmt.jobRef, refId); } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 7780be3fb7..758e283bc3 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -488,10 +488,10 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { if (pe->num == 0) { taosHashEntryWUnlock(pHashObj, pe); taosHashRUnlock(pHashObj); - return -1; + return TSDB_CODE_NOT_FOUND; } - int code = -1; + int code = TSDB_CODE_NOT_FOUND; SHashNode *pNode = pe->next; SHashNode *prevNode = NULL; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 780a6c94f1..a802737461 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -229,9 +229,8 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { return code; } -int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { +void taosReadQitem(STaosQueue *queue, void **ppItem) { STaosQnode *pNode = NULL; - int32_t code = 0; (void)taosThreadMutexLock(&queue->mutex); @@ -247,14 +246,11 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { if (queue->qset) { (void)atomic_sub_fetch_32(&queue->qset->numOfItems, 1); } - code = 1; uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, queue->memOfItems); } (void)taosThreadMutexUnlock(&queue->mutex); - - return code; } int32_t taosAllocateQall(STaosQall **qall) { diff --git a/source/util/src/tref.c b/source/util/src/tref.c index 0eac7b4427..a1c161a42c 100644 --- a/source/util/src/tref.c +++ b/source/util/src/tref.c @@ -426,7 +426,7 @@ static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) { } else { uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); terrno = TSDB_CODE_REF_NOT_EXIST; - code = -1; + code = terrno; } taosUnlockList(pSet->lockedBy + hash); diff --git a/source/util/src/tsimplehash.c b/source/util/src/tsimplehash.c index d14e72822f..b16fde86cf 100644 --- a/source/util/src/tsimplehash.c +++ b/source/util/src/tsimplehash.c @@ -297,7 +297,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen) { } int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) { - int32_t code = TSDB_CODE_FAILED; + int32_t code = TSDB_CODE_INVALID_PARA; if (!pHashObj || !key) { return code; } diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index 60edb2f045..d5310b2e7e 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -492,6 +492,9 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* han if (timer == NULL) { *pTmrId = taosTmrStart(fp, mseconds, param, handle); + if (NULL == *pTmrId) { + stopped = true; + } return stopped; }