Merge pull request #25451 from taosdata/enh/refactorCompress
refactor compress
This commit is contained in:
commit
e47d17e826
|
@ -86,16 +86,16 @@ bool checkColumnCompressOrSetDefault(uint8_t type, char compress[TSDB_CL_COMPRES
|
||||||
bool checkColumnLevel(char level[TSDB_CL_COMPRESS_OPTION_LEN]);
|
bool checkColumnLevel(char level[TSDB_CL_COMPRESS_OPTION_LEN]);
|
||||||
bool checkColumnLevelOrSetDefault(uint8_t type, char level[TSDB_CL_COMPRESS_OPTION_LEN]);
|
bool checkColumnLevelOrSetDefault(uint8_t type, char level[TSDB_CL_COMPRESS_OPTION_LEN]);
|
||||||
|
|
||||||
void setColEncode(uint32_t* compress, uint8_t encode);
|
void setColEncode(uint32_t* compress, uint8_t encode);
|
||||||
void setColCompress(uint32_t* compress, uint16_t compressType);
|
void setColCompress(uint32_t* compress, uint16_t compressType);
|
||||||
void setColLevel(uint32_t* compress, uint8_t level);
|
void setColLevel(uint32_t* compress, uint8_t level);
|
||||||
int8_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressType, uint8_t level, bool check,
|
int32_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressType, uint8_t level, bool check,
|
||||||
uint32_t* compress);
|
uint32_t* compress);
|
||||||
|
|
||||||
int8_t validColCompressLevel(uint8_t type, uint8_t level);
|
int8_t validColCompressLevel(uint8_t type, uint8_t level);
|
||||||
int8_t validColCompress(uint8_t type, uint8_t l2);
|
int8_t validColCompress(uint8_t type, uint8_t l2);
|
||||||
int8_t validColEncode(uint8_t type, uint8_t l1);
|
int8_t validColEncode(uint8_t type, uint8_t l1);
|
||||||
|
|
||||||
uint32_t createDefaultColCmprByType(uint8_t type);
|
uint32_t createDefaultColCmprByType(uint8_t type);
|
||||||
bool validColCmprByType(uint8_t type, uint32_t cmpr);
|
int32_t validColCmprByType(uint8_t type, uint32_t cmpr);
|
||||||
#endif /*_TD_TCOL_H_*/
|
#endif /*_TD_TCOL_H_*/
|
||||||
|
|
|
@ -182,6 +182,8 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_TSC_STMT_CACHE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0230)
|
#define TSDB_CODE_TSC_STMT_CACHE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0230)
|
||||||
#define TSDB_CODE_TSC_ENCODE_PARAM_ERROR TAOS_DEF_ERROR_CODE(0, 0X0231)
|
#define TSDB_CODE_TSC_ENCODE_PARAM_ERROR TAOS_DEF_ERROR_CODE(0, 0X0231)
|
||||||
#define TSDB_CODE_TSC_ENCODE_PARAM_NULL TAOS_DEF_ERROR_CODE(0, 0X0232)
|
#define TSDB_CODE_TSC_ENCODE_PARAM_NULL TAOS_DEF_ERROR_CODE(0, 0X0232)
|
||||||
|
#define TSDB_CODE_TSC_COMPRESS_PARAM_ERROR TAOS_DEF_ERROR_CODE(0, 0X0233)
|
||||||
|
#define TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR TAOS_DEF_ERROR_CODE(0, 0X0234)
|
||||||
#define TSDB_CODE_TSC_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0X02FF)
|
#define TSDB_CODE_TSC_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0X02FF)
|
||||||
|
|
||||||
// mnode-common
|
// mnode-common
|
||||||
|
@ -283,7 +285,6 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x036E)
|
#define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x036E)
|
||||||
#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x036F)
|
#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x036F)
|
||||||
#define TSDB_CODE_MND_FIELD_VALUE_OVERFLOW TAOS_DEF_ERROR_CODE(0, 0x0370)
|
#define TSDB_CODE_MND_FIELD_VALUE_OVERFLOW TAOS_DEF_ERROR_CODE(0, 0x0370)
|
||||||
#define TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0371)
|
|
||||||
|
|
||||||
|
|
||||||
// mnode-func
|
// mnode-func
|
||||||
|
@ -406,6 +407,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MND_INVALID_TARGET_TABLE TAOS_DEF_ERROR_CODE(0, 0x03F7)
|
#define TSDB_CODE_MND_INVALID_TARGET_TABLE TAOS_DEF_ERROR_CODE(0, 0x03F7)
|
||||||
|
|
||||||
|
|
||||||
|
#define TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F8)
|
||||||
|
|
||||||
// dnode
|
// dnode
|
||||||
// #define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) // 2.x
|
// #define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) // 2.x
|
||||||
|
|
|
@ -306,22 +306,22 @@ void setColLevel(uint32_t* compress, uint8_t level) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressType, uint8_t level, bool check,
|
int32_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressType, uint8_t level, bool check,
|
||||||
uint32_t* compress) {
|
uint32_t* compress) {
|
||||||
if (check && !validColEncode(type, encode)) return 0;
|
if (check && !validColEncode(type, encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
||||||
setColEncode(compress, encode);
|
setColEncode(compress, encode);
|
||||||
|
|
||||||
if (compressType == TSDB_COLVAL_COMPRESS_DISABLED) {
|
if (compressType == TSDB_COLVAL_COMPRESS_DISABLED) {
|
||||||
setColCompress(compress, compressType);
|
setColCompress(compress, compressType);
|
||||||
setColLevel(compress, TSDB_COLVAL_LEVEL_DISABLED);
|
setColLevel(compress, TSDB_COLVAL_LEVEL_DISABLED);
|
||||||
} else {
|
} else {
|
||||||
if (check && !validColCompress(type, compressType)) return 0;
|
if (check && !validColCompress(type, compressType)) return TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
|
||||||
setColCompress(compress, compressType);
|
setColCompress(compress, compressType);
|
||||||
|
|
||||||
if (check && !validColCompressLevel(type, level)) return 0;
|
if (check && !validColCompressLevel(type, level)) return TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
|
||||||
setColLevel(compress, level);
|
setColLevel(compress, level);
|
||||||
}
|
}
|
||||||
return 1;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool useCompress(uint8_t tableType) { return TSDB_SUPER_TABLE == tableType || TSDB_NORMAL_TABLE == tableType; }
|
bool useCompress(uint8_t tableType) { return TSDB_SUPER_TABLE == tableType || TSDB_NORMAL_TABLE == tableType; }
|
||||||
|
@ -397,10 +397,17 @@ uint32_t createDefaultColCmprByType(uint8_t type) {
|
||||||
SET_COMPRESS(encode, compress, lvl, ret);
|
SET_COMPRESS(encode, compress, lvl, ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
bool validColCmprByType(uint8_t type, uint32_t cmpr) {
|
int32_t validColCmprByType(uint8_t type, uint32_t cmpr) {
|
||||||
DEFINE_VAR(cmpr);
|
DEFINE_VAR(cmpr);
|
||||||
if (validColEncode(type, l1) && validColCompress(type, l2) && validColCompressLevel(type, lvl)) {
|
if (!validColEncode(type, l1)) {
|
||||||
return true;
|
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
||||||
}
|
}
|
||||||
return false;
|
if (!validColCompress(type, l2)) {
|
||||||
|
return TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!validColCompressLevel(type, lvl)) {
|
||||||
|
return TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,7 @@ static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq);
|
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq);
|
||||||
|
|
||||||
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq);
|
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq);
|
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pReq);
|
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pReq);
|
||||||
|
|
||||||
int32_t mndInitStb(SMnode *pMnode) {
|
int32_t mndInitStb(SMnode *pMnode) {
|
||||||
|
@ -1006,7 +1006,8 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
|
||||||
pHead->vgId = htonl(pVgroup->vgId);
|
pHead->vgId = htonl(pVgroup->vgId);
|
||||||
tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq);
|
tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
|
SRpcMsg rpcMsg = {
|
||||||
|
.msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
|
||||||
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -1752,9 +1753,10 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *
|
||||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (!validColCmprByType(pTarget->type, p->bytes)) {
|
code = validColCmprByType(pTarget->type, p->bytes);
|
||||||
terrno = TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return -1;
|
terrno = code;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t updated = 0;
|
int8_t updated = 0;
|
||||||
|
@ -3885,32 +3887,33 @@ typedef struct SMDropTbDbInfo {
|
||||||
} SMDropTbDbInfo;
|
} SMDropTbDbInfo;
|
||||||
|
|
||||||
typedef struct SMDropTbTsmaInfo {
|
typedef struct SMDropTbTsmaInfo {
|
||||||
char tsmaResTbDbFName[TSDB_DB_FNAME_LEN];
|
char tsmaResTbDbFName[TSDB_DB_FNAME_LEN];
|
||||||
char tsmaResTbNamePrefix[TSDB_TABLE_NAME_LEN];
|
char tsmaResTbNamePrefix[TSDB_TABLE_NAME_LEN];
|
||||||
int32_t suid;
|
int32_t suid;
|
||||||
SMDropTbDbInfo dbInfo; // reference to DbInfo in pDbMap
|
SMDropTbDbInfo dbInfo; // reference to DbInfo in pDbMap
|
||||||
} SMDropTbTsmaInfo;
|
} SMDropTbTsmaInfo;
|
||||||
|
|
||||||
typedef struct SMDropTbTsmaInfos {
|
typedef struct SMDropTbTsmaInfos {
|
||||||
SArray* pTsmaInfos; // SMDropTbTsmaInfo
|
SArray *pTsmaInfos; // SMDropTbTsmaInfo
|
||||||
} SMDropTbTsmaInfos;
|
} SMDropTbTsmaInfos;
|
||||||
|
|
||||||
typedef struct SMndDropTbsWithTsmaCtx {
|
typedef struct SMndDropTbsWithTsmaCtx {
|
||||||
SHashObj* pTsmaMap; // <suid, SMDropTbTsmaInfos>
|
SHashObj *pTsmaMap; // <suid, SMDropTbTsmaInfos>
|
||||||
SHashObj* pDbMap; // <dbuid, SMDropTbDbInfo>
|
SHashObj *pDbMap; // <dbuid, SMDropTbDbInfo>
|
||||||
SHashObj* pVgMap; // <vgId, SVDropTbVgReqs>
|
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>
|
||||||
SArray* pResTbNames; // SArray<char*>
|
SArray *pResTbNames; // SArray<char*>
|
||||||
} SMndDropTbsWithTsmaCtx;
|
} SMndDropTbsWithTsmaCtx;
|
||||||
|
|
||||||
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWithTsmaCtx* pCtx, SArray* pTbs, int32_t vgId);
|
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
|
||||||
|
int32_t vgId);
|
||||||
|
|
||||||
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx* p) {
|
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
|
||||||
if (!p) return;
|
if (!p) return;
|
||||||
|
|
||||||
if (p->pDbMap) {
|
if (p->pDbMap) {
|
||||||
void* pIter = taosHashIterate(p->pDbMap, NULL);
|
void *pIter = taosHashIterate(p->pDbMap, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SMDropTbDbInfo* pInfo = pIter;
|
SMDropTbDbInfo *pInfo = pIter;
|
||||||
taosArrayDestroy(pInfo->dbVgInfos);
|
taosArrayDestroy(pInfo->dbVgInfos);
|
||||||
pIter = taosHashIterate(p->pDbMap, pIter);
|
pIter = taosHashIterate(p->pDbMap, pIter);
|
||||||
}
|
}
|
||||||
|
@ -3920,9 +3923,9 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx* p) {
|
||||||
taosArrayDestroyP(p->pResTbNames, taosMemoryFree);
|
taosArrayDestroyP(p->pResTbNames, taosMemoryFree);
|
||||||
}
|
}
|
||||||
if (p->pTsmaMap) {
|
if (p->pTsmaMap) {
|
||||||
void* pIter = taosHashIterate(p->pTsmaMap, NULL);
|
void *pIter = taosHashIterate(p->pTsmaMap, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SMDropTbTsmaInfos* pInfos = pIter;
|
SMDropTbTsmaInfos *pInfos = pIter;
|
||||||
taosArrayDestroy(pInfos->pTsmaInfos);
|
taosArrayDestroy(pInfos->pTsmaInfos);
|
||||||
pIter = taosHashIterate(p->pTsmaMap, pIter);
|
pIter = taosHashIterate(p->pTsmaMap, pIter);
|
||||||
}
|
}
|
||||||
|
@ -3930,7 +3933,7 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx* p) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p->pVgMap) {
|
if (p->pVgMap) {
|
||||||
void* pIter = taosHashIterate(p->pVgMap, NULL);
|
void *pIter = taosHashIterate(p->pVgMap, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SVDropTbVgReqs *pReqs = pIter;
|
SVDropTbVgReqs *pReqs = pIter;
|
||||||
taosArrayDestroy(pReqs->req.pArray);
|
taosArrayDestroy(pReqs->req.pArray);
|
||||||
|
@ -3941,9 +3944,9 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx* p) {
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx** ppCtx) {
|
static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SMndDropTbsWithTsmaCtx* pCtx = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
|
SMndDropTbsWithTsmaCtx *pCtx = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
|
||||||
if (!pCtx) return TSDB_CODE_OUT_OF_MEMORY;
|
if (!pCtx) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
pCtx->pTsmaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
pCtx->pTsmaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
if (!pCtx->pTsmaMap) {
|
if (!pCtx->pTsmaMap) {
|
||||||
|
@ -3969,12 +3972,12 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *mndBuildVDropTbsReq(SMnode *pMnode, const SVgroupInfo *pVgInfo, const SVDropTbBatchReq *pReq,
|
||||||
static void* mndBuildVDropTbsReq(SMnode* pMnode, const SVgroupInfo* pVgInfo, const SVDropTbBatchReq* pReq, int32_t *len) {
|
int32_t *len) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
SMsgHead *pHead = NULL;
|
SMsgHead *pHead = NULL;
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
|
|
||||||
tEncodeSize(tEncodeSVDropTbBatchReq, pReq, contLen, ret);
|
tEncodeSize(tEncodeSVDropTbBatchReq, pReq, contLen, ret);
|
||||||
if (ret < 0) return NULL;
|
if (ret < 0) return NULL;
|
||||||
|
@ -3999,7 +4002,8 @@ static void* mndBuildVDropTbsReq(SMnode* pMnode, const SVgroupInfo* pVgInfo, con
|
||||||
return pHead;
|
return pHead;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropTbsRedoActions(SMnode* pMnode, STrans* pTrans, const SVDropTbVgReqs* pVgReqs, void* pCont, int32_t contLen) {
|
static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SVDropTbVgReqs *pVgReqs, void *pCont,
|
||||||
|
int32_t contLen) {
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
action.epSet = pVgReqs->info.epSet;
|
action.epSet = pVgReqs->info.epSet;
|
||||||
action.pCont = pCont;
|
action.pCont = pCont;
|
||||||
|
@ -4009,7 +4013,7 @@ static int32_t mndSetDropTbsRedoActions(SMnode* pMnode, STrans* pTrans, const SV
|
||||||
return mndTransAppendRedoAction(pTrans, &action);
|
return mndTransAppendRedoAction(pTrans, &action);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg* pRsp, SMndDropTbsWithTsmaCtx* pCtx) {
|
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx *pCtx) {
|
||||||
SMnode *pMnode = pRsp->info.node;
|
SMnode *pMnode = pRsp->info.node;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs");
|
||||||
mndTransSetChangeless(pTrans);
|
mndTransSetChangeless(pTrans);
|
||||||
|
@ -4017,11 +4021,11 @@ static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg* pRsp, SMndDropTbsWithTsmaCtx*
|
||||||
|
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
void* pIter = taosHashIterate(pCtx->pVgMap, NULL);
|
void *pIter = taosHashIterate(pCtx->pVgMap, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
const SVDropTbVgReqs* pVgReqs = pIter;
|
const SVDropTbVgReqs *pVgReqs = pIter;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
void* p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len);
|
void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len);
|
||||||
if (!p || mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len) != 0) {
|
if (!p || mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len) != 0) {
|
||||||
taosHashCancelIterate(pCtx->pVgMap, pIter);
|
taosHashCancelIterate(pCtx->pVgMap, pIter);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -4035,28 +4039,27 @@ _OVER:
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) {
|
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
SStbObj *pStb = NULL;
|
SStbObj *pStb = NULL;
|
||||||
SMDropTbsReq dropReq = {0};
|
SMDropTbsReq dropReq = {0};
|
||||||
bool locked = false;
|
bool locked = false;
|
||||||
if (tDeserializeSMDropTbsReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSMDropTbsReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMndDropTbsWithTsmaCtx* pCtx = NULL;
|
SMndDropTbsWithTsmaCtx *pCtx = NULL;
|
||||||
terrno = mndInitDropTbsWithTsmaCtx(&pCtx);
|
terrno = mndInitDropTbsWithTsmaCtx(&pCtx);
|
||||||
if (terrno) goto _OVER;
|
if (terrno) goto _OVER;
|
||||||
for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
|
for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
|
||||||
SMDropTbReqsOnSingleVg* pReq = taosArrayGet(dropReq.pVgReqs, i);
|
SMDropTbReqsOnSingleVg *pReq = taosArrayGet(dropReq.pVgReqs, i);
|
||||||
terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
|
terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
|
||||||
if (terrno) goto _OVER;
|
if (terrno) goto _OVER;
|
||||||
}
|
}
|
||||||
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0)
|
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) code = 0;
|
||||||
code = 0;
|
|
||||||
_OVER:
|
_OVER:
|
||||||
tFreeSMDropTbsReq(&dropReq);
|
tFreeSMDropTbsReq(&dropReq);
|
||||||
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
||||||
|
@ -4067,8 +4070,8 @@ static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupI
|
||||||
bool ignoreNotExists) {
|
bool ignoreNotExists) {
|
||||||
SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists};
|
SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists};
|
||||||
|
|
||||||
SVDropTbVgReqs * pReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
SVDropTbVgReqs *pReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
||||||
SVDropTbVgReqs reqs = {0};
|
SVDropTbVgReqs reqs = {0};
|
||||||
if (pReqs == NULL) {
|
if (pReqs == NULL) {
|
||||||
reqs.info = *pVgInfo;
|
reqs.info = *pVgInfo;
|
||||||
reqs.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
|
reqs.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
|
||||||
|
@ -4080,16 +4083,16 @@ static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupI
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetDbVgInfoForTsma(SMnode* pMnode, const char* dbname, SMDropTbTsmaInfo* pInfo) {
|
static int32_t mndGetDbVgInfoForTsma(SMnode *pMnode, const char *dbname, SMDropTbTsmaInfo *pInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDbObj* pDb = mndAcquireDb(pMnode, dbname);
|
SDbObj *pDb = mndAcquireDb(pMnode, dbname);
|
||||||
if (!pDb) {
|
if (!pDb) {
|
||||||
code = TSDB_CODE_MND_DB_NOT_EXIST;
|
code = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->dbInfo.dbVgInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
pInfo->dbInfo.dbVgInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
||||||
if ( !pInfo->dbInfo.dbVgInfos) {
|
if (!pInfo->dbInfo.dbVgInfos) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
@ -4108,9 +4111,9 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vgHashValCmp(const void* lp, const void* rp) {
|
int32_t vgHashValCmp(const void *lp, const void *rp) {
|
||||||
uint32_t* key = (uint32_t*)lp;
|
uint32_t *key = (uint32_t *)lp;
|
||||||
SVgroupInfo* pVg = (SVgroupInfo*)rp;
|
SVgroupInfo *pVg = (SVgroupInfo *)rp;
|
||||||
|
|
||||||
if (*key < pVg->hashBegin) {
|
if (*key < pVg->hashBegin) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -4121,23 +4124,26 @@ int32_t vgHashValCmp(const void* lp, const void* rp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWithTsmaCtx* pCtx, SArray* pTbs, int32_t vgId) {
|
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
|
||||||
|
int32_t vgId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SVgObj* pVgObj = mndAcquireVgroup(pMnode, vgId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||||
if (!pVgObj) {
|
if (!pVgObj) {
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
SVgroupInfo vgInfo = {.hashBegin = pVgObj->hashBegin, .hashEnd = pVgObj->hashEnd, .numOfTable = pVgObj->numOfTables, .vgId = pVgObj->vgId};
|
SVgroupInfo vgInfo = {.hashBegin = pVgObj->hashBegin,
|
||||||
|
.hashEnd = pVgObj->hashEnd,
|
||||||
|
.numOfTable = pVgObj->numOfTables,
|
||||||
|
.vgId = pVgObj->vgId};
|
||||||
vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
|
|
||||||
// get all stb uids
|
// get all stb uids
|
||||||
for (int32_t i = 0; i < pTbs->size; ++i) {
|
for (int32_t i = 0; i < pTbs->size; ++i) {
|
||||||
const SVDropTbReq* pTb = taosArrayGet(pTbs, i);
|
const SVDropTbReq *pTb = taosArrayGet(pTbs, i);
|
||||||
if (taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid))) {
|
if (taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid))) {
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
SMDropTbTsmaInfos infos = {0};
|
SMDropTbTsmaInfos infos = {0};
|
||||||
infos.pTsmaInfos = taosArrayInit(2, sizeof(SMDropTbTsmaInfo));
|
infos.pTsmaInfos = taosArrayInit(2, sizeof(SMDropTbTsmaInfo));
|
||||||
|
@ -4156,14 +4162,14 @@ static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWith
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
|
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||||
if (!pIter) break;
|
if (!pIter) break;
|
||||||
SMDropTbTsmaInfos* pInfos = taosHashGet(pCtx->pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid));
|
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid));
|
||||||
if (pInfos) {
|
if (pInfos) {
|
||||||
SMDropTbTsmaInfo info = {0};
|
SMDropTbTsmaInfo info = {0};
|
||||||
int32_t len = sprintf(buf, "%s", pSma->name);
|
int32_t len = sprintf(buf, "%s", pSma->name);
|
||||||
len = taosCreateMD5Hash(buf, len);
|
len = taosCreateMD5Hash(buf, len);
|
||||||
sprintf(info.tsmaResTbDbFName, "%s", pSma->db);
|
sprintf(info.tsmaResTbDbFName, "%s", pSma->db);
|
||||||
snprintf(info.tsmaResTbNamePrefix, TSDB_TABLE_NAME_LEN, "%s", buf);
|
snprintf(info.tsmaResTbNamePrefix, TSDB_TABLE_NAME_LEN, "%s", buf);
|
||||||
SMDropTbDbInfo* pDbInfo = taosHashGet(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN);
|
SMDropTbDbInfo *pDbInfo = taosHashGet(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN);
|
||||||
info.suid = pSma->dstTbUid;
|
info.suid = pSma->dstTbUid;
|
||||||
if (!pDbInfo) {
|
if (!pDbInfo) {
|
||||||
code = mndGetDbVgInfoForTsma(pMnode, pSma->db, &info);
|
code = mndGetDbVgInfoForTsma(pMnode, pSma->db, &info);
|
||||||
|
@ -4183,7 +4189,7 @@ static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWith
|
||||||
|
|
||||||
// generate vg req map
|
// generate vg req map
|
||||||
for (int32_t i = 0; i < pTbs->size; ++i) {
|
for (int32_t i = 0; i < pTbs->size; ++i) {
|
||||||
SVDropTbReq* pTb = taosArrayGet(pTbs, i);
|
SVDropTbReq *pTb = taosArrayGet(pTbs, i);
|
||||||
mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pTb->name, pTb->suid, pTb->igNotExists);
|
mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pTb->name, pTb->suid, pTb->igNotExists);
|
||||||
|
|
||||||
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid));
|
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid));
|
||||||
|
@ -4195,7 +4201,7 @@ static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWith
|
||||||
uint32_t hashVal =
|
uint32_t hashVal =
|
||||||
taosGetTbHashVal(buf, len, pInfo->dbInfo.hashMethod, pInfo->dbInfo.hashPrefix, pInfo->dbInfo.hashSuffix);
|
taosGetTbHashVal(buf, len, pInfo->dbInfo.hashMethod, pInfo->dbInfo.hashPrefix, pInfo->dbInfo.hashSuffix);
|
||||||
const SVgroupInfo *pVgInfo = taosArraySearch(pInfo->dbInfo.dbVgInfos, &hashVal, vgHashValCmp, TD_EQ);
|
const SVgroupInfo *pVgInfo = taosArraySearch(pInfo->dbInfo.dbVgInfos, &hashVal, vgHashValCmp, TD_EQ);
|
||||||
void* p = taosStrdup(buf + strlen(pInfo->tsmaResTbDbFName) + TSDB_NAME_DELIMITER_LEN);
|
void *p = taosStrdup(buf + strlen(pInfo->tsmaResTbDbFName) + TSDB_NAME_DELIMITER_LEN);
|
||||||
taosArrayPush(pCtx->pResTbNames, &p);
|
taosArrayPush(pCtx->pResTbNames, &p);
|
||||||
mndDropTbAdd(pMnode, pCtx->pVgMap, pVgInfo, p, pInfo->suid, true);
|
mndDropTbAdd(pMnode, pCtx->pVgMap, pVgInfo, p, pInfo->suid, true);
|
||||||
}
|
}
|
||||||
|
@ -4225,8 +4231,7 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
|
||||||
|
|
||||||
terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
|
terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
|
||||||
if (terrno) goto _end;
|
if (terrno) goto _end;
|
||||||
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0)
|
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = 0;
|
||||||
code = 0;
|
|
||||||
_end:
|
_end:
|
||||||
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
|
@ -18,10 +18,10 @@
|
||||||
#include "commandInt.h"
|
#include "commandInt.h"
|
||||||
#include "scheduler.h"
|
#include "scheduler.h"
|
||||||
#include "systable.h"
|
#include "systable.h"
|
||||||
|
#include "taosdef.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "tgrant.h"
|
#include "tgrant.h"
|
||||||
#include "taosdef.h"
|
|
||||||
|
|
||||||
extern SConfig* tsCfg;
|
extern SConfig* tsCfg;
|
||||||
|
|
||||||
|
@ -126,7 +126,8 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
|
||||||
pCol7 = taosArrayGet(pBlock->pDataBlock, 6);
|
pCol7 = taosArrayGet(pBlock->pDataBlock, 6);
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[DESCRIBE_RESULT_FIELD_LEN] = {0};
|
int32_t fillTagCol = 0;
|
||||||
|
char buf[DESCRIBE_RESULT_FIELD_LEN] = {0};
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
if (invisibleColumn(sysInfoUser, pMeta->tableType, pMeta->schema[i].flags)) {
|
if (invisibleColumn(sysInfoUser, pMeta->tableType, pMeta->schema[i].flags)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -140,6 +141,7 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
|
||||||
if (TSDB_VIEW_TABLE != pMeta->tableType) {
|
if (TSDB_VIEW_TABLE != pMeta->tableType) {
|
||||||
if (i >= pMeta->tableInfo.numOfColumns) {
|
if (i >= pMeta->tableInfo.numOfColumns) {
|
||||||
STR_TO_VARSTR(buf, "TAG");
|
STR_TO_VARSTR(buf, "TAG");
|
||||||
|
fillTagCol = 1;
|
||||||
} else if (i == 1 && pMeta->schema[i].flags & COL_IS_KEY) {
|
} else if (i == 1 && pMeta->schema[i].flags & COL_IS_KEY) {
|
||||||
STR_TO_VARSTR(buf, "PRIMARY KEY")
|
STR_TO_VARSTR(buf, "PRIMARY KEY")
|
||||||
} else {
|
} else {
|
||||||
|
@ -158,15 +160,17 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
|
||||||
STR_TO_VARSTR(buf, columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(pMeta->schemaExt[i].compress)));
|
STR_TO_VARSTR(buf, columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(pMeta->schemaExt[i].compress)));
|
||||||
colDataSetVal(pCol7, pBlock->info.rows, buf, false);
|
colDataSetVal(pCol7, pBlock->info.rows, buf, false);
|
||||||
} else {
|
} else {
|
||||||
STR_TO_VARSTR(buf, "");
|
STR_TO_VARSTR(buf, fillTagCol == 0 ? "" : "disabled");
|
||||||
colDataSetVal(pCol5, pBlock->info.rows, buf, false);
|
colDataSetVal(pCol5, pBlock->info.rows, buf, false);
|
||||||
STR_TO_VARSTR(buf, "");
|
STR_TO_VARSTR(buf, fillTagCol == 0 ? "" : "disabled");
|
||||||
colDataSetVal(pCol6, pBlock->info.rows, buf, false);
|
colDataSetVal(pCol6, pBlock->info.rows, buf, false);
|
||||||
STR_TO_VARSTR(buf, "");
|
STR_TO_VARSTR(buf, fillTagCol == 0 ? "" : "disabled");
|
||||||
colDataSetVal(pCol7, pBlock->info.rows, buf, false);
|
colDataSetVal(pCol7, pBlock->info.rows, buf, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fillTagCol = 0;
|
||||||
|
|
||||||
++(pBlock->info.rows);
|
++(pBlock->info.rows);
|
||||||
}
|
}
|
||||||
if (pMeta->tableType == TSDB_SUPER_TABLE && biMode != 0) {
|
if (pMeta->tableType == TSDB_SUPER_TABLE && biMode != 0) {
|
||||||
|
@ -355,7 +359,7 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* retentions = buildRetension(pCfg->pRetensions);
|
char* retentions = buildRetension(pCfg->pRetensions);
|
||||||
int32_t dbFNameLen = strlen(dbFName);
|
int32_t dbFNameLen = strlen(dbFName);
|
||||||
int32_t hashPrefix = 0;
|
int32_t hashPrefix = 0;
|
||||||
if (pCfg->hashPrefix > 0) {
|
if (pCfg->hashPrefix > 0) {
|
||||||
|
@ -367,17 +371,20 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch
|
||||||
if (IS_SYS_DBNAME(dbName)) {
|
if (IS_SYS_DBNAME(dbName)) {
|
||||||
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE DATABASE `%s`", dbName);
|
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE DATABASE `%s`", dbName);
|
||||||
} else {
|
} else {
|
||||||
len += sprintf(
|
len += sprintf(buf2 + VARSTR_HEADER_SIZE,
|
||||||
buf2 + VARSTR_HEADER_SIZE,
|
"CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
|
||||||
"CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
|
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d STT_TRIGGER %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d "
|
||||||
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d STT_TRIGGER %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
|
"PRECISION '%s' REPLICA %d "
|
||||||
"WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d TABLE_PREFIX %d TABLE_SUFFIX %d TSDB_PAGESIZE %d "
|
"WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d TABLE_PREFIX %d TABLE_SUFFIX %d TSDB_PAGESIZE %d "
|
||||||
"WAL_RETENTION_PERIOD %d WAL_RETENTION_SIZE %" PRId64 " KEEP_TIME_OFFSET %d ENCRYPT_ALGORITHM '%s' S3_CHUNKSIZE %d S3_KEEPLOCAL %dm S3_COMPACT %d",
|
"WAL_RETENTION_PERIOD %d WAL_RETENTION_SIZE %" PRId64
|
||||||
dbName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile,
|
" KEEP_TIME_OFFSET %d ENCRYPT_ALGORITHM '%s' S3_CHUNKSIZE %d S3_KEEPLOCAL %dm S3_COMPACT %d",
|
||||||
pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->sstTrigger, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2,
|
dbName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression,
|
||||||
pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups,
|
pCfg->daysPerFile, pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->sstTrigger,
|
||||||
1 == pCfg->numOfStables, hashPrefix, pCfg->hashSuffix, pCfg->tsdbPageSize, pCfg->walRetentionPeriod, pCfg->walRetentionSize,
|
pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages, pCfg->pageSize, prec,
|
||||||
pCfg->keepTimeOffset, encryptAlgorithmStr(pCfg->encryptAlgorithm), pCfg->s3ChunkSize, pCfg->s3KeepLocal, pCfg->s3Compact);
|
pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups, 1 == pCfg->numOfStables, hashPrefix,
|
||||||
|
pCfg->hashSuffix, pCfg->tsdbPageSize, pCfg->walRetentionPeriod, pCfg->walRetentionSize,
|
||||||
|
pCfg->keepTimeOffset, encryptAlgorithmStr(pCfg->encryptAlgorithm), pCfg->s3ChunkSize,
|
||||||
|
pCfg->s3KeepLocal, pCfg->s3Compact);
|
||||||
|
|
||||||
if (retentions) {
|
if (retentions) {
|
||||||
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
|
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
|
||||||
|
@ -391,7 +398,9 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch
|
||||||
colDataSetVal(pCol2, 0, buf2, false);
|
colDataSetVal(pCol2, 0, buf2, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define CHECK_LEADER(n) (row[n] && (fields[n].type == TSDB_DATA_TYPE_VARCHAR && strncasecmp(row[n], "leader", varDataLen((char *)row[n] - VARSTR_HEADER_SIZE)) == 0))
|
#define CHECK_LEADER(n) \
|
||||||
|
(row[n] && (fields[n].type == TSDB_DATA_TYPE_VARCHAR && \
|
||||||
|
strncasecmp(row[n], "leader", varDataLen((char*)row[n] - VARSTR_HEADER_SIZE)) == 0))
|
||||||
// on this row, if have leader return true else return false
|
// on this row, if have leader return true else return false
|
||||||
bool existLeaderRole(TAOS_ROW row, TAOS_FIELD* fields, int nFields) {
|
bool existLeaderRole(TAOS_ROW row, TAOS_FIELD* fields, int nFields) {
|
||||||
// vgroup_id | db_name | tables | v1_dnode | v1_status | v2_dnode | v2_status | v3_dnode | v3_status | v4_dnode |
|
// vgroup_id | db_name | tables | v1_dnode | v1_status | v2_dnode | v2_status | v3_dnode | v3_status | v4_dnode |
|
||||||
|
@ -548,23 +557,25 @@ static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||||
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
|
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
|
||||||
SSchema* pSchema = pCfg->pSchemas + i;
|
SSchema* pSchema = pCfg->pSchemas + i;
|
||||||
char type[32 + 60]; // 60 byte for compress info
|
char type[32 + 60]; // 60 byte for compress info
|
||||||
sprintf(type, "%s", tDataTypes[pSchema->type].name);
|
sprintf(type, "%s", tDataTypes[pSchema->type].name);
|
||||||
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_VARBINARY == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
|
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_VARBINARY == pSchema->type ||
|
||||||
|
TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
|
||||||
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
|
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
|
||||||
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
|
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
|
||||||
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (useCompress(pCfg->tableType)) {
|
if (useCompress(pCfg->tableType)) {
|
||||||
sprintf(type + strlen(type), " ENCODE \'%s\'", columnEncodeStr(COMPRESS_L1_TYPE_U32(pCfg->pSchemaExt[i].compress)));
|
sprintf(type + strlen(type), " ENCODE \'%s\'",
|
||||||
sprintf(type + strlen(type), " COMPRESS \'%s\'", columnCompressStr(COMPRESS_L2_TYPE_U32(pCfg->pSchemaExt[i].compress)));
|
columnEncodeStr(COMPRESS_L1_TYPE_U32(pCfg->pSchemaExt[i].compress)));
|
||||||
sprintf(type + strlen(type), " LEVEL \'%s\'", columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(pCfg->pSchemaExt[i].compress)));
|
sprintf(type + strlen(type), " COMPRESS \'%s\'",
|
||||||
|
columnCompressStr(COMPRESS_L2_TYPE_U32(pCfg->pSchemaExt[i].compress)));
|
||||||
|
sprintf(type + strlen(type), " LEVEL \'%s\'",
|
||||||
|
columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(pCfg->pSchemaExt[i].compress)));
|
||||||
}
|
}
|
||||||
if (!(pSchema->flags & COL_IS_KEY)) {
|
if (!(pSchema->flags & COL_IS_KEY)) {
|
||||||
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
|
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
|
||||||
|
@ -580,7 +591,8 @@ void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||||
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
|
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
|
||||||
char type[32];
|
char type[32];
|
||||||
sprintf(type, "%s", tDataTypes[pSchema->type].name);
|
sprintf(type, "%s", tDataTypes[pSchema->type].name);
|
||||||
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_VARBINARY == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
|
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_VARBINARY == pSchema->type ||
|
||||||
|
TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
|
||||||
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
|
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
|
||||||
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
|
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
|
||||||
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||||
|
@ -823,7 +835,8 @@ static int32_t setCreateViewResultIntoDataBlock(SSDataBlock* pBlock, SShowCreate
|
||||||
|
|
||||||
SViewMeta* pMeta = pStmt->pViewMeta;
|
SViewMeta* pMeta = pStmt->pViewMeta;
|
||||||
ASSERT(pMeta);
|
ASSERT(pMeta);
|
||||||
snprintf(varDataVal(buf2), SHOW_CREATE_VIEW_RESULT_FIELD2_LEN - VARSTR_HEADER_SIZE, "CREATE VIEW `%s`.`%s` AS %s", pStmt->dbName, pStmt->viewName, pMeta->querySql);
|
snprintf(varDataVal(buf2), SHOW_CREATE_VIEW_RESULT_FIELD2_LEN - VARSTR_HEADER_SIZE, "CREATE VIEW `%s`.`%s` AS %s",
|
||||||
|
pStmt->dbName, pStmt->viewName, pMeta->querySql);
|
||||||
int32_t len = strlen(varDataVal(buf2));
|
int32_t len = strlen(varDataVal(buf2));
|
||||||
varDataLen(buf2) = (len > 65535) ? 65535 : len;
|
varDataLen(buf2) = (len > 65535) ? 65535 : len;
|
||||||
colDataSetVal(pCol2, 0, buf2, false);
|
colDataSetVal(pCol2, 0, buf2, false);
|
||||||
|
@ -833,7 +846,6 @@ static int32_t setCreateViewResultIntoDataBlock(SSDataBlock* pBlock, SShowCreate
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRsp** pRsp) {
|
static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRsp** pRsp) {
|
||||||
SSDataBlock* pBlock = NULL;
|
SSDataBlock* pBlock = NULL;
|
||||||
int32_t code = buildCreateTbResultDataBlock(&pBlock);
|
int32_t code = buildCreateTbResultDataBlock(&pBlock);
|
||||||
|
|
|
@ -745,7 +745,6 @@ SNode* createTimeOffsetValueNode(SAstCreateContext* pCxt, const SToken* pLiteral
|
||||||
return (SNode*)val;
|
return (SNode*)val;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt) {
|
SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
if (NULL == pCxt->pQueryCxt->db) {
|
if (NULL == pCxt->pQueryCxt->db) {
|
||||||
|
@ -965,7 +964,8 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok
|
||||||
return (SNode*)tempTable;
|
return (SNode*)tempTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, EJoinSubType stype, SNode* pLeft, SNode* pRight, SNode* pJoinCond) {
|
SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, EJoinSubType stype, SNode* pLeft, SNode* pRight,
|
||||||
|
SNode* pJoinCond) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
SJoinTableNode* joinTable = (SJoinTableNode*)nodesMakeNode(QUERY_NODE_JOIN_TABLE);
|
SJoinTableNode* joinTable = (SJoinTableNode*)nodesMakeNode(QUERY_NODE_JOIN_TABLE);
|
||||||
CHECK_OUT_OF_MEM(joinTable);
|
CHECK_OUT_OF_MEM(joinTable);
|
||||||
|
@ -1264,7 +1264,6 @@ SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) {
|
||||||
return pStmt;
|
return pStmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
SNode* addJLimitClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pJLimit) {
|
SNode* addJLimitClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pJLimit) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
if (NULL == pJLimit) {
|
if (NULL == pJLimit) {
|
||||||
|
@ -1272,11 +1271,10 @@ SNode* addJLimitClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pJLimit) {
|
||||||
}
|
}
|
||||||
SJoinTableNode* pJoinNode = (SJoinTableNode*)pJoin;
|
SJoinTableNode* pJoinNode = (SJoinTableNode*)pJoin;
|
||||||
pJoinNode->pJLimit = pJLimit;
|
pJoinNode->pJLimit = pJLimit;
|
||||||
|
|
||||||
return pJoin;
|
return pJoin;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
SNode* addWindowOffsetClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pWinOffset) {
|
SNode* addWindowOffsetClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pWinOffset) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
if (NULL == pWinOffset) {
|
if (NULL == pWinOffset) {
|
||||||
|
@ -1284,11 +1282,10 @@ SNode* addWindowOffsetClause(SAstCreateContext* pCxt, SNode* pJoin, SNode* pWinO
|
||||||
}
|
}
|
||||||
SJoinTableNode* pJoinNode = (SJoinTableNode*)pJoin;
|
SJoinTableNode* pJoinNode = (SJoinTableNode*)pJoin;
|
||||||
pJoinNode->pWindowOffset = pWinOffset;
|
pJoinNode->pWindowOffset = pWinOffset;
|
||||||
|
|
||||||
return pJoin;
|
return pJoin;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable,
|
SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable,
|
||||||
SNodeList* pHint) {
|
SNodeList* pHint) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
|
@ -1549,10 +1546,10 @@ static SNode* setDatabaseOptionImpl(SAstCreateContext* pCxt, SNode* pOptions, ED
|
||||||
case DB_OPTION_KEEP_TIME_OFFSET: {
|
case DB_OPTION_KEEP_TIME_OFFSET: {
|
||||||
pDbOptions->keepTimeOffset = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
|
pDbOptions->keepTimeOffset = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
|
||||||
break;
|
break;
|
||||||
case DB_OPTION_ENCRYPT_ALGORITHM:
|
case DB_OPTION_ENCRYPT_ALGORITHM:
|
||||||
COPY_STRING_FORM_STR_TOKEN(pDbOptions->encryptAlgorithmStr, (SToken*)pVal);
|
COPY_STRING_FORM_STR_TOKEN(pDbOptions->encryptAlgorithmStr, (SToken*)pVal);
|
||||||
pDbOptions->encryptAlgorithm = TSDB_DEFAULT_ENCRYPT_ALGO;
|
pDbOptions->encryptAlgorithm = TSDB_DEFAULT_ENCRYPT_ALGO;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
@ -1744,17 +1741,17 @@ SNode* setColumnOptions(SAstCreateContext* pCxt, SNode* pOptions, EColumnOptionT
|
||||||
memset(((SColumnOptions*)pOptions)->compress, 0, TSDB_CL_COMPRESS_OPTION_LEN);
|
memset(((SColumnOptions*)pOptions)->compress, 0, TSDB_CL_COMPRESS_OPTION_LEN);
|
||||||
COPY_STRING_FORM_STR_TOKEN(((SColumnOptions*)pOptions)->compress, (SToken*)pVal);
|
COPY_STRING_FORM_STR_TOKEN(((SColumnOptions*)pOptions)->compress, (SToken*)pVal);
|
||||||
if (0 == strlen(((SColumnOptions*)pOptions)->compress)) {
|
if (0 == strlen(((SColumnOptions*)pOptions)->compress)) {
|
||||||
pCxt->errCode = TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
pCxt->errCode = TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case COLUMN_OPTION_LEVEL:
|
case COLUMN_OPTION_LEVEL:
|
||||||
memset(((SColumnOptions*)pOptions)->compressLevel, 0, TSDB_CL_COMPRESS_OPTION_LEN);
|
memset(((SColumnOptions*)pOptions)->compressLevel, 0, TSDB_CL_COMPRESS_OPTION_LEN);
|
||||||
COPY_STRING_FORM_STR_TOKEN(((SColumnOptions*)pOptions)->compressLevel, (SToken*)pVal);
|
COPY_STRING_FORM_STR_TOKEN(((SColumnOptions*)pOptions)->compressLevel, (SToken*)pVal);
|
||||||
if (0 == strlen(((SColumnOptions*)pOptions)->compressLevel)) {
|
if (0 == strlen(((SColumnOptions*)pOptions)->compressLevel)) {
|
||||||
pCxt->errCode = TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
pCxt->errCode = TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case COLUMN_OPTION_PRIMARYKEY:
|
case COLUMN_OPTION_PRIMARYKEY:
|
||||||
((SColumnOptions*)pOptions)->bPrimaryKey = true;
|
((SColumnOptions*)pOptions)->bPrimaryKey = true;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -1789,7 +1786,7 @@ SDataType createDataType(uint8_t type) {
|
||||||
SDataType createVarLenDataType(uint8_t type, const SToken* pLen) {
|
SDataType createVarLenDataType(uint8_t type, const SToken* pLen) {
|
||||||
int32_t len = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
|
int32_t len = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
|
||||||
if (type == TSDB_DATA_TYPE_NCHAR) len /= TSDB_NCHAR_SIZE;
|
if (type == TSDB_DATA_TYPE_NCHAR) len /= TSDB_NCHAR_SIZE;
|
||||||
if(pLen) len = taosStr2Int32(pLen->z, NULL, 10);
|
if (pLen) len = taosStr2Int32(pLen->z, NULL, 10);
|
||||||
SDataType dt = {.type = type, .precision = 0, .scale = 0, .bytes = len};
|
SDataType dt = {.type = type, .precision = 0, .scale = 0, .bytes = len};
|
||||||
return dt;
|
return dt;
|
||||||
}
|
}
|
||||||
|
@ -1895,8 +1892,8 @@ SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable,
|
||||||
return createAlterTableStmtFinalize(pRealTable, pStmt);
|
return createAlterTableStmtFinalize(pRealTable, pStmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* createAlterTableAddModifyColOptions(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName,
|
SNode* createAlterTableAddModifyColOptions(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType,
|
||||||
SNode* pOptions) {
|
SToken* pColName, SNode* pOptions) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
if (!checkColumnName(pCxt, pColName)) {
|
if (!checkColumnName(pCxt, pColName)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2965,7 +2962,7 @@ SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
STSMAOptions* pOptions = (STSMAOptions*)nodesMakeNode(QUERY_NODE_TSMA_OPTIONS);
|
STSMAOptions* pOptions = (STSMAOptions*)nodesMakeNode(QUERY_NODE_TSMA_OPTIONS);
|
||||||
if (!pOptions) {
|
if (!pOptions) {
|
||||||
//nodesDestroyList(pTSMAFuncs);
|
// nodesDestroyList(pTSMAFuncs);
|
||||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "Out of memory");
|
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "Out of memory");
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -735,7 +735,7 @@ static int32_t getTableTsmas(STranslateContext* pCxt, const SName* pName, SArray
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTsma(STranslateContext* pCxt, const SName* pName, STableTSMAInfo** pTsma) {
|
static int32_t getTsma(STranslateContext* pCxt, const SName* pName, STableTSMAInfo** pTsma) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||||
if (pParCxt->async) {
|
if (pParCxt->async) {
|
||||||
code = getTsmaFromCache(pCxt->pMetaCache, pName, pTsma);
|
code = getTsmaFromCache(pCxt->pMetaCache, pName, pTsma);
|
||||||
|
@ -3676,7 +3676,7 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo
|
||||||
char buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1];
|
char buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1];
|
||||||
for (int32_t i = 0; i < pRealTable->pTsmas->size; ++i) {
|
for (int32_t i = 0; i < pRealTable->pTsmas->size; ++i) {
|
||||||
STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i);
|
STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i);
|
||||||
SName tsmaTargetTbName = {0};
|
SName tsmaTargetTbName = {0};
|
||||||
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetTbName);
|
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetTbName);
|
||||||
int32_t len = snprintf(buf, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN, "%s.%s_%s", pTsma->dbFName, pTsma->name,
|
int32_t len = snprintf(buf, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN, "%s.%s_%s", pTsma->dbFName, pTsma->name,
|
||||||
pRealTable->table.tableName);
|
pRealTable->table.tableName);
|
||||||
|
@ -3684,7 +3684,7 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo
|
||||||
strncpy(tsmaTargetTbName.tname, buf, MD5_OUTPUT_LEN);
|
strncpy(tsmaTargetTbName.tname, buf, MD5_OUTPUT_LEN);
|
||||||
collectUseTable(&tsmaTargetTbName, pCxt->pTargetTables);
|
collectUseTable(&tsmaTargetTbName, pCxt->pTargetTables);
|
||||||
SVgroupInfo vgInfo = {0};
|
SVgroupInfo vgInfo = {0};
|
||||||
bool exists = false;
|
bool exists = false;
|
||||||
code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tsmaTargetTbName, &vgInfo, &exists);
|
code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tsmaTargetTbName, &vgInfo, &exists);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
ASSERT(exists);
|
ASSERT(exists);
|
||||||
|
@ -5695,8 +5695,8 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt*
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
for (int i = 0; i < taosArrayGetSize(aTables); ++i) {
|
for (int i = 0; i < taosArrayGetSize(aTables); ++i) {
|
||||||
SEqCondTbNameTableInfo* pInfo = taosArrayGet(aTables, i);
|
SEqCondTbNameTableInfo* pInfo = taosArrayGet(aTables, i);
|
||||||
int32_t nTbls = taosArrayGetSize(pInfo->aTbnames);
|
int32_t nTbls = taosArrayGetSize(pInfo->aTbnames);
|
||||||
int32_t numOfVgs = pInfo->pRealTable->pVgroupList->numOfVgroups;
|
int32_t numOfVgs = pInfo->pRealTable->pVgroupList->numOfVgroups;
|
||||||
|
|
||||||
SVgroupsInfo* vgsInfo = taosMemoryMalloc(sizeof(SVgroupsInfo) + nTbls * sizeof(SVgroupInfo));
|
SVgroupsInfo* vgsInfo = taosMemoryMalloc(sizeof(SVgroupsInfo) + nTbls * sizeof(SVgroupInfo));
|
||||||
findVgroupsFromEqualTbname(pCxt, pInfo->aTbnames, pInfo->pRealTable->table.dbName, numOfVgs, vgsInfo);
|
findVgroupsFromEqualTbname(pCxt, pInfo->aTbnames, pInfo->pRealTable->table.dbName, numOfVgs, vgsInfo);
|
||||||
|
@ -5705,7 +5705,7 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt*
|
||||||
pInfo->pRealTable->pVgroupList = vgsInfo;
|
pInfo->pRealTable->pVgroupList = vgsInfo;
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(vgsInfo);
|
taosMemoryFree(vgsInfo);
|
||||||
}
|
}
|
||||||
vgsInfo = NULL;
|
vgsInfo = NULL;
|
||||||
|
|
||||||
if (pInfo->pRealTable->pTsmas) {
|
if (pInfo->pRealTable->pTsmas) {
|
||||||
|
@ -5714,12 +5714,12 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt*
|
||||||
|
|
||||||
for (int32_t i = 0; i < pInfo->pRealTable->pTsmas->size; ++i) {
|
for (int32_t i = 0; i < pInfo->pRealTable->pTsmas->size; ++i) {
|
||||||
STableTSMAInfo* pTsma = taosArrayGetP(pInfo->pRealTable->pTsmas, i);
|
STableTSMAInfo* pTsma = taosArrayGetP(pInfo->pRealTable->pTsmas, i);
|
||||||
SArray *pTbNames = taosArrayInit(pInfo->aTbnames->size, POINTER_BYTES);
|
SArray* pTbNames = taosArrayInit(pInfo->aTbnames->size, POINTER_BYTES);
|
||||||
if (!pTbNames) return TSDB_CODE_OUT_OF_MEMORY;
|
if (!pTbNames) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
for (int32_t k = 0; k < pInfo->aTbnames->size; ++k) {
|
for (int32_t k = 0; k < pInfo->aTbnames->size; ++k) {
|
||||||
const char* pTbName = taosArrayGetP(pInfo->aTbnames, k);
|
const char* pTbName = taosArrayGetP(pInfo->aTbnames, k);
|
||||||
char* pNewTbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1);
|
char* pNewTbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1);
|
||||||
if (!pNewTbName) {
|
if (!pNewTbName) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
break;
|
break;
|
||||||
|
@ -7225,9 +7225,9 @@ static int32_t checkColumnOptions(SNodeList* pList) {
|
||||||
if (!checkColumnEncodeOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->encode))
|
if (!checkColumnEncodeOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->encode))
|
||||||
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
||||||
if (!checkColumnCompressOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->compress))
|
if (!checkColumnCompressOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->compress))
|
||||||
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
return TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
|
||||||
if (!checkColumnLevelOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->compressLevel))
|
if (!checkColumnLevelOrSetDefault(pCol->dataType.type, ((SColumnOptions*)pCol->pOptions)->compressLevel))
|
||||||
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
return TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -7245,11 +7245,11 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray, bool calB
|
||||||
FOREACH(pNode, pList) {
|
FOREACH(pNode, pList) {
|
||||||
SColumnDefNode* pCol = (SColumnDefNode*)pNode;
|
SColumnDefNode* pCol = (SColumnDefNode*)pNode;
|
||||||
SFieldWithOptions field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)};
|
SFieldWithOptions field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)};
|
||||||
if (calBytes) {
|
if (calBytes) {
|
||||||
field.bytes = calcTypeBytes(pCol->dataType);
|
field.bytes = calcTypeBytes(pCol->dataType);
|
||||||
} else {
|
} else {
|
||||||
field.bytes = pCol->dataType.bytes;
|
field.bytes = pCol->dataType.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
strcpy(field.name, pCol->colName);
|
strcpy(field.name, pCol->colName);
|
||||||
if (pCol->pOptions) {
|
if (pCol->pOptions) {
|
||||||
|
@ -7742,7 +7742,7 @@ static int32_t addWdurationToSampleProjects(SNodeList* pProjectionList) {
|
||||||
return nodesListAppend(pProjectionList, (SNode*)pFunc);
|
return nodesListAppend(pProjectionList, (SNode*)pFunc);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildProjectsForSampleAst(SSampleAstInfo* pInfo, SNodeList** pList, int32_t *pProjectionTotalLen) {
|
static int32_t buildProjectsForSampleAst(SSampleAstInfo* pInfo, SNodeList** pList, int32_t* pProjectionTotalLen) {
|
||||||
SNodeList* pProjectionList = pInfo->pFuncs;
|
SNodeList* pProjectionList = pInfo->pFuncs;
|
||||||
pInfo->pFuncs = NULL;
|
pInfo->pFuncs = NULL;
|
||||||
|
|
||||||
|
@ -8118,13 +8118,15 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
|
||||||
TAOS_FIELD field = {0};
|
TAOS_FIELD field = {0};
|
||||||
strcpy(field.name, pStmt->colName);
|
strcpy(field.name, pStmt->colName);
|
||||||
if (!checkColumnEncode(pStmt->pColOptions->encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
if (!checkColumnEncode(pStmt->pColOptions->encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
||||||
if (!checkColumnCompress(pStmt->pColOptions->compress)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
if (!checkColumnCompress(pStmt->pColOptions->compress)) return TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
|
||||||
if (!checkColumnLevel(pStmt->pColOptions->compressLevel)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
if (!checkColumnLevel(pStmt->pColOptions->compressLevel)) return TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
|
||||||
int8_t valid =
|
int32_t code =
|
||||||
setColCompressByOption(pStmt->dataType.type, columnEncodeVal(pStmt->pColOptions->encode),
|
setColCompressByOption(pStmt->dataType.type, columnEncodeVal(pStmt->pColOptions->encode),
|
||||||
columnCompressVal(pStmt->pColOptions->compress),
|
columnCompressVal(pStmt->pColOptions->compress),
|
||||||
columnLevelVal(pStmt->pColOptions->compressLevel), false, (uint32_t*)&field.bytes);
|
columnLevelVal(pStmt->pColOptions->compressLevel), false, (uint32_t*)&field.bytes);
|
||||||
if (!valid) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
taosArrayPush(pAlterReq->pFields, &field);
|
taosArrayPush(pAlterReq->pFields, &field);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -10708,8 +10710,8 @@ static int32_t compareTsmaFuncWithFuncAndColId(SNode* pNode1, SNode* pNode2) {
|
||||||
|
|
||||||
// pFuncs are already sorted by funcId and colId
|
// pFuncs are already sorted by funcId and colId
|
||||||
static int32_t deduplicateTsmaFuncs(SNodeList* pFuncs) {
|
static int32_t deduplicateTsmaFuncs(SNodeList* pFuncs) {
|
||||||
SNode* pLast = NULL;
|
SNode* pLast = NULL;
|
||||||
SNode* pFunc = NULL;
|
SNode* pFunc = NULL;
|
||||||
SNodeList* pRes = NULL;
|
SNodeList* pRes = NULL;
|
||||||
FOREACH(pFunc, pFuncs) {
|
FOREACH(pFunc, pFuncs) {
|
||||||
if (pLast) {
|
if (pLast) {
|
||||||
|
@ -10726,7 +10728,8 @@ static int32_t deduplicateTsmaFuncs(SNodeList* pFuncs) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildTSMAAstStreamSubTable(SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const SNode* pTbname, SNode** pSubTable) {
|
static int32_t buildTSMAAstStreamSubTable(SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const SNode* pTbname,
|
||||||
|
SNode** pSubTable) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SFunctionNode* pMd5Func = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
SFunctionNode* pMd5Func = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||||
SFunctionNode* pConcatFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
SFunctionNode* pConcatFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||||
|
@ -10768,8 +10771,8 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq,
|
static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const char* tbName,
|
||||||
const char* tbName, int32_t numOfTags, const SSchema* pTags) {
|
int32_t numOfTags, const SSchema* pTags) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SSampleAstInfo info = {0};
|
SSampleAstInfo info = {0};
|
||||||
info.createSmaIndex = true;
|
info.createSmaIndex = true;
|
||||||
|
@ -10813,16 +10816,17 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC
|
||||||
if (!pTagCol) code = TSDB_CODE_OUT_OF_MEMORY;
|
if (!pTagCol) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = buildTSMAAstStreamSubTable(pStmt, pReq, pStmt->pOptions->recursiveTsma ? pTagCol : (SNode*)pTbnameFunc, (SNode**)&pSubTable);
|
code = buildTSMAAstStreamSubTable(pStmt, pReq, pStmt->pOptions->recursiveTsma ? pTagCol : (SNode*)pTbnameFunc,
|
||||||
|
(SNode**)&pSubTable);
|
||||||
info.pSubTable = (SNode*)pSubTable;
|
info.pSubTable = (SNode*)pSubTable;
|
||||||
}
|
}
|
||||||
if (code == TSDB_CODE_SUCCESS)
|
if (code == TSDB_CODE_SUCCESS)
|
||||||
code = nodesListMakeStrictAppend(&info.pTags, pStmt->pOptions->recursiveTsma ? pTagCol : nodesCloneNode((SNode*)pTbnameFunc));
|
code = nodesListMakeStrictAppend(
|
||||||
|
&info.pTags, pStmt->pOptions->recursiveTsma ? pTagCol : nodesCloneNode((SNode*)pTbnameFunc));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS && !pStmt->pOptions->recursiveTsma)
|
if (code == TSDB_CODE_SUCCESS && !pStmt->pOptions->recursiveTsma) code = fmCreateStateFuncs(info.pFuncs);
|
||||||
code = fmCreateStateFuncs(info.pFuncs);
|
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
int32_t pProjectionTotalLen = 0;
|
int32_t pProjectionTotalLen = 0;
|
||||||
|
@ -10914,7 +10918,8 @@ static int32_t rewriteTSMAFuncs(STranslateContext* pCxt, SCreateTSMAStmt* pStmt,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, SName* useTbName) {
|
static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq,
|
||||||
|
SName* useTbName) {
|
||||||
SName name;
|
SName name;
|
||||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pReq->name);
|
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pReq->name);
|
||||||
memset(&name, 0, sizeof(SName));
|
memset(&name, 0, sizeof(SName));
|
||||||
|
@ -10924,15 +10929,15 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
|
||||||
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
|
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
|
||||||
pReq->intervalUnit = TIME_UNIT_MILLISECOND;
|
pReq->intervalUnit = TIME_UNIT_MILLISECOND;
|
||||||
|
|
||||||
#define TSMA_MIN_INTERVAL_MS 1 // 1ms
|
#define TSMA_MIN_INTERVAL_MS 1 // 1ms
|
||||||
#define TSMA_MAX_INTERVAL_MS (60 * 60 * 1000) // 1h
|
#define TSMA_MAX_INTERVAL_MS (60 * 60 * 1000) // 1h
|
||||||
if (pReq->interval > TSMA_MAX_INTERVAL_MS || pReq->interval < TSMA_MIN_INTERVAL_MS) {
|
if (pReq->interval > TSMA_MAX_INTERVAL_MS || pReq->interval < TSMA_MIN_INTERVAL_MS) {
|
||||||
return TSDB_CODE_TSMA_INVALID_INTERVAL;
|
return TSDB_CODE_TSMA_INVALID_INTERVAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
STableTSMAInfo* pRecursiveTsma = NULL;
|
STableTSMAInfo* pRecursiveTsma = NULL;
|
||||||
int32_t numOfCols = 0, numOfTags = 0;
|
int32_t numOfCols = 0, numOfTags = 0;
|
||||||
SSchema * pCols = NULL, *pTags = NULL;
|
SSchema * pCols = NULL, *pTags = NULL;
|
||||||
|
@ -11022,7 +11027,7 @@ static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pSt
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = buildCreateTSMAReq(pCxt, pStmt, pStmt->pReq, &useTbName);
|
code = buildCreateTSMAReq(pCxt, pStmt, pStmt->pReq, &useTbName);
|
||||||
}
|
}
|
||||||
if ( TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = collectUseTable(&useTbName, pCxt->pTargetTables);
|
code = collectUseTable(&useTbName, pCxt->pTargetTables);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -11063,7 +11068,8 @@ int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSData
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
if (interval.interval > 0) {
|
if (interval.interval > 0) {
|
||||||
pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit, interval.precision);
|
pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit,
|
||||||
|
interval.precision);
|
||||||
} else {
|
} else {
|
||||||
pStmt->pReq->lastTs = lastTs + 1; // start key of the next time window
|
pStmt->pReq->lastTs = lastTs + 1; // start key of the next time window
|
||||||
}
|
}
|
||||||
|
@ -11074,7 +11080,7 @@ int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSData
|
||||||
code = setQuery(&cxt, pQuery);
|
code = setQuery(&cxt, pQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
toName(pParseCxt->acctId, pStmt->dbName, pStmt->originalTbName, &name);
|
toName(pParseCxt->acctId, pStmt->dbName, pStmt->originalTbName, &name);
|
||||||
code = collectUseTable(&name, cxt.pTargetTables);
|
code = collectUseTable(&name, cxt.pTargetTables);
|
||||||
|
@ -11090,7 +11096,7 @@ int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSData
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateDropTSMA(STranslateContext* pCxt, SDropTSMAStmt* pStmt) {
|
static int32_t translateDropTSMA(STranslateContext* pCxt, SDropTSMAStmt* pStmt) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SMDropSmaReq dropReq = {0};
|
SMDropSmaReq dropReq = {0};
|
||||||
SName name;
|
SName name;
|
||||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), dropReq.name);
|
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), dropReq.name);
|
||||||
|
@ -12033,13 +12039,13 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
|
||||||
toSchema(pColDef, index + 1, pScheam);
|
toSchema(pColDef, index + 1, pScheam);
|
||||||
if (pColDef->pOptions) {
|
if (pColDef->pOptions) {
|
||||||
req.colCmpr.pColCmpr[index].id = index + 1;
|
req.colCmpr.pColCmpr[index].id = index + 1;
|
||||||
int8_t valid = setColCompressByOption(
|
int32_t code = setColCompressByOption(
|
||||||
pScheam->type, columnEncodeVal(((SColumnOptions*)pColDef->pOptions)->encode),
|
pScheam->type, columnEncodeVal(((SColumnOptions*)pColDef->pOptions)->encode),
|
||||||
columnCompressVal(((SColumnOptions*)pColDef->pOptions)->compress),
|
columnCompressVal(((SColumnOptions*)pColDef->pOptions)->compress),
|
||||||
columnLevelVal(((SColumnOptions*)pColDef->pOptions)->compressLevel), true, &req.colCmpr.pColCmpr[index].alg);
|
columnLevelVal(((SColumnOptions*)pColDef->pOptions)->compressLevel), true, &req.colCmpr.pColCmpr[index].alg);
|
||||||
if (!valid) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tdDestroySVCreateTbReq(&req);
|
tdDestroySVCreateTbReq(&req);
|
||||||
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
++index;
|
++index;
|
||||||
|
@ -12499,7 +12505,6 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl
|
||||||
goto over;
|
goto over;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
SVgroupInfo info = {0};
|
SVgroupInfo info = {0};
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info);
|
code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info);
|
||||||
|
@ -12586,7 +12591,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
|
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
|
||||||
FOREACH(pNode, pStmt->pTables) {
|
FOREACH(pNode, pStmt->pTables) {
|
||||||
SDropTableClause* pClause = (SDropTableClause*)pNode;
|
SDropTableClause* pClause = (SDropTableClause*)pNode;
|
||||||
SName name;
|
SName name;
|
||||||
toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name);
|
toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name);
|
||||||
int32_t code = buildDropTableVgroupHashmap(pCxt, pClause, &name, &tableType, pVgroupHashmap);
|
int32_t code = buildDropTableVgroupHashmap(pCxt, pClause, &name, &tableType, pVgroupHashmap);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
@ -12653,8 +12658,8 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
|
|
||||||
static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
|
static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
|
||||||
SVAlterTbReq* pReq) {
|
SVAlterTbReq* pReq) {
|
||||||
SName tbName = {0};
|
SName tbName = {0};
|
||||||
SArray* pTsmas = NULL;
|
SArray* pTsmas = NULL;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (pCxt->pMetaCache) {
|
if (pCxt->pMetaCache) {
|
||||||
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName);
|
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName);
|
||||||
|
@ -12679,9 +12684,9 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
|
||||||
pReq->colId = pSchema->colId;
|
pReq->colId = pSchema->colId;
|
||||||
pReq->tagType = pSchema->type;
|
pReq->tagType = pSchema->type;
|
||||||
|
|
||||||
STag* pTag = NULL;
|
STag* pTag = NULL;
|
||||||
SToken token;
|
SToken token;
|
||||||
char tokenBuf[TSDB_MAX_TAGS_LEN];
|
char tokenBuf[TSDB_MAX_TAGS_LEN];
|
||||||
const char* tagStr = pStmt->pVal->literal;
|
const char* tagStr = pStmt->pVal->literal;
|
||||||
NEXT_TOKEN_WITH_PREV(tagStr, token);
|
NEXT_TOKEN_WITH_PREV(tagStr, token);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -12879,14 +12884,12 @@ static int buildAlterTableColumnCompress(STranslateContext* pCxt, SAlterTableStm
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!checkColumnEncode(pStmt->pColOptions->encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
if (!checkColumnEncode(pStmt->pColOptions->encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
||||||
if (!checkColumnCompress(pStmt->pColOptions->compress)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
if (!checkColumnCompress(pStmt->pColOptions->compress)) return TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
|
||||||
if (!checkColumnLevel(pStmt->pColOptions->compressLevel)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
if (!checkColumnLevel(pStmt->pColOptions->compressLevel)) return TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
|
||||||
int8_t valid = setColCompressByOption(pSchema->type, columnEncodeVal(pStmt->pColOptions->encode),
|
int8_t code = setColCompressByOption(pSchema->type, columnEncodeVal(pStmt->pColOptions->encode),
|
||||||
columnCompressVal(pStmt->pColOptions->compress),
|
columnCompressVal(pStmt->pColOptions->compress),
|
||||||
columnLevelVal(pStmt->pColOptions->compressLevel), true, &pReq->compress);
|
columnLevelVal(pStmt->pColOptions->compressLevel), true, &pReq->compress);
|
||||||
if (!valid) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
return code;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildAlterTbReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
|
static int32_t buildAlterTbReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
|
||||||
|
|
|
@ -151,8 +151,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_QUERY_KILLED, "Query killed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node in current query policy configuration")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node in current query policy configuration")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NOT_STABLE_ERROR, "Table is not a super table")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NOT_STABLE_ERROR, "Table is not a super table")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CACHE_ERROR, "Stmt cache error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CACHE_ERROR, "Stmt cache error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_ERROR, "Invalid compress param")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_ERROR, "Invalid encode param")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_NULL, "Not found compress param")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_NULL, "Not found compress param")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_COMPRESS_PARAM_ERROR, "Invalid compress param")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR, "Invalid compress level param")
|
||||||
|
|
||||||
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INTERNAL_ERROR, "Internal error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INTERNAL_ERROR, "Internal error")
|
||||||
|
|
||||||
// mnode-common
|
// mnode-common
|
||||||
|
@ -221,7 +225,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_VALUE_OVERFLOW, "out of range and overflow")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_VALUE_OVERFLOW, "out of range and overflow")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST, "Column compress already exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST, "Same with old param")
|
||||||
|
|
||||||
|
|
||||||
// mnode-func
|
// mnode-func
|
||||||
|
@ -397,7 +401,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_ALREADY_IS_VOTER, "Vnode already is a vo
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_DIR_ALREADY_EXIST, "Vnode directory already exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_DIR_ALREADY_EXIST, "Vnode directory already exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE, "Single replica vnode data will lost permanently after this operation, if you make sure this, please use drop dnode <id> unsafe to execute")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE, "Single replica vnode data will lost permanently after this operation, if you make sure this, please use drop dnode <id> unsafe to execute")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ARB_NOT_SYNCED, "Vgroup peer is not synced")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ARB_NOT_SYNCED, "Vgroup peer is not synced")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COLUMN_COMPRESS_ALREADY_EXIST,"Column compress already exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COLUMN_COMPRESS_ALREADY_EXIST,"Same with old param")
|
||||||
|
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
|
|
|
@ -128,8 +128,8 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
|
||||||
#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
|
||||||
#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
|
||||||
|
|
|
@ -61,8 +61,8 @@ python3 ./test.py -f 7-tmq/subscribeStb3.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3
|
python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3
|
||||||
python3 ./test.py -f 7-tmq/ins_topics_test.py
|
python3 ./test.py -f 7-tmq/ins_topics_test.py
|
||||||
python3 ./test.py -f 7-tmq/tmqMaxTopic.py
|
python3 ./test.py -f 7-tmq/tmqMaxTopic.py
|
||||||
#python3 ./test.py -f 7-tmq/tmqParamsTest.py
|
python3 ./test.py -f 7-tmq/tmqParamsTest.py
|
||||||
#python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
|
python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
|
||||||
python3 ./test.py -f 7-tmq/tmqClientConsLog.py
|
python3 ./test.py -f 7-tmq/tmqClientConsLog.py
|
||||||
python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
|
python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
|
||||||
python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
|
python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
|
||||||
|
|
Loading…
Reference in New Issue