diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 0000000000..fc9a51af65 --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,9 @@ +# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.209.3/containers/cpp/.devcontainer/base.Dockerfile + +# [Choice] Debian / Ubuntu version (use Debian 11/9, Ubuntu 18.04/21.04 on local arm64/Apple Silicon): debian-11, debian-10, debian-9, ubuntu-21.04, ubuntu-20.04, ubuntu-18.04 +ARG VARIANT="bullseye" +FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT} + +# [Optional] Uncomment this section to install additional packages. +# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ +# && apt-get -y install --no-install-recommends diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000000..9b752d091d --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,32 @@ +// For format details, see https://aka.ms/devcontainer.json. For config options, see the README at: +// https://github.com/microsoft/vscode-dev-containers/tree/v0.209.3/containers/cpp +{ + "name": "C++", + "build": { + "dockerfile": "Dockerfile", + // Update 'VARIANT' to pick an Debian / Ubuntu OS version: debian-11, debian-10, debian-9, ubuntu-21.04, ubuntu-20.04, ubuntu-18.04 + // Use Debian 11, Debian 9, Ubuntu 18.04 or Ubuntu 21.04 on local arm64/Apple Silicon + "args": { "VARIANT": "ubuntu-21.04" } + }, + "runArgs": ["--cap-add=SYS_PTRACE", "--security-opt", "seccomp=unconfined"], + + // Set *default* container specific settings.json values on container create. + "settings": {}, + + // Add the IDs of extensions you want installed when the container is created. + "extensions": [ + "ms-vscode.cpptools", + "ms-vscode.cmake-tools", + "austin.code-gnu-global", + "visualstudioexptteam.vscodeintel" + ], + + // Use 'forwardPorts' to make a list of ports inside the container available locally. + // "forwardPorts": [], + + // Use 'postCreateCommand' to run commands after the container is created. + // "postCreateCommand": "gcc -v", + + // Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. + "remoteUser": "vscode" +} diff --git a/.gitignore b/.gitignore index c651127992..2308ea7896 100644 --- a/.gitignore +++ b/.gitignore @@ -101,4 +101,4 @@ TAGS contrib/* !contrib/CMakeLists.txt -!contrib/test +!contrib/test \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index b626977588..1a8cb1d710 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,7 +10,7 @@ set(CMAKE_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/cmake") set(CMAKE_CONTRIB_DIR "${CMAKE_SOURCE_DIR}/contrib") include(${CMAKE_SUPPORT_DIR}/cmake.options) -SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma") +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma -g3") # contrib add_subdirectory(contrib) @@ -20,9 +20,13 @@ add_library(api INTERFACE) target_include_directories(api INTERFACE "include/client") # src +if(${BUILD_TEST}) + include(CTest) + enable_testing() +endif(${BUILD_TEST}) add_subdirectory(source) # docs add_subdirectory(docs) -# tests (TODO) +# tests (TODO) \ No newline at end of file diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index 85533f65bc..7359df92cc 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -256,7 +256,7 @@ typedef struct STQ { // the collection of group handle // the handle of kvstore char* path; - STqCfg* tqConfig; + STqCfg* tqConfig; TqLogReader* tqLogReader; TqMemRef tqMemRef; TqMetaStore* tqMeta; diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 30531ad738..007ce83812 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -36,6 +36,8 @@ typedef struct SVnodeCfg { struct { /** write buffer size */ uint64_t wsize; + uint64_t ssize; + uint64_t lsize; /** use heap allocator or arena allocator */ bool isHeapAllocator; }; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 48c775292a..ae1e630c6f 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -44,7 +44,7 @@ typedef struct SWalReadHead { int8_t reserved[2]; int32_t len; int64_t version; - char cont[]; + char body[]; } SWalReadHead; typedef struct { @@ -52,9 +52,9 @@ typedef struct { int32_t fsyncPeriod; // millisecond int32_t retentionPeriod; // secs int32_t rollPeriod; // secs - int32_t retentionSize; // secs + int64_t retentionSize; int64_t segSize; - EWalType walLevel; // wal level + EWalType level; // wal level } SWalCfg; typedef struct { @@ -90,15 +90,17 @@ typedef struct { #define WAL_CUR_FILE_WRITABLE 2 #define WAL_CUR_FAILED 4 +typedef struct SWalVer { + int64_t firstVer; + int64_t verInSnapshotting; + int64_t snapshotVer; + int64_t commitVer; + int64_t lastVer; +} SWalVer; + typedef struct SWal { // cfg - int32_t vgId; - int32_t fsyncPeriod; // millisecond - int32_t rollPeriod; // second - int64_t segSize; - int64_t retentionSize; - int32_t retentionPeriod; - EWalType level; + SWalCfg cfg; //total size int64_t totSize; //fsync seq @@ -109,12 +111,7 @@ typedef struct SWal { int64_t writeLogTfd; int64_t writeIdxTfd; //wal lifecycle - int64_t firstVersion; - int64_t snapshotVersion; - int64_t commitVersion; - int64_t lastVersion; - //snapshotting version - int64_t snapshottingVer; + SWalVer vers; //roll status int64_t lastRollSeq; //file set @@ -126,9 +123,20 @@ typedef struct SWal { //path char path[WAL_PATH_LEN]; //reusable write head - SWalHead head; + SWalHead writeHead; } SWal; // WAL HANDLE +typedef struct SWalReadHandle { + SWal* pWal; + int64_t readLogTfd; + int64_t readIdxTfd; + int64_t curFileFirstVer; + int64_t curVersion; + int64_t capacity; + int64_t status; //if cursor valid + SWalHead head; +} SWalReadHandle; + typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); // module initialization @@ -154,6 +162,10 @@ int32_t walEndTakeSnapshot(SWal *); //int32_t walDataCorrupted(SWal*); // read +SWalReadHandle* walOpenReadHandle(SWal *); +void walCloseReadHandle(SWalReadHandle *); +int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); + int32_t walRead(SWal *, SWalHead **, int64_t ver); int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); diff --git a/include/util/tdlist.h b/include/util/tdlist.h new file mode 100644 index 0000000000..a19f3bebec --- /dev/null +++ b/include/util/tdlist.h @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_TDLIST_H_ +#define _TD_UTIL_TDLIST_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +// Single linked list +#define TD_SLIST_NODE(TYPE) \ + struct { \ + struct TYPE *sl_next_; \ + } + +#define TD_SLIST(TYPE) \ + struct { \ + struct TYPE *sl_head_; \ + int sl_neles_; \ + } + +#define TD_SLIST_HEAD(sl) ((sl)->sl_head_) +#define TD_SLIST_NELES(sl) ((sl)->sl_neles_) +#define TD_SLIST_NODE_NEXT(sln) ((sln)->sl_next_) + +#define tSListInit(sl) \ + do { \ + (sl)->sl_head_ = NULL; \ + (sl)->sl_neles_ = 0; \ + } while (0) + +#define tSListPush(sl, sln) \ + do { \ + TD_SLIST_NODE_NEXT(sln) = TD_SLIST_HEAD(sl); \ + TD_SLIST_HEAD(sl) = (sln); \ + TD_SLIST_NELES(sl) += 1; \ + } while (0) + +#define tSListPop(sl) \ + do { \ + TD_SLIST_HEAD(sl) = TD_SLIST_NODE_NEXT(TD_SLIST_HEAD(sl)); \ + TD_SLIST_NELES(sl) -= 1; \ + } while (0) + +// Double linked list +#define TD_DLIST_NODE(TYPE) \ + struct { \ + TYPE *dl_prev_; \ + TYPE *dl_next_; \ + } + +#define TD_DLIST(TYPE) \ + struct { \ + struct TYPE *dl_head_; \ + struct TYPE *dl_tail_; \ + int dl_neles_; \ + } + +#define TD_DLIST_NODE_PREV(dln) ((dln)->dl_prev_) +#define TD_DLIST_NODE_NEXT(dln) ((dln)->dl_next_) +#define TD_DLIST_HEAD(dl) ((dl)->dl_head_) +#define TD_DLIST_TAIL(dl) ((dl)->dl_tail_) +#define TD_DLIST_NELES(dl) ((dl)->dl_neles_) + +#define tDListInit(dl) \ + do { \ + TD_DLIST_HEAD(dl) = TD_DLIST_TAIL(dl) = NULL; \ + TD_DLIST_NELES(dl) = 0; \ + } while (0) + +#define tDListAppend(dl, dln) \ + do { \ + if (TD_DLIST_HEAD(dl) == NULL) { \ + TD_DLIST_NODE_PREV(dln) = TD_DLIST_NODE_NEXT(dln) = NULL; \ + TD_DLIST_HEAD(dl) = TD_DLIST_TAIL(dl) = (dln); \ + } else { \ + TD_DLIST_NODE_PREV(dln) = TD_DLIST_TAIL(dl); \ + TD_DLIST_NODE_NEXT(dln) = NULL; \ + TD_DLIST_NODE_NEXT(TD_DLIST_TAIL(dl)) = (dln); \ + TD_DLIST_TAIL(dl) = (dln); \ + } \ + TD_DLIST_NELES(dl) += 1; \ + } while (0) + +#define tDListPrepend(dl, dln) \ + do { \ + if (TD_DLIST_HEAD(dl) == NULL) { \ + TD_DLIST_NODE_PREV(dln) = TD_DLIST_NODE_NEXT(dln) = NULL; \ + TD_DLIST_HEAD(dl) = TD_DLIST_TAIL(dl) = (dln); \ + } else { \ + TD_DLIST_NODE_PREV(dln) = NULL; \ + TD_DLIST_NODE_NEXT(dln) = TD_DLIST_HEAD(dl); \ + TD_DLIST_NODE_PREV(TD_DLIST_HEAD(dl)) = (dln); \ + TD_DLIST_HEAD(dl) = (dln); \ + } \ + TD_DLIST_NELES(dl) += 1; \ + } while (0) + +#define tDListPop(dl, dln) \ + do { \ + if (TD_DLIST_HEAD(dl) == (dln)) { \ + TD_DLIST_HEAD(dl) = TD_DLIST_NODE_NEXT(dln); \ + } \ + if (TD_DLIST_TAIL(dl) == (dln)) { \ + TD_DLIST_TAIL(dl) = TD_DLIST_NODE_PREV(dln); \ + } \ + if (TD_DLIST_NODE_PREV(dln) != NULL) { \ + TD_DLIST_NODE_NEXT(TD_DLIST_NODE_PREV(dln)) = TD_DLIST_NODE_NEXT(dln); \ + } \ + if (TD_DLIST_NODE_NEXT(dln) != NULL) { \ + TD_DLIST_NODE_PREV(TD_DLIST_NODE_NEXT(dln)) = TD_DLIST_NODE_PREV(dln); \ + } \ + TD_DLIST_NELES(dl) -= 1; \ + TD_DLIST_NODE_PREV(dln) = TD_DLIST_NODE_NEXT(dln) = NULL; \ + } while (0) + +#if 0 +// List iterator +#define TD_LIST_FITER 0 +#define TD_LIST_BITER 1 +#define TD_LIST_ITER(S) \ + struct { \ + int it_dir_; \ + S * it_next_; \ + S * it_ptr_; \ + TD_DLIST(S) * it_list_; \ + } + +#define tlistIterInit(it, l, dir) \ + (it)->it_dir_ = (dir); \ + (it)->it_list_ = l; \ + if ((dir) == TD_LIST_FITER) { \ + (it)->it_next_ = (l)->dl_head_; \ + } else { \ + (it)->it_next_ = (l)->dl_tail_; \ + } + +#define tlistIterNext(it) \ + ({ \ + (it)->it_ptr_ = (it)->it_next_; \ + if ((it)->it_next_ != NULL) { \ + if ((it)->it_dir_ == TD_LIST_FITER) { \ + (it)->it_next_ = (it)->it_next_->next_; \ + } else { \ + (it)->it_next_ = (it)->it_next_->prev_; \ + } \ + } \ + (it)->it_ptr_; \ + }) +#endif + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_UTIL_TDLIST_H_*/ \ No newline at end of file diff --git a/include/util/tmacro.h b/include/util/tmacro.h new file mode 100644 index 0000000000..74056cfe07 --- /dev/null +++ b/include/util/tmacro.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_MACRO_H_ +#define _TD_UTIL_MACRO_H_ + +#include "os.h" + +#ifdef __cplusplus +extern "C" { +#endif + +// Module init/clear MACRO definitions +#define TD_MOD_UNINITIALIZED 0 +#define TD_MOD_INITIALIZED 1 + +#define TD_MOD_UNCLEARD 0 +#define TD_MOD_CLEARD 1 + +#define TD_DEF_MOD_INIT_FLAG(MOD) static int8_t MOD##InitFlag = TD_MOD_UNINITIALIZED +#define TD_DEF_MOD_CLEAR_FLAG(MOD) static int8_t MOD##ClearFlag = TD_MOD_UNCLEARD + +#define TD_CHECK_AND_SET_MODE_INIT(MOD) \ + atomic_val_compare_exchange_8(&(MOD##InitFlag), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED) + +#define TD_CHECK_AND_SET_MOD_CLEAR(MOD) atomic_val_compare_exchange_8(&(MOD##ClearFlag), TD_MOD_UNCLEARD, TD_MOD_CLEARD) + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_UTIL_MACRO_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/impl/test/dnode/dnode.cpp b/source/dnode/mgmt/impl/test/dnode/dnode.cpp index c156852905..fba3794f6a 100644 --- a/source/dnode/mgmt/impl/test/dnode/dnode.cpp +++ b/source/dnode/mgmt/impl/test/dnode/dnode.cpp @@ -271,7 +271,6 @@ TEST_F(DndTestDnode, DropDnode_01) { ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - taosMsleep(1300); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7); SendThenCheckShowRetrieveMsg(1); CheckInt16(1); @@ -363,7 +362,7 @@ TEST_F(DndTestDnode, CreateDnode_02) { } TEST_F(DndTestDnode, RestartDnode_01) { - uInfo("===> stop all server"); + uInfo("stop all server"); stopServer(pServer1); stopServer(pServer2); stopServer(pServer3); @@ -375,14 +374,16 @@ TEST_F(DndTestDnode, RestartDnode_01) { pServer4 = NULL; pServer5 = NULL; - taosMsleep(3000); // wait tcp port cleanedup - uInfo("===> start all server"); + uInfo("start all server"); const char* fqdn = "localhost"; const char* firstEp = "localhost:9521"; pServer1 = startServer("/tmp/dndTestDnode1", fqdn, 9521, firstEp); + // pServer1 = startServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp); + // pServer1 = startServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp); + // pServer1 = startServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp); - uInfo("===> all server is running"); + uInfo("all server is running"); // taosMsleep(1300); // SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7); diff --git a/source/dnode/mgmt/impl/test/user/user.cpp b/source/dnode/mgmt/impl/test/user/user.cpp index c3c1d1d406..48be2635cd 100644 --- a/source/dnode/mgmt/impl/test/user/user.cpp +++ b/source/dnode/mgmt/impl/test/user/user.cpp @@ -17,270 +17,223 @@ class DndTestUser : public ::testing::Test { protected: - void SetUp() override {} - void TearDown() override {} + static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) { + SServer* pServer = createServer(path, fqdn, port, firstEp); + ASSERT(pServer); + return pServer; + } static void SetUpTestSuite() { - const char* user = "root"; - const char* pass = "taosdata"; - const char* path = "/tmp/dndTestUser"; - const char* fqdn = "localhost"; - uint16_t port = 9524; - const char* firstEp = "localhost:9524"; + initLog("/tmp/dndTestUser"); - pServer = createServer(path, fqdn, port, firstEp); - ASSERT(pServer); - pClient = createClient(user, pass, fqdn, port); + const char* fqdn = "localhost"; + const char* firstEp = "localhost:9530"; + pServer = CreateServer("/tmp/dndTestUser", fqdn, 9530, firstEp); + pClient = createClient("root", "taosdata", fqdn, 9530); + taosMsleep(300); } static void TearDownTestSuite() { stopServer(pServer); dropClient(pClient); + pServer = NULL; + pClient = NULL; } static SServer* pServer; static SClient* pClient; static int32_t connId; + + public: + void SetUp() override {} + void TearDown() override {} + + void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns) { + SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); + pShow->type = showType; + strcpy(pShow->db, ""); + + SRpcMsg showRpcMsg = {0}; + showRpcMsg.pCont = pShow; + showRpcMsg.contLen = sizeof(SShowMsg); + showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; + + sendMsg(pClient, &showRpcMsg); + ASSERT_NE(pClient->pRsp, nullptr); + ASSERT_EQ(pClient->pRsp->code, 0); + ASSERT_NE(pClient->pRsp->pCont, nullptr); + + SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; + ASSERT_NE(pShowRsp, nullptr); + pShowRsp->showId = htonl(pShowRsp->showId); + pMeta = &pShowRsp->tableMeta; + pMeta->numOfTags = htons(pMeta->numOfTags); + pMeta->numOfColumns = htons(pMeta->numOfColumns); + pMeta->sversion = htons(pMeta->sversion); + pMeta->tversion = htons(pMeta->tversion); + pMeta->tuid = htobe64(pMeta->tuid); + pMeta->suid = htobe64(pMeta->suid); + + showId = pShowRsp->showId; + + EXPECT_NE(pShowRsp->showId, 0); + EXPECT_STREQ(pMeta->tbFname, showName); + EXPECT_EQ(pMeta->numOfTags, 0); + EXPECT_EQ(pMeta->numOfColumns, columns); + EXPECT_EQ(pMeta->precision, 0); + EXPECT_EQ(pMeta->tableType, 0); + EXPECT_EQ(pMeta->update, 0); + EXPECT_EQ(pMeta->sversion, 0); + EXPECT_EQ(pMeta->tversion, 0); + EXPECT_EQ(pMeta->tuid, 0); + EXPECT_EQ(pMeta->suid, 0); + } + + void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) { + SSchema* pSchema = &pMeta->pSchema[index]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, type); + EXPECT_EQ(pSchema->bytes, bytes); + EXPECT_STREQ(pSchema->name, name); + } + + void SendThenCheckShowRetrieveMsg(int32_t rows) { + SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); + pRetrieve->showId = htonl(showId); + pRetrieve->free = 0; + + SRpcMsg retrieveRpcMsg = {0}; + retrieveRpcMsg.pCont = pRetrieve; + retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg); + retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + + sendMsg(pClient, &retrieveRpcMsg); + + ASSERT_NE(pClient->pRsp, nullptr); + ASSERT_EQ(pClient->pRsp->code, 0); + ASSERT_NE(pClient->pRsp->pCont, nullptr); + + pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; + ASSERT_NE(pRetrieveRsp, nullptr); + pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); + pRetrieveRsp->offset = htobe64(pRetrieveRsp->offset); + pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds); + pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen); + + EXPECT_EQ(pRetrieveRsp->numOfRows, rows); + EXPECT_EQ(pRetrieveRsp->offset, 0); + EXPECT_EQ(pRetrieveRsp->useconds, 0); + // EXPECT_EQ(pRetrieveRsp->completed, completed); + EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI); + EXPECT_EQ(pRetrieveRsp->compressed, 0); + EXPECT_EQ(pRetrieveRsp->reserved, 0); + EXPECT_EQ(pRetrieveRsp->compLen, 0); + + pData = pRetrieveRsp->data; + pos = 0; + } + + void CheckInt16(int16_t val) { + int16_t data = *((int16_t*)(pData + pos)); + pos += sizeof(int16_t); + EXPECT_EQ(data, val); + } + + void CheckInt64(int64_t val) { + int64_t data = *((int64_t*)(pData + pos)); + pos += sizeof(int64_t); + EXPECT_EQ(data, val); + } + + void CheckTimestamp() { + int64_t data = *((int64_t*)(pData + pos)); + pos += sizeof(int64_t); + EXPECT_GT(data, 0); + } + + void CheckBinary(const char* val, int32_t len) { + pos += sizeof(VarDataLenT); + char* data = (char*)(pData + pos); + pos += len; + EXPECT_STREQ(data, val); + } + + int32_t showId; + STableMetaMsg* pMeta; + SRetrieveTableRsp* pRetrieveRsp; + char* pData; + int32_t pos; }; SServer* DndTestUser::pServer; SClient* DndTestUser::pClient; int32_t DndTestUser::connId; -#if 0 TEST_F(DndTestUser, ShowUser) { - int32_t showId = 0; + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4); + CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "name"); + CheckSchema(1, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "privilege"); + CheckSchema(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time"); + CheckSchema(3, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "account"); - //--- meta --- - SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); - pShow->type = TSDB_MGMT_TABLE_USER; - strcpy(pShow->db, ""); - - SRpcMsg showRpcMsg = {0}; - showRpcMsg.pCont = pShow; - showRpcMsg.contLen = sizeof(SShowMsg); - showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; - - sendMsg(pClient, &showRpcMsg); - ASSERT_NE(pClient->pRsp, nullptr); - - SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; - ASSERT_NE(pShowRsp, nullptr); - pShowRsp->showId = htonl(pShowRsp->showId); - STableMetaMsg* pMeta = &pShowRsp->tableMeta; - pMeta->contLen = htonl(pMeta->contLen); - pMeta->numOfColumns = htons(pMeta->numOfColumns); - pMeta->sversion = htons(pMeta->sversion); - pMeta->tversion = htons(pMeta->tversion); - pMeta->tid = htonl(pMeta->tid); - pMeta->uid = htobe64(pMeta->uid); - pMeta->suid = htobe64(pMeta->suid); - - showId = pShowRsp->showId; - - EXPECT_NE(pShowRsp->showId, 0); - EXPECT_EQ(pMeta->contLen, 0); - EXPECT_STREQ(pMeta->tbFname, "show users"); - EXPECT_EQ(pMeta->numOfTags, 0); - EXPECT_EQ(pMeta->precision, 0); - EXPECT_EQ(pMeta->tableType, 0); - EXPECT_EQ(pMeta->numOfColumns, 4); - EXPECT_EQ(pMeta->sversion, 0); - EXPECT_EQ(pMeta->tversion, 0); - EXPECT_EQ(pMeta->tid, 0); - EXPECT_EQ(pMeta->uid, 0); - EXPECT_STREQ(pMeta->sTableName, ""); - EXPECT_EQ(pMeta->suid, 0); - - SSchema* pSchema = NULL; - - pSchema = &pMeta->pSchema[0]; - pSchema->bytes = htons(pSchema->bytes); - EXPECT_EQ(pSchema->colId, 0); - EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); - EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE); - EXPECT_STREQ(pSchema->name, "name"); - - pSchema = &pMeta->pSchema[1]; - pSchema->bytes = htons(pSchema->bytes); - EXPECT_EQ(pSchema->colId, 0); - EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); - EXPECT_EQ(pSchema->bytes, 10 + VARSTR_HEADER_SIZE); - EXPECT_STREQ(pSchema->name, "privilege"); - - pSchema = &pMeta->pSchema[2]; - pSchema->bytes = htons(pSchema->bytes); - EXPECT_EQ(pSchema->colId, 0); - EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP); - EXPECT_EQ(pSchema->bytes, 8); - EXPECT_STREQ(pSchema->name, "create_time"); - - pSchema = &pMeta->pSchema[3]; - pSchema->bytes = htons(pSchema->bytes); - EXPECT_EQ(pSchema->colId, 0); - EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); - EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE); - EXPECT_STREQ(pSchema->name, "account"); - - //--- retrieve --- - SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); - pRetrieve->showId = htonl(showId); - pRetrieve->free = 0; - - SRpcMsg retrieveRpcMsg = {0}; - retrieveRpcMsg.pCont = pRetrieve; - retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg); - retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; - - sendMsg(pClient, &retrieveRpcMsg); - ASSERT_NE(pClient->pRsp, nullptr); - ASSERT_EQ(pClient->pRsp->code, 0); - - SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; - ASSERT_NE(pRetrieveRsp, nullptr); - pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); - pRetrieveRsp->offset = htobe64(pRetrieveRsp->offset); - pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds); - pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen); - - EXPECT_EQ(pRetrieveRsp->numOfRows, 2); - EXPECT_EQ(pRetrieveRsp->offset, 0); - EXPECT_EQ(pRetrieveRsp->useconds, 0); - EXPECT_EQ(pRetrieveRsp->completed, 1); - EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI); - EXPECT_EQ(pRetrieveRsp->compressed, 0); - EXPECT_EQ(pRetrieveRsp->reserved, 0); - EXPECT_EQ(pRetrieveRsp->compLen, 0); - - char* pData = pRetrieveRsp->data; - int32_t pos = 0; - char* strVal = NULL; - int64_t int64Val = 0; - - //--- name --- - { - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "root"); - - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "_root"); - } - - //--- privilege --- - { - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += 10; - EXPECT_STREQ(strVal, "super"); - - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += 10; - EXPECT_STREQ(strVal, "writable"); - } - - //--- create_time --- - { - int64Val = *((int64_t*)(pData + pos)); - pos += sizeof(int64_t); - EXPECT_GT(int64Val, 0); - - int64Val = *((int64_t*)(pData + pos)); - pos += sizeof(int64_t); - EXPECT_GT(int64Val, 0); - } - - //--- account --- - { - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "root"); - - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "root"); - } + SendThenCheckShowRetrieveMsg(1); + CheckBinary("root", TSDB_USER_LEN); + CheckBinary("super", 10); + CheckTimestamp(); + CheckBinary("root", TSDB_USER_LEN); } -#endif TEST_F(DndTestUser, CreateUser_01) { - ASSERT_NE(pClient, nullptr); - - //--- create user --- - SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg)); - strcpy(pReq->user, "u1"); - strcpy(pReq->pass, "p1"); - - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SCreateUserMsg); - rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_USER; - - sendMsg(pClient, &rpcMsg); - SRpcMsg* pMsg = pClient->pRsp; - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); - - //--- meta --- - SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); - pShow->type = TSDB_MGMT_TABLE_USER; - SRpcMsg showRpcMsg = {0}; - showRpcMsg.pCont = pShow; - showRpcMsg.contLen = sizeof(SShowMsg); - showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; - - sendMsg(pClient, &showRpcMsg); - SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; - STableMetaMsg* pMeta = &pShowRsp->tableMeta; - pMeta->numOfColumns = htons(pMeta->numOfColumns); - EXPECT_EQ(pMeta->numOfColumns, 4); - - //--- retrieve --- - SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); - pRetrieve->showId = pShowRsp->showId; - SRpcMsg retrieveRpcMsg = {0}; - retrieveRpcMsg.pCont = pRetrieve; - retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg); - retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; - - sendMsg(pClient, &retrieveRpcMsg); - SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; - pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); - EXPECT_EQ(pRetrieveRsp->numOfRows, 3); - - char* pData = pRetrieveRsp->data; - int32_t pos = 0; - char* strVal = NULL; - - //--- name --- { - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "u1"); + SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg)); + strcpy(pReq->user, "u1"); + strcpy(pReq->pass, "p1"); - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "root"); + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateUserMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_USER; - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "_root"); + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); } + + { + SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg)); + strcpy(pReq->user, "u2"); + strcpy(pReq->pass, "p2"); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateUserMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_USER; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + } + + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4); + SendThenCheckShowRetrieveMsg(3); + CheckBinary("u1", TSDB_USER_LEN); + CheckBinary("root", TSDB_USER_LEN); + CheckBinary("u2", TSDB_USER_LEN); + CheckBinary("normal", 10); + CheckBinary("super", 10); + CheckBinary("normal", 10); + CheckTimestamp(); + CheckTimestamp(); + CheckTimestamp(); + CheckBinary("root", TSDB_USER_LEN); + CheckBinary("root", TSDB_USER_LEN); + CheckBinary("root", TSDB_USER_LEN); } TEST_F(DndTestUser, AlterUser_01) { - ASSERT_NE(pClient, nullptr); - - //--- drop user --- SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg)); strcpy(pReq->user, "u1"); strcpy(pReq->pass, "p2"); @@ -295,60 +248,23 @@ TEST_F(DndTestUser, AlterUser_01) { ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - //--- meta --- - SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); - pShow->type = TSDB_MGMT_TABLE_USER; - SRpcMsg showRpcMsg = {0}; - showRpcMsg.pCont = pShow; - showRpcMsg.contLen = sizeof(SShowMsg); - showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; - - sendMsg(pClient, &showRpcMsg); - SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; - STableMetaMsg* pMeta = &pShowRsp->tableMeta; - pMeta->numOfColumns = htons(pMeta->numOfColumns); - EXPECT_EQ(pMeta->numOfColumns, 4); - - //--- retrieve --- - SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); - pRetrieve->showId = pShowRsp->showId; - SRpcMsg retrieveRpcMsg = {0}; - retrieveRpcMsg.pCont = pRetrieve; - retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg); - retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; - - sendMsg(pClient, &retrieveRpcMsg); - SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; - pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); - EXPECT_EQ(pRetrieveRsp->numOfRows, 3); - - char* pData = pRetrieveRsp->data; - int32_t pos = 0; - char* strVal = NULL; - - //--- name --- - { - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "u1"); - - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "root"); - - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "_root"); - } + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4); + SendThenCheckShowRetrieveMsg(3); + CheckBinary("u1", TSDB_USER_LEN); + CheckBinary("root", TSDB_USER_LEN); + CheckBinary("u2", TSDB_USER_LEN); + CheckBinary("normal", 10); + CheckBinary("super", 10); + CheckBinary("normal", 10); + CheckTimestamp(); + CheckTimestamp(); + CheckTimestamp(); + CheckBinary("root", TSDB_USER_LEN); + CheckBinary("root", TSDB_USER_LEN); + CheckBinary("root", TSDB_USER_LEN); } TEST_F(DndTestUser, DropUser_01) { - ASSERT_NE(pClient, nullptr); - - //--- drop user --- SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(sizeof(SDropUserMsg)); strcpy(pReq->user, "u1"); @@ -362,47 +278,38 @@ TEST_F(DndTestUser, DropUser_01) { ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - //--- meta --- - SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); - pShow->type = TSDB_MGMT_TABLE_USER; - SRpcMsg showRpcMsg = {0}; - showRpcMsg.pCont = pShow; - showRpcMsg.contLen = sizeof(SShowMsg); - showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4); + SendThenCheckShowRetrieveMsg(2); + CheckBinary("root", TSDB_USER_LEN); + CheckBinary("u2", TSDB_USER_LEN); + CheckBinary("super", 10); + CheckBinary("normal", 10); + CheckTimestamp(); + CheckTimestamp(); + CheckBinary("root", TSDB_USER_LEN); + CheckBinary("root", TSDB_USER_LEN); +} - sendMsg(pClient, &showRpcMsg); - SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; - STableMetaMsg* pMeta = &pShowRsp->tableMeta; - pMeta->numOfColumns = htons(pMeta->numOfColumns); - EXPECT_EQ(pMeta->numOfColumns, 4); +TEST_F(DndTestUser, RestartDnode) { + stopServer(pServer); + pServer = NULL; - //--- retrieve --- - SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); - pRetrieve->showId = pShowRsp->showId; - SRpcMsg retrieveRpcMsg = {0}; - retrieveRpcMsg.pCont = pRetrieve; - retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg); - retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + uInfo("start all server"); - sendMsg(pClient, &retrieveRpcMsg); - SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; - pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); - EXPECT_EQ(pRetrieveRsp->numOfRows, 2); + const char* fqdn = "localhost"; + const char* firstEp = "localhost:9530"; + pServer = startServer("/tmp/dndTestUser", fqdn, 9530, firstEp); - char* pData = pRetrieveRsp->data; - int32_t pos = 0; - char* strVal = NULL; + uInfo("all server is running"); - //--- name --- - { - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "root"); - - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "_root"); - } -} \ No newline at end of file + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4); + SendThenCheckShowRetrieveMsg(2); + CheckBinary("root", TSDB_USER_LEN); + CheckBinary("u2", TSDB_USER_LEN); + CheckBinary("super", 10); + CheckBinary("normal", 10); + CheckTimestamp(); + CheckTimestamp(); + CheckBinary("root", TSDB_USER_LEN); + CheckBinary("root", TSDB_USER_LEN); +} diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 2e4719cceb..2f582a810d 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -85,9 +85,11 @@ static int32_t mndCreateDefaultUsers(SMnode *pMnode) { return -1; } +#if 0 if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { return -1; } +#endif return 0; } @@ -469,7 +471,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "create_time"); + strcpy(pSchema[cols].name, "create time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -516,11 +518,8 @@ static int32_t mndRetrieveUsers(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in if (pUser->superAuth) { const char *src = "super"; STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); - } else if (pUser->writeAuth) { - const char *src = "writable"; - STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); } else { - const char *src = "readable"; + const char *src = "normal"; STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); } cols++; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 1d4888c2eb..ccff5b6c82 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -52,10 +52,10 @@ SSdb *sdbInit(SSdbOpt *pOption) { void sdbCleanup(SSdb *pSdb) { mDebug("start to cleanup sdb"); - if (pSdb->curVer != pSdb->lastCommitVer) { + // if (pSdb->curVer != pSdb->lastCommitVer) { mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); sdbWriteFile(pSdb); - } + // } if (pSdb->currDir != NULL) { tfree(pSdb->currDir); diff --git a/source/dnode/vnode/impl/CMakeLists.txt b/source/dnode/vnode/impl/CMakeLists.txt index d6d267c4d4..6972605afd 100644 --- a/source/dnode/vnode/impl/CMakeLists.txt +++ b/source/dnode/vnode/impl/CMakeLists.txt @@ -18,6 +18,6 @@ target_link_libraries( ) # test -#if(${BUILD_TEST}) -# add_subdirectory(test) -#endif(${BUILD_TEST}) \ No newline at end of file +if(${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) \ No newline at end of file diff --git a/source/dnode/vnode/impl/inc/vnodeBufferPool.h b/source/dnode/vnode/impl/inc/vnodeBufferPool.h index d64dc93847..bfc4de9e12 100644 --- a/source/dnode/vnode/impl/inc/vnodeBufferPool.h +++ b/source/dnode/vnode/impl/inc/vnodeBufferPool.h @@ -28,6 +28,7 @@ typedef struct SVBufPool SVBufPool; int vnodeOpenBufPool(SVnode *pVnode); void vnodeCloseBufPool(SVnode *pVnode); void *vnodeMalloc(SVnode *pVnode, uint64_t size); +bool vnodeBufPoolIsFull(SVnode *pVnode); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/inc/vnodeCommit.h b/source/dnode/vnode/impl/inc/vnodeCommit.h index c8ff4947aa..a60e8feac2 100644 --- a/source/dnode/vnode/impl/inc/vnodeCommit.h +++ b/source/dnode/vnode/impl/inc/vnodeCommit.h @@ -22,8 +22,8 @@ extern "C" { #endif -bool vnodeShouldCommit(SVnode *pVnode); -int vnodeAsyncCommit(SVnode *pVnode); +#define vnodeShouldCommit vnodeBufPoolIsFull +int vnodeAsyncCommit(SVnode *pVnode); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index c92de433c3..e3a3fac6b9 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -18,15 +18,17 @@ #include "mallocator.h" #include "sync.h" +#include "tcoding.h" +#include "tdlist.h" #include "tlockfree.h" #include "wal.h" -#include "tcoding.h" #include "vnode.h" #include "vnodeBufferPool.h" #include "vnodeCfg.h" #include "vnodeCommit.h" #include "vnodeFS.h" +#include "vnodeMemAllocator.h" #include "vnodeRequest.h" #include "vnodeStateMgr.h" #include "vnodeSync.h" diff --git a/source/dnode/vnode/impl/inc/vnodeMemAllocator.h b/source/dnode/vnode/impl/inc/vnodeMemAllocator.h index 9184eb416b..c8c58e9f69 100644 --- a/source/dnode/vnode/impl/inc/vnodeMemAllocator.h +++ b/source/dnode/vnode/impl/inc/vnodeMemAllocator.h @@ -16,15 +16,37 @@ #ifndef _TD_VNODE_MEM_ALLOCATOR_H_ #define _TD_VNODE_MEM_ALLOCATOR_H_ -#include "mallocator.h" -#include "vnode.h" +#include "os.h" #ifdef __cplusplus extern "C" { #endif -SMemAllocator *vnodeCreateMemAllocator(SVnode *pVnode); -void vnodeDestroyMemAllocator(SMemAllocator *pma); +typedef struct SVArenaNode SVArenaNode; +typedef struct SVMemAllocator SVMemAllocator; + +struct SVArenaNode { + TD_SLIST_NODE(SVArenaNode); + uint64_t size; // current node size + void * ptr; + char data[]; +}; + +struct SVMemAllocator { + TD_DLIST_NODE(SVMemAllocator); + uint64_t capacity; + uint64_t ssize; + uint64_t lsize; + SVArenaNode *pNode; + TD_SLIST(SVArenaNode) nlist; +}; + +SVMemAllocator *vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize); +void vmaDestroy(SVMemAllocator *pVMA); +void vmaReset(SVMemAllocator *pVMA); +void * vmaMalloc(SVMemAllocator *pVMA, uint64_t size); +void vmaFree(SVMemAllocator *pVMA, void *ptr); +bool vmaIsFull(SVMemAllocator *pVMA); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/src/vnodeArenaMAImpl.c b/source/dnode/vnode/impl/src/vnodeArenaMAImpl.c new file mode 100644 index 0000000000..99d4781df9 --- /dev/null +++ b/source/dnode/vnode/impl/src/vnodeArenaMAImpl.c @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnodeDef.h" + +static SVArenaNode *vArenaNodeNew(uint64_t capacity); +static void vArenaNodeFree(SVArenaNode *pNode); + +SVMemAllocator *vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize) { + SVMemAllocator *pVMA = (SVMemAllocator *)malloc(sizeof(*pVMA)); + if (pVMA == NULL) { + return NULL; + } + + pVMA->capacity = capacity; + pVMA->ssize = ssize; + pVMA->lsize = lsize; + tSListInit(&(pVMA->nlist)); + + pVMA->pNode = vArenaNodeNew(capacity); + if (pVMA->pNode == NULL) { + free(pVMA); + return NULL; + } + + tSListPush(&(pVMA->nlist), pVMA->pNode); + + return pVMA; +} + +void vmaDestroy(SVMemAllocator *pVMA) { + if (pVMA) { + while (TD_SLIST_NELES(&(pVMA->nlist)) > 1) { + SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist)); + tSListPop(&(pVMA->nlist)); + vArenaNodeFree(pNode); + } + + free(pVMA); + } +} + +void vmaReset(SVMemAllocator *pVMA) { + while (TD_SLIST_NELES(&(pVMA->nlist)) > 1) { + SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist)); + tSListPop(&(pVMA->nlist)); + vArenaNodeFree(pNode); + } + + SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist)); + pNode->ptr = pNode->data; +} + +void *vmaMalloc(SVMemAllocator *pVMA, uint64_t size) { + SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist)); + void * ptr; + + if (pNode->size < POINTER_DISTANCE(pNode->ptr, pNode->data) + size) { + uint64_t capacity = MAX(pVMA->ssize, size); + pNode = vArenaNodeNew(capacity); + if (pNode == NULL) { + // TODO: handle error + return NULL; + } + + tSListPush(&(pVMA->nlist), pNode); + } + + ptr = pNode->ptr; + pNode->ptr = POINTER_SHIFT(ptr, size); + + return ptr; +} + +void vmaFree(SVMemAllocator *pVMA, void *ptr) { + // TODO +} + +bool vmaIsFull(SVMemAllocator *pVMA) { + SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist)); + + return (TD_SLIST_NELES(&(pVMA->nlist)) > 1) || + (pNode->size < POINTER_DISTANCE(pNode->ptr, pNode->data) + pVMA->lsize); +} + +/* ------------------------ STATIC METHODS ------------------------ */ +static SVArenaNode *vArenaNodeNew(uint64_t capacity) { + SVArenaNode *pNode = NULL; + + pNode = (SVArenaNode *)malloc(sizeof(*pNode) + capacity); + if (pNode == NULL) { + return NULL; + } + + pNode->size = capacity; + pNode->ptr = pNode->data; + + return pNode; +} + +static void vArenaNodeFree(SVArenaNode *pNode) { + if (pNode) { + free(pNode); + } +} \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeBufferPool.c b/source/dnode/vnode/impl/src/vnodeBufferPool.c index 00203ed9b6..1db15c3990 100644 --- a/source/dnode/vnode/impl/src/vnodeBufferPool.c +++ b/source/dnode/vnode/impl/src/vnodeBufferPool.c @@ -19,14 +19,90 @@ #define VNODE_BUF_POOL_SHARDS 3 struct SVBufPool { - // buffer pool impl - SList free; - SList incycle; - SListNode *inuse; + TD_DLIST(SVMemAllocator) free; + TD_DLIST(SVMemAllocator) incycle; + SVMemAllocator *inuse; // MAF for submodules - SMemAllocatorFactory maf; + // SMemAllocatorFactory maf; }; +int vnodeOpenBufPool(SVnode *pVnode) { + uint64_t capacity; + // EVMemAllocatorT type = E_V_ARENA_ALLOCATOR; + + if ((pVnode->pBufPool = (SVBufPool *)calloc(1, sizeof(SVBufPool))) == NULL) { + /* TODO */ + return -1; + } + + tDListInit(&(pVnode->pBufPool->free)); + tDListInit(&(pVnode->pBufPool->incycle)); + + pVnode->pBufPool->inuse = NULL; + + // TODO + capacity = pVnode->config.wsize / VNODE_BUF_POOL_SHARDS; + + for (int i = 0; i < VNODE_BUF_POOL_SHARDS; i++) { + SVMemAllocator *pVMA = vmaCreate(capacity, pVnode->config.ssize, pVnode->config.lsize); + if (pVMA == NULL) { + // TODO: handle error + return -1; + } + + tDListAppend(&(pVnode->pBufPool->free), pVMA); + } + + return 0; +} + +void vnodeCloseBufPool(SVnode *pVnode) { + if (pVnode->pBufPool) { + vmaDestroy(pVnode->pBufPool->inuse); + + while (true) { + SVMemAllocator *pVMA = TD_DLIST_HEAD(&(pVnode->pBufPool->incycle)); + if (pVMA == NULL) break; + tDListPop(&(pVnode->pBufPool->incycle), pVMA); + vmaDestroy(pVMA); + } + + while (true) { + SVMemAllocator *pVMA = TD_DLIST_HEAD(&(pVnode->pBufPool->free)); + if (pVMA == NULL) break; + tDListPop(&(pVnode->pBufPool->free), pVMA); + vmaDestroy(pVMA); + } + + free(pVnode->pBufPool); + pVnode->pBufPool = NULL; + } +} + +void *vnodeMalloc(SVnode *pVnode, uint64_t size) { + SVBufPool *pBufPool = pVnode->pBufPool; + + if (pBufPool->inuse == NULL) { + while (true) { + // TODO: add sem_wait and sem_post + pBufPool->inuse = TD_DLIST_HEAD(&(pBufPool->free)); + if (pBufPool->inuse) { + tDListPop(&(pBufPool->free), pBufPool->inuse); + break; + } + } + } + + return vmaMalloc(pBufPool->inuse, size); +} + +bool vnodeBufPoolIsFull(SVnode *pVnode) { + if (pVnode->pBufPool->inuse == NULL) return false; + return vmaIsFull(pVnode->pBufPool->inuse); +} + +#if 0 + typedef enum { // Heap allocator E_V_HEAP_ALLOCATOR = 0, @@ -57,15 +133,6 @@ typedef struct { SListNode *pNode; } SVMAWrapper; -typedef struct { - T_REF_DECLARE() - uint64_t capacity; - EVMemAllocatorT type; - union { - SVHeapAllocator vha; - SVArenaAllocator vaa; - }; -} SVMemAllocator; static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type); static void vBufPoolFreeNode(SListNode *pNode); @@ -73,106 +140,13 @@ static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf); static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma); static void * vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size); -int vnodeOpenBufPool(SVnode *pVnode) { - uint64_t capacity; - EVMemAllocatorT type = E_V_ARENA_ALLOCATOR; - - if ((pVnode->pBufPool = (SVBufPool *)calloc(1, sizeof(SVBufPool))) == NULL) { - /* TODO */ - return -1; - } - - tdListInit(&(pVnode->pBufPool->free), 0); - tdListInit(&(pVnode->pBufPool->incycle), 0); - - capacity = pVnode->config.wsize / VNODE_BUF_POOL_SHARDS; - if (pVnode->config.isHeapAllocator) { - type = E_V_HEAP_ALLOCATOR; - } - - for (int i = 0; i < VNODE_BUF_POOL_SHARDS; i++) { - SListNode *pNode = vBufPoolNewNode(capacity, type); - if (pNode == NULL) { - vnodeCloseBufPool(pVnode); - return -1; - } - - tdListAppendNode(&(pVnode->pBufPool->free), pNode); - } - - pVnode->pBufPool->maf.impl = pVnode; - pVnode->pBufPool->maf.create = vBufPoolCreateMA; - pVnode->pBufPool->maf.destroy = vBufPoolDestroyMA; - - return 0; -} - -void vnodeCloseBufPool(SVnode *pVnode) { - SListNode *pNode; - if (pVnode->pBufPool) { - // Clear free list - while ((pNode = tdListPopHead(&(pVnode->pBufPool->free))) != NULL) { - vBufPoolFreeNode(pNode); - } - - // Clear incycle list - while ((pNode = tdListPopHead(&(pVnode->pBufPool->incycle))) != NULL) { - vBufPoolFreeNode(pNode); - } - - // Free inuse node - if (pVnode->pBufPool->inuse) { - vBufPoolFreeNode(pVnode->pBufPool->inuse); - } - - free(pVnode->pBufPool); - pVnode->pBufPool = NULL; - } -} - -void *vnodeMalloc(SVnode *pVnode, uint64_t size) { - void *ptr; - - if (pVnode->pBufPool->inuse == NULL) { - SListNode *pNode; - while ((pNode = tdListPopHead(&(pVnode->pBufPool->free))) == NULL) { - // todo - // tsem_wait(); - ASSERT(0); - } - - pVnode->pBufPool->inuse = pNode; - } - - SVMemAllocator *pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data); - return vBufPoolMalloc(pvma, size); -} - /* ------------------------ STATIC METHODS ------------------------ */ -static void vArenaAllocatorInit(SVArenaAllocator *pvaa, uint64_t capacity, uint64_t ssize, uint64_t lsize) { /* TODO */ - pvaa->ssize = ssize; - pvaa->lsize = lsize; - pvaa->inuse = &pvaa->node; - - pvaa->node.prev = NULL; - pvaa->node.size = capacity; - pvaa->node.ptr = pvaa->node.data; -} - -static void vArenaAllocatorClear(SVArenaAllocator *pvaa) { /* TODO */ - while (pvaa->inuse != &(pvaa->node)) { - SVArenaNode *pANode = pvaa->inuse; - pvaa->inuse = pANode->prev; - free(pANode); - } -} - static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type) { SListNode * pNode; SVMemAllocator *pvma; uint64_t msize; - uint64_t ssize = 0; // TODO - uint64_t lsize = 0; // TODO + uint64_t ssize = 4096; // TODO + uint64_t lsize = 1024; // TODO msize = sizeof(SListNode) + sizeof(SVMemAllocator); if (type == E_V_ARENA_ALLOCATOR) { @@ -317,4 +291,5 @@ static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma) { tdListAppendNode(&(pVnode->pBufPool->free), tdListPopNode(&(pVnode->pBufPool->incycle), pNode)); // tsem_post(); todo: sem_post } -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeCfg.c b/source/dnode/vnode/impl/src/vnodeCfg.c index 01facba888..f5bb7e35d2 100644 --- a/source/dnode/vnode/impl/src/vnodeCfg.c +++ b/source/dnode/vnode/impl/src/vnodeCfg.c @@ -15,7 +15,7 @@ #include "vnodeDef.h" -const SVnodeCfg defaultVnodeOptions = {0}; /* TODO */ +const SVnodeCfg defaultVnodeOptions = {.wsize = 16 * 1024 * 1024, .walCfg = {.level = TAOS_WAL_WRITE}}; /* TODO */ void vnodeOptionsInit(SVnodeCfg *pVnodeOptions) { /* TODO */ vnodeOptionsCopy(pVnodeOptions, &defaultVnodeOptions); diff --git a/source/dnode/vnode/impl/src/vnodeCommit.c b/source/dnode/vnode/impl/src/vnodeCommit.c index 18a0c6d91d..cac7999f59 100644 --- a/source/dnode/vnode/impl/src/vnodeCommit.c +++ b/source/dnode/vnode/impl/src/vnodeCommit.c @@ -18,8 +18,6 @@ static int vnodeStartCommit(SVnode *pVnode); static int vnodeEndCommit(SVnode *pVnode); -bool vnodeShouldCommit(SVnode *pVnode) { return false; } - int vnodeAsyncCommit(SVnode *pVnode) { #if 0 if (vnodeStartCommit(pVnode) < 0) { diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index ab33b58858..9b94b4a361 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tmacro.h" #include "vnodeDef.h" static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); @@ -20,8 +21,14 @@ static void vnodeFree(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode); +TD_DEF_MOD_INIT_FLAG(vnode); +TD_DEF_MOD_CLEAR_FLAG(vnode); + int vnodeInit() { - // TODO + if (TD_CHECK_AND_SET_MODE_INIT(vnode) == TD_MOD_INITIALIZED) { + return 0; + } + if (walInit() < 0) { return -1; } @@ -30,6 +37,10 @@ int vnodeInit() { } void vnodeClear() { + if (TD_CHECK_AND_SET_MOD_CLEAR(vnode) == TD_MOD_CLEARD) { + return; + } + walCleanUp(); } diff --git a/source/dnode/vnode/impl/src/vnodeMemAllocator.c b/source/dnode/vnode/impl/src/vnodeMemAllocator.c deleted file mode 100644 index 902014eb47..0000000000 --- a/source/dnode/vnode/impl/src/vnodeMemAllocator.c +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeDef.h" - -SMemAllocator *vnodeCreateMemAllocator(SVnode *pVnode) { - SMemAllocator *pma = NULL; - /* TODO */ - return pma; -} - -void vnodeDestroyMemAllocator(SMemAllocator *pma) { - // TODO -} - -#if 0 -#define VNODE_HEAP_ALLOCATOR 0 -#define VNODE_ARENA_ALLOCATOR 1 - -typedef struct { - uint64_t tsize; - uint64_t used; -} SVHeapAllocator; - -typedef struct SVArenaNode { - struct SVArenaNode *prev; - void * nptr; - char data[]; -} SVArenaNode; - -typedef struct { - SVArenaNode *inuse; - SVArenaNode node; -} SVArenaAllocator; - -typedef struct { - int8_t type; - uint64_t tsize; - T_REF_DECLARE() - union { - SVHeapAllocator vha; - SVArenaAllocator vaa; - }; -} SVMemAllocator; - -SMemAllocator *vnodeCreateMemAllocator(int8_t type, uint64_t tsize, uint64_t ssize /* step size only for arena */) { - SMemAllocator * pma; - uint64_t msize; - SVMemAllocator *pva; - - msize = sizeof(*pma) + sizeof(SVMemAllocator); - if (type == VNODE_ARENA_ALLOCATOR) { - msize += tsize; - } - - pma = (SMemAllocator *)calloc(1, msize); - if (pma == NULL) { - return NULL; - } - - pma->impl = POINTER_SHIFT(pma, sizeof(*pma)); - pva = (SVMemAllocator *)(pma->impl); - pva->type = type; - pva->tsize = tsize; - - if (type == VNODE_HEAP_ALLOCATOR) { - pma->malloc = NULL; - pma->calloc = NULL; - pma->realloc = NULL; - pma->free = NULL; - pma->usage = NULL; - } else if (type == VNODE_ARENA_ALLOCATOR) { - pma->malloc = NULL; - pma->calloc = NULL; - pma->realloc = NULL; - pma->free = NULL; - pma->usage = NULL; - } else { - ASSERT(0); - } - - return pma; -} - -void vnodeDestroyMemAllocator(SMemAllocator *pma) { - // TODO -} - -void vnodeRefMemAllocator(SMemAllocator *pma) { - // TODO -} - -void vnodeUnrefMemAllocator(SMemAllocator *pma) { - // TODO -} - -/* ------------------------ Heap Allocator IMPL ------------------------ */ - -/* ------------------------ Arena Allocator IMPL ------------------------ */ - -#endif \ No newline at end of file diff --git a/source/dnode/vnode/impl/test/CMakeLists.txt b/source/dnode/vnode/impl/test/CMakeLists.txt index 83506a4fde..e1226331e9 100644 --- a/source/dnode/vnode/impl/test/CMakeLists.txt +++ b/source/dnode/vnode/impl/test/CMakeLists.txt @@ -4,4 +4,9 @@ target_sources(vnodeApiTests PRIVATE "vnodeApiTests.cpp" ) -target_link_libraries(vnodeApiTests vnode gtest gtest_main) \ No newline at end of file +target_link_libraries(vnodeApiTests vnode gtest gtest_main) + +add_test( + NAME vnode_api_tests + COMMAND ${CMAKE_CURRENT_BINARY_DIR}/vnodeApiTests + ) \ No newline at end of file diff --git a/source/dnode/vnode/impl/test/vnodeApiTests.cpp b/source/dnode/vnode/impl/test/vnodeApiTests.cpp index df784181b7..ac2ccbc132 100644 --- a/source/dnode/vnode/impl/test/vnodeApiTests.cpp +++ b/source/dnode/vnode/impl/test/vnodeApiTests.cpp @@ -1,3 +1,14 @@ +/** + * @file vnodeApiTests.cpp + * @author hzcheng (hzcheng@taosdata.com) + * @brief VNODE module API tests + * @version 0.1 + * @date 2021-12-13 + * + * @copyright Copyright (c) 2021 + * + */ + #include #include diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index f8f2e2eadd..ec01f7d7fc 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -90,7 +90,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) { } static inline int walValidBodyCksum(SWalHead* pHead) { - return taosCheckChecksum((uint8_t*)pHead->head.cont, pHead->head.len, pHead->cksumBody); + return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody); } static inline int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 705cf051be..49f4fde3a0 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -25,15 +25,15 @@ #include int64_t walGetFirstVer(SWal *pWal) { - return pWal->firstVersion; + return pWal->vers.firstVer; } int64_t walGetSnaphostVer(SWal *pWal) { - return pWal->snapshotVersion; + return pWal->vers.snapshotVer; } int64_t walGetLastVer(SWal *pWal) { - return pWal->lastVersion; + return pWal->vers.lastVer; } int walRollFileInfo(SWal* pWal) { @@ -42,7 +42,7 @@ int walRollFileInfo(SWal* pWal) { SArray* pArray = pWal->fileInfoSet; if(taosArrayGetSize(pArray) != 0) { WalFileInfo *pInfo = taosArrayGetLast(pArray); - pInfo->lastVer = pWal->lastVersion; + pInfo->lastVer = pWal->vers.lastVer; pInfo->closeTs = ts; } @@ -51,7 +51,7 @@ int walRollFileInfo(SWal* pWal) { if(pNewInfo == NULL) { return -1; } - pNewInfo->firstVer = pWal->lastVersion + 1; + pNewInfo->firstVer = pWal->vers.lastVer + 1; pNewInfo->lastVer = -1; pNewInfo->createTs = ts; pNewInfo->closeTs = -1; @@ -74,13 +74,13 @@ char* walMetaSerialize(SWal* pWal) { return NULL; } cJSON_AddItemToObject(pRoot, "meta", pMeta); - sprintf(buf, "%" PRId64, pWal->firstVersion); + sprintf(buf, "%" PRId64, pWal->vers.firstVer); cJSON_AddStringToObject(pMeta, "firstVer", buf); - sprintf(buf, "%" PRId64, pWal->snapshotVersion); + sprintf(buf, "%" PRId64, pWal->vers.snapshotVer); cJSON_AddStringToObject(pMeta, "snapshotVer", buf); - sprintf(buf, "%" PRId64, pWal->commitVersion); + sprintf(buf, "%" PRId64, pWal->vers.commitVer); cJSON_AddStringToObject(pMeta, "commitVer", buf); - sprintf(buf, "%" PRId64, pWal->lastVersion); + sprintf(buf, "%" PRId64, pWal->vers.lastVer); cJSON_AddStringToObject(pMeta, "lastVer", buf); cJSON_AddItemToObject(pRoot, "files", pFiles); @@ -116,13 +116,13 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { pRoot = cJSON_Parse(bytes); pMeta = cJSON_GetObjectItem(pRoot, "meta"); pField = cJSON_GetObjectItem(pMeta, "firstVer"); - pWal->firstVersion = atoll(cJSON_GetStringValue(pField)); + pWal->vers.firstVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pMeta, "snapshotVer"); - pWal->snapshotVersion = atoll(cJSON_GetStringValue(pField)); + pWal->vers.snapshotVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pMeta, "commitVer"); - pWal->commitVersion = atoll(cJSON_GetStringValue(pField)); + pWal->vers.commitVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pMeta, "lastVer"); - pWal->lastVersion = atoll(cJSON_GetStringValue(pField)); + pWal->vers.lastVer = atoll(cJSON_GetStringValue(pField)); pFiles = cJSON_GetObjectItem(pRoot, "files"); int sz = cJSON_GetArraySize(pFiles); @@ -161,7 +161,7 @@ static int walFindCurMetaVer(SWal* pWal) { DIR *dir = opendir(pWal->path); if(dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); return -1; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 3e47d18df6..7c100b4883 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -68,9 +68,12 @@ int32_t walInit() { } void walCleanUp() { + int old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0); + if(old == 0) { + return; + } walStopThread(); taosCloseRef(tsWal.refSetId); - atomic_store_8(&tsWal.inited, 0); wInfo("wal module is cleaned up"); } @@ -86,21 +89,15 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->writeCur = -1; //set config - pWal->vgId = pCfg->vgId; - pWal->fsyncPeriod = pCfg->fsyncPeriod; - pWal->rollPeriod = pCfg->rollPeriod; - pWal->segSize = pCfg->segSize; - pWal->retentionSize = pCfg->retentionSize; - pWal->retentionPeriod = pCfg->retentionPeriod; - pWal->level = pCfg->walLevel; + memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg)); //init version info - pWal->firstVersion = -1; - pWal->commitVersion = -1; - pWal->snapshotVersion = -1; - pWal->lastVersion = -1; + pWal->vers.firstVer = -1; + pWal->vers.commitVer = -1; + pWal->vers.snapshotVer = -1; + pWal->vers.lastVer = -1; - pWal->snapshottingVer = -1; + pWal->vers.verInSnapshotting = -1; pWal->totSize = 0; @@ -108,8 +105,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->lastRollSeq = -1; //init write buffer - memset(&pWal->head, 0, sizeof(SWalHead)); - pWal->head.head.sver = 0; + memset(&pWal->writeHead, 0, sizeof(SWalHead)); + pWal->writeHead.head.sver = 0; tstrncpy(pWal->path, path, sizeof(pWal->path)); pthread_mutex_init(&pWal->mutex, NULL); @@ -129,7 +126,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { } walReadMeta(pWal); - wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); + wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod); return pWal; } @@ -137,17 +134,17 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { if (pWal == NULL) return TSDB_CODE_WAL_APP_ERROR; - if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { - wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level, - pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); + if (pWal->cfg.level == pCfg->level && pWal->cfg.fsyncPeriod == pCfg->fsyncPeriod) { + wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->cfg.vgId, pWal->cfg.level, + pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod); return 0; } - wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->vgId, pWal->level, - pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); + wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->cfg.vgId, pWal->cfg.level, + pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod); - pWal->level = pCfg->walLevel; - pWal->fsyncPeriod = pCfg->fsyncPeriod; + pWal->cfg.level = pCfg->level; + pWal->cfg.fsyncPeriod = pCfg->fsyncPeriod; pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; @@ -171,22 +168,22 @@ void walClose(SWal *pWal) { static int32_t walInitObj(SWal *pWal) { if (taosMkDir(pWal->path) != 0) { - wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); + wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo)); if(pWal->fileInfoSet == NULL) { - wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno)); + wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } - wDebug("vgId:%d, object is initialized", pWal->vgId); + wDebug("vgId:%d, object is initialized", pWal->cfg.vgId); return 0; } static void walFreeObj(void *wal) { SWal *pWal = wal; - wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); + wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal); tfClose(pWal->writeLogTfd); tfClose(pWal->writeIdxTfd); @@ -197,7 +194,7 @@ static void walFreeObj(void *wal) { } static bool walNeedFsync(SWal *pWal) { - if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) { + if (pWal->cfg.fsyncPeriod <= 0 || pWal->cfg.level != TAOS_WAL_FSYNC) { return false; } @@ -217,10 +214,10 @@ static void walFsyncAll() { SWal *pWal = taosIterateRef(tsWal.refSetId, 0); while (pWal) { if (walNeedFsync(pWal)) { - wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); + wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); int32_t code = tfFsync(pWal->writeLogTfd); if (code != 0) { - wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code)); + wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(code)); } } pWal = taosIterateRef(tsWal.refSetId, pWal->refId); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 27dffddf83..554a5c846b 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -16,6 +16,147 @@ #include "walInt.h" #include "tfile.h" +SWalReadHandle* walOpenReadHandle(SWal* pWal) { + SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle)); + if(pRead == NULL) { + return NULL; + } + memset(pRead, 0, sizeof(SWalReadHandle)); + pRead->pWal = pWal; + pRead->readIdxTfd = -1; + pRead->readLogTfd = -1; + return NULL; +} + +void walCloseReadHandle(SWalReadHandle *pRead) { + tfClose(pRead->readIdxTfd); + tfClose(pRead->readLogTfd); + free(pRead); +} + +int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { + return 0; +} + +static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) { + int code = 0; + + int64_t idxTfd = pRead->readIdxTfd; + int64_t logTfd = pRead->readLogTfd; + + //seek position + int64_t offset = (ver - fileFirstVer) * WAL_IDX_ENTRY_SIZE; + code = tfLseek(idxTfd, offset, SEEK_SET); + if(code != 0) { + return -1; + } + WalIdxEntry entry; + code = tfRead(idxTfd, &entry, sizeof(WalIdxEntry)); + if(code != 0) { + return -1; + } + //TODO:deserialize + ASSERT(entry.ver == ver); + code = tfLseek(logTfd, entry.offset, SEEK_SET); + if (code != 0) { + return -1; + } + return code; +} + +static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { + char fnameStr[WAL_FILE_LEN]; + + tfClose(pRead->readIdxTfd); + tfClose(pRead->readLogTfd); + + walBuildLogName(pRead->pWal, fileFirstVer, fnameStr); + int logTfd = tfOpenRead(fnameStr); + if(logTfd < 0) { + return -1; + } + + walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); + int idxTfd = tfOpenRead(fnameStr); + if(idxTfd < 0) { + return -1; + } + + pRead->readLogTfd = logTfd; + pRead->readIdxTfd = idxTfd; + return 0; +} + +static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { + int code; + SWal *pWal = pRead->pWal; + if(ver == pWal->vers.lastVer) { + return 0; + } + if(ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { + return -1; + } + if(ver < pWal->vers.snapshotVer) { + + } + + WalFileInfo tmpInfo; + tmpInfo.firstVer = ver; + //bsearch in fileSet + WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + ASSERT(pRet != NULL); + if(pRead->curFileFirstVer != pRet->firstVer) { + code = walReadChangeFile(pRead, pRet->firstVer); + if(code < 0) { + //TODO: set error flag + return -1; + } + } + + code = walReadSeekFilePos(pRead, pRet->firstVer, ver); + if(code < 0) { + return -1; + } + pRead->curVersion = ver; + + return 0; +} + +int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { + int code; + //TODO: check wal life + if(pRead->curVersion != ver) { + walReadSeekVer(pRead, ver); + } + + if(!tfValid(pRead->readLogTfd)) return -1; + + if(sizeof(SWalHead) != tfRead(pRead->readLogTfd, &pRead->head, sizeof(SWalHead))) { + return -1; + } + code = walValidHeadCksum(&pRead->head); + if(code != 0) { + return -1; + } + if(pRead->capacity < pRead->head.head.len) { + void* ptr = realloc(pRead, pRead->head.head.len); + if(ptr == NULL) { + return -1; + } + pRead = ptr; + pRead->capacity = pRead->head.head.len; + } + if(pRead->head.head.len != tfRead(pRead->readLogTfd, &pRead->head.head.body, pRead->head.head.len)) { + return -1; + } + code = walValidBodyCksum(&pRead->head); + if(code != 0) { + return -1; + } + + return 0; +} + int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { int code; code = walSeekVer(pWal, ver); @@ -42,7 +183,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { *ppHead = NULL; return -1; } - if(tfRead(pWal->writeLogTfd, (*ppHead)->head.cont, (*ppHead)->head.len) != (*ppHead)->head.len) { + if(tfRead(pWal->writeLogTfd, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) { return -1; } //TODO: endian compatibility processing after read diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 48272f8f32..953aae703c 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -78,10 +78,12 @@ int walChangeFile(SWal *pWal, int64_t ver) { code = tfClose(pWal->writeLogTfd); if(code != 0) { //TODO + return -1; } code = tfClose(pWal->writeIdxTfd); if(code != 0) { //TODO + return -1; } WalFileInfo tmpInfo; tmpInfo.firstVer = ver; @@ -106,24 +108,19 @@ int walChangeFile(SWal *pWal, int64_t ver) { pWal->writeLogTfd = logTfd; pWal->writeIdxTfd = idxTfd; - return code; -} - -int walGetVerOffset(SWal* pWal, int64_t ver) { - int code; - return 0; + return fileFirstVer; } int walSeekVer(SWal *pWal, int64_t ver) { int code; - if(ver == pWal->lastVersion) { + if(ver == pWal->vers.lastVer) { return 0; } - if(ver > pWal->lastVersion || ver < pWal->firstVersion) { + if(ver > pWal->vers.lastVer|| ver < pWal->vers.firstVer) { return -1; } - if(ver < pWal->snapshotVersion) { - //TODO: set flag to prevent roll back + if(ver < pWal->vers.snapshotVer) { + } if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { code = walChangeFile(pWal, ver); diff --git a/source/libs/wal/src/walUtil.c b/source/libs/wal/src/walUtil.c index c88cc918fe..849d0c3e51 100644 --- a/source/libs/wal/src/walUtil.c +++ b/source/libs/wal/src/walUtil.c @@ -17,6 +17,7 @@ #include "os.h" #include "walInt.h" +#if 0 int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { int64_t curFileId = *nextFileId; int64_t minFileId = INT64_MAX; @@ -116,3 +117,4 @@ int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) { return 0; } +#endif diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 972ac5c682..44e8cec153 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -114,22 +114,22 @@ void walRemoveAllOldFiles(void *handle) { #endif int32_t walCommit(SWal *pWal, int64_t ver) { - ASSERT(pWal->commitVersion >= pWal->snapshotVersion); - ASSERT(pWal->commitVersion <= pWal->lastVersion); - if(ver < pWal->commitVersion || ver > pWal->lastVersion) { + ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer); + ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer); + if(ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) { return -1; } - pWal->commitVersion = ver; + pWal->vers.commitVer = ver; return 0; } int32_t walRollback(SWal *pWal, int64_t ver) { int code; char fnameStr[WAL_FILE_LEN]; - if(ver == pWal->lastVersion) { + if(ver == pWal->vers.lastVer) { return 0; } - if(ver > pWal->lastVersion || ver < pWal->commitVersion) { + if(ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { return -1; } pthread_mutex_lock(&pWal->mutex); @@ -220,7 +220,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { if(code < 0) { return -1; } - pWal->lastVersion = ver - 1; + pWal->vers.lastVer = ver - 1; ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; @@ -230,9 +230,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { } int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { - pWal->snapshottingVer = ver; + pWal->vers.verInSnapshotting = ver; //check file rolling - if(pWal->retentionPeriod == 0) { + if(pWal->cfg.retentionPeriod == 0) { walRoll(pWal); } @@ -240,10 +240,10 @@ int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { } int32_t walEndTakeSnapshot(SWal *pWal) { - int64_t ver = pWal->snapshottingVer; + int64_t ver = pWal->vers.verInSnapshotting; if(ver == -1) return -1; - pWal->snapshotVersion = ver; + pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); int deleteCnt = 0; @@ -257,8 +257,8 @@ int32_t walEndTakeSnapshot(SWal *pWal) { } //iterate files, until the searched result for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - if(pWal->totSize > pWal->retentionSize || - iter->closeTs + pWal->retentionPeriod > ts) { + if(pWal->totSize > pWal->cfg.retentionSize || + iter->closeTs + pWal->cfg.retentionPeriod > ts) { //delete according to file size or close time deleteCnt++; newTotSize -= iter->fileSize; @@ -278,13 +278,13 @@ int32_t walEndTakeSnapshot(SWal *pWal) { taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); if(taosArrayGetSize(pWal->fileInfoSet) == 0) { pWal->writeCur = -1; - pWal->firstVersion = -1; + pWal->vers.firstVer = -1; } else { - pWal->firstVersion = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; + pWal->vers.firstVer = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; } pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;; pWal->totSize = newTotSize; - pWal->snapshottingVer = -1; + pWal->vers.verInSnapshotting = -1; //save snapshot ver, commit ver int code = walWriteMeta(pWal); @@ -311,7 +311,7 @@ int walRoll(SWal *pWal) { } int64_t idxTfd, logTfd; //create new file - int64_t newFileFirstVersion = pWal->lastVersion + 1; + int64_t newFileFirstVersion = pWal->vers.lastVer + 1; char fnameStr[WAL_FILE_LEN]; walBuildIdxName(pWal, newFileFirstVersion, fnameStr); idxTfd = tfOpenCreateWrite(fnameStr); @@ -357,18 +357,18 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i int code = 0; // no wal - if (pWal->level == TAOS_WAL_NOLOG) return 0; + if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0; - if (index == pWal->lastVersion + 1) { + if (index == pWal->vers.lastVer + 1) { if(taosArrayGetSize(pWal->fileInfoSet) == 0) { - pWal->firstVersion = index; + pWal->vers.firstVer = index; code = walRoll(pWal); ASSERT(code == 0); } else { int64_t passed = walGetSeq() - pWal->lastRollSeq; - if(pWal->rollPeriod != -1 && pWal->rollPeriod != 0 && passed > pWal->rollPeriod) { + if(pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) { walRoll(pWal); - } else if(pWal->segSize != -1 && pWal->segSize != 0 && walGetLastFileSize(pWal) > pWal->segSize) { + } else if(pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) { walRoll(pWal); } } @@ -380,23 +380,23 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i /*if (!tfValid(pWal->curLogTfd)) return 0;*/ pthread_mutex_lock(&pWal->mutex); - pWal->head.head.version = index; + pWal->writeHead.head.version = index; - pWal->head.head.len = bodyLen; - pWal->head.head.msgType = msgType; - pWal->head.cksumHead = walCalcHeadCksum(&pWal->head); - pWal->head.cksumBody = walCalcBodyCksum(body, bodyLen); + pWal->writeHead.head.len = bodyLen; + pWal->writeHead.head.msgType = msgType; + pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); + pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); - if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { + if (tfWrite(pWal->writeLogTfd, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal)); if(code != 0) { @@ -405,7 +405,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i } //set status - pWal->lastVersion = index; + pWal->vers.lastVer = index; pWal->totSize += sizeof(SWalHead) + bodyLen; walGetCurFileInfo(pWal)->lastVer = index; walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen; @@ -416,10 +416,10 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i } void walFsync(SWal *pWal, bool forceFsync) { - if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { - wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal)); + if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { + wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); if (tfFsync(pWal->writeLogTfd) < 0) { - wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), strerror(errno)); } } } @@ -492,29 +492,29 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { } #endif -static int walValidateOffset(SWal* pWal, int64_t ver) { - int code = 0; - SWalHead *pHead = NULL; - code = (int)walRead(pWal, &pHead, ver); - if(pHead->head.version != ver) { - return -1; - } - return 0; -} +/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/ + /*int code = 0;*/ + /*SWalHead *pHead = NULL;*/ + /*code = (int)walRead(pWal, &pHead, ver);*/ + /*if(pHead->head.version != ver) {*/ + /*return -1;*/ + /*}*/ + /*return 0;*/ +/*}*/ -static int64_t walGetOffset(SWal* pWal, int64_t ver) { - int code = walSeekVer(pWal, ver); - if(code != 0) { - return -1; - } +/*static int64_t walGetOffset(SWal* pWal, int64_t ver) {*/ + /*int code = walSeekVer(pWal, ver);*/ + /*if(code != 0) {*/ + /*return -1;*/ + /*}*/ - code = walValidateOffset(pWal, ver); - if(code != 0) { - return -1; - } + /*code = walValidateOffset(pWal, ver);*/ + /*if(code != 0) {*/ + /*return -1;*/ + /*}*/ - return 0; -} + /*return 0;*/ +/*}*/ #if 0 static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index c81343d03b..504f1ada3f 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -24,7 +24,7 @@ class WalCleanEnv : public ::testing::Test { pCfg->segSize = -1; pCfg->retentionPeriod = 0; pCfg->retentionSize = 0; - pCfg->walLevel = TAOS_WAL_FSYNC; + pCfg->level = TAOS_WAL_FSYNC; pWal = walOpen(pathName, pCfg); free(pCfg); ASSERT(pWal != NULL); @@ -56,7 +56,7 @@ class WalCleanDeleteEnv : public ::testing::Test { memset(pCfg, 0, sizeof(SWalCfg)); pCfg->retentionPeriod = 0; pCfg->retentionSize = 0; - pCfg->walLevel = TAOS_WAL_FSYNC; + pCfg->level = TAOS_WAL_FSYNC; pWal = walOpen(pathName, pCfg); free(pCfg); ASSERT(pWal != NULL); @@ -95,7 +95,7 @@ class WalKeepEnv : public ::testing::Test { pCfg->segSize = -1; pCfg->retentionPeriod = 0; pCfg->retentionSize = 0; - pCfg->walLevel = TAOS_WAL_FSYNC; + pCfg->level = TAOS_WAL_FSYNC; pWal = walOpen(pathName, pCfg); free(pCfg); ASSERT(pWal != NULL); @@ -164,18 +164,18 @@ TEST_F(WalKeepEnv, readOldMeta) { for(int i = 0; i < 10; i++) { code = walWrite(pWal, i, i+1, (void*)ranStr, len); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); code = walWrite(pWal, i+2, i, (void*)ranStr, len); ASSERT_EQ(code, -1); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); } char* oldss = walMetaSerialize(pWal); TearDown(); SetUp(); - ASSERT_EQ(pWal->firstVersion, 0); - ASSERT_EQ(pWal->lastVersion, 9); + ASSERT_EQ(pWal->vers.firstVer, 0); + ASSERT_EQ(pWal->vers.lastVer, 9); char* newss = walMetaSerialize(pWal); @@ -195,10 +195,10 @@ TEST_F(WalCleanEnv, write) { for(int i = 0; i < 10; i++) { code = walWrite(pWal, i, i+1, (void*)ranStr, len); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); code = walWrite(pWal, i+2, i, (void*)ranStr, len); ASSERT_EQ(code, -1); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); } code = walWriteMeta(pWal); ASSERT_EQ(code, 0); @@ -211,14 +211,14 @@ TEST_F(WalCleanEnv, rollback) { for(int i = 0; i < 10; i++) { code = walWrite(pWal, i, i+1, (void*)ranStr, len); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); } code = walRollback(pWal, 5); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, 4); + ASSERT_EQ(pWal->vers.lastVer, 4); code = walRollback(pWal, 3); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, 2); + ASSERT_EQ(pWal->vers.lastVer, 2); code = walWriteMeta(pWal); ASSERT_EQ(code, 0); } @@ -231,16 +231,16 @@ TEST_F(WalCleanDeleteEnv, roll) { for(i = 0; i < 100; i++) { code = walWrite(pWal, i, 0, (void*)ranStr, len); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); code = walCommit(pWal, i); - ASSERT_EQ(pWal->commitVersion, i); + ASSERT_EQ(pWal->vers.commitVer, i); } walBeginTakeSnapshot(pWal, i-1); - ASSERT_EQ(pWal->snapshottingVer, i-1); + ASSERT_EQ(pWal->vers.verInSnapshotting, i-1); walEndTakeSnapshot(pWal); - ASSERT_EQ(pWal->snapshotVersion, i-1); - ASSERT_EQ(pWal->snapshottingVer, -1); + ASSERT_EQ(pWal->vers.snapshotVer, i-1); + ASSERT_EQ(pWal->vers.verInSnapshotting, -1); code = walWrite(pWal, 5, 0, (void*)ranStr, len); ASSERT_NE(code, 0); @@ -249,7 +249,7 @@ TEST_F(WalCleanDeleteEnv, roll) { code = walWrite(pWal, i, 0, (void*)ranStr, len); ASSERT_EQ(code, 0); code = walCommit(pWal, i); - ASSERT_EQ(pWal->commitVersion, i); + ASSERT_EQ(pWal->vers.commitVer, i); } //code = walWriteMeta(pWal); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 6216188496..581a797343 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -250,7 +250,7 @@ void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) { if(pArray->size == 0) { return; } - memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size); + memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size * pArray->elemSize); } void taosArrayPopTailBatch(SArray* pArray, size_t cnt) {