TD-10431 update sdb for stable
This commit is contained in:
parent
a27bf43fab
commit
b6ed305614
|
@ -159,7 +159,7 @@ typedef enum _mgmt_table {
|
||||||
TSDB_MGMT_TABLE_DNODE,
|
TSDB_MGMT_TABLE_DNODE,
|
||||||
TSDB_MGMT_TABLE_MNODE,
|
TSDB_MGMT_TABLE_MNODE,
|
||||||
TSDB_MGMT_TABLE_VGROUP,
|
TSDB_MGMT_TABLE_VGROUP,
|
||||||
TSDB_MGMT_TABLE_METRIC,
|
TSDB_MGMT_TABLE_STABLE,
|
||||||
TSDB_MGMT_TABLE_MODULE,
|
TSDB_MGMT_TABLE_MODULE,
|
||||||
TSDB_MGMT_TABLE_QUERIES,
|
TSDB_MGMT_TABLE_QUERIES,
|
||||||
TSDB_MGMT_TABLE_STREAMS,
|
TSDB_MGMT_TABLE_STREAMS,
|
||||||
|
|
|
@ -251,9 +251,9 @@ typedef struct SStableObj {
|
||||||
int32_t version;
|
int32_t version;
|
||||||
int16_t numOfFields;
|
int16_t numOfFields;
|
||||||
int16_t numOfTags;
|
int16_t numOfTags;
|
||||||
|
SRWLatch lock;
|
||||||
SSchema *fieldSchema;
|
SSchema *fieldSchema;
|
||||||
SSchema *tagSchema;
|
SSchema *tagSchema;
|
||||||
char pCont[];
|
|
||||||
} SStableObj;
|
} SStableObj;
|
||||||
|
|
||||||
typedef struct SFuncObj {
|
typedef struct SFuncObj {
|
||||||
|
|
|
@ -270,7 +270,7 @@ char *mndShowStr(int32_t showType) {
|
||||||
return "show mnodes";
|
return "show mnodes";
|
||||||
case TSDB_MGMT_TABLE_VGROUP:
|
case TSDB_MGMT_TABLE_VGROUP:
|
||||||
return "show vgroups";
|
return "show vgroups";
|
||||||
case TSDB_MGMT_TABLE_METRIC:
|
case TSDB_MGMT_TABLE_STABLE:
|
||||||
return "show stables";
|
return "show stables";
|
||||||
case TSDB_MGMT_TABLE_MODULE:
|
case TSDB_MGMT_TABLE_MODULE:
|
||||||
return "show modules";
|
return "show modules";
|
||||||
|
|
|
@ -33,6 +33,10 @@ static int32_t mndStableActionUpdate(SSdb *pSdb, SStableObj *pOldStable, SStabl
|
||||||
static int32_t mndProcessCreateStableMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessCreateStableMsg(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessAlterStableMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessAlterStableMsg(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessDropStableMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessDropStableMsg(SMnodeMsg *pMsg);
|
||||||
|
static int32_t mndProcessCreateStableInRsp(SMnodeMsg *pMsg);
|
||||||
|
static int32_t mndProcessAlterStableInRsp(SMnodeMsg *pMsg);
|
||||||
|
static int32_t mndProcessDropStableInRsp(SMnodeMsg *pMsg);
|
||||||
|
static int32_t mndProcessStableMetaMsg(SMnodeMsg *pMsg);
|
||||||
static int32_t mndGetStableMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
|
static int32_t mndGetStableMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
|
||||||
static int32_t mndRetrieveStables(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveStables(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextStable(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextStable(SMnode *pMnode, void *pIter);
|
||||||
|
@ -46,13 +50,17 @@ int32_t mndInitStable(SMnode *pMnode) {
|
||||||
.updateFp = (SdbUpdateFp)mndStableActionUpdate,
|
.updateFp = (SdbUpdateFp)mndStableActionUpdate,
|
||||||
.deleteFp = (SdbDeleteFp)mndStableActionDelete};
|
.deleteFp = (SdbDeleteFp)mndStableActionDelete};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DB, mndProcessCreateStableMsg);
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_STABLE, mndProcessCreateStableMsg);
|
||||||
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_DB, mndProcessAlterStableMsg);
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_STABLE, mndProcessAlterStableMsg);
|
||||||
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DB, mndProcessDropStableMsg);
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_STABLE, mndProcessDropStableMsg);
|
||||||
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP, mndProcessCreateStableInRsp);
|
||||||
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP, mndProcessAlterStableInRsp);
|
||||||
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_STABLE_IN_RSP, mndProcessDropStableInRsp);
|
||||||
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_TABLE_META, mndProcessStableMetaMsg);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetStableMeta);
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STABLE, mndGetStableMeta);
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveStables);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STABLE, mndRetrieveStables);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DB, mndCancelGetNextStable);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STABLE, mndCancelGetNextStable);
|
||||||
|
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
@ -119,8 +127,8 @@ static SSdbRow *mndStableActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT16(pRaw, pRow, dataPos, &pStable->numOfFields)
|
SDB_GET_INT16(pRaw, pRow, dataPos, &pStable->numOfFields)
|
||||||
SDB_GET_INT16(pRaw, pRow, dataPos, &pStable->numOfTags)
|
SDB_GET_INT16(pRaw, pRow, dataPos, &pStable->numOfTags)
|
||||||
|
|
||||||
pStable->fieldSchema = (SSchema *)pStable->pCont;
|
pStable->fieldSchema = calloc(pStable->numOfFields, sizeof(SSchema));
|
||||||
pStable->tagSchema = (SSchema *)(pStable->pCont + pStable->numOfFields * sizeof(SSchema));
|
pStable->tagSchema = calloc(pStable->numOfTags, sizeof(SSchema));
|
||||||
|
|
||||||
for (int32_t i = 0; i < pStable->numOfFields; ++i) {
|
for (int32_t i = 0; i < pStable->numOfFields; ++i) {
|
||||||
SSchema *pSchema = &pStable->fieldSchema[i];
|
SSchema *pSchema = &pStable->fieldSchema[i];
|
||||||
|
@ -155,28 +163,42 @@ static int32_t mndStableActionDelete(SSdb *pSdb, SStableObj *pStable) {
|
||||||
|
|
||||||
static int32_t mndStableActionUpdate(SSdb *pSdb, SStableObj *pOldStable, SStableObj *pNewStable) {
|
static int32_t mndStableActionUpdate(SSdb *pSdb, SStableObj *pOldStable, SStableObj *pNewStable) {
|
||||||
mTrace("stable:%s, perform update action", pOldStable->name);
|
mTrace("stable:%s, perform update action", pOldStable->name);
|
||||||
memcpy(pOldStable->name, pNewStable->name, TSDB_TABLE_NAME_LEN);
|
atomic_exchange_32(&pOldStable->updateTime, pNewStable->updateTime);
|
||||||
pOldStable->createdTime = pNewStable->createdTime;
|
atomic_exchange_32(&pOldStable->version, pNewStable->version);
|
||||||
pOldStable->updateTime = pNewStable->updateTime;
|
|
||||||
pOldStable->uid = pNewStable->uid;
|
|
||||||
pOldStable->version = pNewStable->version;
|
|
||||||
pOldStable->numOfFields = pNewStable->numOfFields;
|
|
||||||
pOldStable->numOfTags = pNewStable->numOfTags;
|
|
||||||
pOldStable->createdTime = pNewStable->createdTime;
|
|
||||||
|
|
||||||
memcpy(pOldStable->pCont, pNewStable, sizeof(SDbObj));
|
taosWLockLatch(&pOldStable->lock);
|
||||||
// pStable->fieldSchema = pStable->pCont;
|
int16_t numOfTags = pNewStable->numOfTags;
|
||||||
// pStable->tagSchema = pStable->pCont + pStable->numOfFields * sizeof(SSchema);
|
int32_t tagSize = numOfTags * sizeof(SSchema);
|
||||||
|
int16_t numOfFields = pNewStable->numOfFields;
|
||||||
|
int32_t fieldSize = numOfFields * sizeof(SSchema);
|
||||||
|
|
||||||
|
if (pOldStable->numOfTags < numOfTags) {
|
||||||
|
pOldStable->tagSchema = malloc(tagSize);
|
||||||
|
}
|
||||||
|
if (pOldStable->numOfFields < numOfFields) {
|
||||||
|
pOldStable->fieldSchema = malloc(fieldSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pOldStable->tagSchema, pNewStable->tagSchema, tagSize);
|
||||||
|
memcpy(pOldStable->fieldSchema, pNewStable->fieldSchema, fieldSize);
|
||||||
|
taosWUnLockLatch(&pOldStable->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateStableMsg(SMnodeMsg *pMsg) { return 0; }
|
static int32_t mndProcessCreateStableMsg(SMnodeMsg *pMsg) { return 0; }
|
||||||
|
|
||||||
|
static int32_t mndProcessCreateStableInRsp(SMnodeMsg *pMsg) { return 0; }
|
||||||
|
|
||||||
static int32_t mndProcessAlterStableMsg(SMnodeMsg *pMsg) { return 0; }
|
static int32_t mndProcessAlterStableMsg(SMnodeMsg *pMsg) { return 0; }
|
||||||
|
|
||||||
|
static int32_t mndProcessAlterStableInRsp(SMnodeMsg *pMsg) { return 0; }
|
||||||
|
|
||||||
static int32_t mndProcessDropStableMsg(SMnodeMsg *pMsg) { return 0; }
|
static int32_t mndProcessDropStableMsg(SMnodeMsg *pMsg) { return 0; }
|
||||||
|
|
||||||
|
static int32_t mndProcessDropStableInRsp(SMnodeMsg *pMsg) { return 0; }
|
||||||
|
|
||||||
|
static int32_t mndProcessStableMetaMsg(SMnodeMsg *pMsg) { return 0; }
|
||||||
|
|
||||||
static int32_t mndGetNumOfStables(SMnode *pMnode, char *dbName, int32_t *pNumOfStables) {
|
static int32_t mndGetNumOfStables(SMnode *pMnode, char *dbName, int32_t *pNumOfStables) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
|
|
@ -112,13 +112,13 @@ cmd ::= SHOW dbPrefix(X) TABLES LIKE ids(Y). {
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd ::= SHOW dbPrefix(X) STABLES. {
|
cmd ::= SHOW dbPrefix(X) STABLES. {
|
||||||
setShowOptions(pInfo, TSDB_MGMT_TABLE_METRIC, &X, 0);
|
setShowOptions(pInfo, TSDB_MGMT_TABLE_STABLE, &X, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd ::= SHOW dbPrefix(X) STABLES LIKE ids(Y). {
|
cmd ::= SHOW dbPrefix(X) STABLES LIKE ids(Y). {
|
||||||
SToken token;
|
SToken token;
|
||||||
tSetDbName(&token, &X);
|
tSetDbName(&token, &X);
|
||||||
setShowOptions(pInfo, TSDB_MGMT_TABLE_METRIC, &token, &Y);
|
setShowOptions(pInfo, TSDB_MGMT_TABLE_STABLE, &token, &Y);
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd ::= SHOW dbPrefix(X) VGROUPS. {
|
cmd ::= SHOW dbPrefix(X) VGROUPS. {
|
||||||
|
|
Loading…
Reference in New Issue