From b286a296bf735ca0ecb99501c75e9540b4bb2f49 Mon Sep 17 00:00:00 2001 From: slguan Date: Tue, 14 Apr 2020 16:38:15 +0800 Subject: [PATCH] [TD-52] first version mpeer --- src/dnode/CMakeLists.txt | 4 ++ src/dnode/inc/dnodeMgmt.h | 1 + src/dnode/src/dnodeMClient.c | 2 +- src/dnode/src/dnodeMgmt.c | 4 ++ src/inc/mnode.h | 1 - src/inc/mpeer.h | 5 ++ src/mnode/inc/mgmtSdb.h | 12 ++++ src/mnode/src/mgmtMain.c | 10 +-- src/mnode/src/mgmtMnode.c | 5 +- src/mnode/src/mgmtSdb.c | 119 ++++++++++++++++------------------- 10 files changed, 87 insertions(+), 76 deletions(-) diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index a81f8c0c9d..ee05403a61 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -31,6 +31,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(taosd balance sync) ENDIF () + IF (TD_MPEER) + TARGET_LINK_LIBRARIES(taosd mpeer sync) + ENDIF () + SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_TARGET "prepare_env_target") ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD} diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index f944bd5add..0be6e40e75 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -24,6 +24,7 @@ int32_t dnodeInitMgmt(); void dnodeCleanupMgmt(); void dnodeMgmt(SRpcMsg *rpcMsg); void dnodeUpdateDnodeId(int32_t dnodeId); +int32_t dnodeGetDnodeId(); void* dnodeGetVnode(int32_t vgId); int32_t dnodeGetVnodeStatus(void *pVnode); diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 5dd015313a..53eea93137 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -294,4 +294,4 @@ uint32_t dnodeGetMnodeMasteIp() { void* dnodeGetMpeerInfos() { return &tsMnodeInfos; -} \ No newline at end of file +} diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 0ec769c0af..f4cbd3e1be 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -299,3 +299,7 @@ void dnodeUpdateDnodeId(int32_t dnodeId) { dnodeSaveDnodeId(); } } + +int32_t dnodeGetDnodeId() { + return tsDnodeId; +} \ No newline at end of file diff --git a/src/inc/mnode.h b/src/inc/mnode.h index f2c072453f..00b7519258 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -45,7 +45,6 @@ struct _mnode_obj; typedef struct _mnode_obj { int32_t mnodeId; - int32_t dnodeId; int64_t createdTime; int8_t reserved[14]; int8_t updateEnd[1]; diff --git a/src/inc/mpeer.h b/src/inc/mpeer.h index 157ea40119..e5051b39eb 100644 --- a/src/inc/mpeer.h +++ b/src/inc/mpeer.h @@ -45,6 +45,11 @@ void mpeerGetMpeerInfos(void *mpeers); char * mpeerGetMnodeStatusStr(int32_t status); char * mpeerGetMnodeRoleStr(int32_t role); +int32_t mpeerAddMnode(int32_t dnodeId); +int32_t mpeerRemoveMnode(int32_t dnodeId); + +int32_t sdbForwardDbReqToPeer(void *pHead); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index 83afa2a081..27f9a51650 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -34,6 +34,7 @@ typedef enum { typedef enum { SDB_KEY_STRING, + SDB_KEY_INT, SDB_KEY_AUTO } ESdbKeyType; @@ -66,8 +67,19 @@ typedef struct { int32_t (*updateAllFp)(); } SSdbTableDesc; +typedef struct { + int32_t code; + int64_t version; + void * sync; + void * wal; + sem_t sem; + pthread_mutex_t mutex; +} SSdbObject; + int32_t sdbInit(); void sdbCleanUp(); +SSdbObject *sdbGetObj(); +int sdbProcessWrite(void *param, void *data, int type); void * sdbOpenTable(SSdbTableDesc *desc); void sdbCloseTable(void *handle); diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c index a074060f52..b6fb1ba425 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mgmtMain.c @@ -109,6 +109,11 @@ int32_t mgmtStartSystem() { return -1; } + if (mpeerInit() < 0) { + mError("failed to init mpeers"); + return -1; + } + if (sdbInit() < 0) { mError("failed to init sdb"); return -1; @@ -122,11 +127,6 @@ int32_t mgmtStartSystem() { return -1; } - if (mpeerInit() < 0) { - mError("failed to init mpeers"); - return -1; - } - if (balanceInit() < 0) { mError("failed to init dnode balance") } diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 82da454793..faa66d1fd9 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -79,7 +79,7 @@ void mpeerGetMpeerInfos(void *param) { strcpy(mpeers->nodeInfos[0].nodeName, tsMnodeObj.mnodeName); } -void mpeerCleanupDnodes() {} +void mpeerCleanupMnodes() {} int32_t mpeerGetMnodesNum() { return 1; } void mpeerReleaseMnode(struct _mnode_obj *pMnode) {} bool mpeerInServerStatus() { return tsMnodeObj.status == TAOS_MN_STATUS_READY; } @@ -91,12 +91,11 @@ bool mpeerCheckRedirect() { return false; } int32_t mpeerInit() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes); - return mpeerInitMnodes(); } void mpeerCleanup() { - mpeerCleanupDnodes(); + mpeerCleanupMnodes(); } char *mpeerGetMnodeStatusStr(int32_t status) { diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index bc0d1b81f2..c47d9cd9d8 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -24,19 +24,11 @@ #include "tutil.h" #include "twal.h" #include "tsync.h" +#include "mpeer.h" #include "hashint.h" #include "hashstr.h" #include "mgmtSdb.h" -typedef struct { - int32_t code; - int64_t version; - void * sync; - void * wal; - sem_t sem; - pthread_mutex_t mutex; -} SSdbSync; - typedef struct _SSdbTable { char tableName[TSDB_DB_NAME_LEN + 1]; ESdbTable tableId; @@ -70,17 +62,16 @@ typedef enum { static SSdbTable *tsSdbTableList[SDB_TABLE_MAX] = {0}; static int32_t tsSdbNumOfTables = 0; -static SSdbSync * tsSdbSync; +static SSdbObject * tsSdbObj; -static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash}; -static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash}; -static void (*sdbDeleteIndexFp[])(void *handle, void *key) = {sdbDeleteStrHash, sdbDeleteIntHash}; -static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, sdbGetIntHashData}; -static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash}; -static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData}; -static int sdbProcessWrite(void *param, void *data, int type); +static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash, sdbOpenIntHash}; +static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash, sdbAddIntHash}; +static void (*sdbDeleteIndexFp[])(void *handle, void *key) = {sdbDeleteStrHash, sdbDeleteIntHash, sdbDeleteIntHash}; +static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, sdbGetIntHashData, sdbGetIntHashData}; +static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash}; +static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData}; -uint64_t sdbGetVersion() { return tsSdbSync->version; } +uint64_t sdbGetVersion() { return tsSdbObj->version; } int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; } int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } @@ -101,6 +92,7 @@ static char *sdbGetkeyStr(SSdbTable *pTable, void *row) { switch (pTable->keyType) { case SDB_KEY_STRING: return (char *)row; + case SDB_KEY_INT: case SDB_KEY_AUTO: sprintf(str, "%d", *(int32_t *)row); return str; @@ -113,40 +105,30 @@ static void *sdbGetTableFromId(int32_t tableId) { return tsSdbTableList[tableId]; } -// static void mpeerConfirmForward(void *ahandle, void *param, int32_t code) { -// sem_post(&tsSdbSync->sem); -// mPrint("mpeerConfirmForward"); -// } - -static int32_t sdbForwardDbReqToPeer(SWalHead *pHead) { - // int32_t code = syncForwardToPeer(NULL, pHead, NULL); - // if (code < 0) { - // return code; - // } - - // sem_wait(&tsSdbSync->sem); - // return tsSdbSync->code; +#ifndef _MPEER +int32_t sdbForwardDbReqToPeer(void *pHead) { return TSDB_CODE_SUCCESS; } +#endif int32_t sdbInit() { - tsSdbSync = calloc(1, sizeof(SSdbSync)); - sem_init(&tsSdbSync->sem, 0, 0); - pthread_mutex_init(&tsSdbSync->mutex, NULL); + tsSdbObj = calloc(1, sizeof(SSdbObject)); + sem_init(&tsSdbObj->sem, 0, 0); + pthread_mutex_init(&tsSdbObj->mutex, NULL); SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1}; - tsSdbSync->wal = walOpen(tsMnodeDir, &walCfg); - if (tsSdbSync->wal == NULL) { + tsSdbObj->wal = walOpen(tsMnodeDir, &walCfg); + if (tsSdbObj->wal == NULL) { sdbError("failed to open sdb in %s", tsMnodeDir); return -1; } sdbTrace("open sdb file for read"); - walRestore(tsSdbSync->wal, tsSdbSync, sdbProcessWrite); + walRestore(tsSdbObj->wal, tsSdbObj, sdbProcessWrite); int32_t totalRows = 0; int32_t numOfTables = 0; - for (int32_t tableId = SDB_TABLE_DNODE; tableId < SDB_TABLE_MAX; ++tableId) { + for (int32_t tableId = SDB_TABLE_MNODE; tableId < SDB_TABLE_MAX; ++tableId) { SSdbTable *pTable = sdbGetTableFromId(tableId); if (pTable == NULL) continue; if (pTable->updateAllFp) { @@ -158,20 +140,24 @@ int32_t sdbInit() { sdbTrace("table:%s, is initialized, numOfRows:%d", pTable->tableName, pTable->numOfRows); } - sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbSync->version, totalRows, numOfTables); + sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbObj->version, totalRows, numOfTables); return TSDB_CODE_SUCCESS; } void sdbCleanUp() { - if (tsSdbSync) { - sem_destroy(&tsSdbSync->sem); - pthread_mutex_destroy(&tsSdbSync->mutex); - walClose(tsSdbSync->wal); - free(tsSdbSync); - tsSdbSync = NULL; + if (tsSdbObj) { + sem_destroy(&tsSdbObj->sem); + pthread_mutex_destroy(&tsSdbObj->mutex); + walClose(tsSdbObj->wal); + free(tsSdbObj); + tsSdbObj = NULL; } } +SSdbObject *sdbGetObj() { + return tsSdbObj; +} + void sdbIncRef(void *handle, void *pRow) { if (pRow) { SSdbTable *pTable = handle; @@ -278,20 +264,20 @@ static int32_t sdbUpdateLocal(SSdbTable *pTable, SSdbOperDesc *pOper) { static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_t action) { int32_t code = 0; - pthread_mutex_lock(&tsSdbSync->mutex); - tsSdbSync->version++; - pHead->version = tsSdbSync->version; + pthread_mutex_lock(&tsSdbObj->mutex); + tsSdbObj->version++; + pHead->version = tsSdbObj->version; code = sdbForwardDbReqToPeer(pHead); if (code != TSDB_CODE_SUCCESS) { - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code)); return code; } - code = walWrite(tsSdbSync->wal, pHead); - pthread_mutex_unlock(&tsSdbSync->mutex); + code = walWrite(tsSdbObj->wal, pHead); + pthread_mutex_unlock(&tsSdbObj->mutex); if (code < 0) { sdbError("table:%s, failed to %s record:%s to file, version:%" PRId64 ", reason:%s", pTable->tableName, @@ -301,26 +287,26 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_ sdbGetkeyStr(pTable, pHead->cont), pHead->version); } - walFsync(tsSdbSync->wal); + walFsync(tsSdbObj->wal); free(pHead); return code; } static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_t action) { - pthread_mutex_lock(&tsSdbSync->mutex); - if (pHead->version <= tsSdbSync->version) { - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_lock(&tsSdbObj->mutex); + if (pHead->version <= tsSdbObj->version) { + pthread_mutex_unlock(&tsSdbObj->mutex); return TSDB_CODE_SUCCESS; - } else if (pHead->version != tsSdbSync->version + 1) { - pthread_mutex_unlock(&tsSdbSync->mutex); + } else if (pHead->version != tsSdbObj->version + 1) { + pthread_mutex_unlock(&tsSdbObj->mutex); sdbError("table:%s, failed to restore %s record:%s from file, version:%" PRId64 " too large, sdb version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, - tsSdbSync->version); + tsSdbObj->version); return TSDB_CODE_OTHERS; } - tsSdbSync->version = pHead->version; + tsSdbObj->version = pHead->version; sdbTrace("table:%s, success to restore %s record:%s from file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); @@ -335,7 +321,7 @@ static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_ if (code < 0) { sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); return code; } @@ -369,17 +355,17 @@ static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_ if (code < 0) { sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); return code; } code = sdbInsertLocal(pTable, &oper2); } - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); return code; } -static int sdbProcessWrite(void *param, void *data, int type) { +int sdbProcessWrite(void *param, void *data, int type) { SWalHead *pHead = data; int32_t tableId = pHead->msgType / 10; int32_t action = pHead->msgType % 10; @@ -426,7 +412,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { (*pTable->encodeFp)(pOper); pHead->len = pOper->rowSize; - int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType); + int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); if (code < 0) return code; } @@ -453,6 +439,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { case SDB_KEY_STRING: rowSize = strlen((char *)pOper->pObj) + 1; break; + case SDB_KEY_INT: case SDB_KEY_AUTO: rowSize = sizeof(uint64_t); break; @@ -467,7 +454,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; memcpy(pHead->cont, pOper->pObj, rowSize); - int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType); + int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); if (code < 0) return code; } @@ -497,7 +484,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { (*pTable->encodeFp)(pOper); pHead->len = pOper->rowSize; - int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType); + int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); if (code < 0) return code; }