Merge pull request #26767 from taosdata/fix/TD-30990
terrno/tsdb-cache: use new terrno macros
This commit is contained in:
commit
d27227c235
File diff suppressed because it is too large
Load Diff
|
@ -51,7 +51,12 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
||||||
taosLRUCacheSetStrictCapacity(pLogStore->pCache, false);
|
taosLRUCacheSetStrictCapacity(pLogStore->pCache, false);
|
||||||
|
|
||||||
pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
|
pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
|
||||||
ASSERT(pLogStore->data != NULL);
|
if (!pLogStore->data) {
|
||||||
|
taosMemoryFree(pLogStore);
|
||||||
|
taosLRUCacheCleanup(pLogStore->pCache);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
pData->pSyncNode = pSyncNode;
|
pData->pSyncNode = pSyncNode;
|
||||||
|
@ -60,7 +65,13 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
taosThreadMutexInit(&(pData->mutex), NULL);
|
taosThreadMutexInit(&(pData->mutex), NULL);
|
||||||
pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0);
|
pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0);
|
||||||
ASSERT(pData->pWalHandle != NULL);
|
if (!pData->pWalHandle) {
|
||||||
|
taosMemoryFree(pLogStore);
|
||||||
|
taosLRUCacheCleanup(pLogStore->pCache);
|
||||||
|
taosThreadMutexDestroy(&(pData->mutex));
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex;
|
pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex;
|
||||||
pLogStore->syncLogCommitIndex = raftlogCommitIndex;
|
pLogStore->syncLogCommitIndex = raftlogCommitIndex;
|
||||||
|
@ -110,7 +121,7 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
int32_t code = walRestoreFromSnapshot(pWal, snapshotIndex);
|
int32_t code = walRestoreFromSnapshot(pWal, snapshotIndex);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
int32_t err = terrno;
|
int32_t err = code;
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
int32_t sysErr = errno;
|
int32_t sysErr = errno;
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
|
@ -118,10 +129,10 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
|
||||||
sNError(pData->pSyncNode,
|
sNError(pData->pSyncNode,
|
||||||
"wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex,
|
"wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex,
|
||||||
err, errStr, sysErr, sysErrStr);
|
err, errStr, sysErr, sysErrStr);
|
||||||
return -1;
|
TAOS_RETURN(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
|
SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
|
||||||
|
@ -224,7 +235,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
|
|
||||||
sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
|
sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
pEntry->index, err, errStr, sysErr, sysErrStr);
|
pEntry->index, err, errStr, sysErr, sysErrStr);
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = walFsync(pWal, forceSync);
|
code = walFsync(pWal, forceSync);
|
||||||
|
@ -235,7 +247,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
|
|
||||||
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
|
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
|
||||||
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
|
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
// entry found, return 0
|
// entry found, return 0
|
||||||
|
@ -253,10 +265,10 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
|
||||||
|
|
||||||
SWalReader* pWalHandle = pData->pWalHandle;
|
SWalReader* pWalHandle = pData->pWalHandle;
|
||||||
if (pWalHandle == NULL) {
|
if (pWalHandle == NULL) {
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
|
||||||
sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId);
|
sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId);
|
||||||
taosThreadMutexUnlock(&(pData->mutex));
|
taosThreadMutexUnlock(&(pData->mutex));
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t ts2 = taosGetTimestampNs();
|
int64_t ts2 = taosGetTimestampNs();
|
||||||
|
@ -266,7 +278,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
|
||||||
|
|
||||||
// code = walReadVerCached(pWalHandle, index);
|
// code = walReadVerCached(pWalHandle, index);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
int32_t err = terrno;
|
int32_t err = code;
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
int32_t sysErr = errno;
|
int32_t sysErr = errno;
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
|
@ -286,7 +298,8 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
|
||||||
*/
|
*/
|
||||||
|
|
||||||
taosThreadMutexUnlock(&(pData->mutex));
|
taosThreadMutexUnlock(&(pData->mutex));
|
||||||
return code;
|
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
*ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
|
*ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
|
||||||
|
@ -319,7 +332,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
|
||||||
", elapsed-build:%" PRId64,
|
", elapsed-build:%" PRId64,
|
||||||
index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild);
|
index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild);
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
// truncate semantic
|
// truncate semantic
|
||||||
|
@ -329,7 +342,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
|
||||||
|
|
||||||
int32_t code = walRollback(pWal, fromIndex);
|
int32_t code = walRollback(pWal, fromIndex);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
int32_t err = terrno;
|
int32_t err = code;
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
int32_t sysErr = errno;
|
int32_t sysErr = errno;
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
|
@ -339,7 +352,8 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
|
||||||
|
|
||||||
// event log
|
// event log
|
||||||
sNTrace(pData->pSyncNode, "log truncate, from-index:%" PRId64, fromIndex);
|
sNTrace(pData->pSyncNode, "log truncate, from-index:%" PRId64, fromIndex);
|
||||||
return code;
|
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
// entry found, return 0
|
// entry found, return 0
|
||||||
|
@ -352,16 +366,16 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
|
||||||
|
|
||||||
*ppLastEntry = NULL;
|
*ppLastEntry = NULL;
|
||||||
if (walIsEmpty(pWal)) {
|
if (walIsEmpty(pWal)) {
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||||
return -1;
|
|
||||||
} else {
|
} else {
|
||||||
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
|
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
|
||||||
ASSERT(lastIndex >= SYNC_INDEX_BEGIN);
|
ASSERT(lastIndex >= SYNC_INDEX_BEGIN);
|
||||||
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
|
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
|
||||||
return code;
|
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
|
@ -375,20 +389,22 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
|
|
||||||
if (index < snapshotVer || index > wallastVer) {
|
if (index < snapshotVer || index > wallastVer) {
|
||||||
// ignore
|
// ignore
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = walCommit(pWal, index);
|
int32_t code = walCommit(pWal, index);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
int32_t err = terrno;
|
int32_t err = code;
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
int32_t sysErr = errno;
|
int32_t sysErr = errno;
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
|
sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
pData->pSyncNode->vgId, index, err, errStr, sysErr, sysErrStr);
|
pData->pSyncNode->vgId, index, err, errStr, sysErr, sysErrStr);
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) {
|
SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) {
|
||||||
|
@ -405,5 +421,6 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
|
||||||
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) {
|
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
|
|
||||||
return walGetCommittedVer(pWal);
|
return walGetCommittedVer(pWal);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,17 +25,17 @@ static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
tjsonGetNumberValue(pJson, "current_term", pStore->currentTerm, code);
|
tjsonGetNumberValue(pJson, "current_term", pStore->currentTerm, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
tjsonGetNumberValue(pJson, "vote_for_addr", pStore->voteFor.addr, code);
|
tjsonGetNumberValue(pJson, "vote_for_addr", pStore->voteFor.addr, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
tjsonGetInt32ValueFromDouble(pJson, "vote_for_vgid", pStore->voteFor.vgId, code);
|
tjsonGetInt32ValueFromDouble(pJson, "vote_for_vgid", pStore->voteFor.vgId, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t raftStoreReadFile(SSyncNode *pNode) {
|
int32_t raftStoreReadFile(SSyncNode *pNode) {
|
||||||
int32_t code = -1;
|
int32_t code = -1, lino = 0;
|
||||||
TdFilePtr pFile = NULL;
|
TdFilePtr pFile = NULL;
|
||||||
char *pData = NULL;
|
char *pData = NULL;
|
||||||
SJson *pJson = NULL;
|
SJson *pJson = NULL;
|
||||||
|
@ -52,41 +52,38 @@ int32_t raftStoreReadFile(SSyncNode *pNode) {
|
||||||
|
|
||||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
sError("vgId:%d, failed to open raft store file:%s since %s", pNode->vgId, file, terrstr());
|
sError("vgId:%d, failed to open raft store file:%s since %s", pNode->vgId, file, terrstr());
|
||||||
goto _OVER;
|
|
||||||
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t size = 0;
|
int64_t size = 0;
|
||||||
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
sError("vgId:%d, failed to fstat raft store file:%s since %s", pNode->vgId, file, terrstr());
|
sError("vgId:%d, failed to fstat raft store file:%s since %s", pNode->vgId, file, terrstr());
|
||||||
goto _OVER;
|
|
||||||
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
pData = taosMemoryMalloc(size + 1);
|
pData = taosMemoryMalloc(size + 1);
|
||||||
if (pData == NULL) {
|
if (pData == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
||||||
goto _OVER;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosReadFile(pFile, pData, size) != size) {
|
if (taosReadFile(pFile, pData, size) != size) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
|
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
|
||||||
goto _OVER;
|
|
||||||
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
pData[size] = '\0';
|
pData[size] = '\0';
|
||||||
|
|
||||||
pJson = tjsonParse(pData);
|
pJson = tjsonParse(pData);
|
||||||
if (pJson == NULL) {
|
if (pJson == NULL) {
|
||||||
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_JSON_FORMAT, &lino, _OVER);
|
||||||
goto _OVER;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (raftStoreDecode(pJson, pStore) < 0) {
|
if (raftStoreDecode(pJson, pStore) < 0) {
|
||||||
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_JSON_FORMAT, &lino, _OVER);
|
||||||
goto _OVER;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -100,18 +97,20 @@ _OVER:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
|
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
|
||||||
}
|
}
|
||||||
return code;
|
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t raftStoreEncode(SJson *pJson, SRaftStore *pStore) {
|
static int32_t raftStoreEncode(SJson *pJson, SRaftStore *pStore) {
|
||||||
if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
if (tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
if (tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) return -1;
|
if (tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t raftStoreWriteFile(SSyncNode *pNode) {
|
int32_t raftStoreWriteFile(SSyncNode *pNode) {
|
||||||
int32_t code = -1;
|
int32_t code = -1, lino = 0;
|
||||||
char *buffer = NULL;
|
char *buffer = NULL;
|
||||||
SJson *pJson = NULL;
|
SJson *pJson = NULL;
|
||||||
TdFilePtr pFile = NULL;
|
TdFilePtr pFile = NULL;
|
||||||
|
@ -120,23 +119,23 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) {
|
||||||
char file[PATH_MAX] = {0};
|
char file[PATH_MAX] = {0};
|
||||||
snprintf(file, sizeof(file), "%s.bak", realfile);
|
snprintf(file, sizeof(file), "%s.bak", realfile);
|
||||||
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
pJson = tjsonCreateObject();
|
pJson = tjsonCreateObject();
|
||||||
if (pJson == NULL) goto _OVER;
|
if (pJson == NULL) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
||||||
if (raftStoreEncode(pJson, pStore) != 0) goto _OVER;
|
if (raftStoreEncode(pJson, pStore) != 0) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
||||||
|
|
||||||
buffer = tjsonToString(pJson);
|
buffer = tjsonToString(pJson);
|
||||||
if (buffer == NULL) goto _OVER;
|
if (buffer == NULL) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
||||||
terrno = 0;
|
|
||||||
|
|
||||||
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
||||||
if (pFile == NULL) goto _OVER;
|
if (pFile == NULL) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
|
|
||||||
int32_t len = strlen(buffer);
|
int32_t len = strlen(buffer);
|
||||||
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
|
if (taosWriteFile(pFile, buffer, len) <= 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
if (taosFsyncFile(pFile) < 0) goto _OVER;
|
|
||||||
|
if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
if (taosRenameFile(file, realfile) != 0) goto _OVER;
|
if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
sInfo("vgId:%d, succeed to write raft store file:%s, term:%" PRId64, pNode->vgId, realfile, pStore->currentTerm);
|
sInfo("vgId:%d, succeed to write raft store file:%s, term:%" PRId64, pNode->vgId, realfile, pStore->currentTerm);
|
||||||
|
@ -147,7 +146,6 @@ _OVER:
|
||||||
if (pFile != NULL) taosCloseFile(&pFile);
|
if (pFile != NULL) taosCloseFile(&pFile);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
sError("vgId:%d, failed to write raft store file:%s since %s", pNode->vgId, realfile, terrstr());
|
sError("vgId:%d, failed to write raft store file:%s since %s", pNode->vgId, realfile, terrstr());
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -52,7 +52,8 @@ int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
|
||||||
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
|
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
|
||||||
syncLogReplReset(pMgr);
|
syncLogReplReset(pMgr);
|
||||||
taosThreadMutexUnlock(&pBuf->mutex);
|
taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeReplicate(SSyncNode* pNode) {
|
int32_t syncNodeReplicate(SSyncNode* pNode) {
|
||||||
|
@ -60,13 +61,14 @@ int32_t syncNodeReplicate(SSyncNode* pNode) {
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
int32_t ret = syncNodeReplicateWithoutLock(pNode);
|
int32_t ret = syncNodeReplicateWithoutLock(pNode);
|
||||||
taosThreadMutexUnlock(&pBuf->mutex);
|
taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
return ret;
|
|
||||||
|
TAOS_RETURN(ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
|
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
|
||||||
if ((pNode->state != TAOS_SYNC_STATE_LEADER && pNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) ||
|
if ((pNode->state != TAOS_SYNC_STATE_LEADER && pNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) ||
|
||||||
pNode->raftCfg.cfg.totalReplicaNum == 1) {
|
pNode->raftCfg.cfg.totalReplicaNum == 1) {
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < pNode->totalReplicaNum; i++) {
|
for (int32_t i = 0; i < pNode->totalReplicaNum; i++) {
|
||||||
if (syncUtilSameId(&pNode->replicasId[i], &pNode->myRaftId)) {
|
if (syncUtilSameId(&pNode->replicasId[i], &pNode->myRaftId)) {
|
||||||
|
@ -75,14 +77,16 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
|
||||||
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
|
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
|
||||||
(void)syncLogReplStart(pMgr, pNode);
|
(void)syncLogReplStart(pMgr, pNode);
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
|
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
|
||||||
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
||||||
pMsg->destId = *destRaftId;
|
pMsg->destId = *destRaftId;
|
||||||
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
|
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
|
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
|
||||||
|
@ -112,5 +116,5 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
||||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
#include "syncVoteMgr.h"
|
#include "syncVoteMgr.h"
|
||||||
#include "syncUtil.h"
|
|
||||||
|
|
||||||
// TLA+ Spec
|
// TLA+ Spec
|
||||||
// HandleRequestVoteRequest(i, j, m) ==
|
// HandleRequestVoteRequest(i, j, m) ==
|
||||||
|
@ -95,7 +94,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
|
if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
|
||||||
syncLogRecvRequestVote(ths, pMsg, -1, "not in my config");
|
syncLogRecvRequestVote(ths, pMsg, -1, "not in my config");
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
|
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
|
||||||
|
@ -122,8 +122,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
// send msg
|
// send msg
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
ret = syncBuildRequestVoteReply(&rpcMsg, ths->vgId);
|
|
||||||
ASSERT(ret == 0);
|
TAOS_CHECK_RETURN(syncBuildRequestVoteReply(&rpcMsg, ths->vgId));
|
||||||
|
|
||||||
SyncRequestVoteReply* pReply = rpcMsg.pCont;
|
SyncRequestVoteReply* pReply = rpcMsg.pCont;
|
||||||
pReply->srcId = ths->myRaftId;
|
pReply->srcId = ths->myRaftId;
|
||||||
|
@ -138,5 +138,6 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
|
|
||||||
if (resetElect) syncNodeResetElectTimer(ths);
|
if (resetElect) syncNodeResetElectTimer(ths);
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,19 +45,22 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
||||||
syncLogRecvRequestVoteReply(ths, pMsg, "not in my config");
|
syncLogRecvRequestVoteReply(ths, pMsg, "not in my config");
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
SyncTerm currentTerm = raftStoreGetTerm(ths);
|
SyncTerm currentTerm = raftStoreGetTerm(ths);
|
||||||
// drop stale response
|
// drop stale response
|
||||||
if (pMsg->term < currentTerm) {
|
if (pMsg->term < currentTerm) {
|
||||||
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
|
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg->term > currentTerm) {
|
if (pMsg->term > currentTerm) {
|
||||||
syncLogRecvRequestVoteReply(ths, pMsg, "error term");
|
syncLogRecvRequestVoteReply(ths, pMsg, "error term");
|
||||||
syncNodeStepDown(ths, pMsg->term);
|
syncNodeStepDown(ths, pMsg->term);
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncLogRecvRequestVoteReply(ths, pMsg, "");
|
syncLogRecvRequestVoteReply(ths, pMsg, "");
|
||||||
|
@ -69,7 +72,8 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
if (ths->pVotesRespond->term != pMsg->term) {
|
if (ths->pVotesRespond->term != pMsg->term) {
|
||||||
sNError(ths, "vote respond error vote-respond-mgr term:%" PRIu64 ", msg term:%" PRIu64 "",
|
sNError(ths, "vote respond error vote-respond-mgr term:%" PRIu64 ", msg term:%" PRIu64 "",
|
||||||
ths->pVotesRespond->term, pMsg->term);
|
ths->pVotesRespond->term, pMsg->term);
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
votesRespondAdd(ths->pVotesRespond, pMsg);
|
votesRespondAdd(ths->pVotesRespond, pMsg);
|
||||||
|
@ -93,5 +97,5 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
typedef struct SLRUEntry SLRUEntry;
|
typedef struct SLRUEntry SLRUEntry;
|
||||||
typedef struct SLRUEntryTable SLRUEntryTable;
|
typedef struct SLRUEntryTable SLRUEntryTable;
|
||||||
|
@ -114,13 +115,13 @@ static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) {
|
||||||
table->lengthBits = 4;
|
table->lengthBits = 4;
|
||||||
table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *));
|
table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *));
|
||||||
if (!table->list) {
|
if (!table->list) {
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
table->elems = 0;
|
table->elems = 0;
|
||||||
table->maxLengthBits = maxUpperHashBits;
|
table->maxLengthBits = maxUpperHashBits;
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosLRUEntryTableApply(SLRUEntryTable *table, _taos_lru_table_func_t func, uint32_t begin, uint32_t end) {
|
static void taosLRUEntryTableApply(SLRUEntryTable *table, _taos_lru_table_func_t func, uint32_t begin, uint32_t end) {
|
||||||
|
@ -349,9 +350,7 @@ static void taosLRUCacheShardSetCapacity(SLRUCacheShard *shard, size_t capacity)
|
||||||
|
|
||||||
static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, double highPriPoolRatio,
|
static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, double highPriPoolRatio,
|
||||||
int maxUpperHashBits) {
|
int maxUpperHashBits) {
|
||||||
if (taosLRUEntryTableInit(&shard->table, maxUpperHashBits) < 0) {
|
TAOS_CHECK_RETURN(taosLRUEntryTableInit(&shard->table, maxUpperHashBits));
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexInit(&shard->mutex, NULL);
|
taosThreadMutexInit(&shard->mutex, NULL);
|
||||||
|
|
||||||
|
@ -372,7 +371,7 @@ static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool st
|
||||||
|
|
||||||
taosLRUCacheShardSetCapacity(shard, capacity);
|
taosLRUCacheShardSetCapacity(shard, capacity);
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) {
|
static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) {
|
||||||
|
@ -671,16 +670,13 @@ static int getDefaultCacheShardBits(size_t capacity) {
|
||||||
|
|
||||||
SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) {
|
SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) {
|
||||||
if (numShardBits >= 20) {
|
if (numShardBits >= 20) {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) {
|
if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache));
|
SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache));
|
||||||
if (!cache) {
|
if (!cache) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -692,14 +688,15 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
|
||||||
cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard));
|
cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard));
|
||||||
if (!cache->shards) {
|
if (!cache->shards) {
|
||||||
taosMemoryFree(cache);
|
taosMemoryFree(cache);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool strictCapacity = 1;
|
bool strictCapacity = 1;
|
||||||
size_t perShard = (capacity + (numShards - 1)) / numShards;
|
size_t perShard = (capacity + (numShards - 1)) / numShards;
|
||||||
for (int i = 0; i < numShards; ++i) {
|
for (int i = 0; i < numShards; ++i) {
|
||||||
taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits);
|
if (TSDB_CODE_SUCCESS !=
|
||||||
|
taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits))
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
cache->numShards = numShards;
|
cache->numShards = numShards;
|
||||||
|
|
Loading…
Reference in New Issue