diff --git a/docs/examples/c/multi_bind_example.c b/docs/examples/c/multi_bind_example.c index 3d0bd3ccef..0134899a1d 100644 --- a/docs/examples/c/multi_bind_example.c +++ b/docs/examples/c/multi_bind_example.c @@ -33,7 +33,7 @@ void executeSQL(TAOS *taos, const char *sql) { void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) { if (code != 0) { printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt)); - taos_stmt_close(stmt); + (void)taos_stmt_close(stmt); exit(EXIT_FAILURE); } } @@ -123,7 +123,7 @@ void insertData(TAOS *taos) { int affectedRows = taos_stmt_affected_rows(stmt); printf("successfully inserted %d rows\n", affectedRows); // close - taos_stmt_close(stmt); + (void)taos_stmt_close(stmt); } int main() { diff --git a/docs/examples/c/stmt_example.c b/docs/examples/c/stmt_example.c index 290a6bee66..3b9efe49d7 100644 --- a/docs/examples/c/stmt_example.c +++ b/docs/examples/c/stmt_example.c @@ -33,7 +33,7 @@ void executeSQL(TAOS *taos, const char *sql) { void checkErrorCode(TAOS_STMT *stmt, int code, const char* msg) { if (code != 0) { printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt)); - taos_stmt_close(stmt); + (void)taos_stmt_close(stmt); exit(EXIT_FAILURE); } } @@ -119,7 +119,7 @@ void insertData(TAOS *taos) { int affectedRows = taos_stmt_affected_rows(stmt); printf("successfully inserted %d rows\n", affectedRows); // close - taos_stmt_close(stmt); + (void)taos_stmt_close(stmt); } int main() { diff --git a/docs/examples/c/subscribe_demo.c b/docs/examples/c/subscribe_demo.c deleted file mode 100644 index 2fe62c24eb..0000000000 --- a/docs/examples/c/subscribe_demo.c +++ /dev/null @@ -1,66 +0,0 @@ -// A simple demo for asynchronous subscription. -// compile with: -// gcc -o subscribe_demo subscribe_demo.c -ltaos - -#include -#include -#include -#include - -int nTotalRows; - -/** - * @brief callback function of subscription. - * - * @param tsub - * @param res - * @param param. the additional parameter passed to taos_subscribe - * @param code. error code - */ -void subscribe_callback(TAOS_SUB* tsub, TAOS_RES* res, void* param, int code) { - if (code != 0) { - printf("error: %d\n", code); - exit(EXIT_FAILURE); - } - - TAOS_ROW row = NULL; - int num_fields = taos_num_fields(res); - TAOS_FIELD* fields = taos_fetch_fields(res); - int nRows = 0; - - while ((row = taos_fetch_row(res))) { - char buf[4096] = {0}; - taos_print_row(buf, row, fields, num_fields); - puts(buf); - nRows++; - } - - nTotalRows += nRows; - printf("%d rows consumed.\n", nRows); -} - -int main() { - TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 6030); - if (taos == NULL) { - printf("failed to connect to server\n"); - exit(EXIT_FAILURE); - } - - int restart = 1; // if the topic already exists, where to subscribe from the begin. - const char* topic = "topic-meter-current-bg-10"; - const char* sql = "select * from power.meters where current > 10"; - void* param = NULL; // additional parameter. - int interval = 2000; // consumption interval in microseconds. - TAOS_SUB* tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, interval); - - // wait for insert from others process. you can open TDengine CLI to insert some records for test. - - getchar(); // press Enter to stop - - printf("total rows consumed: %d\n", nTotalRows); - int keep = 0; // whether to keep subscribe process - taos_unsubscribe(tsub, keep); - - taos_close(taos); - taos_cleanup(); -} diff --git a/source/common/src/tcol.c b/source/common/src/tcol.c index a949d0793a..17972c6777 100644 --- a/source/common/src/tcol.c +++ b/source/common/src/tcol.c @@ -238,7 +238,7 @@ const char* columnLevelStr(uint8_t type) { bool checkColumnEncode(char encode[TSDB_CL_COMPRESS_OPTION_LEN]) { if (0 == strlen(encode)) return true; - strtolower(encode, encode); + (void)strtolower(encode, encode); for (int i = 0; i < supportedEncodeNum; ++i) { if (0 == strcmp((const char*)encode, supportedEncode[i])) { return true; @@ -255,7 +255,7 @@ bool checkColumnEncodeOrSetDefault(uint8_t type, char encode[TSDB_CL_COMPRESS_OP } bool checkColumnCompress(char compress[TSDB_CL_COMPRESS_OPTION_LEN]) { if (0 == strlen(compress)) return true; - strtolower(compress, compress); + (void)strtolower(compress, compress); for (int i = 0; i < supportedCompressNum; ++i) { if (0 == strcmp((const char*)compress, supportedCompress[i])) { return true; @@ -273,7 +273,7 @@ bool checkColumnCompressOrSetDefault(uint8_t type, char compress[TSDB_CL_COMPRES } bool checkColumnLevel(char level[TSDB_CL_COMPRESS_OPTION_LEN]) { if (0 == strlen(level)) return true; - strtolower(level, level); + (void)strtolower(level, level); if (1 == strlen(level)) { if ('h' == level[0] || 'm' == level[0] || 'l' == level[0]) return true; } else { diff --git a/source/common/src/trow.c b/source/common/src/trow.c index 1dc4a6f9ba..942255cd7c 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -68,7 +68,7 @@ bool tdSTSRowIterFetch(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC return false; } } - tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset, pVal); + (void)tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset, pVal); ++pIter->colIdx; } else if (TD_IS_KV_ROW(pIter->pRow)) { return tdSTSRowIterGetKvVal(pIter, colId, &pIter->kvIdx, pVal); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 70d0b858f6..89f3c6e253 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -212,8 +212,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { mndTransRefresh(pMnode, pTrans); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); - sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); - code = 0; + code = sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); _OUT: if (pTrans) mndReleaseTrans(pMnode, pTrans); @@ -222,7 +221,7 @@ _OUT: static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId == 0) { goto _OUT; } @@ -232,7 +231,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { pMgmt->transSec = 0; pMgmt->transSeq = 0; pMgmt->errCode = code; - tsem_post(&pMgmt->syncSem); + (void)tsem_post(&pMgmt->syncSem); if (pMgmt->errCode != 0) { mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); @@ -241,7 +240,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { } _OUT: - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); return 0; } @@ -304,7 +303,7 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { } else { mInfo("vgId:1, sync restore finished"); } - mndRefreshUserIpWhiteList(pMnode); + (void)mndRefreshUserIpWhiteList(pMnode); ASSERT(commitIdx == mndSyncAppliedIndex(pFsm)); } @@ -350,16 +349,16 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; mInfo("vgId:1, become follower"); - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId != 0) { mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; - tsem_post(&pMgmt->syncSem); + (void)tsem_post(&pMgmt->syncSem); } - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); } static void mndBecomeLearner(const SSyncFSM *pFsm) { @@ -367,16 +366,16 @@ static void mndBecomeLearner(const SSyncFSM *pFsm) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; mInfo("vgId:1, become learner"); - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId != 0) { mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; - tsem_post(&pMgmt->syncSem); + (void)tsem_post(&pMgmt->syncSem); } - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); } static void mndBecomeLeader(const SSyncFSM *pFsm) { @@ -435,12 +434,12 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - taosThreadMutexInit(&pMgmt->lock, NULL); - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexInit(&pMgmt->lock, NULL); + (void)taosThreadMutexLock(&pMgmt->lock); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); SSyncInfo syncInfo = { .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT, @@ -477,7 +476,7 @@ int32_t mndInitSync(SMnode *pMnode) { } int32_t code = 0; - tsem_init(&pMgmt->syncSem, 0, 0); + (void)tsem_init(&pMgmt->syncSem, 0, 0); pMgmt->sync = syncOpen(&syncInfo, true); if (pMgmt->sync <= 0) { if (terrno != 0) code = terrno; @@ -495,15 +494,15 @@ void mndCleanupSync(SMnode *pMnode) { syncStop(pMgmt->sync); mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync); - tsem_destroy(&pMgmt->syncSem); - taosThreadMutexDestroy(&pMgmt->lock); + (void)tsem_destroy(&pMgmt->syncSem); + (void)taosThreadMutexDestroy(&pMgmt->lock); memset(pMgmt, 0, sizeof(SSyncMgmt)); } void mndSyncCheckTimeout(SMnode *pMnode) { mTrace("check sync timeout"); SSyncMgmt *pMgmt = &pMnode->syncMgmt; - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId != 0) { int32_t curSec = taosGetTimestampSec(); int32_t delta = curSec - pMgmt->transSec; @@ -515,7 +514,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) { pMgmt->transSeq = 0; terrno = TSDB_CODE_SYN_TIMEOUT; pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; - tsem_post(&pMgmt->syncSem); + (void)tsem_post(&pMgmt->syncSem); } else { mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId, pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq); @@ -523,7 +522,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) { } else { // mTrace("check sync timeout msg, no trans waiting for confirm"); } - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); } int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { @@ -536,12 +535,12 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { if (req.pCont == NULL) return TSDB_CODE_OUT_OF_MEMORY; memcpy(req.pCont, pRaw, req.contLen); - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); pMgmt->errCode = 0; if (pMgmt->transId != 0) { mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); rpcFreeCont(req.pCont); TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED); } @@ -555,23 +554,24 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { if (code == 0) { mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq); pMgmt->transSeq = seq; - taosThreadMutexUnlock(&pMgmt->lock); - tsem_wait(&pMgmt->syncSem); + (void)taosThreadMutexUnlock(&pMgmt->lock); + (void)tsem_wait(&pMgmt->syncSem); } else if (code > 0) { mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; - taosThreadMutexUnlock(&pMgmt->lock); - sdbWriteWithoutFree(pMnode->pSdb, pRaw); - sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); - code = 0; + (void)taosThreadMutexUnlock(&pMgmt->lock); + code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); + if (code == 0) { + sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); + } } else { mError("trans:%d, failed to proposed since %s", transId, terrstr()); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); if (terrno == 0) { terrno = TSDB_CODE_APP_ERROR; } @@ -600,15 +600,15 @@ void mndSyncStart(SMnode *pMnode) { void mndSyncStop(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - taosThreadMutexLock(&pMgmt->lock); + (void)taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId != 0) { mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId); pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING; - tsem_post(&pMgmt->syncSem); + (void)tsem_post(&pMgmt->syncSem); } - taosThreadMutexUnlock(&pMgmt->lock); + (void)taosThreadMutexUnlock(&pMgmt->lock); } bool mndIsLeader(SMnode *pMnode) { diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 2095f80f0f..8bb2f11e7c 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -682,7 +682,7 @@ static void ipRangeToStr(SIpV4Range *range, char *buf) { struct in_addr addr; addr.s_addr = range->ip; - uv_inet_ntop(AF_INET, &addr, buf, 32); + (void)uv_inet_ntop(AF_INET, &addr, buf, 32); if (range->mask != 32) { (void)sprintf(buf + strlen(buf), "/%d", range->mask); } @@ -2188,7 +2188,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode mndReleaseDb(pMnode, pDb); TAOS_CHECK_GOTO(terrno, &lino, _OVER); // TODO: refactor the terrno to code } - taosHashRemove(pNewUser->readDbs, pAlterReq->objname, len); + (void)taosHashRemove(pNewUser->readDbs, pAlterReq->objname, len); mndReleaseDb(pMnode, pDb); } else { taosHashClear(pNewUser->readDbs); @@ -2204,7 +2204,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode mndReleaseDb(pMnode, pDb); TAOS_CHECK_GOTO(terrno, &lino, _OVER); // TODO: refactor the terrno to code } - taosHashRemove(pNewUser->writeDbs, pAlterReq->objname, len); + (void)taosHashRemove(pNewUser->writeDbs, pAlterReq->objname, len); mndReleaseDb(pMnode, pDb); } else { taosHashClear(pNewUser->writeDbs); @@ -2311,7 +2311,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { TAOS_CHECK_GOTO(mndAcquireUser(pMnode, alterReq.user, &pUser), &lino, _OVER); - mndAcquireUser(pMnode, pReq->info.conn.user, &pOperUser); + (void)mndAcquireUser(pMnode, pReq->info.conn.user, &pOperUser); if (pOperUser == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_MND_NO_USER_FROM_CONN, &lino, _OVER); } @@ -2517,7 +2517,7 @@ static int32_t mndDropUser(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser) { mndTransDrop(pTrans); TAOS_RETURN(terrno); } - ipWhiteMgtRemove(pUser->user); + (void)ipWhiteMgtRemove(pUser->user); mndTransDrop(pTrans); TAOS_RETURN(0); @@ -2830,7 +2830,10 @@ static int32_t mndLoopHash(SHashObj *hash, char *priType, SSDataBlock *pBlock, i } if (nodesStringToNode(value, &pAst) == 0) { - nodesNodeToSQL(pAst, *sql, bufSz, &sqlLen); + if (nodesNodeToSQL(pAst, *sql, bufSz, &sqlLen) != 0) { + sqlLen = 5; + (void)sprintf(*sql, "error"); + } nodesDestroyNode(pAst); } else { sqlLen = 5; diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 29eaa0509a..243f51ca0a 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -194,7 +194,7 @@ typedef enum { #define TD_SMA_LOOPS_CHECK(n, limit) \ if (++(n) > limit) { \ - sched_yield(); \ + (void)sched_yield(); \ (n) = 0; \ } diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index e707f2150d..75b112f42b 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -257,7 +257,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) { int32_t refVal = T_REF_VAL_GET(pRSmaInfo); if (refVal == 0) { - taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid)); + (void)taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid)); } else { smaDebug( "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index e7a8de17a0..a219da33db 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -46,7 +46,7 @@ int32_t smaInit() { old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2); if (old != 2) break; if (++nLoops > 1000) { - sched_yield(); + (void)sched_yield(); nLoops = 0; } } @@ -69,7 +69,7 @@ int32_t smaInit() { if (!smaMgmt.refHash || !smaMgmt.tmrHandle) { code = terrno; - taosCloseRef(smaMgmt.rsetId); + (void)taosCloseRef(smaMgmt.rsetId); if (smaMgmt.refHash) { taosHashCleanup(smaMgmt.refHash); smaMgmt.refHash = NULL; @@ -97,7 +97,7 @@ void smaCleanUp() { old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2); if (old != 2) break; if (++nLoops > 1000) { - sched_yield(); + (void)sched_yield(); nLoops = 0; } } @@ -130,7 +130,7 @@ static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) { : atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv); if ((code = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma)) != TSDB_CODE_SUCCESS) { - tdFreeSmaEnv(pEnv); + (void)tdFreeSmaEnv(pEnv); *ppEnv = NULL; (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL) : atomic_store_ptr(&SMA_RSMA_ENV(pSma), NULL); @@ -179,7 +179,7 @@ static void tRSmaInfoHashFreeNode(void *data) { if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) { (void)taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES); } - tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo); + (void)tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo); } } @@ -207,7 +207,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat); pRSmaStat->pSma = (SSma *)pSma; atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); - tsem_init(&pRSmaStat->notEmpty, 0, 0); + (void)tsem_init(&pRSmaStat->notEmpty, 0, 0); if (!(pRSmaStat->blocks = taosArrayInit(1, sizeof(SSDataBlock)))) { code = TSDB_CODE_OUT_OF_MEMORY; TAOS_CHECK_GOTO(code, &lino, _exit); @@ -216,7 +216,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS (void)taosArrayPush(pRSmaStat->blocks, &datablock); // init smaMgmt - smaInit(); + TAOS_CHECK_GOTO(smaInit(), &lino, _exit); int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat); if (refId < 0) { @@ -285,20 +285,20 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } // step 3: - tdRsmaStopExecutor(pSma); + (void)tdRsmaStopExecutor(pSma); // step 4: destroy the rsma info and associated fetch tasks taosHashCleanup(RSMA_INFO_HASH(pStat)); // step 5: free pStat - tsem_destroy(&(pStat->notEmpty)); + (void)tsem_destroy(&(pStat->notEmpty)); taosArrayDestroy(pStat->blocks); taosMemoryFreeClear(pStat); } } static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { - tdDestroySmaState(pSmaStat, smaType); + (void)tdDestroySmaState(pSmaStat, smaType); if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { taosMemoryFreeClear(pSmaStat); } @@ -438,7 +438,7 @@ static int32_t tdRsmaStopExecutor(const SSma *pSma) { pthread = (TdThread *)&pStat->data; for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) { - tsem_post(&(pRSmaStat->notEmpty)); + (void)tsem_post(&(pRSmaStat->notEmpty)); } for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index d5409cf268..ebff03ff99 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -412,7 +412,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con _exit: if (code != 0) { - tdFreeRSmaInfo(pSma, pRSmaInfo); + (void)tdFreeRSmaInfo(pSma, pRSmaInfo); } else { smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid); } @@ -1264,7 +1264,7 @@ _checkpoint: if (pItem && pItem->pStreamTask) { SStreamTask *pTask = pItem->pStreamTask; // atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); - streamTaskSetActiveCheckpointInfo(pTask, checkpointId); + (void)streamTaskSetActiveCheckpointInfo(pTask, checkpointId); pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId; pTask->chkInfo.checkpointVer = pItem->submitReqVer; @@ -1373,7 +1373,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ", rsetId:%d refId:%" PRIi64, SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId); if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { - taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + (void)taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); } tdReleaseRSmaInfo(pSma, pRSmaInfo); (void)tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId); @@ -1629,7 +1629,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { batchMax = TMAX(batchMax, 4); } while (occupied || (++batchCnt < batchMax)) { // greedy mode - taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock + (void)taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock int32_t qallItemSize = taosQallItemSize(pInfo->qall); if (qallItemSize > 0) { if ((code = tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type)) != 0) { @@ -1663,7 +1663,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { } if (qallItemSize > 0) { - atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); + (void)atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); continue; } if (RSMA_NEED_FETCH(pInfo)) { @@ -1673,7 +1673,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { break; } } - atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0); + (void)atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0); } } } else { diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 918fd9e5d3..96010728c2 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -123,7 +123,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg TAOS_CHECK_EXIT(metaCreateTSma(SMA_META(pSma), ver, pCfg)); // create stable to save tsma result in dstVgId - tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + (void)tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); pReq.name = (char *)tNameGetTableName(&stbFullName); pReq.suid = pCfg->dstTbUid; pReq.schemaRow = pCfg->schemaRow; @@ -283,7 +283,7 @@ static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq * SEncoder encoder; tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len); - tEncodeSBatchDeleteReq(&encoder, pDelReq); + (void)tEncodeSBatchDeleteReq(&encoder, pDelReq); tEncoderClear(&encoder); ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode); diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index dda5173ad9..c7beee6e8a 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -39,13 +39,15 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa // alloc pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader)); if (pReader == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pReader->pTq = pTq; pReader->sver = sver; pReader->ever = ever; pReader->tdbTbList = taosArrayInit(4, sizeof(STablePair)); + if (pReader->tdbTbList == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err); + } STablePair pair1 = {.tbl = pTq->pStreamMeta->pTaskDb, .type = SNAP_DATA_STREAM_TASK}; taosArrayPush(pReader->tdbTbList, &pair1); @@ -60,16 +62,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa if (code) { tqInfo("vgId:%d, vnode stream-task snapshot reader failed to open, reason: %s", TD_VID(pTq->pVnode), tstrerror(code)); - taosMemoryFree(pReader); - goto _err; + TAOS_CHECK_GOTO(code, NULL, _err); } code = tdbTbcMoveToFirst(pReader->pCur); if (code) { tqInfo("vgId:%d, vnode stream-task snapshot reader failed to iterate, reason: %s", TD_VID(pTq->pVnode), tstrerror(code)); - taosMemoryFree(pReader); - goto _err; + TAOS_CHECK_GOTO(code, NULL, _err); } tqDebug("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode)); @@ -79,11 +79,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa _err: tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + streamTaskSnapReaderClose(pReader); *ppReader = NULL; return code; } int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) { + if (pReader == NULL) return 0; + int32_t code = 0; tqInfo("vgId:%d, vnode stream-task snapshot reader closed", TD_VID(pReader->pTq->pVnode)); taosArrayDestroy(pReader->tdbTbList); @@ -116,6 +119,10 @@ NextTbl: break; } else { pVal = taosMemoryCalloc(1, tLen); + if (pVal == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } memcpy(pVal, tVal, tLen); vLen = tLen; } @@ -174,8 +181,7 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa // alloc pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pWriter->pTq = pTq; pWriter->sver = sver; @@ -184,12 +190,6 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa *ppWriter = pWriter; tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode)); return code; - -_err: - tqError("vgId:%d, vnode stream-task snapshot writer failed to write since %s", TD_VID(pTq->pVnode), tstrerror(code)); - *ppWriter = NULL; - return code; - return 0; } int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { @@ -207,8 +207,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { if (code) goto _err; } - if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - code = -1; + if ((code = tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0)) < 0) { taosMemoryFree(pWriter); goto _err; } @@ -241,10 +240,11 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t int64_t key[2] = {taskId.streamId, taskId.taskId}; streamMetaWLock(pTq->pStreamMeta); - if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), - nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) { + if ((code = + tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), + nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn)) < 0) { streamMetaWUnLock(pTq->pStreamMeta); - return -1; + return code; } streamMetaWUnLock(pTq->pStreamMeta); } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 3d344a8a20..bce8cda125 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -128,7 +128,7 @@ static STaskQueue taskQueue = {0}; static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; - execFn(pSchedMsg->thandle); + (void)execFn(pSchedMsg->thandle); taosFreeQitem(pSchedMsg); } diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index c3eeb12209..0f158591b4 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -143,9 +143,9 @@ SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); -STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer); -void taskDbDestroy(void* pBackend, bool flush); -void taskDbDestroy2(void* pBackend); +int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb); +void taskDbDestroy(void* pBackend, bool flush); +void taskDbDestroy2(void* pBackend); void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); @@ -252,7 +252,7 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId); -SBkdMgt* bkdMgtCreate(char* path); +int32_t bkdMgtCreate(char* path, SBkdMgt **bm); int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name); int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 869877c9a8..09f6573052 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2525,35 +2525,35 @@ _EXIT: return NULL; } -STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer) { +int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb) { char* statePath = NULL; char* dbPath = NULL; int code = 0; - terrno = 0; if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) { - terrno = code; stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, - chkptId, tstrerror(terrno)); - return NULL; + chkptId, tstrerror(code)); + return code; } STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath); if (pTaskDb != NULL) { int64_t chkpId = -1, ver = -1; - if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver) == 0)) { + if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver)) == 0) { *processVer = ver; } else { - terrno = code; stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId, - tstrerror(terrno)); + tstrerror(code)); taskDbDestroy(pTaskDb, false); - return NULL; + return code; } + } else { + code = TSDB_CODE_INVALID_PARA; } taosMemoryFree(dbPath); taosMemoryFree(statePath); - return pTaskDb; + *ppTaskDb = pTaskDb; + return code; } void taskDbDestroy(void* pDb, bool flush) { @@ -2794,8 +2794,10 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) { int32_t code = 0; int64_t processVer = -1; - STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0, &processVer); - RocksdbCfInst* pSrcBackend = pCfInst; + STaskDbWrapper* pTaskDb = NULL; + + code = taskDbOpen(path, key, 0, &processVer, &pTaskDb); + RocksdbCfInst* pSrcBackend = pCfInst; for (int i = 0; i < nCf; i++) { rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i]; @@ -4626,10 +4628,11 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { void dbChkpDestroy(SDbChkp* pChkp); -SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { +int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) { + int32_t code = 0; SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp)); if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } @@ -4637,41 +4640,41 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { p->preCkptId = -1; p->pSST = taosArrayInit(64, sizeof(void*)); if (p->pSST == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; dbChkpDestroy(p); - return NULL; + return code; } p->path = path; p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); if (p->buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } p->idx = 0; p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (p->pSstTbl[0] == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (p->pSstTbl[1] == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } p->pAdd = taosArrayInit(64, sizeof(void*)); if (p->pAdd == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } p->pDel = taosArrayInit(64, sizeof(void*)); if (p->pDel == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } @@ -4679,15 +4682,15 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { taosThreadRwlockInit(&p->rwLock, NULL); SArray* list = NULL; - int32_t code = dbChkpGetDelta(p, initChkpId, list); + code = dbChkpGetDelta(p, initChkpId, list); if (code != 0) { goto _EXIT; } - - return p; + *ppChkp = p; + return code; _EXIT: dbChkpDestroy(p); - return NULL; + return code; } void dbChkpDestroy(SDbChkp* pChkp) { @@ -4880,35 +4883,36 @@ _ERROR: return code; } -SBkdMgt* bkdMgtCreate(char* path) { - terrno = 0; +int32_t bkdMgtCreate(char* path, SBkdMgt** mgt) { + int32_t code = 0; SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt)); if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (p->pDbChkpTbl == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; bkdMgtDestroy(p); - return NULL; + return code; } p->path = taosStrdup(path); if (p->path == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; bkdMgtDestroy(p); - return NULL; + return code; } if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); bkdMgtDestroy(p); - return NULL; + return code; } + *mgt = p; - return p; + return code; } void bkdMgtDestroy(SBkdMgt* bm) { @@ -4949,11 +4953,11 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, return code; } - SDbChkp* p = dbChkpCreate(path, chkpId); - if (p == NULL) { + SDbChkp* p = NULL; + code = dbChkpCreate(path, chkpId, &p); + if (code != 0) { taosMemoryFree(path); taosThreadRwlockUnlock(&bm->rwLock); - code = terrno; return code; } @@ -4986,8 +4990,9 @@ int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) { taosThreadRwlockWrlock(&bm->rwLock); SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); if (pp == NULL) { - SDbChkp* p = dbChkpCreate(path, 0); - if (p != NULL) { + SDbChkp* p = NULL; + code = dbChkpCreate(path, 0, &p); + if (code != 0) { taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*)); code = 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7b410501ca..81640e0050 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -68,12 +68,12 @@ static void streamMetaEnvInit() { } } -void streamMetaInit() { (void) taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } +void streamMetaInit() { (void)taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaCleanup() { - (void) taosCloseRef(streamBackendId); - (void) taosCloseRef(streamBackendCfWrapperId); - (void) taosCloseRef(streamMetaId); + (void)taosCloseRef(streamBackendId); + (void)taosCloseRef(streamBackendCfWrapperId); + (void)taosCloseRef(streamMetaId); metaRefMgtCleanup(); streamTimerCleanUp(); @@ -128,12 +128,12 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*)); if (code) { - stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t) vgId, *rid); + stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t)vgId, *rid); return code; } } else { SArray* list = *(SArray**)p; - void* px = taosArrayPush(list, &rid); + void* px = taosArrayPush(list, &rid); if (px == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; } @@ -186,7 +186,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { code = tdbTbcMoveToFirst(pCur); if (code) { - (void) tdbTbcClose(pCur); + (void)tdbTbcClose(pCur); stError("vgId:%d failed to open stream meta file cursor, not perform compatible check", pMeta->vgId); return ret; } @@ -215,7 +215,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { tdbFree(pKey); tdbFree(pVal); - (void) tdbTbcClose(pCur); + (void)tdbTbcClose(pCur); return ret; } @@ -276,6 +276,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { } int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) { + int32_t code = 0; int64_t chkpId = pTask->chkInfo.checkpointId; streamMutexLock(&pMeta->backendMutex); @@ -299,8 +300,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) STaskDbWrapper* pBackend = NULL; int64_t processVer = -1; while (1) { - pBackend = taskDbOpen(pMeta->path, key, chkpId, &processVer); - if (pBackend != NULL) { + code = taskDbOpen(pMeta->path, key, chkpId, &processVer, &pBackend); + if (code == 0) { break; } @@ -319,7 +320,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) if (processVer != -1) pTask->chkInfo.processedVer = processVer; - int32_t code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); + code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); if (code) { stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId); } @@ -469,8 +470,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); - pMeta->bkdChkptMgt = bkdMgtCreate(tpath); - if (pMeta->bkdChkptMgt == NULL) { + code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt); + if (code != 0) { goto _err; } @@ -485,7 +486,7 @@ _err: if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskDb) (void)tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) (void)tdbTbClose(pMeta->pCheckpointDb); - if (pMeta->db) (void) tdbClose(pMeta->db); + if (pMeta->db) (void)tdbClose(pMeta->db); if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks); if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); @@ -531,7 +532,7 @@ void streamMetaClear(SStreamMeta* pMeta) { // release the ref by timer if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt); - (void) taosTmrStop(p->schedInfo.pDelayTimer); + (void)taosTmrStop(p->schedInfo.pDelayTimer); p->info.delaySchedParam = 0; streamMetaReleaseTask(pMeta, p); } @@ -566,7 +567,7 @@ void streamMetaClose(SStreamMeta* pMeta) { if (pMeta == NULL) { return; } - (void) taosRemoveRef(streamMetaId, pMeta->rid); + (void)taosRemoveRef(streamMetaId, pMeta->rid); } void streamMetaCloseImpl(void* arg) { @@ -583,10 +584,10 @@ void streamMetaCloseImpl(void* arg) { streamMetaWUnLock(pMeta); // already log the error, ignore here - (void) tdbAbort(pMeta->db, pMeta->txn); - (void) tdbTbClose(pMeta->pTaskDb); - (void) tdbTbClose(pMeta->pCheckpointDb); - (void) tdbClose(pMeta->db); + (void)tdbAbort(pMeta->db, pMeta->txn); + (void)tdbTbClose(pMeta->pTaskDb); + (void)tdbTbClose(pMeta->pCheckpointDb); + (void)tdbClose(pMeta->db); taosArrayDestroy(pMeta->pTaskList); taosArrayDestroy(pMeta->chkpSaved); @@ -610,7 +611,7 @@ void streamMetaCloseImpl(void* arg) { bkdMgtDestroy(pMeta->bkdChkptMgt); pMeta->role = NODE_ROLE_UNINIT; - (void) taosThreadRwlockDestroy(&pMeta->lock); + (void)taosThreadRwlockDestroy(&pMeta->lock); taosMemoryFree(pMeta); stDebug("vgId:%d end to close stream meta", vgId); @@ -691,13 +692,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa p = taosArrayPush(pMeta->pTaskList, &pTask->id); if (p == NULL) { - stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId); + stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); return TSDB_CODE_OUT_OF_MEMORY; } code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); if (code) { - stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId); + stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); return code; } @@ -710,7 +711,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if (pTask->info.fillHistory == 0) { - (void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); + (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); } *pAdded = true; @@ -779,7 +780,7 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - (void) streamTaskSendCheckpointSourceRsp(pTask); + (void)streamTaskSendCheckpointSourceRsp(pTask); } return 0; } @@ -802,7 +803,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } // handle the dropping event - (void) streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); + (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); } else { stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); streamMetaWUnLock(pMeta); @@ -841,12 +842,12 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t pTask = *ppTask; // it is an fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { - (void) atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); + (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } - (void) taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); + (void)taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); - (void) streamMetaRemoveTask(pMeta, &id); + (void)streamMetaRemoveTask(pMeta, &id); ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); streamMetaWUnLock(pMeta); @@ -854,7 +855,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ASSERT(pTask->status.timerActive == 0); if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); - (void) taosTmrStop(pTask->schedInfo.pDelayTimer); + (void)taosTmrStop(pTask->schedInfo.pDelayTimer); pTask->info.delaySchedParam = 0; streamMetaReleaseTask(pMeta, pTask); } @@ -915,7 +916,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { code = tdbTbcMoveToFirst(pCur); if (code) { - (void) tdbTbcClose(pCur); + (void)tdbTbcClose(pCur); stError("failed to open stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId); return checkpointId; } @@ -953,7 +954,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { SDecoder decoder; int32_t vgId = 0; int32_t code = 0; - SArray* pRecycleList = NULL; + SArray* pRecycleList = NULL; if (pMeta == NULL) { return; @@ -975,7 +976,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (code) { stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno)); taosArrayDestroy(pRecycleList); - (void) tdbTbcClose(pCur); + (void)tdbTbcClose(pCur); return; } @@ -1008,7 +1009,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { tFreeStreamTask(pTask); STaskId id = streamTaskGetTaskId(pTask); - (void) taosArrayPush(pRecycleList, &id); + (void)taosArrayPush(pRecycleList, &id); int32_t total = taosArrayGetSize(pRecycleList); stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); @@ -1029,7 +1030,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } - (void) taosArrayPush(pMeta->pTaskList, &pTask->id); + (void)taosArrayPush(pMeta->pTaskList, &pTask->id); } else { // todo this should replace the existed object put by replay creating stream task msg from mnode stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId); @@ -1039,17 +1040,17 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) { stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno)); - (void) taosArrayPop(pMeta->pTaskList); + (void)taosArrayPop(pMeta->pTaskList); tFreeStreamTask(pTask); continue; } if (pTask->info.fillHistory == 0) { - (void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); + (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); } if (streamTaskShouldPause(pTask)) { - (void) atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); + (void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); } ASSERT(pTask->status.downstreamReady == 0); @@ -1065,7 +1066,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosArrayGetSize(pRecycleList) > 0) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { STaskId* pId = taosArrayGet(pRecycleList, i); - (void) streamMetaRemoveTask(pMeta, pId); + (void)streamMetaRemoveTask(pMeta, pId); } } @@ -1093,7 +1094,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->status.timerActive >= 1) { stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId); - (void) streamTaskStop(pTask); + (void)streamTaskStop(pTask); inTimer = true; } } @@ -1126,7 +1127,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { SStreamTask* pTask = *(SStreamTask**)pIter; stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr); - (void) streamTaskStop(pTask); + (void)streamTaskStop(pTask); } streamMetaWUnLock(pMeta); @@ -1173,7 +1174,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) { void streamMetaRLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-rlock", pMeta->vgId); - (void) taosThreadRwlockRdlock(&pMeta->lock); + (void)taosThreadRwlockRdlock(&pMeta->lock); } void streamMetaRUnLock(SStreamMeta* pMeta) { @@ -1188,13 +1189,13 @@ void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-wlock", pMeta->vgId); - (void) taosThreadRwlockWrlock(&pMeta->lock); + (void)taosThreadRwlockWrlock(&pMeta->lock); // stTrace("vgId:%d meta-wlock completed", pMeta->vgId); } void streamMetaWUnLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-wunlock", pMeta->vgId); - (void) taosThreadRwlockUnlock(&pMeta->lock); + (void)taosThreadRwlockUnlock(&pMeta->lock); } int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { @@ -1320,7 +1321,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - (void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); continue; } @@ -1343,7 +1344,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - (void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); continue; } @@ -1361,10 +1362,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", pTask->id.idStr); - (void) streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? + (void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? } - (void) streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, true); + (void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, + true); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -1420,14 +1422,14 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = NULL; + SStreamTask* pTask = NULL; code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); if (code != TSDB_CODE_SUCCESS) { continue; } - (void) streamTaskStop(pTask); + (void)streamTaskStop(pTask); streamMetaReleaseTask(pMeta, pTask); } @@ -1467,7 +1469,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId); - (void) streamMetaAddFailedTask(pMeta, streamId, taskId); + (void)streamMetaAddFailedTask(pMeta, streamId, taskId); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } @@ -1558,9 +1560,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; - int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); + int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); if (code) { - } int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); @@ -1632,9 +1633,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta streamMetaRUnLock(pMeta); // add the failed task info, along with the related fill-history task info into tasks list. - (void) streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); + (void)streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); if (hasFillhistoryTask) { - (void) streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); + (void)streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); } } else { streamMetaRUnLock(pMeta); @@ -1649,12 +1650,12 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { int32_t startTs = pTask->execInfo.checkTs; - (void) streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); + (void)streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - (void) streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); + (void)streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); } } @@ -1662,7 +1663,7 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt int64_t startTs) { const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; - int32_t code = 0; + int32_t code = 0; // keep the already updated info STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId}; diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 1b2f961726..2b21510e45 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -69,7 +69,7 @@ void *backendOpen() { key.ts = ts; const char *val = "value data"; int32_t vlen = strlen(val); - int32_t code = streamStatePut_rocksdb(p, &key, (char *)val, vlen); + int32_t code = streamStatePut_rocksdb(p, &key, (char *)val, vlen); ASSERT(code == 0); tsArray.push_back(ts); @@ -83,7 +83,7 @@ void *backendOpen() { const char *val = "value data"; int32_t len = 0; char *newVal = NULL; - int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); + int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); ASSERT(code == 0); ASSERT(len == strlen(val)); @@ -377,7 +377,7 @@ TEST_F(BackendEnv, checkOpen) { char val[128] = {0}; sprintf(val, "val_%d", i); int32_t code = streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, - (int32_t)(strlen(val)), tsStart + 100000); + (int32_t)(strlen(val)), tsStart + 100000); ASSERT(code == 0); } @@ -396,7 +396,7 @@ TEST_F(BackendEnv, checkOpen) { char val[128] = {0}; sprintf(val, "val_%d", i); int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, - (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); + (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); ASSERT(code == 0); } int32_t code = streamStatePutBatch_rocksdb(p, pBatch); @@ -417,7 +417,7 @@ TEST_F(BackendEnv, checkOpen) { char val[128] = {0}; sprintf(val, "val_%d", i); int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, - (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); + (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); ASSERT(code == 0); } code = streamStatePutBatch_rocksdb(p, pBatch); @@ -432,13 +432,12 @@ TEST_F(BackendEnv, checkOpen) { const char *path = "/tmp/backend/stream"; const char *dump = "/tmp/backend/stream/dump"; // taosMkDir(dump); - code = taosMulMkDir(dump); - ASSERT(code == 0); + taosMulMkDir(dump); + SBkdMgt *mgt = NULL; - SBkdMgt *mgt = bkdMgtCreate((char *)path); - SArray *result = taosArrayInit(4, sizeof(void *)); - code = bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); - ASSERT(code == 0); + code = bkdMgtCreate((char *)path, &mgt); + SArray *result = taosArrayInit(4, sizeof(void *)); + bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0); ASSERT(code == 0); @@ -475,7 +474,7 @@ TEST_F(BackendEnv, backendUtil) { } TEST_F(BackendEnv, oldBackendInit) { const char *path = "/tmp/backend1"; - int32_t code = taosMulMkDir(path); + int32_t code = taosMulMkDir(path); ASSERT(code == 0); { diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index 59171876ff..4360fc7d54 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -53,7 +53,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) { void *p = NULL; // SBackendWrapper *p = streamBackendInit(streamPath, -1, 2); // p = taskDbOpen((char *)streamPath, (char *)"test", -1); - p = bkdMgtCreate((char *)streamPath); + int32_t code = bkdMgtCreate((char *)streamPath, (SBkdMgt **)&p); // const int64_t interval = 20 * 1000; // const int64_t watermark = 10 * 60 * 1000; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 8b6092e839..ef2cbece79 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -134,7 +134,7 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI } SSnapshot snapshot = {0}; - (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); // TODO: check the return code + (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); if (prevIndex == snapshot.lastApplyIndex) { *pSyncTerm = snapshot.lastApplyTerm; return 0; @@ -184,7 +184,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { int32_t code = 0, lino = 0; SSnapshot snapshot = {0}; - pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); + TAOS_CHECK_EXIT(pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot)); SyncIndex commitIndex = snapshot.lastApplyIndex; SyncTerm commitTerm = TMAX(snapshot.lastApplyTerm, 0); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 450d22528d..8a2c53997b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -86,14 +86,17 @@ int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSn pSender->replicaIndex = replicaIndex; pSender->term = raftStoreGetTerm(pSyncNode); pSender->startTime = -1; - pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->finish = false; + code = pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); + if (code != 0) { + taosMemoryFreeClear(pSender); + TAOS_RETURN(code); + } SSyncSnapBuffer *pSndBuf = NULL; code = syncSnapBufferCreate(&pSndBuf); if (pSndBuf == NULL) { - taosMemoryFree(pSender); - pSender = NULL; + taosMemoryFreeClear(pSender); TAOS_RETURN(code); } pSndBuf->entryDeleteCb = syncSnapBlockDestroy; @@ -472,7 +475,7 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { syncSnapBufferDestroy(&pReceiver->pRcvBuf); } - snapshotReceiverClearInfoData(pReceiver); + (void)snapshotReceiverClearInfoData(pReceiver); // free receiver taosMemoryFree(pReceiver); @@ -592,7 +595,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, &pReceiver->snapshot); if (code != 0) { - sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code)); + sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code)); TAOS_RETURN(code); } pReceiver->pWriter = NULL; @@ -603,7 +606,11 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap // get fsmState SSnapshot snapshot = {0}; - pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot); + code = pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot); + if (code != 0) { + sRError(pReceiver, "snapshot receiver get snapshot info failed since %s", tstrerror(code)); + TAOS_RETURN(code); + } pReceiver->pSyncNode->fsmState = snapshot.state; // reset wal @@ -1276,13 +1283,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { sSInfo(pSender, "process end rsp"); snapshotSenderStop(pSender, true); - syncNodeReplicateReset(pSyncNode, &pMsg->srcId); + (void)syncNodeReplicateReset(pSyncNode, &pMsg->srcId); } return 0; _ERROR: snapshotSenderStop(pSender, false); - syncNodeReplicateReset(pSyncNode, &pMsg->srcId); + (void)syncNodeReplicateReset(pSyncNode, &pMsg->srcId); TAOS_RETURN(code); } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 2076de6ec7..49737b9045 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -183,7 +183,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { - pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); + (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); } SyncIndex logLastIndex = SYNC_INDEX_INVALID; @@ -253,7 +253,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { - pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); + (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); } SyncIndex logLastIndex = SYNC_INDEX_INVALID; @@ -302,7 +302,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { - pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); + (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); } SyncIndex logLastIndex = SYNC_INDEX_INVALID; diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index ede7636011..f81bbbdeb7 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -483,7 +483,7 @@ const STfsFile *tfsReaddir(STfsDir *pTfsDir) { void tfsClosedir(STfsDir *pTfsDir) { if (pTfsDir) { if (pTfsDir->pDir != NULL) { - taosCloseDir(&pTfsDir->pDir); + (void)taosCloseDir(&pTfsDir->pDir); pTfsDir->pDir = NULL; } taosMemoryFree(pTfsDir);