From 3efe621526c2b94367108a0ff72e6fb128dfbdfc Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 3 Jan 2022 17:44:47 +0800 Subject: [PATCH 01/12] integration with wal module --- include/dnode/mnode/sdb/sdb.h | 13 ++++++++-- source/dnode/mnode/impl/CMakeLists.txt | 1 + source/dnode/mnode/impl/inc/mndInt.h | 3 +++ source/dnode/mnode/impl/src/mndSync.c | 34 +++++++++++++++++++++++++- source/dnode/mnode/sdb/inc/sdbInt.h | 1 + source/dnode/mnode/sdb/src/sdb.c | 5 ++++ 6 files changed, 54 insertions(+), 3 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 9373e258be..fa10d46878 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -260,7 +260,7 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2 * * @param pSdb The sdb object. * @param pIter The type of the table. - * @record int32_t The number of rows in the table + * @return int32_t The number of rows in the table */ int32_t sdbGetSize(SSdb *pSdb, ESdbType type); @@ -269,10 +269,19 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type); * * @param pSdb The sdb object. * @param pIter The type of the table. - * @record int32_t The max id of the table + * @return int32_t The max id of the table */ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type); +/** + * @brief Update the version of sdb + * + * @param pSdb The sdb object. + * @param val The update value of the version. + * @return int32_t The current version of sdb + */ +int64_t sdbUpdateVer(SSdb *pSdb, int32_t val); + SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); void sdbFreeRaw(SSdbRaw *pRaw); int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val); diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index 98c604e520..6768651922 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -8,6 +8,7 @@ target_include_directories( target_link_libraries( mnode PRIVATE sdb + PRIVATE wal PRIVATE transport PRIVATE cjson PRIVATE sync diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 15ff65a8fc..5c8d409d90 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -17,11 +17,13 @@ #define _TD_MND_INT_H_ #include "mndDef.h" + #include "sdb.h" #include "tcache.h" #include "tep.h" #include "tqueue.h" #include "ttime.h" +#include "wal.h" #ifdef __cplusplus extern "C" { @@ -65,6 +67,7 @@ typedef struct { typedef struct { int32_t errCode; sem_t syncSem; + SWal *pWal; SSyncNode *pSyncNode; ESyncState state; } SSyncMgmt; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 6a2fca836f..d3d4ef33e9 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -20,6 +20,21 @@ int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; tsem_init(&pMgmt->syncSem, 0, 0); + char path[PATH_MAX] = {0}; + snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); + SWalCfg cfg = {.vgId = 1, + .fsyncPeriod = 0, + .rollPeriod = -1, + .segSize = -1, + .retentionPeriod = 0, + .retentionSize = 0, + .level = TAOS_WAL_FSYNC}; + pMgmt->pWal = walOpen(path, &cfg); + if (pMgmt->pWal == NULL) { + mError("failed to open wal in %s since %s", path, terrstr()); + return -1; + } + pMgmt->state = TAOS_SYNC_STATE_LEADER; pMgmt->pSyncNode = NULL; return 0; @@ -27,7 +42,11 @@ int32_t mndInitSync(SMnode *pMnode) { void mndCleanupSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - tsem_destroy(&pMgmt->syncSem); + if (pMgmt->pWal != NULL) { + walClose(pMgmt->pWal); + pMgmt->pWal = NULL; + tsem_destroy(&pMgmt->syncSem); + } } static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) { @@ -41,6 +60,19 @@ static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSync } int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { + SWal *pWal = pMnode->syncMgmt.pWal; + SSdb *pSdb = pMnode->pSdb; + + int64_t ver = sdbUpdateVer(pSdb, 1); + if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) { + sdbUpdateVer(pSdb, -1); + mError("failed to write raw:%p since %s, ver:%" PRId64, pRaw, terrstr(), ver); + return -1; + } + + mTrace("raw:%p has been write to wal, ver:%" PRId64, pRaw, ver); + walFsync(pWal, true); + #if 1 return 0; #else diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 070aa56944..98c822cae8 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -59,6 +59,7 @@ typedef struct SSdb { char *tmpDir; int64_t lastCommitVer; int64_t curVer; + int64_t tableVer[SDB_MAX]; int32_t maxId[SDB_MAX]; EKeyType keyTypes[SDB_MAX]; SHashObj *hashObjs[SDB_MAX]; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 97bc0ecbdb..7df5052d6e 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -159,3 +159,8 @@ static int32_t sdbCreateDir(SSdb *pSdb) { return 0; } + +int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) { + pSdb->curVer += val; + return val; +} \ No newline at end of file From ab9366215590a2ce5139525e9558bc3ee877df81 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 3 Jan 2022 17:54:54 +0800 Subject: [PATCH 02/12] Move the location of the test cases --- source/dnode/mgmt/impl/test/CMakeLists.txt | 2 -- source/dnode/mnode/impl/CMakeLists.txt | 6 +++++- source/dnode/mnode/impl/test/CMakeLists.txt | 4 ++++ source/dnode/{mgmt => mnode}/impl/test/acct/CMakeLists.txt | 0 source/dnode/{mgmt => mnode}/impl/test/acct/acct.cpp | 0 source/dnode/mnode/impl/test/mnodeTests.cpp | 0 source/dnode/{mgmt => mnode}/impl/test/user/CMakeLists.txt | 0 source/dnode/{mgmt => mnode}/impl/test/user/user.cpp | 0 8 files changed, 9 insertions(+), 3 deletions(-) create mode 100644 source/dnode/mnode/impl/test/CMakeLists.txt rename source/dnode/{mgmt => mnode}/impl/test/acct/CMakeLists.txt (100%) rename source/dnode/{mgmt => mnode}/impl/test/acct/acct.cpp (100%) delete mode 100644 source/dnode/mnode/impl/test/mnodeTests.cpp rename source/dnode/{mgmt => mnode}/impl/test/user/CMakeLists.txt (100%) rename source/dnode/{mgmt => mnode}/impl/test/user/user.cpp (100%) diff --git a/source/dnode/mgmt/impl/test/CMakeLists.txt b/source/dnode/mgmt/impl/test/CMakeLists.txt index b0596bed08..b36cdbd690 100644 --- a/source/dnode/mgmt/impl/test/CMakeLists.txt +++ b/source/dnode/mgmt/impl/test/CMakeLists.txt @@ -1,6 +1,5 @@ enable_testing() -add_subdirectory(acct) # add_subdirectory(auth) # add_subdirectory(balance) add_subdirectory(cluster) @@ -17,7 +16,6 @@ add_subdirectory(stb) # add_subdirectory(sync) # add_subdirectory(telem) # add_subdirectory(trans) -add_subdirectory(user) add_subdirectory(vgroup) add_subdirectory(sut) diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index 6768651922..adbef3b55f 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -12,4 +12,8 @@ target_link_libraries( PRIVATE transport PRIVATE cjson PRIVATE sync -) \ No newline at end of file +) + +if(${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) \ No newline at end of file diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt new file mode 100644 index 0000000000..fa7b45f988 --- /dev/null +++ b/source/dnode/mnode/impl/test/CMakeLists.txt @@ -0,0 +1,4 @@ +enable_testing() + +add_subdirectory(acct) +add_subdirectory(user) diff --git a/source/dnode/mgmt/impl/test/acct/CMakeLists.txt b/source/dnode/mnode/impl/test/acct/CMakeLists.txt similarity index 100% rename from source/dnode/mgmt/impl/test/acct/CMakeLists.txt rename to source/dnode/mnode/impl/test/acct/CMakeLists.txt diff --git a/source/dnode/mgmt/impl/test/acct/acct.cpp b/source/dnode/mnode/impl/test/acct/acct.cpp similarity index 100% rename from source/dnode/mgmt/impl/test/acct/acct.cpp rename to source/dnode/mnode/impl/test/acct/acct.cpp diff --git a/source/dnode/mnode/impl/test/mnodeTests.cpp b/source/dnode/mnode/impl/test/mnodeTests.cpp deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/source/dnode/mgmt/impl/test/user/CMakeLists.txt b/source/dnode/mnode/impl/test/user/CMakeLists.txt similarity index 100% rename from source/dnode/mgmt/impl/test/user/CMakeLists.txt rename to source/dnode/mnode/impl/test/user/CMakeLists.txt diff --git a/source/dnode/mgmt/impl/test/user/user.cpp b/source/dnode/mnode/impl/test/user/user.cpp similarity index 100% rename from source/dnode/mgmt/impl/test/user/user.cpp rename to source/dnode/mnode/impl/test/user/user.cpp From 0e8c226085bc8acb0154a52ff623c586014007f2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 3 Jan 2022 18:19:30 +0800 Subject: [PATCH 03/12] the starting version of wal is 0 --- source/dnode/mnode/impl/test/acct/CMakeLists.txt | 8 ++++---- source/dnode/mnode/impl/test/acct/acct.cpp | 4 ++-- source/dnode/mnode/impl/test/user/CMakeLists.txt | 8 ++++---- source/dnode/mnode/impl/test/user/user.cpp | 4 ++-- source/dnode/mnode/sdb/src/sdb.c | 4 +++- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/source/dnode/mnode/impl/test/acct/CMakeLists.txt b/source/dnode/mnode/impl/test/acct/CMakeLists.txt index a06becd127..8c8bf54bc4 100644 --- a/source/dnode/mnode/impl/test/acct/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/acct/CMakeLists.txt @@ -1,11 +1,11 @@ aux_source_directory(. ACCT_SRC) -add_executable(dnode_test_acct ${ACCT_SRC}) +add_executable(mnode_test_acct ${ACCT_SRC}) target_link_libraries( - dnode_test_acct + mnode_test_acct PUBLIC sut ) add_test( - NAME dnode_test_acct - COMMAND dnode_test_acct + NAME mnode_test_acct + COMMAND mnode_test_acct ) diff --git a/source/dnode/mnode/impl/test/acct/acct.cpp b/source/dnode/mnode/impl/test/acct/acct.cpp index be0ad4ab0f..934a2d96b4 100644 --- a/source/dnode/mnode/impl/test/acct/acct.cpp +++ b/source/dnode/mnode/impl/test/acct/acct.cpp @@ -1,7 +1,7 @@ /** * @file acct.cpp * @author slguan (slguan@taosdata.com) - * @brief DNODE module acct-msg tests + * @brief MNODE module acct-msg tests * @version 0.1 * @date 2021-12-15 * @@ -13,7 +13,7 @@ class DndTestAcct : public ::testing::Test { protected: - static void SetUpTestSuite() { test.Init("/tmp/dnode_test_acct", 9012); } + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_acct", 9012); } static void TearDownTestSuite() { test.Cleanup(); } static Testbase test; diff --git a/source/dnode/mnode/impl/test/user/CMakeLists.txt b/source/dnode/mnode/impl/test/user/CMakeLists.txt index 5068660eba..c6aeef7221 100644 --- a/source/dnode/mnode/impl/test/user/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/user/CMakeLists.txt @@ -1,11 +1,11 @@ aux_source_directory(. USER_SRC) -add_executable(dnode_test_user ${USER_SRC}) +add_executable(mnode_test_user ${USER_SRC}) target_link_libraries( - dnode_test_user + mnode_test_user PUBLIC sut ) add_test( - NAME dnode_test_user - COMMAND dnode_test_user + NAME mnode_test_user + COMMAND mnode_test_user ) diff --git a/source/dnode/mnode/impl/test/user/user.cpp b/source/dnode/mnode/impl/test/user/user.cpp index aa160304c8..00153e778c 100644 --- a/source/dnode/mnode/impl/test/user/user.cpp +++ b/source/dnode/mnode/impl/test/user/user.cpp @@ -1,7 +1,7 @@ /** * @file user.cpp * @author slguan (slguan@taosdata.com) - * @brief DNODE module user-msg tests + * @brief MNODE module user-msg tests * @version 0.1 * @date 2021-12-15 * @@ -13,7 +13,7 @@ class DndTestUser : public ::testing::Test { protected: - static void SetUpTestSuite() { test.Init("/tmp/dnode_test_user", 9140); } + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_user", 9140); } static void TearDownTestSuite() { test.Cleanup(); } static Testbase test; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 7df5052d6e..fb6ac7bb37 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -51,6 +51,8 @@ SSdb *sdbInit(SSdbOpt *pOption) { taosInitRWLatch(&pSdb->locks[i]); } + pSdb->curVer = -1; + pSdb->lastCommitVer = -1; pSdb->pMnode = pOption->pMnode; mDebug("sdb init successfully"); return pSdb; @@ -162,5 +164,5 @@ static int32_t sdbCreateDir(SSdb *pSdb) { int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) { pSdb->curVer += val; - return val; + return pSdb->curVer; } \ No newline at end of file From 5ac3398f3add7d99a2c5b6f82dc0cb93eee2fdf9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 3 Jan 2022 21:36:31 +0800 Subject: [PATCH 04/12] Mnode intergate with wal module --- include/dnode/mnode/sdb/sdb.h | 8 ++ include/util/taoserror.h | 1 + source/dnode/mnode/impl/src/mndSync.c | 103 +++++++++++++++++++++++--- source/dnode/mnode/sdb/inc/sdbInt.h | 2 - source/libs/wal/inc/walInt.h | 2 + source/libs/wal/src/walRead.c | 2 + source/util/src/terror.c | 1 + 7 files changed, 106 insertions(+), 13 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index fa10d46878..48c8df5ba0 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -188,6 +188,14 @@ int32_t sdbDeploy(SSdb *pSdb); */ int32_t sdbReadFile(SSdb *pSdb); +/** + * @brief Write sdb file. + * + * @param pSdb The sdb object. + * @return int32_t 0 for success, -1 for failure. + */ +int32_t sdbWriteFile(SSdb *pSdb); + /** * @brief Parse and write raw data to sdb, then free the pRaw object * diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ae36ac7216..2dcc74213c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -160,6 +160,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0339) #define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x033A) #define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x033B) +#define TSDB_CODE_SDB_INVALID_WAl_VER TAOS_DEF_ERROR_CODE(0, 0x033C) // mnode-dnode #define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index d3d4ef33e9..3f5bb77855 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -15,10 +15,10 @@ #define _DEFAULT_SOURCE #include "mndSync.h" +#include "mndTrans.h" -int32_t mndInitSync(SMnode *pMnode) { +static int32_t mndInitWal(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - tsem_init(&pMgmt->syncSem, 0, 0); char path[PATH_MAX] = {0}; snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); @@ -26,12 +26,95 @@ int32_t mndInitSync(SMnode *pMnode) { .fsyncPeriod = 0, .rollPeriod = -1, .segSize = -1, - .retentionPeriod = 0, - .retentionSize = 0, + .retentionPeriod = -1, + .retentionSize = -1, .level = TAOS_WAL_FSYNC}; pMgmt->pWal = walOpen(path, &cfg); - if (pMgmt->pWal == NULL) { - mError("failed to open wal in %s since %s", path, terrstr()); + if (pMgmt->pWal == NULL) return -1; + + return 0; +} + +static void mndCloseWal(SMnode *pMnode) { + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + + if (pMgmt->pWal != NULL) { + walClose(pMgmt->pWal); + pMgmt->pWal = NULL; + } +} + +static int32_t mndRestoreWal(SMnode *pMnode) { + SWal *pWal = pMnode->syncMgmt.pWal; + SSdb *pSdb = pMnode->pSdb; + int64_t lastSdbVer = sdbUpdateVer(pSdb, 0); + int32_t code = -1; + + SWalReadHandle *pHandle = walOpenReadHandle(pWal); + if (pHandle == NULL) return -1; + + int64_t start = walGetFirstVer(pWal); + int64_t end = walGetLastVer(pWal); + start = MAX(lastSdbVer, start); + + for (int64_t ver = start; ver >= 0 && ver <= end; ++ver) { + if (walReadWithHandle(pHandle, ver) < 0) { + mError("failed to read with wal handle since %s, ver:%" PRId64, terrstr(), ver); + goto WAL_RESTORE_OVER; + } + + SWalHead *pHead = pHandle->pHead; + int64_t sdbVer = sdbUpdateVer(pSdb, 0); + if (sdbVer + 1 != ver) { + terrno = TSDB_CODE_SDB_INVALID_WAl_VER; + mError("failed to write wal to sdb, sdbVer:%" PRId64 " inconsistent with ver:%" PRId64, sdbVer, ver); + goto WAL_RESTORE_OVER; + } + + if (sdbWriteNotFree(pSdb, (void *)pHead->head.body) < 0) { + mError("failed to write wal to sdb since %s, ver:%" PRId64, terrstr(), ver); + goto WAL_RESTORE_OVER; + } + + sdbUpdateVer(pSdb, 1); + } + + int64_t sdbVer = sdbUpdateVer(pSdb, 0); + if (sdbVer != lastSdbVer) { + if (walBeginSnapshot(pWal, sdbVer) < 0) { + goto WAL_RESTORE_OVER; + } + + if (sdbVer != lastSdbVer) { + mInfo("sdb restore wal from %" PRId64 " to %" PRId64, lastSdbVer, sdbVer); + if (sdbWriteFile(pSdb) != 0) { + goto WAL_RESTORE_OVER; + } + } + + if (walEndSnapshot(pWal) < 0) { + goto WAL_RESTORE_OVER; + } + } + + code = 0; + +WAL_RESTORE_OVER: + walCloseReadHandle(pHandle); + return 0; +} + +int32_t mndInitSync(SMnode *pMnode) { + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + tsem_init(&pMgmt->syncSem, 0, 0); + + if (mndInitWal(pMnode) < 0) { + mError("failed to open wal since %s", terrstr()); + return -1; + } + + if (mndRestoreWal(pMnode) < 0) { + mError("failed to restore wal since %s", terrstr()); return -1; } @@ -42,11 +125,8 @@ int32_t mndInitSync(SMnode *pMnode) { void mndCleanupSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - if (pMgmt->pWal != NULL) { - walClose(pMgmt->pWal); - pMgmt->pWal = NULL; - tsem_destroy(&pMgmt->syncSem); - } + tsem_destroy(&pMgmt->syncSem); + mndCloseWal(pMnode); } static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) { @@ -71,6 +151,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { } mTrace("raw:%p has been write to wal, ver:%" PRId64, pRaw, ver); + walCommit(pWal, ver); walFsync(pWal, true); #if 1 diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 98c822cae8..25db988a0c 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -72,8 +72,6 @@ typedef struct SSdb { SdbDecodeFp decodeFps[SDB_MAX]; } SSdb; -int32_t sdbWriteFile(SSdb *pSdb); - const char *sdbTableName(ESdbType type); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 1579cad7b6..871c95193f 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -20,6 +20,8 @@ #include "tchecksum.h" #include "wal.h" +#include "taoserror.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index c80fb4eed8..b5a30e4397 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -19,8 +19,10 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) { SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle)); if (pRead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pRead->pWal = pWal; pRead->readIdxTfd = -1; pRead->readLogTfd = -1; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 9fa5b3198b..e821f1f803 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -170,6 +170,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_STATUS_TYPE, "Invalid status type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_VER, "Invalid raw data version") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_LEN, "Invalid raw data len") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_CONTENT, "Invalid raw data content") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_WAl_VER, "Invalid wal version") // mnode-dnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "Dnode already exists") From 99e53317ce03583c3acbbc056c25276c4bd32d90 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 3 Jan 2022 21:41:45 +0800 Subject: [PATCH 05/12] minor changes --- source/dnode/mnode/impl/src/mndSync.c | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3f5bb77855..7c8a8cdb96 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -37,7 +37,6 @@ static int32_t mndInitWal(SMnode *pMnode) { static void mndCloseWal(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - if (pMgmt->pWal != NULL) { walClose(pMgmt->pWal); pMgmt->pWal = NULL; @@ -59,7 +58,7 @@ static int32_t mndRestoreWal(SMnode *pMnode) { for (int64_t ver = start; ver >= 0 && ver <= end; ++ver) { if (walReadWithHandle(pHandle, ver) < 0) { - mError("failed to read with wal handle since %s, ver:%" PRId64, terrstr(), ver); + mError("failed to read by wal handle since %s, ver:%" PRId64, terrstr(), ver); goto WAL_RESTORE_OVER; } @@ -67,12 +66,12 @@ static int32_t mndRestoreWal(SMnode *pMnode) { int64_t sdbVer = sdbUpdateVer(pSdb, 0); if (sdbVer + 1 != ver) { terrno = TSDB_CODE_SDB_INVALID_WAl_VER; - mError("failed to write wal to sdb, sdbVer:%" PRId64 " inconsistent with ver:%" PRId64, sdbVer, ver); + mError("failed to read wal from sdb, sdbVer:%" PRId64 " inconsistent with ver:%" PRId64, sdbVer, ver); goto WAL_RESTORE_OVER; } if (sdbWriteNotFree(pSdb, (void *)pHead->head.body) < 0) { - mError("failed to write wal to sdb since %s, ver:%" PRId64, terrstr(), ver); + mError("failed to read wal from sdb since %s, ver:%" PRId64, terrstr(), ver); goto WAL_RESTORE_OVER; } @@ -85,11 +84,9 @@ static int32_t mndRestoreWal(SMnode *pMnode) { goto WAL_RESTORE_OVER; } - if (sdbVer != lastSdbVer) { - mInfo("sdb restore wal from %" PRId64 " to %" PRId64, lastSdbVer, sdbVer); - if (sdbWriteFile(pSdb) != 0) { - goto WAL_RESTORE_OVER; - } + mInfo("sdb restore wal from %" PRId64 " to %" PRId64, lastSdbVer, sdbVer); + if (sdbWriteFile(pSdb) != 0) { + goto WAL_RESTORE_OVER; } if (walEndSnapshot(pWal) < 0) { @@ -101,7 +98,7 @@ static int32_t mndRestoreWal(SMnode *pMnode) { WAL_RESTORE_OVER: walCloseReadHandle(pHandle); - return 0; + return code; } int32_t mndInitSync(SMnode *pMnode) { From efb3d812b183e7831717045ad158a036cd4b2f68 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 4 Jan 2022 11:23:54 +0800 Subject: [PATCH 06/12] add commit version to sdb --- include/dnode/mnode/sdb/sdb.h | 37 ++++---- source/dnode/mnode/sdb/inc/sdbInt.h | 5 +- source/dnode/mnode/sdb/src/sdb.c | 11 ++- source/dnode/mnode/sdb/src/sdbFile.c | 128 +++++++++++++++++++++++++-- source/dnode/mnode/sdb/src/sdbHash.c | 37 ++++---- 5 files changed, 171 insertions(+), 47 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 48c8df5ba0..497da71c13 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -102,25 +102,24 @@ typedef enum { } ESdbStatus; typedef enum { - SDB_START = 0, - SDB_TRANS = 1, - SDB_CLUSTER = 2, - SDB_MNODE = 3, - SDB_QNODE = 4, - SDB_SNODE = 5, - SDB_BNODE = 6, - SDB_DNODE = 7, - SDB_USER = 8, - SDB_AUTH = 9, - SDB_ACCT = 10, - SDB_CONSUMER = 11, - SDB_CGROUP = 12, - SDB_TOPIC = 13, - SDB_VGROUP = 14, - SDB_STB = 15, - SDB_DB = 16, - SDB_FUNC = 17, - SDB_MAX = 18 + SDB_TRANS = 0, + SDB_CLUSTER = 1, + SDB_MNODE = 2, + SDB_QNODE = 3, + SDB_SNODE = 4, + SDB_BNODE = 5, + SDB_DNODE = 6, + SDB_USER = 7, + SDB_AUTH = 8, + SDB_ACCT = 9, + SDB_CONSUMER = 10, + SDB_CGROUP = 11, + SDB_TOPIC = 12, + SDB_VGROUP = 13, + SDB_STB = 14, + SDB_DB = 15, + SDB_FUNC = 16, + SDB_MAX = 17 } ESdbType; typedef struct SSdb SSdb; diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 25db988a0c..c99dff57e1 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -17,11 +17,12 @@ #define _TD_SDB_INT_H_ #include "os.h" + #include "sdb.h" -#include "tmsg.h" #include "thash.h" #include "tlockfree.h" #include "tlog.h" +#include "tmsg.h" #ifdef __cplusplus extern "C" { @@ -60,7 +61,7 @@ typedef struct SSdb { int64_t lastCommitVer; int64_t curVer; int64_t tableVer[SDB_MAX]; - int32_t maxId[SDB_MAX]; + int64_t maxId[SDB_MAX]; EKeyType keyTypes[SDB_MAX]; SHashObj *hashObjs[SDB_MAX]; SRWLatch locks[SDB_MAX]; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index fb6ac7bb37..0671434218 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -49,6 +49,9 @@ SSdb *sdbInit(SSdbOpt *pOption) { for (ESdbType i = 0; i < SDB_MAX; ++i) { taosInitRWLatch(&pSdb->locks[i]); + pSdb->maxId[i] = 0; + pSdb->tableVer[i] = -1; + pSdb->keyTypes[i] = SDB_KEY_INT32; } pSdb->curVer = -1; @@ -61,10 +64,10 @@ SSdb *sdbInit(SSdbOpt *pOption) { void sdbCleanup(SSdb *pSdb) { mDebug("start to cleanup sdb"); - // if (pSdb->curVer != pSdb->lastCommitVer) { - mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); - sdbWriteFile(pSdb); - // } + if (pSdb->curVer != pSdb->lastCommitVer) { + mDebug("write sdb file for curVer:% " PRId64 " and lastCommitVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); + sdbWriteFile(pSdb); + } if (pSdb->currDir != NULL) { tfree(pSdb->currDir); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 5a03b3409e..c5306478f2 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -17,10 +17,13 @@ #include "sdbInt.h" #include "tchecksum.h" +#define SDB_TABLE_SIZE 24 +#define SDB_RESERVE_SIZE 512 + static int32_t sdbRunDeployFp(SSdb *pSdb) { mDebug("start to deploy sdb"); - for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) { + for (int32_t i = SDB_MAX - 1; i >= 0; --i) { SdbDeployFp fp = pSdb->deployFps[i]; if (fp == NULL) continue; @@ -34,6 +37,100 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) { return 0; } +static int32_t sdbReadFileHead(SSdb *pSdb, FileFd fd) { + int32_t ret = taosReadFile(fd, &pSdb->curVer, sizeof(int64_t)); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (ret != sizeof(int64_t)) { + terrno = TSDB_CODE_FILE_CORRUPTED; + return -1; + } + + for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { + int64_t maxId = -1; + ret = taosReadFile(fd, &maxId, sizeof(int64_t)); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (ret != sizeof(int64_t)) { + terrno = TSDB_CODE_FILE_CORRUPTED; + return -1; + } + if (i < SDB_MAX) { + pSdb->maxId[i] = maxId; + } + } + + for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { + int64_t ver = -1; + ret = taosReadFile(fd, &ver, sizeof(int64_t)); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (ret != sizeof(int64_t)) { + terrno = TSDB_CODE_FILE_CORRUPTED; + return -1; + } + if (i < SDB_MAX) { + pSdb->tableVer[i] = ver; + } + } + + char reserve[SDB_RESERVE_SIZE] = {0}; + ret = taosWriteFile(fd, reserve, sizeof(reserve)); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (ret != sizeof(reserve)) { + terrno = TSDB_CODE_FILE_CORRUPTED; + return -1; + } + + return 0; +} + +static int32_t sdbWriteFileHead(SSdb *pSdb, FileFd fd) { + if (taosWriteFile(fd, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { + int64_t maxId = -1; + if (i < SDB_MAX) { + maxId = pSdb->maxId[i]; + } + if (taosWriteFile(fd, &maxId, sizeof(int64_t)) != sizeof(int64_t)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } + + for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { + int64_t ver = -1; + if (i < SDB_MAX) { + ver = pSdb->tableVer[i]; + } + if (taosWriteFile(fd, &ver, sizeof(int64_t)) != sizeof(int64_t)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } + + char reserve[512] = {0}; + if (taosWriteFile(fd, reserve, sizeof(reserve)) != sizeof(reserve)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} + int32_t sdbReadFile(SSdb *pSdb) { int64_t offset = 0; int32_t code = 0; @@ -43,7 +140,7 @@ int32_t sdbReadFile(SSdb *pSdb) { SSdbRaw *pRaw = malloc(SDB_MAX_SIZE); if (pRaw == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("failed read file since %s", terrstr()); + mError("failed read file since %s", terrstr()); return -1; } @@ -58,6 +155,14 @@ int32_t sdbReadFile(SSdb *pSdb) { return 0; } + if (sdbReadFileHead(pSdb, fd) != 0) { + mError("failed to read file:%s head since %s", file, terrstr()); + pSdb->curVer = -1; + free(pRaw); + taosCloseFile(fd); + return -1; + } + while (1) { readLen = sizeof(SSdbRaw); ret = taosReadFile(fd, pRaw, readLen); @@ -104,6 +209,8 @@ int32_t sdbReadFile(SSdb *pSdb) { } code = 0; + pSdb->lastCommitVer = pSdb->curVer; + mError("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer); PARSE_SDB_DATA_ERROR: taosCloseFile(fd); @@ -130,11 +237,17 @@ int32_t sdbWriteFile(SSdb *pSdb) { return -1; } - for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) { + if (sdbWriteFileHead(pSdb, fd) != 0) { + mError("failed to write file:%s head since %s", tmpfile, terrstr()); + taosCloseFile(fd); + return -1; + } + + for (int32_t i = SDB_MAX - 1; i >= 0; --i) { SdbEncodeFp encodeFp = pSdb->encodeFps[i]; if (encodeFp == NULL) continue; - mTrace("sdb write %s, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); + mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); SHashObj *hash = pSdb->hashObjs[i]; SRWLatch *pLock = &pSdb->locks[i]; @@ -155,7 +268,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { pRaw->status = pRow->status; int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen; if (taosWriteFile(fd, pRaw, writeLen) != writeLen) { - code = TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); taosHashCancelIterate(hash, ppRow); sdbFreeRaw(pRaw); break; @@ -163,7 +276,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen); if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) { - code = TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); taosHashCancelIterate(hash, ppRow); sdbFreeRaw(pRaw); break; @@ -201,7 +314,8 @@ int32_t sdbWriteFile(SSdb *pSdb) { if (code != 0) { mError("failed to write file:%s since %s", curfile, tstrerror(code)); } else { - mDebug("write file:%s successfully", curfile); + pSdb->lastCommitVer = pSdb->curVer; + mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer); } terrno = code; diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 597484dad1..4b11ec3e76 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -38,6 +38,10 @@ const char *sdbTableName(ESdbType type) { return "auth"; case SDB_ACCT: return "acct"; + case SDB_CONSUMER: + return "consumer"; + case SDB_CGROUP: + return "cgroup"; case SDB_TOPIC: return "topic"; case SDB_VGROUP: @@ -70,7 +74,7 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) { } static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) { - if (type >= SDB_MAX || type <= SDB_START) { + if (type >= SDB_MAX || type < 0) { terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE; return NULL; } @@ -100,8 +104,6 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) { } static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { - int32_t code = 0; - SRWLatch *pLock = &pSdb->locks[pRow->type]; taosWLockLatch(pLock); @@ -126,10 +128,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * taosWUnLockLatch(pLock); - if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) { - pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj)); - } - + int32_t code = 0; SdbInsertFp insertFp = pSdb->insertFps[pRow->type]; if (insertFp != NULL) { code = (*insertFp)(pSdb, pRow->pObj); @@ -143,12 +142,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * } } + if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) { + pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj)); + } + if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT64) { + pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj)); + } + pSdb->tableVer[pRow->type]++; + return 0; } static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) { - int32_t code = 0; - SRWLatch *pLock = &pSdb->locks[pNewRow->type]; taosRLockLatch(pLock); @@ -157,23 +162,24 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * taosRUnLockLatch(pLock); return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize); } - SSdbRow *pOldRow = *ppOldRow; + SSdbRow *pOldRow = *ppOldRow; pOldRow->status = pRaw->status; taosRUnLockLatch(pLock); + int32_t code = 0; SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; if (updateFp != NULL) { code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); } sdbFreeRow(pSdb, pNewRow); + + pSdb->tableVer[pOldRow->type]++; return code; } static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { - int32_t code = 0; - SRWLatch *pLock = &pSdb->locks[pRow->type]; taosWLockLatch(pLock); @@ -190,9 +196,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * taosHashRemove(hash, pOldRow->pObj, keySize); taosWUnLockLatch(pLock); - // sdbRelease(pSdb, pOldRow->pObj); + pSdb->tableVer[pOldRow->type]++; sdbFreeRow(pSdb, pRow); - return code; + // sdbRelease(pSdb, pOldRow->pObj); + return 0; } int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) { @@ -277,7 +284,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) { if (pObj == NULL) return; SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); - if (pRow->type >= SDB_MAX || pRow->type <= SDB_START) return; + if (pRow->type >= SDB_MAX ) return; SRWLatch *pLock = &pSdb->locks[pRow->type]; taosRLockLatch(pLock); From 28480de78583fca53b07710380f38d4f7f5bc303 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Jan 2022 13:15:23 +0800 Subject: [PATCH 07/12] [td-11818]fix compiler error. --- include/util/thash.h | 2 +- source/util/src/thash.c | 58 ++++++++++++++++++++--------------------- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/include/util/thash.h b/include/util/thash.h index f38ab50893..a736fc26af 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -24,7 +24,7 @@ extern "C" { #include "tlockfree.h" typedef uint32_t (*_hash_fn_t)(const char *, uint32_t); -typedef int32_t (*_equal_fn_t)(const void*, const void*, uint32_t len); +typedef int32_t (*_equal_fn_t)(const void*, const void*, size_t len); typedef void (*_hash_before_fn_t)(void *); typedef void (*_hash_free_fn_t)(void *); diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 2841f27da4..f90b157558 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -28,13 +28,13 @@ tfree(_n); \ } while (0) -#define FREE_HASH_NODE(_h, _n) \ - do { \ - if ((_h)->freeFp) { \ +#define FREE_HASH_NODE(_h, _n) \ + do { \ + if ((_h)->freeFp) { \ (_h)->freeFp(GET_HASH_NODE_DATA(_n)); \ - } \ - \ - DO_FREE_HASH_NODE(_n); \ + } \ + \ + DO_FREE_HASH_NODE(_n); \ } while (0); static FORCE_INLINE void __wr_lock(void *lock, int32_t type) { @@ -55,7 +55,6 @@ static FORCE_INLINE void __rd_unlock(void *lock, int32_t type) { if (type == HASH_NO_LOCK) { return; } - taosRUnLockLatch(lock); } @@ -63,7 +62,6 @@ static FORCE_INLINE void __wr_unlock(void *lock, int32_t type) { if (type == HASH_NO_LOCK) { return; } - taosWUnLockLatch(lock); } @@ -225,12 +223,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da // need the resize process, write lock applied if (HASH_NEED_RESIZE(pHashObj)) { - __wr_lock(&pHashObj->lock, pHashObj->type); + __wr_lock((void*) &pHashObj->lock, pHashObj->type); taosHashTableResize(pHashObj); - __wr_unlock(&pHashObj->lock, pHashObj->type); + __wr_unlock((void*) &pHashObj->lock, pHashObj->type); } - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; @@ -272,7 +270,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da } // enable resize - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); atomic_add_fetch_32(&pHashObj->size, 1); return 0; @@ -289,7 +287,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da } // enable resize - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return pHashObj->enableUpdate ? 0 : -2; } @@ -308,14 +306,14 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // only add the read lock to disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; // no data, return directly if (atomic_load_32(&pe->num) == 0) { - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return NULL; } @@ -358,7 +356,7 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo taosRUnLockLatch(&pe->latch); } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return data; } @@ -370,14 +368,14 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // only add the read lock to disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; // no data, return directly if (atomic_load_32(&pe->num) == 0) { - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return NULL; } @@ -415,7 +413,7 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v taosRUnLockLatch(&pe->latch); } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return data; } @@ -436,7 +434,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; @@ -450,7 +448,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi assert(pe->next == NULL); taosWUnLockLatch(&pe->latch); - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return -1; } @@ -491,7 +489,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi taosWUnLockLatch(&pe->latch); } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return code; } @@ -502,7 +500,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi } // disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int32_t numOfEntries = (int32_t)pHashObj->capacity; for (int32_t i = 0; i < numOfEntries; ++i) { @@ -566,7 +564,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi } } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return 0; } @@ -577,7 +575,7 @@ void taosHashClear(SHashObj *pHashObj) { SHashNode *pNode, *pNext; - __wr_lock(&pHashObj->lock, pHashObj->type); + __wr_lock((void*) &pHashObj->lock, pHashObj->type); for (int32_t i = 0; i < pHashObj->capacity; ++i) { SHashEntry *pEntry = pHashObj->hashList[i]; @@ -601,7 +599,7 @@ void taosHashClear(SHashObj *pHashObj) { } atomic_store_32(&pHashObj->size, 0); - __wr_unlock(&pHashObj->lock, pHashObj->type); + __wr_unlock((void*) &pHashObj->lock, pHashObj->type); } void taosHashCleanup(SHashObj *pHashObj) { @@ -864,7 +862,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { char *data = NULL; // only add the read lock to disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); SHashNode *pNode = NULL; if (p) { @@ -911,7 +909,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { } } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return data; } @@ -920,7 +918,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) { if (pHashObj == NULL || p == NULL) return; // only add the read lock to disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int slot; taosHashReleaseNode(pHashObj, p, &slot); @@ -930,7 +928,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) { taosWUnLockLatch(&pe->latch); } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); } void taosHashRelease(SHashObj *pHashObj, void *p) { From 81e7ff450e73b5635ee49f1448dd25fe3ef70d85 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 4 Jan 2022 13:40:47 +0800 Subject: [PATCH 08/12] minor changes --- source/dnode/mnode/impl/src/mndSync.c | 16 ++++++++-------- source/dnode/mnode/sdb/src/sdbFile.c | 6 +++--- source/libs/wal/src/walWrite.c | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 7c8a8cdb96..49eaa45156 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -54,7 +54,7 @@ static int32_t mndRestoreWal(SMnode *pMnode) { int64_t start = walGetFirstVer(pWal); int64_t end = walGetLastVer(pWal); - start = MAX(lastSdbVer, start); + start = MAX(lastSdbVer + 1, start); for (int64_t ver = start; ver >= 0 && ver <= end; ++ver) { if (walReadWithHandle(pHandle, ver) < 0) { @@ -79,19 +79,19 @@ static int32_t mndRestoreWal(SMnode *pMnode) { } int64_t sdbVer = sdbUpdateVer(pSdb, 0); - if (sdbVer != lastSdbVer) { - if (walBeginSnapshot(pWal, sdbVer) < 0) { - goto WAL_RESTORE_OVER; - } + if (walBeginSnapshot(pWal, sdbVer) < 0) { + goto WAL_RESTORE_OVER; + } + if (sdbVer != lastSdbVer) { mInfo("sdb restore wal from %" PRId64 " to %" PRId64, lastSdbVer, sdbVer); if (sdbWriteFile(pSdb) != 0) { goto WAL_RESTORE_OVER; } + } - if (walEndSnapshot(pWal) < 0) { - goto WAL_RESTORE_OVER; - } + if (walEndSnapshot(pWal) < 0) { + goto WAL_RESTORE_OVER; } code = 0; diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index c5306478f2..cc36b8ec13 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -81,7 +81,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, FileFd fd) { } char reserve[SDB_RESERVE_SIZE] = {0}; - ret = taosWriteFile(fd, reserve, sizeof(reserve)); + ret = taosReadFile(fd, reserve, sizeof(reserve)); if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -122,7 +122,7 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, FileFd fd) { } } - char reserve[512] = {0}; + char reserve[SDB_RESERVE_SIZE] = {0}; if (taosWriteFile(fd, reserve, sizeof(reserve)) != sizeof(reserve)) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -210,7 +210,7 @@ int32_t sdbReadFile(SSdb *pSdb) { code = 0; pSdb->lastCommitVer = pSdb->curVer; - mError("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer); + mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer); PARSE_SDB_DATA_ERROR: taosCloseFile(fd); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index c8ffd9d07d..05750d6446 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -148,7 +148,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { int32_t walEndSnapshot(SWal *pWal) { int64_t ver = pWal->vers.verInSnapshotting; - if (ver == -1) return -1; + if (ver == -1) return 0; pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); From 1826b3035f1adb64f722da7f9530cbbd9edf225f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 3 Jan 2022 21:55:10 -0800 Subject: [PATCH 09/12] minor changes --- tests/script/general/table/basic1.sim | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/script/general/table/basic1.sim b/tests/script/general/table/basic1.sim index f2341a84ce..298f663822 100644 --- a/tests/script/general/table/basic1.sim +++ b/tests/script/general/table/basic1.sim @@ -46,7 +46,6 @@ print =============== create child table sql create table c1 using st tags(1) sql create table c2 using st tags(2) -return sql show tables if $rows != 2 then return -1 @@ -56,6 +55,8 @@ print $data00 $data01 $data02 print $data10 $data11 $data22 print $data20 $data11 $data22 +return + print =============== insert data sql insert into c1 values(now+1s, 1) sql insert into c1 values(now+2s, 2) From 09fe692b85560067c05c1f55f36c13d655022449 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Jan 2022 15:17:17 +0800 Subject: [PATCH 10/12] [td-11818] refactor log/fix bug in create child table. --- include/libs/catalog/catalog.h | 4 +- source/client/src/clientEnv.c | 4 +- source/client/src/clientImpl.c | 13 +- source/client/test/clientTests.cpp | 193 ++++++++++++------------- source/libs/catalog/src/catalog.c | 10 +- source/libs/parser/src/dCDAstProcess.c | 2 +- source/libs/qcom/src/querymsg.c | 11 +- source/libs/transport/src/rpcMain.c | 11 +- 8 files changed, 120 insertions(+), 128 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 1bd29ce396..6250bbbe9e 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -90,12 +90,12 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons /** * Force renew a table's local cached meta data. * @param pCatalog (input, got with catalogGetHandle) - * @param pRpc (input, rpc object) + * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name, NOT including db name) * @return error code */ -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName); +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName); /** * Force renew a table's local cached meta data and get the new one. diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index bfb884c57e..1c7354b445 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -53,8 +53,8 @@ static void registerRequest(SRequestObj* pRequest) { int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1); int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1); - tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self, - pRequest->pTscObj->id, num, currentInst, total); + tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%"PRIx64, pRequest->self, + pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); } } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0d590235ad..71fd8462af 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -371,7 +371,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con taos_close(pTscObj); pTscObj = NULL; } else { - tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter); + tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pTransporter, pRequest->requestId); destroyRequest(pRequest); } @@ -441,14 +441,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { * There is not response callback function for submit response. * The actual inserted number of points is the first number. */ + int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start; if (pMsg->code == TSDB_CODE_SUCCESS) { - tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%" PRId64 " ms", pRequest->requestId, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, - pRequest->metric.rsp - pRequest->metric.start); + tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->requestId, + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); } else { - tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%" PRId64 " ms", pRequest->requestId, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, - pRequest->metric.rsp - pRequest->metric.start); + tscError("reqId:0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x"PRIx64, pRequest->requestId, + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); } taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index bb40d9ada2..193f436734 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -57,95 +57,95 @@ TEST(testCase, connect_Test) { taos_close(pConn); } -//TEST(testCase, create_user_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, create_account_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, drop_account_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} +TEST(testCase, create_user_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); -//TEST(testCase, show_user_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "show users"); -// TAOS_ROW pRow = NULL; -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_close(pConn); -//} + TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } -//TEST(testCase, drop_user_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "drop user abc"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} + taos_free_result(pRes); + taos_close(pConn); +} -//TEST(testCase, show_db_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -//// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "show databases"); -// TAOS_ROW pRow = NULL; -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_close(pConn); -//} +TEST(testCase, create_account_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, drop_account_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_user_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "show users"); + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_close(pConn); +} + +TEST(testCase, drop_user_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "drop user abc"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_db_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "show databases"); + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_close(pConn); +} TEST(testCase, create_db_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -500,22 +500,17 @@ TEST(testCase, create_multiple_tables) { } TEST(testCase, generated_request_id_test) { - uint64_t id0 = generateRequestId(); + SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - uint64_t id1 = generateRequestId(); - uint64_t id2 = generateRequestId(); - uint64_t id3 = generateRequestId(); - uint64_t id4 = generateRequestId(); + for(int32_t i = 0; i < 1000000; ++i) { + uint64_t v = generateRequestId(); + void* result = taosHashGet(phash, &v, sizeof(v)); + ASSERT_EQ(result, nullptr); - ASSERT_NE(id0, id1); - ASSERT_NE(id1, id2); - ASSERT_NE(id2, id3); - ASSERT_NE(id4, id3); - ASSERT_NE(id0, id2); - ASSERT_NE(id0, id4); - ASSERT_NE(id0, id3); + taosHashPut(phash, &v, sizeof(v), NULL, 0); + } -// SHashObj *phash = taosHashInit() + taosHashClear(phash); } //TEST(testCase, projection_query_tables) { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 236264873e..5992962419 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -675,28 +675,26 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta); } -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) { +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName) { + if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } SVgroupInfo vgroupInfo = {0}; int32_t code = 0; - CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo)); + CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo)); STableMetaOutput output = {0}; - CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo, &output)); + CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output)); //CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output)); CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); _return: - tfree(output.tbMeta); - CTG_RET(code); } diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 6b42a93b73..1e92e32fef 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -501,7 +501,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p struct SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; req.name = strdup(tNameGetTableName(&tableName)); - req.ctbCfg.suid = pSuperTableMeta->suid; + req.ctbCfg.suid = pSuperTableMeta->uid; req.ctbCfg.pTag = row; SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId)); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index b50eb2c92d..f24b191db3 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -43,15 +43,12 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 bMsg->header.vgId = htonl(bInput->vgId); if (bInput->dbName) { - strncpy(bMsg->dbFname, bInput->dbName, sizeof(bMsg->dbFname)); - bMsg->dbFname[sizeof(bMsg->dbFname) - 1] = 0; + tstrncpy(bMsg->dbFname, bInput->dbName, tListLen(bMsg->dbFname)); } - strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); - bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; + tstrncpy(bMsg->tableFname, bInput->tableFullName, tListLen(bMsg->tableFname)); *msgLen = (int32_t)sizeof(*bMsg); - return TSDB_CODE_SUCCESS; } @@ -211,7 +208,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl pTableMeta->vgId = isSuperTable ? 0 : msg->vgId; pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType; - pTableMeta->uid = msg->suid; + pTableMeta->uid = msg->tuid; pTableMeta->suid = msg->suid; pTableMeta->sversion = msg->sversion; pTableMeta->tversion = msg->tversion; @@ -272,7 +269,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname)); } - code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta); + code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta); } return code; diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 310944e9b6..a7b9bfedbe 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -1094,13 +1094,16 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { SRpcReqContext *pContext; pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); + char ipstr[24] = {0}; + taosIpPort2String(pRecv->ip, pRecv->port, ipstr); + if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) { - tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, - pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), pRecv->ip, pRecv->port, terrno, pRecv->msgLen, + tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, + pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); } else { - tDebug("%s %p %p, %d received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, - pConn, (void *)pHead->ahandle, pHead->msgType, pRecv->ip, pRecv->port, terrno, pRecv->msgLen, + tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, + pConn, (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); } From 7128b3cdab2bcd31671016b8415207a4f043062d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Jan 2022 15:20:16 +0800 Subject: [PATCH 11/12] [td-11818] refactor. --- include/libs/parser/parsenodes.h | 4 ++-- source/libs/parser/inc/insertParser.h | 2 +- source/libs/parser/inc/parserInt.h | 2 +- source/libs/parser/src/dCDAstProcess.c | 13 +++++++------ source/libs/parser/src/insertParser.c | 6 +++--- source/libs/parser/src/parser.c | 4 ++-- source/libs/parser/test/insertParserTest.cpp | 4 ++-- source/libs/planner/src/logicPlan.c | 2 +- 8 files changed, 19 insertions(+), 18 deletions(-) diff --git a/include/libs/parser/parsenodes.h b/include/libs/parser/parsenodes.h index b326ac032c..18596a9e18 100644 --- a/include/libs/parser/parsenodes.h +++ b/include/libs/parser/parsenodes.h @@ -154,14 +154,14 @@ typedef struct SVgDataBlocks { char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ... } SVgDataBlocks; -typedef struct SInsertStmtInfo { +typedef struct SVnodeModifOpStmtInfo { int16_t nodeType; SArray* pDataBlocks; // data block for each vgroup, SArray. int8_t schemaAttache; // denote if submit block is built with table schema or not uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint32_t insertType; // insert data from [file|sql statement| bound statement] const char* sql; // current sql statement position -} SInsertStmtInfo; +} SVnodeModifOpStmtInfo; typedef struct SDclStmtInfo { int16_t nodeType; diff --git a/source/libs/parser/inc/insertParser.h b/source/libs/parser/inc/insertParser.h index b0191b155d..796bd9b429 100644 --- a/source/libs/parser/inc/insertParser.h +++ b/source/libs/parser/inc/insertParser.h @@ -22,7 +22,7 @@ extern "C" { #include "parser.h" -int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo); +int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo); #ifdef __cplusplus } diff --git a/source/libs/parser/inc/parserInt.h b/source/libs/parser/inc/parserInt.h index 346bd0cbe4..d1629a2a3e 100644 --- a/source/libs/parser/inc/parserInt.h +++ b/source/libs/parser/inc/parserInt.h @@ -70,7 +70,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ */ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); -SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); +SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); /** * Evaluate the numeric and timestamp arithmetic expression in the WHERE clause. diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 1e92e32fef..d343451516 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -548,11 +548,12 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p taosArrayPush(pBufArray, &pVgData); } while (true); - SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo)); - pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE; + SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo)); + pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE; pStmtInfo->pDataBlocks = pBufArray; - *pOutput = pStmtInfo; - *len = sizeof(SInsertStmtInfo); + + *pOutput = (char*) pStmtInfo; + *len = sizeof(SVnodeModifOpStmtInfo); return TSDB_CODE_SUCCESS; } @@ -823,14 +824,14 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c return NULL; } -SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { +SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; assert(pCreateTable->type == TSQL_CREATE_CTABLE); SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf* pMsgBuf = &m; - SInsertStmtInfo* pInsertStmt = NULL; + SVnodeModifOpStmtInfo* pInsertStmt = NULL; int32_t msgLen = 0; int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen); diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 66966f75db..c0ba4f40b4 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -61,7 +61,7 @@ typedef struct SInsertParseContext { SArray* pTableDataBlocks; // global SArray* pVgDataBlocks; // global int32_t totalNum; - SInsertStmtInfo* pOutput; + SVnodeModifOpStmtInfo* pOutput; } SInsertParseContext; static int32_t skipInsertInto(SInsertParseContext* pCxt) { @@ -611,7 +611,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { // [(field1_name, ...)] // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // [...]; -int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) { +int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) { SInsertParseContext context = { .pComCxt = pContext, .pSql = (char*) pContext->pSql, @@ -620,7 +620,7 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) { .pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false), .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false), .totalNum = 0, - .pOutput = calloc(1, sizeof(SInsertStmtInfo)) + .pOutput = calloc(1, sizeof(SVnodeModifOpStmtInfo)) }; if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) { diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 1b4d05808c..9455d23a1c 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -53,7 +53,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { } if (toVnode) { - SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); + SVnodeModifOpStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); if (pInsertInfo == NULL) { return terrno; } @@ -87,7 +87,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) { - return parseInsertSql(pCxt, (SInsertStmtInfo**)pQuery); + return parseInsertSql(pCxt, (SVnodeModifOpStmtInfo**)pQuery); } else { return parseQuerySql(pCxt, pQuery); } diff --git a/source/libs/parser/test/insertParserTest.cpp b/source/libs/parser/test/insertParserTest.cpp index 9e681c7ccb..5c175cd023 100644 --- a/source/libs/parser/test/insertParserTest.cpp +++ b/source/libs/parser/test/insertParserTest.cpp @@ -60,7 +60,7 @@ protected: return code_; } - SInsertStmtInfo* reslut() { + SVnodeModifOpStmtInfo* reslut() { return res_; } @@ -128,7 +128,7 @@ private: char sqlBuf_[max_sql_len]; SParseContext cxt_; int32_t code_; - SInsertStmtInfo* res_; + SVnodeModifOpStmtInfo* res_; }; // INSERT INTO tb_name VALUES (field1_value, ...) diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index 2de0ae2da3..d04fd716c2 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -38,7 +38,7 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { } static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) { - SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode; + SVnodeModifOpStmtInfo* pInsert = (SVnodeModifOpStmtInfo*)pNode; *pQueryPlan = calloc(1, sizeof(SQueryPlanNode)); SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES); From c94f30e3838106d50423bd8c64eb901ca6675313 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 3 Jan 2022 23:20:29 -0800 Subject: [PATCH 12/12] add some logs --- source/dnode/mnode/impl/src/mndDnode.c | 2 +- source/dnode/mnode/impl/src/mndShow.c | 2 +- source/dnode/mnode/impl/src/mndSync.c | 16 ++++++++----- source/dnode/mnode/impl/src/mndTrans.c | 14 +++++++---- source/dnode/mnode/impl/src/mnode.c | 28 +++++++++++----------- source/dnode/mnode/sdb/src/sdb.c | 4 ++-- source/dnode/mnode/sdb/src/sdbFile.c | 1 + source/dnode/mnode/sdb/src/sdbHash.c | 32 +++++++++++++++++++++----- 8 files changed, 64 insertions(+), 35 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d110969025..1d78359015 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -354,7 +354,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { } if (pStatus->dnodeId == 0) { - mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); + mDebug("dnode:%d, %s first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); } else { if (pStatus->clusterId != pMnode->clusterId) { if (pDnode != NULL) { diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index c156c87123..125c250614 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -152,7 +152,7 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { } int32_t code = (*metaFp)(pMnodeMsg, pShow, &pRsp->tableMeta); - mDebug("show:0x%" PRIx64 ", get meta finished, numOfRows:%d cols:%d type:%s result:%s", pShow->id, pShow->numOfRows, + mDebug("show:0x%" PRIx64 ", get meta finished, numOfRows:%d cols:%d type:%s, result:%s", pShow->id, pShow->numOfRows, pShow->numOfColumns, mndShowStr(type), tstrerror(code)); if (code == TSDB_CODE_SUCCESS) { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 49eaa45156..19fb89454e 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -52,11 +52,12 @@ static int32_t mndRestoreWal(SMnode *pMnode) { SWalReadHandle *pHandle = walOpenReadHandle(pWal); if (pHandle == NULL) return -1; - int64_t start = walGetFirstVer(pWal); - int64_t end = walGetLastVer(pWal); - start = MAX(lastSdbVer + 1, start); + int64_t first = walGetFirstVer(pWal); + int64_t last = walGetLastVer(pWal); + mDebug("restore sdb wal start, sdb ver:%" PRId64 ", wal first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last); - for (int64_t ver = start; ver >= 0 && ver <= end; ++ver) { + first = MAX(lastSdbVer + 1, first); + for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) { if (walReadWithHandle(pHandle, ver) < 0) { mError("failed to read by wal handle since %s, ver:%" PRId64, terrstr(), ver); goto WAL_RESTORE_OVER; @@ -76,15 +77,18 @@ static int32_t mndRestoreWal(SMnode *pMnode) { } sdbUpdateVer(pSdb, 1); + mDebug("wal:%" PRId64 ", is restored", ver); } int64_t sdbVer = sdbUpdateVer(pSdb, 0); + mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer); + if (walBeginSnapshot(pWal, sdbVer) < 0) { goto WAL_RESTORE_OVER; } if (sdbVer != lastSdbVer) { - mInfo("sdb restore wal from %" PRId64 " to %" PRId64, lastSdbVer, sdbVer); + mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer); if (sdbWriteFile(pSdb) != 0) { goto WAL_RESTORE_OVER; } @@ -147,7 +151,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { return -1; } - mTrace("raw:%p has been write to wal, ver:%" PRId64, pRaw, ver); + mTrace("raw:%p, write to wal, ver:%" PRId64, pRaw, ver); walCommit(pWal, ver); walFsync(pWal, true); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 5062048d6d..bf472a504c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -169,7 +169,7 @@ TRANS_ENCODE_OVER: return NULL; } - mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos); + mTrace("trans:%d, encode to raw:%p, row:%p len:%d", pTrans->id, pRaw, pTrans, dataPos); return pRaw; } @@ -226,6 +226,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER) pData = malloc(dataLen); if (pData == NULL) goto TRANS_DECODE_OVER; + mTrace("raw:%p, is created", pData); SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER); if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto TRANS_DECODE_OVER; pData = NULL; @@ -235,6 +236,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER) pData = malloc(dataLen); if (pData == NULL) goto TRANS_DECODE_OVER; + mTrace("raw:%p, is created", pData); SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER); if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto TRANS_DECODE_OVER; pData = NULL; @@ -243,6 +245,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { for (int32_t i = 0; i < commitLogNum; ++i) { SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER) pData = malloc(dataLen); + if (pData == NULL) goto TRANS_DECODE_OVER; + mTrace("raw:%p, is created", pData); SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER); if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto TRANS_DECODE_OVER; pData = NULL; @@ -284,13 +288,13 @@ TRANS_DECODE_OVER: return NULL; } - mTrace("trans:%d, decode from raw:%p, data:%p", pTrans->id, pRaw, pTrans); + mTrace("trans:%d, decode from raw:%p, row:%p", pTrans->id, pRaw, pTrans); return pRow; } static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { pTrans->stage = TRN_STAGE_PREPARE; - mTrace("trans:%d, perform insert action, data:%p", pTrans->id, pTrans); + mTrace("trans:%d, perform insert action, row:%p", pTrans->id, pTrans); return 0; } @@ -303,13 +307,13 @@ static void mndTransDropData(STrans *pTrans) { } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { - mTrace("trans:%d, perform delete action, data:%p", pTrans->id, pTrans); + mTrace("trans:%d, perform delete action, row:%p", pTrans->id, pTrans); mndTransDropData(pTrans); return 0; } static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) { - mTrace("trans:%d, perform update action, data:%p", pOldTrans->id, pOldTrans); + mTrace("trans:%d, perform update action, old_row:%p new_row:%p", pOldTrans->id, pOldTrans, pNewTrans); pOldTrans->stage = pNewTrans->stage; return 0; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 3a55961d4a..abc86a7d35 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -185,7 +185,7 @@ static void mndCleanupSteps(SMnode *pMnode, int32_t pos) { for (int32_t s = pos; s >= 0; s--) { SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s); - mDebug("step:%s will cleanup", pStep->name); + mDebug("%s will cleanup", pStep->name); if (pStep->cleanupFp != NULL) { (*pStep->cleanupFp)(pMnode); } @@ -204,12 +204,12 @@ static int32_t mndExecSteps(SMnode *pMnode) { if ((*pStep->initFp)(pMnode) != 0) { int32_t code = terrno; - mError("step:%s exec failed since %s, start to cleanup", pStep->name, terrstr()); + mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr()); mndCleanupSteps(pMnode, pos); terrno = code; return -1; } else { - mDebug("step:%s is initialized", pStep->name); + mDebug("%s is initialized", pStep->name); } } @@ -357,7 +357,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg)); if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr()); + mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle); return NULL; } @@ -365,7 +365,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { taosFreeQitem(pMsg); terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr()); + mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle); return NULL; } memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); @@ -374,12 +374,12 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { pMsg->rpcMsg = *pRpcMsg; pMsg->createdTime = taosGetTimestampSec(); - mTrace("msg:%p, app:%p is created, RPC:%p", pMsg, pRpcMsg->ahandle, pRpcMsg->handle); + mTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpcMsg->ahandle, pRpcMsg->handle, pMsg->user); return pMsg; } void mndCleanupMsg(SMnodeMsg *pMsg) { - mTrace("msg:%p, app:%p is destroyed, RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle); + mTrace("msg:%p, is destroyed, app:%p RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle); rpcFreeCont(pMsg->rpcMsg.pCont); pMsg->rpcMsg.pCont = NULL; taosFreeQitem(pMsg); @@ -397,37 +397,37 @@ void mndProcessMsg(SMnodeMsg *pMsg) { void *ahandle = pMsg->rpcMsg.ahandle; bool isReq = (msgType & 1U); - mTrace("msg:%p, app:%p type:%s will be processed", pMsg, ahandle, TMSG_INFO(msgType)); + mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle); if (isReq && !mndIsMaster(pMnode)) { code = TSDB_CODE_APP_NOT_READY; - mDebug("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); + mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); goto PROCESS_RPC_END; } if (isReq && pMsg->rpcMsg.pCont == NULL) { code = TSDB_CODE_MND_INVALID_MSG_LEN; - mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); + mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); goto PROCESS_RPC_END; } MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)]; if (fp == NULL) { code = TSDB_CODE_MSG_NOT_PROCESSED; - mError("msg:%p, app:%p failed to process since no handle", pMsg, ahandle); + mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle); goto PROCESS_RPC_END; } code = (*fp)(pMsg); if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mTrace("msg:%p, app:%p in progressing", pMsg, ahandle); + mTrace("msg:%p, in progress, app:%p", pMsg, ahandle); return; } else if (code != 0) { code = terrno; - mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); + mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); goto PROCESS_RPC_END; } else { - mTrace("msg:%p, app:%p is processed", pMsg, ahandle); + mTrace("msg:%p, is processed, app:%p", pMsg, ahandle); } PROCESS_RPC_END: diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 0671434218..ef5bb6f16f 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -65,7 +65,7 @@ void sdbCleanup(SSdb *pSdb) { mDebug("start to cleanup sdb"); if (pSdb->curVer != pSdb->lastCommitVer) { - mDebug("write sdb file for curVer:% " PRId64 " and lastCommitVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); + mDebug("write sdb file for current ver:%" PRId64 " != last commit ver:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); sdbWriteFile(pSdb); } @@ -138,7 +138,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { pSdb->maxId[sdbType] = 0; pSdb->hashObjs[sdbType] = hash; taosInitRWLatch(&pSdb->locks[sdbType]); - mDebug("sdb table:%d is initialized", sdbType); + mDebug("sdb table:%s is initialized", sdbTableName(sdbType)); return 0; } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index cc36b8ec13..970fdc2061 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -146,6 +146,7 @@ int32_t sdbReadFile(SSdb *pSdb) { char file[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); + mDebug("start to read file:%s", file); FileFd fd = taosOpenFileRead(file); if (fd <= 0) { diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 4b11ec3e76..733075757f 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -57,18 +57,35 @@ const char *sdbTableName(ESdbType type) { } } +static const char *sdbStatusStr(ESdbStatus status) { + switch (status) { + case SDB_STATUS_CREATING: + return "creating"; + case SDB_STATUS_UPDATING: + return "updating"; + case SDB_STATUS_DROPPING: + return "dropping"; + case SDB_STATUS_READY: + return "ready"; + case SDB_STATUS_DROPPED: + return "dropped"; + default: + return "undefine"; + } +} + void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) { EKeyType keyType = pSdb->keyTypes[pRow->type]; if (keyType == SDB_KEY_BINARY) { - mTrace("%s:%s, refCount:%d oper:%s row:%p", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, oper, - pRow->pObj); + mTrace("%s:%s, refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, + oper, pRow->pObj, sdbStatusStr(pRow->status)); } else if (keyType == SDB_KEY_INT32) { - mTrace("%s:%d, refCount:%d oper:%s row:%p", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, pRow->refCount, oper, - pRow->pObj); + mTrace("%s:%d, refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, + pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status)); } else if (keyType == SDB_KEY_INT64) { - mTrace("%s:%" PRId64 ", refCount:%d oper:%s row:%p", sdbTableName(pRow->type), *(int64_t *)pRow->pObj, - pRow->refCount, oper, pRow->pObj); + mTrace("%s:%" PRId64 ", refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj, + pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status)); } else { } } @@ -165,6 +182,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * SSdbRow *pOldRow = *ppOldRow; pOldRow->status = pRaw->status; + sdbPrintOper(pSdb, pOldRow, "updateRow"); taosRUnLockLatch(pLock); int32_t code = 0; @@ -193,6 +211,8 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * SSdbRow *pOldRow = *ppOldRow; pOldRow->status = pRaw->status; + sdbPrintOper(pSdb, pOldRow, "deleteRow"); + taosHashRemove(hash, pOldRow->pObj, keySize); taosWUnLockLatch(pLock);