Merge pull request #11615 from taosdata/feature/vnode_refact1
refact vnode
This commit is contained in:
commit
0b71a2866e
|
@ -253,22 +253,16 @@ typedef struct {
|
|||
SSubmitRspBlock failedBlocks[];
|
||||
} SSubmitRsp;
|
||||
|
||||
#define SCHEMA_SMA_ON 0x1
|
||||
#define SCHEMA_IDX_ON 0x2
|
||||
typedef struct SSchema {
|
||||
int8_t type;
|
||||
int8_t index; // default is 0, not index created
|
||||
int8_t flags;
|
||||
col_id_t colId;
|
||||
int32_t bytes;
|
||||
char name[TSDB_COL_NAME_LEN];
|
||||
} SSchema;
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
int8_t sma; // ETsdbBSmaType and default is TSDB_BSMA_TYPE_I
|
||||
col_id_t colId;
|
||||
int32_t bytes;
|
||||
char name[TSDB_COL_NAME_LEN];
|
||||
} SSchemaEx;
|
||||
|
||||
#define SSCHMEA_TYPE(s) ((s)->type)
|
||||
#define SSCHMEA_SMA(s) ((s)->sma)
|
||||
#define SSCHMEA_COLID(s) ((s)->colId)
|
||||
|
@ -1454,7 +1448,7 @@ typedef struct SVCreateTbReq {
|
|||
tb_uid_t suid;
|
||||
col_id_t nCols;
|
||||
col_id_t nBSmaCols;
|
||||
SSchemaEx* pSchema;
|
||||
SSchema* pSchema;
|
||||
col_id_t nTagCols;
|
||||
SSchema* pTagSchema;
|
||||
SRSmaParam* pRSmaParam;
|
||||
|
@ -1466,7 +1460,7 @@ typedef struct SVCreateTbReq {
|
|||
struct {
|
||||
col_id_t nCols;
|
||||
col_id_t nBSmaCols;
|
||||
SSchemaEx* pSchema;
|
||||
SSchema* pSchema;
|
||||
SRSmaParam* pRSmaParam;
|
||||
} ntbCfg;
|
||||
};
|
||||
|
@ -2031,16 +2025,13 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq)
|
|||
|
||||
typedef struct {
|
||||
uint32_t nCols;
|
||||
union {
|
||||
SSchema* pSchema;
|
||||
SSchemaEx* pSchemaEx;
|
||||
};
|
||||
} SSchemaWrapper;
|
||||
|
||||
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI8(buf, pSchema->type);
|
||||
tlen += taosEncodeFixedI8(buf, pSchema->index);
|
||||
tlen += taosEncodeFixedI8(buf, pSchema->flags);
|
||||
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
|
||||
tlen += taosEncodeFixedI16(buf, pSchema->colId);
|
||||
tlen += taosEncodeString(buf, pSchema->name);
|
||||
|
@ -2049,7 +2040,7 @@ static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema
|
|||
|
||||
static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
|
||||
buf = taosDecodeFixedI8(buf, &pSchema->type);
|
||||
buf = taosDecodeFixedI8(buf, &pSchema->index);
|
||||
buf = taosDecodeFixedI8(buf, &pSchema->flags);
|
||||
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
|
||||
buf = taosDecodeFixedI16(buf, &pSchema->colId);
|
||||
buf = taosDecodeStringTo(buf, pSchema->name);
|
||||
|
@ -2058,7 +2049,7 @@ static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
|
|||
|
||||
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
|
||||
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pSchema->index) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pSchema->bytes) < 0) return -1;
|
||||
if (tEncodeI16(pEncoder, pSchema->colId) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1;
|
||||
|
@ -2067,7 +2058,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch
|
|||
|
||||
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
|
||||
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pSchema->index) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pSchema->bytes) < 0) return -1;
|
||||
if (tDecodeI16(pDecoder, &pSchema->colId) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1;
|
||||
|
|
|
@ -411,7 +411,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
|||
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
|
||||
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
|
||||
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].sma);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].flags);
|
||||
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId);
|
||||
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes);
|
||||
tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name);
|
||||
|
@ -419,7 +419,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
|||
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols);
|
||||
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
|
||||
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].index);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].flags);
|
||||
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId);
|
||||
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
|
||||
tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
|
||||
|
@ -443,7 +443,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
|||
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols);
|
||||
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
|
||||
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].sma);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].flags);
|
||||
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pSchema[i].colId);
|
||||
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
|
||||
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
|
||||
|
@ -478,10 +478,10 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
|||
buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid));
|
||||
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols));
|
||||
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
|
||||
pReq->stbCfg.pSchema = (SSchemaEx *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchemaEx));
|
||||
pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema));
|
||||
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
|
||||
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type));
|
||||
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].sma));
|
||||
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].flags));
|
||||
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId));
|
||||
buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes));
|
||||
buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name);
|
||||
|
@ -490,7 +490,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
|||
pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema));
|
||||
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
|
||||
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type));
|
||||
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].index));
|
||||
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].flags));
|
||||
buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId);
|
||||
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
|
||||
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
|
||||
|
@ -520,10 +520,10 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
|||
case TD_NORMAL_TABLE:
|
||||
buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.nCols);
|
||||
buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
|
||||
pReq->ntbCfg.pSchema = (SSchemaEx *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchemaEx));
|
||||
pReq->ntbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchema));
|
||||
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
|
||||
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type);
|
||||
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].sma);
|
||||
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].flags);
|
||||
buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.pSchema[i].colId);
|
||||
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
|
||||
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
aux_source_directory(. DND_VNODE_TEST_SRC)
|
||||
add_executable(dvnodeTest ${DND_VNODE_TEST_SRC})
|
||||
target_link_libraries(
|
||||
dvnodeTest
|
||||
PUBLIC sut
|
||||
)
|
||||
# aux_source_directory(. DND_VNODE_TEST_SRC)
|
||||
# add_executable(dvnodeTest ${DND_VNODE_TEST_SRC})
|
||||
# target_link_libraries(
|
||||
# dvnodeTest
|
||||
# PUBLIC sut
|
||||
# )
|
||||
|
||||
add_test(
|
||||
NAME dvnodeTest
|
||||
COMMAND dvnodeTest
|
||||
)
|
||||
# add_test(
|
||||
# NAME dvnodeTest
|
||||
# COMMAND dvnodeTest
|
||||
# )
|
||||
|
|
|
@ -18,8 +18,8 @@
|
|||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndInfoSchema.h"
|
||||
#include "mndPerfSchema.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndPerfSchema.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
|
@ -40,7 +40,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp);
|
|||
static int32_t mndProcessVAlterStbRsp(SNodeMsg *pRsp);
|
||||
static int32_t mndProcessVDropStbRsp(SNodeMsg *pRsp);
|
||||
static int32_t mndProcessTableMetaReq(SNodeMsg *pReq);
|
||||
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
|
||||
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
||||
|
||||
int32_t mndInitStb(SMnode *pMnode) {
|
||||
|
@ -333,9 +333,9 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
|
|||
}
|
||||
|
||||
static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSchema) {
|
||||
if (*(col_id_t *)colId < ((SSchemaEx *)pSchema)->colId) {
|
||||
if (*(col_id_t *)colId < ((SSchema *)pSchema)->colId) {
|
||||
return -1;
|
||||
} else if (*(col_id_t *)colId > ((SSchemaEx *)pSchema)->colId) {
|
||||
} else if (*(col_id_t *)colId > ((SSchema *)pSchema)->colId) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
|
@ -360,49 +360,15 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
|||
req.stbCfg.nTagCols = pStb->numOfTags;
|
||||
req.stbCfg.pTagSchema = pStb->pTags;
|
||||
req.stbCfg.nBSmaCols = pStb->numOfSmas;
|
||||
req.stbCfg.pSchema = (SSchemaEx *)taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaEx));
|
||||
req.stbCfg.pSchema = (SSchema *)taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
|
||||
if (req.stbCfg.pSchema == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int bSmaStat = 0; // no column has bsma
|
||||
if (pStb->numOfSmas == pStb->numOfColumns) { // assume pColumns > 0
|
||||
bSmaStat = 1; // all columns have bsma
|
||||
} else if (pStb->numOfSmas != 0) {
|
||||
bSmaStat = 2; // partial columns have bsma
|
||||
TASSERT(pStb->pSmas != NULL); // TODO: remove the assert
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < req.stbCfg.nCols; ++i) {
|
||||
SSchemaEx *pSchemaEx = req.stbCfg.pSchema + i;
|
||||
SSchema *pSchema = pStb->pColumns + i;
|
||||
pSchemaEx->type = pSchema->type;
|
||||
pSchemaEx->sma = (bSmaStat == 1) ? TSDB_BSMA_TYPE_LATEST : TSDB_BSMA_TYPE_NONE;
|
||||
pSchemaEx->colId = pSchema->colId;
|
||||
pSchemaEx->bytes = pSchema->bytes;
|
||||
memcpy(pSchemaEx->name, pSchema->name, TSDB_COL_NAME_LEN);
|
||||
}
|
||||
|
||||
if (bSmaStat == 2) {
|
||||
if (pStb->pSmas == NULL) {
|
||||
mError("stb:%s, sma options is empty", pStb->name);
|
||||
taosMemoryFreeClear(req.stbCfg.pSchema);
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < pStb->numOfSmas; ++i) {
|
||||
SSchema *pSmaSchema = pStb->pSmas + i;
|
||||
SSchemaEx *pColSchema = taosbsearch(&pSmaSchema->colId, req.stbCfg.pSchema, req.stbCfg.nCols, sizeof(SSchemaEx),
|
||||
schemaExColIdCompare, TD_EQ);
|
||||
if (pColSchema == NULL) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
taosMemoryFreeClear(req.stbCfg.pSchema);
|
||||
mError("stb:%s, sma col:%s not found in columns", pStb->name, pSmaSchema->name);
|
||||
return NULL;
|
||||
}
|
||||
pColSchema->sma = TSDB_BSMA_TYPE_LATEST;
|
||||
}
|
||||
memcpy(req.stbCfg.pSchema, pStb->pColumns, sizeof(SSchema) * pStb->numOfColumns);
|
||||
for (int i = 0; i < pStb->numOfColumns; i++) {
|
||||
req.stbCfg.pSchema[i].flags = SCHEMA_SMA_ON;
|
||||
}
|
||||
|
||||
SRSmaParam *pRSmaParam = NULL;
|
||||
|
@ -1644,14 +1610,14 @@ static void mndExtractTableName(char *tableId, char *name) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
|
||||
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
SMnode *pMnode = pReq->pNode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SStbObj *pStb = NULL;
|
||||
int32_t cols = 0;
|
||||
|
||||
SDbObj* pDb = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
if (strlen(pShow->db) > 0) {
|
||||
pDb = mndAcquireDb(pMnode, pShow->db);
|
||||
if (pDb == NULL) return terrno;
|
||||
|
@ -1673,16 +1639,16 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlo
|
|||
mndExtractTableName(pStb->name, &stbName[VARSTR_HEADER_SIZE]);
|
||||
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
||||
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char*) stbName, false);
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)stbName, false);
|
||||
|
||||
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tNameFromString(&name, pStb->db, T_NAME_ACCT|T_NAME_DB);
|
||||
tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameGetDbName(&name, varDataVal(db));
|
||||
varDataSetLen(db, strlen(varDataVal(db)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char*) db, false);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)db, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
|
||||
|
@ -1696,7 +1662,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlo
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
|
||||
|
||||
char* p = taosMemoryMalloc(pStb->commentLen + VARSTR_HEADER_SIZE); // check malloc failures
|
||||
char *p = taosMemoryMalloc(pStb->commentLen + VARSTR_HEADER_SIZE); // check malloc failures
|
||||
if (p != NULL) {
|
||||
if (pStb->commentLen != 0) {
|
||||
STR_TO_VARSTR(p, pStb->comment);
|
||||
|
|
|
@ -299,10 +299,10 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
|
|||
|
||||
if (pTbCfg->type == META_SUPER_TABLE) {
|
||||
schemaWrapper.nCols = pTbCfg->stbCfg.nCols;
|
||||
schemaWrapper.pSchemaEx = pTbCfg->stbCfg.pSchema;
|
||||
schemaWrapper.pSchema = pTbCfg->stbCfg.pSchema;
|
||||
} else {
|
||||
schemaWrapper.nCols = pTbCfg->ntbCfg.nCols;
|
||||
schemaWrapper.pSchemaEx = pTbCfg->ntbCfg.pSchema;
|
||||
schemaWrapper.pSchema = pTbCfg->ntbCfg.pSchema;
|
||||
}
|
||||
pVal = pBuf = buf;
|
||||
metaEncodeSchemaEx(&pBuf, &schemaWrapper);
|
||||
|
@ -464,7 +464,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
|||
tb_uid_t quid;
|
||||
SSchemaWrapper *pSW;
|
||||
STSchemaBuilder sb;
|
||||
SSchemaEx *pSchema;
|
||||
SSchema *pSchema;
|
||||
STSchema *pTSchema;
|
||||
STbCfg *pTbCfg;
|
||||
|
||||
|
@ -482,8 +482,8 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
|||
|
||||
tdInitTSchemaBuilder(&sb, 0);
|
||||
for (int i = 0; i < pSW->nCols; i++) {
|
||||
pSchema = pSW->pSchemaEx + i;
|
||||
tdAddColToSchema(&sb, pSchema->type, pSchema->sma, pSchema->colId, pSchema->bytes);
|
||||
pSchema = pSW->pSchema + i;
|
||||
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
|
||||
}
|
||||
pTSchema = tdGetSchemaFromBuilder(&sb);
|
||||
tdDestroyTSchemaBuilder(&sb);
|
||||
|
@ -939,7 +939,7 @@ static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) {
|
|||
for (int i = 0; i < pSW->nCols; i++) {
|
||||
pSchema = pSW->pSchema + i;
|
||||
tlen += taosEncodeFixedI8(buf, pSchema->type);
|
||||
tlen += taosEncodeFixedI8(buf, pSchema->index);
|
||||
tlen += taosEncodeFixedI8(buf, pSchema->flags);
|
||||
tlen += taosEncodeFixedI16(buf, pSchema->colId);
|
||||
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
|
||||
tlen += taosEncodeString(buf, pSchema->name);
|
||||
|
@ -967,13 +967,13 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
|
|||
|
||||
static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) {
|
||||
int tlen = 0;
|
||||
SSchemaEx *pSchema;
|
||||
SSchema *pSchema;
|
||||
|
||||
tlen += taosEncodeFixedU32(buf, pSW->nCols);
|
||||
for (int i = 0; i < pSW->nCols; ++i) {
|
||||
pSchema = pSW->pSchemaEx + i;
|
||||
pSchema = pSW->pSchema + i;
|
||||
tlen += taosEncodeFixedI8(buf, pSchema->type);
|
||||
tlen += taosEncodeFixedI8(buf, pSchema->sma);
|
||||
tlen += taosEncodeFixedI8(buf, pSchema->flags);
|
||||
tlen += taosEncodeFixedI16(buf, pSchema->colId);
|
||||
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
|
||||
tlen += taosEncodeString(buf, pSchema->name);
|
||||
|
@ -985,11 +985,11 @@ static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) {
|
|||
static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx) {
|
||||
buf = taosDecodeFixedU32(buf, &pSW->nCols);
|
||||
if (isGetEx) {
|
||||
pSW->pSchemaEx = (SSchemaEx *)taosMemoryMalloc(sizeof(SSchemaEx) * pSW->nCols);
|
||||
pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
|
||||
for (int i = 0; i < pSW->nCols; i++) {
|
||||
SSchemaEx *pSchema = pSW->pSchemaEx + i;
|
||||
SSchema *pSchema = pSW->pSchema + i;
|
||||
buf = taosDecodeFixedI8(buf, &pSchema->type);
|
||||
buf = taosDecodeFixedI8(buf, &pSchema->sma);
|
||||
buf = taosDecodeFixedI8(buf, &pSchema->flags);
|
||||
buf = taosDecodeFixedI16(buf, &pSchema->colId);
|
||||
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
|
||||
buf = taosDecodeStringTo(buf, pSchema->name);
|
||||
|
|
|
@ -498,7 +498,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
|||
if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && IS_INTEGER_TYPE(rdt.type)) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && IS_INTEGER_TYPE(ldt.type)) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_BOOL == rdt.type) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && TSDB_DATA_TYPE_BOOL == ldt.type) ) {
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && TSDB_DATA_TYPE_BOOL == ldt.type)) {
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
|
||||
} else {
|
||||
|
@ -838,7 +838,8 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) {
|
|||
return NULL;
|
||||
}
|
||||
pFunc->pParameterList = nodesMakeList();
|
||||
if (NULL == pFunc->pParameterList || TSDB_CODE_SUCCESS != nodesListStrictAppend(pFunc->pParameterList, nodesCloneNode(pExpr))) {
|
||||
if (NULL == pFunc->pParameterList ||
|
||||
TSDB_CODE_SUCCESS != nodesListStrictAppend(pFunc->pParameterList, nodesCloneNode(pExpr))) {
|
||||
nodesDestroyNode(pFunc);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -889,11 +890,13 @@ static int32_t createTableAllCols(STranslateContext* pCxt, SColumnNode* pCol, SN
|
|||
}
|
||||
|
||||
static bool isStar(SNode* pNode) {
|
||||
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' == ((SColumnNode*)pNode)->tableAlias[0]) && (0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
||||
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' == ((SColumnNode*)pNode)->tableAlias[0]) &&
|
||||
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
||||
}
|
||||
|
||||
static bool isTableStar(SNode* pNode) {
|
||||
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' != ((SColumnNode*)pNode)->tableAlias[0]) && (0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
||||
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' != ((SColumnNode*)pNode)->tableAlias[0]) &&
|
||||
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
||||
}
|
||||
|
||||
static int32_t createMultiResFuncsParas(STranslateContext* pCxt, SNodeList* pSrcParas, SNodeList** pOutput) {
|
||||
|
@ -1128,9 +1131,7 @@ static int32_t translateGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
return translateExprList(pCxt, pSelect->pGroupByList);
|
||||
}
|
||||
|
||||
static bool isValTimeUnit(char unit) {
|
||||
return ('n' == unit || 'y' == unit);
|
||||
}
|
||||
static bool isValTimeUnit(char unit) { return ('n' == unit || 'y' == unit); }
|
||||
|
||||
static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char unit) {
|
||||
int64_t days = convertTimeFromPrecisionToUnit(val, fromPrecision, 'd');
|
||||
|
@ -1174,7 +1175,8 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
|
|||
}
|
||||
bool fixed = !isValTimeUnit(pOffset->unit) && !valInter;
|
||||
if ((fixed && pOffset->datum.i >= pInter->datum.i) ||
|
||||
(!fixed && getMonthsFromTimeVal(pOffset->datum.i, precision, pOffset->unit) >= getMonthsFromTimeVal(pInter->datum.i, precision, pInter->unit))) {
|
||||
(!fixed && getMonthsFromTimeVal(pOffset->datum.i, precision, pOffset->unit) >=
|
||||
getMonthsFromTimeVal(pInter->datum.i, precision, pInter->unit))) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG);
|
||||
}
|
||||
}
|
||||
|
@ -1367,7 +1369,8 @@ static int32_t checkRangeOption(STranslateContext* pCxt, const char* pName, SVal
|
|||
if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) {
|
||||
return pCxt->errCode;
|
||||
}
|
||||
if (pVal->isDuration && (TIME_UNIT_MINUTE != pVal->unit && TIME_UNIT_HOUR != pVal->unit && TIME_UNIT_DAY != pVal->unit)) {
|
||||
if (pVal->isDuration &&
|
||||
(TIME_UNIT_MINUTE != pVal->unit && TIME_UNIT_HOUR != pVal->unit && TIME_UNIT_DAY != pVal->unit)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_OPTION_UNIT, pName, pVal->unit);
|
||||
}
|
||||
int64_t val = getBigintFromValueNode(pVal);
|
||||
|
@ -1462,9 +1465,12 @@ static int32_t checkKeepOption(STranslateContext* pCxt, SNodeList* pKeep) {
|
|||
SValueNode* pKeep0 = (SValueNode*)nodesListGetNode(pKeep, 0);
|
||||
SValueNode* pKeep1 = (SValueNode*)nodesListGetNode(pKeep, 1);
|
||||
SValueNode* pKeep2 = (SValueNode*)nodesListGetNode(pKeep, 2);
|
||||
if ((pKeep0->isDuration && (TIME_UNIT_MINUTE != pKeep0->unit && TIME_UNIT_HOUR != pKeep0->unit && TIME_UNIT_DAY != pKeep0->unit)) ||
|
||||
(pKeep1->isDuration && (TIME_UNIT_MINUTE != pKeep1->unit && TIME_UNIT_HOUR != pKeep1->unit && TIME_UNIT_DAY != pKeep1->unit)) ||
|
||||
(pKeep2->isDuration && (TIME_UNIT_MINUTE != pKeep2->unit && TIME_UNIT_HOUR != pKeep2->unit && TIME_UNIT_DAY != pKeep2->unit))) {
|
||||
if ((pKeep0->isDuration &&
|
||||
(TIME_UNIT_MINUTE != pKeep0->unit && TIME_UNIT_HOUR != pKeep0->unit && TIME_UNIT_DAY != pKeep0->unit)) ||
|
||||
(pKeep1->isDuration &&
|
||||
(TIME_UNIT_MINUTE != pKeep1->unit && TIME_UNIT_HOUR != pKeep1->unit && TIME_UNIT_DAY != pKeep1->unit)) ||
|
||||
(pKeep2->isDuration &&
|
||||
(TIME_UNIT_MINUTE != pKeep2->unit && TIME_UNIT_HOUR != pKeep2->unit && TIME_UNIT_DAY != pKeep2->unit))) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_KEEP_UNIT, pKeep0->unit, pKeep1->unit, pKeep2->unit);
|
||||
}
|
||||
|
||||
|
@ -1521,7 +1527,8 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, SDatabaseOptions* p
|
|||
code = checkRangeOption(pCxt, "compression", pOptions->pCompressionLevel, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkRangeOption(pCxt, "daysPerFile", pOptions->pDaysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
|
||||
code =
|
||||
checkRangeOption(pCxt, "daysPerFile", pOptions->pDaysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkRangeOption(pCxt, "fsyncPeriod", pOptions->pFsyncPeriod, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD);
|
||||
|
@ -1574,7 +1581,7 @@ static int32_t checkCreateDatabase(STranslateContext* pCxt, SCreateDatabaseStmt*
|
|||
return checkDatabaseOptions(pCxt, pStmt->pOptions);
|
||||
}
|
||||
|
||||
typedef int32_t (*FSerializeFunc)(void *pBuf, int32_t bufLen, void *pReq);
|
||||
typedef int32_t (*FSerializeFunc)(void* pBuf, int32_t bufLen, void* pReq);
|
||||
|
||||
static int32_t buildCmdMsg(STranslateContext* pCxt, int16_t msgType, FSerializeFunc func, void* pReq) {
|
||||
pCxt->pCmdMsg = taosMemoryMalloc(sizeof(SCmdMsgInfo));
|
||||
|
@ -2179,7 +2186,8 @@ static int16_t getCreateComponentNodeMsgType(ENodeType type) {
|
|||
|
||||
static int32_t translateCreateComponentNode(STranslateContext* pCxt, SCreateComponentNodeStmt* pStmt) {
|
||||
SMCreateQnodeReq createReq = {.dnodeId = pStmt->dnodeId};
|
||||
return buildCmdMsg(pCxt, getCreateComponentNodeMsgType(nodeType(pStmt)), (FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &createReq);
|
||||
return buildCmdMsg(pCxt, getCreateComponentNodeMsgType(nodeType(pStmt)),
|
||||
(FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &createReq);
|
||||
}
|
||||
|
||||
static int16_t getDropComponentNodeMsgType(ENodeType type) {
|
||||
|
@ -2200,7 +2208,8 @@ static int16_t getDropComponentNodeMsgType(ENodeType type) {
|
|||
|
||||
static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponentNodeStmt* pStmt) {
|
||||
SDDropQnodeReq dropReq = {.dnodeId = pStmt->dnodeId};
|
||||
return buildCmdMsg(pCxt, getDropComponentNodeMsgType(nodeType(pStmt)), (FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq);
|
||||
return buildCmdMsg(pCxt, getDropComponentNodeMsgType(nodeType(pStmt)),
|
||||
(FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq);
|
||||
}
|
||||
|
||||
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
|
||||
|
@ -2304,11 +2313,13 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
|
|||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pOptions->pWatermark) {
|
||||
code = (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark)) ? pCxt->errCode : TSDB_CODE_SUCCESS;
|
||||
code = (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark)) ? pCxt->errCode
|
||||
: TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
createReq.triggerType = pStmt->pOptions->triggerType;
|
||||
createReq.watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
|
||||
createReq.watermark =
|
||||
(NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -2460,9 +2471,7 @@ static int32_t extractSelectResultSchema(const SSelectStmt* pSelect, int32_t* nu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int8_t extractResultTsPrecision(const SSelectStmt* pSelect) {
|
||||
return pSelect->precision;
|
||||
}
|
||||
static int8_t extractResultTsPrecision(const SSelectStmt* pSelect) { return pSelect->precision; }
|
||||
|
||||
static int32_t extractExplainResultSchema(int32_t* numOfCols, SSchema** pSchema) {
|
||||
*numOfCols = 1;
|
||||
|
@ -2725,11 +2734,15 @@ typedef struct SVgroupTablesBatch {
|
|||
char dbName[TSDB_DB_NAME_LEN];
|
||||
} SVgroupTablesBatch;
|
||||
|
||||
static void toSchemaEx(const SColumnDefNode* pCol, col_id_t colId, SSchemaEx* pSchema) {
|
||||
static void toSchemaEx(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchema) {
|
||||
int8_t flags = 0;
|
||||
if (pCol->sma) {
|
||||
flags |= SCHEMA_SMA_ON;
|
||||
}
|
||||
pSchema->colId = colId;
|
||||
pSchema->type = pCol->dataType.type;
|
||||
pSchema->bytes = calcTypeBytes(pCol->dataType);
|
||||
pSchema->sma = pCol->sma ? TSDB_BSMA_TYPE_LATEST : TSDB_BSMA_TYPE_NONE;
|
||||
pSchema->flags = flags;
|
||||
strcpy(pSchema->name, pCol->colName);
|
||||
}
|
||||
|
||||
|
@ -2774,7 +2787,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
|
|||
req.dbFName = strdup(dbFName);
|
||||
req.name = strdup(pStmt->tableName);
|
||||
req.ntbCfg.nCols = LIST_LENGTH(pStmt->pCols);
|
||||
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchemaEx));
|
||||
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchema));
|
||||
if (NULL == req.name || NULL == req.ntbCfg.pSchema) {
|
||||
destroyCreateTbReq(&req);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -3179,7 +3192,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pQuery->precision = extractResultTsPrecision((SSelectStmt*) pQuery->pRoot);
|
||||
pQuery->precision = extractResultTsPrecision((SSelectStmt*)pQuery->pRoot);
|
||||
}
|
||||
|
||||
if (NULL != pCxt->pDbs) {
|
||||
|
|
Loading…
Reference in New Issue