ehn: remove void

This commit is contained in:
Hongze Cheng 2024-09-24 10:06:30 +08:00
parent 3312eec628
commit 645160d251
14 changed files with 174 additions and 77 deletions

View File

@ -177,7 +177,7 @@ int32_t walRollback(SWal *, int64_t ver);
int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention); int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention);
int32_t walEndSnapshot(SWal *); int32_t walEndSnapshot(SWal *);
int32_t walRestoreFromSnapshot(SWal *, int64_t ver); int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
int32_t walApplyVer(SWal *, int64_t ver); void walApplyVer(SWal *, int64_t ver);
// wal reader // wal reader
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id); SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);

View File

@ -32,10 +32,14 @@ static void removeEmptyDir() {
empty = false; empty = false;
} }
if (empty) taosRemoveDir(filename); if (empty) taosRemoveDir(filename);
(void)taosCloseDir(&pDirTmp); if (taosCloseDir(&pDirTmp) != 0) {
uError("[rsync] close dir error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
}
} }
(void)taosCloseDir(&pDir); if (taosCloseDir(&pDir) != 0) {
uError("[rsync] close dir error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
}
} }
#ifdef WINDOWS #ifdef WINDOWS
@ -297,7 +301,7 @@ int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId)
path, el); path, el);
} }
if (code != TSDB_CODE_SUCCESS) { // if failed, try to load it from data directory if (code != TSDB_CODE_SUCCESS) { // if failed, try to load it from data directory
#ifdef WINDOWS #ifdef WINDOWS
memset(pathTransform, 0, PATH_MAX); memset(pathTransform, 0, PATH_MAX);
changeDirFromWindowsToLinux(path, pathTransform); changeDirFromWindowsToLinux(path, pathTransform);

View File

@ -235,7 +235,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port; pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER; pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN); tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
pCfg->syncCfg.replicaNum++; pCfg->syncCfg.replicaNum++;
} }
if (pCreate->selfIndex != -1) { if (pCreate->selfIndex != -1) {
@ -247,7 +247,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port; pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER; pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN); tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
pCfg->syncCfg.totalReplicaNum++; pCfg->syncCfg.totalReplicaNum++;
} }
pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum; pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;

View File

@ -143,7 +143,7 @@ typedef struct STbUidStore STbUidStore;
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback); int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
int metaUpgrade(SVnode* pVnode, SMeta** ppMeta); int metaUpgrade(SVnode* pVnode, SMeta** ppMeta);
int metaClose(SMeta** pMeta); void metaClose(SMeta** pMeta);
int metaBegin(SMeta* pMeta, int8_t fromSys); int metaBegin(SMeta* pMeta, int8_t fromSys);
TXN* metaGetTxn(SMeta* pMeta); TXN* metaGetTxn(SMeta* pMeta);
int metaCommit(SMeta* pMeta, TXN* txn); int metaCommit(SMeta* pMeta, TXN* txn);
@ -207,7 +207,7 @@ int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo, SMetaReader* pR
// tsdb // tsdb
int32_t tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback, bool force); int32_t tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback, bool force);
int32_t tsdbClose(STsdb** pTsdb); void tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbBegin(STsdb* pTsdb);
// int32_t tsdbPrepareCommit(STsdb* pTsdb); // int32_t tsdbPrepareCommit(STsdb* pTsdb);
// int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); // int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
@ -284,7 +284,7 @@ int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg);
int32_t smaInit(); int32_t smaInit();
void smaCleanUp(); void smaCleanUp();
int32_t smaOpen(SVnode* pVnode, int8_t rollback, bool force); int32_t smaOpen(SVnode* pVnode, int8_t rollback, bool force);
int32_t smaClose(SSma* pSma); void smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma); int32_t smaBegin(SSma* pSma);
int32_t smaPrepareAsyncCommit(SSma* pSma); int32_t smaPrepareAsyncCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo); int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo);
@ -314,7 +314,7 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapReader ======================================== // STsdbSnapReader ========================================
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges, int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
STsdbSnapReader** ppReader); STsdbSnapReader** ppReader);
void tsdbSnapReaderClose(STsdbSnapReader** ppReader); void tsdbSnapReaderClose(STsdbSnapReader** ppReader);
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
// STsdbSnapWriter ======================================== // STsdbSnapWriter ========================================
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** ppWriter); int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** ppWriter);
@ -323,7 +323,7 @@ int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter, bool rollback);
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapRAWReader ======================================== // STsdbSnapRAWReader ========================================
int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader); int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader);
void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader); void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader);
int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData); int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData);
// STsdbSnapRAWWriter ======================================== // STsdbSnapRAWWriter ========================================
int32_t tsdbSnapRAWWriterOpen(STsdb* pTsdb, int64_t ever, STsdbSnapRAWWriter** ppWriter); int32_t tsdbSnapRAWWriterOpen(STsdb* pTsdb, int64_t ever, STsdbSnapRAWWriter** ppWriter);

View File

@ -169,9 +169,9 @@ _exit:
return code; return code;
} }
int metaClose(SMeta **ppMeta) { void metaClose(SMeta **ppMeta) {
metaCleanup(ppMeta); metaCleanup(ppMeta);
return 0; return;
} }
int metaAlterCache(SMeta *pMeta, int32_t nPage) { int metaAlterCache(SMeta *pMeta, int32_t nPage) {

View File

@ -171,7 +171,7 @@ _exit:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
int32_t smaClose(SSma *pSma) { void smaClose(SSma *pSma) {
if (pSma) { if (pSma) {
TAOS_UNUSED(smaPreClose(pSma)); TAOS_UNUSED(smaPreClose(pSma));
(void)taosThreadMutexDestroy(&pSma->mutex); (void)taosThreadMutexDestroy(&pSma->mutex);
@ -182,7 +182,7 @@ int32_t smaClose(SSma *pSma) {
if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma)); if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma));
taosMemoryFreeClear(pSma); taosMemoryFreeClear(pSma);
} }
return 0; return;
} }
/** /**

View File

@ -102,7 +102,7 @@ _exit:
return code; return code;
} }
int32_t tsdbClose(STsdb **pTsdb) { void tsdbClose(STsdb **pTsdb) {
if (*pTsdb) { if (*pTsdb) {
STsdb *pdb = *pTsdb; STsdb *pdb = *pTsdb;
tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d, keepTimeOffset:%d", TD_VID(pdb->pVnode), pdb->path, tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d, keepTimeOffset:%d", TD_VID(pdb->pVnode), pdb->path,
@ -121,5 +121,5 @@ int32_t tsdbClose(STsdb **pTsdb) {
(void)taosThreadMutexDestroy(&(*pTsdb)->mutex); (void)taosThreadMutexDestroy(&(*pTsdb)->mutex);
taosMemoryFreeClear(*pTsdb); taosMemoryFreeClear(*pTsdb);
} }
return 0; return;
} }

View File

@ -91,7 +91,9 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
struct timeval tv; struct timeval tv;
struct timespec ts; struct timespec ts;
(void)taosGetTimeOfDay(&tv); if (taosGetTimeOfDay(&tv) != 0) {
continue;
}
ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000; ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
if (ts.tv_nsec > 999999999l) { if (ts.tv_nsec > 999999999l) {
ts.tv_sec = tv.tv_sec + 1; ts.tv_sec = tv.tv_sec + 1;

View File

@ -24,10 +24,10 @@ struct SVHashEntry {
void* obj; void* obj;
}; };
static int32_t vHashRehash(SVHashTable* ht, uint32_t newNumBuckets) { static void vHashRehash(SVHashTable* ht, uint32_t newNumBuckets) {
SVHashEntry** newBuckets = (SVHashEntry**)taosMemoryCalloc(newNumBuckets, sizeof(SVHashEntry*)); SVHashEntry** newBuckets = (SVHashEntry**)taosMemoryCalloc(newNumBuckets, sizeof(SVHashEntry*));
if (newBuckets == NULL) { if (newBuckets == NULL) {
return terrno; return;
} }
for (int32_t i = 0; i < ht->numBuckets; i++) { for (int32_t i = 0; i < ht->numBuckets; i++) {
@ -45,7 +45,7 @@ static int32_t vHashRehash(SVHashTable* ht, uint32_t newNumBuckets) {
ht->buckets = newBuckets; ht->buckets = newBuckets;
ht->numBuckets = newNumBuckets; ht->numBuckets = newNumBuckets;
return 0; return;
} }
int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*)) { int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*)) {
@ -96,7 +96,7 @@ int32_t vHashPut(SVHashTable* ht, void* obj) {
} }
if (ht->numEntries >= ht->numBuckets) { if (ht->numEntries >= ht->numBuckets) {
(void)vHashRehash(ht, ht->numBuckets * 2); vHashRehash(ht, ht->numBuckets * 2);
bucketIndex = ht->hash(obj) % ht->numBuckets; bucketIndex = ht->hash(obj) % ht->numBuckets;
} }
@ -142,7 +142,7 @@ int32_t vHashDrop(SVHashTable* ht, const void* obj) {
taosMemoryFree(tmp); taosMemoryFree(tmp);
ht->numEntries--; ht->numEntries--;
if (ht->numBuckets > VNODE_HASH_DEFAULT_NUM_BUCKETS && ht->numEntries < ht->numBuckets / 4) { if (ht->numBuckets > VNODE_HASH_DEFAULT_NUM_BUCKETS && ht->numEntries < ht->numBuckets / 4) {
(void)vHashRehash(ht, ht->numBuckets / 2); vHashRehash(ht, ht->numBuckets / 2);
} }
return 0; return 0;
} }

View File

@ -108,7 +108,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
pNode->nodePort = pReq->replicas[i].port; pNode->nodePort = pReq->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER; pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId); vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
pCfg->replicaNum++; pCfg->replicaNum++;
} }
@ -121,7 +121,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
pNode->nodePort = pReq->learnerReplicas[pCfg->totalReplicaNum].port; pNode->nodePort = pReq->learnerReplicas[pCfg->totalReplicaNum].port;
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER; pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[pCfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[pCfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId); vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
pCfg->totalReplicaNum++; pCfg->totalReplicaNum++;
} }
@ -176,8 +176,10 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
int32_t prefixLen = strlen(tsdbFilePrefix); int32_t prefixLen = strlen(tsdbFilePrefix);
STfsDir *tsdbDir = NULL; STfsDir *tsdbDir = NULL;
(void)tfsOpendir(pTfs, tsdbPath, &tsdbDir); int32_t tret = tfsOpendir(pTfs, tsdbPath, &tsdbDir);
if (tsdbDir == NULL) return 0; if (tsdbDir == NULL) {
return 0;
}
while (1) { while (1) {
const STfsFile *tsdbFile = tfsReaddir(tsdbDir); const STfsFile *tsdbFile = tfsReaddir(tsdbDir);
@ -248,7 +250,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
SNodeInfo *pNode = &pCfg->nodeInfo[0]; SNodeInfo *pNode = &pCfg->nodeInfo[0];
pNode->nodePort = tsServerPort; pNode->nodePort = tsServerPort;
tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN); tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); bool ret1 = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId); vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
info.config.syncCfg = *pCfg; info.config.syncCfg = *pCfg;
@ -317,7 +319,9 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) { void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) {
vInfo("path:%s is removed while destroy vnode", path); vInfo("path:%s is removed while destroy vnode", path);
(void)tfsRmdir(pTfs, path); if (tfsRmdir(pTfs, path) < 0) {
vError("failed to remove path:%s since %s", path, tstrerror(terrno));
}
// int32_t nlevel = tfsGetLevel(pTfs); // int32_t nlevel = tfsGetLevel(pTfs);
if (nodeId > 0 && vgId > 0 /*&& nlevel > 1*/ && tsS3Enabled) { if (nodeId > 0 && vgId > 0 /*&& nlevel > 1*/ && tsS3Enabled) {
@ -378,8 +382,13 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
} }
if (updated) { if (updated) {
vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId); vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
(void)vnodeSaveInfo(dir, &info); if (vnodeSaveInfo(dir, &info) < 0) {
(void)vnodeCommitInfo(dir); vError("vgId:%d, failed to save vnode info since %s", info.config.vgId, tstrerror(terrno));
}
if (vnodeCommitInfo(dir) < 0) {
vError("vgId:%d, failed to commit vnode info since %s", info.config.vgId, tstrerror(terrno));
}
} }
// create handle // create handle
@ -440,7 +449,9 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
// open wal // open wal
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
(void)taosRealPath(tdir, NULL, sizeof(tdir)); if (taosRealPath(tdir, NULL, sizeof(tdir)) != 0) {
vError("vgId:%d, failed to get wal dir since %s", TD_VID(pVnode), tstrerror(terrno));
}
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg)); pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
if (pVnode->pWal == NULL) { if (pVnode->pWal == NULL) {
@ -450,7 +461,9 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
// open tq // open tq
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
(void)taosRealPath(tdir, NULL, sizeof(tdir)); if (taosRealPath(tdir, NULL, sizeof(tdir)) != 0) {
vError("vgId:%d, failed to get tq dir since %s", TD_VID(pVnode), tstrerror(terrno));
}
// open query // open query
if (vnodeQueryOpen(pVnode)) { if (vnodeQueryOpen(pVnode)) {
@ -499,9 +512,9 @@ _err:
if (pVnode->pQuery) vnodeQueryClose(pVnode); if (pVnode->pQuery) vnodeQueryClose(pVnode);
if (pVnode->pTq) tqClose(pVnode->pTq); if (pVnode->pTq) tqClose(pVnode->pTq);
if (pVnode->pWal) walClose(pVnode->pWal); if (pVnode->pWal) walClose(pVnode->pWal);
if (pVnode->pTsdb) (void)tsdbClose(&pVnode->pTsdb); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
if (pVnode->pSma) (void)smaClose(pVnode->pSma); if (pVnode->pSma) smaClose(pVnode->pSma);
if (pVnode->pMeta) (void)metaClose(&pVnode->pMeta); if (pVnode->pMeta) metaClose(&pVnode->pMeta);
if (pVnode->freeList) vnodeCloseBufPool(pVnode); if (pVnode->freeList) vnodeCloseBufPool(pVnode);
taosMemoryFree(pVnode); taosMemoryFree(pVnode);
@ -518,13 +531,16 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
vnodeAWait(&pVnode->commitTask); vnodeAWait(&pVnode->commitTask);
(void)vnodeAChannelDestroy(&pVnode->commitChannel, true); if (vnodeAChannelDestroy(&pVnode->commitChannel, true) != 0) {
vError("vgId:%d, failed to destroy commit channel", TD_VID(pVnode));
}
vnodeSyncClose(pVnode); vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode); vnodeQueryClose(pVnode);
tqClose(pVnode->pTq); tqClose(pVnode->pTq);
walClose(pVnode->pWal); walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
(void)smaClose(pVnode->pSma); smaClose(pVnode->pSma);
if (pVnode->pMeta) metaClose(&pVnode->pMeta); if (pVnode->pMeta) metaClose(&pVnode->pMeta);
vnodeCloseBufPool(pVnode); vnodeCloseBufPool(pVnode);

View File

@ -210,7 +210,8 @@ static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
(void)tSerializeSVDropTtlTableReq((char *)pContNew + sizeof(SMsgHead), reqLenNew, &ttlReq); code = tSerializeSVDropTtlTableReq((char *)pContNew + sizeof(SMsgHead), reqLenNew, &ttlReq);
TSDB_CHECK_CODE(code, lino, _exit);
pContNew->contLen = htonl(reqLenNew); pContNew->contLen = htonl(reqLenNew);
pContNew->vgId = pContOld->vgId; pContNew->vgId = pContOld->vgId;
@ -420,7 +421,11 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
((SMsgHead *)pCont)->vgId = TD_VID(pVnode); ((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size); tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
(void)tEncodeDeleteRes(pCoder, &res); code = tEncodeDeleteRes(pCoder, &res);
if (code) {
vError("vgId:%d %s failed to encode delete response since %s", TD_VID(pVnode), __func__, tstrerror(code));
goto _exit;
}
tEncoderClear(pCoder); tEncoderClear(pCoder);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
@ -466,7 +471,11 @@ static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }
(void)vnodePreCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token); int32_t ret = vnodePreCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token);
if (ret != 0) {
vError("vgId:%d, failed to preprocess arb check sync request since %s", TD_VID(pVnode), tstrerror(ret));
}
int32_t code = terrno; int32_t code = terrno;
tFreeSVArbCheckSyncReq(&syncReq); tFreeSVArbCheckSyncReq(&syncReq);
@ -643,7 +652,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
} break; } break;
case TDMT_STREAM_CONSEN_CHKPT: { case TDMT_STREAM_CONSEN_CHKPT: {
if (pVnode->restored) { if (pVnode->restored) {
(void)tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg); if (tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg) < 0) {
goto _err;
}
} }
} break; } break;
case TDMT_STREAM_TASK_PAUSE: { case TDMT_STREAM_TASK_PAUSE: {
@ -660,7 +671,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
} break; } break;
case TDMT_VND_STREAM_TASK_RESET: { case TDMT_VND_STREAM_TASK_RESET: {
if (pVnode->restored && vnodeIsLeader(pVnode)) { if (pVnode->restored && vnodeIsLeader(pVnode)) {
(void)tqProcessTaskResetReq(pVnode->pTq, pMsg); if (tqProcessTaskResetReq(pVnode->pTq, pMsg) < 0) {
goto _err;
}
} }
} break; } break;
case TDMT_VND_ALTER_CONFIRM: case TDMT_VND_ALTER_CONFIRM:
@ -710,7 +723,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code, vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
ver); ver);
(void)walApplyVer(pVnode->pWal, ver); walApplyVer(pVnode->pWal, ver);
code = tqPushMsg(pVnode->pTq, pMsg->msgType); code = tqPushMsg(pVnode->pTq, pMsg->msgType);
if (code) { if (code) {
@ -881,7 +894,10 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
} }
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
(void)tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
if (code) {
vError("failed to process sma result since %s", tstrerror(code));
}
} }
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
@ -957,7 +973,10 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq,
int32_t code = metaDropTables(pVnode->pMeta, ttlReq.pTbUids); int32_t code = metaDropTables(pVnode->pMeta, ttlReq.pTbUids);
if (code) return code; if (code) return code;
(void)tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false); code = tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false);
if (code) {
vError("vgId:%d, failed to update tbUid list since %s", TD_VID(pVnode), tstrerror(code));
}
} }
end: end:
@ -1160,7 +1179,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
} }
} else { } else {
cRsp.code = TSDB_CODE_SUCCESS; cRsp.code = TSDB_CODE_SUCCESS;
(void)tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid); if (tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid) < 0) {
vError("vgId:%d, failed to fetch tbUid list", TD_VID(pVnode));
}
if (taosArrayPush(tbUids, &pCreateReq->uid) == NULL) { if (taosArrayPush(tbUids, &pCreateReq->uid) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
rcode = -1; rcode = -1;
@ -1177,11 +1198,13 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
} }
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids)); vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
(void)tqUpdateTbUidList(pVnode->pTq, tbUids, true); if (tqUpdateTbUidList(pVnode->pTq, tbUids, true) < 0) {
vError("vgId:%d, failed to update tbUid list since %s", TD_VID(pVnode), tstrerror(terrno));
}
if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) { if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
goto _exit; goto _exit;
} }
(void)tdUidStoreFree(pStore); pStore = tdUidStoreFree(pStore);
// prepare rsp // prepare rsp
int32_t ret = 0; int32_t ret = 0;
@ -1193,13 +1216,20 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
goto _exit; goto _exit;
} }
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen); tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
(void)tEncodeSVCreateTbBatchRsp(&encoder, &rsp); if (tEncodeSVCreateTbBatchRsp(&encoder, &rsp) != 0) {
vError("vgId:%d, failed to encode create table batch response", TD_VID(pVnode));
}
if (tsEnableAudit && tsEnableAuditCreateTable) { if (tsEnableAudit && tsEnableAuditCreateTable) {
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId; int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
SName name = {0}; SName name = {0};
(void)tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB); int32_t code = tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
if (code) {
terrno = code;
rcode = -1;
goto _exit;
}
SStringBuilder sb = {0}; SStringBuilder sb = {0};
for (int32_t i = 0; i < tbNames->size; i++) { for (int32_t i = 0; i < tbNames->size; i++) {
@ -1347,7 +1377,9 @@ _exit:
tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret); tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen); pRsp->pCont = rpcMallocCont(pRsp->contLen);
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen); tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
(void)tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp); if (tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp) != 0) {
vError("vgId:%d, failed to encode alter table response", TD_VID(pVnode));
}
tEncoderClear(&ec); tEncoderClear(&ec);
if (vMetaRsp.pSchemas) { if (vMetaRsp.pSchemas) {
taosMemoryFree(vMetaRsp.pSchemas); taosMemoryFree(vMetaRsp.pSchemas);
@ -1402,7 +1434,11 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
} }
} else { } else {
dropTbRsp.code = TSDB_CODE_SUCCESS; dropTbRsp.code = TSDB_CODE_SUCCESS;
if (tbUid > 0) (void)tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid); if (tbUid > 0) {
if (tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid) < 0) {
vError("vgId:%d, failed to fetch tbUid list", TD_VID(pVnode));
}
}
} }
if (taosArrayPush(rsp.pArray, &dropTbRsp) == NULL) { if (taosArrayPush(rsp.pArray, &dropTbRsp) == NULL) {
@ -1426,14 +1462,21 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
} }
} }
(void)tqUpdateTbUidList(pVnode->pTq, tbUids, false); if (tqUpdateTbUidList(pVnode->pTq, tbUids, false) < 0) {
(void)tdUpdateTbUidList(pVnode->pSma, pStore, false); vError("vgId:%d, failed to update tbUid list since %s", TD_VID(pVnode), tstrerror(terrno));
}
if (tdUpdateTbUidList(pVnode->pSma, pStore, false) < 0) {
goto _exit;
}
if (tsEnableAuditCreateTable) { if (tsEnableAuditCreateTable) {
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId; int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
SName name = {0}; SName name = {0};
(void)tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB); if (tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB) != 0) {
vError("vgId:%d, failed to get name from string", TD_VID(pVnode));
}
SStringBuilder sb = {0}; SStringBuilder sb = {0};
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
@ -1457,12 +1500,14 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
_exit: _exit:
taosArrayDestroy(tbUids); taosArrayDestroy(tbUids);
(void)tdUidStoreFree(pStore); pStore = tdUidStoreFree(pStore);
tDecoderClear(&decoder); tDecoderClear(&decoder);
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret); tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen); pRsp->pCont = rpcMallocCont(pRsp->contLen);
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen); tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
(void)tEncodeSVDropTbBatchRsp(&encoder, &rsp); if (tEncodeSVDropTbBatchRsp(&encoder, &rsp) != 0) {
vError("vgId:%d, failed to encode drop table batch response", TD_VID(pVnode));
}
tEncoderClear(&encoder); tEncoderClear(&encoder);
taosArrayDestroy(rsp.pArray); taosArrayDestroy(rsp.pArray);
taosArrayDestroy(tbNames); taosArrayDestroy(tbNames);
@ -1802,7 +1847,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
} }
if (info.suid) { if (info.suid) {
(void)metaGetInfo(pVnode->pMeta, info.suid, &info, NULL); if (metaGetInfo(pVnode->pMeta, info.suid, &info, NULL) != 0) {
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), info.suid);
goto _exit;
}
} }
if (pSubmitTbData->sver != info.skmVer) { if (pSubmitTbData->sver != info.skmVer) {
@ -1898,7 +1947,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
if (taosArrayGetSize(newTbUids) > 0) { if (taosArrayGetSize(newTbUids) > 0) {
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
(int32_t)taosArrayGetSize(newTbUids)); (int32_t)taosArrayGetSize(newTbUids));
(void)tqUpdateTbUidList(pVnode->pTq, newTbUids, true); if (tqUpdateTbUidList(pVnode->pTq, newTbUids, true) != 0) {
vError("vgId:%d, failed to update tbUid list", TD_VID(pVnode));
}
} }
_exit: _exit:
@ -1907,7 +1958,9 @@ _exit:
tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret); tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen); pRsp->pCont = rpcMallocCont(pRsp->contLen);
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen); tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
(void)tEncodeSSubmitRsp2(&ec, pSubmitRsp); if (tEncodeSSubmitRsp2(&ec, pSubmitRsp) != 0) {
vError("vgId:%d, failed to encode submit response", TD_VID(pVnode));
}
tEncoderClear(&ec); tEncoderClear(&ec);
// update statistics // update statistics
@ -1924,7 +1977,7 @@ _exit:
pVnode->monitor.strVgId, pVnode->monitor.strVgId,
pOriginalMsg->info.conn.user, pOriginalMsg->info.conn.user,
"Success"}; "Success"};
(void)taos_counter_add(tsInsertCounter, pSubmitRsp->affectedRows, sample_labels); int tv = taos_counter_add(tsInsertCounter, pSubmitRsp->affectedRows, sample_labels);
} }
if (code == 0) { if (code == 0) {
@ -2149,7 +2202,12 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pRe
pVnode->config.sttTrigger = req.sttTrigger; pVnode->config.sttTrigger = req.sttTrigger;
} else { } else {
vnodeAWait(&pVnode->commitTask); vnodeAWait(&pVnode->commitTask);
(void)tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
int32_t ret = tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
if (ret != 0) {
vError("vgId:%d, failed to disable bg task since %s", TD_VID(pVnode), tstrerror(errno));
}
pVnode->config.sttTrigger = req.sttTrigger; pVnode->config.sttTrigger = req.sttTrigger;
tsdbEnableBgTask(pVnode->pTsdb); tsdbEnableBgTask(pVnode->pTsdb);
} }
@ -2167,7 +2225,9 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pRe
} }
if (walChanged) { if (walChanged) {
(void)walAlter(pVnode->pWal, &pVnode->config.walCfg); if (walAlter(pVnode->pWal, &pVnode->config.walCfg) != 0) {
vError("vgId:%d, failed to alter wal config since %s", TD_VID(pVnode), tstrerror(errno));
}
} }
if (tsdbChanged) { if (tsdbChanged) {
@ -2181,7 +2241,11 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
SBatchDeleteReq deleteReq; SBatchDeleteReq deleteReq;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, pReq, len); tDecoderInit(&decoder, pReq, len);
(void)tDecodeSBatchDeleteReq(&decoder, &deleteReq); int32_t ret = tDecodeSBatchDeleteReq(&decoder, &deleteReq);
if (ret != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return TSDB_CODE_INVALID_MSG;
}
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK); metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
@ -2351,7 +2415,9 @@ static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pR
} }
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
(void)syncCheckMember(pVnode->sync); if (syncCheckMember(pVnode->sync) != 0) {
vError("vgId:%d, failed to check member", TD_VID(pVnode));
}
pRsp->msgType = TDMT_SYNC_CONFIG_CHANGE_RSP; pRsp->msgType = TDMT_SYNC_CONFIG_CHANGE_RSP;
pRsp->code = TSDB_CODE_SUCCESS; pRsp->code = TSDB_CODE_SUCCESS;
@ -2411,7 +2477,9 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l
syncRsp.member1Token = syncReq.member1Token; syncRsp.member1Token = syncReq.member1Token;
syncRsp.vgId = TD_VID(pVnode); syncRsp.vgId = TD_VID(pVnode);
(void)vnodeCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token); if (vnodeCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token) != 0) {
vError("vgId:%d, failed to check assigned log syncd", TD_VID(pVnode));
}
syncRsp.errCode = terrno; syncRsp.errCode = terrno;
if (vnodeUpdateArbTerm(pVnode, syncReq.arbTerm) != 0) { if (vnodeUpdateArbTerm(pVnode, syncReq.arbTerm) != 0) {

View File

@ -69,7 +69,9 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
if (rsp.pCont == NULL) { if (rsp.pCont == NULL) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY; pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else { } else {
(void)tSerializeSEpSet(rsp.pCont, contLen, &newEpSet); if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) != 0) {
vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
}
rsp.contLen = contLen; rsp.contLen = contLen;
} }
@ -163,7 +165,9 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
rpcMsg.pCont = NULL; rpcMsg.pCont = NULL;
} else { } else {
(void)tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg); if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
vTrace("vgId:%d, failed to put vnode commit to queue since %s", pVnode->config.vgId, terrstr());
}
} }
} }
@ -560,7 +564,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
} }
} while (true); } while (true);
(void)walApplyVer(pVnode->pWal, commitIdx); walApplyVer(pVnode->pWal, commitIdx);
pVnode->restored = true; pVnode->restored = true;
SStreamMeta *pMeta = pVnode->pTq->pStreamMeta; SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
@ -615,7 +619,9 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
if (pVnode->pTq) { if (pVnode->pTq) {
tqUpdateNodeStage(pVnode->pTq, false); tqUpdateNodeStage(pVnode->pTq, false);
(void)tqStopStreamTasksAsync(pVnode->pTq); if (tqStopStreamTasksAsync(pVnode->pTq) != 0) {
vError("vgId:%d, failed to stop stream tasks", pVnode->config.vgId);
}
} }
} }
@ -750,7 +756,10 @@ int32_t vnodeSyncStart(SVnode *pVnode) {
void vnodeSyncPreClose(SVnode *pVnode) { void vnodeSyncPreClose(SVnode *pVnode) {
vInfo("vgId:%d, sync pre close", pVnode->config.vgId); vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
(void)syncLeaderTransfer(pVnode->sync); int32_t code = syncLeaderTransfer(pVnode->sync);
if (code) {
vError("vgId:%d, failed to transfer leader since %s", pVnode->config.vgId, tstrerror(code));
}
syncPreStop(pVnode->sync); syncPreStop(pVnode->sync);
(void)taosThreadMutexLock(&pVnode->lock); (void)taosThreadMutexLock(&pVnode->lock);

View File

@ -81,6 +81,6 @@ bool tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint1
void tmsgUpdateDnodeEpSet(SEpSet* epset) { void tmsgUpdateDnodeEpSet(SEpSet* epset) {
for (int32_t i = 0; i < epset->numOfEps; ++i) { for (int32_t i = 0; i < epset->numOfEps; ++i) {
(void)tmsgUpdateDnodeInfo(NULL, NULL, epset->eps[i].fqdn, &epset->eps[i].port); bool ret = tmsgUpdateDnodeInfo(NULL, NULL, epset->eps[i].fqdn, &epset->eps[i].port);
} }
} }

View File

@ -86,11 +86,9 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
int32_t walApplyVer(SWal *pWal, int64_t ver) { void walApplyVer(SWal *pWal, int64_t ver) {
// TODO: error check // TODO: error check
pWal->vers.appliedVer = ver; pWal->vers.appliedVer = ver;
TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
int32_t walCommit(SWal *pWal, int64_t ver) { int32_t walCommit(SWal *pWal, int64_t ver) {