Merge pull request #28062 from taosdata/enh/-TD-31890-16
ehn: remove void
This commit is contained in:
commit
e8b50df6ca
|
@ -177,7 +177,7 @@ int32_t walRollback(SWal *, int64_t ver);
|
||||||
int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention);
|
int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention);
|
||||||
int32_t walEndSnapshot(SWal *);
|
int32_t walEndSnapshot(SWal *);
|
||||||
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
||||||
int32_t walApplyVer(SWal *, int64_t ver);
|
void walApplyVer(SWal *, int64_t ver);
|
||||||
|
|
||||||
// wal reader
|
// wal reader
|
||||||
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
|
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
|
||||||
|
|
|
@ -32,10 +32,14 @@ static void removeEmptyDir() {
|
||||||
empty = false;
|
empty = false;
|
||||||
}
|
}
|
||||||
if (empty) taosRemoveDir(filename);
|
if (empty) taosRemoveDir(filename);
|
||||||
(void)taosCloseDir(&pDirTmp);
|
if (taosCloseDir(&pDirTmp) != 0) {
|
||||||
|
uError("[rsync] close dir error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosCloseDir(&pDir);
|
if (taosCloseDir(&pDir) != 0) {
|
||||||
|
uError("[rsync] close dir error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
|
@ -297,7 +301,7 @@ int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId)
|
||||||
path, el);
|
path, el);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) { // if failed, try to load it from data directory
|
if (code != TSDB_CODE_SUCCESS) { // if failed, try to load it from data directory
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
memset(pathTransform, 0, PATH_MAX);
|
memset(pathTransform, 0, PATH_MAX);
|
||||||
changeDirFromWindowsToLinux(path, pathTransform);
|
changeDirFromWindowsToLinux(path, pathTransform);
|
||||||
|
|
|
@ -235,7 +235,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
||||||
pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
|
pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
|
||||||
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
|
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||||
tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
|
tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
|
||||||
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
||||||
pCfg->syncCfg.replicaNum++;
|
pCfg->syncCfg.replicaNum++;
|
||||||
}
|
}
|
||||||
if (pCreate->selfIndex != -1) {
|
if (pCreate->selfIndex != -1) {
|
||||||
|
@ -247,7 +247,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
||||||
pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
|
pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
|
||||||
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
||||||
tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
|
tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
|
||||||
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
||||||
pCfg->syncCfg.totalReplicaNum++;
|
pCfg->syncCfg.totalReplicaNum++;
|
||||||
}
|
}
|
||||||
pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
|
pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
|
||||||
|
|
|
@ -143,7 +143,7 @@ typedef struct STbUidStore STbUidStore;
|
||||||
|
|
||||||
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
|
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
|
||||||
int metaUpgrade(SVnode* pVnode, SMeta** ppMeta);
|
int metaUpgrade(SVnode* pVnode, SMeta** ppMeta);
|
||||||
int metaClose(SMeta** pMeta);
|
void metaClose(SMeta** pMeta);
|
||||||
int metaBegin(SMeta* pMeta, int8_t fromSys);
|
int metaBegin(SMeta* pMeta, int8_t fromSys);
|
||||||
TXN* metaGetTxn(SMeta* pMeta);
|
TXN* metaGetTxn(SMeta* pMeta);
|
||||||
int metaCommit(SMeta* pMeta, TXN* txn);
|
int metaCommit(SMeta* pMeta, TXN* txn);
|
||||||
|
@ -207,7 +207,7 @@ int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo, SMetaReader* pR
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
int32_t tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback, bool force);
|
int32_t tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback, bool force);
|
||||||
int32_t tsdbClose(STsdb** pTsdb);
|
void tsdbClose(STsdb** pTsdb);
|
||||||
int32_t tsdbBegin(STsdb* pTsdb);
|
int32_t tsdbBegin(STsdb* pTsdb);
|
||||||
// int32_t tsdbPrepareCommit(STsdb* pTsdb);
|
// int32_t tsdbPrepareCommit(STsdb* pTsdb);
|
||||||
// int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
|
// int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
|
||||||
|
@ -284,7 +284,7 @@ int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t smaInit();
|
int32_t smaInit();
|
||||||
void smaCleanUp();
|
void smaCleanUp();
|
||||||
int32_t smaOpen(SVnode* pVnode, int8_t rollback, bool force);
|
int32_t smaOpen(SVnode* pVnode, int8_t rollback, bool force);
|
||||||
int32_t smaClose(SSma* pSma);
|
void smaClose(SSma* pSma);
|
||||||
int32_t smaBegin(SSma* pSma);
|
int32_t smaBegin(SSma* pSma);
|
||||||
int32_t smaPrepareAsyncCommit(SSma* pSma);
|
int32_t smaPrepareAsyncCommit(SSma* pSma);
|
||||||
int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo);
|
int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo);
|
||||||
|
@ -314,7 +314,7 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
|
||||||
// STsdbSnapReader ========================================
|
// STsdbSnapReader ========================================
|
||||||
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
|
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
|
||||||
STsdbSnapReader** ppReader);
|
STsdbSnapReader** ppReader);
|
||||||
void tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
void tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
||||||
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
||||||
// STsdbSnapWriter ========================================
|
// STsdbSnapWriter ========================================
|
||||||
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** ppWriter);
|
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** ppWriter);
|
||||||
|
@ -323,7 +323,7 @@ int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter, bool rollback);
|
||||||
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
|
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
|
||||||
// STsdbSnapRAWReader ========================================
|
// STsdbSnapRAWReader ========================================
|
||||||
int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader);
|
int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader);
|
||||||
void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader);
|
void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader);
|
||||||
int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData);
|
int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData);
|
||||||
// STsdbSnapRAWWriter ========================================
|
// STsdbSnapRAWWriter ========================================
|
||||||
int32_t tsdbSnapRAWWriterOpen(STsdb* pTsdb, int64_t ever, STsdbSnapRAWWriter** ppWriter);
|
int32_t tsdbSnapRAWWriterOpen(STsdb* pTsdb, int64_t ever, STsdbSnapRAWWriter** ppWriter);
|
||||||
|
|
|
@ -169,9 +169,9 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaClose(SMeta **ppMeta) {
|
void metaClose(SMeta **ppMeta) {
|
||||||
metaCleanup(ppMeta);
|
metaCleanup(ppMeta);
|
||||||
return 0;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaAlterCache(SMeta *pMeta, int32_t nPage) {
|
int metaAlterCache(SMeta *pMeta, int32_t nPage) {
|
||||||
|
|
|
@ -171,7 +171,7 @@ _exit:
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smaClose(SSma *pSma) {
|
void smaClose(SSma *pSma) {
|
||||||
if (pSma) {
|
if (pSma) {
|
||||||
TAOS_UNUSED(smaPreClose(pSma));
|
TAOS_UNUSED(smaPreClose(pSma));
|
||||||
(void)taosThreadMutexDestroy(&pSma->mutex);
|
(void)taosThreadMutexDestroy(&pSma->mutex);
|
||||||
|
@ -182,7 +182,7 @@ int32_t smaClose(SSma *pSma) {
|
||||||
if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma));
|
if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma));
|
||||||
taosMemoryFreeClear(pSma);
|
taosMemoryFreeClear(pSma);
|
||||||
}
|
}
|
||||||
return 0;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -102,7 +102,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbClose(STsdb **pTsdb) {
|
void tsdbClose(STsdb **pTsdb) {
|
||||||
if (*pTsdb) {
|
if (*pTsdb) {
|
||||||
STsdb *pdb = *pTsdb;
|
STsdb *pdb = *pTsdb;
|
||||||
tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d, keepTimeOffset:%d", TD_VID(pdb->pVnode), pdb->path,
|
tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d, keepTimeOffset:%d", TD_VID(pdb->pVnode), pdb->path,
|
||||||
|
@ -121,5 +121,5 @@ int32_t tsdbClose(STsdb **pTsdb) {
|
||||||
(void)taosThreadMutexDestroy(&(*pTsdb)->mutex);
|
(void)taosThreadMutexDestroy(&(*pTsdb)->mutex);
|
||||||
taosMemoryFreeClear(*pTsdb);
|
taosMemoryFreeClear(*pTsdb);
|
||||||
}
|
}
|
||||||
return 0;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,7 +91,9 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
|
||||||
|
|
||||||
struct timeval tv;
|
struct timeval tv;
|
||||||
struct timespec ts;
|
struct timespec ts;
|
||||||
(void)taosGetTimeOfDay(&tv);
|
if (taosGetTimeOfDay(&tv) != 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
|
ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
|
||||||
if (ts.tv_nsec > 999999999l) {
|
if (ts.tv_nsec > 999999999l) {
|
||||||
ts.tv_sec = tv.tv_sec + 1;
|
ts.tv_sec = tv.tv_sec + 1;
|
||||||
|
|
|
@ -24,10 +24,10 @@ struct SVHashEntry {
|
||||||
void* obj;
|
void* obj;
|
||||||
};
|
};
|
||||||
|
|
||||||
static int32_t vHashRehash(SVHashTable* ht, uint32_t newNumBuckets) {
|
static void vHashRehash(SVHashTable* ht, uint32_t newNumBuckets) {
|
||||||
SVHashEntry** newBuckets = (SVHashEntry**)taosMemoryCalloc(newNumBuckets, sizeof(SVHashEntry*));
|
SVHashEntry** newBuckets = (SVHashEntry**)taosMemoryCalloc(newNumBuckets, sizeof(SVHashEntry*));
|
||||||
if (newBuckets == NULL) {
|
if (newBuckets == NULL) {
|
||||||
return terrno;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < ht->numBuckets; i++) {
|
for (int32_t i = 0; i < ht->numBuckets; i++) {
|
||||||
|
@ -45,7 +45,7 @@ static int32_t vHashRehash(SVHashTable* ht, uint32_t newNumBuckets) {
|
||||||
ht->buckets = newBuckets;
|
ht->buckets = newBuckets;
|
||||||
ht->numBuckets = newNumBuckets;
|
ht->numBuckets = newNumBuckets;
|
||||||
|
|
||||||
return 0;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*)) {
|
int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*)) {
|
||||||
|
@ -96,7 +96,7 @@ int32_t vHashPut(SVHashTable* ht, void* obj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ht->numEntries >= ht->numBuckets) {
|
if (ht->numEntries >= ht->numBuckets) {
|
||||||
(void)vHashRehash(ht, ht->numBuckets * 2);
|
vHashRehash(ht, ht->numBuckets * 2);
|
||||||
bucketIndex = ht->hash(obj) % ht->numBuckets;
|
bucketIndex = ht->hash(obj) % ht->numBuckets;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ int32_t vHashDrop(SVHashTable* ht, const void* obj) {
|
||||||
taosMemoryFree(tmp);
|
taosMemoryFree(tmp);
|
||||||
ht->numEntries--;
|
ht->numEntries--;
|
||||||
if (ht->numBuckets > VNODE_HASH_DEFAULT_NUM_BUCKETS && ht->numEntries < ht->numBuckets / 4) {
|
if (ht->numBuckets > VNODE_HASH_DEFAULT_NUM_BUCKETS && ht->numEntries < ht->numBuckets / 4) {
|
||||||
(void)vHashRehash(ht, ht->numBuckets / 2);
|
vHashRehash(ht, ht->numBuckets / 2);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -108,7 +108,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
|
||||||
pNode->nodePort = pReq->replicas[i].port;
|
pNode->nodePort = pReq->replicas[i].port;
|
||||||
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
|
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
|
||||||
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
|
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||||
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
||||||
vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
|
vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
|
||||||
pCfg->replicaNum++;
|
pCfg->replicaNum++;
|
||||||
}
|
}
|
||||||
|
@ -121,7 +121,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
|
||||||
pNode->nodePort = pReq->learnerReplicas[pCfg->totalReplicaNum].port;
|
pNode->nodePort = pReq->learnerReplicas[pCfg->totalReplicaNum].port;
|
||||||
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
||||||
tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[pCfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
|
tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[pCfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
|
||||||
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
||||||
vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
|
vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
|
||||||
pCfg->totalReplicaNum++;
|
pCfg->totalReplicaNum++;
|
||||||
}
|
}
|
||||||
|
@ -176,8 +176,10 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
|
||||||
int32_t prefixLen = strlen(tsdbFilePrefix);
|
int32_t prefixLen = strlen(tsdbFilePrefix);
|
||||||
|
|
||||||
STfsDir *tsdbDir = NULL;
|
STfsDir *tsdbDir = NULL;
|
||||||
(void)tfsOpendir(pTfs, tsdbPath, &tsdbDir);
|
int32_t tret = tfsOpendir(pTfs, tsdbPath, &tsdbDir);
|
||||||
if (tsdbDir == NULL) return 0;
|
if (tsdbDir == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
const STfsFile *tsdbFile = tfsReaddir(tsdbDir);
|
const STfsFile *tsdbFile = tfsReaddir(tsdbDir);
|
||||||
|
@ -248,7 +250,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
|
||||||
SNodeInfo *pNode = &pCfg->nodeInfo[0];
|
SNodeInfo *pNode = &pCfg->nodeInfo[0];
|
||||||
pNode->nodePort = tsServerPort;
|
pNode->nodePort = tsServerPort;
|
||||||
tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
|
tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
|
||||||
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
bool ret1 = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
||||||
vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
|
vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
|
||||||
|
|
||||||
info.config.syncCfg = *pCfg;
|
info.config.syncCfg = *pCfg;
|
||||||
|
@ -317,7 +319,9 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
|
||||||
|
|
||||||
void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) {
|
void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) {
|
||||||
vInfo("path:%s is removed while destroy vnode", path);
|
vInfo("path:%s is removed while destroy vnode", path);
|
||||||
(void)tfsRmdir(pTfs, path);
|
if (tfsRmdir(pTfs, path) < 0) {
|
||||||
|
vError("failed to remove path:%s since %s", path, tstrerror(terrno));
|
||||||
|
}
|
||||||
|
|
||||||
// int32_t nlevel = tfsGetLevel(pTfs);
|
// int32_t nlevel = tfsGetLevel(pTfs);
|
||||||
if (nodeId > 0 && vgId > 0 /*&& nlevel > 1*/ && tsS3Enabled) {
|
if (nodeId > 0 && vgId > 0 /*&& nlevel > 1*/ && tsS3Enabled) {
|
||||||
|
@ -378,8 +382,13 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
||||||
}
|
}
|
||||||
if (updated) {
|
if (updated) {
|
||||||
vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
|
vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
|
||||||
(void)vnodeSaveInfo(dir, &info);
|
if (vnodeSaveInfo(dir, &info) < 0) {
|
||||||
(void)vnodeCommitInfo(dir);
|
vError("vgId:%d, failed to save vnode info since %s", info.config.vgId, tstrerror(terrno));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (vnodeCommitInfo(dir) < 0) {
|
||||||
|
vError("vgId:%d, failed to commit vnode info since %s", info.config.vgId, tstrerror(terrno));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// create handle
|
// create handle
|
||||||
|
@ -499,9 +508,9 @@ _err:
|
||||||
if (pVnode->pQuery) vnodeQueryClose(pVnode);
|
if (pVnode->pQuery) vnodeQueryClose(pVnode);
|
||||||
if (pVnode->pTq) tqClose(pVnode->pTq);
|
if (pVnode->pTq) tqClose(pVnode->pTq);
|
||||||
if (pVnode->pWal) walClose(pVnode->pWal);
|
if (pVnode->pWal) walClose(pVnode->pWal);
|
||||||
if (pVnode->pTsdb) (void)tsdbClose(&pVnode->pTsdb);
|
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
|
||||||
if (pVnode->pSma) (void)smaClose(pVnode->pSma);
|
if (pVnode->pSma) smaClose(pVnode->pSma);
|
||||||
if (pVnode->pMeta) (void)metaClose(&pVnode->pMeta);
|
if (pVnode->pMeta) metaClose(&pVnode->pMeta);
|
||||||
if (pVnode->freeList) vnodeCloseBufPool(pVnode);
|
if (pVnode->freeList) vnodeCloseBufPool(pVnode);
|
||||||
|
|
||||||
taosMemoryFree(pVnode);
|
taosMemoryFree(pVnode);
|
||||||
|
@ -518,13 +527,16 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
|
||||||
void vnodeClose(SVnode *pVnode) {
|
void vnodeClose(SVnode *pVnode) {
|
||||||
if (pVnode) {
|
if (pVnode) {
|
||||||
vnodeAWait(&pVnode->commitTask);
|
vnodeAWait(&pVnode->commitTask);
|
||||||
(void)vnodeAChannelDestroy(&pVnode->commitChannel, true);
|
if (vnodeAChannelDestroy(&pVnode->commitChannel, true) != 0) {
|
||||||
|
vError("vgId:%d, failed to destroy commit channel", TD_VID(pVnode));
|
||||||
|
}
|
||||||
|
|
||||||
vnodeSyncClose(pVnode);
|
vnodeSyncClose(pVnode);
|
||||||
vnodeQueryClose(pVnode);
|
vnodeQueryClose(pVnode);
|
||||||
tqClose(pVnode->pTq);
|
tqClose(pVnode->pTq);
|
||||||
walClose(pVnode->pWal);
|
walClose(pVnode->pWal);
|
||||||
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
|
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
|
||||||
(void)smaClose(pVnode->pSma);
|
smaClose(pVnode->pSma);
|
||||||
if (pVnode->pMeta) metaClose(&pVnode->pMeta);
|
if (pVnode->pMeta) metaClose(&pVnode->pMeta);
|
||||||
vnodeCloseBufPool(pVnode);
|
vnodeCloseBufPool(pVnode);
|
||||||
|
|
||||||
|
|
|
@ -466,7 +466,11 @@ static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
return TSDB_CODE_INVALID_MSG;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)vnodePreCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token);
|
int32_t ret = vnodePreCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token);
|
||||||
|
if (ret != 0) {
|
||||||
|
vError("vgId:%d, failed to preprocess arb check sync request since %s", TD_VID(pVnode), tstrerror(ret));
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = terrno;
|
int32_t code = terrno;
|
||||||
tFreeSVArbCheckSyncReq(&syncReq);
|
tFreeSVArbCheckSyncReq(&syncReq);
|
||||||
|
|
||||||
|
@ -710,7 +714,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
|
vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
|
||||||
ver);
|
ver);
|
||||||
|
|
||||||
(void)walApplyVer(pVnode->pWal, ver);
|
walApplyVer(pVnode->pWal, ver);
|
||||||
|
|
||||||
code = tqPushMsg(pVnode->pTq, pMsg->msgType);
|
code = tqPushMsg(pVnode->pTq, pMsg->msgType);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -881,7 +885,10 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||||
(void)tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
||||||
|
if (code) {
|
||||||
|
vError("failed to process sma result since %s", tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
||||||
|
@ -957,7 +964,10 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
int32_t code = metaDropTables(pVnode->pMeta, ttlReq.pTbUids);
|
int32_t code = metaDropTables(pVnode->pMeta, ttlReq.pTbUids);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
|
|
||||||
(void)tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false);
|
code = tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false);
|
||||||
|
if (code) {
|
||||||
|
vError("vgId:%d, failed to update tbUid list since %s", TD_VID(pVnode), tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
@ -1160,7 +1170,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cRsp.code = TSDB_CODE_SUCCESS;
|
cRsp.code = TSDB_CODE_SUCCESS;
|
||||||
(void)tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
|
if (tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid) < 0) {
|
||||||
|
vError("vgId:%d, failed to fetch tbUid list", TD_VID(pVnode));
|
||||||
|
}
|
||||||
if (taosArrayPush(tbUids, &pCreateReq->uid) == NULL) {
|
if (taosArrayPush(tbUids, &pCreateReq->uid) == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
rcode = -1;
|
rcode = -1;
|
||||||
|
@ -1177,11 +1189,13 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
|
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
|
||||||
(void)tqUpdateTbUidList(pVnode->pTq, tbUids, true);
|
if (tqUpdateTbUidList(pVnode->pTq, tbUids, true) < 0) {
|
||||||
|
vError("vgId:%d, failed to update tbUid list since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
}
|
||||||
if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
|
if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
(void)tdUidStoreFree(pStore);
|
pStore = tdUidStoreFree(pStore);
|
||||||
|
|
||||||
// prepare rsp
|
// prepare rsp
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
@ -1193,7 +1207,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
|
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
|
||||||
(void)tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
|
if (tEncodeSVCreateTbBatchRsp(&encoder, &rsp) != 0) {
|
||||||
|
vError("vgId:%d, failed to encode create table batch response", TD_VID(pVnode));
|
||||||
|
}
|
||||||
|
|
||||||
if (tsEnableAudit && tsEnableAuditCreateTable) {
|
if (tsEnableAudit && tsEnableAuditCreateTable) {
|
||||||
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
||||||
|
@ -1347,7 +1363,9 @@ _exit:
|
||||||
tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
|
tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
|
||||||
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
||||||
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
|
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
|
||||||
(void)tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
|
if (tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp) != 0) {
|
||||||
|
vError("vgId:%d, failed to encode alter table response", TD_VID(pVnode));
|
||||||
|
}
|
||||||
tEncoderClear(&ec);
|
tEncoderClear(&ec);
|
||||||
if (vMetaRsp.pSchemas) {
|
if (vMetaRsp.pSchemas) {
|
||||||
taosMemoryFree(vMetaRsp.pSchemas);
|
taosMemoryFree(vMetaRsp.pSchemas);
|
||||||
|
@ -1402,7 +1420,11 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
dropTbRsp.code = TSDB_CODE_SUCCESS;
|
dropTbRsp.code = TSDB_CODE_SUCCESS;
|
||||||
if (tbUid > 0) (void)tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
|
if (tbUid > 0) {
|
||||||
|
if (tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid) < 0) {
|
||||||
|
vError("vgId:%d, failed to fetch tbUid list", TD_VID(pVnode));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayPush(rsp.pArray, &dropTbRsp) == NULL) {
|
if (taosArrayPush(rsp.pArray, &dropTbRsp) == NULL) {
|
||||||
|
@ -1426,14 +1448,21 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)tqUpdateTbUidList(pVnode->pTq, tbUids, false);
|
if (tqUpdateTbUidList(pVnode->pTq, tbUids, false) < 0) {
|
||||||
(void)tdUpdateTbUidList(pVnode->pSma, pStore, false);
|
vError("vgId:%d, failed to update tbUid list since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdUpdateTbUidList(pVnode->pSma, pStore, false) < 0) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
if (tsEnableAuditCreateTable) {
|
if (tsEnableAuditCreateTable) {
|
||||||
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
(void)tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
|
if (tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB) != 0) {
|
||||||
|
vError("vgId:%d, failed to get name from string", TD_VID(pVnode));
|
||||||
|
}
|
||||||
|
|
||||||
SStringBuilder sb = {0};
|
SStringBuilder sb = {0};
|
||||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||||
|
@ -1457,12 +1486,14 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
taosArrayDestroy(tbUids);
|
taosArrayDestroy(tbUids);
|
||||||
(void)tdUidStoreFree(pStore);
|
pStore = tdUidStoreFree(pStore);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
|
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
|
||||||
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
||||||
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
|
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
|
||||||
(void)tEncodeSVDropTbBatchRsp(&encoder, &rsp);
|
if (tEncodeSVDropTbBatchRsp(&encoder, &rsp) != 0) {
|
||||||
|
vError("vgId:%d, failed to encode drop table batch response", TD_VID(pVnode));
|
||||||
|
}
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
taosArrayDestroy(rsp.pArray);
|
taosArrayDestroy(rsp.pArray);
|
||||||
taosArrayDestroy(tbNames);
|
taosArrayDestroy(tbNames);
|
||||||
|
@ -1898,7 +1929,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
if (taosArrayGetSize(newTbUids) > 0) {
|
if (taosArrayGetSize(newTbUids) > 0) {
|
||||||
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
|
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
|
||||||
(int32_t)taosArrayGetSize(newTbUids));
|
(int32_t)taosArrayGetSize(newTbUids));
|
||||||
(void)tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
|
if (tqUpdateTbUidList(pVnode->pTq, newTbUids, true) != 0) {
|
||||||
|
vError("vgId:%d, failed to update tbUid list", TD_VID(pVnode));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -1924,7 +1957,7 @@ _exit:
|
||||||
pVnode->monitor.strVgId,
|
pVnode->monitor.strVgId,
|
||||||
pOriginalMsg->info.conn.user,
|
pOriginalMsg->info.conn.user,
|
||||||
"Success"};
|
"Success"};
|
||||||
(void)taos_counter_add(tsInsertCounter, pSubmitRsp->affectedRows, sample_labels);
|
int tv = taos_counter_add(tsInsertCounter, pSubmitRsp->affectedRows, sample_labels);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
|
@ -2149,7 +2182,12 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pRe
|
||||||
pVnode->config.sttTrigger = req.sttTrigger;
|
pVnode->config.sttTrigger = req.sttTrigger;
|
||||||
} else {
|
} else {
|
||||||
vnodeAWait(&pVnode->commitTask);
|
vnodeAWait(&pVnode->commitTask);
|
||||||
(void)tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
|
|
||||||
|
int32_t ret = tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
|
||||||
|
if (ret != 0) {
|
||||||
|
vError("vgId:%d, failed to disable bg task since %s", TD_VID(pVnode), tstrerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
pVnode->config.sttTrigger = req.sttTrigger;
|
pVnode->config.sttTrigger = req.sttTrigger;
|
||||||
tsdbEnableBgTask(pVnode->pTsdb);
|
tsdbEnableBgTask(pVnode->pTsdb);
|
||||||
}
|
}
|
||||||
|
@ -2167,7 +2205,9 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pRe
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walChanged) {
|
if (walChanged) {
|
||||||
(void)walAlter(pVnode->pWal, &pVnode->config.walCfg);
|
if (walAlter(pVnode->pWal, &pVnode->config.walCfg) != 0) {
|
||||||
|
vError("vgId:%d, failed to alter wal config since %s", TD_VID(pVnode), tstrerror(errno));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbChanged) {
|
if (tsdbChanged) {
|
||||||
|
@ -2351,7 +2391,9 @@ static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pR
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
(void)syncCheckMember(pVnode->sync);
|
if (syncCheckMember(pVnode->sync) != 0) {
|
||||||
|
vError("vgId:%d, failed to check member", TD_VID(pVnode));
|
||||||
|
}
|
||||||
|
|
||||||
pRsp->msgType = TDMT_SYNC_CONFIG_CHANGE_RSP;
|
pRsp->msgType = TDMT_SYNC_CONFIG_CHANGE_RSP;
|
||||||
pRsp->code = TSDB_CODE_SUCCESS;
|
pRsp->code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -2411,7 +2453,9 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l
|
||||||
syncRsp.member1Token = syncReq.member1Token;
|
syncRsp.member1Token = syncReq.member1Token;
|
||||||
syncRsp.vgId = TD_VID(pVnode);
|
syncRsp.vgId = TD_VID(pVnode);
|
||||||
|
|
||||||
(void)vnodeCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token);
|
if (vnodeCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token) != 0) {
|
||||||
|
vError("vgId:%d, failed to check assigned log syncd", TD_VID(pVnode));
|
||||||
|
}
|
||||||
syncRsp.errCode = terrno;
|
syncRsp.errCode = terrno;
|
||||||
|
|
||||||
if (vnodeUpdateArbTerm(pVnode, syncReq.arbTerm) != 0) {
|
if (vnodeUpdateArbTerm(pVnode, syncReq.arbTerm) != 0) {
|
||||||
|
|
|
@ -69,7 +69,9 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
|
||||||
if (rsp.pCont == NULL) {
|
if (rsp.pCont == NULL) {
|
||||||
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
} else {
|
} else {
|
||||||
(void)tSerializeSEpSet(rsp.pCont, contLen, &newEpSet);
|
if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) != 0) {
|
||||||
|
vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
|
||||||
|
}
|
||||||
rsp.contLen = contLen;
|
rsp.contLen = contLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,7 +165,9 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
rpcMsg.pCont = NULL;
|
rpcMsg.pCont = NULL;
|
||||||
} else {
|
} else {
|
||||||
(void)tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
|
||||||
|
vTrace("vgId:%d, failed to put vnode commit to queue since %s", pVnode->config.vgId, terrstr());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -560,7 +564,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
}
|
}
|
||||||
} while (true);
|
} while (true);
|
||||||
|
|
||||||
(void)walApplyVer(pVnode->pWal, commitIdx);
|
walApplyVer(pVnode->pWal, commitIdx);
|
||||||
pVnode->restored = true;
|
pVnode->restored = true;
|
||||||
|
|
||||||
SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
|
SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
|
||||||
|
@ -615,7 +619,9 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
||||||
|
|
||||||
if (pVnode->pTq) {
|
if (pVnode->pTq) {
|
||||||
tqUpdateNodeStage(pVnode->pTq, false);
|
tqUpdateNodeStage(pVnode->pTq, false);
|
||||||
(void)tqStopStreamTasksAsync(pVnode->pTq);
|
if (tqStopStreamTasksAsync(pVnode->pTq) != 0) {
|
||||||
|
vError("vgId:%d, failed to stop stream tasks", pVnode->config.vgId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -750,7 +756,10 @@ int32_t vnodeSyncStart(SVnode *pVnode) {
|
||||||
|
|
||||||
void vnodeSyncPreClose(SVnode *pVnode) {
|
void vnodeSyncPreClose(SVnode *pVnode) {
|
||||||
vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
|
vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
|
||||||
(void)syncLeaderTransfer(pVnode->sync);
|
int32_t code = syncLeaderTransfer(pVnode->sync);
|
||||||
|
if (code) {
|
||||||
|
vError("vgId:%d, failed to transfer leader since %s", pVnode->config.vgId, tstrerror(code));
|
||||||
|
}
|
||||||
syncPreStop(pVnode->sync);
|
syncPreStop(pVnode->sync);
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&pVnode->lock);
|
(void)taosThreadMutexLock(&pVnode->lock);
|
||||||
|
|
|
@ -81,6 +81,6 @@ bool tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint1
|
||||||
|
|
||||||
void tmsgUpdateDnodeEpSet(SEpSet* epset) {
|
void tmsgUpdateDnodeEpSet(SEpSet* epset) {
|
||||||
for (int32_t i = 0; i < epset->numOfEps; ++i) {
|
for (int32_t i = 0; i < epset->numOfEps; ++i) {
|
||||||
(void)tmsgUpdateDnodeInfo(NULL, NULL, epset->eps[i].fqdn, &epset->eps[i].port);
|
bool ret = tmsgUpdateDnodeInfo(NULL, NULL, epset->eps[i].fqdn, &epset->eps[i].port);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,11 +86,9 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
||||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walApplyVer(SWal *pWal, int64_t ver) {
|
void walApplyVer(SWal *pWal, int64_t ver) {
|
||||||
// TODO: error check
|
// TODO: error check
|
||||||
pWal->vers.appliedVer = ver;
|
pWal->vers.appliedVer = ver;
|
||||||
|
|
||||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walCommit(SWal *pWal, int64_t ver) {
|
int32_t walCommit(SWal *pWal, int64_t ver) {
|
||||||
|
|
Loading…
Reference in New Issue