fix(wal/write): remove walWrite which is only used in unit testing

This commit is contained in:
Minglei Jin 2024-07-17 14:45:23 +08:00
parent 103e9194b0
commit e6c86f5586
5 changed files with 50 additions and 83 deletions

View File

@ -174,13 +174,8 @@ void walClose(SWal *);
// write interfaces // write interfaces
// By assigning index by the caller, wal gurantees linearizability // By assigning index by the caller, wal gurantees linearizability
int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
int32_t bodyLen);
// Assign version automatically and return to caller, // Assign version automatically and return to caller,
// -1 will be returned for failed writes int32_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
int64_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
void walFsync(SWal *, bool force); void walFsync(SWal *, bool force);

View File

@ -206,17 +206,17 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
SyncIndex index = 0;
SWalSyncInfo syncMeta = {0}; SWalSyncInfo syncMeta = {0};
syncMeta.isWeek = pEntry->isWeak; syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum; syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;
int64_t tsWriteBegin = taosGetTimestampNs(); int64_t tsWriteBegin = taosGetTimestampNs();
index = walAppendLog(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); int32_t code = walAppendLog(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
int64_t tsWriteEnd = taosGetTimestampNs(); int64_t tsWriteEnd = taosGetTimestampNs();
int64_t tsElapsed = tsWriteEnd - tsWriteBegin; int64_t tsElapsed = tsWriteEnd - tsWriteBegin;
if (index < 0) { if (TSDB_CODE_SUCCESS != code) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t sysErr = errno; int32_t sysErr = errno;
@ -227,8 +227,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
return -1; return -1;
} }
ASSERT(pEntry->index == index);
walFsync(pWal, forceSync); walFsync(pWal, forceSync);
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,

View File

@ -51,8 +51,14 @@ void test2() {
SWal *pWal = walOpen(gWalPath, &walCfg); SWal *pWal = walOpen(gWalPath, &walCfg);
assert(pWal != NULL); assert(pWal != NULL);
SWalSyncInfo syncMeta = {
.isWeek = -1,
.seqNum = UINT64_MAX,
.term = UINT64_MAX,
};
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 5; ++i) {
int code = walWrite(pWal, i, 100, "aa", 3); int code = walAppendLog(pWal, i, 100, syncMeta, "aa", 3);
if (code != 0) { if (code != 0) {
printf("code:%d terror:%d msg:%s i:%d \n", code, terrno, tstrerror(terrno), i); printf("code:%d terror:%d msg:%s i:%d \n", code, terrno, tstrerror(terrno), i);
assert(0); assert(0);
@ -105,10 +111,16 @@ void test4() {
SWal *pWal = walOpen(gWalPath, &walCfg); SWal *pWal = walOpen(gWalPath, &walCfg);
assert(pWal != NULL); assert(pWal != NULL);
SWalSyncInfo syncMeta = {
.isWeek = -1,
.seqNum = UINT64_MAX,
.term = UINT64_MAX,
};
walRestoreFromSnapshot(pWal, 5); walRestoreFromSnapshot(pWal, 5);
for (int i = 6; i < 10; ++i) { for (int i = 6; i < 10; ++i) {
int code = walWrite(pWal, i, 100, "aa", 3); int code = walAppendLog(pWal, i, 100, syncMeta, "aa", 3);
if (code != 0) { if (code != 0) {
printf("code:%d terror:%d msg:%s i:%d \n", code, terrno, tstrerror(terrno), i); printf("code:%d terror:%d msg:%s i:%d \n", code, terrno, tstrerror(terrno), i);
assert(0); assert(0);

View File

@ -617,82 +617,39 @@ END:
return -1; return -1;
} }
int64_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
int32_t bodyLen) { int32_t bodyLen) {
int32_t code = 0, lino = 0;
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
if (index != pWal->vers.lastVer + 1) { if (index != pWal->vers.lastVer + 1) {
terrno = TSDB_CODE_WAL_INVALID_VER; TAOS_CHECK_GOTO(TSDB_CODE_WAL_INVALID_VER, &lino, _exit);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
} }
if (walCheckAndRoll(pWal) < 0) { if (walCheckAndRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex); TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
return -1;
} }
if (pWal->pLogFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) { if (pWal->pLogFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
if (walInitWriteFile(pWal) < 0) { if (walInitWriteFile(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex); TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
return -1;
} }
} }
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) { if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex); TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
return -1;
} }
taosThreadMutexUnlock(&pWal->mutex); _exit:
return index; if (code) {
} wError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
int32_t bodyLen) {
int32_t code = 0;
taosThreadMutexLock(&pWal->mutex);
// concurrency control:
// if logs are write with assigned index,
// smaller index must be write before larger one
if (index != pWal->vers.lastVer + 1) {
terrno = TSDB_CODE_WAL_INVALID_VER;
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
if (walCheckAndRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
if (pWal->pIdxFile == NULL || pWal->pLogFile == NULL || pWal->writeCur < 0) {
if (walInitWriteFile(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
}
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
} }
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return code; return code;
} }
int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) {
SWalSyncInfo syncMeta = {
.isWeek = -1,
.seqNum = UINT64_MAX,
.term = UINT64_MAX,
};
return walWriteWithSyncInfo(pWal, index, msgType, syncMeta, body, bodyLen);
}
void walFsync(SWal *pWal, bool forceFsync) { void walFsync(SWal *pWal, bool forceFsync) {
if (pWal->cfg.level == TAOS_WAL_SKIP) { if (pWal->cfg.level == TAOS_WAL_SKIP) {
return; return;

View File

@ -5,8 +5,13 @@
#include "walInt.h" #include "walInt.h"
const char* ranStr = "tvapq02tcp"; const char* ranStr = "tvapq02tcp";
const int ranStrLen = strlen(ranStr); const int ranStrLen = strlen(ranStr);
SWalSyncInfo syncMeta = {
.isWeek = -1,
.seqNum = UINT64_MAX,
.term = UINT64_MAX,
};
class WalCleanEnv : public ::testing::Test { class WalCleanEnv : public ::testing::Test {
protected: protected:
@ -194,11 +199,11 @@ TEST_F(WalKeepEnv, readOldMeta) {
int code; int code;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen); code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i); ASSERT_EQ(pWal->vers.lastVer, i);
code = walWrite(pWal, i + 2, i, (void*)ranStr, ranStrLen); code = walAppendLog(pWal, i + 2, i, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, -1); ASSERT_EQ(code, TSDB_CODE_WAL_INVALID_VER);
ASSERT_EQ(pWal->vers.lastVer, i); ASSERT_EQ(pWal->vers.lastVer, i);
} }
char* oldss = walMetaSerialize(pWal); char* oldss = walMetaSerialize(pWal);
@ -223,11 +228,11 @@ TEST_F(WalKeepEnv, readOldMeta) {
TEST_F(WalCleanEnv, write) { TEST_F(WalCleanEnv, write) {
int code; int code;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen); code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i); ASSERT_EQ(pWal->vers.lastVer, i);
code = walWrite(pWal, i + 2, i, (void*)ranStr, ranStrLen); code = walAppendLog(pWal, i + 2, i, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, -1); ASSERT_EQ(code, TSDB_CODE_WAL_INVALID_VER);
ASSERT_EQ(pWal->vers.lastVer, i); ASSERT_EQ(pWal->vers.lastVer, i);
} }
code = walSaveMeta(pWal); code = walSaveMeta(pWal);
@ -237,7 +242,7 @@ TEST_F(WalCleanEnv, write) {
TEST_F(WalCleanEnv, rollback) { TEST_F(WalCleanEnv, rollback) {
int code; int code;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen); code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i); ASSERT_EQ(pWal->vers.lastVer, i);
} }
@ -260,7 +265,7 @@ TEST_F(WalCleanEnv, rollback) {
TEST_F(WalCleanEnv, rollbackMultiFile) { TEST_F(WalCleanEnv, rollbackMultiFile) {
int code; int code;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen); code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i); ASSERT_EQ(pWal->vers.lastVer, i);
if (i == 5) { if (i == 5) {
@ -282,7 +287,7 @@ TEST_F(WalCleanEnv, rollbackMultiFile) {
ASSERT_EQ(pWal->vers.lastVer, 5); ASSERT_EQ(pWal->vers.lastVer, 5);
code = walWrite(pWal, 6, 6, (void*)ranStr, ranStrLen); code = walAppendLog(pWal, 6, 6, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 6); ASSERT_EQ(pWal->vers.lastVer, 6);
@ -294,7 +299,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
int code; int code;
int i; int i;
for (i = 0; i < 100; i++) { for (i = 0; i < 100; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen); code = walAppendLog(pWal, i, 0, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i); ASSERT_EQ(pWal->vers.lastVer, i);
code = walCommit(pWal, i); code = walCommit(pWal, i);
@ -307,11 +312,11 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->vers.snapshotVer, i - 1); ASSERT_EQ(pWal->vers.snapshotVer, i - 1);
ASSERT_EQ(pWal->vers.verInSnapshotting, -1); ASSERT_EQ(pWal->vers.verInSnapshotting, -1);
code = walWrite(pWal, 5, 0, (void*)ranStr, ranStrLen); code = walAppendLog(pWal, 5, 0, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_NE(code, 0); ASSERT_NE(code, 0);
for (; i < 200; i++) { for (; i < 200; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen); code = walAppendLog(pWal, i, 0, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = walCommit(pWal, i); code = walCommit(pWal, i);
ASSERT_EQ(pWal->vers.commitVer, i); ASSERT_EQ(pWal->vers.commitVer, i);
@ -334,7 +339,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
char newStr[100]; char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i); sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr); int len = strlen(newStr);
code = walWrite(pWal, i, 0, newStr, len); code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
} }
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
@ -370,7 +375,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
char newStr[100]; char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i); sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr); int len = strlen(newStr);
code = walWrite(pWal, i, 0, newStr, len); code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
} }
@ -416,7 +421,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
char newStr[100]; char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i); sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr); int len = strlen(newStr);
code = walWrite(pWal, i, 0, newStr, len); code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
} }