feat:write meta from tmq to taosd
This commit is contained in:
parent
c6591c3cd3
commit
998de54728
|
@ -479,8 +479,6 @@ int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
|
|||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t alterType;
|
||||
int32_t tagVer;
|
||||
int32_t colVer;
|
||||
int32_t numOfFields;
|
||||
SArray* pFields;
|
||||
int32_t ttl;
|
||||
|
|
|
@ -248,6 +248,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AF)
|
||||
#define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03B0)
|
||||
#define TSDB_CODE_MND_INVALID_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x03B1)
|
||||
#define TSDB_CODE_MND_STABLE_UID_NOT_MATCH TAOS_DEF_ERROR_CODE(0, 0x03B2)
|
||||
|
||||
// mnode-infoSchema
|
||||
#define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x03BA)
|
||||
|
|
|
@ -697,8 +697,6 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq)
|
|||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->alterType) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->tagVer) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->colVer) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->numOfFields) < 0) return -1;
|
||||
for (int32_t i = 0; i < pReq->numOfFields; ++i) {
|
||||
SField *pField = taosArrayGet(pReq->pFields, i);
|
||||
|
@ -725,8 +723,6 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq
|
|||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->alterType) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->tagVer) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->colVer) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->numOfFields) < 0) return -1;
|
||||
pReq->pFields = taosArrayInit(pReq->numOfFields, sizeof(SField));
|
||||
if (pReq->pFields == NULL) {
|
||||
|
|
|
@ -869,9 +869,38 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
pStb = mndAcquireStb(pMnode, createReq.name);
|
||||
if (pStb != NULL) {
|
||||
if (createReq.igExists) {
|
||||
if (createReq.source == TD_REQ_FROM_APP) {
|
||||
mDebug("stb:%s, already exist, ignore exist is set", createReq.name);
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
} else if (pStb->uid != createReq.suid) {
|
||||
mError("stb:%s, already exist while create, input suid:%" PRId64 " not match with exist suid:%" PRId64,
|
||||
createReq.name, createReq.suid, pStb->uid);
|
||||
terrno = TSDB_CODE_MND_STABLE_UID_NOT_MATCH;
|
||||
goto _OVER;
|
||||
} else if (createReq.tagVer > 0 || createReq.colVer > 0) {
|
||||
int32_t tagDelta = pStb->tagVer - createReq.tagVer;
|
||||
int32_t colDelta = pStb->colVer - createReq.colVer;
|
||||
int32_t verDelta = tagDelta + verDelta;
|
||||
mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d",
|
||||
createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
|
||||
if (tagDelta <= 0 && colDelta <= 0) {
|
||||
mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name);
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
|
||||
mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
|
||||
} else {
|
||||
mError("stb:%s, schema version increase more than 1 number, error is returned", createReq.name);
|
||||
terrno = TSDB_CODE_MND_INVALID_SCHEMA_VER;
|
||||
goto _OVER;
|
||||
}
|
||||
} else {
|
||||
mError("stb:%s, already exist while create, input tagVer:%d colVer:%d is invalid", createReq.name,
|
||||
createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
|
||||
terrno = TSDB_CODE_MND_INVALID_SCHEMA_VER;
|
||||
goto _OVER;
|
||||
}
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_STB_ALREADY_EXIST;
|
||||
goto _OVER;
|
||||
|
@ -900,25 +929,6 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (createReq.tagVer > 0 || createReq.colVer > 0) {
|
||||
int32_t tagDelta = pStb->tagVer - createReq.tagVer;
|
||||
int32_t colDelta = pStb->colVer - createReq.colVer;
|
||||
int32_t verDelta = tagDelta + verDelta;
|
||||
mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, mnode tagVer:%d colVer:%d", createReq.name,
|
||||
createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
|
||||
if (tagDelta <= 0 && colDelta <= 0) {
|
||||
mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name);
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
|
||||
mInfo("stb:%s, schema version is only increased by 1 digit, do alter operation", createReq.name);
|
||||
} else {
|
||||
mInfo("stb:%s, schema version increase more than 1 digit, error is returned", createReq.name);
|
||||
terrno = TSDB_CODE_MND_INVALID_SCHEMA_VER;
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
code = mndCreateStb(pMnode, pReq, &createReq, pDb);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
|
@ -1633,25 +1643,6 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (alterReq.tagVer > 0 || alterReq.colVer > 0) {
|
||||
int32_t tagDelta = pStb->tagVer - alterReq.tagVer;
|
||||
int32_t colDelta = pStb->colVer - alterReq.colVer;
|
||||
int32_t verDelta = tagDelta + verDelta;
|
||||
mInfo("stb:%s, already exist while alter, input tagVer:%d colVer:%d, mnode tagVer:%d colVer:%d", alterReq.name,
|
||||
alterReq.tagVer, alterReq.colVer, pStb->tagVer, pStb->colVer);
|
||||
if (tagDelta <= 0 && colDelta <= 0) {
|
||||
mInfo("stb:%s, schema version is not incremented and nothing needs to be done", alterReq.name);
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
|
||||
mInfo("stb:%s, schema version is only increased by 1 digit, do alter operation", alterReq.name);
|
||||
} else {
|
||||
mInfo("stb:%s, schema version increase more than 1 digit, error is returned", alterReq.name);
|
||||
terrno = TSDB_CODE_MND_INVALID_SCHEMA_VER;
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
|
|
@ -252,6 +252,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC,"Field used by topic")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SINGLE_STB_MODE_DB, "Database is single stable mode")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SCHEMA_VER, "Invalid schema version while alter stb")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STABLE_UID_NOT_MATCH, "Invalid stable uid while alter stb")
|
||||
|
||||
// mnode-infoSchema
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SYS_TABLENAME, "Invalid system table name")
|
||||
|
|
Loading…
Reference in New Issue