commit
ae01673dbc
|
@ -184,6 +184,13 @@ typedef struct SField {
|
|||
int32_t bytes;
|
||||
} SField;
|
||||
|
||||
typedef struct SRetention {
|
||||
int32_t freq;
|
||||
int32_t keep;
|
||||
int8_t freqUnit;
|
||||
int8_t keepUnit;
|
||||
} SRetention;
|
||||
|
||||
#pragma pack(push, 1)
|
||||
|
||||
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
|
||||
|
@ -272,11 +279,14 @@ typedef struct {
|
|||
float xFilesFactor;
|
||||
int32_t aggregationMethod;
|
||||
int32_t delay;
|
||||
int32_t ttl;
|
||||
int32_t numOfColumns;
|
||||
int32_t numOfTags;
|
||||
int32_t numOfSmas;
|
||||
int32_t commentLen;
|
||||
SArray* pColumns;
|
||||
SArray* pTags;
|
||||
SArray* pColumns; // array of SField
|
||||
SArray* pTags; // array of SField
|
||||
SArray* pSmas; // array of SField
|
||||
char* comment;
|
||||
} SMCreateStbReq;
|
||||
|
||||
|
@ -504,10 +514,13 @@ typedef struct {
|
|||
int8_t cacheLastRow;
|
||||
int8_t ignoreExist;
|
||||
int8_t streamMode;
|
||||
int32_t numOfRetensions;
|
||||
SArray* pRetensions; // SRetention
|
||||
} SCreateDbReq;
|
||||
|
||||
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
|
||||
int32_t tDeserializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
|
||||
void tFreeSCreateDbReq(SCreateDbReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
|
@ -752,11 +765,13 @@ typedef struct {
|
|||
int8_t selfIndex;
|
||||
int8_t streamMode;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
|
||||
int32_t numOfRetensions;
|
||||
SArray* pRetensions; // SRetention
|
||||
} SCreateVnodeReq, SAlterVnodeReq;
|
||||
|
||||
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||
int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
|
|
|
@ -513,8 +513,10 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
|||
if (tEncodeFloat(&encoder, pReq->xFilesFactor) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->aggregationMethod) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->delay) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->numOfSmas) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1;
|
||||
|
||||
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
|
||||
|
@ -531,7 +533,16 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
|||
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
||||
}
|
||||
|
||||
if (tEncodeBinary(&encoder, pReq->comment, pReq->commentLen) < 0) return -1;
|
||||
for (int32_t i = 0; i < pReq->numOfSmas; ++i) {
|
||||
SField *pField = taosArrayGet(pReq->pSmas, i);
|
||||
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
||||
}
|
||||
|
||||
if (pReq->commentLen > 0) {
|
||||
if (tEncodeBinary(&encoder, pReq->comment, pReq->commentLen) < 0) return -1;
|
||||
}
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -549,13 +560,16 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
|||
if (tDecodeFloat(&decoder, &pReq->xFilesFactor) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->aggregationMethod) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->delay) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->numOfSmas) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1;
|
||||
|
||||
pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField));
|
||||
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
|
||||
if (pReq->pColumns == NULL || pReq->pTags == NULL) {
|
||||
pReq->pSmas = taosArrayInit(pReq->numOfSmas, sizeof(SField));
|
||||
if (pReq->pColumns == NULL || pReq->pTags == NULL || pReq->pSmas == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
@ -582,13 +596,23 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
|||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pReq->numOfSmas; ++i) {
|
||||
SField field = {0};
|
||||
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
|
||||
if (taosArrayPush(pReq->pSmas, &field) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pReq->commentLen > 0) {
|
||||
pReq->comment = malloc(pReq->commentLen);
|
||||
if (pReq->comment == NULL) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1;
|
||||
}
|
||||
|
||||
if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
|
@ -598,8 +622,11 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
|||
void tFreeSMCreateStbReq(SMCreateStbReq *pReq) {
|
||||
taosArrayDestroy(pReq->pColumns);
|
||||
taosArrayDestroy(pReq->pTags);
|
||||
taosArrayDestroy(pReq->pSmas);
|
||||
tfree(pReq->comment);
|
||||
pReq->pColumns = NULL;
|
||||
pReq->pTags = NULL;
|
||||
pReq->pSmas = NULL;
|
||||
}
|
||||
|
||||
int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
|
||||
|
@ -1517,6 +1544,14 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
|
|||
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
|
||||
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
||||
SRetention *pRetension = taosArrayGet(pReq->pRetensions, i);
|
||||
if (tEncodeI32(&encoder, pRetension->freq) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRetension->keep) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
|
||||
}
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -1550,12 +1585,36 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
|
|||
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
|
||||
pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention));
|
||||
if (pReq->pRetensions == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
||||
SRetention rentension = {0};
|
||||
if (tDecodeI32(&decoder, &rentension.freq) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &rentension.keep) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1;
|
||||
if (taosArrayPush(pReq->pRetensions, &rentension) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSCreateDbReq(SCreateDbReq *pReq) {
|
||||
taosArrayDestroy(pReq->pRetensions);
|
||||
pReq->pRetensions = NULL;
|
||||
}
|
||||
|
||||
int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
@ -2433,6 +2492,14 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
|||
SReplica *pReplica = &pReq->replicas[i];
|
||||
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
|
||||
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
||||
SRetention *pRetension = taosArrayGet(pReq->pRetensions, i);
|
||||
if (tEncodeI32(&encoder, pRetension->freq) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRetension->keep) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
|
||||
}
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -2477,11 +2544,35 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
|||
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
|
||||
}
|
||||
|
||||
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
|
||||
pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention));
|
||||
if (pReq->pRetensions == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
||||
SRetention rentension = {0};
|
||||
if (tDecodeI32(&decoder, &rentension.freq) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &rentension.keep) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1;
|
||||
if (taosArrayPush(pReq->pRetensions, &rentension) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) {
|
||||
taosArrayDestroy(pReq->pRetensions);
|
||||
pReq->pRetensions = NULL;
|
||||
}
|
||||
|
||||
int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
|
|
@ -267,6 +267,8 @@ typedef struct {
|
|||
int8_t update;
|
||||
int8_t cacheLastRow;
|
||||
int8_t streamMode;
|
||||
int32_t numOfRetensions;
|
||||
SArray* pRetensions;
|
||||
} SDbCfg;
|
||||
|
||||
typedef struct {
|
||||
|
@ -344,11 +346,14 @@ typedef struct {
|
|||
float xFilesFactor;
|
||||
int32_t aggregationMethod;
|
||||
int32_t delay;
|
||||
int32_t ttl;
|
||||
int32_t numOfColumns;
|
||||
int32_t numOfTags;
|
||||
int32_t numOfSmas;
|
||||
int32_t commentLen;
|
||||
SSchema* pColumns;
|
||||
SSchema* pTags;
|
||||
SSchema* pSmas;
|
||||
char* comment;
|
||||
SRWLatch lock;
|
||||
} SStbObj;
|
||||
|
|
|
@ -100,6 +100,15 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
|
|||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.quorum, DB_ENCODE_OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.update, DB_ENCODE_OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow, DB_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfRetensions, DB_ENCODE_OVER)
|
||||
for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) {
|
||||
SRetention *pRetension = taosArrayGet(pDb->cfg.pRetensions, i);
|
||||
SDB_SET_INT32(pRaw, dataPos, pRetension->freq, DB_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pRetension->keep, DB_ENCODE_OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pRetension->freqUnit, DB_ENCODE_OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, DB_ENCODE_OVER)
|
||||
}
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_ENCODE_OVER)
|
||||
SDB_SET_DATALEN(pRaw, dataPos, DB_ENCODE_OVER)
|
||||
|
||||
|
@ -161,6 +170,22 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.quorum, DB_DECODE_OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.update, DB_DECODE_OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.cacheLastRow, DB_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.numOfRetensions, DB_DECODE_OVER)
|
||||
if (pDb->cfg.numOfRetensions > 0) {
|
||||
pDb->cfg.pRetensions = taosArrayInit(pDb->cfg.numOfRetensions, sizeof(SRetention));
|
||||
if (pDb->cfg.pRetensions == NULL) goto DB_DECODE_OVER;
|
||||
for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) {
|
||||
SRetention retension = {0};
|
||||
SDB_GET_INT32(pRaw, dataPos, &retension.freq, DB_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &retension.keep, DB_DECODE_OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &retension.freqUnit, DB_DECODE_OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &retension.keepUnit, DB_DECODE_OVER)
|
||||
if (taosArrayPush(pDb->cfg.pRetensions, &retension) == NULL) {
|
||||
goto DB_DECODE_OVER;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_DECODE_OVER)
|
||||
|
||||
terrno = 0;
|
||||
|
@ -183,6 +208,7 @@ static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
|
|||
|
||||
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
|
||||
mTrace("db:%s, perform delete action, row:%p", pDb->name, pDb);
|
||||
taosArrayDestroy(pDb->cfg.pRetensions);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -417,6 +443,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
|
|||
.streamMode = pCreate->streamMode,
|
||||
};
|
||||
|
||||
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
|
||||
dbObj.cfg.pRetensions = pCreate->pRetensions;
|
||||
pCreate = NULL;
|
||||
|
||||
mndSetDefaultDbCfg(&dbObj.cfg);
|
||||
|
||||
if (mndCheckDbName(dbObj.name, pUser) != 0) {
|
||||
|
@ -505,6 +535,7 @@ CREATE_DB_OVER:
|
|||
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
tFreeSCreateDbReq(&createReq);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -72,7 +72,8 @@ void mndCleanupStb(SMnode *pMnode) {}
|
|||
SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + TSDB_STB_RESERVE_SIZE;
|
||||
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags + pStb->numOfSmas) * sizeof(SSchema) +
|
||||
TSDB_STB_RESERVE_SIZE;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto STB_ENCODE_OVER;
|
||||
|
||||
|
@ -88,8 +89,10 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
|||
SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), STB_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pStb->aggregationMethod, STB_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pStb->delay, STB_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pStb->ttl, STB_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pStb->numOfSmas, STB_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, STB_ENCODE_OVER)
|
||||
|
||||
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
|
||||
|
@ -108,7 +111,17 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
|||
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
|
||||
}
|
||||
|
||||
SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, STB_ENCODE_OVER)
|
||||
for (int32_t i = 0; i < pStb->numOfSmas; ++i) {
|
||||
SSchema *pSchema = &pStb->pSmas[i];
|
||||
SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
|
||||
}
|
||||
|
||||
if (pStb->commentLen > 0) {
|
||||
SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, STB_ENCODE_OVER)
|
||||
}
|
||||
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_ENCODE_OVER)
|
||||
SDB_SET_DATALEN(pRaw, dataPos, STB_ENCODE_OVER)
|
||||
|
||||
|
@ -156,13 +169,16 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
|||
pStb->xFilesFactor = xFilesFactor / 10000.0f;
|
||||
SDB_GET_INT32(pRaw, dataPos, &pStb->aggregationMethod, STB_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pStb->delay, STB_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, STB_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfSmas, STB_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, STB_DECODE_OVER)
|
||||
|
||||
pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema));
|
||||
pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema));
|
||||
if (pStb->pColumns == NULL || pStb->pTags == NULL) {
|
||||
pStb->pSmas = calloc(pStb->numOfSmas, sizeof(SSchema));
|
||||
if (pStb->pColumns == NULL || pStb->pTags == NULL || pStb->pSmas == NULL) {
|
||||
goto STB_DECODE_OVER;
|
||||
}
|
||||
|
||||
|
@ -182,6 +198,14 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pStb->numOfSmas; ++i) {
|
||||
SSchema *pSchema = &pStb->pSmas[i];
|
||||
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
|
||||
}
|
||||
|
||||
if (pStb->commentLen > 0) {
|
||||
pStb->comment = calloc(pStb->commentLen, 1);
|
||||
if (pStb->comment == NULL) goto STB_DECODE_OVER;
|
||||
|
@ -508,6 +532,16 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
static SSchema *mndFindStbColumns(const SStbObj *pStb, const char *colName) {
|
||||
for (int32_t col = 0; col < pStb->numOfColumns; col++) {
|
||||
SSchema *pSchema = &pStb->pColumns[col];
|
||||
if (strcasecmp(pStb->pColumns[col].name, colName) == 0) {
|
||||
return pSchema;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
|
||||
SStbObj stbObj = {0};
|
||||
memcpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
|
@ -518,19 +552,24 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
|
|||
stbObj.dbUid = pDb->uid;
|
||||
stbObj.version = 1;
|
||||
stbObj.nextColId = 1;
|
||||
stbObj.ttl = pCreate->ttl;
|
||||
stbObj.numOfColumns = pCreate->numOfColumns;
|
||||
stbObj.numOfTags = pCreate->numOfTags;
|
||||
stbObj.numOfSmas = pCreate->numOfSmas;
|
||||
stbObj.commentLen = pCreate->commentLen;
|
||||
stbObj.comment = calloc(stbObj.commentLen, 1);
|
||||
if (stbObj.comment == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
if (stbObj.commentLen > 0) {
|
||||
stbObj.comment = calloc(stbObj.commentLen, 1);
|
||||
if (stbObj.comment == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy(stbObj.comment, pCreate->comment, stbObj.commentLen);
|
||||
}
|
||||
memcpy(stbObj.comment, pCreate->comment, stbObj.commentLen);
|
||||
|
||||
stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema));
|
||||
stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema));
|
||||
if (stbObj.pColumns == NULL || stbObj.pTags == NULL) {
|
||||
stbObj.pSmas = malloc(stbObj.numOfSmas * sizeof(SSchema));
|
||||
if (stbObj.pColumns == NULL || stbObj.pTags == NULL || stbObj.pSmas == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
@ -555,6 +594,18 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
|
|||
stbObj.nextColId++;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < stbObj.numOfSmas; ++i) {
|
||||
SField *pField = taosArrayGet(pCreate->pSmas, i);
|
||||
SSchema *pSchema = &stbObj.pSmas[i];
|
||||
SSchema *pColSchema = mndFindStbColumns(&stbObj, pField->name);
|
||||
if (pColSchema == NULL) {
|
||||
mError("stb:%s, sma:%s not found in columns", stbObj.name, pSchema->name);
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
}
|
||||
memcpy(pSchema, pColSchema, sizeof(SSchema));
|
||||
}
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_STB_OVER;
|
||||
|
@ -1539,19 +1590,11 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
|
|||
if (pDb == NULL) return 0;
|
||||
}
|
||||
|
||||
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
|
||||
strcat(prefix, TS_PATH_DELIMITER);
|
||||
int32_t prefixLen = (int32_t)strlen(prefix);
|
||||
|
||||
while (numOfRows < rows) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
|
||||
if (pShow->pIter == NULL) break;
|
||||
|
||||
if (pDb != NULL && pStb->dbUid != pDb->uid) {
|
||||
if (strncmp(pStb->db, pDb->name, prefixLen) == 0) {
|
||||
mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pStb->name, pDb->name, pDb->uid);
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pStb);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -218,6 +218,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
|||
createReq.hashBegin = pVgroup->hashBegin;
|
||||
createReq.hashEnd = pVgroup->hashEnd;
|
||||
createReq.hashMethod = pDb->hashMethod;
|
||||
createReq.numOfRetensions = pDb->cfg.numOfRetensions;
|
||||
createReq.pRetensions = pDb->cfg.pRetensions;
|
||||
|
||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
SReplica *pReplica = &createReq.replicas[v];
|
||||
|
|
|
@ -26,9 +26,12 @@ class MndTestSma : public ::testing::Test {
|
|||
void* BuildDropDbReq(const char* dbname, int32_t* pContLen);
|
||||
void* BuildCreateStbReq(const char* stbname, int32_t* pContLen);
|
||||
void* BuildDropStbReq(const char* stbname, int32_t* pContLen);
|
||||
void* BuildCreateSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr,
|
||||
const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen);
|
||||
void* BuildDropSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen);
|
||||
void* BuildCreateBSmaStbReq(const char* stbname, int32_t* pContLen);
|
||||
void* BuildCreateTSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr,
|
||||
const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen);
|
||||
void* BuildDropTSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen);
|
||||
|
||||
void PushField(SArray* pArray, int32_t bytes, int8_t type, const char* name);
|
||||
};
|
||||
|
||||
Testbase MndTestSma::test;
|
||||
|
@ -76,6 +79,14 @@ void* MndTestSma::BuildDropDbReq(const char* dbname, int32_t* pContLen) {
|
|||
return pReq;
|
||||
}
|
||||
|
||||
void MndTestSma::PushField(SArray* pArray, int32_t bytes, int8_t type, const char* name) {
|
||||
SField field = {0};
|
||||
field.bytes = bytes;
|
||||
field.type = type;
|
||||
strcpy(field.name, name);
|
||||
taosArrayPush(pArray, &field);
|
||||
}
|
||||
|
||||
void* MndTestSma::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
|
||||
SMCreateStbReq createReq = {0};
|
||||
createReq.numOfColumns = 3;
|
||||
|
@ -85,37 +96,35 @@ void* MndTestSma::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
|
|||
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
|
||||
strcpy(createReq.name, stbname);
|
||||
|
||||
{
|
||||
SField field = {0};
|
||||
field.bytes = 8;
|
||||
field.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(field.name, "ts");
|
||||
taosArrayPush(createReq.pColumns, &field);
|
||||
}
|
||||
PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_TIMESTAMP, "ts");
|
||||
PushField(createReq.pColumns, 2, TSDB_DATA_TYPE_TINYINT, "col1");
|
||||
PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_BIGINT, "col2");
|
||||
PushField(createReq.pTags, 2, TSDB_DATA_TYPE_TINYINT, "tag1");
|
||||
|
||||
{
|
||||
SField field = {0};
|
||||
field.bytes = 2;
|
||||
field.type = TSDB_DATA_TYPE_TINYINT;
|
||||
strcpy(field.name, "col1");
|
||||
taosArrayPush(createReq.pColumns, &field);
|
||||
}
|
||||
int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq);
|
||||
void* pHead = rpcMallocCont(tlen);
|
||||
tSerializeSMCreateStbReq(pHead, tlen, &createReq);
|
||||
tFreeSMCreateStbReq(&createReq);
|
||||
*pContLen = tlen;
|
||||
return pHead;
|
||||
}
|
||||
|
||||
{
|
||||
SField field = {0};
|
||||
field.bytes = 8;
|
||||
field.type = TSDB_DATA_TYPE_BIGINT;
|
||||
strcpy(field.name, "col2");
|
||||
taosArrayPush(createReq.pColumns, &field);
|
||||
}
|
||||
void* MndTestSma::BuildCreateBSmaStbReq(const char* stbname, int32_t* pContLen) {
|
||||
SMCreateStbReq createReq = {0};
|
||||
createReq.numOfColumns = 3;
|
||||
createReq.numOfTags = 1;
|
||||
createReq.numOfSmas = 1;
|
||||
createReq.igExists = 0;
|
||||
createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
|
||||
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
|
||||
createReq.pSmas = taosArrayInit(createReq.numOfSmas, sizeof(SField));
|
||||
strcpy(createReq.name, stbname);
|
||||
|
||||
{
|
||||
SField field = {0};
|
||||
field.bytes = 2;
|
||||
field.type = TSDB_DATA_TYPE_TINYINT;
|
||||
strcpy(field.name, "tag1");
|
||||
taosArrayPush(createReq.pTags, &field);
|
||||
}
|
||||
PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_TIMESTAMP, "ts");
|
||||
PushField(createReq.pColumns, 2, TSDB_DATA_TYPE_TINYINT, "col1");
|
||||
PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_BIGINT, "col2");
|
||||
PushField(createReq.pTags, 2, TSDB_DATA_TYPE_TINYINT, "tag1");
|
||||
PushField(createReq.pSmas, 2, TSDB_DATA_TYPE_TINYINT, "col1");
|
||||
|
||||
int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq);
|
||||
void* pHead = rpcMallocCont(tlen);
|
||||
|
@ -137,8 +146,8 @@ void* MndTestSma::BuildDropStbReq(const char* stbname, int32_t* pContLen) {
|
|||
return pReq;
|
||||
}
|
||||
|
||||
void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr,
|
||||
const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen) {
|
||||
void* MndTestSma::BuildCreateTSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr,
|
||||
const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen) {
|
||||
SMCreateSmaReq createReq = {0};
|
||||
strcpy(createReq.name, smaname);
|
||||
strcpy(createReq.stb, stbname);
|
||||
|
@ -166,7 +175,7 @@ void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, in
|
|||
return pHead;
|
||||
}
|
||||
|
||||
void* MndTestSma::BuildDropSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen) {
|
||||
void* MndTestSma::BuildDropTSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen) {
|
||||
SMDropSmaReq dropsmaReq = {0};
|
||||
dropsmaReq.igNotExists = igNotExists;
|
||||
strcpy(dropsmaReq.name, smaname);
|
||||
|
@ -180,6 +189,7 @@ void* MndTestSma::BuildDropSmaReq(const char* smaname, int8_t igNotExists, int32
|
|||
}
|
||||
|
||||
TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||
#if 0
|
||||
const char* dbname = "1.d1";
|
||||
const char* stbname = "1.d1.stb";
|
||||
const char* smaname = "1.d1.sma";
|
||||
|
@ -201,9 +211,9 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
|||
test.SendShowRetrieveReq();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
}
|
||||
#if 0
|
||||
|
||||
{
|
||||
pReq = BuildCreateSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen);
|
||||
pReq = BuildCreateTSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen);
|
||||
pRsp = test.SendReq(TDMT_MND_CREATE_SMA, pReq, contLen);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname);
|
||||
|
@ -226,7 +236,7 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
|||
}
|
||||
|
||||
{
|
||||
pReq = BuildDropSmaReq(smaname, 0, &contLen);
|
||||
pReq = BuildDropTSmaReq(smaname, 0, &contLen);
|
||||
pRsp = test.SendReq(TDMT_MND_DROP_SMA, pReq, contLen);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname);
|
||||
|
@ -235,3 +245,50 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
|||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
TEST_F(MndTestSma, 02_Create_Show_Meta_Drop_Restart_BSma) {
|
||||
const char* dbname = "1.d1";
|
||||
const char* stbname = "1.d1.bsmastb";
|
||||
int32_t contLen = 0;
|
||||
void* pReq;
|
||||
SRpcMsg* pRsp;
|
||||
|
||||
{
|
||||
pReq = BuildCreateDbReq(dbname, &contLen);
|
||||
pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
{
|
||||
pReq = BuildCreateBSmaStbReq(stbname, &contLen);
|
||||
pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
|
||||
test.SendShowRetrieveReq();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
// CheckBinary("bsmastb", TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
|
||||
test.Restart();
|
||||
|
||||
{
|
||||
pReq = BuildCreateBSmaStbReq(stbname, &contLen);
|
||||
pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_STB_ALREADY_EXIST);
|
||||
}
|
||||
|
||||
{
|
||||
pReq = BuildDropStbReq(stbname, &contLen);
|
||||
pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
|
||||
test.SendShowRetrieveReq();
|
||||
EXPECT_EQ(test.GetShowRows(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
pReq = BuildDropStbReq(stbname, &contLen);
|
||||
pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_STB_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ sql insert into tb1 values (now, 1);
|
|||
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
print $rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue