Merge branch '3.0' into feature/trans-fix
This commit is contained in:
commit
471a76198f
|
@ -512,6 +512,7 @@ typedef struct {
|
|||
int32_t maxRows;
|
||||
int32_t commitTime;
|
||||
int32_t fsyncPeriod;
|
||||
int32_t ttl;
|
||||
int8_t walLevel;
|
||||
int8_t precision; // time resolution
|
||||
int8_t compression;
|
||||
|
@ -521,6 +522,7 @@ typedef struct {
|
|||
int8_t cacheLastRow;
|
||||
int8_t ignoreExist;
|
||||
int8_t streamMode;
|
||||
int8_t singleSTable;
|
||||
int32_t numOfRetensions;
|
||||
SArray* pRetensions; // SRetention
|
||||
} SCreateDbReq;
|
||||
|
@ -586,6 +588,41 @@ int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp);
|
|||
int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp);
|
||||
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
} SDbCfgReq;
|
||||
|
||||
int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
|
||||
int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfVgroups;
|
||||
int32_t cacheBlockSize;
|
||||
int32_t totalBlocks;
|
||||
int32_t daysPerFile;
|
||||
int32_t daysToKeep0;
|
||||
int32_t daysToKeep1;
|
||||
int32_t daysToKeep2;
|
||||
int32_t minRows;
|
||||
int32_t maxRows;
|
||||
int32_t commitTime;
|
||||
int32_t fsyncPeriod;
|
||||
int32_t ttl;
|
||||
int8_t walLevel;
|
||||
int8_t precision;
|
||||
int8_t compression;
|
||||
int8_t replications;
|
||||
int8_t quorum;
|
||||
int8_t update;
|
||||
int8_t cacheLastRow;
|
||||
int8_t streamMode;
|
||||
int8_t singleSTable;
|
||||
} SDbCfgRsp;
|
||||
|
||||
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);
|
||||
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp);
|
||||
|
||||
|
||||
typedef struct {
|
||||
int32_t rowNum;
|
||||
} SQnodeListReq;
|
||||
|
|
|
@ -155,6 +155,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
|
||||
|
||||
// Requests handled by VNODE
|
||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||
|
|
|
@ -77,6 +77,8 @@ typedef struct SDbVgVersion {
|
|||
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
||||
} SDbVgVersion;
|
||||
|
||||
typedef SDbCfgRsp SDbCfgInfo;
|
||||
|
||||
int32_t catalogInit(SCatalogCfg *cfg);
|
||||
|
||||
/**
|
||||
|
@ -217,6 +219,8 @@ int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stable
|
|||
|
||||
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num);
|
||||
|
||||
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
|
||||
|
||||
|
||||
/**
|
||||
* Destroy catalog and relase all resources
|
||||
|
|
|
@ -1515,6 +1515,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
|
|||
if (tEncodeI32(&encoder, pReq->maxRows) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->commitTime) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->precision) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
|
||||
|
@ -1524,6 +1525,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
|
|||
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->singleSTable) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
|
||||
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
||||
SRetention *pRetension = taosArrayGet(pReq->pRetensions, i);
|
||||
|
@ -1556,6 +1558,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
|
|||
if (tDecodeI32(&decoder, &pReq->maxRows) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->commitTime) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
|
||||
|
@ -1565,6 +1568,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
|
|||
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->singleSTable) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
|
||||
pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention));
|
||||
if (pReq->pRetensions == NULL) {
|
||||
|
@ -1942,6 +1946,94 @@ void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) {
|
|||
taosArrayDestroy(pRsp->pArray);
|
||||
}
|
||||
|
||||
int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
|
||||
SCoder decoder = {0};
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->cacheBlockSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->totalBlocks) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->daysPerFile) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->daysToKeep0) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->daysToKeep1) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->daysToKeep2) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->minRows) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->maxRows) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->commitTime) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->fsyncPeriod) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->walLevel) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->precision) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->compression) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->replications) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->quorum) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->update) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->cacheLastRow) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->streamMode) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) {
|
||||
SCoder decoder = {0};
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->cacheBlockSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->totalBlocks) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->daysPerFile) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->daysToKeep0) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->daysToKeep1) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->daysToKeep2) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->minRows) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->maxRows) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->commitTime) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->fsyncPeriod) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->walLevel) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->precision) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->compression) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->replications) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->quorum) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->update) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->streamMode) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
|
|
@ -146,6 +146,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
|||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
|
||||
// Requests handled by VNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
|
|
|
@ -260,6 +260,7 @@ typedef struct {
|
|||
int32_t maxRows;
|
||||
int32_t commitTime;
|
||||
int32_t fsyncPeriod;
|
||||
int32_t ttl;
|
||||
int8_t walLevel;
|
||||
int8_t precision;
|
||||
int8_t compression;
|
||||
|
@ -268,6 +269,7 @@ typedef struct {
|
|||
int8_t update;
|
||||
int8_t cacheLastRow;
|
||||
int8_t streamMode;
|
||||
int8_t singleSTable;
|
||||
int32_t numOfRetensions;
|
||||
SArray* pRetensions;
|
||||
} SDbCfg;
|
||||
|
|
|
@ -40,6 +40,7 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
|
|||
static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
|
||||
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
|
||||
|
||||
int32_t mndInitDb(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_DB,
|
||||
|
@ -56,6 +57,7 @@ int32_t mndInitDb(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq);
|
||||
|
||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetDbMeta);
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
|
||||
|
@ -268,6 +270,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
|
|||
if (pCfg->minRows > pCfg->maxRows) return -1;
|
||||
if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME || pCfg->commitTime > TSDB_MAX_COMMIT_TIME) return -1;
|
||||
if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) return -1;
|
||||
if (pCfg->ttl < TSDB_MIN_DB_TTL_OPTION && pCfg->ttl != TSDB_DEFAULT_DB_TTL_OPTION) return -1;
|
||||
if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL || pCfg->walLevel > TSDB_MAX_WAL_LEVEL) return -1;
|
||||
if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1;
|
||||
if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1;
|
||||
|
@ -278,6 +281,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
|
|||
if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) return -1;
|
||||
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
|
||||
if (pCfg->streamMode < TSDB_MIN_DB_STREAM_MODE || pCfg->streamMode > TSDB_MAX_DB_STREAM_MODE) return -1;
|
||||
if (pCfg->singleSTable < TSDB_MIN_DB_SINGLE_STABLE_OPTION || pCfg->streamMode > TSDB_MAX_DB_SINGLE_STABLE_OPTION) return -1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -293,6 +297,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
|||
if (pCfg->maxRows < 0) pCfg->maxRows = TSDB_DEFAULT_MAX_ROW_FBLOCK;
|
||||
if (pCfg->commitTime < 0) pCfg->commitTime = TSDB_DEFAULT_COMMIT_TIME;
|
||||
if (pCfg->fsyncPeriod < 0) pCfg->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
|
||||
if (pCfg->ttl < 0) pCfg->ttl = TSDB_DEFAULT_DB_TTL_OPTION;
|
||||
if (pCfg->walLevel < 0) pCfg->walLevel = TSDB_DEFAULT_WAL_LEVEL;
|
||||
if (pCfg->precision < 0) pCfg->precision = TSDB_DEFAULT_PRECISION;
|
||||
if (pCfg->compression < 0) pCfg->compression = TSDB_DEFAULT_COMP_LEVEL;
|
||||
|
@ -301,6 +306,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
|||
if (pCfg->update < 0) pCfg->update = TSDB_DEFAULT_DB_UPDATE_OPTION;
|
||||
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
|
||||
if (pCfg->streamMode < 0) pCfg->streamMode = TSDB_DEFAULT_DB_STREAM_MODE;
|
||||
if (pCfg->singleSTable < 0) pCfg->singleSTable = TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION;
|
||||
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
|
||||
}
|
||||
|
||||
|
@ -437,6 +443,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
|
|||
.maxRows = pCreate->maxRows,
|
||||
.commitTime = pCreate->commitTime,
|
||||
.fsyncPeriod = pCreate->fsyncPeriod,
|
||||
.ttl = pCreate->ttl,
|
||||
.walLevel = pCreate->walLevel,
|
||||
.precision = pCreate->precision,
|
||||
.compression = pCreate->compression,
|
||||
|
@ -445,6 +452,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
|
|||
.update = pCreate->update,
|
||||
.cacheLastRow = pCreate->cacheLastRow,
|
||||
.streamMode = pCreate->streamMode,
|
||||
.singleSTable = pCreate->singleSTable,
|
||||
};
|
||||
|
||||
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
|
||||
|
@ -730,6 +738,71 @@ ALTER_DB_OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pNode;
|
||||
int32_t code = -1;
|
||||
SDbObj *pDb = NULL;
|
||||
SDbCfgReq cfgReq = {0};
|
||||
SDbCfgRsp cfgRsp = {0};
|
||||
|
||||
if (tDeserializeSDbCfgReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto GET_DB_CFG_OVER;
|
||||
}
|
||||
|
||||
pDb = mndAcquireDb(pMnode, cfgReq.db);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
goto GET_DB_CFG_OVER;
|
||||
}
|
||||
|
||||
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
|
||||
cfgRsp.cacheBlockSize = pDb->cfg.cacheBlockSize;
|
||||
cfgRsp.totalBlocks = pDb->cfg.totalBlocks;
|
||||
cfgRsp.daysPerFile = pDb->cfg.daysPerFile;
|
||||
cfgRsp.daysToKeep0 = pDb->cfg.daysToKeep0;
|
||||
cfgRsp.daysToKeep1 = pDb->cfg.daysToKeep1;
|
||||
cfgRsp.daysToKeep2 = pDb->cfg.daysToKeep2;
|
||||
cfgRsp.minRows = pDb->cfg.minRows;
|
||||
cfgRsp.maxRows = pDb->cfg.maxRows;
|
||||
cfgRsp.commitTime = pDb->cfg.commitTime;
|
||||
cfgRsp.fsyncPeriod = pDb->cfg.fsyncPeriod;
|
||||
cfgRsp.ttl = pDb->cfg.ttl;
|
||||
cfgRsp.walLevel = pDb->cfg.walLevel;
|
||||
cfgRsp.precision = pDb->cfg.precision;
|
||||
cfgRsp.compression = pDb->cfg.compression;
|
||||
cfgRsp.replications = pDb->cfg.replications;
|
||||
cfgRsp.quorum = pDb->cfg.quorum;
|
||||
cfgRsp.update = pDb->cfg.update;
|
||||
cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow;
|
||||
cfgRsp.streamMode = pDb->cfg.streamMode;
|
||||
cfgRsp.singleSTable = pDb->cfg.singleSTable;
|
||||
|
||||
int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp);
|
||||
void *pRsp = rpcMallocCont(contLen);
|
||||
if (pRsp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = -1;
|
||||
goto GET_DB_CFG_OVER;
|
||||
}
|
||||
|
||||
tSerializeSDbCfgRsp(pRsp, contLen, &cfgRsp);
|
||||
|
||||
pReq->pRsp = pRsp;
|
||||
pReq->rspLen = contLen;
|
||||
|
||||
GET_DB_CFG_OVER:
|
||||
|
||||
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("db:%s, failed to get cfg since %s", cfgReq.db, terrstr());
|
||||
}
|
||||
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||
SSdbRaw *pRedoRaw = mndDbActionEncode(pDb);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
|
@ -1509,6 +1582,23 @@ static void dumpDbInfoToPayload(char *data, SDbObj *pDb, SShowObj *pShow, int32_
|
|||
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int32_t *)pWrite = pDb->cfg.ttl;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int8_t *)pWrite = pDb->cfg.singleSTable;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int8_t *)pWrite = pDb->cfg.streamMode;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
char *status = "ready";
|
||||
STR_WITH_SIZE_TO_VARSTR(pWrite, status, strlen(status));
|
||||
cols++;
|
||||
|
||||
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
// *(int8_t *)pWrite = pDb->cfg.update;
|
||||
}
|
||||
|
|
|
@ -63,7 +63,11 @@ static const SInfosTableSchema userDBSchema[] = {
|
|||
{.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
{.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
{.name = "precision", .bytes = 3 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "single_stable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
{.name = "stream_mode", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
// {.name = "update", .bytes = 1, .type =
|
||||
// TSDB_DATA_TYPE_TINYINT}, // disable update
|
||||
};
|
||||
|
|
|
@ -569,6 +569,44 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *dbFName, SDbCfgInfo *out) {
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
|
||||
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)]((void *)dbFName, &msg, 0, &msgLen);
|
||||
if (code) {
|
||||
ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_GET_DB_CFG,
|
||||
.pCont = msg,
|
||||
.contLen = msgLen,
|
||||
};
|
||||
|
||||
SRpcMsg rpcRsp = {0};
|
||||
|
||||
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
||||
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
|
||||
ctgError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rpcRsp.code), dbFName);
|
||||
CTG_ERR_RET(rpcRsp.code);
|
||||
}
|
||||
|
||||
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)](out, rpcRsp.pCont, rpcRsp.contLen);
|
||||
if (code) {
|
||||
ctgError("Process get db cfg rsp failed, code:%x, db:%s", code, dbFName);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
ctgDebug("Got db cfg from mnode, dbFName:%s", dbFName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
|
||||
if (NULL == pCtg->dbCache) {
|
||||
*exist = 0;
|
||||
|
@ -2137,7 +2175,6 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogInit(SCatalogCfg *cfg) {
|
||||
if (gCtgMgmt.pCluster) {
|
||||
qError("catalog already initialized");
|
||||
|
@ -2717,6 +2754,15 @@ int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num)
|
|||
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
|
||||
}
|
||||
|
||||
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == dbFName || NULL == pDbCfg) {
|
||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg));
|
||||
}
|
||||
|
||||
void catalogDestroy(void) {
|
||||
qInfo("start to destroy catalog");
|
||||
|
|
|
@ -618,7 +618,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pSortNode->pSortKeys->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSortNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
|
|
@ -209,6 +209,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
|||
assert(pDispatcher->queryEnd);
|
||||
pOutput->useconds = pDispatcher->useconds;
|
||||
pOutput->precision = pDispatcher->pSchema->precision;
|
||||
pOutput->bufStatus = DS_BUF_EMPTY;
|
||||
pOutput->queryEnd = pDispatcher->queryEnd;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
|
||||
|
|
|
@ -7289,8 +7289,7 @@ SArray* createSortInfo(SNodeList* pNodeList) {
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
|
||||
SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode->pExpr;
|
||||
SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
|
||||
SBlockOrderInfo bi = {0};
|
||||
bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
|
||||
|
|
|
@ -255,6 +255,11 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
|
|||
pOperator->getNextFn = doTableScan;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
static int32_t cost = 0;
|
||||
pOperator->cost.openCost = ++cost;
|
||||
pOperator->cost.totalCost = ++cost;
|
||||
pOperator->resultInfo.totalRows = ++cost;
|
||||
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
#include "taoserror.h"
|
||||
#include "thash.h"
|
||||
|
||||
char *gOperatorStr[] = {NULL, "+", "-", "*", "/", "%", "&", "|", ">", ">=", "<", "<=", "=", "<>",
|
||||
char *gOperatorStr[] = {NULL, "+", "-", "*", "/", "%", "-", "&", "|", ">", ">=", "<", "<=", "=", "<>",
|
||||
"IN", "NOT IN", "LIKE", "NOT LIKE", "MATCH", "NMATCH", "IS NULL", "IS NOT NULL",
|
||||
"IS TRUE", "IS FALSE", "IS UNKNOWN", "IS NOT TRUE", "IS NOT FALSE", "IS NOT UNKNOWN"};
|
||||
char *gLogicConditionStr[] = {"AND", "OR", "NOT"};
|
||||
|
|
|
@ -104,6 +104,8 @@ static int32_t rewriteIsTrue(SNode* pSrc, SNode** pIsTrue) {
|
|||
}
|
||||
pOp->opType = OP_TYPE_IS_TRUE;
|
||||
pOp->pLeft = pSrc;
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||
*pIsTrue = (SNode*)pOp;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -198,6 +200,8 @@ static int32_t calcConstQuery(SNode* pStmt) {
|
|||
switch (nodeType(pStmt)) {
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
return calcConstSelect((SSelectStmt*)pStmt);
|
||||
case QUERY_NODE_EXPLAIN_STMT:
|
||||
return calcConstQuery(((SExplainStmt*)pStmt)->pQuery);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1008,6 +1008,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
|
|||
pReq->cacheLastRow = GET_OPTION_VAL(pStmt->pOptions->pCachelast, TSDB_DEFAULT_CACHE_LAST_ROW);
|
||||
pReq->ignoreExist = pStmt->ignoreExists;
|
||||
pReq->streamMode = GET_OPTION_VAL(pStmt->pOptions->pStreamMode, TSDB_DEFAULT_DB_STREAM_MODE_OPTION);
|
||||
pReq->ttl = GET_OPTION_VAL(pStmt->pOptions->pTtl, TSDB_DEFAULT_DB_TTL_OPTION);
|
||||
pReq->singleSTable = GET_OPTION_VAL(pStmt->pOptions->pSingleStable, TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION);
|
||||
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
|
||||
}
|
||||
|
||||
|
|
|
@ -121,6 +121,23 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||
if (NULL == msg || NULL == msgLen) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SDbCfgReq dbCfgReq = {0};
|
||||
strcpy(dbCfgReq.db, input);
|
||||
|
||||
int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq);
|
||||
void *pBuf = rpcMallocCont(bufLen);
|
||||
tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq);
|
||||
|
||||
*msg = pBuf;
|
||||
*msgLen = bufLen;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
||||
SUseDbOutput *pOut = output;
|
||||
|
@ -309,17 +326,36 @@ PROCESS_QLIST_OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
|
||||
SDbCfgRsp out = {0};
|
||||
|
||||
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
if (tDeserializeSDbCfgRsp(msg, msgSize, &out) != 0) {
|
||||
qError("tDeserializeSDbCfgRsp failed, msgSize:%d", msgSize);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
memcpy(output, &out, sizeof(out));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void initQueryModuleMsgHandle() {
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
|
||||
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
|
@ -977,10 +977,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
|
|||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
|
||||
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||
//QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
|
||||
//QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||
|
||||
queryRsped = true;
|
||||
//queryRsped = true;
|
||||
|
||||
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
|
||||
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
|
||||
|
@ -994,10 +994,10 @@ _return:
|
|||
input.code = code;
|
||||
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
||||
|
||||
if (!queryRsped) {
|
||||
qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
|
||||
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||
}
|
||||
//if (!queryRsped) {
|
||||
// qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
|
||||
// QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||
//}
|
||||
|
||||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
|
|
@ -687,11 +687,15 @@ int32_t filterGetRangeRes(void* h, SFilterRange *ra) {
|
|||
SFilterRangeNode* r = ctx->rs;
|
||||
|
||||
while (r) {
|
||||
if (num) {
|
||||
ra->e = r->ra.e;
|
||||
ra->eflag = r->ra.eflag;
|
||||
} else {
|
||||
FILTER_COPY_RA(ra, &r->ra);
|
||||
}
|
||||
|
||||
++num;
|
||||
r = r->next;
|
||||
++ra;
|
||||
}
|
||||
|
||||
if (num == 0) {
|
||||
|
@ -3314,8 +3318,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t
|
|||
|
||||
|
||||
|
||||
int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
|
||||
SFilterInfo *info = NULL;
|
||||
int32_t filterGetTimeRangeImpl(SFilterInfo *info, STimeWindow *win, bool *isStrict) {
|
||||
SFilterRange ra = {0};
|
||||
SFilterRangeCtx *prev = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
|
||||
SFilterRangeCtx *tmpc = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
|
||||
|
@ -3369,13 +3372,14 @@ int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
|
|||
*win = TSWINDOW_INITIALIZER;
|
||||
} else {
|
||||
filterGetRangeNum(prev, &num);
|
||||
if (num > 1) {
|
||||
qError("only one time range accepted, num:%d", num);
|
||||
FLT_ERR_JRET(TSDB_CODE_QRY_INVALID_TIME_CONDITION);
|
||||
}
|
||||
|
||||
FLT_CHK_JMP(num < 1);
|
||||
|
||||
if (num > 1) {
|
||||
*isStrict = false;
|
||||
qDebug("more than one time range, num:%d", num);
|
||||
}
|
||||
|
||||
SFilterRange tra;
|
||||
filterGetRangeRes(prev, &tra);
|
||||
win->skey = tra.s;
|
||||
|
@ -3401,6 +3405,30 @@ _return:
|
|||
}
|
||||
|
||||
|
||||
int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
|
||||
SFilterInfo *info = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
*isStrict = true;
|
||||
|
||||
FLT_ERR_RET(filterInitFromNode(pNode, &info, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP));
|
||||
|
||||
if (info->scalarMode) {
|
||||
*win = TSWINDOW_INITIALIZER;
|
||||
*isStrict = false;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
FLT_ERR_JRET(filterGetTimeRangeImpl(info, win, isStrict));
|
||||
|
||||
_return:
|
||||
|
||||
filterFreeInfo(info);
|
||||
|
||||
FLT_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t filterConverNcharColumns(SFilterInfo* info, int32_t rows, bool *gotNchar) {
|
||||
if (FILTER_EMPTY_RES(info) || FILTER_ALL_RES(info)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -241,6 +241,7 @@ TEST(timerangeTest, greater) {
|
|||
bool isStrict = false;
|
||||
int32_t code = filterGetTimeRange(opNode1, &win, &isStrict);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(isStrict, true);
|
||||
ASSERT_EQ(win.skey, tsmall);
|
||||
ASSERT_EQ(win.ekey, INT64_MAX);
|
||||
//filterFreeInfo(filter);
|
||||
|
@ -270,6 +271,7 @@ TEST(timerangeTest, greater_and_lower) {
|
|||
STimeWindow win = {0};
|
||||
bool isStrict = false;
|
||||
int32_t code = filterGetTimeRange(logicNode, &win, &isStrict);
|
||||
ASSERT_EQ(isStrict, true);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(win.skey, tsmall);
|
||||
ASSERT_EQ(win.ekey, tbig);
|
||||
|
@ -277,6 +279,56 @@ TEST(timerangeTest, greater_and_lower) {
|
|||
nodesDestroyNode(logicNode);
|
||||
}
|
||||
|
||||
TEST(timerangeTest, greater_and_lower_not_strict) {
|
||||
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode1 = NULL, *logicNode2 = NULL;
|
||||
bool eRes[5] = {false, false, true, true, true};
|
||||
SScalarParam res = {0};
|
||||
int64_t tsmall1 = 222, tbig1 = 333;
|
||||
int64_t tsmall2 = 444, tbig2 = 555;
|
||||
SNode *list[2] = {0};
|
||||
|
||||
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
|
||||
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall1);
|
||||
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
|
||||
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
|
||||
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig1);
|
||||
flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
|
||||
list[0] = opNode1;
|
||||
list[1] = opNode2;
|
||||
|
||||
flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_AND, list, 2);
|
||||
|
||||
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
|
||||
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall2);
|
||||
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
|
||||
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
|
||||
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig2);
|
||||
flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
|
||||
list[0] = opNode1;
|
||||
list[1] = opNode2;
|
||||
|
||||
flttMakeLogicNode(&logicNode2, LOGIC_COND_TYPE_AND, list, 2);
|
||||
|
||||
list[0] = logicNode1;
|
||||
list[1] = logicNode2;
|
||||
flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_OR, list, 2);
|
||||
|
||||
//SFilterInfo *filter = NULL;
|
||||
//int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
|
||||
//ASSERT_EQ(code, 0);
|
||||
STimeWindow win = {0};
|
||||
bool isStrict = false;
|
||||
int32_t code = filterGetTimeRange(logicNode1, &win, &isStrict);
|
||||
ASSERT_EQ(isStrict, false);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(win.skey, tsmall1);
|
||||
ASSERT_EQ(win.ekey, tbig2);
|
||||
//filterFreeInfo(filter);
|
||||
nodesDestroyNode(logicNode1);
|
||||
}
|
||||
|
||||
|
||||
|
||||
TEST(columnTest, smallint_column_greater_double_value) {
|
||||
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
|
||||
int16_t leftv[5]= {1, 2, 3, 4, 5};
|
||||
|
|
|
@ -1235,6 +1235,34 @@ _return:
|
|||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
||||
}
|
||||
|
||||
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
|
||||
int32_t s = taosHashGetSize(pTaskList);
|
||||
if (s <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
|
||||
if (NULL == task || NULL == (*task)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*pTask = *task;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
|
||||
if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || taosArrayGetSize(pTask->execNodes) <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSchNodeInfo *nodeInfo = taosArrayGet(pTask->execNodes, 0);
|
||||
nodeInfo->handle = handle;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||
|
@ -1247,22 +1275,25 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
|
||||
}
|
||||
|
||||
int32_t s = taosHashGetSize(pJob->execTasks);
|
||||
if (s <= 0) {
|
||||
SCH_JOB_ELOG("empty execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId));
|
||||
if (NULL == task || NULL == (*task)) {
|
||||
schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
|
||||
if (NULL == pTask) {
|
||||
if (TDMT_VND_EXPLAIN_RSP == msgType) {
|
||||
schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
|
||||
} else {
|
||||
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == pTask) {
|
||||
SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
pTask = *task;
|
||||
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
|
||||
|
||||
SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
|
||||
schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
|
||||
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
|
||||
|
||||
_return:
|
||||
|
|
|
@ -205,8 +205,12 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
|||
transRefCliHandle(conn); \
|
||||
} \
|
||||
} while (0)
|
||||
#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
|
||||
#define CONN_NO_PERSIST_BY_APP(conn) \
|
||||
(((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
#define CONN_RELEASE_BY_SERVER(conn) \
|
||||
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
|
||||
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
||||
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
|
||||
|
@ -716,10 +720,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||
|
||||
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||
if (!transQueuePush(&conn->cliMsgs, pMsg)) {
|
||||
return;
|
||||
}
|
||||
transDestroyBuffer(&conn->readBuf);
|
||||
transQueuePush(&conn->cliMsgs, pMsg);
|
||||
// tTrace("%s cli conn %p queue msg size %d", ((STrans*)pThrd->pTransInst)->label, conn, 2);
|
||||
// return;
|
||||
//}
|
||||
// transDestroyBuffer(&conn->readBuf);
|
||||
cliSend(conn);
|
||||
} else {
|
||||
conn = cliCreateConn(pThrd);
|
||||
|
|
|
@ -286,25 +286,25 @@ endi
|
|||
#sql_error select c1, ltrim(t1), c2, ltrim(t2) from ctb3
|
||||
|
||||
|
||||
#print ====> rtrim
|
||||
#sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ntb3
|
||||
#print ====> select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3
|
||||
#sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3
|
||||
#print ====> rows: $rows
|
||||
#print ====> [ $data00 ] [ $data01 ] [ $data02 ] [ $data03 ] [ $data04 ] [ $data05 ] [ $data06 ]
|
||||
#print ====> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
#if $rows != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data01 != @ abcd 1234@ then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data03 != @ abcd 1234@ then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data04 != @ abcdEFGH =-*&%@ then
|
||||
# return -1
|
||||
#endi
|
||||
print ====> rtrim
|
||||
sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ntb3
|
||||
print ====> select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3
|
||||
sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3
|
||||
print ====> rows: $rows
|
||||
print ====> [ $data00 ] [ $data01 ] [ $data02 ] [ $data03 ] [ $data04 ] [ $data05 ] [ $data06 ]
|
||||
print ====> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @ abcd 1234@ then
|
||||
return -1
|
||||
endi
|
||||
if $data03 != @ abcd 1234@ then
|
||||
return -1
|
||||
endi
|
||||
if $data04 != @ abcdEFGH =-*&%@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#sql_error select c1, rtrim(t1), c2, rtrim(t2) from ctb3
|
||||
|
||||
|
|
Loading…
Reference in New Issue