feat:write meta from tmq to taosd
This commit is contained in:
parent
054b16c2db
commit
f2f45a1ec7
|
@ -169,6 +169,9 @@ typedef enum _mgmt_table {
|
||||||
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
|
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
|
||||||
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
|
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
|
||||||
|
|
||||||
|
#define TD_REQ_FROM_APP 0
|
||||||
|
#define TD_REQ_FROM_TAOX 1
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
char* dbFName;
|
char* dbFName;
|
||||||
|
@ -434,11 +437,16 @@ STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
|
int8_t source; // 1-taosX or 0-taosClient
|
||||||
|
int8_t reserved[6];
|
||||||
|
tb_uid_t suid;
|
||||||
int64_t delay1;
|
int64_t delay1;
|
||||||
int64_t delay2;
|
int64_t delay2;
|
||||||
int64_t watermark1;
|
int64_t watermark1;
|
||||||
int64_t watermark2;
|
int64_t watermark2;
|
||||||
int32_t ttl;
|
int32_t ttl;
|
||||||
|
int32_t colVer;
|
||||||
|
int32_t tagVer;
|
||||||
int32_t numOfColumns;
|
int32_t numOfColumns;
|
||||||
int32_t numOfTags;
|
int32_t numOfTags;
|
||||||
int32_t numOfFuncs;
|
int32_t numOfFuncs;
|
||||||
|
@ -446,16 +454,11 @@ typedef struct {
|
||||||
int32_t ast1Len;
|
int32_t ast1Len;
|
||||||
int32_t ast2Len;
|
int32_t ast2Len;
|
||||||
SArray* pColumns; // array of SField
|
SArray* pColumns; // array of SField
|
||||||
int32_t cVersion;
|
|
||||||
SArray* pTags; // array of SField
|
SArray* pTags; // array of SField
|
||||||
int32_t tVersion;
|
|
||||||
SArray* pFuncs;
|
SArray* pFuncs;
|
||||||
char* pComment;
|
char* pComment;
|
||||||
char* pAst1;
|
char* pAst1;
|
||||||
char* pAst2;
|
char* pAst2;
|
||||||
tb_uid_t suid;
|
|
||||||
int8_t source; // 1-taosX or 0-taosClient
|
|
||||||
int8_t reserved[8];
|
|
||||||
} SMCreateStbReq;
|
} SMCreateStbReq;
|
||||||
|
|
||||||
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
|
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
|
||||||
|
@ -465,9 +468,9 @@ void tFreeSMCreateStbReq(SMCreateStbReq* pReq);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t igNotExists;
|
int8_t igNotExists;
|
||||||
tb_uid_t suid;
|
|
||||||
int8_t source; // 1-taosX or 0-taosClient
|
int8_t source; // 1-taosX or 0-taosClient
|
||||||
int8_t reserved[8];
|
int8_t reserved[6];
|
||||||
|
tb_uid_t suid;
|
||||||
} SMDropStbReq;
|
} SMDropStbReq;
|
||||||
|
|
||||||
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
|
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
|
||||||
|
|
|
@ -246,6 +246,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AE)
|
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AE)
|
||||||
#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AF)
|
#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AF)
|
||||||
#define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03B0)
|
#define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03B0)
|
||||||
|
#define TSDB_CODE_MND_INVALID_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x03B1)
|
||||||
|
|
||||||
// mnode-infoSchema
|
// mnode-infoSchema
|
||||||
#define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x03BA)
|
#define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x03BA)
|
||||||
|
|
|
@ -2303,8 +2303,8 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
|
||||||
strcpy(field.name, pSchema->name);
|
strcpy(field.name, pSchema->name);
|
||||||
taosArrayPush(pReq.pTags, &field);
|
taosArrayPush(pReq.pTags, &field);
|
||||||
}
|
}
|
||||||
pReq.cVersion = req.schemaRow.version;
|
pReq.colVer = req.schemaRow.version;
|
||||||
pReq.tVersion = req.schemaTag.version;
|
pReq.tagVer = req.schemaTag.version;
|
||||||
pReq.numOfColumns = req.schemaRow.nCols;
|
pReq.numOfColumns = req.schemaRow.nCols;
|
||||||
pReq.numOfTags = req.schemaTag.nCols;
|
pReq.numOfTags = req.schemaTag.nCols;
|
||||||
pReq.commentLen = -1;
|
pReq.commentLen = -1;
|
||||||
|
|
|
@ -496,11 +496,18 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->source) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < sizeof(pReq->reserved) / sizeof(int8_t); ++i) {
|
||||||
|
if (tEncodeI8(&encoder, pReq->reserved[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
if (tEncodeI64(&encoder, pReq->suid) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->delay1) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->delay1) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->delay2) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->delay2) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->watermark1) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->watermark1) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->watermark2) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->watermark2) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->colVer) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->tagVer) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->numOfFuncs) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->numOfFuncs) < 0) return -1;
|
||||||
|
@ -516,8 +523,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
||||||
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tEncodeI32(&encoder, pReq->cVersion) < 0) return -1;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
|
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
|
||||||
SField *pField = taosArrayGet(pReq->pTags, i);
|
SField *pField = taosArrayGet(pReq->pTags, i);
|
||||||
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
|
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
|
||||||
|
@ -526,8 +531,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
||||||
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tEncodeI32(&encoder, pReq->tVersion) < 0) return -1;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
|
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
|
||||||
const char *pFunc = taosArrayGet(pReq->pFuncs, i);
|
const char *pFunc = taosArrayGet(pReq->pFuncs, i);
|
||||||
if (tEncodeCStr(&encoder, pFunc) < 0) return -1;
|
if (tEncodeCStr(&encoder, pFunc) < 0) return -1;
|
||||||
|
@ -542,11 +545,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
||||||
if (pReq->ast2Len > 0) {
|
if (pReq->ast2Len > 0) {
|
||||||
if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1;
|
if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tEncodeI64(&encoder, pReq->suid) < 0) return -1;
|
|
||||||
if (tEncodeI8(&encoder, pReq->source) < 0) return -1;
|
|
||||||
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
|
|
||||||
if (tEncodeI8(&encoder, pReq->reserved[i]) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -562,11 +560,18 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->source) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < sizeof(pReq->reserved) / sizeof(int8_t); ++i) {
|
||||||
|
if (tDecodeI8(&decoder, &pReq->reserved[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
if (tDecodeI64(&decoder, &pReq->suid) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->delay1) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->delay1) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->delay2) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->delay2) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->watermark1) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->watermark1) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->watermark2) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->watermark2) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->colVer) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->tagVer) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->numOfFuncs) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->numOfFuncs) < 0) return -1;
|
||||||
|
@ -594,8 +599,6 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tDecodeI32(&decoder, &pReq->cVersion) < 0) return -1;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
|
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
|
||||||
SField field = {0};
|
SField field = {0};
|
||||||
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
|
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
|
||||||
|
@ -608,8 +611,6 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tDecodeI32(&decoder, &pReq->tVersion) < 0) return -1;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
|
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
|
||||||
char pFunc[TSDB_FUNC_NAME_LEN] = {0};
|
char pFunc[TSDB_FUNC_NAME_LEN] = {0};
|
||||||
if (tDecodeCStrTo(&decoder, pFunc) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pFunc) < 0) return -1;
|
||||||
|
@ -637,12 +638,6 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
||||||
if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tDecodeI64(&decoder, &pReq->suid) < 0) return -1;
|
|
||||||
if (tDecodeI8(&decoder, &pReq->source) < 0) return -1;
|
|
||||||
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
|
|
||||||
if (tDecodeI8(&decoder, &pReq->reserved[i]) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -664,11 +659,11 @@ int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->suid) < 0) return -1;
|
|
||||||
if (tEncodeI8(&encoder, pReq->source) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->source) < 0) return -1;
|
||||||
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
|
for (int32_t i = 0; i < sizeof(pReq->reserved) / sizeof(int8_t); ++i) {
|
||||||
if (tEncodeI8(&encoder, pReq->reserved[i]) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->reserved[i]) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (tEncodeI64(&encoder, pReq->suid) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -683,11 +678,11 @@ int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq)
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->suid) < 0) return -1;
|
|
||||||
if (tDecodeI8(&decoder, &pReq->source) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->source) < 0) return -1;
|
||||||
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
|
for (int32_t i = 0; i < sizeof(pReq->reserved) / sizeof(int8_t); ++i) {
|
||||||
if (tDecodeI8(&decoder, &pReq->reserved[i]) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->reserved[i]) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (tDecodeI64(&decoder, &pReq->suid) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
|
|
@ -708,8 +708,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
||||||
pDst->updateTime = pDst->createdTime;
|
pDst->updateTime = pDst->createdTime;
|
||||||
pDst->uid = (pCreate->source == 1) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
pDst->uid = (pCreate->source == 1) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
pDst->dbUid = pDb->uid;
|
pDst->dbUid = pDb->uid;
|
||||||
pDst->tagVer = (pCreate->source == 1) ? pCreate->tVersion : 1;
|
pDst->tagVer = (pCreate->source != TD_REQ_FROM_APP) ? pCreate->tagVer : 1;
|
||||||
pDst->colVer = (pCreate->source == 1) ? pCreate->cVersion : 1;
|
pDst->colVer = (pCreate->source != TD_REQ_FROM_APP) ? pCreate->colVer : 1;
|
||||||
pDst->smaVer = 1;
|
pDst->smaVer = 1;
|
||||||
pDst->nextColId = 1;
|
pDst->nextColId = 1;
|
||||||
pDst->maxdelay[0] = pCreate->delay1;
|
pDst->maxdelay[0] = pCreate->delay1;
|
||||||
|
@ -900,6 +900,25 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (createReq.tagVer > 0 || createReq.colVer > 0) {
|
||||||
|
int32_t tagDelta = pStb->tagVer - createReq.tagVer;
|
||||||
|
int32_t colDelta = pStb->colVer - createReq.colVer;
|
||||||
|
int32_t verDelta = tagDelta + verDelta;
|
||||||
|
mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, mnode tagVer:%d colVer:%d", createReq.name,
|
||||||
|
createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
|
||||||
|
if (tagDelta <= 0 && colDelta <= 0) {
|
||||||
|
mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name);
|
||||||
|
code = 0;
|
||||||
|
goto _OVER;
|
||||||
|
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
|
||||||
|
mInfo("stb:%s, schema version is only increased by 1 digit, do alter operation", createReq.name);
|
||||||
|
} else {
|
||||||
|
mInfo("stb:%s, schema version increase more than 1 digit, error is returned", createReq.name);
|
||||||
|
terrno = TSDB_CODE_MND_INVALID_SCHEMA_VER;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
code = mndCreateStb(pMnode, pReq, &createReq, pDb);
|
code = mndCreateStb(pMnode, pReq, &createReq, pDb);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
@ -1614,12 +1633,23 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((alterReq.tagVer > 0 && alterReq.colVer > 0) &&
|
if (alterReq.tagVer > 0 || alterReq.colVer > 0) {
|
||||||
(alterReq.tagVer <= pStb->tagVer || alterReq.colVer <= pStb->colVer)) {
|
int32_t tagDelta = pStb->tagVer - alterReq.tagVer;
|
||||||
mDebug("stb:%s, already exist, tagVer:%d colVer:%d smaller than in mnode, tagVer:%d colVer:%d, alter success",
|
int32_t colDelta = pStb->colVer - alterReq.colVer;
|
||||||
alterReq.name, alterReq.tagVer, alterReq.colVer, pStb->tagVer, pStb->colVer);
|
int32_t verDelta = tagDelta + verDelta;
|
||||||
|
mInfo("stb:%s, already exist while alter, input tagVer:%d colVer:%d, mnode tagVer:%d colVer:%d", alterReq.name,
|
||||||
|
alterReq.tagVer, alterReq.colVer, pStb->tagVer, pStb->colVer);
|
||||||
|
if (tagDelta <= 0 && colDelta <= 0) {
|
||||||
|
mInfo("stb:%s, schema version is not incremented and nothing needs to be done", alterReq.name);
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
|
||||||
|
mInfo("stb:%s, schema version is only increased by 1 digit, do alter operation", alterReq.name);
|
||||||
|
} else {
|
||||||
|
mInfo("stb:%s, schema version increase more than 1 digit, error is returned", alterReq.name);
|
||||||
|
terrno = TSDB_CODE_MND_INVALID_SCHEMA_VER;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||||
|
@ -1752,7 +1782,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dropReq.source == 1 && pStb->uid != dropReq.suid){
|
if (dropReq.source != TD_REQ_FROM_APP && pStb->uid != dropReq.suid) {
|
||||||
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
|
@ -250,6 +250,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREADY_EXIST, "Column already exists
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC,"Field used by topic")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC,"Field used by topic")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SINGLE_STB_MODE_DB, "Database is single stable mode")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SINGLE_STB_MODE_DB, "Database is single stable mode")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SCHEMA_VER, "Invalid schema version while alter stb")
|
||||||
|
|
||||||
// mnode-infoSchema
|
// mnode-infoSchema
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SYS_TABLENAME, "Invalid system table name")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SYS_TABLENAME, "Invalid system table name")
|
||||||
|
|
Loading…
Reference in New Issue