enh: refactor error code

This commit is contained in:
Hongze Cheng 2024-07-22 19:40:07 +08:00
parent dec8095e53
commit c4d31e7737
5 changed files with 439 additions and 489 deletions

View File

@ -88,33 +88,33 @@ const ESyncRole vnodeStrToRole(char *str) {
int vnodeEncodeConfig(const void *pObj, SJson *pJson) { int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
const SVnodeCfg *pCfg = (SVnodeCfg *)pObj; const SVnodeCfg *pCfg = (SVnodeCfg *)pObj;
if (tjsonAddIntegerToObject(pJson, "vgId", pCfg->vgId) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vgId", pCfg->vgId));
if (tjsonAddStringToObject(pJson, "dbname", pCfg->dbname) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, "dbname", pCfg->dbname));
if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId));
if (tjsonAddIntegerToObject(pJson, "szPage", pCfg->szPage) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "szPage", pCfg->szPage));
if (tjsonAddIntegerToObject(pJson, "szCache", pCfg->szCache) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "szCache", pCfg->szCache));
if (tjsonAddIntegerToObject(pJson, "cacheLast", pCfg->cacheLast) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "cacheLast", pCfg->cacheLast));
if (tjsonAddIntegerToObject(pJson, "cacheLastSize", pCfg->cacheLastSize) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "cacheLastSize", pCfg->cacheLastSize));
if (tjsonAddIntegerToObject(pJson, "szBuf", pCfg->szBuf) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "szBuf", pCfg->szBuf));
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap));
if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak));
if (tjsonAddIntegerToObject(pJson, "isTsma", pCfg->isTsma) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "isTsma", pCfg->isTsma));
if (tjsonAddIntegerToObject(pJson, "isRsma", pCfg->isRsma) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "isRsma", pCfg->isRsma));
if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision));
if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update));
if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression));
if (tjsonAddIntegerToObject(pJson, "slLevel", pCfg->tsdbCfg.slLevel) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "slLevel", pCfg->tsdbCfg.slLevel));
if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days));
if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows));
if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows));
if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0));
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1));
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2));
if (tjsonAddIntegerToObject(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset));
if (tjsonAddIntegerToObject(pJson, "s3ChunkSize", pCfg->s3ChunkSize) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "s3ChunkSize", pCfg->s3ChunkSize));
if (tjsonAddIntegerToObject(pJson, "s3KeepLocal", pCfg->s3KeepLocal) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "s3KeepLocal", pCfg->s3KeepLocal));
if (tjsonAddIntegerToObject(pJson, "s3Compact", pCfg->s3Compact) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "s3Compact", pCfg->s3Compact));
if (tjsonAddIntegerToObject(pJson, "tsdbPageSize", pCfg->tsdbPageSize) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "tsdbPageSize", pCfg->tsdbPageSize));
if (pCfg->tsdbCfg.retentions[0].keep > 0) { if (pCfg->tsdbCfg.retentions[0].keep > 0) {
int32_t nRetention = 1; int32_t nRetention = 1;
if (pCfg->tsdbCfg.retentions[1].freq > 0) { if (pCfg->tsdbCfg.retentions[1].freq > 0) {
@ -124,61 +124,67 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
} }
} }
SJson *pNodeRetentions = tjsonCreateArray(); SJson *pNodeRetentions = tjsonCreateArray();
if (pNodeRetentions == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions); tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
for (int32_t i = 0; i < nRetention; ++i) { for (int32_t i = 0; i < nRetention; ++i) {
SJson *pNodeRetention = tjsonCreateObject(); SJson *pNodeRetention = tjsonCreateObject();
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i; const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq); TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq));
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit); TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit));
tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->keep); TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->keep));
tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->keepUnit); TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->keepUnit));
tjsonAddItemToArray(pNodeRetentions, pNodeRetention); TAOS_CHECK_RETURN(tjsonAddItemToArray(pNodeRetentions, pNodeRetention));
} }
} }
if (tjsonAddIntegerToObject(pJson, "tsdb.encryptAlgorithm", pCfg->tsdbCfg.encryptAlgorithm) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "tsdb.encryptAlgorithm", pCfg->tsdbCfg.encryptAlgorithm));
if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId));
if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod));
if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod));
if (tjsonAddIntegerToObject(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod));
if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize));
if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize));
if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level));
if (tjsonAddIntegerToObject(pJson, "wal.clearFiles", pCfg->walCfg.clearFiles) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.clearFiles", pCfg->walCfg.clearFiles));
if (tjsonAddIntegerToObject(pJson, "wal.encryptAlgorithm", pCfg->walCfg.encryptAlgorithm) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "wal.encryptAlgorithm", pCfg->walCfg.encryptAlgorithm));
if (tjsonAddIntegerToObject(pJson, "tdbEncryptAlgorithm", pCfg->tdbEncryptAlgorithm) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "tdbEncryptAlgorithm", pCfg->tdbEncryptAlgorithm));
if (tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sttTrigger) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sttTrigger));
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin));
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd));
if (tjsonAddIntegerToObject(pJson, "hashChange", pCfg->hashChange) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashChange", pCfg->hashChange));
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod));
if (tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix));
if (tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix));
TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum));
if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex));
if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion));
if (tjsonAddIntegerToObject(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables));
TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables));
if (tjsonAddIntegerToObject(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vndStats.ntables", pCfg->vndStats.numOfNTables));
if (tjsonAddIntegerToObject(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vndStats.timeseries", pCfg->vndStats.numOfTimeSeries));
if (tjsonAddIntegerToObject(pJson, "vndStats.ntables", pCfg->vndStats.numOfNTables) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "vndStats.ntimeseries", pCfg->vndStats.numOfNTimeSeries));
if (tjsonAddIntegerToObject(pJson, "vndStats.timeseries", pCfg->vndStats.numOfTimeSeries) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "vndStats.ntimeseries", pCfg->vndStats.numOfNTimeSeries) < 0) return -1;
SJson *nodeInfo = tjsonCreateArray(); SJson *nodeInfo = tjsonCreateArray();
if (nodeInfo == NULL) return -1; if (nodeInfo == NULL) {
if (tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", nodeInfo) < 0) return -1; return TSDB_CODE_OUT_OF_MEMORY;
}
TAOS_CHECK_RETURN(tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", nodeInfo));
vDebug("vgId:%d, encode config, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pCfg->vgId, vDebug("vgId:%d, encode config, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pCfg->vgId,
pCfg->syncCfg.replicaNum, pCfg->syncCfg.totalReplicaNum, pCfg->syncCfg.myIndex, pCfg->syncCfg.changeVersion); pCfg->syncCfg.replicaNum, pCfg->syncCfg.totalReplicaNum, pCfg->syncCfg.myIndex, pCfg->syncCfg.changeVersion);
for (int i = 0; i < pCfg->syncCfg.totalReplicaNum; ++i) { for (int i = 0; i < pCfg->syncCfg.totalReplicaNum; ++i) {
SJson *info = tjsonCreateObject(); SJson *info = tjsonCreateObject();
if (info == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNodeInfo *pNode = (SNodeInfo *)&pCfg->syncCfg.nodeInfo[i]; SNodeInfo *pNode = (SNodeInfo *)&pCfg->syncCfg.nodeInfo[i];
if (info == NULL) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(info, "nodePort", pNode->nodePort));
if (tjsonAddIntegerToObject(info, "nodePort", pNode->nodePort) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddStringToObject(info, "nodeFqdn", pNode->nodeFqdn));
if (tjsonAddStringToObject(info, "nodeFqdn", pNode->nodeFqdn) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(info, "nodeId", pNode->nodeId));
if (tjsonAddIntegerToObject(info, "nodeId", pNode->nodeId) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(info, "clusterId", pNode->clusterId));
if (tjsonAddIntegerToObject(info, "clusterId", pNode->clusterId) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddStringToObject(info, "isReplica", vnodeRoleToStr(pNode->nodeRole)));
if (tjsonAddStringToObject(info, "isReplica", vnodeRoleToStr(pNode->nodeRole)) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddItemToArray(nodeInfo, info));
if (tjsonAddItemToArray(nodeInfo, info) < 0) return -1;
vDebug("vgId:%d, encode config, replica:%d ep:%s:%u dnode:%d", pCfg->vgId, i, pNode->nodeFqdn, pNode->nodePort, vDebug("vgId:%d, encode config, replica:%d ep:%s:%u dnode:%d", pCfg->vgId, i, pNode->nodeFqdn, pNode->nodePort,
pNode->nodeId); pNode->nodeId);
} }
@ -191,50 +197,50 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
int32_t code; int32_t code;
tjsonGetNumberValue(pJson, "vgId", pCfg->vgId, code); tjsonGetNumberValue(pJson, "vgId", pCfg->vgId, code);
if (code < 0) return -1; if (code) return code;
if (tjsonGetStringValue(pJson, "dbname", pCfg->dbname) < 0) return -1; if ((code = tjsonGetStringValue(pJson, "dbname", pCfg->dbname))) return code;
tjsonGetNumberValue(pJson, "dbId", pCfg->dbId, code); tjsonGetNumberValue(pJson, "dbId", pCfg->dbId, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "szPage", pCfg->szPage, code); tjsonGetNumberValue(pJson, "szPage", pCfg->szPage, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "szCache", pCfg->szCache, code); tjsonGetNumberValue(pJson, "szCache", pCfg->szCache, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "cacheLast", pCfg->cacheLast, code); tjsonGetNumberValue(pJson, "cacheLast", pCfg->cacheLast, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "cacheLastSize", pCfg->cacheLastSize, code); tjsonGetNumberValue(pJson, "cacheLastSize", pCfg->cacheLastSize, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf, code); tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap, code); tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak, code); tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code); tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "isRsma", pCfg->isRsma, code); tjsonGetNumberValue(pJson, "isRsma", pCfg->isRsma, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code); tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code); tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression, code); tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "slLevel", pCfg->tsdbCfg.slLevel, code); tjsonGetNumberValue(pJson, "slLevel", pCfg->tsdbCfg.slLevel, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days, code); tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows, code); tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows, code); tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0, code); tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1, code); tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code); tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset, code); tjsonGetNumberValue(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset, code);
if (code < 0) return -1; if (code) return code;
SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions"); SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
int32_t nRetention = tjsonGetArraySize(pNodeRetentions); int32_t nRetention = tjsonGetArraySize(pNodeRetentions);
if (nRetention > TSDB_RETENTION_MAX) { if (nRetention > TSDB_RETENTION_MAX) {
@ -244,52 +250,54 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i); SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i);
ASSERT(pNodeRetention != NULL); ASSERT(pNodeRetention != NULL);
tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq, code); tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq, code);
if (code) return code;
tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit, code); tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit, code);
if (code) return code;
tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep, code); tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep, code);
if (code) return code;
tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code); tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code);
if (code) return code;
} }
tjsonGetNumberValue(pJson, "tsdb.encryptAlgorithm", pCfg->tsdbCfg.encryptAlgorithm, code); tjsonGetNumberValue(pJson, "tsdb.encryptAlgorithm", pCfg->tsdbCfg.encryptAlgorithm, code);
if (code < 0) return -1; if (code) return code;
#if defined(TD_ENTERPRISE) #if defined(TD_ENTERPRISE)
if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) { if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
if (tsEncryptKey[0] == 0) { if (tsEncryptKey[0] == 0) {
terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
return -1;
} else { } else {
strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
} }
} }
#endif #endif
tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code); tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod, code); tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod, code); tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod, code); tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize, code); tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize, code); tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code); tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "wal.clearFiles", pCfg->walCfg.clearFiles, code); tjsonGetNumberValue(pJson, "wal.clearFiles", pCfg->walCfg.clearFiles, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "wal.encryptAlgorithm", pCfg->walCfg.encryptAlgorithm, code); tjsonGetNumberValue(pJson, "wal.encryptAlgorithm", pCfg->walCfg.encryptAlgorithm, code);
if (code < 0) return -1; if (code) return code;
#if defined(TD_ENTERPRISE) #if defined(TD_ENTERPRISE)
if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) { if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
if (tsEncryptKey[0] == 0) { if (tsEncryptKey[0] == 0) {
terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
return -1;
} else { } else {
strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
} }
} }
#endif #endif
tjsonGetNumberValue(pJson, "tdbEncryptAlgorithm", pCfg->tdbEncryptAlgorithm, code); tjsonGetNumberValue(pJson, "tdbEncryptAlgorithm", pCfg->tdbEncryptAlgorithm, code);
if (code < 0) return -1; if (code) return code;
#if defined(TD_ENTERPRISE) #if defined(TD_ENTERPRISE)
if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) { if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
if (tsEncryptKey[0] == 0) { if (tsEncryptKey[0] == 0) {
@ -303,35 +311,35 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
tjsonGetNumberValue(pJson, "sstTrigger", pCfg->sttTrigger, code); tjsonGetNumberValue(pJson, "sstTrigger", pCfg->sttTrigger, code);
if (code < 0) pCfg->sttTrigger = TSDB_DEFAULT_SST_TRIGGER; if (code < 0) pCfg->sttTrigger = TSDB_DEFAULT_SST_TRIGGER;
tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin, code); tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code); tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "hashChange", pCfg->hashChange, code); tjsonGetNumberValue(pJson, "hashChange", pCfg->hashChange, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code); tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "hashPrefix", pCfg->hashPrefix, code); tjsonGetNumberValue(pJson, "hashPrefix", pCfg->hashPrefix, code);
if (code < 0) pCfg->hashPrefix = TSDB_DEFAULT_HASH_PREFIX; if (code < 0) pCfg->hashPrefix = TSDB_DEFAULT_HASH_PREFIX;
tjsonGetNumberValue(pJson, "hashSuffix", pCfg->hashSuffix, code); tjsonGetNumberValue(pJson, "hashSuffix", pCfg->hashSuffix, code);
if (code < 0) pCfg->hashSuffix = TSDB_DEFAULT_HASH_SUFFIX; if (code < 0) pCfg->hashSuffix = TSDB_DEFAULT_HASH_SUFFIX;
tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum, code); tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code); tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion, code); tjsonGetNumberValue(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables, code); tjsonGetNumberValue(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables, code); tjsonGetNumberValue(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "vndStats.ntables", pCfg->vndStats.numOfNTables, code); tjsonGetNumberValue(pJson, "vndStats.ntables", pCfg->vndStats.numOfNTables, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "vndStats.timeseries", pCfg->vndStats.numOfTimeSeries, code); tjsonGetNumberValue(pJson, "vndStats.timeseries", pCfg->vndStats.numOfTimeSeries, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "vndStats.ntimeseries", pCfg->vndStats.numOfNTimeSeries, code); tjsonGetNumberValue(pJson, "vndStats.ntimeseries", pCfg->vndStats.numOfNTimeSeries, code);
if (code < 0) return -1; if (code) return code;
SJson *nodeInfo = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo"); SJson *nodeInfo = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo");
int arraySize = tjsonGetArraySize(nodeInfo); int arraySize = tjsonGetArraySize(nodeInfo);
@ -344,15 +352,15 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i]; SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
if (info == NULL) return -1; if (info == NULL) return -1;
tjsonGetNumberValue(info, "nodePort", pNode->nodePort, code); tjsonGetNumberValue(info, "nodePort", pNode->nodePort, code);
if (code < 0) return -1; if (code) return code;
tjsonGetStringValue(info, "nodeFqdn", pNode->nodeFqdn); tjsonGetStringValue(info, "nodeFqdn", pNode->nodeFqdn);
tjsonGetNumberValue(info, "nodeId", pNode->nodeId, code); tjsonGetNumberValue(info, "nodeId", pNode->nodeId, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(info, "clusterId", pNode->clusterId, code); tjsonGetNumberValue(info, "clusterId", pNode->clusterId, code);
if (code < 0) return -1; if (code) return code;
char role[10] = {0}; char role[10] = {0};
code = tjsonGetStringValue(info, "isReplica", role); code = tjsonGetStringValue(info, "isReplica", role);
if (code < 0) return -1; if (code) return code;
if (strlen(role) != 0) { if (strlen(role) != 0) {
pNode->nodeRole = vnodeStrToRole(role); pNode->nodeRole = vnodeStrToRole(role);
} else { } else {

View File

@ -125,20 +125,16 @@ int vnodeBegin(SVnode *pVnode) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// begin meta // begin meta
if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) { code = metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL);
code = terrno; TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE(code, lino, _exit);
}
// begin tsdb // begin tsdb
if (tsdbBegin(pVnode->pTsdb) < 0) { code = tsdbBegin(pVnode->pTsdb);
code = terrno; TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE(code, lino, _exit);
}
// begin sma // begin sma
if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) { if (VND_IS_RSMA(pVnode)) {
code = terrno; code = smaBegin(pVnode->pSma);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -171,54 +167,41 @@ int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
} }
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
int32_t code = 0;
int32_t lino;
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
TdFilePtr pFile; TdFilePtr pFile = NULL;
char *data; char *data = NULL;
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP); snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
// encode info code = vnodeEncodeInfo(pInfo, &data);
data = NULL; TSDB_CHECK_CODE(code, lino, _exit);
if (vnodeEncodeInfo(pInfo, &data) < 0) {
vError("failed to encode json info.");
return -1;
}
// save info to a vnode_tmp.json // save info to a vnode_tmp.json
pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pFile == NULL) { if (pFile == NULL) {
vError("failed to open info file:%s for write:%s", fname, terrstr()); TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} }
if (taosWriteFile(pFile, data, strlen(data)) < 0) { if (taosWriteFile(pFile, data, strlen(data)) < 0) {
vError("failed to write info file:%s error:%s", fname, terrstr()); TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
} }
if (taosFsyncFile(pFile) < 0) { if (taosFsyncFile(pFile) < 0) {
vError("failed to fsync info file:%s error:%s", fname, terrstr()); TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
} }
taosCloseFile(&pFile); _exit:
if (code) {
// free info binary vError("vgId:%d %s failed at %s:%d since %s", pInfo->config.vgId, __func__, __FILE__, lino, tstrerror(code));
taosMemoryFree(data); } else {
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname,
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname, pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion);
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion); }
return 0;
_err:
taosCloseFile(&pFile); taosCloseFile(&pFile);
taosMemoryFree(data); taosMemoryFree(data);
return -1; return code;
} }
int vnodeCommitInfo(const char *dir) { int vnodeCommitInfo(const char *dir) {
@ -229,8 +212,7 @@ int vnodeCommitInfo(const char *dir) {
snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP); snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
if (taosRenameFile(tfname, fname) < 0) { if (taosRenameFile(tfname, fname) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); return terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} }
vInfo("vnode info is committed, dir:%s", dir); vInfo("vnode info is committed, dir:%s", dir);
@ -238,6 +220,8 @@ int vnodeCommitInfo(const char *dir) {
} }
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) { int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
int32_t code = 0;
int32_t lino;
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
char *pData = NULL; char *pData = NULL;
@ -248,44 +232,35 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
// read info // read info
pFile = taosOpenFile(fname, TD_FILE_READ); pFile = taosOpenFile(fname, TD_FILE_READ);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
return -1;
} }
if (taosFStatFile(pFile, &size, NULL) < 0) { if (taosFStatFile(pFile, &size, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
goto _err;
} }
pData = taosMemoryMalloc(size + 1); pData = taosMemoryMalloc(size + 1);
if (pData == NULL) { if (pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
goto _err;
} }
if (taosReadFile(pFile, pData, size) < 0) { if (taosReadFile(pFile, pData, size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
goto _err;
} }
pData[size] = '\0'; pData[size] = '\0';
taosCloseFile(&pFile);
// decode info // decode info
if (vnodeDecodeInfo(pData, pInfo) < 0) { code = vnodeDecodeInfo(pData, pInfo);
taosMemoryFree(pData); TSDB_CHECK_CODE(code, lino, _exit);
return -1;
_exit:
if (code) {
vError("vgId:%d %s failed at %s:%d since %s", pInfo->config.vgId, __func__, __FILE__, lino, tstrerror(code));
} }
taosMemoryFree(pData); taosMemoryFree(pData);
return 0;
_err:
taosCloseFile(&pFile); taosCloseFile(&pFile);
taosMemoryFree(pData); return code;
return -1;
} }
static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
@ -297,7 +272,8 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
// wait last commit task // wait last commit task
vnodeAWait(&pVnode->commitTask); vnodeAWait(&pVnode->commitTask);
if (syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit; code = syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg);
TSDB_CHECK_CODE(code, lino, _exit);
pVnode->state.commitTerm = pVnode->state.applyTerm; pVnode->state.commitTerm = pVnode->state.applyTerm;
@ -312,17 +288,16 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode)); vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
if (vnodeSaveInfo(dir, &pInfo->info) < 0) { code = vnodeSaveInfo(dir, &pInfo->info);
code = terrno; TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE(code, lino, _exit);
}
tsdbPreCommit(pVnode->pTsdb); tsdbPreCommit(pVnode->pTsdb);
metaPrepareAsyncCommit(pVnode->pMeta); code = metaPrepareAsyncCommit(pVnode->pMeta);
TSDB_CHECK_CODE(code, lino, _exit);
code = smaPrepareAsyncCommit(pVnode->pSma); code = smaPrepareAsyncCommit(pVnode->pSma);
if (code) goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
taosThreadMutexLock(&pVnode->mutex); taosThreadMutexLock(&pVnode->mutex);
ASSERT(pVnode->onCommit == NULL); ASSERT(pVnode->onCommit == NULL);
@ -459,10 +434,8 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
} }
// commit info // commit info
if (vnodeCommitInfo(dir) < 0) { code = vnodeCommitInfo(dir);
code = terrno; TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbCommitCommit(pVnode->pTsdb); code = tsdbCommitCommit(pVnode->pTsdb);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -472,10 +445,8 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (metaFinishCommit(pVnode->pMeta, pInfo->txn) < 0) { code = metaFinishCommit(pVnode->pMeta, pInfo->txn);
code = terrno; TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE(code, lino, _exit);
}
pVnode->state.committed = pInfo->info.state.committed; pVnode->state.committed = pInfo->info.state.committed;
@ -492,7 +463,7 @@ _exit:
} else { } else {
vInfo("vgId:%d, commit end", TD_VID(pVnode)); vInfo("vgId:%d, commit end", TD_VID(pVnode));
} }
return 0; return code;
} }
bool vnodeShouldRollback(SVnode *pVnode) { bool vnodeShouldRollback(SVnode *pVnode) {
@ -514,15 +485,15 @@ void vnodeRollback(SVnode *pVnode) {
offset = strlen(tFName); offset = strlen(tFName);
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP); snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
(void)taosRemoveFile(tFName); TAOS_UNUSED(taosRemoveFile(tFName));
} }
static int vnodeEncodeState(const void *pObj, SJson *pJson) { static int vnodeEncodeState(const void *pObj, SJson *pJson) {
const SVState *pState = (SVState *)pObj; const SVState *pState = (SVState *)pObj;
if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit version", pState->committed));
if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID));
if (tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) return -1; TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm));
return 0; return 0;
} }
@ -532,70 +503,67 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) {
int32_t code; int32_t code;
tjsonGetNumberValue(pJson, "commit version", pState->committed, code); tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code); tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
if (code < 0) return -1; if (code) return code;
tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code); tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
if (code < 0) return -1; if (code) return code;
return 0; return 0;
} }
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) { static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
SJson *pJson; int32_t code = 0;
char *pData; int32_t lino;
SJson *pJson = NULL;
*ppData = NULL; char *pData = NULL;
pJson = tjsonCreateObject(); pJson = tjsonCreateObject();
if (pJson == NULL) { if (pJson == NULL) {
return -1; TSDB_CHECK_CODE(code = terrno, lino, _exit);
} }
if (tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config) < 0) { code = tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
}
if (tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state) < 0) { code = tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
}
pData = tjsonToString(pJson); pData = tjsonToString(pJson);
if (pData == NULL) { if (pData == NULL) {
goto _err; TSDB_CHECK_CODE(code = terrno, lino, _exit);
} }
tjsonDelete(pJson); tjsonDelete(pJson);
*ppData = pData; _exit:
return 0; if (code) {
tjsonDelete(pJson);
_err: *ppData = NULL;
tjsonDelete(pJson); } else {
return -1; *ppData = pData;
}
return code;
} }
int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
SJson *pJson = NULL; int32_t code = 0;
int32_t lino;
SJson *pJson = NULL;
pJson = tjsonParse(pData); pJson = tjsonParse(pData);
if (pJson == NULL) { if (pJson == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
return -1; return -1;
} }
if (tjsonToObject(pJson, "config", vnodeDecodeConfig, (void *)&pInfo->config) < 0) { code = tjsonToObject(pJson, "config", vnodeDecodeConfig, (void *)&pInfo->config);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
}
if (tjsonToObject(pJson, "state", vnodeDecodeState, (void *)&pInfo->state) < 0) { code = tjsonToObject(pJson, "state", vnodeDecodeState, (void *)&pInfo->state);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
tjsonDelete(pJson); tjsonDelete(pJson);
return code;
return 0;
_err:
tjsonDelete(pJson);
return -1;
} }

View File

@ -17,20 +17,19 @@
#include "vnd.h" #include "vnd.h"
static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TFileSetRangeArray **ppRanges) { static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TFileSetRangeArray **ppRanges) {
int32_t code = -1; int32_t code = 0;
STsdbFSetPartList *pList = tsdbFSetPartListCreate(); STsdbFSetPartList *pList = tsdbFSetPartListCreate();
if (pList == NULL) { if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
goto _out; goto _out;
} }
if (tDeserializeTsdbFSetPartList(buf, bufLen, pList) < 0) {
terrno = TSDB_CODE_INVALID_DATA_FMT; code = tDeserializeTsdbFSetPartList(buf, bufLen, pList);
goto _out; if (code) goto _out;
}
if (tsdbFSetPartListToRangeDiff(pList, ppRanges) < 0) { code = tsdbFSetPartListToRangeDiff(pList, ppRanges);
goto _out; if (code) goto _out;
}
code = 0;
_out: _out:
tsdbFSetPartListDestroy(&pList); tsdbFSetPartListDestroy(&pList);
return code; return code;
@ -48,29 +47,29 @@ struct SVSnapReader {
int8_t metaDone; int8_t metaDone;
SMetaSnapReader *pMetaReader; SMetaSnapReader *pMetaReader;
// tsdb // tsdb
int8_t tsdbDone; int8_t tsdbDone;
TFileSetRangeArray *pRanges; TFileSetRangeArray *pRanges;
STsdbSnapReader *pTsdbReader; STsdbSnapReader *pTsdbReader;
// tsdb raw // tsdb raw
int8_t tsdbRAWDone; int8_t tsdbRAWDone;
STsdbSnapRAWReader *pTsdbRAWReader; STsdbSnapRAWReader *pTsdbRAWReader;
// tq // tq
int8_t tqHandleDone; int8_t tqHandleDone;
STqSnapReader *pTqSnapReader; STqSnapReader *pTqSnapReader;
int8_t tqOffsetDone; int8_t tqOffsetDone;
STqSnapReader *pTqOffsetReader; STqSnapReader *pTqOffsetReader;
int8_t tqCheckInfoDone; int8_t tqCheckInfoDone;
STqSnapReader *pTqCheckInfoReader; STqSnapReader *pTqCheckInfoReader;
// stream // stream
int8_t streamTaskDone; int8_t streamTaskDone;
SStreamTaskReader *pStreamTaskReader; SStreamTaskReader *pStreamTaskReader;
int8_t streamStateDone; int8_t streamStateDone;
SStreamStateReader *pStreamStateReader; SStreamStateReader *pStreamStateReader;
// rsma // rsma
int8_t rsmaDone; int8_t rsmaDone;
TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2]; TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
SRSmaSnapReader *pRsmaReader; SRSmaSnapReader *pRsmaReader;
}; };
static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) { static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) {
@ -88,14 +87,15 @@ static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader,
} }
static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) { static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) {
int32_t code = 0;
SVnode *pVnode = pReader->pVnode; SVnode *pVnode = pReader->pVnode;
int32_t code = -1;
if (pParam->data) { if (pParam->data) {
// decode // decode
SSyncTLV *datHead = (void *)pParam->data; SSyncTLV *datHead = (void *)pParam->data;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
terrno = TSDB_CODE_INVALID_DATA_FMT; code = TSDB_CODE_INVALID_DATA_FMT;
terrno = code;
goto _out; goto _out;
} }
@ -116,21 +116,25 @@ static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotP
ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ); ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ);
if (ppRanges == NULL) { if (ppRanges == NULL) {
vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ); vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
code = TSDB_CODE_INVALID_DATA_FMT;
goto _out; goto _out;
} }
if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) { code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges);
if (code) {
vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
goto _out; goto _out;
} }
} break; } break;
case SNAP_DATA_RAW: { case SNAP_DATA_RAW: {
if (tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts) < 0) { code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts);
if (code) {
vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
goto _out; goto _out;
} }
} break; } break;
default: default:
vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
code = TSDB_CODE_INVALID_DATA_FMT;
goto _out; goto _out;
} }
} }
@ -147,7 +151,7 @@ static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotP
vInfo("vgId:%d, vnode snap writer enabled replication mode: %s", TD_VID(pVnode), vInfo("vgId:%d, vnode snap writer enabled replication mode: %s", TD_VID(pVnode),
(pReader->tsdbDone ? "raw" : "normal")); (pReader->tsdbDone ? "raw" : "normal"));
} }
code = 0;
_out: _out:
return code; return code;
} }
@ -160,42 +164,43 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader
pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader)); pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) { if (pReader == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _exit;
} }
pReader->pVnode = pVnode; pReader->pVnode = pVnode;
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
// snapshot info // snapshot info
if (vnodeSnapReaderDealWithSnapInfo(pReader, pParam) < 0) { code = vnodeSnapReaderDealWithSnapInfo(pReader, pParam);
goto _err; if (code) goto _exit;
}
// open tsdb snapshot raw reader // open tsdb snapshot raw reader
if (!pReader->tsdbRAWDone) { if (!pReader->tsdbRAWDone) {
ASSERT(pReader->sver == 0); ASSERT(pReader->sver == 0);
code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader); code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
if (code) goto _err; if (code) goto _exit;
} }
// check snapshot ever // check snapshot ever
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
vnodeGetSnapshot(pVnode, &snapshot); code = vnodeGetSnapshot(pVnode, &snapshot);
if (code) goto _exit;
if (ever != snapshot.lastApplyIndex) { if (ever != snapshot.lastApplyIndex) {
vError("vgId:%d, abort reader open due to vnode snapshot changed. ever:%" PRId64 ", commit ver:%" PRId64, vError("vgId:%d, abort reader open due to vnode snapshot changed. ever:%" PRId64 ", commit ver:%" PRId64,
TD_VID(pVnode), ever, snapshot.lastApplyIndex); TD_VID(pVnode), ever, snapshot.lastApplyIndex);
code = TSDB_CODE_SYN_INTERNAL_ERROR; code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _err; goto _exit;
} }
vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever); _exit:
*ppReader = pReader; if (code) {
return code; vError("vgId:%d, vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code));
*ppReader = NULL;
_err: } else {
vError("vgId:%d, vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code)); vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
*ppReader = NULL; *ppReader = pReader;
}
return code; return code;
} }
@ -245,6 +250,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0; int32_t code = 0;
int32_t lino;
SVnode *pVnode = pReader->pVnode; SVnode *pVnode = pReader->pVnode;
int32_t vgId = TD_VID(pReader->pVnode); int32_t vgId = TD_VID(pReader->pVnode);
@ -261,31 +267,28 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
if (NULL == pFile) { if (NULL == pFile) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
int64_t size; int64_t size;
if (taosFStatFile(pFile, &size, NULL) < 0) { if (taosFStatFile(pFile, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFile); taosCloseFile(&pFile);
goto _err; TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
} }
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1); *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
if (*ppData == NULL) { if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFile); taosCloseFile(&pFile);
goto _err; TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
} }
((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG; ((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
((SSnapDataHdr *)(*ppData))->size = size + 1; ((SSnapDataHdr *)(*ppData))->size = size + 1;
((SSnapDataHdr *)(*ppData))->data[size] = '\0'; ((SSnapDataHdr *)(*ppData))->data[size] = '\0';
if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) { if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(*ppData); taosMemoryFree(*ppData);
taosCloseFile(&pFile); taosCloseFile(&pFile);
goto _err; TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
} }
taosCloseFile(&pFile); taosCloseFile(&pFile);
@ -299,20 +302,18 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
// open reader if not // open reader if not
if (pReader->pMetaReader == NULL) { if (pReader->pMetaReader == NULL) {
code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader); code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = metaSnapRead(pReader->pMetaReader, ppData); code = metaSnapRead(pReader->pMetaReader, ppData);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _err;
if (*ppData) {
goto _exit;
} else { } else {
if (*ppData) { pReader->metaDone = 1;
goto _exit; code = metaSnapReaderClose(&pReader->pMetaReader);
} else { TSDB_CHECK_CODE(code, lino, _exit);
pReader->metaDone = 1;
code = metaSnapReaderClose(&pReader->pMetaReader);
if (code) goto _err;
}
} }
} }
@ -322,20 +323,17 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
if (pReader->pTsdbReader == NULL) { if (pReader->pTsdbReader == NULL) {
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pRanges, code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pRanges,
&pReader->pTsdbReader); &pReader->pTsdbReader);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbSnapRead(pReader->pTsdbReader, ppData); code = tsdbSnapRead(pReader->pTsdbReader, ppData);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _err; if (*ppData) {
goto _exit;
} else { } else {
if (*ppData) { pReader->tsdbDone = 1;
goto _exit; code = tsdbSnapReaderClose(&pReader->pTsdbReader);
} else { TSDB_CHECK_CODE(code, lino, _exit);
pReader->tsdbDone = 1;
code = tsdbSnapReaderClose(&pReader->pTsdbReader);
if (code) goto _err;
}
} }
} }
@ -344,20 +342,17 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
if (pReader->pTsdbRAWReader == NULL) { if (pReader->pTsdbRAWReader == NULL) {
ASSERT(pReader->sver == 0); ASSERT(pReader->sver == 0);
code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader); code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbSnapRAWRead(pReader->pTsdbRAWReader, ppData); code = tsdbSnapRAWRead(pReader->pTsdbRAWReader, ppData);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _err; if (*ppData) {
goto _exit;
} else { } else {
if (*ppData) { pReader->tsdbRAWDone = 1;
goto _exit; code = tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
} else { TSDB_CHECK_CODE(code, lino, _exit);
pReader->tsdbRAWDone = 1;
code = tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
if (code) goto _err;
}
} }
} }
@ -365,59 +360,53 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
vInfo("vgId:%d tq transform start", vgId); vInfo("vgId:%d tq transform start", vgId);
if (!pReader->tqHandleDone) { if (!pReader->tqHandleDone) {
if (pReader->pTqSnapReader == NULL) { if (pReader->pTqSnapReader == NULL) {
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_HANDLE, &pReader->pTqSnapReader); code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_HANDLE,
if (code < 0) goto _err; &pReader->pTqSnapReader);
TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tqSnapRead(pReader->pTqSnapReader, ppData); code = tqSnapRead(pReader->pTqSnapReader, ppData);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _err; if (*ppData) {
goto _exit;
} else { } else {
if (*ppData) { pReader->tqHandleDone = 1;
goto _exit; code = tqSnapReaderClose(&pReader->pTqSnapReader);
} else { TSDB_CHECK_CODE(code, lino, _exit);
pReader->tqHandleDone = 1;
code = tqSnapReaderClose(&pReader->pTqSnapReader);
if (code) goto _err;
}
} }
} }
if (!pReader->tqCheckInfoDone) { if (!pReader->tqCheckInfoDone) {
if (pReader->pTqCheckInfoReader == NULL) { if (pReader->pTqCheckInfoReader == NULL) {
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_CHECKINFO, &pReader->pTqCheckInfoReader); code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_CHECKINFO,
if (code < 0) goto _err; &pReader->pTqCheckInfoReader);
TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tqSnapRead(pReader->pTqCheckInfoReader, ppData); code = tqSnapRead(pReader->pTqCheckInfoReader, ppData);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _err; if (*ppData) {
goto _exit;
} else { } else {
if (*ppData) { pReader->tqCheckInfoDone = 1;
goto _exit; code = tqSnapReaderClose(&pReader->pTqCheckInfoReader);
} else { TSDB_CHECK_CODE(code, lino, _exit);
pReader->tqCheckInfoDone = 1;
code = tqSnapReaderClose(&pReader->pTqCheckInfoReader);
if (code) goto _err;
}
} }
} }
if (!pReader->tqOffsetDone) { if (!pReader->tqOffsetDone) {
if (pReader->pTqOffsetReader == NULL) { if (pReader->pTqOffsetReader == NULL) {
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_OFFSET, &pReader->pTqOffsetReader); code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_OFFSET,
if (code < 0) goto _err; &pReader->pTqOffsetReader);
TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tqSnapRead(pReader->pTqOffsetReader, ppData); code = tqSnapRead(pReader->pTqOffsetReader, ppData);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _err; if (*ppData) {
goto _exit;
} else { } else {
if (*ppData) { pReader->tqOffsetDone = 1;
goto _exit; code = tqSnapReaderClose(&pReader->pTqOffsetReader);
} else { TSDB_CHECK_CODE(code, lino, _exit);
pReader->tqOffsetDone = 1;
code = tqSnapReaderClose(&pReader->pTqOffsetReader);
if (code) goto _err;
}
} }
} }
@ -426,28 +415,19 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
if (!pReader->streamTaskDone) { if (!pReader->streamTaskDone) {
if (pReader->pStreamTaskReader == NULL) { if (pReader->pStreamTaskReader == NULL) {
code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader); code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
vError("vgId:%d open streamtask snapshot reader failed, code:%s", vgId, tstrerror(code));
goto _err;
}
} }
code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData); code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
vError("vgId:%d error happens during read data from streatask snapshot, code:%s", vgId, tstrerror(code)); if (*ppData) {
goto _err; vInfo("vgId:%d no streamTask snapshot", vgId);
goto _exit;
} else { } else {
if (*ppData) { pReader->streamTaskDone = 1;
vInfo("vgId:%d no streamTask snapshot", vgId); code = streamTaskSnapReaderClose(pReader->pStreamTaskReader);
goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} else { pReader->pStreamTaskReader = NULL;
pReader->streamTaskDone = 1;
code = streamTaskSnapReaderClose(pReader->pStreamTaskReader);
if (code) {
goto _err;
}
pReader->pStreamTaskReader = NULL;
}
} }
} }
if (!pReader->streamStateDone) { if (!pReader->streamStateDone) {
@ -457,21 +437,18 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
if (code) { if (code) {
pReader->streamStateDone = 1; pReader->streamStateDone = 1;
pReader->pStreamStateReader = NULL; pReader->pStreamStateReader = NULL;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
code = streamStateSnapRead(pReader->pStreamStateReader, ppData); code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _err; if (*ppData) {
goto _exit;
} else { } else {
if (*ppData) { pReader->streamStateDone = 1;
goto _exit; code = streamStateSnapReaderClose(pReader->pStreamStateReader);
} else { TSDB_CHECK_CODE(code, lino, _exit);
pReader->streamStateDone = 1; pReader->pStreamStateReader = NULL;
code = streamStateSnapReaderClose(pReader->pStreamStateReader);
if (code) goto _err;
pReader->pStreamStateReader = NULL;
}
} }
} }
@ -480,20 +457,17 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
// open if not // open if not
if (pReader->pRsmaReader == NULL) { if (pReader->pRsmaReader == NULL) {
code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader); code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = rsmaSnapRead(pReader->pRsmaReader, ppData); code = rsmaSnapRead(pReader->pRsmaReader, ppData);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _err; if (*ppData) {
goto _exit;
} else { } else {
if (*ppData) { pReader->rsmaDone = 1;
goto _exit; code = rsmaSnapReaderClose(&pReader->pRsmaReader);
} else { TSDB_CHECK_CODE(code, lino, _exit);
pReader->rsmaDone = 1;
code = rsmaSnapReaderClose(&pReader->pRsmaReader);
if (code) goto _err;
}
} }
} }
@ -501,21 +475,21 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
*nData = 0; *nData = 0;
_exit: _exit:
if (*ppData) { if (code) {
SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData); vError("vgId:%d, vnode snapshot read failed at %s:%d since %s", vgId, __FILE__, lino, tstrerror(code));
pReader->index++;
*nData = sizeof(SSnapDataHdr) + pHdr->size;
pHdr->index = pReader->index;
vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index,
pHdr->type, *nData);
} else { } else {
vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index); if (*ppData) {
} SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData);
return code;
_err: pReader->index++;
vError("vgId:%d, vnode snapshot read failed since %s", vgId, tstrerror(code)); *nData = sizeof(SSnapDataHdr) + pHdr->size;
pHdr->index = pReader->index;
vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index,
pHdr->type, *nData);
} else {
vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index);
}
}
return code; return code;
} }
@ -532,19 +506,19 @@ struct SVSnapWriter {
SMetaSnapWriter *pMetaSnapWriter; SMetaSnapWriter *pMetaSnapWriter;
// tsdb // tsdb
TFileSetRangeArray *pRanges; TFileSetRangeArray *pRanges;
STsdbSnapWriter *pTsdbSnapWriter; STsdbSnapWriter *pTsdbSnapWriter;
// tsdb raw // tsdb raw
STsdbSnapRAWWriter *pTsdbSnapRAWWriter; STsdbSnapRAWWriter *pTsdbSnapRAWWriter;
// tq // tq
STqSnapWriter *pTqSnapHandleWriter; STqSnapWriter *pTqSnapHandleWriter;
STqSnapWriter *pTqSnapOffsetWriter; STqSnapWriter *pTqSnapOffsetWriter;
STqSnapWriter *pTqSnapCheckInfoWriter; STqSnapWriter *pTqSnapCheckInfoWriter;
// stream // stream
SStreamTaskWriter *pStreamTaskWriter; SStreamTaskWriter *pStreamTaskWriter;
SStreamStateWriter *pStreamStateWriter; SStreamStateWriter *pStreamStateWriter;
// rsma // rsma
TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2]; TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
SRSmaSnapWriter *pRsmaSnapWriter; SRSmaSnapWriter *pRsmaSnapWriter;
}; };
TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) { TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) {
@ -563,18 +537,18 @@ TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t
static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotParam *pParam) { static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotParam *pParam) {
SVnode *pVnode = pWriter->pVnode; SVnode *pVnode = pWriter->pVnode;
int32_t code = -1; int32_t code = 0;
int32_t lino;
if (pParam->data) { if (pParam->data) {
SSyncTLV *datHead = (void *)pParam->data; SSyncTLV *datHead = (void *)pParam->data;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
terrno = TSDB_CODE_INVALID_DATA_FMT; TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
goto _out;
} }
STsdbRepOpts tsdbOpts = {0}; STsdbRepOpts tsdbOpts = {0};
TFileSetRangeArray **ppRanges = NULL; TFileSetRangeArray **ppRanges = NULL;
int32_t offset = 0; int32_t offset = 0;
while (offset + sizeof(SSyncTLV) < datHead->len) { while (offset + sizeof(SSyncTLV) < datHead->len) {
SSyncTLV *subField = (void *)(datHead->val + offset); SSyncTLV *subField = (void *)(datHead->val + offset);
@ -589,30 +563,30 @@ static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotP
ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ); ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ);
if (ppRanges == NULL) { if (ppRanges == NULL) {
vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ); vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
goto _out; TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) {
vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
goto _out;
} }
code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges);
TSDB_CHECK_CODE(code, lino, _exit);
} break; } break;
case SNAP_DATA_RAW: { case SNAP_DATA_RAW: {
if (tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts) < 0) { code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts);
vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); TSDB_CHECK_CODE(code, lino, _exit);
goto _out;
}
} break; } break;
default: default:
vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
goto _out; TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
goto _exit;
} }
} }
vInfo("vgId:%d, vnode snap writer supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format); vInfo("vgId:%d, vnode snap writer supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
} }
code = 0; _exit:
_out: if (code) {
vError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));
}
return code; return code;
} }
@ -634,6 +608,7 @@ static int32_t vnodeEnableBgTask(SVnode *pVnode) {
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) { int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino;
SVSnapWriter *pWriter = NULL; SVSnapWriter *pWriter = NULL;
int64_t sver = pParam->start; int64_t sver = pParam->start;
int64_t ever = pParam->end; int64_t ever = pParam->end;
@ -644,8 +619,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter
// alloc // alloc
pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter)); pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) { if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
goto _err;
} }
pWriter->pVnode = pVnode; pWriter->pVnode = pVnode;
pWriter->sver = sver; pWriter->sver = sver;
@ -655,19 +629,19 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter
pWriter->commitID = ++pVnode->state.commitID; pWriter->commitID = ++pVnode->state.commitID;
// snapshot info // snapshot info
if (vnodeSnapWriterDealWithSnapInfo(pWriter, pParam) < 0) { code = vnodeSnapWriterDealWithSnapInfo(pWriter, pParam);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
vError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
if (pWriter) taosMemoryFreeClear(pWriter);
*ppWriter = NULL;
} else {
vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
sver, ever, pWriter->commitID);
*ppWriter = pWriter;
} }
vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
sver, ever, pWriter->commitID);
*ppWriter = pWriter;
return code;
_err:
vError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
if (pWriter) taosMemoryFreeClear(pWriter);
*ppWriter = NULL;
return code; return code;
} }
@ -785,14 +759,13 @@ _exit:
static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
int32_t lino;
SVnode *pVnode = pWriter->pVnode; SVnode *pVnode = pWriter->pVnode;
SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
// decode info // decode info
if (vnodeDecodeInfo(pHdr->data, &pWriter->info) < 0) { code = vnodeDecodeInfo(pHdr->data, &pWriter->info);
code = TSDB_CODE_INVALID_MSG; TSDB_CHECK_CODE(code, lino, _exit);
goto _exit;
}
// change some value // change some value
pWriter->info.state.commitID = pWriter->commitID; pWriter->info.state.commitID = pWriter->commitID;
@ -805,10 +778,8 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_
pWriter->info.config = pVnode->config; pWriter->info.config = pVnode->config;
pWriter->info.config.vndStats = vndStats; pWriter->info.config.vndStats = vndStats;
vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId); vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId);
if (vnodeSaveInfo(dir, &pWriter->info) < 0) { code = vnodeSaveInfo(dir, &pWriter->info);
code = terrno; TSDB_CHECK_CODE(code, lino, _exit);
goto _exit;
}
_exit: _exit:
return code; return code;
@ -816,6 +787,7 @@ _exit:
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
int32_t lino;
SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
SVnode *pVnode = pWriter->pVnode; SVnode *pVnode = pWriter->pVnode;
@ -824,7 +796,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
if (pHdr->index != pWriter->index + 1) { if (pHdr->index != pWriter->index + 1) {
vError("vgId:%d, unexpected vnode snapshot msg. index:%" PRId64 ", expected index:%" PRId64, TD_VID(pVnode), vError("vgId:%d, unexpected vnode snapshot msg. index:%" PRId64 ", expected index:%" PRId64, TD_VID(pVnode),
pHdr->index, pWriter->index + 1); pHdr->index, pWriter->index + 1);
return -1; TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_MSG, lino, _exit);
} }
pWriter->index = pHdr->index; pWriter->index = pHdr->index;
@ -835,17 +807,17 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
switch (pHdr->type) { switch (pHdr->type) {
case SNAP_DATA_CFG: { case SNAP_DATA_CFG: {
code = vnodeSnapWriteInfo(pWriter, pData, nData); code = vnodeSnapWriteInfo(pWriter, pData, nData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} break; } break;
case SNAP_DATA_META: { case SNAP_DATA_META: {
// meta // meta
if (pWriter->pMetaSnapWriter == NULL) { if (pWriter->pMetaSnapWriter == NULL) {
code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter); code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData); code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} break; } break;
case SNAP_DATA_TSDB: case SNAP_DATA_TSDB:
case SNAP_DATA_DEL: { case SNAP_DATA_DEL: {
@ -853,69 +825,69 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
if (pWriter->pTsdbSnapWriter == NULL) { if (pWriter->pTsdbSnapWriter == NULL) {
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, pWriter->pRanges, code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, pWriter->pRanges,
&pWriter->pTsdbSnapWriter); &pWriter->pTsdbSnapWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr); code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} break; } break;
case SNAP_DATA_RAW: { case SNAP_DATA_RAW: {
// tsdb // tsdb
if (pWriter->pTsdbSnapRAWWriter == NULL) { if (pWriter->pTsdbSnapRAWWriter == NULL) {
ASSERT(pWriter->sver == 0); ASSERT(pWriter->sver == 0);
code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter); code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbSnapRAWWrite(pWriter->pTsdbSnapRAWWriter, pHdr); code = tsdbSnapRAWWrite(pWriter->pTsdbSnapRAWWriter, pHdr);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} break; } break;
case SNAP_DATA_TQ_HANDLE: { case SNAP_DATA_TQ_HANDLE: {
// tq handle // tq handle
if (pWriter->pTqSnapHandleWriter == NULL) { if (pWriter->pTqSnapHandleWriter == NULL) {
code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapHandleWriter); code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapHandleWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tqSnapHandleWrite(pWriter->pTqSnapHandleWriter, pData, nData); code = tqSnapHandleWrite(pWriter->pTqSnapHandleWriter, pData, nData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} break; } break;
case SNAP_DATA_TQ_CHECKINFO: { case SNAP_DATA_TQ_CHECKINFO: {
// tq checkinfo // tq checkinfo
if (pWriter->pTqSnapCheckInfoWriter == NULL) { if (pWriter->pTqSnapCheckInfoWriter == NULL) {
code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapCheckInfoWriter); code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapCheckInfoWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tqSnapCheckInfoWrite(pWriter->pTqSnapCheckInfoWriter, pData, nData); code = tqSnapCheckInfoWrite(pWriter->pTqSnapCheckInfoWriter, pData, nData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} break; } break;
case SNAP_DATA_TQ_OFFSET: { case SNAP_DATA_TQ_OFFSET: {
// tq offset // tq offset
if (pWriter->pTqSnapOffsetWriter == NULL) { if (pWriter->pTqSnapOffsetWriter == NULL) {
code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapOffsetWriter); code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapOffsetWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tqSnapOffsetWrite(pWriter->pTqSnapOffsetWriter, pData, nData); code = tqSnapOffsetWrite(pWriter->pTqSnapOffsetWriter, pData, nData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} break; } break;
case SNAP_DATA_STREAM_TASK: case SNAP_DATA_STREAM_TASK:
case SNAP_DATA_STREAM_TASK_CHECKPOINT: { case SNAP_DATA_STREAM_TASK_CHECKPOINT: {
if (pWriter->pStreamTaskWriter == NULL) { if (pWriter->pStreamTaskWriter == NULL) {
code = streamTaskSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamTaskWriter); code = streamTaskSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamTaskWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData); code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} break; } break;
case SNAP_DATA_STREAM_STATE_BACKEND: { case SNAP_DATA_STREAM_STATE_BACKEND: {
if (pWriter->pStreamStateWriter == NULL) { if (pWriter->pStreamStateWriter == NULL) {
code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter); code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = streamStateSnapWrite(pWriter->pStreamStateWriter, pData, nData); code = streamStateSnapWrite(pWriter->pStreamStateWriter, pData, nData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} break; } break;
case SNAP_DATA_RSMA1: case SNAP_DATA_RSMA1:
@ -925,20 +897,19 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
if (pWriter->pRsmaSnapWriter == NULL) { if (pWriter->pRsmaSnapWriter == NULL) {
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, (void **)pWriter->pRsmaRanges, code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, (void **)pWriter->pRsmaRanges,
&pWriter->pRsmaSnapWriter); &pWriter->pRsmaSnapWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData); code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} break; } break;
default: default:
break; break;
} }
_exit: _exit:
return code; if (code) {
vError("vgId:%d, vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
_err: tstrerror(code), pHdr->index, pHdr->type, nData);
vError("vgId:%d, vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), }
tstrerror(code), pHdr->index, pHdr->type, nData);
return code; return code;
} }

View File

@ -387,7 +387,7 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (msgcb == NULL || msgcb->putToQueueFp == NULL) { if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
return -1; return TSDB_CODE_INVALID_PARA;
} }
int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg); int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
@ -400,13 +400,13 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
return -1; return TSDB_CODE_INVALID_PARA;
} }
if (msgcb == NULL || msgcb->putToQueueFp == NULL) { if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
return -1; return TSDB_CODE_INVALID_PARA;
} }
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
@ -485,8 +485,7 @@ static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader); return vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
return code;
} }
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
@ -496,8 +495,7 @@ static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len); return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
return code;
} }
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) { static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
@ -514,8 +512,7 @@ static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void
} }
} while (true); } while (true);
int32_t code = vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter); return vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter);
return code;
} }
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
@ -580,7 +577,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
if (pMeta->startInfo.startAllTasks == 1) { if (pMeta->startInfo.startAllTasks == 1) {
pMeta->startInfo.restartCount += 1; pMeta->startInfo.restartCount += 1;
vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
pMeta->startInfo.restartCount); pMeta->startInfo.restartCount);
} else { } else {
pMeta->startInfo.startAllTasks = 1; pMeta->startInfo.startAllTasks = 1;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -636,7 +633,7 @@ static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
} }
} }
static void vnodeBecomeAssignedLeader(const SSyncFSM* pFsm) { static void vnodeBecomeAssignedLeader(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, become assigned leader", pVnode->config.vgId); vDebug("vgId:%d, become assigned leader", pVnode->config.vgId);
if (pVnode->pTq) { if (pVnode->pTq) {
@ -662,12 +659,16 @@ static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
return itemSize; return itemSize;
} else { } else {
return -1; return TSDB_CODE_INVALID_PARA;
} }
} }
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
if (pFsm == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pFsm->data = pVnode; pFsm->data = pVnode;
pFsm->FpCommitCb = vnodeSyncCommitMsg; pFsm->FpCommitCb = vnodeSyncCommitMsg;
pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex; pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
@ -724,7 +725,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
pVnode->sync = syncOpen(&syncInfo, vnodeVersion); pVnode->sync = syncOpen(&syncInfo, vnodeVersion);
if (pVnode->sync <= 0) { if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr()); vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
return -1; return terrno;
} }
return 0; return 0;
@ -732,9 +733,10 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
int32_t vnodeSyncStart(SVnode *pVnode) { int32_t vnodeSyncStart(SVnode *pVnode) {
vInfo("vgId:%d, start sync", pVnode->config.vgId); vInfo("vgId:%d, start sync", pVnode->config.vgId);
if (syncStart(pVnode->sync) < 0) { int32_t code = syncStart(pVnode->sync);
vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, terrstr()); if (code) {
return -1; vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, tstrerror(code));
return code;
} }
return 0; return 0;
} }

View File

@ -112,7 +112,7 @@ int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid); sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
return -1; return terrno;
} }
*cfg = pSyncNode->raftCfg.cfg; *cfg = pSyncNode->raftCfg.cfg;
@ -2036,7 +2036,7 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
pSyncNode->hbrSlowNum = 0; pSyncNode->hbrSlowNum = 0;
// reset restoreFinish // reset restoreFinish
//pSyncNode->restoreFinish = false; // pSyncNode->restoreFinish = false;
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER; pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
@ -2149,7 +2149,8 @@ int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64 sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
"assigned commit index:%" PRId64 ", last index:%" PRId64, "assigned commit index:%" PRId64 ", last index:%" PRId64,
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex, lastIndex); pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
lastIndex);
return 0; return 0;
} }