From b286a296bf735ca0ecb99501c75e9540b4bb2f49 Mon Sep 17 00:00:00 2001 From: slguan Date: Tue, 14 Apr 2020 16:38:15 +0800 Subject: [PATCH 01/12] [TD-52] first version mpeer --- src/dnode/CMakeLists.txt | 4 ++ src/dnode/inc/dnodeMgmt.h | 1 + src/dnode/src/dnodeMClient.c | 2 +- src/dnode/src/dnodeMgmt.c | 4 ++ src/inc/mnode.h | 1 - src/inc/mpeer.h | 5 ++ src/mnode/inc/mgmtSdb.h | 12 ++++ src/mnode/src/mgmtMain.c | 10 +-- src/mnode/src/mgmtMnode.c | 5 +- src/mnode/src/mgmtSdb.c | 119 ++++++++++++++++------------------- 10 files changed, 87 insertions(+), 76 deletions(-) diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index a81f8c0c9d..ee05403a61 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -31,6 +31,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(taosd balance sync) ENDIF () + IF (TD_MPEER) + TARGET_LINK_LIBRARIES(taosd mpeer sync) + ENDIF () + SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_TARGET "prepare_env_target") ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD} diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index f944bd5add..0be6e40e75 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -24,6 +24,7 @@ int32_t dnodeInitMgmt(); void dnodeCleanupMgmt(); void dnodeMgmt(SRpcMsg *rpcMsg); void dnodeUpdateDnodeId(int32_t dnodeId); +int32_t dnodeGetDnodeId(); void* dnodeGetVnode(int32_t vgId); int32_t dnodeGetVnodeStatus(void *pVnode); diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 5dd015313a..53eea93137 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -294,4 +294,4 @@ uint32_t dnodeGetMnodeMasteIp() { void* dnodeGetMpeerInfos() { return &tsMnodeInfos; -} \ No newline at end of file +} diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 0ec769c0af..f4cbd3e1be 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -299,3 +299,7 @@ void dnodeUpdateDnodeId(int32_t dnodeId) { dnodeSaveDnodeId(); } } + +int32_t dnodeGetDnodeId() { + return tsDnodeId; +} \ No newline at end of file diff --git a/src/inc/mnode.h b/src/inc/mnode.h index f2c072453f..00b7519258 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -45,7 +45,6 @@ struct _mnode_obj; typedef struct _mnode_obj { int32_t mnodeId; - int32_t dnodeId; int64_t createdTime; int8_t reserved[14]; int8_t updateEnd[1]; diff --git a/src/inc/mpeer.h b/src/inc/mpeer.h index 157ea40119..e5051b39eb 100644 --- a/src/inc/mpeer.h +++ b/src/inc/mpeer.h @@ -45,6 +45,11 @@ void mpeerGetMpeerInfos(void *mpeers); char * mpeerGetMnodeStatusStr(int32_t status); char * mpeerGetMnodeRoleStr(int32_t role); +int32_t mpeerAddMnode(int32_t dnodeId); +int32_t mpeerRemoveMnode(int32_t dnodeId); + +int32_t sdbForwardDbReqToPeer(void *pHead); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index 83afa2a081..27f9a51650 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -34,6 +34,7 @@ typedef enum { typedef enum { SDB_KEY_STRING, + SDB_KEY_INT, SDB_KEY_AUTO } ESdbKeyType; @@ -66,8 +67,19 @@ typedef struct { int32_t (*updateAllFp)(); } SSdbTableDesc; +typedef struct { + int32_t code; + int64_t version; + void * sync; + void * wal; + sem_t sem; + pthread_mutex_t mutex; +} SSdbObject; + int32_t sdbInit(); void sdbCleanUp(); +SSdbObject *sdbGetObj(); +int sdbProcessWrite(void *param, void *data, int type); void * sdbOpenTable(SSdbTableDesc *desc); void sdbCloseTable(void *handle); diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c index a074060f52..b6fb1ba425 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mgmtMain.c @@ -109,6 +109,11 @@ int32_t mgmtStartSystem() { return -1; } + if (mpeerInit() < 0) { + mError("failed to init mpeers"); + return -1; + } + if (sdbInit() < 0) { mError("failed to init sdb"); return -1; @@ -122,11 +127,6 @@ int32_t mgmtStartSystem() { return -1; } - if (mpeerInit() < 0) { - mError("failed to init mpeers"); - return -1; - } - if (balanceInit() < 0) { mError("failed to init dnode balance") } diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 82da454793..faa66d1fd9 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -79,7 +79,7 @@ void mpeerGetMpeerInfos(void *param) { strcpy(mpeers->nodeInfos[0].nodeName, tsMnodeObj.mnodeName); } -void mpeerCleanupDnodes() {} +void mpeerCleanupMnodes() {} int32_t mpeerGetMnodesNum() { return 1; } void mpeerReleaseMnode(struct _mnode_obj *pMnode) {} bool mpeerInServerStatus() { return tsMnodeObj.status == TAOS_MN_STATUS_READY; } @@ -91,12 +91,11 @@ bool mpeerCheckRedirect() { return false; } int32_t mpeerInit() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes); - return mpeerInitMnodes(); } void mpeerCleanup() { - mpeerCleanupDnodes(); + mpeerCleanupMnodes(); } char *mpeerGetMnodeStatusStr(int32_t status) { diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index bc0d1b81f2..c47d9cd9d8 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -24,19 +24,11 @@ #include "tutil.h" #include "twal.h" #include "tsync.h" +#include "mpeer.h" #include "hashint.h" #include "hashstr.h" #include "mgmtSdb.h" -typedef struct { - int32_t code; - int64_t version; - void * sync; - void * wal; - sem_t sem; - pthread_mutex_t mutex; -} SSdbSync; - typedef struct _SSdbTable { char tableName[TSDB_DB_NAME_LEN + 1]; ESdbTable tableId; @@ -70,17 +62,16 @@ typedef enum { static SSdbTable *tsSdbTableList[SDB_TABLE_MAX] = {0}; static int32_t tsSdbNumOfTables = 0; -static SSdbSync * tsSdbSync; +static SSdbObject * tsSdbObj; -static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash}; -static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash}; -static void (*sdbDeleteIndexFp[])(void *handle, void *key) = {sdbDeleteStrHash, sdbDeleteIntHash}; -static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, sdbGetIntHashData}; -static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash}; -static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData}; -static int sdbProcessWrite(void *param, void *data, int type); +static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash, sdbOpenIntHash}; +static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash, sdbAddIntHash}; +static void (*sdbDeleteIndexFp[])(void *handle, void *key) = {sdbDeleteStrHash, sdbDeleteIntHash, sdbDeleteIntHash}; +static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, sdbGetIntHashData, sdbGetIntHashData}; +static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash}; +static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData}; -uint64_t sdbGetVersion() { return tsSdbSync->version; } +uint64_t sdbGetVersion() { return tsSdbObj->version; } int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; } int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } @@ -101,6 +92,7 @@ static char *sdbGetkeyStr(SSdbTable *pTable, void *row) { switch (pTable->keyType) { case SDB_KEY_STRING: return (char *)row; + case SDB_KEY_INT: case SDB_KEY_AUTO: sprintf(str, "%d", *(int32_t *)row); return str; @@ -113,40 +105,30 @@ static void *sdbGetTableFromId(int32_t tableId) { return tsSdbTableList[tableId]; } -// static void mpeerConfirmForward(void *ahandle, void *param, int32_t code) { -// sem_post(&tsSdbSync->sem); -// mPrint("mpeerConfirmForward"); -// } - -static int32_t sdbForwardDbReqToPeer(SWalHead *pHead) { - // int32_t code = syncForwardToPeer(NULL, pHead, NULL); - // if (code < 0) { - // return code; - // } - - // sem_wait(&tsSdbSync->sem); - // return tsSdbSync->code; +#ifndef _MPEER +int32_t sdbForwardDbReqToPeer(void *pHead) { return TSDB_CODE_SUCCESS; } +#endif int32_t sdbInit() { - tsSdbSync = calloc(1, sizeof(SSdbSync)); - sem_init(&tsSdbSync->sem, 0, 0); - pthread_mutex_init(&tsSdbSync->mutex, NULL); + tsSdbObj = calloc(1, sizeof(SSdbObject)); + sem_init(&tsSdbObj->sem, 0, 0); + pthread_mutex_init(&tsSdbObj->mutex, NULL); SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1}; - tsSdbSync->wal = walOpen(tsMnodeDir, &walCfg); - if (tsSdbSync->wal == NULL) { + tsSdbObj->wal = walOpen(tsMnodeDir, &walCfg); + if (tsSdbObj->wal == NULL) { sdbError("failed to open sdb in %s", tsMnodeDir); return -1; } sdbTrace("open sdb file for read"); - walRestore(tsSdbSync->wal, tsSdbSync, sdbProcessWrite); + walRestore(tsSdbObj->wal, tsSdbObj, sdbProcessWrite); int32_t totalRows = 0; int32_t numOfTables = 0; - for (int32_t tableId = SDB_TABLE_DNODE; tableId < SDB_TABLE_MAX; ++tableId) { + for (int32_t tableId = SDB_TABLE_MNODE; tableId < SDB_TABLE_MAX; ++tableId) { SSdbTable *pTable = sdbGetTableFromId(tableId); if (pTable == NULL) continue; if (pTable->updateAllFp) { @@ -158,20 +140,24 @@ int32_t sdbInit() { sdbTrace("table:%s, is initialized, numOfRows:%d", pTable->tableName, pTable->numOfRows); } - sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbSync->version, totalRows, numOfTables); + sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbObj->version, totalRows, numOfTables); return TSDB_CODE_SUCCESS; } void sdbCleanUp() { - if (tsSdbSync) { - sem_destroy(&tsSdbSync->sem); - pthread_mutex_destroy(&tsSdbSync->mutex); - walClose(tsSdbSync->wal); - free(tsSdbSync); - tsSdbSync = NULL; + if (tsSdbObj) { + sem_destroy(&tsSdbObj->sem); + pthread_mutex_destroy(&tsSdbObj->mutex); + walClose(tsSdbObj->wal); + free(tsSdbObj); + tsSdbObj = NULL; } } +SSdbObject *sdbGetObj() { + return tsSdbObj; +} + void sdbIncRef(void *handle, void *pRow) { if (pRow) { SSdbTable *pTable = handle; @@ -278,20 +264,20 @@ static int32_t sdbUpdateLocal(SSdbTable *pTable, SSdbOperDesc *pOper) { static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_t action) { int32_t code = 0; - pthread_mutex_lock(&tsSdbSync->mutex); - tsSdbSync->version++; - pHead->version = tsSdbSync->version; + pthread_mutex_lock(&tsSdbObj->mutex); + tsSdbObj->version++; + pHead->version = tsSdbObj->version; code = sdbForwardDbReqToPeer(pHead); if (code != TSDB_CODE_SUCCESS) { - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code)); return code; } - code = walWrite(tsSdbSync->wal, pHead); - pthread_mutex_unlock(&tsSdbSync->mutex); + code = walWrite(tsSdbObj->wal, pHead); + pthread_mutex_unlock(&tsSdbObj->mutex); if (code < 0) { sdbError("table:%s, failed to %s record:%s to file, version:%" PRId64 ", reason:%s", pTable->tableName, @@ -301,26 +287,26 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_ sdbGetkeyStr(pTable, pHead->cont), pHead->version); } - walFsync(tsSdbSync->wal); + walFsync(tsSdbObj->wal); free(pHead); return code; } static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_t action) { - pthread_mutex_lock(&tsSdbSync->mutex); - if (pHead->version <= tsSdbSync->version) { - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_lock(&tsSdbObj->mutex); + if (pHead->version <= tsSdbObj->version) { + pthread_mutex_unlock(&tsSdbObj->mutex); return TSDB_CODE_SUCCESS; - } else if (pHead->version != tsSdbSync->version + 1) { - pthread_mutex_unlock(&tsSdbSync->mutex); + } else if (pHead->version != tsSdbObj->version + 1) { + pthread_mutex_unlock(&tsSdbObj->mutex); sdbError("table:%s, failed to restore %s record:%s from file, version:%" PRId64 " too large, sdb version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, - tsSdbSync->version); + tsSdbObj->version); return TSDB_CODE_OTHERS; } - tsSdbSync->version = pHead->version; + tsSdbObj->version = pHead->version; sdbTrace("table:%s, success to restore %s record:%s from file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); @@ -335,7 +321,7 @@ static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_ if (code < 0) { sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); return code; } @@ -369,17 +355,17 @@ static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_ if (code < 0) { sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); return code; } code = sdbInsertLocal(pTable, &oper2); } - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); return code; } -static int sdbProcessWrite(void *param, void *data, int type) { +int sdbProcessWrite(void *param, void *data, int type) { SWalHead *pHead = data; int32_t tableId = pHead->msgType / 10; int32_t action = pHead->msgType % 10; @@ -426,7 +412,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { (*pTable->encodeFp)(pOper); pHead->len = pOper->rowSize; - int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType); + int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); if (code < 0) return code; } @@ -453,6 +439,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { case SDB_KEY_STRING: rowSize = strlen((char *)pOper->pObj) + 1; break; + case SDB_KEY_INT: case SDB_KEY_AUTO: rowSize = sizeof(uint64_t); break; @@ -467,7 +454,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; memcpy(pHead->cont, pOper->pObj, rowSize); - int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType); + int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); if (code < 0) return code; } @@ -497,7 +484,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { (*pTable->encodeFp)(pOper); pHead->len = pOper->rowSize; - int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType); + int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); if (code < 0) return code; } From a7e1c7cec6cde547734d4181c38ad1b8bd6794e7 Mon Sep 17 00:00:00 2001 From: slguan Date: Tue, 14 Apr 2020 23:33:22 +0800 Subject: [PATCH 02/12] [TD-52] refactor sdb codes --- src/inc/mnode.h | 5 ++-- src/inc/mpeer.h | 19 ++++++------- src/inc/tbalance.h | 1 + src/mnode/inc/mgmtSdb.h | 7 ++--- src/mnode/src/mgmtDb.c | 4 +-- src/mnode/src/mgmtMnode.c | 54 ++++++++++++------------------------- src/mnode/src/mgmtSdb.c | 37 +++++++++---------------- src/mnode/src/mgmtShell.c | 21 +-------------- src/mnode/src/mgmtTable.c | 8 +++--- src/mnode/src/mgmtUser.c | 4 +-- src/mnode/src/mgmtVgroup.c | 4 +-- tests/script/tmp/dnode2.sim | 6 ----- tests/script/tmp/mnodes.sim | 7 +++++ 13 files changed, 62 insertions(+), 115 deletions(-) delete mode 100644 tests/script/tmp/dnode2.sim create mode 100644 tests/script/tmp/mnodes.sim diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 00b7519258..e8a0ba3bcc 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -49,11 +49,10 @@ typedef struct _mnode_obj { int8_t reserved[14]; int8_t updateEnd[1]; int32_t refCount; - int8_t role; - int8_t status; - uint16_t port; uint32_t privateIp; uint32_t publicIp; + uint16_t port; + int8_t role; char mnodeName[TSDB_NODE_NAME_LEN + 1]; } SMnodeObj; diff --git a/src/inc/mpeer.h b/src/inc/mpeer.h index e5051b39eb..e7abf09321 100644 --- a/src/inc/mpeer.h +++ b/src/inc/mpeer.h @@ -28,27 +28,28 @@ enum _TAOS_MN_STATUS { TAOS_MN_STATUS_READY }; +// general implementation int32_t mpeerInit(); void mpeerCleanup(); + +// special implementation +int32_t mpeerInitMnodes(); +void mpeerCleanupMnodes(); +int32_t mpeerAddMnode(int32_t dnodeId); +int32_t mpeerRemoveMnode(int32_t dnodeId); + +void * mpeerGetMnode(int32_t mnodeId); int32_t mpeerGetMnodesNum(); void * mpeerGetNextMnode(void *pNode, struct _mnode_obj **pMnode); void mpeerReleaseMnode(struct _mnode_obj *pMnode); -bool mpeerInServerStatus(); bool mpeerIsMaster(); -bool mpeerCheckRedirect(); void mpeerGetPrivateIpList(SRpcIpSet *ipSet); void mpeerGetPublicIpList(SRpcIpSet *ipSet); void mpeerGetMpeerInfos(void *mpeers); -char * mpeerGetMnodeStatusStr(int32_t status); -char * mpeerGetMnodeRoleStr(int32_t role); - -int32_t mpeerAddMnode(int32_t dnodeId); -int32_t mpeerRemoveMnode(int32_t dnodeId); - -int32_t sdbForwardDbReqToPeer(void *pHead); +int32_t mpeerForwardReqToPeer(void *pHead); #ifdef __cplusplus } diff --git a/src/inc/tbalance.h b/src/inc/tbalance.h index 8cf8cb9fb9..c73d6a91a9 100644 --- a/src/inc/tbalance.h +++ b/src/inc/tbalance.h @@ -31,6 +31,7 @@ struct _dnode_obj; int32_t balanceInit(); void balanceCleanUp(); void balanceNotify(); +void balanceReset(); int32_t balanceAllocVnodes(struct _vg_obj *pVgroup); int32_t balanceDropDnode(struct _dnode_obj *pDnode); diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index 27f9a51650..2804d40a71 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -64,25 +64,22 @@ typedef struct { int32_t (*encodeFp)(SSdbOperDesc *pOper); int32_t (*decodeFp)(SSdbOperDesc *pDesc); int32_t (*destroyFp)(SSdbOperDesc *pDesc); - int32_t (*updateAllFp)(); + int32_t (*restoredFp)(); } SSdbTableDesc; typedef struct { - int32_t code; int64_t version; - void * sync; void * wal; - sem_t sem; pthread_mutex_t mutex; } SSdbObject; int32_t sdbInit(); void sdbCleanUp(); SSdbObject *sdbGetObj(); -int sdbProcessWrite(void *param, void *data, int type); void * sdbOpenTable(SSdbTableDesc *desc); void sdbCloseTable(void *handle); +int sdbProcessWrite(void *param, void *data, int type); int32_t sdbInsertRow(SSdbOperDesc *pOper); int32_t sdbDeleteRow(SSdbOperDesc *pOper); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 946ec29d8c..089bf494e7 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -102,7 +102,7 @@ static int32_t mgmtDbActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionUpdateAll() { +static int32_t mgmtDbActionRestored() { return 0; } @@ -123,7 +123,7 @@ int32_t mgmtInitDbs() { .encodeFp = mgmtDbActionEncode, .decodeFp = mgmtDbActionDecode, .destroyFp = mgmtDbActionDestroy, - .updateAllFp = mgmtDbActionUpdateAll + .restoredFp = mgmtDbActionRestored }; tsDbSdb = sdbOpenTable(&tableDesc); diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index faa66d1fd9..ca18d6bdba 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -19,12 +19,9 @@ #include "trpc.h" #include "tsync.h" #include "mpeer.h" -#include "mgmtSdb.h" #include "mgmtShell.h" #include "mgmtUser.h" -extern int32_t mpeerInitMnodes(); -extern void mpeerCleanupMnodes(); static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -34,18 +31,24 @@ static SMnodeObj tsMnodeObj = {0}; int32_t mpeerInitMnodes() { tsMnodeObj.mnodeId = 1; - tsMnodeObj.dnodeId = 1; tsMnodeObj.privateIp = inet_addr(tsPrivateIp); tsMnodeObj.publicIp = inet_addr(tsPublicIp); tsMnodeObj.createdTime = taosGetTimestampMs(); tsMnodeObj.role = TAOS_SYNC_ROLE_MASTER; - tsMnodeObj.status = TAOS_MN_STATUS_READY; tsMnodeObj.port = tsMnodeDnodePort; sprintf(tsMnodeObj.mnodeName, "m%d", tsMnodeObj.mnodeId); return TSDB_CODE_SUCCESS; } +void mpeerCleanupMnodes() {} +int32_t mpeerAddMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; } +int32_t mpeerRemoveMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; } +void * mpeerGetMnode(int32_t mnodeId) { return &tsMnodeObj; } +int32_t mpeerGetMnodesNum() { return 1; } +void mpeerReleaseMnode(struct _mnode_obj *pMnode) {} +bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; } + void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) { if (*pMnode == NULL) { *pMnode = &tsMnodeObj; @@ -58,20 +61,21 @@ void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) { void mpeerGetPrivateIpList(SRpcIpSet *ipSet) { ipSet->inUse = 0; - ipSet->port = htons(tsMnodeDnodePort); ipSet->numOfIps = 1; + ipSet->port = htons(tsMnodeObj.port); ipSet->ip[0] = htonl(tsMnodeObj.privateIp); } void mpeerGetPublicIpList(SRpcIpSet *ipSet) { ipSet->inUse = 0; - ipSet->port = htons(tsMnodeDnodePort); ipSet->numOfIps = 1; + ipSet->port = htons(tsMnodeObj.port); ipSet->ip[0] = htonl(tsMnodeObj.publicIp); } void mpeerGetMpeerInfos(void *param) { SDMNodeInfos *mpeers = param; + mpeers->inUse = 0; mpeers->nodeNum = 1; mpeers->nodeInfos[0].nodeId = htonl(tsMnodeObj.mnodeId); mpeers->nodeInfos[0].nodeIp = htonl(tsMnodeObj.privateIp); @@ -79,12 +83,9 @@ void mpeerGetMpeerInfos(void *param) { strcpy(mpeers->nodeInfos[0].nodeName, tsMnodeObj.mnodeName); } -void mpeerCleanupMnodes() {} -int32_t mpeerGetMnodesNum() { return 1; } -void mpeerReleaseMnode(struct _mnode_obj *pMnode) {} -bool mpeerInServerStatus() { return tsMnodeObj.status == TAOS_MN_STATUS_READY; } -bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; } -bool mpeerCheckRedirect() { return false; } +int32_t mpeerForwardReqToPeer(void *pHead) { + return TSDB_CODE_SUCCESS; +} #endif @@ -98,20 +99,7 @@ void mpeerCleanup() { mpeerCleanupMnodes(); } -char *mpeerGetMnodeStatusStr(int32_t status) { - switch (status) { - case TAOS_MN_STATUS_OFFLINE: - return "offline"; - case TAOS_MN_STATUS_DROPPING: - return "dropping"; - case TAOS_MN_STATUS_READY: - return "ready"; - default: - return "undefined"; - } -} - -char *mpeerGetMnodeRoleStr(int32_t role) { +static char *mpeerGetMnodeRoleStr(int32_t role) { switch (role) { case TAOS_SYNC_ROLE_OFFLINE: return "offline"; @@ -159,12 +147,6 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = 10; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "status"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - pShow->bytes[cols] = 10; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "role"); @@ -219,14 +201,12 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, mpeerGetMnodeStatusStr(pMnode->status)); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; strcpy(pWrite, mpeerGetMnodeRoleStr(pMnode->role)); cols++; numOfRows++; + + mpeerReleaseMnode(pMnode); } pShow->numOfReads += numOfRows; diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index c47d9cd9d8..3038b0eadc 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -15,18 +15,13 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taosdef.h" #include "taoserror.h" -#include "tchecksum.h" -#include "tglobalcfg.h" #include "tlog.h" #include "trpc.h" -#include "tutil.h" #include "twal.h" -#include "tsync.h" -#include "mpeer.h" #include "hashint.h" #include "hashstr.h" +#include "mpeer.h" #include "mgmtSdb.h" typedef struct _SSdbTable { @@ -39,13 +34,13 @@ typedef struct _SSdbTable { int32_t autoIndex; int64_t numOfRows; void * iHandle; - int32_t (*insertFp)(SSdbOperDesc *pDesc); - int32_t (*deleteFp)(SSdbOperDesc *pOper); - int32_t (*updateFp)(SSdbOperDesc *pOper); - int32_t (*decodeFp)(SSdbOperDesc *pOper); - int32_t (*encodeFp)(SSdbOperDesc *pOper); - int32_t (*destroyFp)(SSdbOperDesc *pOper); - int32_t (*updateAllFp)(); + int32_t (*insertFp)(SSdbOperDesc *pDesc); + int32_t (*deleteFp)(SSdbOperDesc *pOper); + int32_t (*updateFp)(SSdbOperDesc *pOper); + int32_t (*decodeFp)(SSdbOperDesc *pOper); + int32_t (*encodeFp)(SSdbOperDesc *pOper); + int32_t (*destroyFp)(SSdbOperDesc *pOper); + int32_t (*restoredFp)(); pthread_mutex_t mutex; } SSdbTable; @@ -105,15 +100,8 @@ static void *sdbGetTableFromId(int32_t tableId) { return tsSdbTableList[tableId]; } -#ifndef _MPEER -int32_t sdbForwardDbReqToPeer(void *pHead) { - return TSDB_CODE_SUCCESS; -} -#endif - int32_t sdbInit() { tsSdbObj = calloc(1, sizeof(SSdbObject)); - sem_init(&tsSdbObj->sem, 0, 0); pthread_mutex_init(&tsSdbObj->mutex, NULL); SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1}; @@ -131,8 +119,8 @@ int32_t sdbInit() { for (int32_t tableId = SDB_TABLE_MNODE; tableId < SDB_TABLE_MAX; ++tableId) { SSdbTable *pTable = sdbGetTableFromId(tableId); if (pTable == NULL) continue; - if (pTable->updateAllFp) { - (*pTable->updateAllFp)(); + if (pTable->restoredFp) { + (*pTable->restoredFp)(); } totalRows += pTable->numOfRows; @@ -146,7 +134,6 @@ int32_t sdbInit() { void sdbCleanUp() { if (tsSdbObj) { - sem_destroy(&tsSdbObj->sem); pthread_mutex_destroy(&tsSdbObj->mutex); walClose(tsSdbObj->wal); free(tsSdbObj); @@ -268,7 +255,7 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_ tsSdbObj->version++; pHead->version = tsSdbObj->version; - code = sdbForwardDbReqToPeer(pHead); + code = mpeerForwardReqToPeer(pHead); if (code != TSDB_CODE_SUCCESS) { pthread_mutex_unlock(&tsSdbObj->mutex); sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName, @@ -523,7 +510,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { pTable->encodeFp = pDesc->encodeFp; pTable->decodeFp = pDesc->decodeFp; pTable->destroyFp = pDesc->destroyFp; - pTable->updateAllFp = pDesc->updateAllFp; + pTable->restoredFp = pDesc->restoredFp; if (sdbInitIndexFp[pTable->keyType] != NULL) { pTable->iHandle = (*sdbInitIndexFp[pTable->keyType])(pTable->maxRowSize, sizeof(SRowMeta)); diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index dbd7627d3f..5010429db3 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -42,7 +42,6 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *sec static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg); static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); -static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg); static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg); static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg); @@ -142,19 +141,13 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { return; } - if (mpeerCheckRedirect()) { + if (!mpeerIsMaster()) { // rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect()); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NO_MASTER); rpcFreeCont(rpcMsg->pCont); return; } - if (!mpeerInServerStatus()) { - mgmtProcessMsgWhileNotReady(rpcMsg); - rpcFreeCont(rpcMsg->pCont); - return; - } - if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED); rpcFreeCont(rpcMsg->pCont); @@ -501,18 +494,6 @@ static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { rpcSendResponse(&rpcRsp); } -static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg) { - mTrace("%s is ignored since SDB is not ready", taosMsg[rpcMsg->msgType]); - SRpcMsg rpcRsp = { - .msgType = 0, - .pCont = 0, - .contLen = 0, - .code = TSDB_CODE_NOT_READY, - .handle = rpcMsg->handle - }; - rpcSendResponse(&rpcRsp); -} - void mgmtSendSimpleResp(void *thandle, int32_t code) { SRpcMsg rpcRsp = { .msgType = 0, diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index feed12f97e..1c384fdfdf 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -220,7 +220,7 @@ static int32_t mgmtChildTableActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionUpdateAll() { +static int32_t mgmtChildTableActionRestored() { void *pNode = NULL; void *pLastNode = NULL; SChildTableObj *pTable = NULL; @@ -320,7 +320,7 @@ static int32_t mgmtInitChildTables() { .encodeFp = mgmtChildTableActionEncode, .decodeFp = mgmtChildTableActionDecode, .destroyFp = mgmtChildTableActionDestroy, - .updateAllFp = mgmtChildTableActionUpdateAll + .restoredFp = mgmtChildTableActionRestored }; tsChildTableSdb = sdbOpenTable(&tableDesc); @@ -414,7 +414,7 @@ static int32_t mgmtSuperTableActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionUpdateAll() { +static int32_t mgmtSuperTableActionRestored() { return 0; } @@ -435,7 +435,7 @@ static int32_t mgmtInitSuperTables() { .encodeFp = mgmtSuperTableActionEncode, .decodeFp = mgmtSuperTableActionDecode, .destroyFp = mgmtSuperTableActionDestroy, - .updateAllFp = mgmtSuperTableActionUpdateAll + .restoredFp = mgmtSuperTableActionRestored }; tsSuperTableSdb = sdbOpenTable(&tableDesc); diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 3a49e56331..ef01faf6ba 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -84,7 +84,7 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionUpdateAll() { +static int32_t mgmtUserActionRestored() { SAcctObj *pAcct = acctGetAcct("root"); mgmtCreateUser(pAcct, "root", "taosdata"); mgmtCreateUser(pAcct, "monitor", tsInternalPass); @@ -111,7 +111,7 @@ int32_t mgmtInitUsers() { .encodeFp = mgmtUserActionEncode, .decodeFp = mgmtUserActionDecode, .destroyFp = mgmtUserActionDestroy, - .updateAllFp = mgmtUserActionUpdateAll + .restoredFp = mgmtUserActionRestored }; tsUserSdb = sdbOpenTable(&tableDesc); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 19468dc547..cc8dba52dd 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -152,7 +152,7 @@ static int32_t mgmtVgroupActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionUpdateAll() { +static int32_t mgmtVgroupActionRestored() { return 0; } @@ -173,7 +173,7 @@ int32_t mgmtInitVgroups() { .encodeFp = mgmtVgroupActionEncode, .decodeFp = mgmtVgroupActionDecode, .destroyFp = mgmtVgroupActionDestroy, - .updateAllFp = mgmtVgroupActionUpdateAll, + .restoredFp = mgmtVgroupActionRestored, }; tsVgroupSdb = sdbOpenTable(&tableDesc); diff --git a/tests/script/tmp/dnode2.sim b/tests/script/tmp/dnode2.sim deleted file mode 100644 index 6d9a844fb6..0000000000 --- a/tests/script/tmp/dnode2.sim +++ /dev/null @@ -1,6 +0,0 @@ -system sh/stop_dnodes.sh -system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 -system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2 -system sh/exec_up.sh -n dnode1 -s start -system sh/exec_up.sh -n dnode2 -s start -sql connect \ No newline at end of file diff --git a/tests/script/tmp/mnodes.sim b/tests/script/tmp/mnodes.sim new file mode 100644 index 0000000000..32e72f16ff --- /dev/null +++ b/tests/script/tmp/mnodes.sim @@ -0,0 +1,7 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 +system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2 +system sh/deploy.sh -n dnode3 -m 192.168.0.1 -i 192.168.0.3 +system sh/cfg.sh -n dnode1 -c numOfMPeers -v 3 +system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3 +system sh/cfg.sh -n dnode3 -c numOfMPeers -v 3 From cc52d293efc21124caa3e28c3b914fc64535cebc Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 15 Apr 2020 15:09:47 +0800 Subject: [PATCH 03/12] fix error while sync sdb --- src/dnode/inc/dnodeMClient.h | 1 + src/dnode/inc/dnodeMgmt.h | 2 - src/dnode/src/dnodeMClient.c | 138 +++++++++++++++++++++++++++++++-- src/dnode/src/dnodeMgmt.c | 121 ----------------------------- src/inc/tcluster.h | 1 + src/mnode/inc/mgmtSdb.h | 6 +- src/mnode/src/mgmtDnode.c | 1 + src/mnode/src/mgmtSdb.c | 14 ++-- src/vnode/main/src/vnodeMain.c | 2 +- 9 files changed, 146 insertions(+), 140 deletions(-) diff --git a/src/dnode/inc/dnodeMClient.h b/src/dnode/inc/dnodeMClient.h index ba63894631..594fb84d3b 100644 --- a/src/dnode/inc/dnodeMClient.h +++ b/src/dnode/inc/dnodeMClient.h @@ -25,6 +25,7 @@ void dnodeCleanupMClient(); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); uint32_t dnodeGetMnodeMasteIp(); void * dnodeGetMpeerInfos(); +int32_t dnodeGetDnodeId(); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 0be6e40e75..b8d01916fe 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -23,8 +23,6 @@ extern "C" { int32_t dnodeInitMgmt(); void dnodeCleanupMgmt(); void dnodeMgmt(SRpcMsg *rpcMsg); -void dnodeUpdateDnodeId(int32_t dnodeId); -int32_t dnodeGetDnodeId(); void* dnodeGetVnode(int32_t vgId); int32_t dnodeGetVnodeStatus(void *pVnode); diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 53eea93137..42eca2152b 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -21,30 +21,51 @@ #include "trpc.h" #include "tutil.h" #include "tsync.h" +#include "ttime.h" +#include "ttimer.h" #include "dnode.h" #include "dnodeMClient.h" #include "dnodeModule.h" #include "dnodeMgmt.h" +#include "vnode.h" #define MPEER_CONTENT_LEN 2000 static bool dnodeReadMnodeIpList(); static void dnodeSaveMnodeIpList(); +static void dnodeReadDnodeInfo(); +static void dnodeUpdateDnodeInfo(int32_t dnodeId); static void dnodeProcessRspFromMnode(SRpcMsg *pMsg); static void dnodeProcessStatusRsp(SRpcMsg *pMsg); +static void dnodeSendStatusMsg(void *handle, void *tmrId); static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); + static void *tsDnodeMClientRpc = NULL; static SRpcIpSet tsMnodeIpList = {0}; static SDMNodeInfos tsMnodeInfos = {0}; +static void *tsDnodeTmr = NULL; +static void *tsStatusTimer = NULL; +static uint32_t tsRebootTime; +static int32_t tsDnodeId = 0; +static char tsDnodeName[TSDB_NODE_NAME_LEN]; int32_t dnodeInitMClient() { + dnodeReadDnodeInfo(); + tsRebootTime = taosGetTimestampSec(); + + tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); + if (tsDnodeTmr == NULL) { + dError("failed to init dnode timer"); + return -1; + } + if (!dnodeReadMnodeIpList()) { memset(&tsMnodeIpList, 0, sizeof(SRpcIpSet)); memset(&tsMnodeInfos, 0, sizeof(SDMNodeInfos)); tsMnodeIpList.port = tsMnodeDnodePort; tsMnodeIpList.numOfIps = 1; tsMnodeIpList.ip[0] = inet_addr(tsMasterIp); - if (tsSecondIp[0]) { + if (strcmp(tsSecondIp, tsMasterIp) != 0) { tsMnodeIpList.numOfIps = 2; tsMnodeIpList.ip[1] = inet_addr(tsSecondIp); } @@ -57,8 +78,6 @@ int32_t dnodeInitMClient() { } } - tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; - SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; @@ -79,11 +98,24 @@ int32_t dnodeInitMClient() { return -1; } + tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; + taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer); + dPrint("mnode rpc client is opened"); return 0; } void dnodeCleanupMClient() { + if (tsStatusTimer != NULL) { + taosTmrStopA(&tsStatusTimer); + tsStatusTimer = NULL; + } + + if (tsDnodeTmr != NULL) { + taosTmrCleanUp(tsDnodeTmr); + tsDnodeTmr = NULL; + } + if (tsDnodeMClientRpc) { rpcClose(tsDnodeMClientRpc); tsDnodeMClientRpc = NULL; @@ -104,6 +136,7 @@ static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { if (pMsg->code != TSDB_CODE_SUCCESS) { dError("status rsp is received, error:%s", tstrerror(pMsg->code)); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); return; } @@ -111,6 +144,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { SDMNodeInfos *mpeers = &pStatusRsp->mpeers; if (mpeers->nodeNum <= 0) { dError("status msg is invalid, num of ips is %d", mpeers->nodeNum); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); return; } @@ -122,14 +156,16 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { mgmtIpSet.ip[i] = htonl(mpeers->nodeInfos[i].nodeIp); } - if (memcmp(&mgmtIpSet, &tsMnodeIpList, sizeof(SRpcIpSet)) != 0) { + if (memcmp(&mgmtIpSet, &tsMnodeIpList, sizeof(SRpcIpSet)) != 0 || tsMnodeInfos.nodeNum == 0) { memcpy(&tsMnodeIpList, &mgmtIpSet, sizeof(SRpcIpSet)); - memcpy(&tsMnodeInfos, mpeers, sizeof(SDMNodeInfos)); + tsMnodeInfos.inUse = mpeers->inUse; + tsMnodeInfos.nodeNum = mpeers->nodeNum; dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse); for (int32_t i = 0; i < mpeers->nodeNum; i++) { tsMnodeInfos.nodeInfos[i].nodeId = htonl(mpeers->nodeInfos[i].nodeId); tsMnodeInfos.nodeInfos[i].nodeIp = htonl(mpeers->nodeInfos[i].nodeIp); tsMnodeInfos.nodeInfos[i].nodePort = htons(mpeers->nodeInfos[i].nodePort); + strcpy(tsMnodeInfos.nodeInfos[i].nodeName, mpeers->nodeInfos[i].nodeName); dPrint("mnode:%d, ip:%s:%u name:%s", tsMnodeInfos.nodeInfos[i].nodeId, taosIpStr(tsMnodeInfos.nodeInfos[i].nodeId), tsMnodeInfos.nodeInfos[i].nodePort, tsMnodeInfos.nodeInfos[i].nodeName); @@ -144,7 +180,8 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { pState->dnodeId = htonl(pState->dnodeId); dnodeProcessModuleStatus(pState->moduleStatus); - dnodeUpdateDnodeId(pState->dnodeId); + dnodeUpdateDnodeInfo(pState->dnodeId); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); } void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { @@ -295,3 +332,92 @@ uint32_t dnodeGetMnodeMasteIp() { void* dnodeGetMpeerInfos() { return &tsMnodeInfos; } + +static void dnodeSendStatusMsg(void *handle, void *tmrId) { + if (tsDnodeTmr == NULL) { + dError("dnode timer is already released"); + return; + } + + if (tsStatusTimer == NULL) { + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); + dError("failed to start status timer"); + return; + } + + int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); + SDMStatusMsg *pStatus = rpcMallocCont(contLen); + if (pStatus == NULL) { + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); + dError("failed to malloc status message"); + return; + } + + strcpy(pStatus->dnodeName, tsDnodeName); + pStatus->version = htonl(tsVersion); + pStatus->dnodeId = htonl(tsDnodeId); + pStatus->privateIp = htonl(inet_addr(tsPrivateIp)); + pStatus->publicIp = htonl(inet_addr(tsPublicIp)); + pStatus->lastReboot = htonl(tsRebootTime); + pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes); + pStatus->numOfCores = htons((uint16_t) tsNumOfCores); + pStatus->diskAvailable = tsAvailDataDirGB; + pStatus->alternativeRole = (uint8_t) tsAlternativeRole; + + vnodeBuildStatusMsg(pStatus); + contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); + pStatus->openVnodes = htons(pStatus->openVnodes); + + SRpcMsg rpcMsg = { + .pCont = pStatus, + .contLen = contLen, + .msgType = TSDB_MSG_TYPE_DM_STATUS + }; + + dnodeSendMsgToMnode(&rpcMsg); +} + +static void dnodeReadDnodeInfo() { + char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; + sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); + + FILE *fp = fopen(dnodeIdFile, "r"); + if (!fp) return; + + char option[32] = {0}; + int32_t value = 0; + int32_t num = 0; + + num = fscanf(fp, "%s %d", option, &value); + if (num != 2) return; + if (strcmp(option, "dnodeId") != 0) return; + tsDnodeId = value;; + + fclose(fp); + dPrint("read dnodeId:%d successed", tsDnodeId); +} + +static void dnodeSaveDnodeInfo() { + char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; + sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); + + FILE *fp = fopen(dnodeIdFile, "w"); + if (!fp) return; + + fprintf(fp, "dnodeId %d\n", tsDnodeId); + + fclose(fp); + dPrint("save dnodeId successed"); +} + +void dnodeUpdateDnodeInfo(int32_t dnodeId) { + if (tsDnodeId == 0) { + dPrint("dnodeId is set to %d", dnodeId); + tsDnodeId = dnodeId; + dnodeSaveDnodeInfo(); + } +} + +int32_t dnodeGetDnodeId() { + return tsDnodeId; +} \ No newline at end of file diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index f4cbd3e1be..abfee2239b 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -21,8 +21,6 @@ #include "tlog.h" #include "trpc.h" #include "tsdb.h" -#include "ttime.h" -#include "ttimer.h" #include "twal.h" #include "dnodeMClient.h" #include "dnodeMgmt.h" @@ -38,52 +36,23 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); -static void dnodeSendStatusMsg(void *handle, void *tmrId); -static void dnodeReadDnodeId(); - -static void *tsDnodeTmr = NULL; -static void *tsStatusTimer = NULL; -static uint32_t tsRebootTime; -static int32_t tsDnodeId = 0; -static char tsDnodeName[TSDB_NODE_NAME_LEN]; int32_t dnodeInitMgmt() { - dnodeReadDnodeId(); - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; - tsRebootTime = taosGetTimestampSec(); - - tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); - if (tsDnodeTmr == NULL) { - dError("failed to init dnode timer"); - return -1; - } - int32_t code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { return -1; } - taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer); return TSDB_CODE_SUCCESS; } void dnodeCleanupMgmt() { - if (tsStatusTimer != NULL) { - taosTmrStopA(&tsStatusTimer); - tsStatusTimer = NULL; - } - - if (tsDnodeTmr != NULL) { - taosTmrCleanUp(tsDnodeTmr); - tsDnodeTmr = NULL; - } - dnodeCloseVnodes(); } @@ -213,93 +182,3 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont; return tsCfgDynamicOptions(pCfg->config); } - -static void dnodeSendStatusMsg(void *handle, void *tmrId) { - if (tsDnodeTmr == NULL) { - dError("dnode timer is already released"); - return; - } - - if (tsStatusTimer == NULL) { - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); - dError("failed to start status timer"); - return; - } - - int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); - SDMStatusMsg *pStatus = rpcMallocCont(contLen); - if (pStatus == NULL) { - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); - dError("failed to malloc status message"); - return; - } - - strcpy(pStatus->dnodeName, tsDnodeName); - pStatus->version = htonl(tsVersion); - pStatus->dnodeId = htonl(tsDnodeId); - pStatus->privateIp = htonl(inet_addr(tsPrivateIp)); - pStatus->publicIp = htonl(inet_addr(tsPublicIp)); - pStatus->lastReboot = htonl(tsRebootTime); - pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes); - pStatus->numOfCores = htons((uint16_t) tsNumOfCores); - pStatus->diskAvailable = tsAvailDataDirGB; - pStatus->alternativeRole = (uint8_t) tsAlternativeRole; - - vnodeBuildStatusMsg(pStatus); - contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); - pStatus->openVnodes = htons(pStatus->openVnodes); - - SRpcMsg rpcMsg = { - .pCont = pStatus, - .contLen = contLen, - .msgType = TSDB_MSG_TYPE_DM_STATUS - }; - - dnodeSendMsgToMnode(&rpcMsg); - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); -} - -static void dnodeReadDnodeId() { - char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; - sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); - - FILE *fp = fopen(dnodeIdFile, "r"); - if (!fp) return; - - char option[32] = {0}; - int32_t value = 0; - int32_t num = 0; - - num = fscanf(fp, "%s %d", option, &value); - if (num != 2) return; - if (strcmp(option, "dnodeId") != 0) return; - tsDnodeId = value;; - - fclose(fp); - dPrint("read dnodeId:%d successed", tsDnodeId); -} - -static void dnodeSaveDnodeId() { - char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; - sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); - - FILE *fp = fopen(dnodeIdFile, "w"); - if (!fp) return; - - fprintf(fp, "dnodeId %d\n", tsDnodeId); - - fclose(fp); - dPrint("save dnodeId successed"); -} - -void dnodeUpdateDnodeId(int32_t dnodeId) { - if (tsDnodeId == 0) { - dPrint("dnodeId is set to %d", dnodeId); - tsDnodeId = dnodeId; - dnodeSaveDnodeId(); - } -} - -int32_t dnodeGetDnodeId() { - return tsDnodeId; -} \ No newline at end of file diff --git a/src/inc/tcluster.h b/src/inc/tcluster.h index 769a819b90..a56285fe1c 100644 --- a/src/inc/tcluster.h +++ b/src/inc/tcluster.h @@ -37,6 +37,7 @@ int32_t clusterInit(); void clusterCleanUp(); char* clusterGetDnodeStatusStr(int32_t dnodeStatus); bool clusterCheckModuleInDnode(struct _dnode_obj *pDnode, int moduleType); +void clusterMonitorDnodeModule(); int32_t clusterInitDnodes(); void clusterCleanupDnodes(); diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index 2804d40a71..8ecb5ef152 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -21,8 +21,8 @@ extern "C" { #endif typedef enum { - SDB_TABLE_MNODE = 0, - SDB_TABLE_DNODE = 1, + SDB_TABLE_DNODE = 0, + SDB_TABLE_MNODE = 1, SDB_TABLE_ACCOUNT = 2, SDB_TABLE_USER = 3, SDB_TABLE_DB = 4, @@ -90,7 +90,7 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow); void sdbIncRef(void *thandle, void *pRow); void sdbDecRef(void *thandle, void *pRow); int64_t sdbGetNumOfRows(void *handle); -int64_t sdbGetId(void *handle); +int32_t sdbGetId(void *handle); uint64_t sdbGetVersion(); #ifdef __cplusplus diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 51ac4e842d..196b8c1026 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -208,6 +208,7 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { mTrace("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TAOS_DN_STATUS_READY; balanceNotify(); + clusterMonitorDnodeModule(); } clusterReleaseDnode(pDnode); diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 3038b0eadc..2cec20c653 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -18,6 +18,7 @@ #include "taoserror.h" #include "tlog.h" #include "trpc.h" +#include "tqueue.h" #include "twal.h" #include "hashint.h" #include "hashstr.h" @@ -67,7 +68,7 @@ static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIn static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData}; uint64_t sdbGetVersion() { return tsSdbObj->version; } -int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; } +int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; } int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } static char *sdbGetActionStr(int32_t action) { @@ -116,7 +117,7 @@ int32_t sdbInit() { int32_t totalRows = 0; int32_t numOfTables = 0; - for (int32_t tableId = SDB_TABLE_MNODE; tableId < SDB_TABLE_MAX; ++tableId) { + for (int32_t tableId = SDB_TABLE_DNODE; tableId < SDB_TABLE_MAX; ++tableId) { SSdbTable *pTable = sdbGetTableFromId(tableId); if (pTable == NULL) continue; if (pTable->restoredFp) { @@ -275,8 +276,7 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_ } walFsync(tsSdbObj->wal); - free(pHead); - + taosFreeQitem(pHead); return code; } @@ -390,7 +390,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { if (pOper->type == SDB_OPER_GLOBAL) { int32_t size = sizeof(SWalHead) + pTable->maxRowSize; - SWalHead *pHead = calloc(1, size); + SWalHead *pHead = taosAllocateQitem(size); pHead->version = 0; pHead->len = pOper->rowSize; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT; @@ -435,7 +435,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { } int32_t size = sizeof(SWalHead) + rowSize; - SWalHead *pHead = calloc(1, size); + SWalHead *pHead = taosAllocateQitem(size); pHead->version = 0; pHead->len = rowSize; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; @@ -463,7 +463,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { if (pOper->type == SDB_OPER_GLOBAL) { int32_t size = sizeof(SWalHead) + pTable->maxRowSize; - SWalHead *pHead = calloc(1, size); + SWalHead *pHead = taosAllocateQitem(size); pHead->version = 0; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE; diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 4d77e007ad..7ec9b0fef7 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -160,7 +160,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.confirmForward = dnodeSendRpcWriteRsp; syncInfo.notifyRole = vnodeNotifyRole; - pVnode->sync = syncStart(&syncInfo);; + pVnode->sync = syncStart(&syncInfo); pVnode->events = NULL; pVnode->cq = NULL; From dcbf5972941dcdbe6b54f9ec03faf54c0e847da0 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Wed, 15 Apr 2020 17:21:33 +0800 Subject: [PATCH 04/12] remove the bug in rpcClose process --- src/rpc/src/rpcMain.c | 12 ++++++++---- src/rpc/src/rpcTcp.c | 20 +++++++++++++------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 5048d5db14..75b23950ed 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -286,15 +286,15 @@ void *rpcOpen(const SRpcInit *pInit) { void rpcClose(void *param) { SRpcInfo *pRpc = (SRpcInfo *)param; - (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); - (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle); - for (int i = 0; i < pRpc->sessions; ++i) { if (pRpc->connList && pRpc->connList[i].user[0]) { rpcCloseConn((void *)(pRpc->connList + i)); } } + (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); + (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle); + taosHashCleanup(pRpc->hash); taosTmrCleanUp(pRpc->tmrCtrl); taosIdPoolCleanUp(pRpc->idPool); @@ -521,11 +521,15 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, static void rpcCloseConn(void *thandle) { SRpcConn *pConn = (SRpcConn *)thandle; SRpcInfo *pRpc = pConn->pRpc; - if (pConn->user[0] == 0) return; rpcLockConn(pConn); + if (pConn->user[0] == 0) { + rpcUnlockConn(pConn); + return; + } + pConn->user[0] = 0; if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle); diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 27b81deda5..8e8b285621 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -158,8 +158,9 @@ void taosCleanUpTcpServer(void *handle) { pThreadObj = pServerObj->pThreadObj + i; while (pThreadObj->pHead) { - taosFreeFdObj(pThreadObj->pHead); - pThreadObj->pHead = pThreadObj->pHead; + SFdObj *pFdObj = pThreadObj->pHead; + pThreadObj->pHead = pFdObj->next; + taosFreeFdObj(pFdObj); } close(pThreadObj->pollFd); @@ -269,8 +270,9 @@ void taosCleanUpTcpClient(void *chandle) { if (pThreadObj == NULL) return; while (pThreadObj->pHead) { - taosFreeFdObj(pThreadObj->pHead); - pThreadObj->pHead = pThreadObj->pHead->next; + SFdObj *pFdObj = pThreadObj->pHead; + pThreadObj->pHead = pFdObj->next; + taosFreeFdObj(pFdObj); } close(pThreadObj->pollFd); @@ -456,14 +458,18 @@ static void taosFreeFdObj(SFdObj *pFdObj) { if (pFdObj == NULL) return; if (pFdObj->signature != pFdObj) return; - pFdObj->signature = NULL; SThreadObj *pThreadObj = pFdObj->pThreadObj; + pthread_mutex_lock(&pThreadObj->mutex); + if (pFdObj->signature == NULL) { + pthread_mutex_unlock(&pThreadObj->mutex); + return; + } + + pFdObj->signature = NULL; close(pFdObj->fd); epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); - pthread_mutex_lock(&pThreadObj->mutex); - pThreadObj->numOfFds--; if (pThreadObj->numOfFds < 0) From a5d0e01ab445b3e3e83f3e7a7f181c41086b6c76 Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 15 Apr 2020 20:18:15 +0800 Subject: [PATCH 05/12] fix error in sdb --- src/dnode/src/dnodeMClient.c | 21 ++++++++++++--------- src/dnode/src/dnodeMain.c | 2 +- src/inc/mpeer.h | 1 + src/mnode/src/mgmtDnode.c | 1 + src/mnode/src/mgmtMnode.c | 1 + src/mnode/src/mgmtSdb.c | 8 ++++++++ src/mnode/src/mgmtUser.c | 12 +++++++----- src/plugins/http/src/httpSystem.c | 2 +- 8 files changed, 32 insertions(+), 16 deletions(-) diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 42eca2152b..d791ad0f01 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -28,6 +28,7 @@ #include "dnodeModule.h" #include "dnodeMgmt.h" #include "vnode.h" +#include "mpeer.h" #define MPEER_CONTENT_LEN 2000 @@ -148,6 +149,15 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { return; } + SDnodeState *pState = &pStatusRsp->dnodeState; + pState->numOfVnodes = htonl(pState->numOfVnodes); + pState->moduleStatus = htonl(pState->moduleStatus); + pState->createdTime = htonl(pState->createdTime); + pState->dnodeId = htonl(pState->dnodeId); + + dnodeProcessModuleStatus(pState->moduleStatus); + dnodeUpdateDnodeInfo(pState->dnodeId); + SRpcIpSet mgmtIpSet = {0}; mgmtIpSet.inUse = mpeers->inUse; mgmtIpSet.numOfIps = mpeers->nodeNum; @@ -167,20 +177,13 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { tsMnodeInfos.nodeInfos[i].nodePort = htons(mpeers->nodeInfos[i].nodePort); strcpy(tsMnodeInfos.nodeInfos[i].nodeName, mpeers->nodeInfos[i].nodeName); dPrint("mnode:%d, ip:%s:%u name:%s", tsMnodeInfos.nodeInfos[i].nodeId, - taosIpStr(tsMnodeInfos.nodeInfos[i].nodeId), tsMnodeInfos.nodeInfos[i].nodePort, + taosIpStr(tsMnodeInfos.nodeInfos[i].nodeIp), tsMnodeInfos.nodeInfos[i].nodePort, tsMnodeInfos.nodeInfos[i].nodeName); } dnodeSaveMnodeIpList(); + mpeerUpdateSync(); } - SDnodeState *pState = &pStatusRsp->dnodeState; - pState->numOfVnodes = htonl(pState->numOfVnodes); - pState->moduleStatus = htonl(pState->moduleStatus); - pState->createdTime = htonl(pState->createdTime); - pState->dnodeId = htonl(pState->dnodeId); - - dnodeProcessModuleStatus(pState->moduleStatus); - dnodeUpdateDnodeInfo(pState->dnodeId); taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); } diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 01e2c4dfcc..64d1ad4048 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -159,10 +159,10 @@ static int32_t dnodeInitSystem() { dPrint("starting to initialize TDengine ..."); if (dnodeInitStorage() != 0) return -1; - if (dnodeInitModules() != 0) return -1; if (dnodeInitRead() != 0) return -1; if (dnodeInitWrite() != 0) return -1; if (dnodeInitMClient() != 0) return -1; + if (dnodeInitModules() != 0) return -1; if (dnodeInitMnode() != 0) return -1; if (dnodeInitMgmt() != 0) return -1; if (dnodeInitShell() != 0) return -1; diff --git a/src/inc/mpeer.h b/src/inc/mpeer.h index e7abf09321..ba1b7d32cf 100644 --- a/src/inc/mpeer.h +++ b/src/inc/mpeer.h @@ -50,6 +50,7 @@ void mpeerGetPublicIpList(SRpcIpSet *ipSet); void mpeerGetMpeerInfos(void *mpeers); int32_t mpeerForwardReqToPeer(void *pHead); +void mpeerUpdateSync(); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 196b8c1026..d13d37586a 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -77,6 +77,7 @@ void * clusterGetDnode(int32_t dnodeId) { return dnodeId == 1 ? &tsDnodeObj : N void * clusterGetDnodeByIp(uint32_t ip) { return &tsDnodeObj; } void clusterReleaseDnode(struct _dnode_obj *pDnode) {} void clusterUpdateDnode(struct _dnode_obj *pDnode) {} +void clusterMonitorDnodeModule() {} #endif diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index ca18d6bdba..e2edb201b9 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -48,6 +48,7 @@ void * mpeerGetMnode(int32_t mnodeId) { return &tsMnodeObj; } int32_t mpeerGetMnodesNum() { return 1; } void mpeerReleaseMnode(struct _mnode_obj *pMnode) {} bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; } +void mpeerUpdateSync() {} void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) { if (*pMnode == NULL) { diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 2cec20c653..6bc3c20d18 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -130,6 +130,9 @@ int32_t sdbInit() { } sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbObj->version, totalRows, numOfTables); + + mpeerUpdateSync(); + return TSDB_CODE_SUCCESS; } @@ -215,6 +218,11 @@ static int32_t sdbInsertLocal(SSdbTable *pTable, SSdbOperDesc *pOper) { (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj, &rowMeta); sdbIncRef(pTable, pOper->pObj); pTable->numOfRows++; + + if (pTable->keyType == SDB_KEY_AUTO) { + pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pOper->pObj)); + } + pthread_mutex_unlock(&pTable->mutex); sdbTrace("table:%s, insert record:%s, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index ef01faf6ba..7fa1a13bfd 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -85,11 +85,13 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { } static int32_t mgmtUserActionRestored() { - SAcctObj *pAcct = acctGetAcct("root"); - mgmtCreateUser(pAcct, "root", "taosdata"); - mgmtCreateUser(pAcct, "monitor", tsInternalPass); - mgmtCreateUser(pAcct, "_root", tsInternalPass); - acctReleaseAcct(pAcct); + if (strcmp(tsMasterIp, tsPrivateIp) == 0) { + SAcctObj *pAcct = acctGetAcct("root"); + mgmtCreateUser(pAcct, "root", "taosdata"); + mgmtCreateUser(pAcct, "monitor", tsInternalPass); + mgmtCreateUser(pAcct, "_root", tsInternalPass); + acctReleaseAcct(pAcct); + } return 0; } diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index 46f31a12d6..deb0d877c7 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -129,7 +129,7 @@ void httpCleanUpSystem() { httpPrint("http service cleanup"); httpStopSystem(); -#if 1 +#if 0 if (httpServer == NULL) { return; } From 98286acce4f227f4bc3823ed2d02f89d809b4213 Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 15 Apr 2020 21:34:13 +0800 Subject: [PATCH 06/12] fix possible memory lost --- src/dnode/src/dnodeMClient.c | 2 +- src/dnode/src/dnodeMain.c | 1 - src/mnode/src/mgmtSdb.c | 3 ++- src/plugins/http/src/httpServer.c | 13 ++++++++----- src/plugins/http/src/httpSystem.c | 4 ++-- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index d791ad0f01..85454af095 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -59,7 +59,7 @@ int32_t dnodeInitMClient() { dError("failed to init dnode timer"); return -1; } - + if (!dnodeReadMnodeIpList()) { memset(&tsMnodeIpList, 0, sizeof(SRpcIpSet)); memset(&tsMnodeInfos, 0, sizeof(SDMNodeInfos)); diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 64d1ad4048..5fba941788 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -177,7 +177,6 @@ static int32_t dnodeInitSystem() { static void dnodeCleanUpSystem() { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { - tclearModuleStatus(TSDB_MOD_MGMT); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeCleanupShell(); dnodeCleanupMnode(); diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 6bc3c20d18..70e2be5f4c 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -504,6 +504,7 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { void *sdbOpenTable(SSdbTableDesc *pDesc) { SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable)); + if (pTable == NULL) return NULL; strcpy(pTable->tableName, pDesc->tableName); @@ -557,7 +558,7 @@ void sdbCloseTable(void *handle) { } pthread_mutex_destroy(&pTable->mutex); - + sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbNumOfTables); free(pTable); } diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 171f811b7d..5c321beffb 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -270,7 +270,7 @@ void httpCleanUpConnect(HttpServer *pServer) { for (i = 0; i < pServer->numOfThreads; ++i) { pThread = pServer->pThreads + i; - taosCloseSocket(pThread->pollFd); + //taosCloseSocket(pThread->pollFd); while (pThread->pHead) { httpCleanUpContext(pThread->pHead, 0); @@ -591,7 +591,6 @@ void httpAcceptHttpConnection(void *arg) { bool httpInitConnect(HttpServer *pServer) { int i; - pthread_attr_t thattr; HttpThread * pThread; pServer->pThreads = (HttpThread *)malloc(sizeof(HttpThread) * (size_t)pServer->numOfThreads); @@ -601,8 +600,6 @@ bool httpInitConnect(HttpServer *pServer) { } memset(pServer->pThreads, 0, sizeof(HttpThread) * (size_t)pServer->numOfThreads); - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pThread = pServer->pThreads; for (i = 0; i < pServer->numOfThreads; ++i) { sprintf(pThread->label, "%s%d", pServer->label, i); @@ -626,21 +623,27 @@ bool httpInitConnect(HttpServer *pServer) { return false; } + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&(pThread->thread), &thattr, (void *)httpProcessHttpData, (void *)(pThread)) != 0) { httpError("http thread:%s, failed to create HTTP process data thread, reason:%s", pThread->label, strerror(errno)); return false; } + pthread_attr_destroy(&thattr); httpTrace("http thread:%p:%s, initialized", pThread, pThread->label); pThread++; } + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&(pServer->thread), &thattr, (void *)httpAcceptHttpConnection, (void *)(pServer)) != 0) { httpError("http server:%s, failed to create Http accept thread, reason:%s", pServer->label, strerror(errno)); return false; } - pthread_attr_destroy(&thattr); httpTrace("http server:%s, initialized, ip:%s:%u, numOfThreads:%d", pServer->label, pServer->serverIp, diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index deb0d877c7..2a118cc2b1 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -54,7 +54,7 @@ static HttpServer *httpServer = NULL; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); int httpInitSystem() { - taos_init(); + // taos_init(); httpServer = (HttpServer *)malloc(sizeof(HttpServer)); memset(httpServer, 0, sizeof(HttpServer)); @@ -129,7 +129,7 @@ void httpCleanUpSystem() { httpPrint("http service cleanup"); httpStopSystem(); -#if 0 +#if 1 if (httpServer == NULL) { return; } From 1c9d23b061c8d3c2c4b48fe983695d4039963257 Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 15 Apr 2020 22:22:12 +0800 Subject: [PATCH 07/12] Rearrange the source code directory --- deps/CMakeLists.txt | 2 ++ deps/cJson/CMakeLists.txt | 4 ++++ {src/util => deps/cJson}/inc/cJSON.h | 0 {src/util => deps/cJson}/src/cJSON.c | 0 deps/lz4/CMakeLists.txt | 4 ++++ {src/thirdparty => deps/lz4}/inc/lz4.h | 0 {src/thirdparty => deps/lz4}/src/lz4.c | 0 src/CMakeLists.txt | 4 ++-- src/dnode/CMakeLists.txt | 8 +++++--- src/plugins/http/CMakeLists.txt | 2 ++ src/query/CMakeLists.txt | 2 +- src/rpc/CMakeLists.txt | 4 +++- src/thirdparty/CMakeLists.txt | 4 ---- src/{vnode => }/tsdb/CMakeLists.txt | 0 src/{vnode => }/tsdb/inc/tsdb.h | 0 src/{vnode => }/tsdb/inc/tsdbMain.h | 0 src/{vnode => }/tsdb/src/tsdbCache.c | 0 src/{vnode => }/tsdb/src/tsdbCompactor.c | 0 src/{vnode => }/tsdb/src/tsdbFile.c | 0 src/{vnode => }/tsdb/src/tsdbMain.c | 0 src/{vnode => }/tsdb/src/tsdbMeta.c | 0 src/{vnode => }/tsdb/src/tsdbMetaFile.c | 0 src/{vnode => }/tsdb/src/tsdbRead.c | 0 src/{vnode => }/tsdb/tests/CMakeLists.txt | 0 src/{vnode => }/tsdb/tests/tsdbTests.cpp | 0 src/util/CMakeLists.txt | 8 +++++--- src/vnode/CMakeLists.txt | 18 +++++++++++++++--- src/vnode/cache/inc/cache.h | 17 ----------------- src/vnode/{main => }/inc/vnodeInt.h | 0 src/vnode/main/CMakeLists.txt | 18 ------------------ src/vnode/{main => }/src/vnodeMain.c | 0 src/vnode/{main => }/src/vnodeRead.c | 0 src/vnode/{main => }/src/vnodeWrite.c | 0 src/{vnode => }/wal/CMakeLists.txt | 0 src/{vnode => }/wal/src/walMain.c | 0 src/{vnode => }/wal/test/CMakeLists.txt | 0 src/{vnode => }/wal/test/waltest.c | 0 tests/tsim/CMakeLists.txt | 3 ++- 38 files changed, 45 insertions(+), 53 deletions(-) create mode 100644 deps/cJson/CMakeLists.txt rename {src/util => deps/cJson}/inc/cJSON.h (100%) rename {src/util => deps/cJson}/src/cJSON.c (100%) create mode 100644 deps/lz4/CMakeLists.txt rename {src/thirdparty => deps/lz4}/inc/lz4.h (100%) rename {src/thirdparty => deps/lz4}/src/lz4.c (100%) delete mode 100644 src/thirdparty/CMakeLists.txt rename src/{vnode => }/tsdb/CMakeLists.txt (100%) rename src/{vnode => }/tsdb/inc/tsdb.h (100%) rename src/{vnode => }/tsdb/inc/tsdbMain.h (100%) rename src/{vnode => }/tsdb/src/tsdbCache.c (100%) rename src/{vnode => }/tsdb/src/tsdbCompactor.c (100%) rename src/{vnode => }/tsdb/src/tsdbFile.c (100%) rename src/{vnode => }/tsdb/src/tsdbMain.c (100%) rename src/{vnode => }/tsdb/src/tsdbMeta.c (100%) rename src/{vnode => }/tsdb/src/tsdbMetaFile.c (100%) rename src/{vnode => }/tsdb/src/tsdbRead.c (100%) rename src/{vnode => }/tsdb/tests/CMakeLists.txt (100%) rename src/{vnode => }/tsdb/tests/tsdbTests.cpp (100%) delete mode 100644 src/vnode/cache/inc/cache.h rename src/vnode/{main => }/inc/vnodeInt.h (100%) delete mode 100644 src/vnode/main/CMakeLists.txt rename src/vnode/{main => }/src/vnodeMain.c (100%) rename src/vnode/{main => }/src/vnodeRead.c (100%) rename src/vnode/{main => }/src/vnodeWrite.c (100%) rename src/{vnode => }/wal/CMakeLists.txt (100%) rename src/{vnode => }/wal/src/walMain.c (100%) rename src/{vnode => }/wal/test/CMakeLists.txt (100%) rename src/{vnode => }/wal/test/waltest.c (100%) diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index bc5bd8c037..5cdf84658b 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -5,3 +5,5 @@ ADD_SUBDIRECTORY(zlib-1.2.11) ADD_SUBDIRECTORY(pthread) ADD_SUBDIRECTORY(regex) ADD_SUBDIRECTORY(iconv) +ADD_SUBDIRECTORY(lz4) +ADD_SUBDIRECTORY(cJson) diff --git a/deps/cJson/CMakeLists.txt b/deps/cJson/CMakeLists.txt new file mode 100644 index 0000000000..43b31589b3 --- /dev/null +++ b/deps/cJson/CMakeLists.txt @@ -0,0 +1,4 @@ +aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST) + +add_library(cJson ${SOURCE_LIST}) +target_include_directories(cJson PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/inc) \ No newline at end of file diff --git a/src/util/inc/cJSON.h b/deps/cJson/inc/cJSON.h similarity index 100% rename from src/util/inc/cJSON.h rename to deps/cJson/inc/cJSON.h diff --git a/src/util/src/cJSON.c b/deps/cJson/src/cJSON.c similarity index 100% rename from src/util/src/cJSON.c rename to deps/cJson/src/cJSON.c diff --git a/deps/lz4/CMakeLists.txt b/deps/lz4/CMakeLists.txt new file mode 100644 index 0000000000..a142d8d468 --- /dev/null +++ b/deps/lz4/CMakeLists.txt @@ -0,0 +1,4 @@ +aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST) + +add_library(lz4 ${SOURCE_LIST}) +target_include_directories(lz4 PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/inc) \ No newline at end of file diff --git a/src/thirdparty/inc/lz4.h b/deps/lz4/inc/lz4.h similarity index 100% rename from src/thirdparty/inc/lz4.h rename to deps/lz4/inc/lz4.h diff --git a/src/thirdparty/src/lz4.c b/deps/lz4/src/lz4.c similarity index 100% rename from src/thirdparty/src/lz4.c rename to deps/lz4/src/lz4.c diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 326f00dbbd..1a8f7d8807 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -3,8 +3,6 @@ PROJECT(TDengine) # Base compile ADD_SUBDIRECTORY(os) -ADD_SUBDIRECTORY(thirdparty) - ADD_SUBDIRECTORY(common) ADD_SUBDIRECTORY(util) ADD_SUBDIRECTORY(rpc) @@ -14,5 +12,7 @@ ADD_SUBDIRECTORY(kit) ADD_SUBDIRECTORY(plugins) ADD_SUBDIRECTORY(mnode) ADD_SUBDIRECTORY(vnode) +ADD_SUBDIRECTORY(tsdb) +ADD_SUBDIRECTORY(wal) ADD_SUBDIRECTORY(dnode) #ADD_SUBDIRECTORY(connector/jdbc) diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index ee05403a61..8999770618 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -7,14 +7,16 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/tsdb/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) ADD_EXECUTABLE(taosd ${SRC}) - TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb twal vnode) + TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb twal vnode cJson lz4) IF (TD_ACCOUNT) TARGET_LINK_LIBRARIES(taosd account) @@ -31,7 +33,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(taosd balance sync) ENDIF () - IF (TD_MPEER) + IF (TD_MPEER) TARGET_LINK_LIBRARIES(taosd mpeer sync) ENDIF () diff --git a/src/plugins/http/CMakeLists.txt b/src/plugins/http/CMakeLists.txt index 57ca4ca9f0..9b36f4029a 100644 --- a/src/plugins/http/CMakeLists.txt +++ b/src/plugins/http/CMakeLists.txt @@ -7,6 +7,8 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index 1bbd067804..01be5e643b 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -5,7 +5,7 @@ INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) -INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/tsdb/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc) INCLUDE_DIRECTORIES(inc) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index e8768c10dd..229b1077f8 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -4,6 +4,8 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) INCLUDE_DIRECTORIES(inc) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) @@ -22,7 +24,7 @@ ELSEIF (TD_WINDOWS_64) ENDIF () ADD_LIBRARY(trpc ${SRC}) -TARGET_LINK_LIBRARIES(trpc tutil) +TARGET_LINK_LIBRARIES(trpc tutil lz4) ADD_SUBDIRECTORY(test) diff --git a/src/thirdparty/CMakeLists.txt b/src/thirdparty/CMakeLists.txt deleted file mode 100644 index 9958e1e28c..0000000000 --- a/src/thirdparty/CMakeLists.txt +++ /dev/null @@ -1,4 +0,0 @@ -aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST) - -add_library(thirdparty ${SOURCE_LIST}) -target_include_directories(thirdparty PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/inc) \ No newline at end of file diff --git a/src/vnode/tsdb/CMakeLists.txt b/src/tsdb/CMakeLists.txt similarity index 100% rename from src/vnode/tsdb/CMakeLists.txt rename to src/tsdb/CMakeLists.txt diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/tsdb/inc/tsdb.h similarity index 100% rename from src/vnode/tsdb/inc/tsdb.h rename to src/tsdb/inc/tsdb.h diff --git a/src/vnode/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h similarity index 100% rename from src/vnode/tsdb/inc/tsdbMain.h rename to src/tsdb/inc/tsdbMain.h diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/tsdb/src/tsdbCache.c similarity index 100% rename from src/vnode/tsdb/src/tsdbCache.c rename to src/tsdb/src/tsdbCache.c diff --git a/src/vnode/tsdb/src/tsdbCompactor.c b/src/tsdb/src/tsdbCompactor.c similarity index 100% rename from src/vnode/tsdb/src/tsdbCompactor.c rename to src/tsdb/src/tsdbCompactor.c diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c similarity index 100% rename from src/vnode/tsdb/src/tsdbFile.c rename to src/tsdb/src/tsdbFile.c diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c similarity index 100% rename from src/vnode/tsdb/src/tsdbMain.c rename to src/tsdb/src/tsdbMain.c diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c similarity index 100% rename from src/vnode/tsdb/src/tsdbMeta.c rename to src/tsdb/src/tsdbMeta.c diff --git a/src/vnode/tsdb/src/tsdbMetaFile.c b/src/tsdb/src/tsdbMetaFile.c similarity index 100% rename from src/vnode/tsdb/src/tsdbMetaFile.c rename to src/tsdb/src/tsdbMetaFile.c diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c similarity index 100% rename from src/vnode/tsdb/src/tsdbRead.c rename to src/tsdb/src/tsdbRead.c diff --git a/src/vnode/tsdb/tests/CMakeLists.txt b/src/tsdb/tests/CMakeLists.txt similarity index 100% rename from src/vnode/tsdb/tests/CMakeLists.txt rename to src/tsdb/tests/CMakeLists.txt diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/tsdb/tests/tsdbTests.cpp similarity index 100% rename from src/vnode/tsdb/tests/tsdbTests.cpp rename to src/tsdb/tests/tsdbTests.cpp diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 6c8f94ec39..a814bf52ab 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -6,9 +6,11 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(tutil ${SRC}) - TARGET_LINK_LIBRARIES(tutil thirdparty pthread os m rt) + TARGET_LINK_LIBRARIES(tutil pthread os m rt) FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/) IF (ICONV_INCLUDE_EXIST) ADD_DEFINITIONS(-DUSE_LIBICONV) @@ -65,7 +67,7 @@ ELSEIF (TD_WINDOWS_64) LIST(APPEND SRC ./src/tutil.c) LIST(APPEND SRC ./src/version.c) ADD_LIBRARY(tutil ${SRC}) - TARGET_LINK_LIBRARIES(tutil thirdparty iconv regex pthread os winmm IPHLPAPI ws2_32) + TARGET_LINK_LIBRARIES(tutil iconv regex pthread os winmm IPHLPAPI ws2_32) ELSEIF(TD_DARWIN_64) ADD_DEFINITIONS(-DUSE_LIBICONV) LIST(APPEND SRC ./src/hash.c) @@ -102,7 +104,7 @@ ELSEIF(TD_DARWIN_64) LIST(APPEND SRC ./src/version.c) LIST(APPEND SRC ./src/hash.c) ADD_LIBRARY(tutil ${SRC}) - TARGET_LINK_LIBRARIES(tutil thirdparty iconv pthread os) + TARGET_LINK_LIBRARIES(tutil iconv pthread os) ENDIF() #IF (TD_CLUSTER) diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 3df980ece1..51065b8645 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -1,6 +1,18 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(TDengine) -ADD_SUBDIRECTORY(wal) -ADD_SUBDIRECTORY(tsdb) -ADD_SUBDIRECTORY(main) +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) + INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) + INCLUDE_DIRECTORIES(inc) + AUX_SOURCE_DIRECTORY(src SRC) + + ADD_LIBRARY(vnode ${SRC}) + TARGET_LINK_LIBRARIES(vnode tsdb) +ENDIF () \ No newline at end of file diff --git a/src/vnode/cache/inc/cache.h b/src/vnode/cache/inc/cache.h deleted file mode 100644 index 52f357ee8f..0000000000 --- a/src/vnode/cache/inc/cache.h +++ /dev/null @@ -1,17 +0,0 @@ -#if !defined(_TD_CACHE_H_) -#define _TD_CACHE_H_ - -#define TD_MIN_CACHE_BLOCK_SIZE 1024*1024 /* 1M */ -#define TD_MAX_CACHE_BLOCK_SIZE 64*1024*1024 /* 64M */ - -typedef void cache_pool_t; - -typedef struct SCacheBlock -{ - int32_t blockId; - char data[]; -} SCacheBlock; - - - -#endif // _TD_CACHE_H_ diff --git a/src/vnode/main/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h similarity index 100% rename from src/vnode/main/inc/vnodeInt.h rename to src/vnode/inc/vnodeInt.h diff --git a/src/vnode/main/CMakeLists.txt b/src/vnode/main/CMakeLists.txt deleted file mode 100644 index 19b1b4c22a..0000000000 --- a/src/vnode/main/CMakeLists.txt +++ /dev/null @@ -1,18 +0,0 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8) -PROJECT(TDengine) - -IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) - INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/tsdb/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) - INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) - INCLUDE_DIRECTORIES(inc) - AUX_SOURCE_DIRECTORY(src SRC) - - ADD_LIBRARY(vnode ${SRC}) - TARGET_LINK_LIBRARIES(vnode tsdb) -ENDIF () \ No newline at end of file diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/src/vnodeMain.c similarity index 100% rename from src/vnode/main/src/vnodeMain.c rename to src/vnode/src/vnodeMain.c diff --git a/src/vnode/main/src/vnodeRead.c b/src/vnode/src/vnodeRead.c similarity index 100% rename from src/vnode/main/src/vnodeRead.c rename to src/vnode/src/vnodeRead.c diff --git a/src/vnode/main/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c similarity index 100% rename from src/vnode/main/src/vnodeWrite.c rename to src/vnode/src/vnodeWrite.c diff --git a/src/vnode/wal/CMakeLists.txt b/src/wal/CMakeLists.txt similarity index 100% rename from src/vnode/wal/CMakeLists.txt rename to src/wal/CMakeLists.txt diff --git a/src/vnode/wal/src/walMain.c b/src/wal/src/walMain.c similarity index 100% rename from src/vnode/wal/src/walMain.c rename to src/wal/src/walMain.c diff --git a/src/vnode/wal/test/CMakeLists.txt b/src/wal/test/CMakeLists.txt similarity index 100% rename from src/vnode/wal/test/CMakeLists.txt rename to src/wal/test/CMakeLists.txt diff --git a/src/vnode/wal/test/waltest.c b/src/wal/test/waltest.c similarity index 100% rename from src/vnode/wal/test/waltest.c rename to src/wal/test/waltest.c diff --git a/tests/tsim/CMakeLists.txt b/tests/tsim/CMakeLists.txt index 2fec751cec..2eaf3fe7cf 100644 --- a/tests/tsim/CMakeLists.txt +++ b/tests/tsim/CMakeLists.txt @@ -3,6 +3,7 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(inc) @@ -12,4 +13,4 @@ ENDIF () AUX_SOURCE_DIRECTORY(src SRC) ADD_EXECUTABLE(tsim ${SRC}) -TARGET_LINK_LIBRARIES(tsim taos_static trpc tutil pthread ) +TARGET_LINK_LIBRARIES(tsim taos_static trpc tutil pthread cJson) From 85420cf37d61c1f93ec738a3364bf8427d141b81 Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 15 Apr 2020 23:08:06 +0800 Subject: [PATCH 08/12] add test program --- tests/test/c/CMakeLists.txt | 7 +- tests/test/c/insertPerRow.c | 314 ++++++++++++++++++ .../{benchmarkPerTable.c => insertPerTable.c} | 0 3 files changed, 319 insertions(+), 2 deletions(-) create mode 100644 tests/test/c/insertPerRow.c rename tests/test/c/{benchmarkPerTable.c => insertPerTable.c} (100%) diff --git a/tests/test/c/CMakeLists.txt b/tests/test/c/CMakeLists.txt index 532f304b99..9f34d6b774 100644 --- a/tests/test/c/CMakeLists.txt +++ b/tests/test/c/CMakeLists.txt @@ -6,6 +6,9 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) - add_executable(benchmarkPerTable benchmarkPerTable.c) - target_link_libraries(benchmarkPerTable taos_static pthread) + add_executable(insertPerTable insertPerTable.c) + target_link_libraries(insertPerTable taos_static pthread) + + add_executable(insertPerRow insertPerRow.c) + target_link_libraries(insertPerRow taos_static pthread) ENDIF() diff --git a/tests/test/c/insertPerRow.c b/tests/test/c/insertPerRow.c new file mode 100644 index 0000000000..eea69e6462 --- /dev/null +++ b/tests/test/c/insertPerRow.c @@ -0,0 +1,314 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taos.h" +#include "tlog.h" +#include "ttimer.h" +#include "tutil.h" + +#define MAX_RANDOM_POINTS 20000 +#define GREEN "\033[1;32m" +#define NC "\033[0m" + +typedef struct { + int64_t rowsPerTable; + int64_t pointsPerTable; + int64_t tableBeginIndex; + int64_t tableEndIndex; + int threadIndex; + char dbName[32]; + char stableName[64]; + pthread_t thread; +} SInfo; + +void *syncTest(void *param); +void generateRandomPoints(); +void shellParseArgument(int argc, char *argv[]); +void createDbAndTable(); +void insertData(); + +int32_t randomData[MAX_RANDOM_POINTS]; +int64_t rowsPerTable = 10000; +int64_t pointsPerTable = 1; +int64_t numOfThreads = 1; +int64_t numOfTablesPerThread = 1; +char dbName[32] = "db"; +char stableName[64] = "st"; +int32_t cache = 16384; +int32_t tables = 1000; + +int main(int argc, char *argv[]) { + shellParseArgument(argc, argv); + generateRandomPoints(); + taos_init(); + createDbAndTable(); + insertData(); +} + +void createDbAndTable() { + dPrint("start to create table"); + + TAOS * con; + struct timeval systemTime; + int64_t st, et; + char qstr[64000]; + + con = taos_connect(tsMasterIp, tsDefaultUser, tsDefaultPass, NULL, 0); + if (con == NULL) { + dError("failed to connect to DB, reason:%s", taos_errstr(con)); + exit(1); + } + + sprintf(qstr, "create database if not exists %s cache %d tables %d", dbName, cache, tables); + if (taos_query(con, qstr)) { + dError("failed to create database:%s, code:%d reason:%s", dbName, taos_errno(con), taos_errstr(con)); + exit(0); + } + + sprintf(qstr, "use %s", dbName); + if (taos_query(con, qstr)) { + dError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); + exit(0); + } + + gettimeofday(&systemTime, NULL); + st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + int64_t totalTables = numOfTablesPerThread * numOfThreads; + + if (strcmp(stableName, "no") != 0) { + int len = sprintf(qstr, "create table if not exists %s(ts timestamp", stableName); + for (int64_t f = 0; f < pointsPerTable; ++f) { + len += sprintf(qstr + len, ", f%ld double", f); + } + sprintf(qstr + len, ") tags(t int)"); + + if (taos_query(con, qstr)) { + dError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con)); + exit(0); + } + + for (int64_t t = 0; t < totalTables; ++t) { + sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t); + if (taos_query(con, qstr)) { + dError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con)); + exit(0); + } + } + } else { + for (int64_t t = 0; t < totalTables; ++t) { + int len = sprintf(qstr, "create table if not exists %s%ld(ts timestamp", stableName, t); + for (int64_t f = 0; f < pointsPerTable; ++f) { + len += sprintf(qstr + len, ", f%ld double", f); + } + sprintf(qstr + len, ")"); + + if (taos_query(con, qstr)) { + dError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con)); + exit(0); + } + } + } + + gettimeofday(&systemTime, NULL); + et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + dPrint("%.1f seconds to create %ld tables", (et - st) / 1000.0 / 1000.0, totalTables); +} + +void insertData() { + struct timeval systemTime; + int64_t st, et; + + gettimeofday(&systemTime, NULL); + st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + + dPrint("%d threads are spawned to insert data", numOfThreads); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + SInfo *pInfo = (SInfo *)malloc(sizeof(SInfo) * numOfThreads); + + // Start threads to write + for (int i = 0; i < numOfThreads; ++i) { + pInfo[i].rowsPerTable = rowsPerTable; + pInfo[i].pointsPerTable = pointsPerTable; + pInfo[i].tableBeginIndex = i * numOfTablesPerThread; + pInfo[i].tableEndIndex = (i + 1) * numOfTablesPerThread; + pInfo[i].threadIndex = i; + strcpy(pInfo[i].dbName, dbName); + strcpy(pInfo[i].stableName, stableName); + pthread_create(&(pInfo[i].thread), &thattr, syncTest, (void *)(pInfo + i)); + } + + taosMsleep(300); + for (int i = 0; i < numOfThreads; i++) { + pthread_join(pInfo[i].thread, NULL); + } + + gettimeofday(&systemTime, NULL); + et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + double seconds = (et - st) / 1000.0 / 1000.0; + + int64_t totalTables = numOfTablesPerThread * numOfThreads; + int64_t totalRows = totalTables * rowsPerTable; + int64_t totalPoints = totalTables * rowsPerTable * pointsPerTable; + double speedOfRows = totalRows / seconds; + double speedOfPoints = totalPoints / seconds; + + dPrint( + "%sall threads:%ld finished, use %.1lf seconds, tables:%.ld rows:%ld points:%ld, speed RowsPerSecond:%.1lf " + "PointsPerSecond:%.1lf%s", + GREEN, numOfThreads, seconds, totalTables, totalRows, totalPoints, speedOfRows, speedOfPoints, NC); + + dPrint("threads exit"); + + pthread_attr_destroy(&thattr); + free(pInfo); +} + +void *syncTest(void *param) { + TAOS * con; + SInfo * pInfo = (SInfo *)param; + struct timeval systemTime; + int64_t st, et; + char qstr[65000]; + int maxBytes = 60000; + + dPrint("thread:%d, start to run", pInfo->threadIndex); + + con = taos_connect(tsMasterIp, tsDefaultUser, tsDefaultPass, NULL, 0); + if (con == NULL) { + dError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con)); + exit(1); + } + + sprintf(qstr, "use %s", pInfo->dbName); + taos_query(con, qstr); + + gettimeofday(&systemTime, NULL); + st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + + int64_t start = 1430000000000; + int64_t interval = 1000; // 1000 ms + + char *sql = qstr; + char inserStr[] = "insert into"; + int len = sprintf(sql, "%s", inserStr); + + for (int64_t row = 0; row < pInfo->rowsPerTable; row++) { + for (int64_t table = pInfo->tableBeginIndex; table < pInfo->tableEndIndex; ++table) { + len += sprintf(sql + len, " %s%ld values", pInfo->stableName, table); + len += sprintf(sql + len, "(%ld", start + row * interval); + for (int64_t point = 0; point < pInfo->pointsPerTable; ++point) { + len += sprintf(sql + len, ",%d", randomData[(123 * table + 456 * row + 789 * point) % MAX_RANDOM_POINTS]); + // len += sprintf(sql + len, ",%ld", row); + } + len += sprintf(sql + len, ")"); + if (len > maxBytes) { + // if (taos_query(con, qstr)) { + // dError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName, + // table, row, taos_errstr(con)); + // } + dPrint("%s ", qstr); + // "insert into" + len = sprintf(sql, "%s", inserStr); + } + } + } + + if (len != strlen(inserStr)) { + taos_query(con, qstr); + } + + gettimeofday(&systemTime, NULL); + et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + int64_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex; + int64_t totalRows = totalTables * pInfo->rowsPerTable; + int64_t totalPoints = totalRows * pInfo->pointsPerTable; + dPrint("thread:%d, insert finished, use %.2f seconds, tables:%ld rows:%ld points:%ld", pInfo->threadIndex, + (et - st) / 1000.0 / 1000.0, totalTables, totalRows, totalPoints); + + return NULL; +} + +void generateRandomPoints() { + for (int r = 0; r < MAX_RANDOM_POINTS; ++r) { + randomData[r] = rand() % 1000; + } +} + +void printHelp() { + char indent[10] = " "; + printf("Used to test the performance of TDengine, the insert method is table-by-table\n"); + + printf("%s%s\n", indent, "-d"); + printf("%s%s%s%s\n", indent, indent, "The name of the database to be created, default is ", dbName); + printf("%s%s\n", indent, "-s"); + printf("%s%s%s%s%s\n", indent, indent, "The name of the super table to be created, default is ", stableName, ", if 'no' then create normal table"); + printf("%s%s\n", indent, "-c"); + printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); + printf("%s%s\n", indent, "-r"); + printf("%s%s%s%ld\n", indent, indent, "Number of records to write to each table, default is ", rowsPerTable); + printf("%s%s\n", indent, "-p"); + printf("%s%s%s%" PRId64 "\n", indent, indent, "Number of columns per table, default is ", pointsPerTable); + printf("%s%s\n", indent, "-t"); + printf("%s%s%s%" PRId64 "\n", indent, indent, "Number of threads to be used, default is ", numOfThreads); + printf("%s%s\n", indent, "-n"); + printf("%s%s%s%" PRId64 "\n", indent, indent, "Number of tables per thread, default is ", numOfTablesPerThread); + printf("%s%s\n", indent, "-tables"); + printf("%s%s%s%d\n", indent, indent, "Database parameters tables, default is ", tables); + printf("%s%s\n", indent, "-cache"); + printf("%s%s%s%d\n", indent, indent, "Database parameters cache, default is ", cache); + + exit(EXIT_SUCCESS); +} + +void shellParseArgument(int argc, char *argv[]) { + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { + printHelp(); + exit(0); + } else if (strcmp(argv[i], "-d") == 0) { + strcpy(dbName, argv[++i]); + } else if (strcmp(argv[i], "-s") == 0) { + strcpy(stableName, argv[++i]); + } else if (strcmp(argv[i], "-r") == 0) { + rowsPerTable = atoi(argv[++i]); + } else if (strcmp(argv[i], "-p") == 0) { + pointsPerTable = atoi(argv[++i]); + } else if (strcmp(argv[i], "-t") == 0) { + numOfThreads = atoi(argv[++i]); + } else if (strcmp(argv[i], "-n") == 0) { + numOfTablesPerThread = atoi(argv[++i]); + } else if (strcmp(argv[i], "-tables") == 0) { + tables = atoi(argv[++i]); + } else if (strcmp(argv[i], "-cache") == 0) { + cache = atoi(argv[++i]); + } else { + } + } + + dPrint("%srowsPerTable:%" PRId64 "%s", GREEN, rowsPerTable, NC); + dPrint("%spointsPerTable:%" PRId64 "%s", GREEN, pointsPerTable, NC); + dPrint("%snumOfThreads:%" PRId64 "%s", GREEN, numOfThreads, NC); + dPrint("%snumOfTablesPerThread:%" PRId64 "%s", GREEN, numOfTablesPerThread, NC); + dPrint("%scache:%" PRId64 "%s", GREEN, cache, NC); + dPrint("%stables:%" PRId64 "%s", GREEN, tables, NC); + dPrint("%sdbName:%s%s", GREEN, dbName, NC); + dPrint("%stableName:%s%s", GREEN, stableName, NC); + dPrint("%sstart to run%s", GREEN, NC); +} diff --git a/tests/test/c/benchmarkPerTable.c b/tests/test/c/insertPerTable.c similarity index 100% rename from tests/test/c/benchmarkPerTable.c rename to tests/test/c/insertPerTable.c From 25444ef7bc20aa43a62af64b1af23ad810badd25 Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 15 Apr 2020 23:12:27 +0800 Subject: [PATCH 09/12] add test program --- tests/test/c/insertPerRow.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test/c/insertPerRow.c b/tests/test/c/insertPerRow.c index eea69e6462..3c9564bcd2 100644 --- a/tests/test/c/insertPerRow.c +++ b/tests/test/c/insertPerRow.c @@ -45,7 +45,7 @@ int32_t randomData[MAX_RANDOM_POINTS]; int64_t rowsPerTable = 10000; int64_t pointsPerTable = 1; int64_t numOfThreads = 1; -int64_t numOfTablesPerThread = 1; +int64_t numOfTablesPerThread = 200; char dbName[32] = "db"; char stableName[64] = "st"; int32_t cache = 16384; @@ -219,11 +219,11 @@ void *syncTest(void *param) { } len += sprintf(sql + len, ")"); if (len > maxBytes) { - // if (taos_query(con, qstr)) { - // dError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName, - // table, row, taos_errstr(con)); - // } - dPrint("%s ", qstr); + if (taos_query(con, qstr)) { + dError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName, + table, row, taos_errstr(con)); + } + // "insert into" len = sprintf(sql, "%s", inserStr); } From 6b94643111e14f08dac759d785fef193bea8ff73 Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 15 Apr 2020 23:18:14 +0800 Subject: [PATCH 10/12] add test program --- tests/test/c/insertPerRow.c | 4 +++- tests/test/c/insertPerTable.c | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/test/c/insertPerRow.c b/tests/test/c/insertPerRow.c index 3c9564bcd2..74ab238c3f 100644 --- a/tests/test/c/insertPerRow.c +++ b/tests/test/c/insertPerRow.c @@ -253,7 +253,7 @@ void generateRandomPoints() { void printHelp() { char indent[10] = " "; - printf("Used to test the performance of TDengine, the insert method is table-by-table\n"); + printf("Used to test the performance of TDengine\n After writing one row of data to all tables, write the next row\n"); printf("%s%s\n", indent, "-d"); printf("%s%s%s%s\n", indent, indent, "The name of the database to be created, default is ", dbName); @@ -284,6 +284,8 @@ void shellParseArgument(int argc, char *argv[]) { exit(0); } else if (strcmp(argv[i], "-d") == 0) { strcpy(dbName, argv[++i]); + } else if (strcmp(argv[i], "-c") == 0) { + strcpy(configDir, argv[++i]); } else if (strcmp(argv[i], "-s") == 0) { strcpy(stableName, argv[++i]); } else if (strcmp(argv[i], "-r") == 0) { diff --git a/tests/test/c/insertPerTable.c b/tests/test/c/insertPerTable.c index ee4e0fecbd..ca63407463 100644 --- a/tests/test/c/insertPerTable.c +++ b/tests/test/c/insertPerTable.c @@ -258,7 +258,7 @@ void generateRandomPoints() { void printHelp() { char indent[10] = " "; - printf("Used to test the performance of TDengine, the insert method is table-by-table\n"); + printf("Used to test the performance of TDengine\n After writing all the data in one table, start the next table\n"); printf("%s%s\n", indent, "-d"); printf("%s%s%s%s\n", indent, indent, "The name of the database to be created, default is ", dbName); @@ -289,6 +289,8 @@ void shellParseArgument(int argc, char *argv[]) { exit(0); } else if (strcmp(argv[i], "-d") == 0) { strcpy(dbName, argv[++i]); + } else if (strcmp(argv[i], "-c") == 0) { + strcpy(configDir, argv[++i]); } else if (strcmp(argv[i], "-s") == 0) { strcpy(stableName, argv[++i]); } else if (strcmp(argv[i], "-r") == 0) { From 0765c20c3b55733913e6ff9eec05926a8ba9adec Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 15 Apr 2020 23:31:50 +0800 Subject: [PATCH 11/12] add import test --- tests/test/c/CMakeLists.txt | 3 + tests/test/c/importOneRow.c | 119 ++++++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+) create mode 100644 tests/test/c/importOneRow.c diff --git a/tests/test/c/CMakeLists.txt b/tests/test/c/CMakeLists.txt index 9f34d6b774..8189dd4d09 100644 --- a/tests/test/c/CMakeLists.txt +++ b/tests/test/c/CMakeLists.txt @@ -11,4 +11,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) add_executable(insertPerRow insertPerRow.c) target_link_libraries(insertPerRow taos_static pthread) + + add_executable(importOneRow importOneRow.c) + target_link_libraries(importOneRow taos_static pthread) ENDIF() diff --git a/tests/test/c/importOneRow.c b/tests/test/c/importOneRow.c new file mode 100644 index 0000000000..ee548a4df1 --- /dev/null +++ b/tests/test/c/importOneRow.c @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taos.h" +#include "tlog.h" +#include "ttimer.h" +#include "tutil.h" + +void taos_error(TAOS *taos); +void* taos_execute(void *param); + +typedef struct { + pthread_t pid; + int index; +} ThreadObj; + +int threadNum = 1; +int rowNum = 1000; +int replica = 1; + +int main(int argc, char *argv[]) { + if (argc == 1) { + printf("usage: %s rowNum threadNum replica configDir\n", argv[0]); + printf("default rowNum %d\n", rowNum); + printf("default threadNum %d\n", threadNum); + printf("default replica %d\n", replica); + exit(0); + } + + // a simple way to parse input parameters + if (argc >= 2) rowNum = atoi(argv[1]); + if (argc >= 3) threadNum = atoi(argv[2]); + if (argc >= 4) replica = atoi(argv[3]); + if (argc >= 5) strcpy(configDir, argv[4]); + + printf("rowNum:%d threadNum:%d replica:%d\n", threadNum, rowNum, replica); + + taos_init(); + + ThreadObj *threads = calloc(threadNum, sizeof(ThreadObj)); + for (int i = 0; i < threadNum; ++i) { + ThreadObj * pthread = threads + i; + pthread_attr_t thattr; + pthread->index = i; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + pthread_create(&pthread->pid, &thattr, taos_execute, pthread); + } + + for (int i = 0; i < threadNum; i++) { + pthread_join(threads[i].pid, NULL); + } + + printf("all finished\n"); + + return 0; +} + +void taos_error(TAOS *con) { + fprintf(stderr, "TDengine error: %s\n", taos_errstr(con)); + taos_close(con); + exit(1); +} + +void* taos_execute(void *param) { + ThreadObj *pThread = (ThreadObj *)param; + + void *taos = taos_connect(tsMasterIp, tsDefaultUser, tsDefaultPass, NULL, 0); + if (taos == NULL) taos_error(taos); + + char sql[1024] = {0}; + sprintf(sql, "create database if not exists db replica %d", replica); + taos_query(taos, sql); + + sprintf(sql, "create table if not exists db.t%d (ts timestamp, i int, j float, k double)", pThread->index); + taos_query(taos, sql); + + int64_t timestamp = 1530374400000L; + + sprintf(sql, "insert into db.t%d values(%ld, %d, %d, %d)", pThread->index, timestamp, 0, 0, 0); + int code = taos_query(taos, sql); + if (code != 0) printf("error code:%d, sql:%s\n", code, sql); + int affectrows = taos_affected_rows(taos); + if (affectrows != 1) printf("affect rows:%d, sql:%s\n", affectrows, sql); + + timestamp -= 1000; + + int total_affect_rows = affectrows; + + for (int i = 1; i < rowNum; ++i) { + sprintf(sql, "import into db.t%d values(%ld, %d, %d, %d)", pThread->index, timestamp, i, i, i); + code = taos_query(taos, sql); + if (code != 0) printf("error code:%d, sql:%s\n", code, sql); + int affectrows = taos_affected_rows(taos); + if (affectrows != 1) printf("affect rows:%d, sql:%s\n", affectrows, sql); + + total_affect_rows += affectrows; + + timestamp -= 1000; + } + + printf("thread:%d run finished total_affect_rows:%d\n", pThread->index, total_affect_rows); + + return NULL; +} From 13b44b30d5942ca904a97b57bd624a6a393fd54c Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 15 Apr 2020 23:39:48 +0800 Subject: [PATCH 12/12] add import test --- tests/test/c/importOneRow.c | 50 +++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/tests/test/c/importOneRow.c b/tests/test/c/importOneRow.c index ee548a4df1..9c0b39f7c8 100644 --- a/tests/test/c/importOneRow.c +++ b/tests/test/c/importOneRow.c @@ -20,6 +20,10 @@ #include "ttimer.h" #include "tutil.h" +#define MAX_RANDOM_POINTS 20000 +#define GREEN "\033[1;32m" +#define NC "\033[0m" + void taos_error(TAOS *taos); void* taos_execute(void *param); @@ -32,22 +36,42 @@ int threadNum = 1; int rowNum = 1000; int replica = 1; -int main(int argc, char *argv[]) { - if (argc == 1) { - printf("usage: %s rowNum threadNum replica configDir\n", argv[0]); - printf("default rowNum %d\n", rowNum); - printf("default threadNum %d\n", threadNum); - printf("default replica %d\n", replica); - exit(0); +void printHelp() { + char indent[10] = " "; + printf("Used to test the performance of TDengine\n After writing one row of data to all tables, write the next row\n"); + + printf("%s%s\n", indent, "-r"); + printf("%s%s%s%d\n", indent, indent, "Number of records to write table, default is ", rowNum); + printf("%s%s\n", indent, "-t"); + printf("%s%s%s%d\n", indent, indent, "Number of threads to be used, default is ", threadNum); + printf("%s%s\n", indent, "-replica"); + printf("%s%s%s%d\n", indent, indent, "Database parameters replica, default is ", replica); + + exit(EXIT_SUCCESS); +} + +void shellParseArgument(int argc, char *argv[]) { + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { + printHelp(); + exit(0); + } else if (strcmp(argv[i], "-r") == 0) { + rowNum = atoi(argv[++i]); + } else if (strcmp(argv[i], "-t") == 0) { + threadNum = atoi(argv[++i]); + } else if (strcmp(argv[i], "-replica") == 0) { + replica = atoi(argv[++i]); + } else { + } } - // a simple way to parse input parameters - if (argc >= 2) rowNum = atoi(argv[1]); - if (argc >= 3) threadNum = atoi(argv[2]); - if (argc >= 4) replica = atoi(argv[3]); - if (argc >= 5) strcpy(configDir, argv[4]); + dPrint("%s rowNum:%d %s", GREEN, rowNum, NC); + dPrint("%s threadNum:%d %s", GREEN, threadNum, NC); + dPrint("%s replica:%d %s", GREEN, replica, NC); +} - printf("rowNum:%d threadNum:%d replica:%d\n", threadNum, rowNum, replica); +int main(int argc, char *argv[]) { + shellParseArgument(argc, argv); taos_init();