Merge branch '3.0' into feature/tq
This commit is contained in:
commit
dff0640967
|
@ -512,6 +512,7 @@ typedef struct {
|
||||||
int32_t maxRows;
|
int32_t maxRows;
|
||||||
int32_t commitTime;
|
int32_t commitTime;
|
||||||
int32_t fsyncPeriod;
|
int32_t fsyncPeriod;
|
||||||
|
int32_t ttl;
|
||||||
int8_t walLevel;
|
int8_t walLevel;
|
||||||
int8_t precision; // time resolution
|
int8_t precision; // time resolution
|
||||||
int8_t compression;
|
int8_t compression;
|
||||||
|
@ -521,6 +522,7 @@ typedef struct {
|
||||||
int8_t cacheLastRow;
|
int8_t cacheLastRow;
|
||||||
int8_t ignoreExist;
|
int8_t ignoreExist;
|
||||||
int8_t streamMode;
|
int8_t streamMode;
|
||||||
|
int8_t singleSTable;
|
||||||
int32_t numOfRetensions;
|
int32_t numOfRetensions;
|
||||||
SArray* pRetensions; // SRetention
|
SArray* pRetensions; // SRetention
|
||||||
} SCreateDbReq;
|
} SCreateDbReq;
|
||||||
|
@ -586,6 +588,41 @@ int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp);
|
||||||
int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp);
|
int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp);
|
||||||
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
|
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
} SDbCfgReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
|
||||||
|
int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t numOfVgroups;
|
||||||
|
int32_t cacheBlockSize;
|
||||||
|
int32_t totalBlocks;
|
||||||
|
int32_t daysPerFile;
|
||||||
|
int32_t daysToKeep0;
|
||||||
|
int32_t daysToKeep1;
|
||||||
|
int32_t daysToKeep2;
|
||||||
|
int32_t minRows;
|
||||||
|
int32_t maxRows;
|
||||||
|
int32_t commitTime;
|
||||||
|
int32_t fsyncPeriod;
|
||||||
|
int32_t ttl;
|
||||||
|
int8_t walLevel;
|
||||||
|
int8_t precision;
|
||||||
|
int8_t compression;
|
||||||
|
int8_t replications;
|
||||||
|
int8_t quorum;
|
||||||
|
int8_t update;
|
||||||
|
int8_t cacheLastRow;
|
||||||
|
int8_t streamMode;
|
||||||
|
int8_t singleSTable;
|
||||||
|
} SDbCfgRsp;
|
||||||
|
|
||||||
|
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);
|
||||||
|
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp);
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t rowNum;
|
int32_t rowNum;
|
||||||
} SQnodeListReq;
|
} SQnodeListReq;
|
||||||
|
|
|
@ -155,6 +155,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
|
||||||
|
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||||
|
|
|
@ -77,6 +77,8 @@ typedef struct SDbVgVersion {
|
||||||
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
||||||
} SDbVgVersion;
|
} SDbVgVersion;
|
||||||
|
|
||||||
|
typedef SDbCfgRsp SDbCfgInfo;
|
||||||
|
|
||||||
int32_t catalogInit(SCatalogCfg *cfg);
|
int32_t catalogInit(SCatalogCfg *cfg);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -217,6 +219,8 @@ int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stable
|
||||||
|
|
||||||
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num);
|
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num);
|
||||||
|
|
||||||
|
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destroy catalog and relase all resources
|
* Destroy catalog and relase all resources
|
||||||
|
|
|
@ -1515,6 +1515,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
|
||||||
if (tEncodeI32(&encoder, pReq->maxRows) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->maxRows) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->commitTime) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->commitTime) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->precision) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->precision) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
|
||||||
|
@ -1524,6 +1525,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
|
||||||
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->singleSTable) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
|
||||||
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
||||||
SRetention *pRetension = taosArrayGet(pReq->pRetensions, i);
|
SRetention *pRetension = taosArrayGet(pReq->pRetensions, i);
|
||||||
|
@ -1556,6 +1558,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
|
||||||
if (tDecodeI32(&decoder, &pReq->maxRows) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->maxRows) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->commitTime) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->commitTime) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
|
||||||
|
@ -1565,6 +1568,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
|
||||||
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->singleSTable) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
|
||||||
pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention));
|
pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention));
|
||||||
if (pReq->pRetensions == NULL) {
|
if (pReq->pRetensions == NULL) {
|
||||||
|
@ -1942,6 +1946,94 @@ void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) {
|
||||||
taosArrayDestroy(pRsp->pArray);
|
taosArrayDestroy(pRsp->pArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
|
||||||
|
SCoder encoder = {0};
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
|
||||||
|
SCoder decoder = {0};
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp) {
|
||||||
|
SCoder encoder = {0};
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->cacheBlockSize) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->totalBlocks) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->daysPerFile) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->daysToKeep0) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->daysToKeep1) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->daysToKeep2) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->minRows) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->maxRows) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->commitTime) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->fsyncPeriod) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pRsp->walLevel) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pRsp->precision) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pRsp->compression) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pRsp->replications) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pRsp->quorum) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pRsp->update) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pRsp->cacheLastRow) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pRsp->streamMode) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) {
|
||||||
|
SCoder decoder = {0};
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->cacheBlockSize) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->totalBlocks) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->daysPerFile) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->daysToKeep0) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->daysToKeep1) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->daysToKeep2) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->minRows) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->maxRows) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->commitTime) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->fsyncPeriod) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pRsp->walLevel) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pRsp->precision) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pRsp->compression) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pRsp->replications) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pRsp->quorum) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pRsp->update) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pRsp->streamMode) < 0) return -1;
|
||||||
|
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
|
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
|
||||||
SCoder encoder = {0};
|
SCoder encoder = {0};
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
|
@ -146,6 +146,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
dndSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
|
@ -260,6 +260,7 @@ typedef struct {
|
||||||
int32_t maxRows;
|
int32_t maxRows;
|
||||||
int32_t commitTime;
|
int32_t commitTime;
|
||||||
int32_t fsyncPeriod;
|
int32_t fsyncPeriod;
|
||||||
|
int32_t ttl;
|
||||||
int8_t walLevel;
|
int8_t walLevel;
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
int8_t compression;
|
int8_t compression;
|
||||||
|
@ -268,6 +269,7 @@ typedef struct {
|
||||||
int8_t update;
|
int8_t update;
|
||||||
int8_t cacheLastRow;
|
int8_t cacheLastRow;
|
||||||
int8_t streamMode;
|
int8_t streamMode;
|
||||||
|
int8_t singleSTable;
|
||||||
int32_t numOfRetensions;
|
int32_t numOfRetensions;
|
||||||
SArray* pRetensions;
|
SArray* pRetensions;
|
||||||
} SDbCfg;
|
} SDbCfg;
|
||||||
|
|
|
@ -40,6 +40,7 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
|
||||||
static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
|
||||||
|
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
|
||||||
|
|
||||||
int32_t mndInitDb(SMnode *pMnode) {
|
int32_t mndInitDb(SMnode *pMnode) {
|
||||||
SSdbTable table = {.sdbType = SDB_DB,
|
SSdbTable table = {.sdbType = SDB_DB,
|
||||||
|
@ -56,6 +57,7 @@ int32_t mndInitDb(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetDbMeta);
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetDbMeta);
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
|
||||||
|
@ -268,6 +270,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
|
||||||
if (pCfg->minRows > pCfg->maxRows) return -1;
|
if (pCfg->minRows > pCfg->maxRows) return -1;
|
||||||
if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME || pCfg->commitTime > TSDB_MAX_COMMIT_TIME) return -1;
|
if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME || pCfg->commitTime > TSDB_MAX_COMMIT_TIME) return -1;
|
||||||
if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) return -1;
|
if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) return -1;
|
||||||
|
if (pCfg->ttl < TSDB_MIN_DB_TTL_OPTION && pCfg->ttl != TSDB_DEFAULT_DB_TTL_OPTION) return -1;
|
||||||
if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL || pCfg->walLevel > TSDB_MAX_WAL_LEVEL) return -1;
|
if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL || pCfg->walLevel > TSDB_MAX_WAL_LEVEL) return -1;
|
||||||
if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1;
|
if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1;
|
||||||
if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1;
|
if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1;
|
||||||
|
@ -278,6 +281,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
|
||||||
if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) return -1;
|
if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) return -1;
|
||||||
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
|
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
|
||||||
if (pCfg->streamMode < TSDB_MIN_DB_STREAM_MODE || pCfg->streamMode > TSDB_MAX_DB_STREAM_MODE) return -1;
|
if (pCfg->streamMode < TSDB_MIN_DB_STREAM_MODE || pCfg->streamMode > TSDB_MAX_DB_STREAM_MODE) return -1;
|
||||||
|
if (pCfg->singleSTable < TSDB_MIN_DB_SINGLE_STABLE_OPTION || pCfg->streamMode > TSDB_MAX_DB_SINGLE_STABLE_OPTION) return -1;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -293,6 +297,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
||||||
if (pCfg->maxRows < 0) pCfg->maxRows = TSDB_DEFAULT_MAX_ROW_FBLOCK;
|
if (pCfg->maxRows < 0) pCfg->maxRows = TSDB_DEFAULT_MAX_ROW_FBLOCK;
|
||||||
if (pCfg->commitTime < 0) pCfg->commitTime = TSDB_DEFAULT_COMMIT_TIME;
|
if (pCfg->commitTime < 0) pCfg->commitTime = TSDB_DEFAULT_COMMIT_TIME;
|
||||||
if (pCfg->fsyncPeriod < 0) pCfg->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
|
if (pCfg->fsyncPeriod < 0) pCfg->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
|
||||||
|
if (pCfg->ttl < 0) pCfg->ttl = TSDB_DEFAULT_DB_TTL_OPTION;
|
||||||
if (pCfg->walLevel < 0) pCfg->walLevel = TSDB_DEFAULT_WAL_LEVEL;
|
if (pCfg->walLevel < 0) pCfg->walLevel = TSDB_DEFAULT_WAL_LEVEL;
|
||||||
if (pCfg->precision < 0) pCfg->precision = TSDB_DEFAULT_PRECISION;
|
if (pCfg->precision < 0) pCfg->precision = TSDB_DEFAULT_PRECISION;
|
||||||
if (pCfg->compression < 0) pCfg->compression = TSDB_DEFAULT_COMP_LEVEL;
|
if (pCfg->compression < 0) pCfg->compression = TSDB_DEFAULT_COMP_LEVEL;
|
||||||
|
@ -301,6 +306,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
||||||
if (pCfg->update < 0) pCfg->update = TSDB_DEFAULT_DB_UPDATE_OPTION;
|
if (pCfg->update < 0) pCfg->update = TSDB_DEFAULT_DB_UPDATE_OPTION;
|
||||||
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
|
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
|
||||||
if (pCfg->streamMode < 0) pCfg->streamMode = TSDB_DEFAULT_DB_STREAM_MODE;
|
if (pCfg->streamMode < 0) pCfg->streamMode = TSDB_DEFAULT_DB_STREAM_MODE;
|
||||||
|
if (pCfg->singleSTable < 0) pCfg->singleSTable = TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION;
|
||||||
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
|
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -437,6 +443,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
|
||||||
.maxRows = pCreate->maxRows,
|
.maxRows = pCreate->maxRows,
|
||||||
.commitTime = pCreate->commitTime,
|
.commitTime = pCreate->commitTime,
|
||||||
.fsyncPeriod = pCreate->fsyncPeriod,
|
.fsyncPeriod = pCreate->fsyncPeriod,
|
||||||
|
.ttl = pCreate->ttl,
|
||||||
.walLevel = pCreate->walLevel,
|
.walLevel = pCreate->walLevel,
|
||||||
.precision = pCreate->precision,
|
.precision = pCreate->precision,
|
||||||
.compression = pCreate->compression,
|
.compression = pCreate->compression,
|
||||||
|
@ -445,6 +452,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
|
||||||
.update = pCreate->update,
|
.update = pCreate->update,
|
||||||
.cacheLastRow = pCreate->cacheLastRow,
|
.cacheLastRow = pCreate->cacheLastRow,
|
||||||
.streamMode = pCreate->streamMode,
|
.streamMode = pCreate->streamMode,
|
||||||
|
.singleSTable = pCreate->singleSTable,
|
||||||
};
|
};
|
||||||
|
|
||||||
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
|
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
|
||||||
|
@ -730,6 +738,71 @@ ALTER_DB_OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
|
||||||
|
SMnode *pMnode = pReq->pNode;
|
||||||
|
int32_t code = -1;
|
||||||
|
SDbObj *pDb = NULL;
|
||||||
|
SDbCfgReq cfgReq = {0};
|
||||||
|
SDbCfgRsp cfgRsp = {0};
|
||||||
|
|
||||||
|
if (tDeserializeSDbCfgReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto GET_DB_CFG_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDb = mndAcquireDb(pMnode, cfgReq.db);
|
||||||
|
if (pDb == NULL) {
|
||||||
|
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||||
|
goto GET_DB_CFG_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
|
||||||
|
cfgRsp.cacheBlockSize = pDb->cfg.cacheBlockSize;
|
||||||
|
cfgRsp.totalBlocks = pDb->cfg.totalBlocks;
|
||||||
|
cfgRsp.daysPerFile = pDb->cfg.daysPerFile;
|
||||||
|
cfgRsp.daysToKeep0 = pDb->cfg.daysToKeep0;
|
||||||
|
cfgRsp.daysToKeep1 = pDb->cfg.daysToKeep1;
|
||||||
|
cfgRsp.daysToKeep2 = pDb->cfg.daysToKeep2;
|
||||||
|
cfgRsp.minRows = pDb->cfg.minRows;
|
||||||
|
cfgRsp.maxRows = pDb->cfg.maxRows;
|
||||||
|
cfgRsp.commitTime = pDb->cfg.commitTime;
|
||||||
|
cfgRsp.fsyncPeriod = pDb->cfg.fsyncPeriod;
|
||||||
|
cfgRsp.ttl = pDb->cfg.ttl;
|
||||||
|
cfgRsp.walLevel = pDb->cfg.walLevel;
|
||||||
|
cfgRsp.precision = pDb->cfg.precision;
|
||||||
|
cfgRsp.compression = pDb->cfg.compression;
|
||||||
|
cfgRsp.replications = pDb->cfg.replications;
|
||||||
|
cfgRsp.quorum = pDb->cfg.quorum;
|
||||||
|
cfgRsp.update = pDb->cfg.update;
|
||||||
|
cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow;
|
||||||
|
cfgRsp.streamMode = pDb->cfg.streamMode;
|
||||||
|
cfgRsp.singleSTable = pDb->cfg.singleSTable;
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp);
|
||||||
|
void *pRsp = rpcMallocCont(contLen);
|
||||||
|
if (pRsp == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
code = -1;
|
||||||
|
goto GET_DB_CFG_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
tSerializeSDbCfgRsp(pRsp, contLen, &cfgRsp);
|
||||||
|
|
||||||
|
pReq->pRsp = pRsp;
|
||||||
|
pReq->rspLen = contLen;
|
||||||
|
|
||||||
|
GET_DB_CFG_OVER:
|
||||||
|
|
||||||
|
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||||
|
mError("db:%s, failed to get cfg since %s", cfgReq.db, terrstr());
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
SSdbRaw *pRedoRaw = mndDbActionEncode(pDb);
|
SSdbRaw *pRedoRaw = mndDbActionEncode(pDb);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) return -1;
|
||||||
|
@ -1509,6 +1582,23 @@ static void dumpDbInfoToPayload(char *data, SDbObj *pDb, SShowObj *pShow, int32_
|
||||||
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
|
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int32_t *)pWrite = pDb->cfg.ttl;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int8_t *)pWrite = pDb->cfg.singleSTable;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int8_t *)pWrite = pDb->cfg.streamMode;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
char *status = "ready";
|
||||||
|
STR_WITH_SIZE_TO_VARSTR(pWrite, status, strlen(status));
|
||||||
|
cols++;
|
||||||
|
|
||||||
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
// *(int8_t *)pWrite = pDb->cfg.update;
|
// *(int8_t *)pWrite = pDb->cfg.update;
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,11 @@ static const SInfosTableSchema userDBSchema[] = {
|
||||||
{.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
{.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||||
{.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
{.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||||
{.name = "precision", .bytes = 3 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
|
{.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
|
{.name = "single_stable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||||
|
{.name = "stream_mode", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||||
|
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
// {.name = "update", .bytes = 1, .type =
|
// {.name = "update", .bytes = 1, .type =
|
||||||
// TSDB_DATA_TYPE_TINYINT}, // disable update
|
// TSDB_DATA_TYPE_TINYINT}, // disable update
|
||||||
};
|
};
|
||||||
|
|
|
@ -569,6 +569,44 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *dbFName, SDbCfgInfo *out) {
|
||||||
|
char *msg = NULL;
|
||||||
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
|
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
|
||||||
|
|
||||||
|
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)]((void *)dbFName, &msg, 0, &msgLen);
|
||||||
|
if (code) {
|
||||||
|
ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.msgType = TDMT_MND_GET_DB_CFG,
|
||||||
|
.pCont = msg,
|
||||||
|
.contLen = msgLen,
|
||||||
|
};
|
||||||
|
|
||||||
|
SRpcMsg rpcRsp = {0};
|
||||||
|
|
||||||
|
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
||||||
|
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
|
||||||
|
ctgError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rpcRsp.code), dbFName);
|
||||||
|
CTG_ERR_RET(rpcRsp.code);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)](out, rpcRsp.pCont, rpcRsp.contLen);
|
||||||
|
if (code) {
|
||||||
|
ctgError("Process get db cfg rsp failed, code:%x, db:%s", code, dbFName);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
ctgDebug("Got db cfg from mnode, dbFName:%s", dbFName);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
|
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
|
||||||
if (NULL == pCtg->dbCache) {
|
if (NULL == pCtg->dbCache) {
|
||||||
*exist = 0;
|
*exist = 0;
|
||||||
|
@ -2137,7 +2175,6 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogInit(SCatalogCfg *cfg) {
|
int32_t catalogInit(SCatalogCfg *cfg) {
|
||||||
if (gCtgMgmt.pCluster) {
|
if (gCtgMgmt.pCluster) {
|
||||||
qError("catalog already initialized");
|
qError("catalog already initialized");
|
||||||
|
@ -2717,6 +2754,15 @@ int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num)
|
||||||
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
|
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) {
|
||||||
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == dbFName || NULL == pDbCfg) {
|
||||||
|
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg));
|
||||||
|
}
|
||||||
|
|
||||||
void catalogDestroy(void) {
|
void catalogDestroy(void) {
|
||||||
qInfo("start to destroy catalog");
|
qInfo("start to destroy catalog");
|
||||||
|
|
|
@ -618,7 +618,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pSortNode->pSortKeys->length);
|
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pSortNode->pSortKeys->length);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSortNode->node.pOutputDataBlockDesc->totalRowSize);
|
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSortNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
|
|
@ -209,6 +209,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||||
assert(pDispatcher->queryEnd);
|
assert(pDispatcher->queryEnd);
|
||||||
pOutput->useconds = pDispatcher->useconds;
|
pOutput->useconds = pDispatcher->useconds;
|
||||||
pOutput->precision = pDispatcher->pSchema->precision;
|
pOutput->precision = pDispatcher->pSchema->precision;
|
||||||
|
pOutput->bufStatus = DS_BUF_EMPTY;
|
||||||
|
pOutput->queryEnd = pDispatcher->queryEnd;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
|
||||||
|
|
|
@ -7289,8 +7289,7 @@ SArray* createSortInfo(SNodeList* pNodeList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
|
SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
|
||||||
SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode->pExpr;
|
|
||||||
SBlockOrderInfo bi = {0};
|
SBlockOrderInfo bi = {0};
|
||||||
bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||||
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
|
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
|
||||||
|
|
|
@ -255,6 +255,11 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
|
||||||
pOperator->getNextFn = doTableScan;
|
pOperator->getNextFn = doTableScan;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
static int32_t cost = 0;
|
||||||
|
pOperator->cost.openCost = ++cost;
|
||||||
|
pOperator->cost.totalCost = ++cost;
|
||||||
|
pOperator->resultInfo.totalRows = ++cost;
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
|
|
||||||
char *gOperatorStr[] = {NULL, "+", "-", "*", "/", "%", "&", "|", ">", ">=", "<", "<=", "=", "<>",
|
char *gOperatorStr[] = {NULL, "+", "-", "*", "/", "%", "-", "&", "|", ">", ">=", "<", "<=", "=", "<>",
|
||||||
"IN", "NOT IN", "LIKE", "NOT LIKE", "MATCH", "NMATCH", "IS NULL", "IS NOT NULL",
|
"IN", "NOT IN", "LIKE", "NOT LIKE", "MATCH", "NMATCH", "IS NULL", "IS NOT NULL",
|
||||||
"IS TRUE", "IS FALSE", "IS UNKNOWN", "IS NOT TRUE", "IS NOT FALSE", "IS NOT UNKNOWN"};
|
"IS TRUE", "IS FALSE", "IS UNKNOWN", "IS NOT TRUE", "IS NOT FALSE", "IS NOT UNKNOWN"};
|
||||||
char *gLogicConditionStr[] = {"AND", "OR", "NOT"};
|
char *gLogicConditionStr[] = {"AND", "OR", "NOT"};
|
||||||
|
|
|
@ -104,6 +104,8 @@ static int32_t rewriteIsTrue(SNode* pSrc, SNode** pIsTrue) {
|
||||||
}
|
}
|
||||||
pOp->opType = OP_TYPE_IS_TRUE;
|
pOp->opType = OP_TYPE_IS_TRUE;
|
||||||
pOp->pLeft = pSrc;
|
pOp->pLeft = pSrc;
|
||||||
|
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||||
|
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||||
*pIsTrue = (SNode*)pOp;
|
*pIsTrue = (SNode*)pOp;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -198,6 +200,8 @@ static int32_t calcConstQuery(SNode* pStmt) {
|
||||||
switch (nodeType(pStmt)) {
|
switch (nodeType(pStmt)) {
|
||||||
case QUERY_NODE_SELECT_STMT:
|
case QUERY_NODE_SELECT_STMT:
|
||||||
return calcConstSelect((SSelectStmt*)pStmt);
|
return calcConstSelect((SSelectStmt*)pStmt);
|
||||||
|
case QUERY_NODE_EXPLAIN_STMT:
|
||||||
|
return calcConstQuery(((SExplainStmt*)pStmt)->pQuery);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1008,6 +1008,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
|
||||||
pReq->cacheLastRow = GET_OPTION_VAL(pStmt->pOptions->pCachelast, TSDB_DEFAULT_CACHE_LAST_ROW);
|
pReq->cacheLastRow = GET_OPTION_VAL(pStmt->pOptions->pCachelast, TSDB_DEFAULT_CACHE_LAST_ROW);
|
||||||
pReq->ignoreExist = pStmt->ignoreExists;
|
pReq->ignoreExist = pStmt->ignoreExists;
|
||||||
pReq->streamMode = GET_OPTION_VAL(pStmt->pOptions->pStreamMode, TSDB_DEFAULT_DB_STREAM_MODE_OPTION);
|
pReq->streamMode = GET_OPTION_VAL(pStmt->pOptions->pStreamMode, TSDB_DEFAULT_DB_STREAM_MODE_OPTION);
|
||||||
|
pReq->ttl = GET_OPTION_VAL(pStmt->pOptions->pTtl, TSDB_DEFAULT_DB_TTL_OPTION);
|
||||||
|
pReq->singleSTable = GET_OPTION_VAL(pStmt->pOptions->pSingleStable, TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION);
|
||||||
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
|
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -121,6 +121,23 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||||
|
if (NULL == msg || NULL == msgLen) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDbCfgReq dbCfgReq = {0};
|
||||||
|
strcpy(dbCfgReq.db, input);
|
||||||
|
|
||||||
|
int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq);
|
||||||
|
void *pBuf = rpcMallocCont(bufLen);
|
||||||
|
tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq);
|
||||||
|
|
||||||
|
*msg = pBuf;
|
||||||
|
*msgLen = bufLen;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
SUseDbOutput *pOut = output;
|
SUseDbOutput *pOut = output;
|
||||||
|
@ -309,17 +326,36 @@ PROCESS_QLIST_OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
|
SDbCfgRsp out = {0};
|
||||||
|
|
||||||
|
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tDeserializeSDbCfgRsp(msg, msgSize, &out) != 0) {
|
||||||
|
qError("tDeserializeSDbCfgRsp failed, msgSize:%d", msgSize);
|
||||||
|
return TSDB_CODE_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(output, &out, sizeof(out));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void initQueryModuleMsgHandle() {
|
void initQueryModuleMsgHandle() {
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
|
||||||
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
|
||||||
|
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
|
||||||
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
|
||||||
}
|
}
|
||||||
|
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
|
@ -977,10 +977,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
|
//QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
|
||||||
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
//QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||||
|
|
||||||
queryRsped = true;
|
//queryRsped = true;
|
||||||
|
|
||||||
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
|
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
|
||||||
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
|
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
|
||||||
|
@ -994,10 +994,10 @@ _return:
|
||||||
input.code = code;
|
input.code = code;
|
||||||
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
||||||
|
|
||||||
if (!queryRsped) {
|
//if (!queryRsped) {
|
||||||
qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
|
// qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
|
||||||
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
// QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||||
}
|
//}
|
||||||
|
|
||||||
QW_RET(TSDB_CODE_SUCCESS);
|
QW_RET(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
|
@ -687,11 +687,15 @@ int32_t filterGetRangeRes(void* h, SFilterRange *ra) {
|
||||||
SFilterRangeNode* r = ctx->rs;
|
SFilterRangeNode* r = ctx->rs;
|
||||||
|
|
||||||
while (r) {
|
while (r) {
|
||||||
|
if (num) {
|
||||||
|
ra->e = r->ra.e;
|
||||||
|
ra->eflag = r->ra.eflag;
|
||||||
|
} else {
|
||||||
FILTER_COPY_RA(ra, &r->ra);
|
FILTER_COPY_RA(ra, &r->ra);
|
||||||
|
}
|
||||||
|
|
||||||
++num;
|
++num;
|
||||||
r = r->next;
|
r = r->next;
|
||||||
++ra;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (num == 0) {
|
if (num == 0) {
|
||||||
|
@ -3314,8 +3318,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
|
int32_t filterGetTimeRangeImpl(SFilterInfo *info, STimeWindow *win, bool *isStrict) {
|
||||||
SFilterInfo *info = NULL;
|
|
||||||
SFilterRange ra = {0};
|
SFilterRange ra = {0};
|
||||||
SFilterRangeCtx *prev = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
|
SFilterRangeCtx *prev = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
|
||||||
SFilterRangeCtx *tmpc = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
|
SFilterRangeCtx *tmpc = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
|
||||||
|
@ -3369,13 +3372,14 @@ int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
|
||||||
*win = TSWINDOW_INITIALIZER;
|
*win = TSWINDOW_INITIALIZER;
|
||||||
} else {
|
} else {
|
||||||
filterGetRangeNum(prev, &num);
|
filterGetRangeNum(prev, &num);
|
||||||
if (num > 1) {
|
|
||||||
qError("only one time range accepted, num:%d", num);
|
|
||||||
FLT_ERR_JRET(TSDB_CODE_QRY_INVALID_TIME_CONDITION);
|
|
||||||
}
|
|
||||||
|
|
||||||
FLT_CHK_JMP(num < 1);
|
FLT_CHK_JMP(num < 1);
|
||||||
|
|
||||||
|
if (num > 1) {
|
||||||
|
*isStrict = false;
|
||||||
|
qDebug("more than one time range, num:%d", num);
|
||||||
|
}
|
||||||
|
|
||||||
SFilterRange tra;
|
SFilterRange tra;
|
||||||
filterGetRangeRes(prev, &tra);
|
filterGetRangeRes(prev, &tra);
|
||||||
win->skey = tra.s;
|
win->skey = tra.s;
|
||||||
|
@ -3401,6 +3405,30 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
|
||||||
|
SFilterInfo *info = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
*isStrict = true;
|
||||||
|
|
||||||
|
FLT_ERR_RET(filterInitFromNode(pNode, &info, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP));
|
||||||
|
|
||||||
|
if (info->scalarMode) {
|
||||||
|
*win = TSWINDOW_INITIALIZER;
|
||||||
|
*isStrict = false;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
FLT_ERR_JRET(filterGetTimeRangeImpl(info, win, isStrict));
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
filterFreeInfo(info);
|
||||||
|
|
||||||
|
FLT_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t filterConverNcharColumns(SFilterInfo* info, int32_t rows, bool *gotNchar) {
|
int32_t filterConverNcharColumns(SFilterInfo* info, int32_t rows, bool *gotNchar) {
|
||||||
if (FILTER_EMPTY_RES(info) || FILTER_ALL_RES(info)) {
|
if (FILTER_EMPTY_RES(info) || FILTER_ALL_RES(info)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -241,6 +241,7 @@ TEST(timerangeTest, greater) {
|
||||||
bool isStrict = false;
|
bool isStrict = false;
|
||||||
int32_t code = filterGetTimeRange(opNode1, &win, &isStrict);
|
int32_t code = filterGetTimeRange(opNode1, &win, &isStrict);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
ASSERT_EQ(isStrict, true);
|
||||||
ASSERT_EQ(win.skey, tsmall);
|
ASSERT_EQ(win.skey, tsmall);
|
||||||
ASSERT_EQ(win.ekey, INT64_MAX);
|
ASSERT_EQ(win.ekey, INT64_MAX);
|
||||||
//filterFreeInfo(filter);
|
//filterFreeInfo(filter);
|
||||||
|
@ -270,6 +271,7 @@ TEST(timerangeTest, greater_and_lower) {
|
||||||
STimeWindow win = {0};
|
STimeWindow win = {0};
|
||||||
bool isStrict = false;
|
bool isStrict = false;
|
||||||
int32_t code = filterGetTimeRange(logicNode, &win, &isStrict);
|
int32_t code = filterGetTimeRange(logicNode, &win, &isStrict);
|
||||||
|
ASSERT_EQ(isStrict, true);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(win.skey, tsmall);
|
ASSERT_EQ(win.skey, tsmall);
|
||||||
ASSERT_EQ(win.ekey, tbig);
|
ASSERT_EQ(win.ekey, tbig);
|
||||||
|
@ -277,6 +279,56 @@ TEST(timerangeTest, greater_and_lower) {
|
||||||
nodesDestroyNode(logicNode);
|
nodesDestroyNode(logicNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(timerangeTest, greater_and_lower_not_strict) {
|
||||||
|
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode1 = NULL, *logicNode2 = NULL;
|
||||||
|
bool eRes[5] = {false, false, true, true, true};
|
||||||
|
SScalarParam res = {0};
|
||||||
|
int64_t tsmall1 = 222, tbig1 = 333;
|
||||||
|
int64_t tsmall2 = 444, tbig2 = 555;
|
||||||
|
SNode *list[2] = {0};
|
||||||
|
|
||||||
|
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
|
||||||
|
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall1);
|
||||||
|
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
|
||||||
|
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
|
||||||
|
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig1);
|
||||||
|
flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
|
||||||
|
list[0] = opNode1;
|
||||||
|
list[1] = opNode2;
|
||||||
|
|
||||||
|
flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_AND, list, 2);
|
||||||
|
|
||||||
|
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
|
||||||
|
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall2);
|
||||||
|
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
|
||||||
|
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
|
||||||
|
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig2);
|
||||||
|
flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
|
||||||
|
list[0] = opNode1;
|
||||||
|
list[1] = opNode2;
|
||||||
|
|
||||||
|
flttMakeLogicNode(&logicNode2, LOGIC_COND_TYPE_AND, list, 2);
|
||||||
|
|
||||||
|
list[0] = logicNode1;
|
||||||
|
list[1] = logicNode2;
|
||||||
|
flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_OR, list, 2);
|
||||||
|
|
||||||
|
//SFilterInfo *filter = NULL;
|
||||||
|
//int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
|
||||||
|
//ASSERT_EQ(code, 0);
|
||||||
|
STimeWindow win = {0};
|
||||||
|
bool isStrict = false;
|
||||||
|
int32_t code = filterGetTimeRange(logicNode1, &win, &isStrict);
|
||||||
|
ASSERT_EQ(isStrict, false);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
ASSERT_EQ(win.skey, tsmall1);
|
||||||
|
ASSERT_EQ(win.ekey, tbig2);
|
||||||
|
//filterFreeInfo(filter);
|
||||||
|
nodesDestroyNode(logicNode1);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
TEST(columnTest, smallint_column_greater_double_value) {
|
TEST(columnTest, smallint_column_greater_double_value) {
|
||||||
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
|
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
|
||||||
int16_t leftv[5]= {1, 2, 3, 4, 5};
|
int16_t leftv[5]= {1, 2, 3, 4, 5};
|
||||||
|
|
|
@ -1235,6 +1235,34 @@ _return:
|
||||||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
|
||||||
|
int32_t s = taosHashGetSize(pTaskList);
|
||||||
|
if (s <= 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
|
||||||
|
if (NULL == task || NULL == (*task)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pTask = *task;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
|
||||||
|
if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || taosArrayGetSize(pTask->execNodes) <= 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchNodeInfo *nodeInfo = taosArrayGet(pTask->execNodes, 0);
|
||||||
|
nodeInfo->handle = handle;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
|
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||||
|
@ -1247,22 +1275,25 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
|
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t s = taosHashGetSize(pJob->execTasks);
|
schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
|
||||||
if (s <= 0) {
|
if (NULL == pTask) {
|
||||||
SCH_JOB_ELOG("empty execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
|
if (TDMT_VND_EXPLAIN_RSP == msgType) {
|
||||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
|
||||||
}
|
} else {
|
||||||
|
|
||||||
SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId));
|
|
||||||
if (NULL == task || NULL == (*task)) {
|
|
||||||
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
|
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
|
||||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == pTask) {
|
||||||
|
SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
|
||||||
|
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
pTask = *task;
|
|
||||||
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
|
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
|
||||||
|
|
||||||
SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
|
SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
|
||||||
|
schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
|
||||||
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
|
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
|
@ -722,10 +722,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||||
|
|
||||||
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||||
if (!transQueuePush(&conn->cliMsgs, pMsg)) {
|
transQueuePush(&conn->cliMsgs, pMsg);
|
||||||
return;
|
// tTrace("%s cli conn %p queue msg size %d", ((STrans*)pThrd->pTransInst)->label, conn, 2);
|
||||||
}
|
// return;
|
||||||
transDestroyBuffer(&conn->readBuf);
|
//}
|
||||||
|
// transDestroyBuffer(&conn->readBuf);
|
||||||
cliSend(conn);
|
cliSend(conn);
|
||||||
} else {
|
} else {
|
||||||
conn = cliCreateConn(pThrd);
|
conn = cliCreateConn(pThrd);
|
||||||
|
|
|
@ -286,25 +286,25 @@ endi
|
||||||
#sql_error select c1, ltrim(t1), c2, ltrim(t2) from ctb3
|
#sql_error select c1, ltrim(t1), c2, ltrim(t2) from ctb3
|
||||||
|
|
||||||
|
|
||||||
#print ====> rtrim
|
print ====> rtrim
|
||||||
#sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ntb3
|
sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ntb3
|
||||||
#print ====> select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3
|
print ====> select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3
|
||||||
#sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3
|
sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3
|
||||||
#print ====> rows: $rows
|
print ====> rows: $rows
|
||||||
#print ====> [ $data00 ] [ $data01 ] [ $data02 ] [ $data03 ] [ $data04 ] [ $data05 ] [ $data06 ]
|
print ====> [ $data00 ] [ $data01 ] [ $data02 ] [ $data03 ] [ $data04 ] [ $data05 ] [ $data06 ]
|
||||||
#print ====> $data10 $data11 $data12 $data13 $data14 $data15
|
print ====> $data10 $data11 $data12 $data13 $data14 $data15
|
||||||
#if $rows != 1 then
|
if $rows != 1 then
|
||||||
# return -1
|
return -1
|
||||||
#endi
|
endi
|
||||||
#if $data01 != @ abcd 1234@ then
|
if $data01 != @ abcd 1234@ then
|
||||||
# return -1
|
return -1
|
||||||
#endi
|
endi
|
||||||
#if $data03 != @ abcd 1234@ then
|
if $data03 != @ abcd 1234@ then
|
||||||
# return -1
|
return -1
|
||||||
#endi
|
endi
|
||||||
#if $data04 != @ abcdEFGH =-*&%@ then
|
if $data04 != @ abcdEFGH =-*&%@ then
|
||||||
# return -1
|
return -1
|
||||||
#endi
|
endi
|
||||||
|
|
||||||
#sql_error select c1, rtrim(t1), c2, rtrim(t2) from ctb3
|
#sql_error select c1, rtrim(t1), c2, rtrim(t2) from ctb3
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue