diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 155da9d116..8d402b953a 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -174,13 +174,8 @@ void walClose(SWal *); // write interfaces // By assigning index by the caller, wal gurantees linearizability -int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen); -int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, - int32_t bodyLen); - // Assign version automatically and return to caller, -// -1 will be returned for failed writes -int64_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen); +int32_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen); void walFsync(SWal *, bool force); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index b9c6838fda..3ef59b3823 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -206,17 +206,17 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - SyncIndex index = 0; SWalSyncInfo syncMeta = {0}; syncMeta.isWeek = pEntry->isWeak; syncMeta.seqNum = pEntry->seqNum; syncMeta.term = pEntry->term; + int64_t tsWriteBegin = taosGetTimestampNs(); - index = walAppendLog(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); + int32_t code = walAppendLog(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); int64_t tsWriteEnd = taosGetTimestampNs(); int64_t tsElapsed = tsWriteEnd - tsWriteBegin; - if (index < 0) { + if (TSDB_CODE_SUCCESS != code) { int32_t err = terrno; const char* errStr = tstrerror(err); int32_t sysErr = errno; @@ -227,8 +227,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr return -1; } - ASSERT(pEntry->index == index); - walFsync(pWal, forceSync); sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, diff --git a/source/libs/sync/test/syncRaftLogTest.cpp b/source/libs/sync/test/syncRaftLogTest.cpp index e309a2e432..883f80dd13 100644 --- a/source/libs/sync/test/syncRaftLogTest.cpp +++ b/source/libs/sync/test/syncRaftLogTest.cpp @@ -51,8 +51,14 @@ void test2() { SWal *pWal = walOpen(gWalPath, &walCfg); assert(pWal != NULL); + SWalSyncInfo syncMeta = { + .isWeek = -1, + .seqNum = UINT64_MAX, + .term = UINT64_MAX, + }; + for (int i = 0; i < 5; ++i) { - int code = walWrite(pWal, i, 100, "aa", 3); + int code = walAppendLog(pWal, i, 100, syncMeta, "aa", 3); if (code != 0) { printf("code:%d terror:%d msg:%s i:%d \n", code, terrno, tstrerror(terrno), i); assert(0); @@ -105,10 +111,16 @@ void test4() { SWal *pWal = walOpen(gWalPath, &walCfg); assert(pWal != NULL); + SWalSyncInfo syncMeta = { + .isWeek = -1, + .seqNum = UINT64_MAX, + .term = UINT64_MAX, + }; + walRestoreFromSnapshot(pWal, 5); for (int i = 6; i < 10; ++i) { - int code = walWrite(pWal, i, 100, "aa", 3); + int code = walAppendLog(pWal, i, 100, syncMeta, "aa", 3); if (code != 0) { printf("code:%d terror:%d msg:%s i:%d \n", code, terrno, tstrerror(terrno), i); assert(0); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 19345e0644..e43faa70ac 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -617,82 +617,39 @@ END: return -1; } -int64_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, +int32_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) { + int32_t code = 0, lino = 0; + taosThreadMutexLock(&pWal->mutex); if (index != pWal->vers.lastVer + 1) { - terrno = TSDB_CODE_WAL_INVALID_VER; - taosThreadMutexUnlock(&pWal->mutex); - return -1; + TAOS_CHECK_GOTO(TSDB_CODE_WAL_INVALID_VER, &lino, _exit); } if (walCheckAndRoll(pWal) < 0) { - taosThreadMutexUnlock(&pWal->mutex); - return -1; + TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit); } if (pWal->pLogFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) { if (walInitWriteFile(pWal) < 0) { - taosThreadMutexUnlock(&pWal->mutex); - return -1; + TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit); } } if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) { - taosThreadMutexUnlock(&pWal->mutex); - return -1; + TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit); } - taosThreadMutexUnlock(&pWal->mutex); - return index; -} - -int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, - int32_t bodyLen) { - int32_t code = 0; - - taosThreadMutexLock(&pWal->mutex); - - // concurrency control: - // if logs are write with assigned index, - // smaller index must be write before larger one - if (index != pWal->vers.lastVer + 1) { - terrno = TSDB_CODE_WAL_INVALID_VER; - taosThreadMutexUnlock(&pWal->mutex); - return -1; - } - - if (walCheckAndRoll(pWal) < 0) { - taosThreadMutexUnlock(&pWal->mutex); - return -1; - } - - if (pWal->pIdxFile == NULL || pWal->pLogFile == NULL || pWal->writeCur < 0) { - if (walInitWriteFile(pWal) < 0) { - taosThreadMutexUnlock(&pWal->mutex); - return -1; - } - } - - if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) { - taosThreadMutexUnlock(&pWal->mutex); - return -1; +_exit: + if (code) { + wError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } taosThreadMutexUnlock(&pWal->mutex); return code; } -int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) { - SWalSyncInfo syncMeta = { - .isWeek = -1, - .seqNum = UINT64_MAX, - .term = UINT64_MAX, - }; - return walWriteWithSyncInfo(pWal, index, msgType, syncMeta, body, bodyLen); -} - void walFsync(SWal *pWal, bool forceFsync) { if (pWal->cfg.level == TAOS_WAL_SKIP) { return; diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index fb64bec722..e11e167d5f 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -5,8 +5,13 @@ #include "walInt.h" -const char* ranStr = "tvapq02tcp"; -const int ranStrLen = strlen(ranStr); +const char* ranStr = "tvapq02tcp"; +const int ranStrLen = strlen(ranStr); +SWalSyncInfo syncMeta = { + .isWeek = -1, + .seqNum = UINT64_MAX, + .term = UINT64_MAX, +}; class WalCleanEnv : public ::testing::Test { protected: @@ -194,11 +199,11 @@ TEST_F(WalKeepEnv, readOldMeta) { int code; for (int i = 0; i < 10; i++) { - code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen); + code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); - code = walWrite(pWal, i + 2, i, (void*)ranStr, ranStrLen); - ASSERT_EQ(code, -1); + code = walAppendLog(pWal, i + 2, i, syncMeta, (void*)ranStr, ranStrLen); + ASSERT_EQ(code, TSDB_CODE_WAL_INVALID_VER); ASSERT_EQ(pWal->vers.lastVer, i); } char* oldss = walMetaSerialize(pWal); @@ -223,11 +228,11 @@ TEST_F(WalKeepEnv, readOldMeta) { TEST_F(WalCleanEnv, write) { int code; for (int i = 0; i < 10; i++) { - code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen); + code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); - code = walWrite(pWal, i + 2, i, (void*)ranStr, ranStrLen); - ASSERT_EQ(code, -1); + code = walAppendLog(pWal, i + 2, i, syncMeta, (void*)ranStr, ranStrLen); + ASSERT_EQ(code, TSDB_CODE_WAL_INVALID_VER); ASSERT_EQ(pWal->vers.lastVer, i); } code = walSaveMeta(pWal); @@ -237,7 +242,7 @@ TEST_F(WalCleanEnv, write) { TEST_F(WalCleanEnv, rollback) { int code; for (int i = 0; i < 10; i++) { - code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen); + code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); } @@ -260,7 +265,7 @@ TEST_F(WalCleanEnv, rollback) { TEST_F(WalCleanEnv, rollbackMultiFile) { int code; for (int i = 0; i < 10; i++) { - code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen); + code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); if (i == 5) { @@ -282,7 +287,7 @@ TEST_F(WalCleanEnv, rollbackMultiFile) { ASSERT_EQ(pWal->vers.lastVer, 5); - code = walWrite(pWal, 6, 6, (void*)ranStr, ranStrLen); + code = walAppendLog(pWal, 6, 6, syncMeta, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, 6); @@ -294,7 +299,7 @@ TEST_F(WalCleanDeleteEnv, roll) { int code; int i; for (i = 0; i < 100; i++) { - code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen); + code = walAppendLog(pWal, i, 0, syncMeta, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); code = walCommit(pWal, i); @@ -307,11 +312,11 @@ TEST_F(WalCleanDeleteEnv, roll) { ASSERT_EQ(pWal->vers.snapshotVer, i - 1); ASSERT_EQ(pWal->vers.verInSnapshotting, -1); - code = walWrite(pWal, 5, 0, (void*)ranStr, ranStrLen); + code = walAppendLog(pWal, 5, 0, syncMeta, (void*)ranStr, ranStrLen); ASSERT_NE(code, 0); for (; i < 200; i++) { - code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen); + code = walAppendLog(pWal, i, 0, syncMeta, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); code = walCommit(pWal, i); ASSERT_EQ(pWal->vers.commitVer, i); @@ -334,7 +339,7 @@ TEST_F(WalKeepEnv, readHandleRead) { char newStr[100]; sprintf(newStr, "%s-%d", ranStr, i); int len = strlen(newStr); - code = walWrite(pWal, i, 0, newStr, len); + code = walAppendLog(pWal, i, 0, syncMeta, newStr, len); ASSERT_EQ(code, 0); } for (int i = 0; i < 1000; i++) { @@ -370,7 +375,7 @@ TEST_F(WalRetentionEnv, repairMeta1) { char newStr[100]; sprintf(newStr, "%s-%d", ranStr, i); int len = strlen(newStr); - code = walWrite(pWal, i, 0, newStr, len); + code = walAppendLog(pWal, i, 0, syncMeta, newStr, len); ASSERT_EQ(code, 0); } @@ -416,7 +421,7 @@ TEST_F(WalRetentionEnv, repairMeta1) { char newStr[100]; sprintf(newStr, "%s-%d", ranStr, i); int len = strlen(newStr); - code = walWrite(pWal, i, 0, newStr, len); + code = walAppendLog(pWal, i, 0, syncMeta, newStr, len); ASSERT_EQ(code, 0); }