fix mnd config persist.

This commit is contained in:
xiao-77 2024-11-18 19:52:41 +08:00
parent 9b9b3e8e96
commit 4b8feab900
9 changed files with 179 additions and 119 deletions

View File

@ -118,7 +118,6 @@ void cfgCleanup(SConfig *pCfg);
int32_t cfgGetSize(SConfig *pCfg); int32_t cfgGetSize(SConfig *pCfg);
SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName); SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName);
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype, bool lock); int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype, bool lock);
int32_t cfgUpdateItem(SConfigItem *pItem, SConfigItem *newItem);
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer); int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer);
int32_t cfgCreateIter(SConfig *pConf, SConfigIter **ppIter); int32_t cfgCreateIter(SConfig *pConf, SConfigIter **ppIter);

View File

@ -23,11 +23,11 @@ extern "C" {
#endif #endif
int32_t mndInitConfig(SMnode *pMnode); int32_t mndInitConfig(SMnode *pMnode);
SSdbRaw *mnCfgActionEncode(SConfigItem *pCfg); SSdbRaw *mnCfgActionEncode(SConfigObj *pCfg);
SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw); SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw);
static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigItem *item); static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigObj *obj);
static int32_t mndCfgActionDelete(SSdb *pSdb, SConfigItem *item); static int32_t mndCfgActionDelete(SSdb *pSdb, SConfigObj *obj);
static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigItem *oldItem, SConfigItem *newItem); static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigObj *oldItem, SConfigObj *newObj);
static int32_t mndCfgActionDeploy(SMnode *pMnode); static int32_t mndCfgActionDeploy(SMnode *pMnode);
static int32_t mndCfgActionPrepare(SMnode *pMnode); static int32_t mndCfgActionPrepare(SMnode *pMnode);

View File

@ -316,6 +316,27 @@ typedef struct {
TdThreadMutex mutex; TdThreadMutex mutex;
} SArbGroup; } SArbGroup;
typedef struct {
char name[CFG_NAME_MAX_LEN];
ECfgDataType dtype;
union {
bool bval;
float fval;
int32_t i32;
int64_t i64;
char* str;
};
union {
int64_t imin;
float fmin;
};
union {
int64_t imax;
float fmax;
};
} SConfigObj;
SConfigObj* mndInitConfigObj(SConfigItem* pItem);
typedef struct { typedef struct {
int32_t maxUsers; int32_t maxUsers;

View File

@ -21,11 +21,12 @@
#define CFG_VER_NUMBER 1 #define CFG_VER_NUMBER 1
#define CFG_RESERVE_SIZE 63 #define CFG_RESERVE_SIZE 63
static int32_t cfgUpdateItem(SConfigItem *pItem, SConfigObj *obj);
static int32_t mndProcessConfigReq(SRpcMsg *pReq); static int32_t mndProcessConfigReq(SRpcMsg *pReq);
static int32_t mndInitWriteCfg(SMnode *pMnode); static int32_t mndInitWriteCfg(SMnode *pMnode);
static int32_t mndInitReadCfg(SMnode *pMnode); static int32_t mndInitReadCfg(SMnode *pMnode);
int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigItem *item); int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigObj *obj);
int32_t mndInitConfig(SMnode *pMnode) { int32_t mndInitConfig(SMnode *pMnode) {
int32_t code = 0; int32_t code = 0;
@ -44,35 +45,36 @@ int32_t mndInitConfig(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
SSdbRaw *mnCfgActionEncode(SConfigItem *pItem) { SSdbRaw *mnCfgActionEncode(SConfigObj *obj) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
char buf[30]; char buf[30];
int32_t size = sizeof(SConfigItem) + CFG_RESERVE_SIZE; int32_t size = sizeof(SConfigObj);
SSdbRaw *pRaw = sdbAllocRaw(SDB_CFG, CFG_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_CFG, CFG_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER; if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, strlen(pItem->name), _OVER) char name[CFG_NAME_MAX_LEN] = {0};
SDB_SET_BINARY(pRaw, dataPos, pItem->name, strlen(pItem->name), _OVER) strncpy(name, obj->name, CFG_NAME_MAX_LEN);
SDB_SET_INT32(pRaw, dataPos, pItem->dtype, _OVER) SDB_SET_BINARY(pRaw, dataPos, name, CFG_NAME_MAX_LEN, _OVER)
switch (pItem->dtype) { SDB_SET_INT32(pRaw, dataPos, obj->dtype, _OVER)
switch (obj->dtype) {
case CFG_DTYPE_NONE: case CFG_DTYPE_NONE:
break; break;
case CFG_DTYPE_BOOL: case CFG_DTYPE_BOOL:
SDB_SET_BOOL(pRaw, dataPos, pItem->bval, _OVER) SDB_SET_BOOL(pRaw, dataPos, obj->bval, _OVER)
break; break;
case CFG_DTYPE_INT32: case CFG_DTYPE_INT32:
SDB_SET_INT32(pRaw, dataPos, pItem->i32, _OVER); SDB_SET_INT32(pRaw, dataPos, obj->i32, _OVER);
break; break;
case CFG_DTYPE_INT64: case CFG_DTYPE_INT64:
SDB_SET_INT64(pRaw, dataPos, pItem->i64, _OVER); SDB_SET_INT64(pRaw, dataPos, obj->i64, _OVER);
break; break;
case CFG_DTYPE_FLOAT: case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE: case CFG_DTYPE_DOUBLE:
(void)sprintf(buf, "%f", pItem->fval); (void)sprintf(buf, "%f", obj->fval);
SDB_SET_INT32(pRaw, dataPos, strlen(buf), _OVER) SDB_SET_INT32(pRaw, dataPos, strlen(buf), _OVER)
SDB_SET_BINARY(pRaw, dataPos, buf, strlen(buf), _OVER) SDB_SET_BINARY(pRaw, dataPos, buf, strlen(buf), _OVER)
break; break;
@ -81,8 +83,8 @@ SSdbRaw *mnCfgActionEncode(SConfigItem *pItem) {
case CFG_DTYPE_LOCALE: case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET: case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE: case CFG_DTYPE_TIMEZONE:
SDB_SET_INT32(pRaw, dataPos, strlen(pItem->str), _OVER) SDB_SET_INT32(pRaw, dataPos, strlen(obj->str), _OVER)
SDB_SET_BINARY(pRaw, dataPos, pItem->str, strlen(pItem->str), _OVER) SDB_SET_BINARY(pRaw, dataPos, obj->str, strlen(obj->str), _OVER)
break; break;
} }
@ -94,7 +96,7 @@ _OVER:
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
return NULL; return NULL;
} }
mTrace("cfg encode to raw:%p, row:%p", pRaw, pItem); mTrace("cfg encode to raw:%p, row:%p", pRaw, obj);
return pRaw; return pRaw;
} }
@ -104,7 +106,7 @@ SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw) {
int32_t len = -1; int32_t len = -1;
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL; SSdbRow *pRow = NULL;
SConfigItem *item = NULL; SConfigObj *obj = NULL;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
@ -114,28 +116,26 @@ SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw) {
goto _OVER; goto _OVER;
} }
pRow = sdbAllocRow(sizeof(SConfigItem)); pRow = sdbAllocRow(sizeof(SConfigObj));
if (pRow == NULL) goto _OVER; if (pRow == NULL) goto _OVER;
item = sdbGetRowObj(pRow); obj = sdbGetRowObj(pRow);
if (item == NULL) goto _OVER; if (obj == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &len, _OVER) // TODO(beryl):free it.
char *buf = taosMemoryMalloc(len + 1); SDB_GET_BINARY(pRaw, dataPos, obj->name, CFG_NAME_MAX_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, buf, len, _OVER) SDB_GET_INT32(pRaw, dataPos, (int32_t *)&obj->dtype, _OVER)
item->name = buf; switch (obj->dtype) {
SDB_GET_INT32(pRaw, dataPos, (int32_t *)&item->dtype, _OVER)
switch (item->dtype) {
case CFG_DTYPE_NONE: case CFG_DTYPE_NONE:
break; break;
case CFG_DTYPE_BOOL: case CFG_DTYPE_BOOL:
SDB_GET_BOOL(pRaw, dataPos, &item->bval, _OVER) SDB_GET_BOOL(pRaw, dataPos, &obj->bval, _OVER)
break; break;
case CFG_DTYPE_INT32: case CFG_DTYPE_INT32:
SDB_GET_INT32(pRaw, dataPos, &item->i32, _OVER); SDB_GET_INT32(pRaw, dataPos, &obj->i32, _OVER);
break; break;
case CFG_DTYPE_INT64: case CFG_DTYPE_INT64:
SDB_GET_INT64(pRaw, dataPos, &item->i64, _OVER); SDB_GET_INT64(pRaw, dataPos, &obj->i64, _OVER);
break; break;
case CFG_DTYPE_FLOAT: case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE: case CFG_DTYPE_DOUBLE:
@ -149,7 +149,7 @@ SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw) {
case CFG_DTYPE_CHARSET: case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE: case CFG_DTYPE_TIMEZONE:
SDB_GET_INT32(pRaw, dataPos, &len, _OVER) SDB_GET_INT32(pRaw, dataPos, &len, _OVER)
SDB_GET_BINARY(pRaw, dataPos, item->str, len, _OVER) SDB_GET_BINARY(pRaw, dataPos, obj->str, len, _OVER)
break; break;
} }
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
@ -161,21 +161,21 @@ _OVER:
return NULL; return NULL;
} }
mTrace("cfg decode from raw:%p, row:%p", pRaw, item); mTrace("cfg decode from raw:%p, row:%p", pRaw, obj);
return pRow; return pRow;
} }
static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigItem *item) { static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigObj *obj) {
mTrace("cfg:%s, perform insert action, row:%p", item->name, item); mTrace("cfg:%s, perform insert action, row:%p", obj->name, obj);
return 0; return 0;
} }
static int32_t mndCfgActionDelete(SSdb *pSdb, SConfigItem *item) { static int32_t mndCfgActionDelete(SSdb *pSdb, SConfigObj *obj) {
mTrace("cfg:%s, perform delete action, row:%p", item->name, item); mTrace("cfg:%s, perform delete action, row:%p", obj->name, obj);
return 0; return 0;
} }
static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigItem *pOld, SConfigItem *pNew) { static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigObj *pOld, SConfigObj *pNew) {
mTrace("cfg:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); mTrace("cfg:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
return 0; return 0;
} }
@ -232,7 +232,7 @@ int32_t mndInitWriteCfg(SMnode *pMnode) {
int code = -1; int code = -1;
size_t sz = 0; size_t sz = 0;
SConfigItem item = {0}; SConfigObj obj = {0};
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "init-write-config"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "init-write-config");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("failed to init write cfg in create trans, since %s", terrstr()); mError("failed to init write cfg in create trans, since %s", terrstr());
@ -240,17 +240,19 @@ int32_t mndInitWriteCfg(SMnode *pMnode) {
} }
// encode mnd config version // encode mnd config version
item = (SConfigItem){.name = "tsmmConfigVersion", .dtype = CFG_DTYPE_INT32, .i32 = tsmmConfigVersion}; obj = (SConfigObj){.name = "tsmmConfigVersion", .dtype = CFG_DTYPE_INT32, .i32 = tsmmConfigVersion};
if ((code = mndSetCreateConfigCommitLogs(pTrans, &item)) != 0) { if ((code = mndSetCreateConfigCommitLogs(pTrans, &obj)) != 0) {
mError("failed to init mnd config version, since %s", terrstr()); mError("failed to init mnd config version, since %s", terrstr());
} }
sz = taosArrayGetSize(getGlobalCfg(tsCfg)); sz = taosArrayGetSize(getGlobalCfg(tsCfg));
for (int i = 0; i < sz; ++i) { for (int i = 0; i < sz; ++i) {
SConfigItem *item = taosArrayGet(getGlobalCfg(tsCfg), i); SConfigItem *item = taosArrayGet(getGlobalCfg(tsCfg), i);
if ((code = mndSetCreateConfigCommitLogs(pTrans, item)) != 0) { SConfigObj *obj = mndInitConfigObj(item);
if ((code = mndSetCreateConfigCommitLogs(pTrans, obj)) != 0) {
mError("failed to init mnd config:%s, since %s", item->name, terrstr()); mError("failed to init mnd config:%s, since %s", item->name, terrstr());
} }
taosMemoryFree(obj);
} }
if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER; if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER; if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
@ -268,31 +270,32 @@ int32_t mndInitReadCfg(SMnode *pMnode) {
mError("failed to init read cfg in create trans, since %s", terrstr()); mError("failed to init read cfg in create trans, since %s", terrstr());
goto _OVER; goto _OVER;
} }
SConfigItem *item = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion"); SConfigObj *obj = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion");
if (item == NULL) { if (obj == NULL) {
mInfo("failed to acquire mnd config version, since %s", terrstr()); mInfo("failed to acquire mnd config version, since %s", terrstr());
goto _OVER; goto _OVER;
} else { } else {
tsmmConfigVersion = item->i32; tsmmConfigVersion = obj->i32;
sdbRelease(pMnode->pSdb, item); sdbRelease(pMnode->pSdb, obj);
} }
sz = taosArrayGetSize(getGlobalCfg(tsCfg));
for (int i = 0; i < sz; ++i) { for (int i = 0; i < sz; ++i) {
SConfigItem *item = taosArrayGet(getGlobalCfg(tsCfg), i); SConfigItem *item = taosArrayGet(getGlobalCfg(tsCfg), i);
SConfigItem *newItem = sdbAcquire(pMnode->pSdb, SDB_CFG, item->name); SConfigObj *newObj = sdbAcquire(pMnode->pSdb, SDB_CFG, item->name);
if (newItem == NULL) { if (newObj == NULL) {
mInfo("failed to acquire mnd config:%s, since %s", item->name, terrstr()); mInfo("failed to acquire mnd config:%s, since %s", item->name, terrstr());
continue; continue;
} }
cfgUpdateItem(item, newItem); cfgUpdateItem(item, newObj);
sdbRelease(pMnode->pSdb, newItem); sdbRelease(pMnode->pSdb, newObj);
} }
_OVER: _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigItem *item) { int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigObj *item) {
int32_t code = 0; int32_t code = 0;
SSdbRaw *pCommitRaw = mnCfgActionEncode(item); SSdbRaw *pCommitRaw = mnCfgActionEncode(item);
if (pCommitRaw == NULL) { if (pCommitRaw == NULL) {
@ -303,3 +306,44 @@ int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigItem *item) {
if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code); if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t cfgUpdateItem(SConfigItem *pItem, SConfigObj *obj) {
int32_t code = TSDB_CODE_SUCCESS;
if (pItem == NULL || obj == NULL) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
switch (pItem->dtype) {
case CFG_DTYPE_BOOL: {
pItem->bval = obj->bval;
break;
}
case CFG_DTYPE_INT32: {
pItem->i32 = obj->i32;
break;
}
case CFG_DTYPE_INT64: {
pItem->i64 = obj->i64;
break;
}
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE: {
pItem->fval = obj->fval;
break;
}
case CFG_DTYPE_DIR:
case CFG_DTYPE_TIMEZONE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_NONE:
case CFG_DTYPE_STRING: {
pItem->str = obj->str;
break;
}
default:
code = TSDB_CODE_INVALID_CFG;
break;
}
TAOS_RETURN(code);
}

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndDef.h"
#include "mndConsumer.h" #include "mndConsumer.h"
#include "mndDef.h"
#include "taoserror.h" #include "taoserror.h"
static void *freeStreamTasks(SArray *pTaskLevel); static void *freeStreamTasks(SArray *pTaskLevel);
@ -72,7 +72,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, innerSz)); TAOS_CHECK_RETURN(tEncodeI32(pEncoder, innerSz));
for (int32_t j = 0; j < innerSz; j++) { for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGetP(pArray, j); SStreamTask *pTask = taosArrayGetP(pArray, j);
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){ if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
pTask->ver = SSTREAM_TASK_VER; pTask->ver = SSTREAM_TASK_VER;
} }
TAOS_CHECK_RETURN(tEncodeStreamTask(pEncoder, pTask)); TAOS_CHECK_RETURN(tEncodeStreamTask(pEncoder, pTask));
@ -273,8 +273,8 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
static void *topicNameDup(void *p) { return taosStrdup((char *)p); } static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
char *topic, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer) { SMqConsumerObj **ppConsumer) {
int32_t code = 0; int32_t code = 0;
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj)); SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) { if (pConsumer == NULL) {
@ -294,30 +294,30 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
pConsumer->createTime = taosGetTimestampMs(); pConsumer->createTime = taosGetTimestampMs();
pConsumer->updateType = updateType; pConsumer->updateType = updateType;
if (updateType == CONSUMER_ADD_REB){ if (updateType == CONSUMER_ADD_REB) {
pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *)); pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
if(pConsumer->rebNewTopics == NULL){ if (pConsumer->rebNewTopics == NULL) {
code = terrno; code = terrno;
goto END; goto END;
} }
char* topicTmp = taosStrdup(topic); char *topicTmp = taosStrdup(topic);
if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) { if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
code = terrno; code = terrno;
goto END; goto END;
} }
}else if (updateType == CONSUMER_REMOVE_REB) { } else if (updateType == CONSUMER_REMOVE_REB) {
pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *)); pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
if(pConsumer->rebRemovedTopics == NULL){ if (pConsumer->rebRemovedTopics == NULL) {
code = terrno; code = terrno;
goto END; goto END;
} }
char* topicTmp = taosStrdup(topic); char *topicTmp = taosStrdup(topic);
if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) { if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
code = terrno; code = terrno;
goto END; goto END;
} }
}else if (updateType == CONSUMER_INSERT_SUB){ } else if (updateType == CONSUMER_INSERT_SUB) {
tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId)); tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
pConsumer->withTbName = subscribe->withTbName; pConsumer->withTbName = subscribe->withTbName;
pConsumer->autoCommit = subscribe->autoCommit; pConsumer->autoCommit = subscribe->autoCommit;
@ -329,13 +329,13 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN); tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup); pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
if (pConsumer->rebNewTopics == NULL){ if (pConsumer->rebNewTopics == NULL) {
code = terrno; code = terrno;
goto END; goto END;
} }
pConsumer->assignedTopics = subscribe->topicNames; pConsumer->assignedTopics = subscribe->topicNames;
subscribe->topicNames = NULL; subscribe->topicNames = NULL;
}else if (updateType == CONSUMER_UPDATE_SUB){ } else if (updateType == CONSUMER_UPDATE_SUB) {
pConsumer->assignedTopics = subscribe->topicNames; pConsumer->assignedTopics = subscribe->topicNames;
subscribe->topicNames = NULL; subscribe->topicNames = NULL;
} }
@ -504,12 +504,12 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg); buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
} }
if (sver > 2){ if (sver > 2) {
buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs); buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs); buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
buf = taosDecodeStringTo(buf, pConsumer->user); buf = taosDecodeStringTo(buf, pConsumer->user);
buf = taosDecodeStringTo(buf, pConsumer->fqdn); buf = taosDecodeStringTo(buf, pConsumer->fqdn);
} else{ } else {
pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
} }
@ -517,7 +517,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
return (void *)buf; return (void *)buf;
} }
int32_t tEncodeOffRows(void **buf, SArray *offsetRows){ int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
int32_t tlen = 0; int32_t tlen = 0;
int32_t szVgs = taosArrayGetSize(offsetRows); int32_t szVgs = taosArrayGetSize(offsetRows);
tlen += taosEncodeFixedI32(buf, szVgs); tlen += taosEncodeFixedI32(buf, szVgs);
@ -545,11 +545,10 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp); tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows); return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
} }
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver){ void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
int32_t szVgs = 0; int32_t szVgs = 0;
buf = taosDecodeFixedI32(buf, &szVgs); buf = taosDecodeFixedI32(buf, &szVgs);
if (szVgs > 0) { if (szVgs > 0) {
@ -568,7 +567,7 @@ void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver){
} else { } else {
// do nothing // do nothing
} }
if(sver > 2){ if (sver > 2) {
buf = taosDecodeFixedI64(buf, &offRows->ever); buf = taosDecodeFixedI64(buf, &offRows->ever);
} }
} }
@ -598,7 +597,7 @@ int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
MND_TMQ_NULL_CHECK(pSubObj->consumerHash); MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES); pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs); MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
if (ppSub){ if (ppSub) {
*ppSub = pSubObj; *ppSub = pSubObj;
} }
return code; return code;
@ -730,6 +729,40 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
return (void *)buf; return (void *)buf;
} }
SConfigObj *mndInitConfigObj(SConfigItem *pItem) {
SConfigObj *pObj = taosMemoryCalloc(1, sizeof(SConfigObj));
if (pObj == NULL) {
return NULL;
}
strncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
pObj->dtype = pItem->dtype;
switch (pItem->dtype) {
case CFG_DTYPE_NONE:
break;
case CFG_DTYPE_BOOL:
pObj->bval = pItem->bval;
break;
case CFG_DTYPE_INT32:
pObj->i32 = pItem->i32;
break;
case CFG_DTYPE_INT64:
pObj->i64 = pItem->i64;
break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE:
pObj->fval = pItem->fval;
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
pObj->str = pItem->str;
break;
}
return pObj;
}
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { // SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry)); // SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
// if (pEntryNew == NULL) return NULL; // if (pEntryNew == NULL) return NULL;

View File

@ -612,6 +612,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-anode", mndInitAnode, mndCleanupAnode)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-anode", mndInitAnode, mndCleanupAnode));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-arbgroup", mndInitArbGroup, mndCleanupArbGroup)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-arbgroup", mndInitArbGroup, mndCleanupArbGroup));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-config", mndInitConfig, NULL));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant));
@ -638,7 +639,6 @@ static int32_t mndInitSteps(SMnode *pMnode) {
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem));
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-config", mndInitConfig, NULL));
return 0; return 0;
} }

View File

@ -410,7 +410,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
code = sdbWriteWithoutFree(pSdb, pRaw); code = sdbWriteWithoutFree(pSdb, pRaw);
if (code != 0) { if (code != 0) {
mError("failed to read sdb file:%s since %s", file, terrstr()); mError("failed to exec sdbWrite while read sdb file:%s since %s", file, terrstr());
goto _OVER; goto _OVER;
} }
} }

View File

@ -261,6 +261,9 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
} }
int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) { int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) {
if (pRaw->type == SDB_CFG) {
mTrace("sdb write cfg");
}
SHashObj *hash = sdbGetHash(pSdb, pRaw->type); SHashObj *hash = sdbGetHash(pSdb, pRaw->type);
if (hash == NULL) return terrno; if (hash == NULL) return terrno;

View File

@ -404,46 +404,6 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy
TAOS_RETURN(code); TAOS_RETURN(code);
} }
int32_t cfgUpdateItem(SConfigItem *pItem, SConfigItem *newItem) {
int32_t code = TSDB_CODE_SUCCESS;
if (pItem == NULL || newItem == NULL) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
switch (pItem->dtype) {
case CFG_DTYPE_BOOL: {
pItem->bval = newItem->bval;
break;
}
case CFG_DTYPE_INT32: {
pItem->i32 = newItem->i32;
break;
}
case CFG_DTYPE_INT64: {
pItem->i64 = newItem->i64;
break;
}
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE: {
pItem->fval = newItem->fval;
break;
}
case CFG_DTYPE_DIR:
case CFG_DTYPE_TIMEZONE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_NONE:
case CFG_DTYPE_STRING: {
pItem->str = newItem->str;
break;
}
default:
code = TSDB_CODE_INVALID_CFG;
break;
}
TAOS_RETURN(code);
}
SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName) { SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName) {
if (pCfg == NULL) return NULL; if (pCfg == NULL) return NULL;