diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 85981ac9aa..12e4fe7753 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -88,33 +88,33 @@ const ESyncRole vnodeStrToRole(char *str) { int vnodeEncodeConfig(const void *pObj, SJson *pJson) { const SVnodeCfg *pCfg = (SVnodeCfg *)pObj; - if (tjsonAddIntegerToObject(pJson, "vgId", pCfg->vgId) < 0) return -1; - if (tjsonAddStringToObject(pJson, "dbname", pCfg->dbname) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "szPage", pCfg->szPage) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "szCache", pCfg->szCache) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "cacheLast", pCfg->cacheLast) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "cacheLastSize", pCfg->cacheLastSize) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "szBuf", pCfg->szBuf) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "isTsma", pCfg->isTsma) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "isRsma", pCfg->isRsma) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "slLevel", pCfg->tsdbCfg.slLevel) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "s3ChunkSize", pCfg->s3ChunkSize) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "s3KeepLocal", pCfg->s3KeepLocal) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "s3Compact", pCfg->s3Compact) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "tsdbPageSize", pCfg->tsdbPageSize) < 0) return -1; + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vgId", pCfg->vgId)); + TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, "dbname", pCfg->dbname)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "szPage", pCfg->szPage)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "szCache", pCfg->szCache)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "cacheLast", pCfg->cacheLast)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "cacheLastSize", pCfg->cacheLastSize)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "szBuf", pCfg->szBuf)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "isTsma", pCfg->isTsma)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "isRsma", pCfg->isRsma)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "slLevel", pCfg->tsdbCfg.slLevel)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "s3ChunkSize", pCfg->s3ChunkSize)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "s3KeepLocal", pCfg->s3KeepLocal)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "s3Compact", pCfg->s3Compact)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "tsdbPageSize", pCfg->tsdbPageSize)); if (pCfg->tsdbCfg.retentions[0].keep > 0) { int32_t nRetention = 1; if (pCfg->tsdbCfg.retentions[1].freq > 0) { @@ -124,61 +124,67 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { } } SJson *pNodeRetentions = tjsonCreateArray(); + if (pNodeRetentions == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } tjsonAddItemToObject(pJson, "retentions", pNodeRetentions); for (int32_t i = 0; i < nRetention; ++i) { SJson *pNodeRetention = tjsonCreateObject(); const SRetention *pRetention = pCfg->tsdbCfg.retentions + i; - tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq); - tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit); - tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->keep); - tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->keepUnit); - tjsonAddItemToArray(pNodeRetentions, pNodeRetention); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->keep)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->keepUnit)); + TAOS_CHECK_RETURN(tjsonAddItemToArray(pNodeRetentions, pNodeRetention)); } } - if (tjsonAddIntegerToObject(pJson, "tsdb.encryptAlgorithm", pCfg->tsdbCfg.encryptAlgorithm) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "wal.clearFiles", pCfg->walCfg.clearFiles) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "wal.encryptAlgorithm", pCfg->walCfg.encryptAlgorithm) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "tdbEncryptAlgorithm", pCfg->tdbEncryptAlgorithm) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sttTrigger) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "hashChange", pCfg->hashChange) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix) < 0) return -1; - - if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion) < 0) return -1; - - if (tjsonAddIntegerToObject(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "vndStats.ntables", pCfg->vndStats.numOfNTables) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "vndStats.timeseries", pCfg->vndStats.numOfTimeSeries) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "vndStats.ntimeseries", pCfg->vndStats.numOfNTimeSeries) < 0) return -1; + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "tsdb.encryptAlgorithm", pCfg->tsdbCfg.encryptAlgorithm)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.clearFiles", pCfg->walCfg.clearFiles)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.encryptAlgorithm", pCfg->walCfg.encryptAlgorithm)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "tdbEncryptAlgorithm", pCfg->tdbEncryptAlgorithm)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sttTrigger)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashChange", pCfg->hashChange)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vndStats.ntables", pCfg->vndStats.numOfNTables)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vndStats.timeseries", pCfg->vndStats.numOfTimeSeries)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vndStats.ntimeseries", pCfg->vndStats.numOfNTimeSeries)); SJson *nodeInfo = tjsonCreateArray(); - if (nodeInfo == NULL) return -1; - if (tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", nodeInfo) < 0) return -1; + if (nodeInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + TAOS_CHECK_RETURN(tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", nodeInfo)); vDebug("vgId:%d, encode config, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pCfg->vgId, pCfg->syncCfg.replicaNum, pCfg->syncCfg.totalReplicaNum, pCfg->syncCfg.myIndex, pCfg->syncCfg.changeVersion); for (int i = 0; i < pCfg->syncCfg.totalReplicaNum; ++i) { - SJson *info = tjsonCreateObject(); + SJson *info = tjsonCreateObject(); + if (info == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SNodeInfo *pNode = (SNodeInfo *)&pCfg->syncCfg.nodeInfo[i]; - if (info == NULL) return -1; - if (tjsonAddIntegerToObject(info, "nodePort", pNode->nodePort) < 0) return -1; - if (tjsonAddStringToObject(info, "nodeFqdn", pNode->nodeFqdn) < 0) return -1; - if (tjsonAddIntegerToObject(info, "nodeId", pNode->nodeId) < 0) return -1; - if (tjsonAddIntegerToObject(info, "clusterId", pNode->clusterId) < 0) return -1; - if (tjsonAddStringToObject(info, "isReplica", vnodeRoleToStr(pNode->nodeRole)) < 0) return -1; - if (tjsonAddItemToArray(nodeInfo, info) < 0) return -1; + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(info, "nodePort", pNode->nodePort)); + TAOS_CHECK_RETURN(tjsonAddStringToObject(info, "nodeFqdn", pNode->nodeFqdn)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(info, "nodeId", pNode->nodeId)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(info, "clusterId", pNode->clusterId)); + TAOS_CHECK_RETURN(tjsonAddStringToObject(info, "isReplica", vnodeRoleToStr(pNode->nodeRole))); + TAOS_CHECK_RETURN(tjsonAddItemToArray(nodeInfo, info)); vDebug("vgId:%d, encode config, replica:%d ep:%s:%u dnode:%d", pCfg->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId); } @@ -191,50 +197,50 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { int32_t code; tjsonGetNumberValue(pJson, "vgId", pCfg->vgId, code); - if (code < 0) return -1; - if (tjsonGetStringValue(pJson, "dbname", pCfg->dbname) < 0) return -1; + if (code) return code; + if ((code = tjsonGetStringValue(pJson, "dbname", pCfg->dbname))) return code; tjsonGetNumberValue(pJson, "dbId", pCfg->dbId, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "szPage", pCfg->szPage, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "szCache", pCfg->szCache, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "cacheLast", pCfg->cacheLast, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "cacheLastSize", pCfg->cacheLastSize, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "isRsma", pCfg->isRsma, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "slLevel", pCfg->tsdbCfg.slLevel, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset, code); - if (code < 0) return -1; + if (code) return code; SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions"); int32_t nRetention = tjsonGetArraySize(pNodeRetentions); if (nRetention > TSDB_RETENTION_MAX) { @@ -244,52 +250,54 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i); ASSERT(pNodeRetention != NULL); tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq, code); + if (code) return code; tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit, code); + if (code) return code; tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep, code); + if (code) return code; tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code); + if (code) return code; } tjsonGetNumberValue(pJson, "tsdb.encryptAlgorithm", pCfg->tsdbCfg.encryptAlgorithm, code); - if (code < 0) return -1; + if (code) return code; #if defined(TD_ENTERPRISE) if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) { if (tsEncryptKey[0] == 0) { - terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; - return -1; + return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; } else { strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); } } #endif tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "wal.clearFiles", pCfg->walCfg.clearFiles, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "wal.encryptAlgorithm", pCfg->walCfg.encryptAlgorithm, code); - if (code < 0) return -1; + if (code) return code; #if defined(TD_ENTERPRISE) if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) { if (tsEncryptKey[0] == 0) { - terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; - return -1; + return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; } else { strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); } } #endif tjsonGetNumberValue(pJson, "tdbEncryptAlgorithm", pCfg->tdbEncryptAlgorithm, code); - if (code < 0) return -1; + if (code) return code; #if defined(TD_ENTERPRISE) if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) { if (tsEncryptKey[0] == 0) { @@ -303,35 +311,35 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { tjsonGetNumberValue(pJson, "sstTrigger", pCfg->sttTrigger, code); if (code < 0) pCfg->sttTrigger = TSDB_DEFAULT_SST_TRIGGER; tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "hashChange", pCfg->hashChange, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "hashPrefix", pCfg->hashPrefix, code); if (code < 0) pCfg->hashPrefix = TSDB_DEFAULT_HASH_PREFIX; tjsonGetNumberValue(pJson, "hashSuffix", pCfg->hashSuffix, code); if (code < 0) pCfg->hashSuffix = TSDB_DEFAULT_HASH_SUFFIX; tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "vndStats.ntables", pCfg->vndStats.numOfNTables, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "vndStats.timeseries", pCfg->vndStats.numOfTimeSeries, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "vndStats.ntimeseries", pCfg->vndStats.numOfNTimeSeries, code); - if (code < 0) return -1; + if (code) return code; SJson *nodeInfo = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo"); int arraySize = tjsonGetArraySize(nodeInfo); @@ -344,15 +352,15 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i]; if (info == NULL) return -1; tjsonGetNumberValue(info, "nodePort", pNode->nodePort, code); - if (code < 0) return -1; + if (code) return code; tjsonGetStringValue(info, "nodeFqdn", pNode->nodeFqdn); tjsonGetNumberValue(info, "nodeId", pNode->nodeId, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(info, "clusterId", pNode->clusterId, code); - if (code < 0) return -1; + if (code) return code; char role[10] = {0}; code = tjsonGetStringValue(info, "isReplica", role); - if (code < 0) return -1; + if (code) return code; if (strlen(role) != 0) { pNode->nodeRole = vnodeStrToRole(role); } else { diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 12b26a4b26..96fb461188 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -125,20 +125,16 @@ int vnodeBegin(SVnode *pVnode) { TSDB_CHECK_CODE(code, lino, _exit); // begin meta - if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } + code = metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL); + TSDB_CHECK_CODE(code, lino, _exit); // begin tsdb - if (tsdbBegin(pVnode->pTsdb) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbBegin(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); // begin sma - if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) { - code = terrno; + if (VND_IS_RSMA(pVnode)) { + code = smaBegin(pVnode->pSma); TSDB_CHECK_CODE(code, lino, _exit); } @@ -171,54 +167,41 @@ int vnodeShouldCommit(SVnode *pVnode, bool atExit) { } int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { + int32_t code = 0; + int32_t lino; char fname[TSDB_FILENAME_LEN]; - TdFilePtr pFile; - char *data; + TdFilePtr pFile = NULL; + char *data = NULL; snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP); - // encode info - data = NULL; - - if (vnodeEncodeInfo(pInfo, &data) < 0) { - vError("failed to encode json info."); - return -1; - } + code = vnodeEncodeInfo(pInfo, &data); + TSDB_CHECK_CODE(code, lino, _exit); // save info to a vnode_tmp.json pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); if (pFile == NULL) { - vError("failed to open info file:%s for write:%s", fname, terrstr()); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit); } if (taosWriteFile(pFile, data, strlen(data)) < 0) { - vError("failed to write info file:%s error:%s", fname, terrstr()); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit); } if (taosFsyncFile(pFile) < 0) { - vError("failed to fsync info file:%s error:%s", fname, terrstr()); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit); } - taosCloseFile(&pFile); - - // free info binary - taosMemoryFree(data); - - vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname, - pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion); - - return 0; - -_err: +_exit: + if (code) { + vError("vgId:%d %s failed at %s:%d since %s", pInfo->config.vgId, __func__, __FILE__, lino, tstrerror(code)); + } else { + vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname, + pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion); + } taosCloseFile(&pFile); taosMemoryFree(data); - return -1; + return code; } int vnodeCommitInfo(const char *dir) { @@ -229,8 +212,7 @@ int vnodeCommitInfo(const char *dir) { snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP); if (taosRenameFile(tfname, fname) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + return terrno = TAOS_SYSTEM_ERROR(errno); } vInfo("vnode info is committed, dir:%s", dir); @@ -238,6 +220,8 @@ int vnodeCommitInfo(const char *dir) { } int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) { + int32_t code = 0; + int32_t lino; char fname[TSDB_FILENAME_LEN]; TdFilePtr pFile = NULL; char *pData = NULL; @@ -248,44 +232,35 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) { // read info pFile = taosOpenFile(fname, TD_FILE_READ); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit); } if (taosFStatFile(pFile, &size, NULL) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit); } pData = taosMemoryMalloc(size + 1); if (pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); } if (taosReadFile(pFile, pData, size) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit); } pData[size] = '\0'; - taosCloseFile(&pFile); - // decode info - if (vnodeDecodeInfo(pData, pInfo) < 0) { - taosMemoryFree(pData); - return -1; + code = vnodeDecodeInfo(pData, pInfo); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + vError("vgId:%d %s failed at %s:%d since %s", pInfo->config.vgId, __func__, __FILE__, lino, tstrerror(code)); } - taosMemoryFree(pData); - - return 0; - -_err: taosCloseFile(&pFile); - taosMemoryFree(pData); - return -1; + return code; } static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { @@ -297,7 +272,8 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { // wait last commit task vnodeAWait(&pVnode->commitTask); - if (syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit; + code = syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg); + TSDB_CHECK_CODE(code, lino, _exit); pVnode->state.commitTerm = pVnode->state.applyTerm; @@ -312,17 +288,16 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN); vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode)); - if (vnodeSaveInfo(dir, &pInfo->info) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } + code = vnodeSaveInfo(dir, &pInfo->info); + TSDB_CHECK_CODE(code, lino, _exit); tsdbPreCommit(pVnode->pTsdb); - metaPrepareAsyncCommit(pVnode->pMeta); + code = metaPrepareAsyncCommit(pVnode->pMeta); + TSDB_CHECK_CODE(code, lino, _exit); code = smaPrepareAsyncCommit(pVnode->pSma); - if (code) goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); taosThreadMutexLock(&pVnode->mutex); ASSERT(pVnode->onCommit == NULL); @@ -459,10 +434,8 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { } // commit info - if (vnodeCommitInfo(dir) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } + code = vnodeCommitInfo(dir); + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbCommitCommit(pVnode->pTsdb); TSDB_CHECK_CODE(code, lino, _exit); @@ -472,10 +445,8 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { TSDB_CHECK_CODE(code, lino, _exit); } - if (metaFinishCommit(pVnode->pMeta, pInfo->txn) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } + code = metaFinishCommit(pVnode->pMeta, pInfo->txn); + TSDB_CHECK_CODE(code, lino, _exit); pVnode->state.committed = pInfo->info.state.committed; @@ -492,7 +463,7 @@ _exit: } else { vInfo("vgId:%d, commit end", TD_VID(pVnode)); } - return 0; + return code; } bool vnodeShouldRollback(SVnode *pVnode) { @@ -514,15 +485,15 @@ void vnodeRollback(SVnode *pVnode) { offset = strlen(tFName); snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP); - (void)taosRemoveFile(tFName); + TAOS_UNUSED(taosRemoveFile(tFName)); } static int vnodeEncodeState(const void *pObj, SJson *pJson) { const SVState *pState = (SVState *)pObj; - if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) return -1; + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit version", pState->committed)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm)); return 0; } @@ -532,70 +503,67 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { int32_t code; tjsonGetNumberValue(pJson, "commit version", pState->committed, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code); - if (code < 0) return -1; + if (code) return code; tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code); - if (code < 0) return -1; + if (code) return code; return 0; } static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) { - SJson *pJson; - char *pData; - - *ppData = NULL; + int32_t code = 0; + int32_t lino; + SJson *pJson = NULL; + char *pData = NULL; pJson = tjsonCreateObject(); if (pJson == NULL) { - return -1; + TSDB_CHECK_CODE(code = terrno, lino, _exit); } - if (tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config) < 0) { - goto _err; - } + code = tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config); + TSDB_CHECK_CODE(code, lino, _exit); - if (tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state) < 0) { - goto _err; - } + code = tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state); + TSDB_CHECK_CODE(code, lino, _exit); pData = tjsonToString(pJson); if (pData == NULL) { - goto _err; + TSDB_CHECK_CODE(code = terrno, lino, _exit); } tjsonDelete(pJson); - *ppData = pData; - return 0; - -_err: - tjsonDelete(pJson); - return -1; +_exit: + if (code) { + tjsonDelete(pJson); + *ppData = NULL; + } else { + *ppData = pData; + } + return code; } int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { - SJson *pJson = NULL; + int32_t code = 0; + int32_t lino; + SJson *pJson = NULL; pJson = tjsonParse(pData); if (pJson == NULL) { + TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit); return -1; } - if (tjsonToObject(pJson, "config", vnodeDecodeConfig, (void *)&pInfo->config) < 0) { - goto _err; - } + code = tjsonToObject(pJson, "config", vnodeDecodeConfig, (void *)&pInfo->config); + TSDB_CHECK_CODE(code, lino, _exit); - if (tjsonToObject(pJson, "state", vnodeDecodeState, (void *)&pInfo->state) < 0) { - goto _err; - } + code = tjsonToObject(pJson, "state", vnodeDecodeState, (void *)&pInfo->state); + TSDB_CHECK_CODE(code, lino, _exit); +_exit: tjsonDelete(pJson); - - return 0; - -_err: - tjsonDelete(pJson); - return -1; + return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index a050734cb9..75ba2be100 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -17,20 +17,19 @@ #include "vnd.h" static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TFileSetRangeArray **ppRanges) { - int32_t code = -1; + int32_t code = 0; STsdbFSetPartList *pList = tsdbFSetPartListCreate(); if (pList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _out; } - if (tDeserializeTsdbFSetPartList(buf, bufLen, pList) < 0) { - terrno = TSDB_CODE_INVALID_DATA_FMT; - goto _out; - } - if (tsdbFSetPartListToRangeDiff(pList, ppRanges) < 0) { - goto _out; - } - code = 0; + + code = tDeserializeTsdbFSetPartList(buf, bufLen, pList); + if (code) goto _out; + + code = tsdbFSetPartListToRangeDiff(pList, ppRanges); + if (code) goto _out; + _out: tsdbFSetPartListDestroy(&pList); return code; @@ -48,29 +47,29 @@ struct SVSnapReader { int8_t metaDone; SMetaSnapReader *pMetaReader; // tsdb - int8_t tsdbDone; + int8_t tsdbDone; TFileSetRangeArray *pRanges; - STsdbSnapReader *pTsdbReader; + STsdbSnapReader *pTsdbReader; // tsdb raw int8_t tsdbRAWDone; STsdbSnapRAWReader *pTsdbRAWReader; // tq - int8_t tqHandleDone; - STqSnapReader *pTqSnapReader; - int8_t tqOffsetDone; - STqSnapReader *pTqOffsetReader; - int8_t tqCheckInfoDone; - STqSnapReader *pTqCheckInfoReader; + int8_t tqHandleDone; + STqSnapReader *pTqSnapReader; + int8_t tqOffsetDone; + STqSnapReader *pTqOffsetReader; + int8_t tqCheckInfoDone; + STqSnapReader *pTqCheckInfoReader; // stream int8_t streamTaskDone; SStreamTaskReader *pStreamTaskReader; int8_t streamStateDone; SStreamStateReader *pStreamStateReader; // rsma - int8_t rsmaDone; + int8_t rsmaDone; TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2]; - SRSmaSnapReader *pRsmaReader; + SRSmaSnapReader *pRsmaReader; }; static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) { @@ -88,14 +87,15 @@ static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, } static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) { + int32_t code = 0; SVnode *pVnode = pReader->pVnode; - int32_t code = -1; if (pParam->data) { // decode SSyncTLV *datHead = (void *)pParam->data; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { - terrno = TSDB_CODE_INVALID_DATA_FMT; + code = TSDB_CODE_INVALID_DATA_FMT; + terrno = code; goto _out; } @@ -116,21 +116,25 @@ static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotP ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ); if (ppRanges == NULL) { vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ); + code = TSDB_CODE_INVALID_DATA_FMT; goto _out; } - if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) { + code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges); + if (code) { vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); goto _out; } } break; case SNAP_DATA_RAW: { - if (tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts) < 0) { + code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts); + if (code) { vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); goto _out; } } break; default: vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); + code = TSDB_CODE_INVALID_DATA_FMT; goto _out; } } @@ -147,7 +151,7 @@ static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotP vInfo("vgId:%d, vnode snap writer enabled replication mode: %s", TD_VID(pVnode), (pReader->tsdbDone ? "raw" : "normal")); } - code = 0; + _out: return code; } @@ -160,42 +164,43 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader)); if (pReader == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } pReader->pVnode = pVnode; pReader->sver = sver; pReader->ever = ever; // snapshot info - if (vnodeSnapReaderDealWithSnapInfo(pReader, pParam) < 0) { - goto _err; - } + code = vnodeSnapReaderDealWithSnapInfo(pReader, pParam); + if (code) goto _exit; // open tsdb snapshot raw reader if (!pReader->tsdbRAWDone) { ASSERT(pReader->sver == 0); code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader); - if (code) goto _err; + if (code) goto _exit; } // check snapshot ever SSnapshot snapshot = {0}; - vnodeGetSnapshot(pVnode, &snapshot); + code = vnodeGetSnapshot(pVnode, &snapshot); + if (code) goto _exit; if (ever != snapshot.lastApplyIndex) { vError("vgId:%d, abort reader open due to vnode snapshot changed. ever:%" PRId64 ", commit ver:%" PRId64, TD_VID(pVnode), ever, snapshot.lastApplyIndex); code = TSDB_CODE_SYN_INTERNAL_ERROR; - goto _err; + goto _exit; } - vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever); - *ppReader = pReader; - return code; - -_err: - vError("vgId:%d, vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code)); - *ppReader = NULL; +_exit: + if (code) { + vError("vgId:%d, vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code)); + *ppReader = NULL; + } else { + vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever); + *ppReader = pReader; + } return code; } @@ -245,6 +250,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t code = 0; + int32_t lino; SVnode *pVnode = pReader->pVnode; int32_t vgId = TD_VID(pReader->pVnode); @@ -261,31 +267,28 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ); if (NULL == pFile) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } int64_t size; if (taosFStatFile(pFile, &size, NULL) < 0) { - code = TAOS_SYSTEM_ERROR(errno); taosCloseFile(&pFile); - goto _err; + TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit); } *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1); if (*ppData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; taosCloseFile(&pFile); - goto _err; + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); } ((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG; ((SSnapDataHdr *)(*ppData))->size = size + 1; ((SSnapDataHdr *)(*ppData))->data[size] = '\0'; if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) { - code = TAOS_SYSTEM_ERROR(errno); taosMemoryFree(*ppData); taosCloseFile(&pFile); - goto _err; + TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit); } taosCloseFile(&pFile); @@ -299,20 +302,18 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) // open reader if not if (pReader->pMetaReader == NULL) { code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = metaSnapRead(pReader->pMetaReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + + if (*ppData) { + goto _exit; } else { - if (*ppData) { - goto _exit; - } else { - pReader->metaDone = 1; - code = metaSnapReaderClose(&pReader->pMetaReader); - if (code) goto _err; - } + pReader->metaDone = 1; + code = metaSnapReaderClose(&pReader->pMetaReader); + TSDB_CHECK_CODE(code, lino, _exit); } } @@ -322,20 +323,17 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (pReader->pTsdbReader == NULL) { code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pRanges, &pReader->pTsdbReader); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbSnapRead(pReader->pTsdbReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + goto _exit; } else { - if (*ppData) { - goto _exit; - } else { - pReader->tsdbDone = 1; - code = tsdbSnapReaderClose(&pReader->pTsdbReader); - if (code) goto _err; - } + pReader->tsdbDone = 1; + code = tsdbSnapReaderClose(&pReader->pTsdbReader); + TSDB_CHECK_CODE(code, lino, _exit); } } @@ -344,20 +342,17 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (pReader->pTsdbRAWReader == NULL) { ASSERT(pReader->sver == 0); code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbSnapRAWRead(pReader->pTsdbRAWReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + goto _exit; } else { - if (*ppData) { - goto _exit; - } else { - pReader->tsdbRAWDone = 1; - code = tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader); - if (code) goto _err; - } + pReader->tsdbRAWDone = 1; + code = tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader); + TSDB_CHECK_CODE(code, lino, _exit); } } @@ -365,59 +360,53 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) vInfo("vgId:%d tq transform start", vgId); if (!pReader->tqHandleDone) { if (pReader->pTqSnapReader == NULL) { - code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_HANDLE, &pReader->pTqSnapReader); - if (code < 0) goto _err; + code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_HANDLE, + &pReader->pTqSnapReader); + TSDB_CHECK_CODE(code, lino, _exit); } code = tqSnapRead(pReader->pTqSnapReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + goto _exit; } else { - if (*ppData) { - goto _exit; - } else { - pReader->tqHandleDone = 1; - code = tqSnapReaderClose(&pReader->pTqSnapReader); - if (code) goto _err; - } + pReader->tqHandleDone = 1; + code = tqSnapReaderClose(&pReader->pTqSnapReader); + TSDB_CHECK_CODE(code, lino, _exit); } } if (!pReader->tqCheckInfoDone) { if (pReader->pTqCheckInfoReader == NULL) { - code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_CHECKINFO, &pReader->pTqCheckInfoReader); - if (code < 0) goto _err; + code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_CHECKINFO, + &pReader->pTqCheckInfoReader); + TSDB_CHECK_CODE(code, lino, _exit); } code = tqSnapRead(pReader->pTqCheckInfoReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + goto _exit; } else { - if (*ppData) { - goto _exit; - } else { - pReader->tqCheckInfoDone = 1; - code = tqSnapReaderClose(&pReader->pTqCheckInfoReader); - if (code) goto _err; - } + pReader->tqCheckInfoDone = 1; + code = tqSnapReaderClose(&pReader->pTqCheckInfoReader); + TSDB_CHECK_CODE(code, lino, _exit); } } if (!pReader->tqOffsetDone) { if (pReader->pTqOffsetReader == NULL) { - code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_OFFSET, &pReader->pTqOffsetReader); - if (code < 0) goto _err; + code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_OFFSET, + &pReader->pTqOffsetReader); + TSDB_CHECK_CODE(code, lino, _exit); } code = tqSnapRead(pReader->pTqOffsetReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + goto _exit; } else { - if (*ppData) { - goto _exit; - } else { - pReader->tqOffsetDone = 1; - code = tqSnapReaderClose(&pReader->pTqOffsetReader); - if (code) goto _err; - } + pReader->tqOffsetDone = 1; + code = tqSnapReaderClose(&pReader->pTqOffsetReader); + TSDB_CHECK_CODE(code, lino, _exit); } } @@ -426,28 +415,19 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (!pReader->streamTaskDone) { if (pReader->pStreamTaskReader == NULL) { code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader); - if (code) { - vError("vgId:%d open streamtask snapshot reader failed, code:%s", vgId, tstrerror(code)); - goto _err; - } + TSDB_CHECK_CODE(code, lino, _exit); } code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData); - if (code) { - vError("vgId:%d error happens during read data from streatask snapshot, code:%s", vgId, tstrerror(code)); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + vInfo("vgId:%d no streamTask snapshot", vgId); + goto _exit; } else { - if (*ppData) { - vInfo("vgId:%d no streamTask snapshot", vgId); - goto _exit; - } else { - pReader->streamTaskDone = 1; - code = streamTaskSnapReaderClose(pReader->pStreamTaskReader); - if (code) { - goto _err; - } - pReader->pStreamTaskReader = NULL; - } + pReader->streamTaskDone = 1; + code = streamTaskSnapReaderClose(pReader->pStreamTaskReader); + TSDB_CHECK_CODE(code, lino, _exit); + pReader->pStreamTaskReader = NULL; } } if (!pReader->streamStateDone) { @@ -457,21 +437,18 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (code) { pReader->streamStateDone = 1; pReader->pStreamStateReader = NULL; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } code = streamStateSnapRead(pReader->pStreamStateReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + goto _exit; } else { - if (*ppData) { - goto _exit; - } else { - pReader->streamStateDone = 1; - code = streamStateSnapReaderClose(pReader->pStreamStateReader); - if (code) goto _err; - pReader->pStreamStateReader = NULL; - } + pReader->streamStateDone = 1; + code = streamStateSnapReaderClose(pReader->pStreamStateReader); + TSDB_CHECK_CODE(code, lino, _exit); + pReader->pStreamStateReader = NULL; } } @@ -480,20 +457,17 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) // open if not if (pReader->pRsmaReader == NULL) { code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = rsmaSnapRead(pReader->pRsmaReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + goto _exit; } else { - if (*ppData) { - goto _exit; - } else { - pReader->rsmaDone = 1; - code = rsmaSnapReaderClose(&pReader->pRsmaReader); - if (code) goto _err; - } + pReader->rsmaDone = 1; + code = rsmaSnapReaderClose(&pReader->pRsmaReader); + TSDB_CHECK_CODE(code, lino, _exit); } } @@ -501,21 +475,21 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) *nData = 0; _exit: - if (*ppData) { - SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData); - - pReader->index++; - *nData = sizeof(SSnapDataHdr) + pHdr->size; - pHdr->index = pReader->index; - vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index, - pHdr->type, *nData); + if (code) { + vError("vgId:%d, vnode snapshot read failed at %s:%d since %s", vgId, __FILE__, lino, tstrerror(code)); } else { - vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index); - } - return code; + if (*ppData) { + SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData); -_err: - vError("vgId:%d, vnode snapshot read failed since %s", vgId, tstrerror(code)); + pReader->index++; + *nData = sizeof(SSnapDataHdr) + pHdr->size; + pHdr->index = pReader->index; + vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index, + pHdr->type, *nData); + } else { + vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index); + } + } return code; } @@ -532,19 +506,19 @@ struct SVSnapWriter { SMetaSnapWriter *pMetaSnapWriter; // tsdb TFileSetRangeArray *pRanges; - STsdbSnapWriter *pTsdbSnapWriter; + STsdbSnapWriter *pTsdbSnapWriter; // tsdb raw STsdbSnapRAWWriter *pTsdbSnapRAWWriter; // tq - STqSnapWriter *pTqSnapHandleWriter; - STqSnapWriter *pTqSnapOffsetWriter; - STqSnapWriter *pTqSnapCheckInfoWriter; + STqSnapWriter *pTqSnapHandleWriter; + STqSnapWriter *pTqSnapOffsetWriter; + STqSnapWriter *pTqSnapCheckInfoWriter; // stream SStreamTaskWriter *pStreamTaskWriter; SStreamStateWriter *pStreamStateWriter; // rsma TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2]; - SRSmaSnapWriter *pRsmaSnapWriter; + SRSmaSnapWriter *pRsmaSnapWriter; }; TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) { @@ -563,18 +537,18 @@ TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotParam *pParam) { SVnode *pVnode = pWriter->pVnode; - int32_t code = -1; + int32_t code = 0; + int32_t lino; if (pParam->data) { SSyncTLV *datHead = (void *)pParam->data; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { - terrno = TSDB_CODE_INVALID_DATA_FMT; - goto _out; + TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit); } STsdbRepOpts tsdbOpts = {0}; TFileSetRangeArray **ppRanges = NULL; - int32_t offset = 0; + int32_t offset = 0; while (offset + sizeof(SSyncTLV) < datHead->len) { SSyncTLV *subField = (void *)(datHead->val + offset); @@ -589,30 +563,30 @@ static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotP ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ); if (ppRanges == NULL) { vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ); - goto _out; - } - if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) { - vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); - goto _out; + TSDB_CHECK_CODE(code = terrno, lino, _exit); } + + code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges); + TSDB_CHECK_CODE(code, lino, _exit); } break; case SNAP_DATA_RAW: { - if (tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts) < 0) { - vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); - goto _out; - } + code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts); + TSDB_CHECK_CODE(code, lino, _exit); } break; default: vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); - goto _out; + TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit); + goto _exit; } } vInfo("vgId:%d, vnode snap writer supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format); } - code = 0; -_out: +_exit: + if (code) { + vError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code)); + } return code; } @@ -634,6 +608,7 @@ static int32_t vnodeEnableBgTask(SVnode *pVnode) { int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) { int32_t code = 0; + int32_t lino; SVSnapWriter *pWriter = NULL; int64_t sver = pParam->start; int64_t ever = pParam->end; @@ -644,8 +619,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter // alloc pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); } pWriter->pVnode = pVnode; pWriter->sver = sver; @@ -655,19 +629,19 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter pWriter->commitID = ++pVnode->state.commitID; // snapshot info - if (vnodeSnapWriterDealWithSnapInfo(pWriter, pParam) < 0) { - goto _err; + code = vnodeSnapWriterDealWithSnapInfo(pWriter, pParam); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + vError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code)); + if (pWriter) taosMemoryFreeClear(pWriter); + *ppWriter = NULL; + } else { + vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode), + sver, ever, pWriter->commitID); + *ppWriter = pWriter; } - - vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode), - sver, ever, pWriter->commitID); - *ppWriter = pWriter; - return code; - -_err: - vError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code)); - if (pWriter) taosMemoryFreeClear(pWriter); - *ppWriter = NULL; return code; } @@ -785,14 +759,13 @@ _exit: static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { int32_t code = 0; + int32_t lino; SVnode *pVnode = pWriter->pVnode; SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; // decode info - if (vnodeDecodeInfo(pHdr->data, &pWriter->info) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } + code = vnodeDecodeInfo(pHdr->data, &pWriter->info); + TSDB_CHECK_CODE(code, lino, _exit); // change some value pWriter->info.state.commitID = pWriter->commitID; @@ -805,10 +778,8 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_ pWriter->info.config = pVnode->config; pWriter->info.config.vndStats = vndStats; vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId); - if (vnodeSaveInfo(dir, &pWriter->info) < 0) { - code = terrno; - goto _exit; - } + code = vnodeSaveInfo(dir, &pWriter->info); + TSDB_CHECK_CODE(code, lino, _exit); _exit: return code; @@ -816,6 +787,7 @@ _exit: int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { int32_t code = 0; + int32_t lino; SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; SVnode *pVnode = pWriter->pVnode; @@ -824,7 +796,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { if (pHdr->index != pWriter->index + 1) { vError("vgId:%d, unexpected vnode snapshot msg. index:%" PRId64 ", expected index:%" PRId64, TD_VID(pVnode), pHdr->index, pWriter->index + 1); - return -1; + TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_MSG, lino, _exit); } pWriter->index = pHdr->index; @@ -835,17 +807,17 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { switch (pHdr->type) { case SNAP_DATA_CFG: { code = vnodeSnapWriteInfo(pWriter, pData, nData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } break; case SNAP_DATA_META: { // meta if (pWriter->pMetaSnapWriter == NULL) { code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } break; case SNAP_DATA_TSDB: case SNAP_DATA_DEL: { @@ -853,69 +825,69 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { if (pWriter->pTsdbSnapWriter == NULL) { code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, pWriter->pRanges, &pWriter->pTsdbSnapWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } break; case SNAP_DATA_RAW: { // tsdb if (pWriter->pTsdbSnapRAWWriter == NULL) { ASSERT(pWriter->sver == 0); code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbSnapRAWWrite(pWriter->pTsdbSnapRAWWriter, pHdr); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } break; case SNAP_DATA_TQ_HANDLE: { // tq handle if (pWriter->pTqSnapHandleWriter == NULL) { code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapHandleWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = tqSnapHandleWrite(pWriter->pTqSnapHandleWriter, pData, nData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } break; case SNAP_DATA_TQ_CHECKINFO: { // tq checkinfo if (pWriter->pTqSnapCheckInfoWriter == NULL) { code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapCheckInfoWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = tqSnapCheckInfoWrite(pWriter->pTqSnapCheckInfoWriter, pData, nData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } break; case SNAP_DATA_TQ_OFFSET: { // tq offset if (pWriter->pTqSnapOffsetWriter == NULL) { code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapOffsetWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = tqSnapOffsetWrite(pWriter->pTqSnapOffsetWriter, pData, nData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } break; case SNAP_DATA_STREAM_TASK: case SNAP_DATA_STREAM_TASK_CHECKPOINT: { if (pWriter->pStreamTaskWriter == NULL) { code = streamTaskSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamTaskWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } break; case SNAP_DATA_STREAM_STATE_BACKEND: { if (pWriter->pStreamStateWriter == NULL) { code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = streamStateSnapWrite(pWriter->pStreamStateWriter, pData, nData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } break; case SNAP_DATA_RSMA1: @@ -925,20 +897,19 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { if (pWriter->pRsmaSnapWriter == NULL) { code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, (void **)pWriter->pRsmaRanges, &pWriter->pRsmaSnapWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } break; default: break; } _exit: - return code; - -_err: - vError("vgId:%d, vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), - tstrerror(code), pHdr->index, pHdr->type, nData); + if (code) { + vError("vgId:%d, vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), + tstrerror(code), pHdr->index, pHdr->type, nData); + } return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 8f28871e3b..742d7e4f3f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -387,7 +387,7 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (msgcb == NULL || msgcb->putToQueueFp == NULL) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; - return -1; + return TSDB_CODE_INVALID_PARA; } int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg); @@ -400,13 +400,13 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (pMsg == NULL || pMsg->pCont == NULL) { - return -1; + return TSDB_CODE_INVALID_PARA; } if (msgcb == NULL || msgcb->putToQueueFp == NULL) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; - return -1; + return TSDB_CODE_INVALID_PARA; } int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); @@ -485,8 +485,7 @@ static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { SVnode *pVnode = pFsm->data; - int32_t code = vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader); - return code; + return vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader); } static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { @@ -496,8 +495,7 @@ static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { SVnode *pVnode = pFsm->data; - int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len); - return code; + return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len); } static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) { @@ -514,8 +512,7 @@ static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void } } while (true); - int32_t code = vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter); - return code; + return vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter); } static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { @@ -580,7 +577,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) if (pMeta->startInfo.startAllTasks == 1) { pMeta->startInfo.restartCount += 1; vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, - pMeta->startInfo.restartCount); + pMeta->startInfo.restartCount); } else { pMeta->startInfo.startAllTasks = 1; streamMetaWUnLock(pMeta); @@ -636,7 +633,7 @@ static void vnodeBecomeLeader(const SSyncFSM *pFsm) { } } -static void vnodeBecomeAssignedLeader(const SSyncFSM* pFsm) { +static void vnodeBecomeAssignedLeader(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; vDebug("vgId:%d, become assigned leader", pVnode->config.vgId); if (pVnode->pTq) { @@ -662,12 +659,16 @@ static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) { int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); return itemSize; } else { - return -1; + return TSDB_CODE_INVALID_PARA; } } static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); + if (pFsm == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } pFsm->data = pVnode; pFsm->FpCommitCb = vnodeSyncCommitMsg; pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex; @@ -724,7 +725,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) { pVnode->sync = syncOpen(&syncInfo, vnodeVersion); if (pVnode->sync <= 0) { vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr()); - return -1; + return terrno; } return 0; @@ -732,9 +733,10 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) { int32_t vnodeSyncStart(SVnode *pVnode) { vInfo("vgId:%d, start sync", pVnode->config.vgId); - if (syncStart(pVnode->sync) < 0) { - vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, terrstr()); - return -1; + int32_t code = syncStart(pVnode->sync); + if (code) { + vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, tstrerror(code)); + return code; } return 0; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c7e654605b..b455e3355f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -112,7 +112,7 @@ int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) { if (pSyncNode == NULL) { sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid); - return -1; + return terrno; } *cfg = pSyncNode->raftCfg.cfg; @@ -2036,7 +2036,7 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) { pSyncNode->hbrSlowNum = 0; // reset restoreFinish - //pSyncNode->restoreFinish = false; + // pSyncNode->restoreFinish = false; // state change pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER; @@ -2149,7 +2149,8 @@ int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) { SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64 "assigned commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex, lastIndex); + pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex, + lastIndex); return 0; }