Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
d31b4ce130
|
@ -326,11 +326,11 @@ typedef struct {
|
||||||
int8_t alterType;
|
int8_t alterType;
|
||||||
int32_t numOfFields;
|
int32_t numOfFields;
|
||||||
SArray* pFields;
|
SArray* pFields;
|
||||||
} SMAltertbReq;
|
} SMAlterStbReq;
|
||||||
|
|
||||||
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
|
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
|
||||||
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
|
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
|
||||||
void tFreeSMAltertbReq(SMAltertbReq* pReq);
|
void tFreeSMAltertbReq(SMAlterStbReq* pReq);
|
||||||
|
|
||||||
typedef struct SEpSet {
|
typedef struct SEpSet {
|
||||||
int8_t inUse;
|
int8_t inUse;
|
||||||
|
|
|
@ -327,29 +327,32 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_TDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0601)
|
#define TSDB_CODE_TDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0601)
|
||||||
#define TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0602)
|
#define TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0602)
|
||||||
#define TSDB_CODE_TDB_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0603)
|
#define TSDB_CODE_TDB_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0603)
|
||||||
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0604)
|
#define TSDB_CODE_TDB_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0604)
|
||||||
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0605)
|
#define TSDB_CODE_TDB_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0605)
|
||||||
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0606)
|
#define TSDB_CODE_TDB_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0606)
|
||||||
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0607)
|
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0607)
|
||||||
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0608)
|
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0608)
|
||||||
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0609)
|
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0609)
|
||||||
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060A)
|
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x060A)
|
||||||
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060B)
|
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x060B)
|
||||||
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060C)
|
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x060C)
|
||||||
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x060D)
|
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060D)
|
||||||
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x060E)
|
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060E)
|
||||||
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x060F)
|
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060F)
|
||||||
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0610)
|
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x0600)
|
||||||
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0611)
|
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x0601)
|
||||||
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0612)
|
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x0602)
|
||||||
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613)
|
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0613)
|
||||||
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614)
|
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0614)
|
||||||
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
|
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0615)
|
||||||
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
|
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0616)
|
||||||
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617)
|
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0617)
|
||||||
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0618)
|
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0618)
|
||||||
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0619)
|
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0619)
|
||||||
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x0620)
|
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x061A)
|
||||||
|
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061B)
|
||||||
|
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x061C)
|
||||||
|
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x062D)
|
||||||
|
|
||||||
// query
|
// query
|
||||||
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
|
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
|
||||||
|
|
|
@ -594,7 +594,7 @@ int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq) {
|
int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) {
|
||||||
SCoder encoder = {0};
|
SCoder encoder = {0};
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
@ -615,7 +615,7 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq) {
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq) {
|
int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) {
|
||||||
SCoder decoder = {0};
|
SCoder decoder = {0};
|
||||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
@ -645,7 +645,7 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tFreeSMAltertbReq(SMAltertbReq *pReq) {
|
void tFreeSMAltertbReq(SMAlterStbReq *pReq) {
|
||||||
taosArrayDestroy(pReq->pFields);
|
taosArrayDestroy(pReq->pFields);
|
||||||
pReq->pFields = NULL;
|
pReq->pFields = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,7 +115,10 @@ typedef enum {
|
||||||
TRN_TYPE_STB_SCOPE_END,
|
TRN_TYPE_STB_SCOPE_END,
|
||||||
} ETrnType;
|
} ETrnType;
|
||||||
|
|
||||||
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
|
typedef enum {
|
||||||
|
TRN_POLICY_ROLLBACK = 0,
|
||||||
|
TRN_POLICY_RETRY = 1,
|
||||||
|
} ETrnPolicy;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
DND_REASON_ONLINE = 0,
|
DND_REASON_ONLINE = 0,
|
||||||
|
@ -131,6 +134,15 @@ typedef enum {
|
||||||
DND_REASON_OTHERS
|
DND_REASON_OTHERS
|
||||||
} EDndReason;
|
} EDndReason;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
CONSUMER_UPDATE__TOUCH = 1,
|
||||||
|
CONSUMER_UPDATE__ADD,
|
||||||
|
CONSUMER_UPDATE__REMOVE,
|
||||||
|
CONSUMER_UPDATE__LOST,
|
||||||
|
CONSUMER_UPDATE__RECOVER,
|
||||||
|
CONSUMER_UPDATE__MODIFY,
|
||||||
|
} ECsmUpdateType;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t id;
|
int32_t id;
|
||||||
ETrnStage stage;
|
ETrnStage stage;
|
||||||
|
@ -386,7 +398,6 @@ typedef struct {
|
||||||
int32_t codeSize;
|
int32_t codeSize;
|
||||||
char* pComment;
|
char* pComment;
|
||||||
char* pCode;
|
char* pCode;
|
||||||
char pData[];
|
|
||||||
} SFuncObj;
|
} SFuncObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -425,18 +436,8 @@ typedef struct {
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
} SMqOffsetObj;
|
} SMqOffsetObj;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset) {
|
int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
|
||||||
int32_t tlen = 0;
|
void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
|
||||||
tlen += taosEncodeString(buf, pOffset->key);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pOffset->offset);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) {
|
|
||||||
buf = taosDecodeStringTo(buf, pOffset->key);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pOffset->offset);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
@ -459,15 +460,6 @@ typedef struct {
|
||||||
SSchemaWrapper schema;
|
SSchemaWrapper schema;
|
||||||
} SMqTopicObj;
|
} SMqTopicObj;
|
||||||
|
|
||||||
enum {
|
|
||||||
CONSUMER_UPDATE__TOUCH = 1,
|
|
||||||
CONSUMER_UPDATE__ADD,
|
|
||||||
CONSUMER_UPDATE__REMOVE,
|
|
||||||
CONSUMER_UPDATE__LOST,
|
|
||||||
CONSUMER_UPDATE__RECOVER,
|
|
||||||
CONSUMER_UPDATE__MODIFY,
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
|
@ -475,10 +467,8 @@ typedef struct {
|
||||||
int8_t updateType; // used only for update
|
int8_t updateType; // used only for update
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
// hbStatus is not applicable to serialization
|
int32_t hbStatus; // hbStatus is not applicable to serialization
|
||||||
int32_t hbStatus;
|
SRWLatch lock; // lock is used for topics update
|
||||||
// lock is used for topics update
|
|
||||||
SRWLatch lock;
|
|
||||||
SArray* currentTopics; // SArray<char*>
|
SArray* currentTopics; // SArray<char*>
|
||||||
SArray* rebNewTopics; // SArray<char*>
|
SArray* rebNewTopics; // SArray<char*>
|
||||||
SArray* rebRemovedTopics; // SArray<char*>
|
SArray* rebRemovedTopics; // SArray<char*>
|
||||||
|
@ -492,7 +482,6 @@ typedef struct {
|
||||||
int64_t upTime;
|
int64_t upTime;
|
||||||
int64_t subscribeTime;
|
int64_t subscribeTime;
|
||||||
int64_t rebalanceTime;
|
int64_t rebalanceTime;
|
||||||
|
|
||||||
} SMqConsumerObj;
|
} SMqConsumerObj;
|
||||||
|
|
||||||
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
||||||
|
@ -593,7 +582,6 @@ typedef struct {
|
||||||
int32_t vgNum;
|
int32_t vgNum;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int8_t status;
|
int8_t status;
|
||||||
// int32_t sqlLen;
|
|
||||||
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
|
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
|
||||||
int32_t fixedSinkVgId; // 0 for shuffle
|
int32_t fixedSinkVgId; // 0 for shuffle
|
||||||
int64_t smaId; // 0 for unused
|
int64_t smaId; // 0 for unused
|
||||||
|
|
|
@ -29,7 +29,8 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
|
||||||
|
|
||||||
// for trans test
|
// for trans test
|
||||||
SSdbRaw *mndUserActionEncode(SUserObj *pUser);
|
SSdbRaw *mndUserActionEncode(SUserObj *pUser);
|
||||||
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen);
|
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
|
||||||
|
int32_t *pRspLen);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,8 +17,8 @@
|
||||||
#include "mndCluster.h"
|
#include "mndCluster.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
|
|
||||||
#define TSDB_CLUSTER_VER_NUMBE 1
|
#define CLUSTER_VER_NUMBE 1
|
||||||
#define TSDB_CLUSTER_RESERVE_SIZE 64
|
#define CLUSTER_RESERVE_SIZE 64
|
||||||
|
|
||||||
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
|
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
|
||||||
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw);
|
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw);
|
||||||
|
@ -30,14 +30,16 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
|
||||||
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitCluster(SMnode *pMnode) {
|
int32_t mndInitCluster(SMnode *pMnode) {
|
||||||
SSdbTable table = {.sdbType = SDB_CLUSTER,
|
SSdbTable table = {
|
||||||
|
.sdbType = SDB_CLUSTER,
|
||||||
.keyType = SDB_KEY_INT64,
|
.keyType = SDB_KEY_INT64,
|
||||||
.deployFp = (SdbDeployFp)mndCreateDefaultCluster,
|
.deployFp = (SdbDeployFp)mndCreateDefaultCluster,
|
||||||
.encodeFp = (SdbEncodeFp)mndClusterActionEncode,
|
.encodeFp = (SdbEncodeFp)mndClusterActionEncode,
|
||||||
.decodeFp = (SdbDecodeFp)mndClusterActionDecode,
|
.decodeFp = (SdbDecodeFp)mndClusterActionDecode,
|
||||||
.insertFp = (SdbInsertFp)mndClusterActionInsert,
|
.insertFp = (SdbInsertFp)mndClusterActionInsert,
|
||||||
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
|
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
|
||||||
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
|
.deleteFp = (SdbDeleteFp)mndClusterActionDelete,
|
||||||
|
};
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
|
||||||
|
@ -79,19 +81,19 @@ int64_t mndGetClusterId(SMnode *pMnode) {
|
||||||
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
|
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, TSDB_CLUSTER_VER_NUMBE, sizeof(SClusterObj) + TSDB_CLUSTER_RESERVE_SIZE);
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + CLUSTER_RESERVE_SIZE);
|
||||||
if (pRaw == NULL) goto CLUSTER_ENCODE_OVER;
|
if (pRaw == NULL) goto _OVER;
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_SET_INT64(pRaw, dataPos, pCluster->id, CLUSTER_ENCODE_OVER)
|
SDB_SET_INT64(pRaw, dataPos, pCluster->id, _OVER)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, CLUSTER_ENCODE_OVER)
|
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, _OVER)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, CLUSTER_ENCODE_OVER)
|
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_ENCODE_OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_ENCODE_OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
CLUSTER_ENCODE_OVER:
|
_OVER:
|
||||||
if (terrno != 0) {
|
if (terrno != 0) {
|
||||||
mError("cluster:%" PRId64 ", failed to encode to raw:%p since %s", pCluster->id, pRaw, terrstr());
|
mError("cluster:%" PRId64 ", failed to encode to raw:%p since %s", pCluster->id, pRaw, terrstr());
|
||||||
sdbFreeRaw(pRaw);
|
sdbFreeRaw(pRaw);
|
||||||
|
@ -106,29 +108,29 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
int8_t sver = 0;
|
int8_t sver = 0;
|
||||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CLUSTER_DECODE_OVER;
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
|
||||||
|
|
||||||
if (sver != TSDB_CLUSTER_VER_NUMBE) {
|
if (sver != CLUSTER_VER_NUMBE) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
goto CLUSTER_DECODE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj));
|
SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj));
|
||||||
if (pRow == NULL) goto CLUSTER_DECODE_OVER;
|
if (pRow == NULL) goto _OVER;
|
||||||
|
|
||||||
SClusterObj *pCluster = sdbGetRowObj(pRow);
|
SClusterObj *pCluster = sdbGetRowObj(pRow);
|
||||||
if (pCluster == NULL) goto CLUSTER_DECODE_OVER;
|
if (pCluster == NULL) goto _OVER;
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pCluster->id, CLUSTER_DECODE_OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pCluster->id, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, CLUSTER_DECODE_OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, CLUSTER_DECODE_OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_DECODE_OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_DECODE_OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
CLUSTER_DECODE_OVER:
|
_OVER:
|
||||||
if (terrno != 0) {
|
if (terrno != 0) {
|
||||||
mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster->id, pRaw, terrstr());
|
mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster->id, pRaw, terrstr());
|
||||||
taosMemoryFreeClear(pRow);
|
taosMemoryFreeClear(pRow);
|
||||||
|
@ -161,7 +163,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
||||||
|
|
||||||
int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
|
int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
strcpy(clusterObj.name, "tdengine2.0");
|
strcpy(clusterObj.name, "tdengine3.0");
|
||||||
mError("failed to get name from system, set to default val %s", clusterObj.name);
|
mError("failed to get name from system, set to default val %s", clusterObj.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,12 +13,14 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndDef.h"
|
#include "mndDef.h"
|
||||||
#include "mndConsumer.h"
|
#include "mndConsumer.h"
|
||||||
|
|
||||||
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
|
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
|
||||||
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
|
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
|
||||||
if (pConsumer == NULL) {
|
if (pConsumer == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -515,3 +517,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
||||||
#endif
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeString(buf, pOffset->key);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pOffset->offset);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
|
||||||
|
buf = taosDecodeStringTo(buf, pOffset->key);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pOffset->offset);
|
||||||
|
return buf;
|
||||||
|
}
|
|
@ -28,8 +28,8 @@
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
#define TSDB_STB_VER_NUMBER 1
|
#define STB_VER_NUMBER 1
|
||||||
#define TSDB_STB_RESERVE_SIZE 64
|
#define STB_RESERVE_SIZE 64
|
||||||
|
|
||||||
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
|
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
|
||||||
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
|
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
|
||||||
|
@ -46,13 +46,15 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
||||||
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitStb(SMnode *pMnode) {
|
int32_t mndInitStb(SMnode *pMnode) {
|
||||||
SSdbTable table = {.sdbType = SDB_STB,
|
SSdbTable table = {
|
||||||
|
.sdbType = SDB_STB,
|
||||||
.keyType = SDB_KEY_BINARY,
|
.keyType = SDB_KEY_BINARY,
|
||||||
.encodeFp = (SdbEncodeFp)mndStbActionEncode,
|
.encodeFp = (SdbEncodeFp)mndStbActionEncode,
|
||||||
.decodeFp = (SdbDecodeFp)mndStbActionDecode,
|
.decodeFp = (SdbDecodeFp)mndStbActionDecode,
|
||||||
.insertFp = (SdbInsertFp)mndStbActionInsert,
|
.insertFp = (SdbInsertFp)mndStbActionInsert,
|
||||||
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
|
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
|
||||||
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
|
.deleteFp = (SdbDeleteFp)mndStbActionDelete,
|
||||||
|
};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
|
||||||
|
@ -74,8 +76,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + +pStb->commentLen +
|
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + +pStb->commentLen +
|
||||||
pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE;
|
pStb->ast1Len + pStb->ast2Len + STB_RESERVE_SIZE;
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size);
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
|
||||||
if (pRaw == NULL) goto _OVER;
|
if (pRaw == NULL) goto _OVER;
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
|
@ -99,6 +101,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
|
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
|
||||||
SSchema *pSchema = &pStb->pColumns[i];
|
SSchema *pSchema = &pStb->pColumns[i];
|
||||||
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
|
||||||
|
SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
|
||||||
SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
|
SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
||||||
|
@ -107,6 +110,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
|
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
|
||||||
SSchema *pSchema = &pStb->pTags[i];
|
SSchema *pSchema = &pStb->pTags[i];
|
||||||
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
|
||||||
|
SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
|
||||||
SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
|
SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
||||||
|
@ -121,7 +125,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
if (pStb->ast2Len > 0) {
|
if (pStb->ast2Len > 0) {
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
||||||
}
|
}
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
@ -143,7 +147,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
||||||
int8_t sver = 0;
|
int8_t sver = 0;
|
||||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
|
||||||
|
|
||||||
if (sver != TSDB_STB_VER_NUMBER) {
|
if (sver != STB_VER_NUMBER) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
@ -183,6 +187,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
||||||
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
|
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
|
||||||
SSchema *pSchema = &pStb->pColumns[i];
|
SSchema *pSchema = &pStb->pColumns[i];
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
|
||||||
|
SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
|
||||||
SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
|
SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
||||||
|
@ -191,6 +196,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
||||||
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
|
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
|
||||||
SSchema *pSchema = &pStb->pTags[i];
|
SSchema *pSchema = &pStb->pTags[i];
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
|
||||||
|
SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
|
||||||
SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
|
SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
||||||
|
@ -211,7 +217,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
||||||
if (pStb->pAst2 == NULL) goto _OVER;
|
if (pStb->pAst2 == NULL) goto _OVER;
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
||||||
}
|
}
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
|
@ -488,7 +494,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
|
for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
|
||||||
SField *pField1 = taosArrayGet(pCreate->pColumns, i);
|
SField *pField1 = taosArrayGet(pCreate->pColumns, i);
|
||||||
if (pField->type < 0) {
|
if (pField1->type < 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -574,6 +580,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
action.contLen = contLen;
|
action.contLen = contLen;
|
||||||
action.msgType = TDMT_VND_CREATE_STB;
|
action.msgType = TDMT_VND_CREATE_STB;
|
||||||
|
action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
@ -613,6 +620,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
action.contLen = contLen;
|
action.contLen = contLen;
|
||||||
action.msgType = TDMT_VND_DROP_STB;
|
action.msgType = TDMT_VND_DROP_STB;
|
||||||
|
action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST;
|
||||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
@ -733,6 +741,7 @@ _OVER:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
mndTransSetDbInfo(pTrans, pDb);
|
mndTransSetDbInfo(pTrans, pDb);
|
||||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||||
|
@ -792,7 +801,10 @@ static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfStbs = -1;
|
int32_t numOfStbs = -1;
|
||||||
mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
|
if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
|
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
|
||||||
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
|
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -819,7 +831,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckAlterStbReq(SMAltertbReq *pAlter) {
|
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
|
||||||
if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
|
if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1170,7 +1182,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
||||||
SStbObj stbObj = {0};
|
SStbObj stbObj = {0};
|
||||||
taosRLockLatch(&pOld->lock);
|
taosRLockLatch(&pOld->lock);
|
||||||
memcpy(&stbObj, pOld, sizeof(SStbObj));
|
memcpy(&stbObj, pOld, sizeof(SStbObj));
|
||||||
|
@ -1239,7 +1251,7 @@ static int32_t mndProcessMAlterStbReq(SNodeMsg *pReq) {
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
SStbObj *pStb = NULL;
|
SStbObj *pStb = NULL;
|
||||||
SUserObj *pUser = NULL;
|
SUserObj *pUser = NULL;
|
||||||
SMAltertbReq alterReq = {0};
|
SMAlterStbReq alterReq = {0};
|
||||||
|
|
||||||
if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
|
if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
|
|
@ -345,7 +345,10 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfStbs = -1;
|
int32_t numOfStbs = -1;
|
||||||
mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
|
if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
|
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
|
||||||
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
|
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
|
|
@ -137,7 +137,7 @@ void* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen) {
|
void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen) {
|
||||||
SMAltertbReq req = {0};
|
SMAlterStbReq req = {0};
|
||||||
strcpy(req.name, stbname);
|
strcpy(req.name, stbname);
|
||||||
req.numOfFields = 1;
|
req.numOfFields = 1;
|
||||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||||
|
@ -158,7 +158,7 @@ void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagnam
|
||||||
}
|
}
|
||||||
|
|
||||||
void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagname, int32_t* pContLen) {
|
void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagname, int32_t* pContLen) {
|
||||||
SMAltertbReq req = {0};
|
SMAlterStbReq req = {0};
|
||||||
strcpy(req.name, stbname);
|
strcpy(req.name, stbname);
|
||||||
req.numOfFields = 1;
|
req.numOfFields = 1;
|
||||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||||
|
@ -180,7 +180,7 @@ void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagna
|
||||||
|
|
||||||
void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char* tagname, const char* newtagname,
|
void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char* tagname, const char* newtagname,
|
||||||
int32_t* pContLen) {
|
int32_t* pContLen) {
|
||||||
SMAltertbReq req = {0};
|
SMAlterStbReq req = {0};
|
||||||
strcpy(req.name, stbname);
|
strcpy(req.name, stbname);
|
||||||
req.numOfFields = 2;
|
req.numOfFields = 2;
|
||||||
req.pFields = taosArrayInit(2, sizeof(SField));
|
req.pFields = taosArrayInit(2, sizeof(SField));
|
||||||
|
@ -208,7 +208,7 @@ void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char*
|
||||||
|
|
||||||
void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes,
|
void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes,
|
||||||
int32_t* pContLen) {
|
int32_t* pContLen) {
|
||||||
SMAltertbReq req = {0};
|
SMAlterStbReq req = {0};
|
||||||
strcpy(req.name, stbname);
|
strcpy(req.name, stbname);
|
||||||
req.numOfFields = 1;
|
req.numOfFields = 1;
|
||||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||||
|
@ -229,7 +229,7 @@ void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char
|
||||||
}
|
}
|
||||||
|
|
||||||
void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen) {
|
void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen) {
|
||||||
SMAltertbReq req = {0};
|
SMAlterStbReq req = {0};
|
||||||
strcpy(req.name, stbname);
|
strcpy(req.name, stbname);
|
||||||
req.numOfFields = 1;
|
req.numOfFields = 1;
|
||||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||||
|
@ -250,7 +250,7 @@ void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* col
|
||||||
}
|
}
|
||||||
|
|
||||||
void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen) {
|
void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen) {
|
||||||
SMAltertbReq req = {0};
|
SMAlterStbReq req = {0};
|
||||||
strcpy(req.name, stbname);
|
strcpy(req.name, stbname);
|
||||||
req.numOfFields = 1;
|
req.numOfFields = 1;
|
||||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||||
|
@ -272,7 +272,7 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co
|
||||||
|
|
||||||
void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes,
|
void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes,
|
||||||
int32_t* pContLen) {
|
int32_t* pContLen) {
|
||||||
SMAltertbReq req = {0};
|
SMAlterStbReq req = {0};
|
||||||
strcpy(req.name, stbname);
|
strcpy(req.name, stbname);
|
||||||
req.numOfFields = 1;
|
req.numOfFields = 1;
|
||||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||||
|
|
|
@ -325,10 +325,15 @@ typedef struct SExchangeInfo {
|
||||||
SLoadRemoteDataInfo loadInfo;
|
SLoadRemoteDataInfo loadInfo;
|
||||||
} SExchangeInfo;
|
} SExchangeInfo;
|
||||||
|
|
||||||
|
#define COL_MATCH_FROM_COL_ID 0x1
|
||||||
|
#define COL_MATCH_FROM_SLOT_ID 0x2
|
||||||
|
|
||||||
typedef struct SColMatchInfo {
|
typedef struct SColMatchInfo {
|
||||||
|
int32_t srcSlotId; // source slot id
|
||||||
int32_t colId;
|
int32_t colId;
|
||||||
int32_t targetSlotId;
|
int32_t targetSlotId;
|
||||||
bool output;
|
bool output;
|
||||||
|
int32_t matchType; // determinate the source according to col id or slot id
|
||||||
} SColMatchInfo;
|
} SColMatchInfo;
|
||||||
|
|
||||||
typedef struct SScanInfo {
|
typedef struct SScanInfo {
|
||||||
|
|
|
@ -4699,10 +4699,9 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
|
||||||
uint64_t queryId, uint64_t taskId);
|
uint64_t queryId, uint64_t taskId);
|
||||||
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type);
|
||||||
|
|
||||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
static SArray* createIndexMap(SNodeList* pNodeList);
|
|
||||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||||
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
||||||
static void setJoinColumnInfo(SColumnInfo* pInfo, const SColumnNode* pLeftNode);
|
static void setJoinColumnInfo(SColumnInfo* pInfo, const SColumnNode* pLeftNode);
|
||||||
|
@ -4734,9 +4733,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pColList =
|
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||||
extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
SQueryTableDataCond cond = {0};
|
SQueryTableDataCond cond = {0};
|
||||||
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
||||||
|
@ -4761,10 +4761,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo,
|
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo,
|
||||||
pScanPhyNode->node.pConditions);
|
pScanPhyNode->node.pConditions);
|
||||||
taosArrayDestroy(tableIdList);
|
taosArrayDestroy(tableIdList);
|
||||||
|
@ -4773,18 +4774,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||||
SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
|
SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
|
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
|
||||||
|
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* colList =
|
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
|
||||||
extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfOutputCols);
|
|
||||||
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
||||||
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
|
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
|
||||||
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
||||||
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*) pPhyNode;
|
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*) pPhyNode;
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
|
||||||
|
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||||
|
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
int32_t code =
|
int32_t code =
|
||||||
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||||
|
@ -4796,8 +4801,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* colList =
|
SArray* colList = extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
|
||||||
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfOutputCols);
|
|
||||||
|
|
||||||
SOperatorInfo* pOperator = createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
|
SOperatorInfo* pOperator = createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -4869,15 +4873,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
||||||
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
|
||||||
|
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
|
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* pColList =
|
SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||||
extractColMatchInfo(pSortPhyNode->pTargets, pSortPhyNode->node.pOutputDataBlockDesc, &numOfOutputCols);
|
|
||||||
|
|
||||||
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
||||||
|
@ -5059,25 +5064,7 @@ SArray* createSortInfo(SNodeList* pNodeList) {
|
||||||
return pList;
|
return pList;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* createIndexMap(SNodeList* pNodeList) {
|
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type) {
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(int32_t));
|
|
||||||
if (pList == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return pList;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
STargetNode* pTarget = (STargetNode*)nodesListGetNode(pNodeList, i);
|
|
||||||
|
|
||||||
SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr;
|
|
||||||
taosArrayPush(pList, &pColNode->slotId);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pList;
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols) {
|
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
|
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
|
||||||
if (pList == NULL) {
|
if (pList == NULL) {
|
||||||
|
@ -5092,6 +5079,8 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
||||||
SColMatchInfo c = {0};
|
SColMatchInfo c = {0};
|
||||||
c.output = true;
|
c.output = true;
|
||||||
c.colId = pColNode->colId;
|
c.colId = pColNode->colId;
|
||||||
|
c.srcSlotId = pColNode->slotId;
|
||||||
|
c.matchType = type;
|
||||||
c.targetSlotId = pNode->slotId;
|
c.targetSlotId = pNode->slotId;
|
||||||
taosArrayPush(pList, &c);
|
taosArrayPush(pList, &c);
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,6 +129,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
|
||||||
char* val = colDataGetData(pColInfoData, rowIndex);
|
char* val = colDataGetData(pColInfoData, rowIndex);
|
||||||
if (IS_VAR_DATA_TYPE(pkey->type)) {
|
if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
memcpy(pkey->pData, val, varDataTLen(val));
|
memcpy(pkey->pData, val, varDataTLen(val));
|
||||||
|
ASSERT(varDataTLen(val) <= pkey->bytes);
|
||||||
} else {
|
} else {
|
||||||
memcpy(pkey->pData, val, pkey->bytes);
|
memcpy(pkey->pData, val, pkey->bytes);
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,15 +89,11 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
||||||
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
|
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||||
|
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
for(int32_t j = 0; j < p->info.numOfCols; ++j) {
|
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||||
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, j);
|
|
||||||
if (pSrc->info.colId == pmInfo->colId) {
|
|
||||||
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
|
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
|
||||||
colDataAssign(pDst, pSrc, p->info.rows);
|
colDataAssign(pDst, pSrc, p->info.rows);
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pDataBlock->info.rows = p->info.rows;
|
pDataBlock->info.rows = p->info.rows;
|
||||||
|
|
|
@ -2596,7 +2596,7 @@ static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableS
|
||||||
pStmt->ignoreNotExists);
|
pStmt->ignoreNotExists);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAltertbReq* pAlterReq) {
|
static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAlterStbReq* pAlterReq) {
|
||||||
pAlterReq->pFields = taosArrayInit(2, sizeof(TAOS_FIELD));
|
pAlterReq->pFields = taosArrayInit(2, sizeof(TAOS_FIELD));
|
||||||
if (NULL == pAlterReq->pFields) {
|
if (NULL == pAlterReq->pFields) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -2632,7 +2632,7 @@ static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAltertbReq* pAlterRe
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateAlterTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) {
|
static int32_t translateAlterTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) {
|
||||||
SMAltertbReq alterReq = {0};
|
SMAlterStbReq alterReq = {0};
|
||||||
SName tableName;
|
SName tableName;
|
||||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), alterReq.name);
|
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), alterReq.name);
|
||||||
alterReq.alterType = pStmt->alterType;
|
alterReq.alterType = pStmt->alterType;
|
||||||
|
|
|
@ -326,6 +326,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_TYPE, "Invalid table type")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_TYPE, "Invalid table type")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION, "Invalid table schema version")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION, "Invalid table schema version")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_ALREADY_EXIST, "Table already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_ALREADY_EXIST, "Table already exists")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_NOT_EXIST, "Table not exists")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_STB_ALREADY_EXIST, "Stable already exists")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_STB_NOT_EXIST, "Stable not exists")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CONFIG, "Invalid configuration")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CONFIG, "Invalid configuration")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INIT_FAILED, "Tsdb init failed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INIT_FAILED, "Tsdb init failed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_DISKSPACE, "No diskspace for tsdb")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_DISKSPACE, "No diskspace for tsdb")
|
||||||
|
|
|
@ -1,5 +0,0 @@
|
||||||
run general/stable/disk.sim
|
|
||||||
run general/stable/dnode3.sim
|
|
||||||
run general/stable/metrics.sim
|
|
||||||
run general/stable/values.sim
|
|
||||||
run general/stable/vnode3.sim
|
|
Loading…
Reference in New Issue