Merge pull request #12195 from taosdata/feature/dnode
refactor: alter stb
This commit is contained in:
commit
fe62139a7f
|
@ -326,11 +326,11 @@ typedef struct {
|
|||
int8_t alterType;
|
||||
int32_t numOfFields;
|
||||
SArray* pFields;
|
||||
} SMAltertbReq;
|
||||
} SMAlterStbReq;
|
||||
|
||||
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
|
||||
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
|
||||
void tFreeSMAltertbReq(SMAltertbReq* pReq);
|
||||
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
|
||||
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
|
||||
void tFreeSMAltertbReq(SMAlterStbReq* pReq);
|
||||
|
||||
typedef struct SEpSet {
|
||||
int8_t inUse;
|
||||
|
|
|
@ -327,29 +327,32 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_TDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0601)
|
||||
#define TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0602)
|
||||
#define TSDB_CODE_TDB_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0603)
|
||||
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0604)
|
||||
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0605)
|
||||
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0606)
|
||||
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0607)
|
||||
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0608)
|
||||
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0609)
|
||||
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060A)
|
||||
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060B)
|
||||
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060C)
|
||||
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x060D)
|
||||
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x060E)
|
||||
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x060F)
|
||||
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0610)
|
||||
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0611)
|
||||
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0612)
|
||||
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613)
|
||||
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614)
|
||||
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
|
||||
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
|
||||
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617)
|
||||
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0618)
|
||||
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0619)
|
||||
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x0620)
|
||||
#define TSDB_CODE_TDB_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0604)
|
||||
#define TSDB_CODE_TDB_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0605)
|
||||
#define TSDB_CODE_TDB_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0606)
|
||||
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0607)
|
||||
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0608)
|
||||
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0609)
|
||||
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x060A)
|
||||
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x060B)
|
||||
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x060C)
|
||||
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060D)
|
||||
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060E)
|
||||
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060F)
|
||||
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x0600)
|
||||
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x0601)
|
||||
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x0602)
|
||||
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0613)
|
||||
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0614)
|
||||
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0615)
|
||||
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0616)
|
||||
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0617)
|
||||
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0618)
|
||||
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0619)
|
||||
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x061A)
|
||||
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061B)
|
||||
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x061C)
|
||||
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x062D)
|
||||
|
||||
// query
|
||||
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
|
||||
|
|
|
@ -594,7 +594,7 @@ int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq) {
|
||||
int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
|
@ -615,7 +615,7 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq) {
|
|||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq) {
|
||||
int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) {
|
||||
SCoder decoder = {0};
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||
|
||||
|
@ -645,7 +645,7 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq)
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSMAltertbReq(SMAltertbReq *pReq) {
|
||||
void tFreeSMAltertbReq(SMAlterStbReq *pReq) {
|
||||
taosArrayDestroy(pReq->pFields);
|
||||
pReq->pFields = NULL;
|
||||
}
|
||||
|
|
|
@ -115,7 +115,10 @@ typedef enum {
|
|||
TRN_TYPE_STB_SCOPE_END,
|
||||
} ETrnType;
|
||||
|
||||
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
|
||||
typedef enum {
|
||||
TRN_POLICY_ROLLBACK = 0,
|
||||
TRN_POLICY_RETRY = 1,
|
||||
} ETrnPolicy;
|
||||
|
||||
typedef enum {
|
||||
DND_REASON_ONLINE = 0,
|
||||
|
@ -131,6 +134,15 @@ typedef enum {
|
|||
DND_REASON_OTHERS
|
||||
} EDndReason;
|
||||
|
||||
typedef enum {
|
||||
CONSUMER_UPDATE__TOUCH = 1,
|
||||
CONSUMER_UPDATE__ADD,
|
||||
CONSUMER_UPDATE__REMOVE,
|
||||
CONSUMER_UPDATE__LOST,
|
||||
CONSUMER_UPDATE__RECOVER,
|
||||
CONSUMER_UPDATE__MODIFY,
|
||||
} ECsmUpdateType;
|
||||
|
||||
typedef struct {
|
||||
int32_t id;
|
||||
ETrnStage stage;
|
||||
|
@ -386,7 +398,6 @@ typedef struct {
|
|||
int32_t codeSize;
|
||||
char* pComment;
|
||||
char* pCode;
|
||||
char pData[];
|
||||
} SFuncObj;
|
||||
|
||||
typedef struct {
|
||||
|
@ -425,18 +436,8 @@ typedef struct {
|
|||
int64_t offset;
|
||||
} SMqOffsetObj;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pOffset->key);
|
||||
tlen += taosEncodeFixedI64(buf, pOffset->offset);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) {
|
||||
buf = taosDecodeStringTo(buf, pOffset->key);
|
||||
buf = taosDecodeFixedI64(buf, &pOffset->offset);
|
||||
return buf;
|
||||
}
|
||||
int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
|
||||
void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
|
@ -459,15 +460,6 @@ typedef struct {
|
|||
SSchemaWrapper schema;
|
||||
} SMqTopicObj;
|
||||
|
||||
enum {
|
||||
CONSUMER_UPDATE__TOUCH = 1,
|
||||
CONSUMER_UPDATE__ADD,
|
||||
CONSUMER_UPDATE__REMOVE,
|
||||
CONSUMER_UPDATE__LOST,
|
||||
CONSUMER_UPDATE__RECOVER,
|
||||
CONSUMER_UPDATE__MODIFY,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
|
@ -475,10 +467,8 @@ typedef struct {
|
|||
int8_t updateType; // used only for update
|
||||
int32_t epoch;
|
||||
int32_t status;
|
||||
// hbStatus is not applicable to serialization
|
||||
int32_t hbStatus;
|
||||
// lock is used for topics update
|
||||
SRWLatch lock;
|
||||
int32_t hbStatus; // hbStatus is not applicable to serialization
|
||||
SRWLatch lock; // lock is used for topics update
|
||||
SArray* currentTopics; // SArray<char*>
|
||||
SArray* rebNewTopics; // SArray<char*>
|
||||
SArray* rebRemovedTopics; // SArray<char*>
|
||||
|
@ -492,7 +482,6 @@ typedef struct {
|
|||
int64_t upTime;
|
||||
int64_t subscribeTime;
|
||||
int64_t rebalanceTime;
|
||||
|
||||
} SMqConsumerObj;
|
||||
|
||||
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
||||
|
@ -593,7 +582,6 @@ typedef struct {
|
|||
int32_t vgNum;
|
||||
SRWLatch lock;
|
||||
int8_t status;
|
||||
// int32_t sqlLen;
|
||||
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
|
||||
int32_t fixedSinkVgId; // 0 for shuffle
|
||||
int64_t smaId; // 0 for unused
|
||||
|
|
|
@ -29,7 +29,8 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
|
|||
|
||||
// for trans test
|
||||
SSdbRaw *mndUserActionEncode(SUserObj *pUser);
|
||||
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen);
|
||||
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
|
||||
int32_t *pRspLen);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -17,8 +17,8 @@
|
|||
#include "mndCluster.h"
|
||||
#include "mndShow.h"
|
||||
|
||||
#define TSDB_CLUSTER_VER_NUMBE 1
|
||||
#define TSDB_CLUSTER_RESERVE_SIZE 64
|
||||
#define CLUSTER_VER_NUMBE 1
|
||||
#define CLUSTER_RESERVE_SIZE 64
|
||||
|
||||
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
|
||||
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw);
|
||||
|
@ -30,14 +30,16 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
|
|||
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
|
||||
|
||||
int32_t mndInitCluster(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_CLUSTER,
|
||||
SSdbTable table = {
|
||||
.sdbType = SDB_CLUSTER,
|
||||
.keyType = SDB_KEY_INT64,
|
||||
.deployFp = (SdbDeployFp)mndCreateDefaultCluster,
|
||||
.encodeFp = (SdbEncodeFp)mndClusterActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndClusterActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndClusterActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
|
||||
.deleteFp = (SdbDeleteFp)mndClusterActionDelete,
|
||||
};
|
||||
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
|
||||
|
@ -79,19 +81,19 @@ int64_t mndGetClusterId(SMnode *pMnode) {
|
|||
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, TSDB_CLUSTER_VER_NUMBE, sizeof(SClusterObj) + TSDB_CLUSTER_RESERVE_SIZE);
|
||||
if (pRaw == NULL) goto CLUSTER_ENCODE_OVER;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + CLUSTER_RESERVE_SIZE);
|
||||
if (pRaw == NULL) goto _OVER;
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_SET_INT64(pRaw, dataPos, pCluster->id, CLUSTER_ENCODE_OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, CLUSTER_ENCODE_OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, CLUSTER_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_ENCODE_OVER)
|
||||
SDB_SET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_ENCODE_OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pCluster->id, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
|
||||
SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
|
||||
|
||||
terrno = 0;
|
||||
|
||||
CLUSTER_ENCODE_OVER:
|
||||
_OVER:
|
||||
if (terrno != 0) {
|
||||
mError("cluster:%" PRId64 ", failed to encode to raw:%p since %s", pCluster->id, pRaw, terrstr());
|
||||
sdbFreeRaw(pRaw);
|
||||
|
@ -106,29 +108,29 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CLUSTER_DECODE_OVER;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
|
||||
|
||||
if (sver != TSDB_CLUSTER_VER_NUMBE) {
|
||||
if (sver != CLUSTER_VER_NUMBE) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||
goto CLUSTER_DECODE_OVER;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj));
|
||||
if (pRow == NULL) goto CLUSTER_DECODE_OVER;
|
||||
if (pRow == NULL) goto _OVER;
|
||||
|
||||
SClusterObj *pCluster = sdbGetRowObj(pRow);
|
||||
if (pCluster == NULL) goto CLUSTER_DECODE_OVER;
|
||||
if (pCluster == NULL) goto _OVER;
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_GET_INT64(pRaw, dataPos, &pCluster->id, CLUSTER_DECODE_OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, CLUSTER_DECODE_OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, CLUSTER_DECODE_OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_DECODE_OVER)
|
||||
SDB_GET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_DECODE_OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pCluster->id, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
|
||||
SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
|
||||
|
||||
terrno = 0;
|
||||
|
||||
CLUSTER_DECODE_OVER:
|
||||
_OVER:
|
||||
if (terrno != 0) {
|
||||
mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster->id, pRaw, terrstr());
|
||||
taosMemoryFreeClear(pRow);
|
||||
|
@ -161,7 +163,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
|||
|
||||
int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
|
||||
if (code != 0) {
|
||||
strcpy(clusterObj.name, "tdengine2.0");
|
||||
strcpy(clusterObj.name, "tdengine3.0");
|
||||
mError("failed to get name from system, set to default val %s", clusterObj.name);
|
||||
}
|
||||
|
||||
|
@ -190,8 +192,8 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
|
|||
if (pShow->pIter == NULL) break;
|
||||
|
||||
cols = 0;
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char*) &pCluster->id, false);
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pCluster->id, false);
|
||||
|
||||
char buf[tListLen(pCluster->name) + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(buf, pCluster->name, pShow->pMeta->pSchemas[cols].bytes);
|
||||
|
@ -200,7 +202,7 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
|
|||
colDataAppend(pColInfo, numOfRows, buf, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char*) &pCluster->createdTime, false);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pCluster->createdTime, false);
|
||||
|
||||
sdbRelease(pSdb, pCluster);
|
||||
numOfRows++;
|
||||
|
|
|
@ -13,12 +13,14 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndDef.h"
|
||||
#include "mndConsumer.h"
|
||||
|
||||
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
|
||||
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
|
||||
if (pConsumer == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -515,3 +517,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
|||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pOffset->key);
|
||||
tlen += taosEncodeFixedI64(buf, pOffset->offset);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
|
||||
buf = taosDecodeStringTo(buf, pOffset->key);
|
||||
buf = taosDecodeFixedI64(buf, &pOffset->offset);
|
||||
return buf;
|
||||
}
|
|
@ -28,8 +28,8 @@
|
|||
#include "mndVgroup.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define TSDB_STB_VER_NUMBER 1
|
||||
#define TSDB_STB_RESERVE_SIZE 64
|
||||
#define STB_VER_NUMBER 1
|
||||
#define STB_RESERVE_SIZE 64
|
||||
|
||||
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
|
||||
|
@ -46,13 +46,15 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
||||
|
||||
int32_t mndInitStb(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_STB,
|
||||
SSdbTable table = {
|
||||
.sdbType = SDB_STB,
|
||||
.keyType = SDB_KEY_BINARY,
|
||||
.encodeFp = (SdbEncodeFp)mndStbActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndStbActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndStbActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
|
||||
.deleteFp = (SdbDeleteFp)mndStbActionDelete,
|
||||
};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
|
||||
|
@ -74,8 +76,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + +pStb->commentLen +
|
||||
pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size);
|
||||
pStb->ast1Len + pStb->ast2Len + STB_RESERVE_SIZE;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto _OVER;
|
||||
|
||||
int32_t dataPos = 0;
|
||||
|
@ -99,6 +101,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
|||
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
|
||||
SSchema *pSchema = &pStb->pColumns[i];
|
||||
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
||||
|
@ -107,6 +110,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
|||
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
|
||||
SSchema *pSchema = &pStb->pTags[i];
|
||||
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
||||
|
@ -121,7 +125,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
|||
if (pStb->ast2Len > 0) {
|
||||
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
||||
}
|
||||
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER)
|
||||
SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
|
||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
||||
|
||||
terrno = 0;
|
||||
|
@ -143,7 +147,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
|||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
|
||||
|
||||
if (sver != TSDB_STB_VER_NUMBER) {
|
||||
if (sver != STB_VER_NUMBER) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -183,6 +187,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
|||
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
|
||||
SSchema *pSchema = &pStb->pColumns[i];
|
||||
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
||||
|
@ -191,6 +196,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
|||
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
|
||||
SSchema *pSchema = &pStb->pTags[i];
|
||||
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
||||
|
@ -211,7 +217,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
|||
if (pStb->pAst2 == NULL) goto _OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
||||
}
|
||||
SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER)
|
||||
SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
|
||||
|
||||
terrno = 0;
|
||||
|
||||
|
@ -488,7 +494,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
|||
|
||||
for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
|
||||
SField *pField1 = taosArrayGet(pCreate->pColumns, i);
|
||||
if (pField->type < 0) {
|
||||
if (pField1->type < 0) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
}
|
||||
|
@ -574,6 +580,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_CREATE_STB;
|
||||
action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
|
@ -613,6 +620,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_DROP_STB;
|
||||
action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST;
|
||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
|
@ -733,6 +741,7 @@ _OVER:
|
|||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
mndTransSetDbInfo(pTrans, pDb);
|
||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||
|
@ -792,7 +801,10 @@ static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) {
|
|||
}
|
||||
|
||||
int32_t numOfStbs = -1;
|
||||
mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
|
||||
if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
|
||||
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
|
||||
goto _OVER;
|
||||
|
@ -819,7 +831,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCheckAlterStbReq(SMAltertbReq *pAlter) {
|
||||
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
|
||||
if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
|
@ -1170,7 +1182,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
||||
static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
||||
SStbObj stbObj = {0};
|
||||
taosRLockLatch(&pOld->lock);
|
||||
memcpy(&stbObj, pOld, sizeof(SStbObj));
|
||||
|
@ -1239,7 +1251,7 @@ static int32_t mndProcessMAlterStbReq(SNodeMsg *pReq) {
|
|||
SDbObj *pDb = NULL;
|
||||
SStbObj *pStb = NULL;
|
||||
SUserObj *pUser = NULL;
|
||||
SMAltertbReq alterReq = {0};
|
||||
SMAlterStbReq alterReq = {0};
|
||||
|
||||
if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
|
|
|
@ -345,7 +345,10 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
|||
}
|
||||
|
||||
int32_t numOfStbs = -1;
|
||||
mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
|
||||
if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
|
||||
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
|
||||
goto _OVER;
|
||||
|
|
|
@ -137,7 +137,7 @@ void* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
|
|||
}
|
||||
|
||||
void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen) {
|
||||
SMAltertbReq req = {0};
|
||||
SMAlterStbReq req = {0};
|
||||
strcpy(req.name, stbname);
|
||||
req.numOfFields = 1;
|
||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||
|
@ -158,7 +158,7 @@ void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagnam
|
|||
}
|
||||
|
||||
void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagname, int32_t* pContLen) {
|
||||
SMAltertbReq req = {0};
|
||||
SMAlterStbReq req = {0};
|
||||
strcpy(req.name, stbname);
|
||||
req.numOfFields = 1;
|
||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||
|
@ -180,7 +180,7 @@ void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagna
|
|||
|
||||
void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char* tagname, const char* newtagname,
|
||||
int32_t* pContLen) {
|
||||
SMAltertbReq req = {0};
|
||||
SMAlterStbReq req = {0};
|
||||
strcpy(req.name, stbname);
|
||||
req.numOfFields = 2;
|
||||
req.pFields = taosArrayInit(2, sizeof(SField));
|
||||
|
@ -208,7 +208,7 @@ void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char*
|
|||
|
||||
void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes,
|
||||
int32_t* pContLen) {
|
||||
SMAltertbReq req = {0};
|
||||
SMAlterStbReq req = {0};
|
||||
strcpy(req.name, stbname);
|
||||
req.numOfFields = 1;
|
||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||
|
@ -229,7 +229,7 @@ void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char
|
|||
}
|
||||
|
||||
void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen) {
|
||||
SMAltertbReq req = {0};
|
||||
SMAlterStbReq req = {0};
|
||||
strcpy(req.name, stbname);
|
||||
req.numOfFields = 1;
|
||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||
|
@ -250,7 +250,7 @@ void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* col
|
|||
}
|
||||
|
||||
void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen) {
|
||||
SMAltertbReq req = {0};
|
||||
SMAlterStbReq req = {0};
|
||||
strcpy(req.name, stbname);
|
||||
req.numOfFields = 1;
|
||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||
|
@ -272,7 +272,7 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co
|
|||
|
||||
void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes,
|
||||
int32_t* pContLen) {
|
||||
SMAltertbReq req = {0};
|
||||
SMAlterStbReq req = {0};
|
||||
strcpy(req.name, stbname);
|
||||
req.numOfFields = 1;
|
||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||
|
|
|
@ -2596,7 +2596,7 @@ static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableS
|
|||
pStmt->ignoreNotExists);
|
||||
}
|
||||
|
||||
static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAltertbReq* pAlterReq) {
|
||||
static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAlterStbReq* pAlterReq) {
|
||||
pAlterReq->pFields = taosArrayInit(2, sizeof(TAOS_FIELD));
|
||||
if (NULL == pAlterReq->pFields) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -2632,7 +2632,7 @@ static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAltertbReq* pAlterRe
|
|||
}
|
||||
|
||||
static int32_t translateAlterTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) {
|
||||
SMAltertbReq alterReq = {0};
|
||||
SMAlterStbReq alterReq = {0};
|
||||
SName tableName;
|
||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), alterReq.name);
|
||||
alterReq.alterType = pStmt->alterType;
|
||||
|
|
|
@ -326,6 +326,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_TYPE, "Invalid table type")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION, "Invalid table schema version")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_ALREADY_EXIST, "Table already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_NOT_EXIST, "Table not exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_STB_ALREADY_EXIST, "Stable already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_STB_NOT_EXIST, "Stable not exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CONFIG, "Invalid configuration")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INIT_FAILED, "Tsdb init failed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_DISKSPACE, "No diskspace for tsdb")
|
||||
|
|
|
@ -1,5 +0,0 @@
|
|||
run general/stable/disk.sim
|
||||
run general/stable/dnode3.sim
|
||||
run general/stable/metrics.sim
|
||||
run general/stable/values.sim
|
||||
run general/stable/vnode3.sim
|
Loading…
Reference in New Issue