From dceb46795d271a3a37b228c52953bb69a5d74f69 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 17 Dec 2021 16:47:30 +0800 Subject: [PATCH 1/6] more --- include/dnode/vnode/tsdb/tsdb.h | 20 ++++++++++---------- include/util/mallocator.h | 3 +++ source/dnode/vnode/impl/src/vnodeCommit.c | 8 ++++++++ source/dnode/vnode/tq/src/tq.c | 2 +- source/dnode/vnode/tsdb/src/tsdbCommit.c | 12 ++++++++++++ 5 files changed, 34 insertions(+), 11 deletions(-) diff --git a/include/dnode/vnode/tsdb/tsdb.h b/include/dnode/vnode/tsdb/tsdb.h index b85c6b64f6..e5522ddbd3 100644 --- a/include/dnode/vnode/tsdb/tsdb.h +++ b/include/dnode/vnode/tsdb/tsdb.h @@ -23,27 +23,27 @@ extern "C" { #endif // TYPES EXPOSED -typedef struct STsdb STsdb; -typedef struct STsdbCfg STsdbCfg; +typedef struct STsdb STsdb; + +typedef struct STsdbCfg { + uint64_t lruCacheSize; + uint32_t keep0; + uint32_t keep1; + uint32_t keep2; +} STsdbCfg; // STsdb STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF); void tsdbClose(STsdb *); void tsdbRemove(const char *path); int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg); +int tsdbPrepareCommit(STsdb *pTsdb); +int tsdbCommit(STsdb *pTsdb); // STsdbCfg int tsdbOptionsInit(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *); -/* ------------------------ STRUCT DEFINITIONS ------------------------ */ -struct STsdbCfg { - uint64_t lruCacheSize; - uint32_t keep0; - uint32_t keep1; - uint32_t keep2; -}; - #ifdef __cplusplus } #endif diff --git a/include/util/mallocator.h b/include/util/mallocator.h index 49a9327353..5ecdc316a4 100644 --- a/include/util/mallocator.h +++ b/include/util/mallocator.h @@ -39,6 +39,9 @@ typedef struct SMemAllocator { TD_MEM_ALCT(SMemAllocator); } SMemAllocator; +#define tMalloc(pMA, SIZE) TD_MA_MALLOC(PMA, SIZE) +#define tFree(pMA, PTR) TD_MA_FREE(PMA, PTR) + typedef struct SMemAllocatorFactory { void *impl; SMemAllocator *(*create)(struct SMemAllocatorFactory *); diff --git a/source/dnode/vnode/impl/src/vnodeCommit.c b/source/dnode/vnode/impl/src/vnodeCommit.c index a728de0ebb..7213e31cb4 100644 --- a/source/dnode/vnode/impl/src/vnodeCommit.c +++ b/source/dnode/vnode/impl/src/vnodeCommit.c @@ -25,6 +25,10 @@ int vnodeAsyncCommit(SVnode *pVnode) { pTask->execute = vnodeCommit; // TODO pTask->arg = pVnode; // TODO + tsdbPrepareCommit(pVnode->pTsdb); + // metaPrepareCommit(pVnode->pMeta); + // walPreapareCommit(pVnode->pWal); + vnodeScheduleTask(pTask); return 0; } @@ -32,6 +36,10 @@ int vnodeAsyncCommit(SVnode *pVnode) { int vnodeCommit(void *arg) { SVnode *pVnode = (SVnode *)arg; + metaCommit(pVnode->pMeta); + tqCommit(pVnode->pTq); + tsdbCommit(pVnode->pTq); + vnodeBufPoolRecycle(pVnode); // TODO return 0; diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index b88ef353b0..1a27870a1b 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -50,7 +50,7 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA pTq->tqConfig = tqConfig; pTq->tqLogReader = tqLogReader; pTq->tqMemRef.pAlloctorFactory = allocFac; - pTq->tqMemRef.pAllocator = allocFac->create(allocFac); + // pTq->tqMemRef.pAllocator = allocFac->create(allocFac); if (pTq->tqMemRef.pAllocator == NULL) { // TODO } diff --git a/source/dnode/vnode/tsdb/src/tsdbCommit.c b/source/dnode/vnode/tsdb/src/tsdbCommit.c index b124197736..a747c7333e 100644 --- a/source/dnode/vnode/tsdb/src/tsdbCommit.c +++ b/source/dnode/vnode/tsdb/src/tsdbCommit.c @@ -15,7 +15,19 @@ #include "tsdbDef.h" +int tsdbPrepareCommit(STsdb *pTsdb) { + if (pTsdb->mem == NULL) return 0; + + // tsem_wait(&(pTsdb->canCommit)); + ASSERT(pTsdb->imem == NULL); + + pTsdb->imem = pTsdb->mem; + pTsdb->mem = NULL; +} + int tsdbCommit(STsdb *pTsdb) { // TODO + pTsdb->imem = NULL; + // tsem_post(&(pTsdb->canCommit)); return 0; } \ No newline at end of file From c51780622f04119eeb213883c625986b6130af7d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Dec 2021 18:16:07 +0800 Subject: [PATCH 2/6] adjust index interface --- cmake/cmake.options | 7 +++ include/libs/index/index.h | 20 ++++++-- source/libs/index/inc/indexInt.h | 19 ++++--- source/libs/index/inc/index_cache.h | 8 +-- source/libs/index/src/index.c | 77 ++++++++++++++++++----------- source/libs/index/src/index_cache.c | 32 ++++++------ 6 files changed, 100 insertions(+), 63 deletions(-) diff --git a/cmake/cmake.options b/cmake/cmake.options index edaab3bd45..e44a38b3f5 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -37,6 +37,7 @@ option( off ) + option( BUILD_WITH_NURAFT "If build with NuRaft" @@ -54,3 +55,9 @@ option( "If use doxygen build documents" OFF ) + +option( + USE_INVERTEDINDEX + "If use invertedIndex" + ON +) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 0885ce151e..2cc2aeb6a0 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -24,6 +24,7 @@ extern "C" { #endif typedef struct SIndex SIndex; +typedef struct SIndexTerm SIndexTerm; typedef struct SIndexOpts SIndexOpts; typedef struct SIndexMultiTermQuery SIndexMultiTermQuery; typedef struct SArray SIndexMultiTerm; @@ -35,7 +36,7 @@ typedef enum { ADD_INDEX, // add index on specify column DROP_INDEX, // drop existed index DROP_SATBLE // drop stable -} SIndexColumnType; +} SIndexOperOnColumn; typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType; typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX = 3} EIndexQueryType; @@ -45,7 +46,7 @@ typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX = */ SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType oper); void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery); -int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, const char *field, int32_t nFields, const char *value, int32_t nValue, EIndexQueryType type); +int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm *term, EIndexQueryType type); /* * @param: * @param: @@ -61,8 +62,8 @@ int indexRebuild(SIndex *index, SIndexOpts *opt); * @param */ SIndexMultiTerm *indexMultiTermCreate(); -int indexMultiTermAdd(SIndexMultiTerm *terms, const char *field, int32_t nFields, const char *value, int32_t nValue); -void indexMultiTermDestroy(SIndexMultiTerm *terms); +int indexMultiTermAdd(SIndexMultiTerm *terms, SIndexTerm *term); +void indexMultiTermDestroy(SIndexMultiTerm *terms); /* * @param: * @param: @@ -70,6 +71,17 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms); SIndexOpts *indexOptsCreate(); void indexOptsDestroy(SIndexOpts *opts); + +/* + * @param: + * @param: + */ + +SIndexTerm *indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t colType, + const char *colName, int32_t nColName, const char *colVal, int32_t nColVal); +void indexTermDestroy(SIndexTerm *p); + + #ifdef __cplusplus } #endif diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index cc740826e9..fb5a9e40b5 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -60,22 +60,21 @@ struct SIndexMultiTermQuery { // field and key; typedef struct SIndexTerm { - uint8_t type; // term data type, str/interger/json - char *key; - int32_t nKey; - char *val; - int32_t nVal; + int64_t suid; + SIndexOperOnColumn operType; // oper type, add/del/update + uint8_t colType; // term data type, str/interger/json + char *colName; + int32_t nColName; + char *colVal; + int32_t nColVal; } SIndexTerm; typedef struct SIndexTermQuery { - SIndexTerm* field_value; - EIndexQueryType type; + SIndexTerm* term; + EIndexQueryType qType; } SIndexTermQuery; -SIndexTerm *indexTermCreate(const char *key, int32_t nKey, const char *val, int32_t nVal); -void indexTermDestroy(SIndexTerm *p); - #define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); }} while(0) #define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); }} while(0) diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 39107a78ac..b952e16a8e 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -38,13 +38,13 @@ typedef struct IndexCache { // IndexCache *indexCacheCreate(); -void indexCacheDestroy(IndexCache *cache); +void indexCacheDestroy(void *cache); -int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, +int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, uint32_t version, uint64_t uid, int8_t operType); -int indexCacheGet(IndexCache *cache, uint64_t *rst); -int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result); +int indexCacheGet(void *cache, uint64_t *rst); +int indexCacheSearch(void *cache, SIndexMultiTermQuery *query, SArray *result); #ifdef __cplusplus } diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 6a2697491d..ca6c2062f1 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -46,7 +46,7 @@ SIndex *indexOpen(SIndexOpts *opts, const char *path) { index_t *index = index_open(path); sIdx->index = index; #endif - + sIdx->cache = (void*)indexCacheCreate(); sIdx->tindex = NULL; sIdx->fieldObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); @@ -61,9 +61,12 @@ void indexClose(SIndex *sIdx) { index_close(sIdex->index); sIdx->index = NULL; #endif + +#ifdef USE_INVERTEDINDEX indexCacheDestroy(sIdx->cache); taosHashCleanup(sIdx->fieldObj); pthread_mutex_destroy(&sIdx->mtx); +#endif free(sIdx); return; } @@ -86,6 +89,7 @@ int indexPut(SIndex *index, SArray* fVals, int uid) { index_document_destroy(doc); #endif +#ifdef USE_INVERTEDINDEX //TODO(yihao): reduce the lock range pthread_mutex_lock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { @@ -106,11 +110,16 @@ int indexPut(SIndex *index, SArray* fVals, int uid) { SIdxFieldInfo *fi = taosHashGet(index->fieldObj, p->key, p->nKey); assert(fi != NULL); int32_t fieldId = fi->fieldId; - int32_t colType = fi->type; + int32_t fieldType = fi->type; int32_t version = index->cVersion; - + int res = indexCachePut(index->cache, fieldId, fieldType, p->val, p->nVal, version, uid, p->operType); + if (ret != 0) { + return + } } pthread_mutex_unlock(&index->mtx); +#endif + return 1; } int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result) { @@ -148,16 +157,26 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result free(fields); free(keys); free(types); +#endif + +#ifdef USE_INVERTEDINDEX + #endif return 1; } int indexDelete(SIndex *index, SIndexMultiTermQuery *query) { +#ifdef USE_INVERTEDINDEX +#endif return 1; } -int indexRebuild(SIndex *index, SIndexOpts *opts); +int indexRebuild(SIndex *index, SIndexOpts *opts) { +#ifdef USE_INVERTEDINDEX +#endif + +} SIndexOpts *indexOptsCreate() { @@ -184,53 +203,55 @@ SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) { void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery) { for (int i = 0; i < taosArrayGetSize(pQuery->query); i++) { SIndexTermQuery *p = (SIndexTermQuery *)taosArrayGet(pQuery->query, i); - indexTermDestroy(p->field_value); + indexTermDestroy(p->term); } taosArrayDestroy(pQuery->query); free(pQuery); }; -int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, const char *field, int32_t nFields, const char *value, int32_t nValue, EIndexQueryType type){ - SIndexTerm *t = indexTermCreate(field, nFields, value, nValue); - if (t == NULL) {return -1;} - SIndexTermQuery q = {.type = type, .field_value = t}; +int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm *term, EIndexQueryType qType){ + SIndexTermQuery q = {.qType = qType, .term = term}; taosArrayPush(pQuery->query, &q); return 0; } -SIndexTerm *indexTermCreate(const char *key, int32_t nKey, const char *val, int32_t nVal) { - SIndexTerm *t = (SIndexTerm *)malloc(sizeof(SIndexTerm)); - t->key = (char *)calloc(nKey + 1, 1); - memcpy(t->key, key, nKey); - t->nKey = nKey; +SIndexTerm *indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char *colName, int32_t nColName, const char *colVal, int32_t nColVal) { + SIndexTerm *t = (SIndexTerm *)calloc(1, (sizeof(SIndexTerm))); + if (t == NULL) { return NULL; } - t->val = (char *)calloc(nVal + 1, 1); - memcpy(t->val, val, nVal); - t->nVal = nVal; + t->suid = suid; + t->operType= oper; + t->colType = colType; + + t->colName = (char *)calloc(1, nColName + 1); + memcpy(t->colName, colName, nColName); + t->nColName = nColName; + + t->colVal = (char *)calloc(1, nColVal + 1); + memcpy(t->colVal, colVal, nColVal); + t->nColVal = nColVal; return t; } void indexTermDestroy(SIndexTerm *p) { - free(p->key); - free(p->val); + free(p->colName); + free(p->colVal); free(p); } -SArray *indexMultiTermCreate() { +SIndexMultiTerm *indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm *)); } -int indexMultiTermAdd(SArray *array, const char *field, int32_t nField, const char *val, int32_t nVal) { - SIndexTerm *term = indexTermCreate(field, nField, val, nVal); - if (term == NULL) { return -1; } - taosArrayPush(array, &term); +int indexMultiTermAdd(SIndexMultiTerm *terms, SIndexTerm *term) { + taosArrayPush(terms, &term); return 0; } -void indexMultiTermDestroy(SArray *array) { - for (int32_t i = 0; i < taosArrayGetSize(array); i++) { - SIndexTerm *p = taosArrayGetP(array, i); +void indexMultiTermDestroy(SIndexMultiTerm *terms) { + for (int32_t i = 0; i < taosArrayGetSize(terms); i++) { + SIndexTerm *p = taosArrayGetP(terms, i); indexTermDestroy(p); } - taosArrayDestroy(array); + taosArrayDestroy(terms); } void indexInit() { diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index acb8e32157..23f7a08823 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -16,7 +16,7 @@ #include "index_cache.h" #include "tcompare.h" -#define MAX_INDEX_KEY_LEN 128 // test only, change later +#define MAX_INDEX_KEY_LEN 256// test only, change later static char* getIndexKey(const void *pData) { return NULL; @@ -96,16 +96,19 @@ IndexCache *indexCacheCreate() { } -void indexCacheDestroy(IndexCache *cache) { - if (cache == NULL) { return; } - tSkipListDestroy(cache->skiplist); - free(cache); +void indexCacheDestroy(void *cache) { + IndexCache *pCache = cache; + if (pCache == NULL) { return; } + tSkipListDestroy(pCache->skiplist); + free(pCache); } -int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, +int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, uint32_t version, uint64_t uid, int8_t operType) { if (cache == NULL) { return -1;} + IndexCache *pCache = cache; + // encode data int32_t total = sizeof(int32_t) + sizeof(fieldId) + sizeof(fieldType) + sizeof(fvLen) + fvLen + sizeof(version) + sizeof(uid) + sizeof(operType); @@ -135,20 +138,15 @@ int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const c memcpy(p, &operType, sizeof(operType)); p += sizeof(operType); - tSkipListPut(cache->skiplist, (void *)buf); + tSkipListPut(pCache->skiplist, (void *)buf); // encode end - } -int indexCacheDel(IndexCache *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { - +int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { + IndexCache *pCache = cache; + return 0; } -int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result) { - +int indexCacheSearch(void *cache, SIndexMultiTermQuery *query, SArray *result) { + return 0; } - - - - - From 8bba00f24763a4a514f2b392906040ef6ebd5752 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 18 Dec 2021 20:56:05 +0800 Subject: [PATCH 3/6] TD-10431 build create vnode msg --- include/common/taosmsg.h | 21 +-- include/util/taoserror.h | 23 +-- source/dnode/mnode/impl/inc/mndTrans.h | 4 +- source/dnode/mnode/impl/inc/mndVgroup.h | 4 +- source/dnode/mnode/impl/src/mndDb.c | 54 ++++++- source/dnode/mnode/impl/src/mndDnode.c | 8 +- source/dnode/mnode/impl/src/mndTrans.c | 182 +++++++++++++++++++----- source/dnode/mnode/impl/src/mndVgroup.c | 81 ++++++++++- source/util/src/terror.c | 1 + 9 files changed, 314 insertions(+), 64 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 5d12780a3b..98798a6235 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -750,31 +750,36 @@ typedef struct { } SReplica; typedef struct { - char db[TSDB_FULL_DB_NAME_LEN]; int32_t vgId; + int32_t dnodeId; + char db[TSDB_FULL_DB_NAME_LEN]; + uint64_t dbUid; int32_t cacheBlockSize; int32_t totalBlocks; int32_t daysPerFile; int32_t daysToKeep0; int32_t daysToKeep1; int32_t daysToKeep2; - int32_t minRowsPerFileBlock; - int32_t maxRowsPerFileBlock; + int32_t minRows; + int32_t maxRows; + int32_t commitTime; int32_t fsyncPeriod; - int8_t reserved[16]; + int8_t walLevel; int8_t precision; int8_t compression; - int8_t cacheLastRow; - int8_t update; - int8_t walLevel; int8_t quorum; + int8_t update; + int8_t cacheLastRow; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; } SCreateVnodeMsg, SAlterVnodeMsg; typedef struct { - int32_t vgId; + int32_t vgId; + int32_t dnodeId; + char db[TSDB_FULL_DB_NAME_LEN]; + uint64_t dbUid; } SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; typedef struct { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 74263667ea..ad4b383d03 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -120,17 +120,18 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input") // mnode-common -#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0300) -#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0301) -#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0302) -#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0303) -#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0304) -#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0305) -#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0306) -#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0307) -#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0308) -#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0309) -#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030A) +#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300) +#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0301) +#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302) +#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0303) +#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0304) +#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0305) +#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0306) +#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0307) +#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0308) +#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0309) +#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x030A) +#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030B) // mnode-show #define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310) diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 5da1d1ca2b..92e86efa9e 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -30,8 +30,8 @@ void mndTransDrop(STrans *pTrans); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); -int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *, void *pMsg); -int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *, void *pMsg); +int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code); char *mndTransStageStr(ETrnStage stage); diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 696f798c9a..e9cdedd332 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -28,7 +28,9 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); -SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); + +SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); +SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index fbbae13b63..6decc82743 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -285,10 +285,62 @@ static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg } static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { + for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { + SVgObj *pVgroup = pVgroups + v; + + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + SVnodeGid *pVgid = pVgroup->vnodeGid + vn; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) { + return -1; + } + + SEpSet epset = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); + if (pMsg == NULL) { + return -1; + } + + SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SCreateVnodeMsg)}; + if (mndTransAppendRedoAction(pTrans, &epset, &rpcMsg) != 0) { + rpcFreeCont(pMsg); + return -1; + } + } + } + return 0; } static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { + for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { + SVgObj *pVgroup = pVgroups + v; + + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + SVnodeGid *pVgid = pVgroup->vnodeGid + vn; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) { + return -1; + } + + SEpSet epset = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); + if (pMsg == NULL) { + return -1; + } + + SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SDropVnodeMsg)}; + if (mndTransAppendUndoAction(pTrans, &epset, &rpcMsg) != 0) { + rpcFreeCont(pMsg); + return -1; + } + } + } + return 0; } @@ -644,7 +696,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { return -1; } - int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo); + int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo); SUseDbRsp *pRsp = rpcMallocCont(contLen); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 55e8b3a721..493f20bc9a 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -180,8 +180,12 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj } SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) { - SSdb *pSdb = pMnode->pSdb; - return sdbAcquire(pSdb, SDB_DNODE, &dnodeId); + SSdb *pSdb = pMnode->pSdb; + SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId); + if (pDnode == NULL) { + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + } + return pDnode; } void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 32ac795301..8399b79921 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -21,6 +21,11 @@ #define TSDB_TRN_ARRAY_SIZE 8 #define TSDB_TRN_RESERVE_SIZE 64 +typedef struct { + SEpSet epSet; + SRpcMsg msg; +} STransAction; + static SSdbRaw *mndTransActionEncode(STrans *pTrans); static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); @@ -29,8 +34,10 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle); static void mndTransSendRpcRsp(STrans *pTrans, int32_t code); -static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw); -static void mndTransDropArray(SArray *pArray); +static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); +static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg); +static void mndTransDropLog(SArray *pArray); +static void mndTransDropAction(SArray *pArray); static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); @@ -58,7 +65,7 @@ int32_t mndInitTrans(SMnode *pMnode) { void mndCleanupTrans(SMnode *pMnode) {} static SSdbRaw *mndTransActionEncode(STrans *pTrans) { - int32_t rawDataLen = 16 * sizeof(int32_t) + TSDB_TRN_RESERVE_SIZE; + int32_t rawDataLen = sizeof(STrans) + TSDB_TRN_RESERVE_SIZE; int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); @@ -80,6 +87,16 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { rawDataLen += sdbGetRawTotalSize(pTmp); } + for (int32_t i = 0; i < redoActionNum; ++i) { + STransAction *pAction = taosArrayGet(pTrans->redoActions, i); + rawDataLen += (sizeof(STransAction) + pAction->msg.contLen); + } + + for (int32_t i = 0; i < undoActionNum; ++i) { + STransAction *pAction = taosArrayGet(pTrans->undoActions, i); + rawDataLen += (sizeof(STransAction) + pAction->msg.contLen); + } + SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER, rawDataLen); if (pRaw == NULL) { mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr()); @@ -116,6 +133,22 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) } + for (int32_t i = 0; i < redoActionNum; ++i) { + STransAction *pAction = taosArrayGet(pTrans->redoActions, i); + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); + SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType) + SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen); + } + + for (int32_t i = 0; i < undoActionNum; ++i) { + STransAction *pAction = taosArrayGet(pTrans->undoActions, i); + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); + SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType) + SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen); + } + SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE) SDB_SET_DATALEN(pRaw, dataPos); mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos); @@ -147,8 +180,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); + pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { @@ -175,42 +208,77 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { for (int32_t i = 0; i < redoLogNum; ++i) { int32_t dataLen = 0; SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) - char *pData = malloc(dataLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); + void *ret = taosArrayPush(pTrans->redoLogs, &pData); if (ret == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto TRANS_DECODE_OVER; - break; } } for (int32_t i = 0; i < undoLogNum; ++i) { int32_t dataLen = 0; SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) - char *pData = malloc(dataLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); + void *ret = taosArrayPush(pTrans->undoLogs, &pData); if (ret == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto TRANS_DECODE_OVER; - break; } } for (int32_t i = 0; i < commitLogNum; ++i) { int32_t dataLen = 0; SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) - char *pData = malloc(dataLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); + void *ret = taosArrayPush(pTrans->commitLogs, &pData); if (ret == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto TRANS_DECODE_OVER; - break; + } + } + + for (int32_t i = 0; i < redoActionNum; ++i) { + STransAction action = {0}; + SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); + SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType) + SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen) + action.msg.pCont = rpcMallocCont(action.msg.contLen); + if (action.msg.pCont == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; + } + SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen); + + void *ret = taosArrayPush(pTrans->redoActions, &action); + if (ret == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; + } + } + + for (int32_t i = 0; i < undoActionNum; ++i) { + STransAction action = {0}; + SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); + SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType) + SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen) + action.msg.pCont = rpcMallocCont(action.msg.contLen); + if (action.msg.pCont == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; + } + SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen); + + void *ret = taosArrayPush(pTrans->undoActions, &action); + if (ret == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; } } @@ -237,11 +305,11 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage)); - mndTransDropArray(pTrans->redoLogs); - mndTransDropArray(pTrans->undoLogs); - mndTransDropArray(pTrans->commitLogs); - mndTransDropArray(pTrans->redoActions); - mndTransDropArray(pTrans->undoActions); + mndTransDropLog(pTrans->redoLogs); + mndTransDropLog(pTrans->undoLogs); + mndTransDropLog(pTrans->commitLogs); + mndTransDropAction(pTrans->redoActions); + mndTransDropAction(pTrans->undoActions); return 0; } @@ -274,6 +342,8 @@ char *mndTransStageStr(ETrnStage stage) { return "rollback"; case TRN_STAGE_RETRY: return "retry"; + case TRN_STAGE_OVER: + return "stop"; default: return "undefined"; } @@ -305,8 +375,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); + pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { @@ -319,7 +389,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { return pTrans; } -static void mndTransDropArray(SArray *pArray) { +static void mndTransDropLog(SArray *pArray) { for (int32_t i = 0; i < pArray->size; ++i) { SSdbRaw *pRaw = taosArrayGetP(pArray, i); tfree(pRaw); @@ -328,12 +398,21 @@ static void mndTransDropArray(SArray *pArray) { taosArrayDestroy(pArray); } +static void mndTransDropAction(SArray *pArray) { + for (int32_t i = 0; i < pArray->size; ++i) { + STransAction *pAction = taosArrayGet(pArray, i); + rpcFreeCont(pAction->msg.pCont); + } + + taosArrayDestroy(pArray); +} + void mndTransDrop(STrans *pTrans) { - mndTransDropArray(pTrans->redoLogs); - mndTransDropArray(pTrans->undoLogs); - mndTransDropArray(pTrans->commitLogs); - mndTransDropArray(pTrans->redoActions); - mndTransDropArray(pTrans->undoActions); + mndTransDropLog(pTrans->redoLogs); + mndTransDropLog(pTrans->undoLogs); + mndTransDropLog(pTrans->commitLogs); + mndTransDropAction(pTrans->redoActions); + mndTransDropAction(pTrans->undoActions); mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans); tfree(pTrans); @@ -344,7 +423,7 @@ static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) { mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle); } -static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) { +static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { if (pArray == NULL || pRaw == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -360,31 +439,43 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) { } int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendArray(pTrans->redoLogs, pRaw); + int32_t code = mndTransAppendLog(pTrans->redoLogs, pRaw); mTrace("trans:%d, raw:%p append to redo logs, code:0x%x", pTrans->id, pRaw, code); return code; } int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendArray(pTrans->undoLogs, pRaw); + int32_t code = mndTransAppendLog(pTrans->undoLogs, pRaw); mTrace("trans:%d, raw:%p append to undo logs, code:0x%x", pTrans->id, pRaw, code); return code; } int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendArray(pTrans->commitLogs, pRaw); + int32_t code = mndTransAppendLog(pTrans->commitLogs, pRaw); mTrace("trans:%d, raw:%p append to commit logs, code:0x%x", pTrans->id, pRaw, code); return code; } -int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { - int32_t code = mndTransAppendArray(pTrans->redoActions, pMsg); +static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg) { + STransAction action = {.epSet = *pEpSet, .msg = *pMsg}; + + void *ptr = taosArrayPush(pArray, &action); + if (ptr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) { + int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, pMsg); mTrace("trans:%d, msg:%p append to redo actions", pTrans->id, pMsg); return code; } -int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { - int32_t code = mndTransAppendArray(pTrans->undoActions, pMsg); +int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) { + int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, pMsg); mTrace("trans:%d, msg:%p append to undo actions", pTrans->id, pMsg); return code; } @@ -559,18 +650,37 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { return code; } -static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { - if (taosArrayGetSize(pTrans->redoActions) != 0) { - mTrace("trans:%d, execute redo actions finished", pTrans->id); +static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray) { + SSdb *pSdb = pMnode->pSdb; + int32_t arraySize = taosArrayGetSize(pArray); + + for (int32_t i = 0; i < arraySize; ++i) { + SSdbRaw *pRaw = taosArrayGetP(pArray, i); + int32_t code = sdbWriteNotFree(pSdb, pRaw); + if (code != 0) { + return code; + } } + return 0; } +static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { + int32_t code = 0; + if (taosArrayGetSize(pTrans->redoActions) != 0) { + mTrace("trans:%d, execute redo actions finished", pTrans->id); + } + + return code; +} + static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { + int32_t code = 0; if (taosArrayGetSize(pTrans->undoActions) != 0) { mTrace("trans:%d, execute undo actions finished", pTrans->id); } - return 0; + + return code; } static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index c2be7fa39a..da37bb06b0 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -24,9 +24,10 @@ #define TSDB_VGROUP_VER_NUM 1 #define TSDB_VGROUP_RESERVE_SIZE 64 -static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); -static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); -static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup); +static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); +static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); +static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); +static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup); static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg); @@ -156,6 +157,80 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { sdbRelease(pSdb, pVgroup); } +SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { + SCreateVnodeMsg *pCreate = rpcMallocCont(sizeof(SCreateVnodeMsg)); + if (pCreate == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pCreate->dnodeId = htonl(pDnode->id); + pCreate->vgId = htonl(pVgroup->vgId); + memcpy(pCreate->db, pDb->name, TSDB_FULL_DB_NAME_LEN); + pCreate->dbUid = htobe64(pDb->uid); + pCreate->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize); + pCreate->totalBlocks = htonl(pDb->cfg.totalBlocks); + pCreate->daysPerFile = htonl(pDb->cfg.daysPerFile); + pCreate->daysToKeep0 = htonl(pDb->cfg.daysToKeep0); + pCreate->daysToKeep1 = htonl(pDb->cfg.daysToKeep1); + pCreate->daysToKeep2 = htonl(pDb->cfg.daysToKeep2); + pCreate->minRows = htonl(pDb->cfg.minRows); + pCreate->maxRows = htonl(pDb->cfg.maxRows); + pCreate->commitTime = htonl(pDb->cfg.commitTime); + pCreate->fsyncPeriod = htonl(pDb->cfg.fsyncPeriod); + pCreate->walLevel = pDb->cfg.walLevel; + pCreate->precision = pDb->cfg.precision; + pCreate->compression = pDb->cfg.compression; + pCreate->quorum = pDb->cfg.quorum; + pCreate->update = pDb->cfg.update; + pCreate->cacheLastRow = pDb->cfg.cacheLastRow; + pCreate->replica = pVgroup->replica; + pCreate->selfIndex = -1; + + for (int32_t v = 0; v < pVgroup->replica; ++v) { + SReplica *pReplica = &pCreate->replicas[v]; + SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; + SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pVgidDnode == NULL) { + rpcFreeCont(pCreate); + terrno = TSDB_CODE_MND_APP_ERROR; + return NULL; + } + + pReplica->id = htonl(pVgidDnode->id); + pReplica->port = htons(pVgidDnode->port); + memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN); + mndReleaseDnode(pMnode, pVgidDnode); + + if (pDnode->id == pVgid->dnodeId) { + pCreate->selfIndex = v; + } + } + + if (pCreate->selfIndex == -1) { + rpcFreeCont(pCreate); + terrno = TSDB_CODE_MND_APP_ERROR; + return NULL; + } + + return pCreate; +} + +SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { + SDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SDropVnodeMsg)); + if (pDrop == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pDrop->dnodeId = htonl(pDnode->id); + pDrop->vgId = htonl(pVgroup->vgId); + memcpy(pDrop->db, pDb->name, TSDB_FULL_DB_NAME_LEN); + pDrop->dbUid = htobe64(pDb->uid); + + return pDrop; +} + static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup) { SSdb *pSdb = pMnode->pSdb; int32_t allocedVnodes = 0; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0450513fc5..3e374b344b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -130,6 +130,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input") // mnode-common +TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, "Cluster not ready") TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing") From 6f0f7c5821cadab8abcf72ab86158c2caad8ae86 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 18 Dec 2021 22:12:33 +0800 Subject: [PATCH 4/6] TD-10431 build create vnode msg --- source/dnode/mnode/impl/inc/mndTrans.h | 4 +- source/dnode/mnode/impl/src/mndDb.c | 10 +- source/dnode/mnode/impl/src/mndTrans.c | 134 +++++++++++++----------- source/dnode/mnode/impl/src/mndVgroup.c | 8 +- 4 files changed, 80 insertions(+), 76 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 92e86efa9e..5c15e2f987 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -30,8 +30,8 @@ void mndTransDrop(STrans *pTrans); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); -int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg); -int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont); +int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code); char *mndTransStageStr(ETrnStage stage); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 6decc82743..e85edf66da 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -303,9 +303,8 @@ static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV return -1; } - SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SCreateVnodeMsg)}; - if (mndTransAppendRedoAction(pTrans, &epset, &rpcMsg) != 0) { - rpcFreeCont(pMsg); + if (mndTransAppendRedoAction(pTrans, &epset, TSDB_MSG_TYPE_ALTER_VNODE_IN, sizeof(SCreateVnodeMsg), pMsg) != 0) { + free(pMsg); return -1; } } @@ -333,9 +332,8 @@ static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV return -1; } - SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SDropVnodeMsg)}; - if (mndTransAppendUndoAction(pTrans, &epset, &rpcMsg) != 0) { - rpcFreeCont(pMsg); + if (mndTransAppendUndoAction(pTrans, &epset, TSDB_MSG_TYPE_DROP_VNODE_IN, sizeof(SDropVnodeMsg), pMsg) != 0) { + free(pMsg); return -1; } } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 8399b79921..5caec6c78d 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -23,7 +23,9 @@ typedef struct { SEpSet epSet; - SRpcMsg msg; + int8_t msgType; + int32_t contLen; + void *pCont; } STransAction; static SSdbRaw *mndTransActionEncode(STrans *pTrans); @@ -35,10 +37,11 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle); static void mndTransSendRpcRsp(STrans *pTrans, int32_t code); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); -static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg); -static void mndTransDropLog(SArray *pArray); -static void mndTransDropAction(SArray *pArray); -static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray); +static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont); +static void mndTransDropLogs(SArray *pArray); +static void mndTransDropActions(SArray *pArray); +static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray); +static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); @@ -89,12 +92,12 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); - rawDataLen += (sizeof(STransAction) + pAction->msg.contLen); + rawDataLen += (sizeof(STransAction) + pAction->contLen); } for (int32_t i = 0; i < undoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); - rawDataLen += (sizeof(STransAction) + pAction->msg.contLen); + rawDataLen += (sizeof(STransAction) + pAction->contLen); } SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER, rawDataLen); @@ -136,17 +139,17 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); - SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType) - SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen) - SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen); + SDB_SET_INT8(pRaw, dataPos, pAction->msgType) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen) + SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen); } for (int32_t i = 0; i < undoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); - SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType) - SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen) - SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen); + SDB_SET_INT8(pRaw, dataPos, pAction->msgType) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen); } SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE) @@ -247,14 +250,14 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { for (int32_t i = 0; i < redoActionNum; ++i) { STransAction action = {0}; SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); - SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType) - SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen) - action.msg.pCont = rpcMallocCont(action.msg.contLen); - if (action.msg.pCont == NULL) { + SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType) + SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen) + action.pCont = malloc(action.contLen); + if (action.pCont == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto TRANS_DECODE_OVER; } - SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen); + SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen); void *ret = taosArrayPush(pTrans->redoActions, &action); if (ret == NULL) { @@ -266,14 +269,14 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { for (int32_t i = 0; i < undoActionNum; ++i) { STransAction action = {0}; SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); - SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType) - SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen) - action.msg.pCont = rpcMallocCont(action.msg.contLen); - if (action.msg.pCont == NULL) { + SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType) + SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen) + action.pCont = malloc(action.contLen); + if (action.pCont == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto TRANS_DECODE_OVER; } - SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen); + SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen); void *ret = taosArrayPush(pTrans->undoActions, &action); if (ret == NULL) { @@ -305,11 +308,11 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage)); - mndTransDropLog(pTrans->redoLogs); - mndTransDropLog(pTrans->undoLogs); - mndTransDropLog(pTrans->commitLogs); - mndTransDropAction(pTrans->redoActions); - mndTransDropAction(pTrans->undoActions); + mndTransDropLogs(pTrans->redoLogs); + mndTransDropLogs(pTrans->undoLogs); + mndTransDropLogs(pTrans->commitLogs); + mndTransDropActions(pTrans->redoActions); + mndTransDropActions(pTrans->undoActions); return 0; } @@ -389,7 +392,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { return pTrans; } -static void mndTransDropLog(SArray *pArray) { +static void mndTransDropLogs(SArray *pArray) { for (int32_t i = 0; i < pArray->size; ++i) { SSdbRaw *pRaw = taosArrayGetP(pArray, i); tfree(pRaw); @@ -398,21 +401,21 @@ static void mndTransDropLog(SArray *pArray) { taosArrayDestroy(pArray); } -static void mndTransDropAction(SArray *pArray) { +static void mndTransDropActions(SArray *pArray) { for (int32_t i = 0; i < pArray->size; ++i) { STransAction *pAction = taosArrayGet(pArray, i); - rpcFreeCont(pAction->msg.pCont); + free(pAction->pCont); } taosArrayDestroy(pArray); } void mndTransDrop(STrans *pTrans) { - mndTransDropLog(pTrans->redoLogs); - mndTransDropLog(pTrans->undoLogs); - mndTransDropLog(pTrans->commitLogs); - mndTransDropAction(pTrans->redoActions); - mndTransDropAction(pTrans->undoActions); + mndTransDropLogs(pTrans->redoLogs); + mndTransDropLogs(pTrans->undoLogs); + mndTransDropLogs(pTrans->commitLogs); + mndTransDropActions(pTrans->redoActions); + mndTransDropActions(pTrans->undoActions); mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans); tfree(pTrans); @@ -456,8 +459,8 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return code; } -static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg) { - STransAction action = {.epSet = *pEpSet, .msg = *pMsg}; +static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) { + STransAction action = {.epSet = *pEpSet, .msgType = msgType, .contLen = contLen, .pCont = pCont}; void *ptr = taosArrayPush(pArray, &action); if (ptr == NULL) { @@ -468,15 +471,15 @@ static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMs return 0; } -int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) { - int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, pMsg); - mTrace("trans:%d, msg:%p append to redo actions", pTrans->id, pMsg); +int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) { + int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, msgType, contLen, pCont); + mTrace("trans:%d, msg:%s len:%d append to redo actions", pTrans->id, taosMsg[msgType], contLen); return code; } -int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) { - int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, pMsg); - mTrace("trans:%d, msg:%p append to undo actions", pTrans->id, pMsg); +int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) { + int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, msgType, contLen, pCont); + mTrace("trans:%d, msg:%s len:%d append to undo actions", pTrans->id, taosMsg[msgType], contLen); return code; } @@ -593,7 +596,7 @@ void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) // todo } -static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) { +static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { SSdb *pSdb = pMnode->pSdb; int32_t arraySize = taosArrayGetSize(pArray); @@ -611,7 +614,7 @@ static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) { static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { int32_t code = 0; if (taosArrayGetSize(pTrans->redoLogs) != 0) { - code = mndTransExecuteArray(pMnode, pTrans->redoLogs); + code = mndTransExecuteLogs(pMnode, pTrans->redoLogs); if (code != 0) { mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr()) } else { @@ -625,7 +628,7 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { int32_t code = 0; if (taosArrayGetSize(pTrans->undoLogs) != 0) { - code = mndTransExecuteArray(pMnode, pTrans->undoLogs); + code = mndTransExecuteLogs(pMnode, pTrans->undoLogs); if (code != 0) { mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr()) } else { @@ -639,7 +642,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { int32_t code = 0; if (taosArrayGetSize(pTrans->commitLogs) != 0) { - code = mndTransExecuteArray(pMnode, pTrans->commitLogs); + code = mndTransExecuteLogs(pMnode, pTrans->commitLogs); if (code != 0) { mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr()) } else { @@ -651,36 +654,39 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { } static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray) { - SSdb *pSdb = pMnode->pSdb; +#if 0 int32_t arraySize = taosArrayGetSize(pArray); - for (int32_t i = 0; i < arraySize; ++i) { - SSdbRaw *pRaw = taosArrayGetP(pArray, i); - int32_t code = sdbWriteNotFree(pSdb, pRaw); - if (code != 0) { - return code; + STransAction *pAction = taosArrayGet(pArray, i); + + SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen}; + rpcMsg.pCont = rpcMallocCont(pAction->contLen); + if (rpcMsg.pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } + memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); + mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg); } + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +#else return 0; +#endif } static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = 0; - if (taosArrayGetSize(pTrans->redoActions) != 0) { - mTrace("trans:%d, execute redo actions finished", pTrans->id); - } + if (taosArrayGetSize(pTrans->redoActions) <= 0) return 0; - return code; + mTrace("trans:%d, start to execute redo actions", pTrans->id); + return mndTransExecuteActions(pMnode, pTrans->redoActions); } static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = 0; - if (taosArrayGetSize(pTrans->undoActions) != 0) { - mTrace("trans:%d, execute undo actions finished", pTrans->id); - } + if (taosArrayGetSize(pTrans->undoActions) <= 0) return 0; - return code; + mTrace("trans:%d, start to execute undo actions", pTrans->id); + return mndTransExecuteActions(pMnode, pTrans->undoActions); } static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index da37bb06b0..e99fea200b 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -158,7 +158,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { } SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { - SCreateVnodeMsg *pCreate = rpcMallocCont(sizeof(SCreateVnodeMsg)); + SCreateVnodeMsg *pCreate = malloc(sizeof(SCreateVnodeMsg)); if (pCreate == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -192,7 +192,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pVgidDnode == NULL) { - rpcFreeCont(pCreate); + free(pCreate); terrno = TSDB_CODE_MND_APP_ERROR; return NULL; } @@ -208,7 +208,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb } if (pCreate->selfIndex == -1) { - rpcFreeCont(pCreate); + free(pCreate); terrno = TSDB_CODE_MND_APP_ERROR; return NULL; } @@ -217,7 +217,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb } SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { - SDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SDropVnodeMsg)); + SDropVnodeMsg *pDrop = malloc(sizeof(SDropVnodeMsg)); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; From f074a4b3a5c99dcbda3f0f8c857bb92f9d19dd02 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Dec 2021 23:06:08 +0800 Subject: [PATCH 5/6] update cache put --- cmake/cmake.options | 2 +- include/libs/index/index.h | 2 +- source/libs/index/CMakeLists.txt | 6 +- source/libs/index/inc/indexInt.h | 4 +- source/libs/index/src/index.c | 54 ++++++------ source/libs/index/test/CMakeLists.txt | 2 +- .../test/{indexTests.cpp => indexTests.cc} | 86 +++++++++++++++++-- 7 files changed, 116 insertions(+), 40 deletions(-) rename source/libs/index/test/{indexTests.cpp => indexTests.cc} (74%) diff --git a/cmake/cmake.options b/cmake/cmake.options index e44a38b3f5..2384a427e4 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -57,7 +57,7 @@ option( ) option( - USE_INVERTEDINDEX + BUILD_WITH_INVERTEDINDEX "If use invertedIndex" ON ) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 2cc2aeb6a0..3ca8d10603 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -51,7 +51,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm * @param: * @param: */ -SIndex* indexOpen(SIndexOpts *opt, const char *path); +int indexOpen(SIndexOpts *opt, const char *path, SIndex **index); void indexClose(SIndex *index); int indexPut(SIndex *index, SIndexMultiTerm *terms, int uid); int indexDelete(SIndex *index, SIndexMultiTermQuery *query); diff --git a/source/libs/index/CMakeLists.txt b/source/libs/index/CMakeLists.txt index f68fc5e61e..4805bd3b77 100644 --- a/source/libs/index/CMakeLists.txt +++ b/source/libs/index/CMakeLists.txt @@ -22,9 +22,13 @@ if (${BUILD_WITH_LUCENE}) index PUBLIC lucene++ ) - endif(${BUILD_WITH_LUCENE}) +if (${BUILD_WITH_INVERTEDINDEX}) + add_definitions(-DUSE_INVERTED_INDEX) +endif(${BUILD_WITH_INVERTEDINDEX}) + + if (${BUILD_TEST}) add_subdirectory(test) endif(${BUILD_TEST}) diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index fb5a9e40b5..7e017049e8 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -37,10 +37,10 @@ struct SIndex { #endif void *cache; void *tindex; - SHashObj *fieldObj;// < field name, field id> + SHashObj *colObj;// < field name, field id> int64_t suid; // current super table id, -1 is normal table - int fieldId; // field id allocated to cache + int colId; // field id allocated to cache int32_t cVersion; // current version allocated to cache pthread_mutex_t mtx; }; diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index ca6c2062f1..08c59d8d43 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -22,11 +22,10 @@ #endif -typedef struct SIdxFieldInfo { - int fieldId; // generated by index internal +typedef struct SIdxColInfo { + int colId; // generated by index internal int cVersion; - int type; // field type -} SIdxFieldInfo; +} SIdxColInfo; static pthread_once_t isInit = PTHREAD_ONCE_INIT; static void indexInit(); @@ -38,9 +37,10 @@ static int indexMergeCacheIntoTindex(struct SIndex *sIdx) { indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); return 0; } -SIndex *indexOpen(SIndexOpts *opts, const char *path) { +int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) { pthread_once(&isInit, indexInit); SIndex *sIdx = calloc(1, sizeof(SIndex)); + if (sIdx == NULL) { return -1; } #ifdef USE_LUCENE index_t *index = index_open(path); @@ -49,11 +49,13 @@ SIndex *indexOpen(SIndexOpts *opts, const char *path) { sIdx->cache = (void*)indexCacheCreate(); sIdx->tindex = NULL; - sIdx->fieldObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - sIdx->fieldId = 1; + sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + sIdx->colId = 1; sIdx->cVersion = 1; pthread_mutex_init(&sIdx->mtx, NULL); - return sIdx; + + *index = sIdx; + return 0; } void indexClose(SIndex *sIdx) { @@ -62,16 +64,16 @@ void indexClose(SIndex *sIdx) { sIdx->index = NULL; #endif -#ifdef USE_INVERTEDINDEX +#ifdef USE_INVERTED_INDEX indexCacheDestroy(sIdx->cache); - taosHashCleanup(sIdx->fieldObj); + taosHashCleanup(sIdx->colObj); pthread_mutex_destroy(&sIdx->mtx); #endif free(sIdx); return; } -int indexPut(SIndex *index, SArray* fVals, int uid) { +int indexPut(SIndex *index, SIndexMultiTerm * fVals, int uid) { #ifdef USE_LUCENE index_document_t *doc = index_document_create(); @@ -89,38 +91,38 @@ int indexPut(SIndex *index, SArray* fVals, int uid) { index_document_destroy(doc); #endif -#ifdef USE_INVERTEDINDEX +#ifdef USE_INVERTED_INDEX + //TODO(yihao): reduce the lock range pthread_mutex_lock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm *p = taosArrayGetP(fVals, i); - SIdxFieldInfo *fi = taosHashGet(index->fieldObj, p->key, p->nKey); + SIdxColInfo *fi = taosHashGet(index->colObj, p->colName, p->nColName); if (fi == NULL) { - SIdxFieldInfo tfi = {.fieldId = index->fieldId, .type = p->type}; + SIdxColInfo tfi = {.colId = index->colId}; index->cVersion++; - index->fieldId++; - taosHashPut(index->fieldObj, p->key, p->nKey, &tfi, sizeof(tfi)); + index->colId++; + taosHashPut(index->colObj, p->colName, p->nColName, &tfi, sizeof(tfi)); } else { //TODO, del } } + pthread_mutex_unlock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm *p = taosArrayGetP(fVals, i); - SIdxFieldInfo *fi = taosHashGet(index->fieldObj, p->key, p->nKey); + SIdxColInfo *fi = taosHashGet(index->colObj, p->colName, p->nColName); assert(fi != NULL); - int32_t fieldId = fi->fieldId; - int32_t fieldType = fi->type; + int32_t colId = fi->colId; int32_t version = index->cVersion; - int res = indexCachePut(index->cache, fieldId, fieldType, p->val, p->nVal, version, uid, p->operType); + int ret = indexCachePut(index->cache, colId, p->colType, p->colVal, p->nColVal, version, uid, p->operType); if (ret != 0) { - return + return ret; } } - pthread_mutex_unlock(&index->mtx); #endif - return 1; + return 0; } int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result) { #ifdef USE_LUCENE @@ -159,7 +161,7 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result free(types); #endif -#ifdef USE_INVERTEDINDEX +#ifdef USE_INVERTED_INDEX #endif return 1; @@ -167,13 +169,13 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result int indexDelete(SIndex *index, SIndexMultiTermQuery *query) { -#ifdef USE_INVERTEDINDEX +#ifdef USE_INVERTED_INDEX #endif return 1; } int indexRebuild(SIndex *index, SIndexOpts *opts) { -#ifdef USE_INVERTEDINDEX +#ifdef USE_INVERTED_INDEX #endif } diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index f84f874a23..6eb532b41e 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -1,7 +1,7 @@ add_executable(indexTest "") target_sources(indexTest PRIVATE - "indexTests.cpp" + "indexTests.cc" ) target_include_directories ( indexTest PUBLIC diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cc similarity index 74% rename from source/libs/index/test/indexTests.cpp rename to source/libs/index/test/indexTests.cc index f582536817..9dff2e9ea0 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cc @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ #include #include #include @@ -61,7 +75,7 @@ class FstReadMemory { // add later bool Search(AutomationCtx *ctx, std::vector &result) { FstStreamBuilder *sb = fstSearch(_fst, ctx); - StreamWithState *st = streamBuilderIntoStream(sb); + StreamWithState *st = streamBuilderIntoStream(sb); StreamWithStateResult *rt = NULL; while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { @@ -279,15 +293,71 @@ void validateFst() { delete m; } +class IndexEnv : public ::testing::Test { + protected: + virtual void SetUp() { + taosRemoveDir(path); + opts = indexOptsCreate(); + int ret = indexOpen(opts, path, &index); + assert(ret == 0); + } + virtual void TearDown() { + indexClose(index); + indexOptsDestroy(opts); + } + + const char *path = "/tmp/tindex"; + SIndexOpts *opts; + SIndex *index; +}; -int main(int argc, char** argv) { - checkFstPerf(); - //checkFstPrefixSearch(); - return 1; +TEST_F(IndexEnv, testPut) { + + // single index column + { + + std::string colName("tag1"), colVal("Hello world"); + SIndexTerm *term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexMultiTerm *terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + + for (size_t i = 0; i < 100; i++) { + int tableId = i; + int ret = indexPut(index, terms, tableId); + assert(ret == 0); + } + indexMultiTermDestroy(terms); + } + // multi index column + { + + SIndexMultiTerm *terms = indexMultiTermCreate(); + { + std::string colName("tag1"), colVal("Hello world"); + SIndexTerm *term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + indexMultiTermAdd(terms, term); + } + { + std::string colName("tag2"), colVal("Hello world"); + SIndexTerm *term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + indexMultiTermAdd(terms, term); + } + + for (int i = 0; i < 100; i++) { + int tableId = i; + int ret = indexPut(index, terms, tableId); + assert(ret == 0); + } + indexMultiTermDestroy(terms); + } + // +} + +TEST_F(IndexEnv, testDel) { + } -//TEST(IndexFstBuilder, IndexFstInput) { -// -//} + + From 7750309283fc84f87ae89f6afcfe5ece27bd9ef0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 19 Dec 2021 11:08:43 +0800 Subject: [PATCH 6/6] TD-10431 process create vnode msg --- include/util/tdef.h | 2 +- source/dnode/mgmt/impl/src/dndVnodes.c | 88 ++++--- source/dnode/mgmt/impl/test/CMakeLists.txt | 2 +- source/dnode/mgmt/impl/test/db/db.cpp | 1 + .../mgmt/impl/test/vgroup/CMakeLists.txt | 27 +++ source/dnode/mgmt/impl/test/vgroup/vgroup.cpp | 224 ++++++++++++++++++ 6 files changed, 306 insertions(+), 38 deletions(-) create mode 100644 source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt create mode 100644 source/dnode/mgmt/impl/test/vgroup/vgroup.cpp diff --git a/include/util/tdef.h b/include/util/tdef.h index 02b5a1e620..f3f3643268 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -231,7 +231,7 @@ do { \ #define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_CQ_SQL_SIZE 1024 -#define TSDB_MIN_VNODES 64 +#define TSDB_MIN_VNODES 16 #define TSDB_MAX_VNODES 512 #define TSDB_MIN_VNODES_PER_DB 1 #define TSDB_MAX_VNODES_PER_DB 4096 diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index a6eb916aef..d3f1b06a4a 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -95,7 +95,7 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SVnodeObj * pVnode = NULL; + SVnodeObj *pVnode = NULL; int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); @@ -107,23 +107,23 @@ static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { } taosRUnLockLatch(&pMgmt->latch); - dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); + if (pVnode != NULL) { + dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); + } + return pVnode; } static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { + if (pVnode == NULL) return; + SVnodesMgmt *pMgmt = &pDnode->vmgmt; - int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); - if (pVnode != NULL) { - refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - if (pVnode != NULL) { - dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); - } + dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) { @@ -457,7 +457,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { pMgmt->totalVnodes = numOfVnodes; - int32_t threadNum = tsNumOfCores; + int32_t threadNum = pDnode->opt.numOfCores; int32_t vnodesPerThread = numOfVnodes / threadNum + 1; SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread)); @@ -525,33 +525,49 @@ static void dndCloseVnodes(SDnode *pDnode) { static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) { SCreateVnodeMsg *pCreate = rpcMsg->pCont; - *vgId = htonl(pCreate->vgId); + pCreate->vgId = htonl(pCreate->vgId); + pCreate->dnodeId = htonl(pCreate->dnodeId); + pCreate->dbUid = htobe64(pCreate->dbUid); + pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); + pCreate->totalBlocks = htonl(pCreate->totalBlocks); + pCreate->daysPerFile = htonl(pCreate->daysPerFile); + pCreate->daysToKeep0 = htonl(pCreate->daysToKeep0); + pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1); + pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2); + pCreate->minRows = htonl(pCreate->minRows); + pCreate->maxRows = htonl(pCreate->maxRows); + pCreate->commitTime = htonl(pCreate->commitTime); + pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod); + for (int r = 0; r < pCreate->replica; ++r) { + SReplica *pReplica = &pCreate->replicas[r]; + pReplica->id = htonl(pReplica->id); + pReplica->port = htons(pReplica->port); + } + + *vgId = pCreate->vgId; #if 0 - tstrncpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN); - pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize); - pCfg->totalBlocks = htonl(pCreate->totalBlocks); - pCfg->daysPerFile = htonl(pCreate->daysPerFile); - pCfg->daysToKeep0 = htonl(pCreate->daysToKeep0); - pCfg->daysToKeep1 = htonl(pCreate->daysToKeep1); - pCfg->daysToKeep2 = htonl(pCreate->daysToKeep2); - pCfg->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock); - pCfg->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock); - pCfg->precision = pCreate->precision; - pCfg->compression = pCreate->compression; - pCfg->cacheLastRow = pCreate->cacheLastRow; - pCfg->update = pCreate->update; - pCfg->quorum = pCreate->quorum; - pCfg->replica = pCreate->replica; - pCfg->walLevel = pCreate->walLevel; - pCfg->fsyncPeriod = htonl(pCreate->fsyncPeriod); - - for (int32_t i = 0; i < pCfg->replica; ++i) { - pCfg->replicas[i].port = htons(pCreate->replicas[i].port); - tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); - } + pCfg->wsize = pCreate->cacheBlockSize; + pCfg->ssize = pCreate->cacheBlockSize; + pCfg->wsize = pCreate->cacheBlockSize; + pCfg->lsize = pCreate->cacheBlockSize; + pCfg->isHeapAllocator = true; + pCfg->ttl = 4; + pCfg->keep = pCreate->daysToKeep0; + pCfg->isWeak = true; + pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0; + pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2; + pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0; + pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize; + pCfg->metaCfg.lruSize = pCreate->cacheBlockSize; + pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod; + pCfg->walCfg.level = pCreate->walLevel; + pCfg->walCfg.retentionPeriod = 10; + pCfg->walCfg.retentionSize = 128; + pCfg->walCfg.rollPeriod = 128; + pCfg->walCfg.segSize = 128; + pCfg->walCfg.vgId = pCreate->vgId; #endif - return 0; } @@ -1016,7 +1032,7 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { SVnodesMgmt * pMgmt = &pDnode->vmgmt; SMWorkerPool *pPool = &pMgmt->writePool; pPool->name = "vnode-write"; - pPool->max = tsNumOfCores; + pPool->max = pDnode->opt.numOfCores; if (tMWorkerInit(pPool) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -1050,7 +1066,7 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { } static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { - int32_t maxThreads = tsNumOfCores / 2; + int32_t maxThreads = pDnode->opt.numOfCores / 2; if (maxThreads < 1) maxThreads = 1; SVnodesMgmt *pMgmt = &pDnode->vmgmt; diff --git a/source/dnode/mgmt/impl/test/CMakeLists.txt b/source/dnode/mgmt/impl/test/CMakeLists.txt index 8c6d146fb6..a5ece72f42 100644 --- a/source/dnode/mgmt/impl/test/CMakeLists.txt +++ b/source/dnode/mgmt/impl/test/CMakeLists.txt @@ -15,6 +15,6 @@ add_subdirectory(stb) # add_subdirectory(telem) # add_subdirectory(trans) add_subdirectory(user) -# add_subdirectory(vgroup) +add_subdirectory(vgroup) # add_subdirectory(common) diff --git a/source/dnode/mgmt/impl/test/db/db.cpp b/source/dnode/mgmt/impl/test/db/db.cpp index de1a606c86..d465a62f2d 100644 --- a/source/dnode/mgmt/impl/test/db/db.cpp +++ b/source/dnode/mgmt/impl/test/db/db.cpp @@ -232,6 +232,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { SRpcMsg* pMsg = pClient->pRsp; ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); + // taosMsleep(1000000); } SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL); diff --git a/source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt b/source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt new file mode 100644 index 0000000000..5670f9dbf2 --- /dev/null +++ b/source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt @@ -0,0 +1,27 @@ +add_executable(dnode_test_vgroup "") + +target_sources(dnode_test_vgroup + PRIVATE + "vgroup.cpp" + "../sut/deploy.cpp" +) + +target_link_libraries( + dnode_test_vgroup + PUBLIC dnode + PUBLIC util + PUBLIC os + PUBLIC gtest_main +) + +target_include_directories(dnode_test_vgroup + PUBLIC + "${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt" + "${CMAKE_CURRENT_SOURCE_DIR}/../../inc" + "${CMAKE_CURRENT_SOURCE_DIR}/../sut" +) + +add_test( + NAME dnode_test_vgroup + COMMAND dnode_test_vgroup +) diff --git a/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp b/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp new file mode 100644 index 0000000000..3f16cd87d8 --- /dev/null +++ b/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp @@ -0,0 +1,224 @@ +/** + * @file db.cpp + * @author slguan (slguan@taosdata.com) + * @brief DNODE module vgroup-msg tests + * @version 0.1 + * @date 2021-12-20 + * + * @copyright Copyright (c) 2021 + * + */ + +#include "deploy.h" + +class DndTestVgroup : public ::testing::Test { + protected: + static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) { + SServer* pServer = createServer(path, fqdn, port, firstEp); + ASSERT(pServer); + return pServer; + } + + static void SetUpTestSuite() { + initLog("/tmp/tdlog"); + + const char* fqdn = "localhost"; + const char* firstEp = "localhost:9150"; + pServer = CreateServer("/tmp/dnode_test_vgroup", fqdn, 9150, firstEp); + pClient = createClient("root", "taosdata", fqdn, 9150); + taosMsleep(1100); + } + + static void TearDownTestSuite() { + stopServer(pServer); + dropClient(pClient); + pServer = NULL; + pClient = NULL; + } + + static SServer* pServer; + static SClient* pClient; + static int32_t connId; + + public: + void SetUp() override {} + void TearDown() override {} + + void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns, const char* db) { + SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); + pShow->type = showType; + if (db != NULL) { + strcpy(pShow->db, db); + } + SRpcMsg showRpcMsg = {0}; + showRpcMsg.pCont = pShow; + showRpcMsg.contLen = sizeof(SShowMsg); + showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; + + sendMsg(pClient, &showRpcMsg); + ASSERT_NE(pClient->pRsp, nullptr); + ASSERT_EQ(pClient->pRsp->code, 0); + ASSERT_NE(pClient->pRsp->pCont, nullptr); + + SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; + ASSERT_NE(pShowRsp, nullptr); + pShowRsp->showId = htonl(pShowRsp->showId); + pMeta = &pShowRsp->tableMeta; + pMeta->numOfTags = htonl(pMeta->numOfTags); + pMeta->numOfColumns = htonl(pMeta->numOfColumns); + pMeta->sversion = htonl(pMeta->sversion); + pMeta->tversion = htonl(pMeta->tversion); + pMeta->tuid = htobe64(pMeta->tuid); + pMeta->suid = htobe64(pMeta->suid); + + showId = pShowRsp->showId; + + EXPECT_NE(pShowRsp->showId, 0); + EXPECT_STREQ(pMeta->tbFname, showName); + EXPECT_EQ(pMeta->numOfTags, 0); + EXPECT_EQ(pMeta->numOfColumns, columns); + EXPECT_EQ(pMeta->precision, 0); + EXPECT_EQ(pMeta->tableType, 0); + EXPECT_EQ(pMeta->update, 0); + EXPECT_EQ(pMeta->sversion, 0); + EXPECT_EQ(pMeta->tversion, 0); + EXPECT_EQ(pMeta->tuid, 0); + EXPECT_EQ(pMeta->suid, 0); + } + + void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) { + SSchema* pSchema = &pMeta->pSchema[index]; + pSchema->bytes = htonl(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, type); + EXPECT_EQ(pSchema->bytes, bytes); + EXPECT_STREQ(pSchema->name, name); + } + + void SendThenCheckShowRetrieveMsg(int32_t rows) { + SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); + pRetrieve->showId = htonl(showId); + pRetrieve->free = 0; + + SRpcMsg retrieveRpcMsg = {0}; + retrieveRpcMsg.pCont = pRetrieve; + retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg); + retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + + sendMsg(pClient, &retrieveRpcMsg); + + ASSERT_NE(pClient->pRsp, nullptr); + ASSERT_EQ(pClient->pRsp->code, 0); + ASSERT_NE(pClient->pRsp->pCont, nullptr); + + pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; + ASSERT_NE(pRetrieveRsp, nullptr); + pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); + pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds); + pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen); + + EXPECT_EQ(pRetrieveRsp->numOfRows, rows); + EXPECT_EQ(pRetrieveRsp->useconds, 0); + // EXPECT_EQ(pRetrieveRsp->completed, completed); + EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI); + EXPECT_EQ(pRetrieveRsp->compressed, 0); + EXPECT_EQ(pRetrieveRsp->compLen, 0); + + pData = pRetrieveRsp->data; + pos = 0; + } + + void CheckInt8(int8_t val) { + int8_t data = *((int8_t*)(pData + pos)); + pos += sizeof(int8_t); + EXPECT_EQ(data, val); + } + + void CheckInt16(int16_t val) { + int16_t data = *((int16_t*)(pData + pos)); + pos += sizeof(int16_t); + EXPECT_EQ(data, val); + } + + void CheckInt32(int32_t val) { + int32_t data = *((int32_t*)(pData + pos)); + pos += sizeof(int32_t); + EXPECT_EQ(data, val); + } + + void CheckInt64(int64_t val) { + int64_t data = *((int64_t*)(pData + pos)); + pos += sizeof(int64_t); + EXPECT_EQ(data, val); + } + + void CheckTimestamp() { + int64_t data = *((int64_t*)(pData + pos)); + pos += sizeof(int64_t); + EXPECT_GT(data, 0); + } + + void CheckBinary(const char* val, int32_t len) { + pos += sizeof(VarDataLenT); + char* data = (char*)(pData + pos); + pos += len; + EXPECT_STREQ(data, val); + } + + int32_t showId; + STableMetaMsg* pMeta; + SRetrieveTableRsp* pRetrieveRsp; + char* pData; + int32_t pos; +}; + +SServer* DndTestVgroup::pServer; +SClient* DndTestVgroup::pClient; +int32_t DndTestVgroup::connId; + + +TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { + { + SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg)); + pReq->vgId = htonl(2); + pReq->dnodeId = htonl(1); + strcpy(pReq->db, "1.d1"); + pReq->dbUid = htobe64(9527); + pReq->cacheBlockSize = htonl(16); + pReq->totalBlocks = htonl(10); + pReq->daysPerFile = htonl(10); + pReq->daysToKeep0 = htonl(3650); + pReq->daysToKeep1 = htonl(3650); + pReq->daysToKeep2 = htonl(3650); + pReq->minRows = htonl(100); + pReq->minRows = htonl(4096); + pReq->commitTime = htonl(3600); + pReq->fsyncPeriod = htonl(3000); + pReq->walLevel = 1; + pReq->precision = 0; + pReq->compression = 2; + pReq->replica = 1; + pReq->quorum = 1; + pReq->update = 0; + pReq->cacheLastRow = 0; + pReq->selfIndex = 0; + for (int r = 0; r < pReq->replica; ++r) { + SReplica* pReplica = &pReq->replicas[r]; + pReplica->id = htonl(1); + pReplica->port = htons(9150); + } + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateVnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + taosMsleep(1000000); + } + +} +