feat: support database auto compact parameters

This commit is contained in:
Hongze Cheng 2024-11-14 17:01:15 +08:00
parent 1f51f480af
commit be743733a5
12 changed files with 791 additions and 391 deletions

View File

@ -677,7 +677,7 @@ typedef struct {
int32_t tsSlowLogThreshold;
int32_t tsSlowLogMaxLen;
int32_t tsSlowLogScope;
int32_t tsSlowLogThresholdTest; //Obsolete
int32_t tsSlowLogThresholdTest; // Obsolete
char tsSlowLogExceptDb[TSDB_DB_NAME_LEN];
} SMonitorParas;
@ -986,7 +986,6 @@ typedef struct SEpSet {
SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
int32_t tEncodeSEpSet(SEncoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
@ -1343,6 +1342,11 @@ typedef struct {
int8_t withArbitrator;
int8_t encryptAlgorithm;
char dnodeListStr[TSDB_DNODE_LIST_LEN];
// 1. add auto-compact parameters
int32_t compactInterval;
int32_t compactStartTime;
int32_t compactEndTime;
int32_t compactTimeOffset;
} SCreateDbReq;
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
@ -1374,6 +1378,11 @@ typedef struct {
int32_t sqlLen;
char* sql;
int8_t withArbitrator;
// 1. add auto-compact parameters
int32_t compactInterval;
int32_t compactStartTime;
int32_t compactEndTime;
int32_t compactTimeOffset;
} SAlterDbReq;
int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
@ -4147,11 +4156,11 @@ typedef struct {
SArray* blockTbName;
SArray* blockSchema;
union{
struct{
union {
struct {
int64_t sleepTime;
};
struct{
struct {
int32_t createTableNum;
SArray* createTableLen;
SArray* createTableReq;
@ -4160,7 +4169,7 @@ typedef struct {
} SMqDataRsp;
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pObj);
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pObj);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
void tDeleteMqDataRsp(SMqDataRsp* pRsp);

View File

@ -109,6 +109,14 @@ typedef struct SDatabaseOptions {
SValueNode* s3KeepLocalStr;
int8_t s3Compact;
int8_t withArbitrator;
// for auto-compact
SValueNode* pCompactIntervalNode;
int32_t compactInterval;
SNodeList* pCompactTimeRangeList;
int32_t compactStartTime;
int32_t compactEndTime;
SValueNode* pCompactTimeOffsetNode;
int32_t compactTimeOffset;
} SDatabaseOptions;
typedef struct SCreateDatabaseStmt {

View File

@ -190,45 +190,22 @@ typedef enum EOperatorType {
} EOperatorType;
static const EOperatorType OPERATOR_ARRAY[] = {
OP_TYPE_ADD,
OP_TYPE_SUB,
OP_TYPE_MULTI,
OP_TYPE_DIV,
OP_TYPE_REM,
OP_TYPE_ADD, OP_TYPE_SUB, OP_TYPE_MULTI, OP_TYPE_DIV, OP_TYPE_REM,
OP_TYPE_MINUS,
OP_TYPE_BIT_AND,
OP_TYPE_BIT_OR,
OP_TYPE_BIT_AND, OP_TYPE_BIT_OR,
OP_TYPE_GREATER_THAN,
OP_TYPE_GREATER_EQUAL,
OP_TYPE_LOWER_THAN,
OP_TYPE_LOWER_EQUAL,
OP_TYPE_EQUAL,
OP_TYPE_NOT_EQUAL,
OP_TYPE_IN,
OP_TYPE_NOT_IN,
OP_TYPE_LIKE,
OP_TYPE_NOT_LIKE,
OP_TYPE_MATCH,
OP_TYPE_NMATCH,
OP_TYPE_GREATER_THAN, OP_TYPE_GREATER_EQUAL, OP_TYPE_LOWER_THAN, OP_TYPE_LOWER_EQUAL, OP_TYPE_EQUAL,
OP_TYPE_NOT_EQUAL, OP_TYPE_IN, OP_TYPE_NOT_IN, OP_TYPE_LIKE, OP_TYPE_NOT_LIKE, OP_TYPE_MATCH, OP_TYPE_NMATCH,
OP_TYPE_IS_NULL,
OP_TYPE_IS_NOT_NULL,
OP_TYPE_IS_TRUE,
OP_TYPE_IS_FALSE,
OP_TYPE_IS_UNKNOWN,
OP_TYPE_IS_NOT_TRUE,
OP_TYPE_IS_NOT_FALSE,
OP_TYPE_IS_NOT_UNKNOWN,
//OP_TYPE_COMPARE_MAX_VALUE,
OP_TYPE_IS_NULL, OP_TYPE_IS_NOT_NULL, OP_TYPE_IS_TRUE, OP_TYPE_IS_FALSE, OP_TYPE_IS_UNKNOWN, OP_TYPE_IS_NOT_TRUE,
OP_TYPE_IS_NOT_FALSE, OP_TYPE_IS_NOT_UNKNOWN,
// OP_TYPE_COMPARE_MAX_VALUE,
OP_TYPE_JSON_GET_VALUE,
OP_TYPE_JSON_CONTAINS,
OP_TYPE_JSON_GET_VALUE, OP_TYPE_JSON_CONTAINS,
OP_TYPE_ASSIGN
};
OP_TYPE_ASSIGN};
#define OP_TYPE_CALC_MAX OP_TYPE_BIT_OR
@ -528,6 +505,11 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_TABLE_TTL 0
#define TSDB_DEFAULT_TABLE_TTL 0
#define TSDB_DEFAULT_COMPACT_INTERVAL 0
#define TSDB_DEFAULT_COMPACT_START_TIME 0
#define TSDB_DEFAULT_COMPACT_END_TIME 0
#define TSDB_DEFAULT_COMPACT_TIME_OFFSET 0
#define TSDB_MIN_EXPLAIN_RATIO 0
#define TSDB_MAX_EXPLAIN_RATIO 1
#define TSDB_DEFAULT_EXPLAIN_RATIO 0.001

View File

@ -76,7 +76,7 @@ static int32_t tSerializeSMonitorParas(SEncoder *encoder, const SMonitorParas *p
TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogScope));
TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogMaxLen));
TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogThreshold));
TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogThresholdTest)); //Obsolete
TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogThresholdTest)); // Obsolete
TAOS_CHECK_RETURN(tEncodeCStr(encoder, pMonitorParas->tsSlowLogExceptDb));
return 0;
}
@ -87,7 +87,7 @@ static int32_t tDeserializeSMonitorParas(SDecoder *decoder, SMonitorParas *pMoni
TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogScope));
TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogMaxLen));
TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogThreshold));
TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogThresholdTest)); //Obsolete
TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogThresholdTest)); // Obsolete
TAOS_CHECK_RETURN(tDecodeCStrTo(decoder, pMonitorParas->tsSlowLogExceptDb));
return 0;
}
@ -3937,6 +3937,12 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->s3Compact));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->dnodeListStr));
// auto-compact parameters
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactInterval));
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactStartTime));
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactEndTime));
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactTimeOffset));
tEndEncode(&encoder);
_exit:
@ -4028,6 +4034,18 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->dnodeListStr));
}
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pReq->compactInterval));
TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pReq->compactStartTime));
TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pReq->compactEndTime));
TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pReq->compactTimeOffset));
} else {
pReq->compactInterval = 0;
pReq->compactStartTime = 0;
pReq->compactEndTime = 0;
pReq->compactTimeOffset = 0;
}
tEndDecode(&decoder);
_exit:
@ -4078,6 +4096,11 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
ENCODESQL();
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->withArbitrator));
// auto compact config
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactInterval));
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactStartTime));
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactEndTime));
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactTimeOffset));
tEndEncode(&encoder);
_exit:
@ -4145,6 +4168,19 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->withArbitrator));
}
// auto compact config
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pReq->compactInterval));
TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pReq->compactStartTime));
TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pReq->compactEndTime));
TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pReq->compactTimeOffset));
} else {
pReq->compactInterval = 0;
pReq->compactStartTime = 0;
pReq->compactEndTime = 0;
pReq->compactTimeOffset = 0;
}
tEndDecode(&decoder);
_exit:

View File

@ -106,8 +106,8 @@ typedef enum {
TRN_CONFLICT_GLOBAL = 1,
TRN_CONFLICT_DB = 2,
TRN_CONFLICT_DB_INSIDE = 3,
// TRN_CONFLICT_TOPIC = 4,
// TRN_CONFLICT_TOPIC_INSIDE = 5,
// TRN_CONFLICT_TOPIC = 4,
// TRN_CONFLICT_TOPIC_INSIDE = 5,
TRN_CONFLICT_ARBGROUP = 6,
TRN_CONFLICT_TSMA = 7,
} ETrnConflct;
@ -423,6 +423,10 @@ typedef struct {
int8_t s3Compact;
int8_t withArbitrator;
int8_t encryptAlgorithm;
int32_t compactInterval;
int32_t compactStartTime;
int32_t compactEndTime;
int32_t compactTimeOffset;
} SDbCfg;
typedef struct {
@ -649,8 +653,8 @@ typedef struct {
int32_t maxPollIntervalMs;
} SMqConsumerObj;
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
char *topic, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer);
int32_t tNewSMqConsumerObj(int64_t consumerId, char* cgroup, int8_t updateType, char* topic, SCMSubscribeReq* subscribe,
SMqConsumerObj** ppConsumer);
void tClearSMqConsumerObj(SMqConsumerObj* pConsumer);
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
@ -693,8 +697,8 @@ typedef struct {
char* qmsg; // SubPlanToString
} SMqSubscribeObj;
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub);
int32_t tCloneSubscribeObj(const SMqSubscribeObj* pSub, SMqSubscribeObj **ppSub);
int32_t tNewSubscribeObj(const char* key, SMqSubscribeObj** ppSub);
int32_t tCloneSubscribeObj(const SMqSubscribeObj* pSub, SMqSubscribeObj** ppSub);
void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver);

View File

@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "mndDb.h"
#include "audit.h"
#include "command.h"
#include "mndArbGroup.h"
#include "mndCluster.h"
#include "mndDnode.h"
@ -34,7 +35,6 @@
#include "systable.h"
#include "thttp.h"
#include "tjson.h"
#include "command.h"
#define DB_VER_NUMBER 1
#define DB_RESERVE_SIZE 27
@ -795,6 +795,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
.tsdbPageSize = pCreate->tsdbPageSize,
.withArbitrator = pCreate->withArbitrator,
.encryptAlgorithm = pCreate->encryptAlgorithm,
.compactInterval = pCreate->compactInterval,
.compactStartTime = pCreate->compactStartTime,
.compactEndTime = pCreate->compactEndTime,
.compactTimeOffset = pCreate->compactTimeOffset,
};
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
@ -1138,6 +1142,32 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
code = 0;
}
if (pAlter->compactInterval != TSDB_DEFAULT_COMPACT_INTERVAL && pAlter->compactInterval != pDb->cfg.compactInterval) {
pDb->cfg.compactInterval = pAlter->compactInterval;
pDb->vgVersion++;
code = 0;
}
if (pAlter->compactStartTime != TSDB_DEFAULT_COMPACT_START_TIME &&
pAlter->compactStartTime != pDb->cfg.compactStartTime) {
pDb->cfg.compactStartTime = pAlter->compactStartTime;
pDb->vgVersion++;
code = 0;
}
if (pAlter->compactEndTime != TSDB_DEFAULT_COMPACT_END_TIME && pAlter->compactEndTime != pDb->cfg.compactEndTime) {
pDb->cfg.compactEndTime = pAlter->compactEndTime;
pDb->vgVersion++;
code = 0;
}
if (pAlter->compactTimeOffset != TSDB_DEFAULT_COMPACT_TIME_OFFSET &&
pAlter->compactTimeOffset != pDb->cfg.compactTimeOffset) {
pDb->cfg.compactTimeOffset = pAlter->compactTimeOffset;
pDb->vgVersion++;
code = 0;
}
TAOS_RETURN(code);
}
@ -2386,7 +2416,8 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, rows, (const char *)strictVstr, false), &lino, _OVER);
char durationVstr[128] = {0};
int32_t len = formatDurationOrKeep(&durationVstr[VARSTR_HEADER_SIZE], sizeof(durationVstr) - VARSTR_HEADER_SIZE, pDb->cfg.daysPerFile);
int32_t len = formatDurationOrKeep(&durationVstr[VARSTR_HEADER_SIZE], sizeof(durationVstr) - VARSTR_HEADER_SIZE,
pDb->cfg.daysPerFile);
varDataSetLen(durationVstr, len);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

View File

@ -1333,7 +1333,8 @@ static int32_t logicInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddObject(pJson, jkInterpFuncLogicPlanTimeSeries, nodeToJson, pNode->pTimeSeries);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption);
code =
tjsonAddObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption);
}
return code;
@ -1371,7 +1372,8 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) {
code = jsonToNodeObject(pJson, jkInterpFuncLogicPlanTimeSeries, &pNode->pTimeSeries);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption);
code =
tjsonToObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption);
}
return code;
@ -3350,7 +3352,8 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanTimeSeries, nodeToJson, pNode->pTimeSeries);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption);
code =
tjsonAddObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption);
}
return code;
@ -3391,7 +3394,8 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) {
code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanTimeSeries, &pNode->pTimeSeries);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption);
code =
tjsonToObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption);
}
return code;
@ -5335,7 +5339,6 @@ static int32_t jsonToWindowOffsetNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkDatabaseOptionsBuffer = "Buffer";
static const char* jkDatabaseOptionsCacheModel = "CacheModel";
static const char* jkDatabaseOptionsCompressionLevel = "CompressionLevel";
@ -5359,6 +5362,9 @@ static const char* jkDatabaseOptionsS3ChunkSize = "S3ChunkSize";
static const char* jkDatabaseOptionsS3KeepLocalNode = "S3KeepLocalNode";
static const char* jkDatabaseOptionsS3KeepLocal = "S3KeepLocal";
static const char* jkDatabaseOptionsS3Compact = "S3Compact";
static const char* jkDatabaseOptionsCompactIntervalNode = "compactIntervalNode";
static const char* jkDatabaseOptionsCompactTimeRange = "compactTimeRange";
static const char* jkDatabaseOptionsCompactTimeOffsetNode = "compactTimeOffsetNode";
static int32_t databaseOptionsToJson(const void* pObj, SJson* pJson) {
const SDatabaseOptions* pNode = (const SDatabaseOptions*)pObj;
@ -5430,6 +5436,15 @@ static int32_t databaseOptionsToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsS3Compact, pNode->s3Compact);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkDatabaseOptionsCompactIntervalNode, nodeToJson, pNode->pCompactIntervalNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkDatabaseOptionsCompactTimeRange, pNode->pCompactTimeRangeList);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkDatabaseOptionsCompactTimeOffsetNode, nodeToJson, pNode->pCompactTimeOffsetNode);
}
return code;
}
@ -5504,6 +5519,15 @@ static int32_t jsonToDatabaseOptions(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsS3Compact, &pNode->s3Compact);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkDatabaseOptionsCompactIntervalNode, (SNode**)&pNode->pCompactIntervalNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkDatabaseOptionsCompactTimeRange, &pNode->pCompactTimeRangeList);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkDatabaseOptionsCompactTimeOffsetNode, (SNode**)&pNode->pCompactTimeOffsetNode);
}
return code;
}

View File

@ -14,6 +14,7 @@
*/
#include "cmdnodes.h"
#include "functionMgt.h"
#include "nodesUtil.h"
#include "plannodes.h"
#include "querynodes.h"
@ -22,7 +23,6 @@
#include "tdatablock.h"
#include "thash.h"
#include "tref.h"
#include "functionMgt.h"
typedef struct SNodeMemChunk {
int32_t availableSize;
@ -59,12 +59,10 @@ char* getFullJoinTypeString(EJoinType type, EJoinSubType stype) {
{"INNER", "INNER", "INNER", "INNER", "INNER", "INNER ANY", "INNER", "INNER"},
{"LEFT", "LEFT", "LEFT OUTER", "LEFT SEMI", "LEFT ANTI", "LEFT ANY", "LEFT ASOF", "LEFT WINDOW"},
{"RIGHT", "RIGHT", "RIGHT OUTER", "RIGHT SEMI", "RIGHT ANTI", "RIGHT ANY", "RIGHT ASOF", "RIGHT WINDOW"},
{"FULL", "FULL", "FULL OUTER", "FULL", "FULL", "FULL ANY", "FULL", "FULL"}
};
{"FULL", "FULL", "FULL OUTER", "FULL", "FULL", "FULL ANY", "FULL", "FULL"}};
return joinFullType[type][stype];
}
int32_t mergeJoinConds(SNode** ppDst, SNode** ppSrc) {
if (NULL == *ppSrc) {
return TSDB_CODE_SUCCESS;
@ -74,14 +72,16 @@ int32_t mergeJoinConds(SNode** ppDst, SNode** ppSrc) {
*ppSrc = NULL;
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc) && ((SLogicConditionNode*)(*ppSrc))->condType == LOGIC_COND_TYPE_AND) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc) &&
((SLogicConditionNode*)(*ppSrc))->condType == LOGIC_COND_TYPE_AND) {
TSWAP(*ppDst, *ppSrc);
}
int32_t code = 0;
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppDst)) {
SLogicConditionNode* pDst = (SLogicConditionNode*)*ppDst;
if (pDst->condType == LOGIC_COND_TYPE_AND) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc) && ((SLogicConditionNode*)(*ppSrc))->condType == LOGIC_COND_TYPE_AND) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc) &&
((SLogicConditionNode*)(*ppSrc))->condType == LOGIC_COND_TYPE_AND) {
code = nodesListStrictAppendList(pDst->pParameterList, ((SLogicConditionNode*)(*ppSrc))->pParameterList);
((SLogicConditionNode*)(*ppSrc))->pParameterList = NULL;
} else {
@ -115,7 +115,6 @@ int32_t mergeJoinConds(SNode** ppDst, SNode** ppSrc) {
return code;
}
static int32_t callocNodeChunk(SNodeAllocator* pAllocator, SNodeMemChunk** pOutChunk) {
SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize);
if (NULL == pNewChunk) {
@ -155,7 +154,8 @@ static int32_t nodesCallocImpl(int32_t size, void** pOut) {
void* p = g_pNodeAllocator->pCurrChunk->pBuf + g_pNodeAllocator->pCurrChunk->usedSize;
g_pNodeAllocator->pCurrChunk->usedSize += size;
*pOut = p;
return TSDB_CODE_SUCCESS;;
return TSDB_CODE_SUCCESS;
;
}
static int32_t nodesCalloc(int32_t num, int32_t size, void** pOut) {
@ -237,7 +237,8 @@ void nodesDestroyAllocatorSet() {
refId = pAllocator->self;
int32_t code = taosRemoveRef(g_allocatorReqRefPool, refId);
if (TSDB_CODE_SUCCESS != code) {
nodesError("failed to remove ref at: %s:%d, rsetId:%d, refId:%"PRId64, __func__, __LINE__, g_allocatorReqRefPool, refId);
nodesError("failed to remove ref at: %s:%d, rsetId:%d, refId:%" PRId64, __func__, __LINE__,
g_allocatorReqRefPool, refId);
}
pAllocator = taosIterateRef(g_allocatorReqRefPool, refId);
}
@ -333,7 +334,8 @@ void nodesDestroyAllocator(int64_t allocatorId) {
int32_t code = taosRemoveRef(g_allocatorReqRefPool, allocatorId);
if (TSDB_CODE_SUCCESS != code) {
nodesError("failed to remove ref at: %s:%d, rsetId:%d, refId:%"PRId64, __func__, __LINE__, g_allocatorReqRefPool, allocatorId);
nodesError("failed to remove ref at: %s:%d, rsetId:%d, refId:%" PRId64, __func__, __LINE__, g_allocatorReqRefPool,
allocatorId);
}
}
@ -352,200 +354,293 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
int32_t code = 0;
switch (type) {
case QUERY_NODE_COLUMN:
code = makeNode(type, sizeof(SColumnNode), &pNode); break;
code = makeNode(type, sizeof(SColumnNode), &pNode);
break;
case QUERY_NODE_VALUE:
code = makeNode(type, sizeof(SValueNode), &pNode); break;
code = makeNode(type, sizeof(SValueNode), &pNode);
break;
case QUERY_NODE_OPERATOR:
code = makeNode(type, sizeof(SOperatorNode), &pNode); break;
code = makeNode(type, sizeof(SOperatorNode), &pNode);
break;
case QUERY_NODE_LOGIC_CONDITION:
code = makeNode(type, sizeof(SLogicConditionNode), &pNode); break;
code = makeNode(type, sizeof(SLogicConditionNode), &pNode);
break;
case QUERY_NODE_FUNCTION:
code = makeNode(type, sizeof(SFunctionNode), &pNode); break;
code = makeNode(type, sizeof(SFunctionNode), &pNode);
break;
case QUERY_NODE_REAL_TABLE:
code = makeNode(type, sizeof(SRealTableNode), &pNode); break;
code = makeNode(type, sizeof(SRealTableNode), &pNode);
break;
case QUERY_NODE_TEMP_TABLE:
code = makeNode(type, sizeof(STempTableNode), &pNode); break;
code = makeNode(type, sizeof(STempTableNode), &pNode);
break;
case QUERY_NODE_JOIN_TABLE:
code = makeNode(type, sizeof(SJoinTableNode), &pNode); break;
code = makeNode(type, sizeof(SJoinTableNode), &pNode);
break;
case QUERY_NODE_GROUPING_SET:
code = makeNode(type, sizeof(SGroupingSetNode), &pNode); break;
code = makeNode(type, sizeof(SGroupingSetNode), &pNode);
break;
case QUERY_NODE_ORDER_BY_EXPR:
code = makeNode(type, sizeof(SOrderByExprNode), &pNode); break;
code = makeNode(type, sizeof(SOrderByExprNode), &pNode);
break;
case QUERY_NODE_LIMIT:
code = makeNode(type, sizeof(SLimitNode), &pNode); break;
code = makeNode(type, sizeof(SLimitNode), &pNode);
break;
case QUERY_NODE_STATE_WINDOW:
code = makeNode(type, sizeof(SStateWindowNode), &pNode); break;
code = makeNode(type, sizeof(SStateWindowNode), &pNode);
break;
case QUERY_NODE_SESSION_WINDOW:
code = makeNode(type, sizeof(SSessionWindowNode), &pNode); break;
code = makeNode(type, sizeof(SSessionWindowNode), &pNode);
break;
case QUERY_NODE_INTERVAL_WINDOW:
code = makeNode(type, sizeof(SIntervalWindowNode), &pNode); break;
code = makeNode(type, sizeof(SIntervalWindowNode), &pNode);
break;
case QUERY_NODE_NODE_LIST:
code = makeNode(type, sizeof(SNodeListNode), &pNode); break;
code = makeNode(type, sizeof(SNodeListNode), &pNode);
break;
case QUERY_NODE_FILL:
code = makeNode(type, sizeof(SFillNode), &pNode); break;
code = makeNode(type, sizeof(SFillNode), &pNode);
break;
case QUERY_NODE_RAW_EXPR:
code = makeNode(type, sizeof(SRawExprNode), &pNode); break;
code = makeNode(type, sizeof(SRawExprNode), &pNode);
break;
case QUERY_NODE_TARGET:
code = makeNode(type, sizeof(STargetNode), &pNode); break;
code = makeNode(type, sizeof(STargetNode), &pNode);
break;
case QUERY_NODE_DATABLOCK_DESC:
code = makeNode(type, sizeof(SDataBlockDescNode), &pNode); break;
code = makeNode(type, sizeof(SDataBlockDescNode), &pNode);
break;
case QUERY_NODE_SLOT_DESC:
code = makeNode(type, sizeof(SSlotDescNode), &pNode); break;
code = makeNode(type, sizeof(SSlotDescNode), &pNode);
break;
case QUERY_NODE_COLUMN_DEF:
code = makeNode(type, sizeof(SColumnDefNode), &pNode); break;
code = makeNode(type, sizeof(SColumnDefNode), &pNode);
break;
case QUERY_NODE_DOWNSTREAM_SOURCE:
code = makeNode(type, sizeof(SDownstreamSourceNode), &pNode); break;
code = makeNode(type, sizeof(SDownstreamSourceNode), &pNode);
break;
case QUERY_NODE_DATABASE_OPTIONS:
code = makeNode(type, sizeof(SDatabaseOptions), &pNode); break;
code = makeNode(type, sizeof(SDatabaseOptions), &pNode);
break;
case QUERY_NODE_TABLE_OPTIONS:
code = makeNode(type, sizeof(STableOptions), &pNode); break;
code = makeNode(type, sizeof(STableOptions), &pNode);
break;
case QUERY_NODE_COLUMN_OPTIONS:
code = makeNode(type, sizeof(SColumnOptions), &pNode); break;
code = makeNode(type, sizeof(SColumnOptions), &pNode);
break;
case QUERY_NODE_INDEX_OPTIONS:
code = makeNode(type, sizeof(SIndexOptions), &pNode); break;
code = makeNode(type, sizeof(SIndexOptions), &pNode);
break;
case QUERY_NODE_EXPLAIN_OPTIONS:
code = makeNode(type, sizeof(SExplainOptions), &pNode); break;
code = makeNode(type, sizeof(SExplainOptions), &pNode);
break;
case QUERY_NODE_STREAM_OPTIONS:
code = makeNode(type, sizeof(SStreamOptions), &pNode); break;
code = makeNode(type, sizeof(SStreamOptions), &pNode);
break;
case QUERY_NODE_LEFT_VALUE:
code = makeNode(type, sizeof(SLeftValueNode), &pNode); break;
code = makeNode(type, sizeof(SLeftValueNode), &pNode);
break;
case QUERY_NODE_COLUMN_REF:
code = makeNode(type, sizeof(SColumnRefNode), &pNode); break;
code = makeNode(type, sizeof(SColumnRefNode), &pNode);
break;
case QUERY_NODE_WHEN_THEN:
code = makeNode(type, sizeof(SWhenThenNode), &pNode); break;
code = makeNode(type, sizeof(SWhenThenNode), &pNode);
break;
case QUERY_NODE_CASE_WHEN:
code = makeNode(type, sizeof(SCaseWhenNode), &pNode); break;
code = makeNode(type, sizeof(SCaseWhenNode), &pNode);
break;
case QUERY_NODE_EVENT_WINDOW:
code = makeNode(type, sizeof(SEventWindowNode), &pNode); break;
code = makeNode(type, sizeof(SEventWindowNode), &pNode);
break;
case QUERY_NODE_COUNT_WINDOW:
code = makeNode(type, sizeof(SCountWindowNode), &pNode); break;
code = makeNode(type, sizeof(SCountWindowNode), &pNode);
break;
case QUERY_NODE_ANOMALY_WINDOW:
code = makeNode(type, sizeof(SAnomalyWindowNode), &pNode); break;
code = makeNode(type, sizeof(SAnomalyWindowNode), &pNode);
break;
case QUERY_NODE_HINT:
code = makeNode(type, sizeof(SHintNode), &pNode); break;
code = makeNode(type, sizeof(SHintNode), &pNode);
break;
case QUERY_NODE_VIEW:
code = makeNode(type, sizeof(SViewNode), &pNode); break;
code = makeNode(type, sizeof(SViewNode), &pNode);
break;
case QUERY_NODE_WINDOW_OFFSET:
code = makeNode(type, sizeof(SWindowOffsetNode), &pNode); break;
code = makeNode(type, sizeof(SWindowOffsetNode), &pNode);
break;
case QUERY_NODE_SET_OPERATOR:
code = makeNode(type, sizeof(SSetOperator), &pNode); break;
code = makeNode(type, sizeof(SSetOperator), &pNode);
break;
case QUERY_NODE_SELECT_STMT:
code = makeNode(type, sizeof(SSelectStmt), &pNode); break;
code = makeNode(type, sizeof(SSelectStmt), &pNode);
break;
case QUERY_NODE_VNODE_MODIFY_STMT:
code = makeNode(type, sizeof(SVnodeModifyOpStmt), &pNode); break;
code = makeNode(type, sizeof(SVnodeModifyOpStmt), &pNode);
break;
case QUERY_NODE_CREATE_DATABASE_STMT:
code = makeNode(type, sizeof(SCreateDatabaseStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateDatabaseStmt), &pNode);
break;
case QUERY_NODE_DROP_DATABASE_STMT:
code = makeNode(type, sizeof(SDropDatabaseStmt), &pNode); break;
code = makeNode(type, sizeof(SDropDatabaseStmt), &pNode);
break;
case QUERY_NODE_ALTER_DATABASE_STMT:
code = makeNode(type, sizeof(SAlterDatabaseStmt), &pNode); break;
code = makeNode(type, sizeof(SAlterDatabaseStmt), &pNode);
break;
case QUERY_NODE_FLUSH_DATABASE_STMT:
code = makeNode(type, sizeof(SFlushDatabaseStmt), &pNode); break;
code = makeNode(type, sizeof(SFlushDatabaseStmt), &pNode);
break;
case QUERY_NODE_TRIM_DATABASE_STMT:
code = makeNode(type, sizeof(STrimDatabaseStmt), &pNode); break;
code = makeNode(type, sizeof(STrimDatabaseStmt), &pNode);
break;
case QUERY_NODE_S3MIGRATE_DATABASE_STMT:
code = makeNode(type, sizeof(SS3MigrateDatabaseStmt), &pNode); break;
code = makeNode(type, sizeof(SS3MigrateDatabaseStmt), &pNode);
break;
case QUERY_NODE_CREATE_TABLE_STMT:
code = makeNode(type, sizeof(SCreateTableStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateTableStmt), &pNode);
break;
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE:
code = makeNode(type, sizeof(SCreateSubTableClause), &pNode); break;
code = makeNode(type, sizeof(SCreateSubTableClause), &pNode);
break;
case QUERY_NODE_CREATE_SUBTABLE_FROM_FILE_CLAUSE:
code = makeNode(type, sizeof(SCreateSubTableFromFileClause), &pNode); break;
code = makeNode(type, sizeof(SCreateSubTableFromFileClause), &pNode);
break;
case QUERY_NODE_CREATE_MULTI_TABLES_STMT:
code = makeNode(type, sizeof(SCreateMultiTablesStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateMultiTablesStmt), &pNode);
break;
case QUERY_NODE_DROP_TABLE_CLAUSE:
code = makeNode(type, sizeof(SDropTableClause), &pNode); break;
code = makeNode(type, sizeof(SDropTableClause), &pNode);
break;
case QUERY_NODE_DROP_TABLE_STMT:
code = makeNode(type, sizeof(SDropTableStmt), &pNode); break;
code = makeNode(type, sizeof(SDropTableStmt), &pNode);
break;
case QUERY_NODE_DROP_SUPER_TABLE_STMT:
code = makeNode(type, sizeof(SDropSuperTableStmt), &pNode); break;
code = makeNode(type, sizeof(SDropSuperTableStmt), &pNode);
break;
case QUERY_NODE_ALTER_TABLE_STMT:
case QUERY_NODE_ALTER_SUPER_TABLE_STMT:
code = makeNode(type, sizeof(SAlterTableStmt), &pNode); break;
code = makeNode(type, sizeof(SAlterTableStmt), &pNode);
break;
case QUERY_NODE_CREATE_USER_STMT:
code = makeNode(type, sizeof(SCreateUserStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateUserStmt), &pNode);
break;
case QUERY_NODE_ALTER_USER_STMT:
code = makeNode(type, sizeof(SAlterUserStmt), &pNode); break;
code = makeNode(type, sizeof(SAlterUserStmt), &pNode);
break;
case QUERY_NODE_DROP_USER_STMT:
code = makeNode(type, sizeof(SDropUserStmt), &pNode); break;
code = makeNode(type, sizeof(SDropUserStmt), &pNode);
break;
case QUERY_NODE_USE_DATABASE_STMT:
code = makeNode(type, sizeof(SUseDatabaseStmt), &pNode); break;
code = makeNode(type, sizeof(SUseDatabaseStmt), &pNode);
break;
case QUERY_NODE_CREATE_DNODE_STMT:
code = makeNode(type, sizeof(SCreateDnodeStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateDnodeStmt), &pNode);
break;
case QUERY_NODE_DROP_DNODE_STMT:
code = makeNode(type, sizeof(SDropDnodeStmt), &pNode); break;
code = makeNode(type, sizeof(SDropDnodeStmt), &pNode);
break;
case QUERY_NODE_ALTER_DNODE_STMT:
code = makeNode(type, sizeof(SAlterDnodeStmt), &pNode); break;
code = makeNode(type, sizeof(SAlterDnodeStmt), &pNode);
break;
case QUERY_NODE_CREATE_ANODE_STMT:
code = makeNode(type, sizeof(SCreateAnodeStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateAnodeStmt), &pNode);
break;
case QUERY_NODE_DROP_ANODE_STMT:
code = makeNode(type, sizeof(SDropAnodeStmt), &pNode); break;
code = makeNode(type, sizeof(SDropAnodeStmt), &pNode);
break;
case QUERY_NODE_UPDATE_ANODE_STMT:
code = makeNode(type, sizeof(SUpdateAnodeStmt), &pNode); break;
code = makeNode(type, sizeof(SUpdateAnodeStmt), &pNode);
break;
case QUERY_NODE_CREATE_INDEX_STMT:
code = makeNode(type, sizeof(SCreateIndexStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateIndexStmt), &pNode);
break;
case QUERY_NODE_DROP_INDEX_STMT:
code = makeNode(type, sizeof(SDropIndexStmt), &pNode); break;
code = makeNode(type, sizeof(SDropIndexStmt), &pNode);
break;
case QUERY_NODE_CREATE_QNODE_STMT:
case QUERY_NODE_CREATE_BNODE_STMT:
case QUERY_NODE_CREATE_SNODE_STMT:
case QUERY_NODE_CREATE_MNODE_STMT:
code = makeNode(type, sizeof(SCreateComponentNodeStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateComponentNodeStmt), &pNode);
break;
case QUERY_NODE_DROP_QNODE_STMT:
case QUERY_NODE_DROP_BNODE_STMT:
case QUERY_NODE_DROP_SNODE_STMT:
case QUERY_NODE_DROP_MNODE_STMT:
code = makeNode(type, sizeof(SDropComponentNodeStmt), &pNode); break;
code = makeNode(type, sizeof(SDropComponentNodeStmt), &pNode);
break;
case QUERY_NODE_CREATE_TOPIC_STMT:
code = makeNode(type, sizeof(SCreateTopicStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateTopicStmt), &pNode);
break;
case QUERY_NODE_DROP_TOPIC_STMT:
code = makeNode(type, sizeof(SDropTopicStmt), &pNode); break;
code = makeNode(type, sizeof(SDropTopicStmt), &pNode);
break;
case QUERY_NODE_DROP_CGROUP_STMT:
code = makeNode(type, sizeof(SDropCGroupStmt), &pNode); break;
code = makeNode(type, sizeof(SDropCGroupStmt), &pNode);
break;
case QUERY_NODE_ALTER_LOCAL_STMT:
code = makeNode(type, sizeof(SAlterLocalStmt), &pNode); break;
code = makeNode(type, sizeof(SAlterLocalStmt), &pNode);
break;
case QUERY_NODE_EXPLAIN_STMT:
code = makeNode(type, sizeof(SExplainStmt), &pNode); break;
code = makeNode(type, sizeof(SExplainStmt), &pNode);
break;
case QUERY_NODE_DESCRIBE_STMT:
code = makeNode(type, sizeof(SDescribeStmt), &pNode); break;
code = makeNode(type, sizeof(SDescribeStmt), &pNode);
break;
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
code = makeNode(type, sizeof(SNode), &pNode); break;
code = makeNode(type, sizeof(SNode), &pNode);
break;
case QUERY_NODE_COMPACT_DATABASE_STMT:
code = makeNode(type, sizeof(SCompactDatabaseStmt), &pNode); break;
code = makeNode(type, sizeof(SCompactDatabaseStmt), &pNode);
break;
case QUERY_NODE_COMPACT_VGROUPS_STMT:
code = makeNode(type, sizeof(SCompactVgroupsStmt), &pNode); break;
code = makeNode(type, sizeof(SCompactVgroupsStmt), &pNode);
break;
case QUERY_NODE_CREATE_FUNCTION_STMT:
code = makeNode(type, sizeof(SCreateFunctionStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateFunctionStmt), &pNode);
break;
case QUERY_NODE_DROP_FUNCTION_STMT:
code = makeNode(type, sizeof(SDropFunctionStmt), &pNode); break;
code = makeNode(type, sizeof(SDropFunctionStmt), &pNode);
break;
case QUERY_NODE_CREATE_STREAM_STMT:
code = makeNode(type, sizeof(SCreateStreamStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateStreamStmt), &pNode);
break;
case QUERY_NODE_DROP_STREAM_STMT:
code = makeNode(type, sizeof(SDropStreamStmt), &pNode); break;
code = makeNode(type, sizeof(SDropStreamStmt), &pNode);
break;
case QUERY_NODE_PAUSE_STREAM_STMT:
code = makeNode(type, sizeof(SPauseStreamStmt), &pNode); break;
code = makeNode(type, sizeof(SPauseStreamStmt), &pNode);
break;
case QUERY_NODE_RESUME_STREAM_STMT:
code = makeNode(type, sizeof(SResumeStreamStmt), &pNode); break;
code = makeNode(type, sizeof(SResumeStreamStmt), &pNode);
break;
case QUERY_NODE_BALANCE_VGROUP_STMT:
code = makeNode(type, sizeof(SBalanceVgroupStmt), &pNode); break;
code = makeNode(type, sizeof(SBalanceVgroupStmt), &pNode);
break;
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
code = makeNode(type, sizeof(SBalanceVgroupLeaderStmt), &pNode); break;
code = makeNode(type, sizeof(SBalanceVgroupLeaderStmt), &pNode);
break;
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
code = makeNode(type, sizeof(SBalanceVgroupLeaderStmt), &pNode); break;
code = makeNode(type, sizeof(SBalanceVgroupLeaderStmt), &pNode);
break;
case QUERY_NODE_MERGE_VGROUP_STMT:
code = makeNode(type, sizeof(SMergeVgroupStmt), &pNode); break;
code = makeNode(type, sizeof(SMergeVgroupStmt), &pNode);
break;
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
code = makeNode(type, sizeof(SRedistributeVgroupStmt), &pNode); break;
code = makeNode(type, sizeof(SRedistributeVgroupStmt), &pNode);
break;
case QUERY_NODE_SPLIT_VGROUP_STMT:
code = makeNode(type, sizeof(SSplitVgroupStmt), &pNode); break;
code = makeNode(type, sizeof(SSplitVgroupStmt), &pNode);
break;
case QUERY_NODE_SYNCDB_STMT:
break;
case QUERY_NODE_GRANT_STMT:
code = makeNode(type, sizeof(SGrantStmt), &pNode); break;
code = makeNode(type, sizeof(SGrantStmt), &pNode);
break;
case QUERY_NODE_REVOKE_STMT:
code = makeNode(type, sizeof(SRevokeStmt), &pNode); break;
code = makeNode(type, sizeof(SRevokeStmt), &pNode);
break;
case QUERY_NODE_ALTER_CLUSTER_STMT:
code = makeNode(type, sizeof(SAlterClusterStmt), &pNode); break;
code = makeNode(type, sizeof(SAlterClusterStmt), &pNode);
break;
case QUERY_NODE_SHOW_DNODES_STMT:
case QUERY_NODE_SHOW_MNODES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
@ -585,191 +680,280 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
case QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT:
case QUERY_NODE_SHOW_ENCRYPTIONS_STMT:
case QUERY_NODE_SHOW_TSMAS_STMT:
code = makeNode(type, sizeof(SShowStmt), &pNode); break;
code = makeNode(type, sizeof(SShowStmt), &pNode);
break;
case QUERY_NODE_SHOW_TABLE_TAGS_STMT:
code = makeNode(type, sizeof(SShowTableTagsStmt), &pNode); break;
code = makeNode(type, sizeof(SShowTableTagsStmt), &pNode);
break;
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT:
code = makeNode(type, sizeof(SShowDnodeVariablesStmt), &pNode); break;
code = makeNode(type, sizeof(SShowDnodeVariablesStmt), &pNode);
break;
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
code = makeNode(type, sizeof(SShowCreateDatabaseStmt), &pNode); break;
code = makeNode(type, sizeof(SShowCreateDatabaseStmt), &pNode);
break;
case QUERY_NODE_SHOW_DB_ALIVE_STMT:
case QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT:
code = makeNode(type, sizeof(SShowAliveStmt), &pNode); break;
code = makeNode(type, sizeof(SShowAliveStmt), &pNode);
break;
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
code = makeNode(type, sizeof(SShowCreateTableStmt), &pNode); break;
code = makeNode(type, sizeof(SShowCreateTableStmt), &pNode);
break;
case QUERY_NODE_SHOW_CREATE_VIEW_STMT:
code = makeNode(type, sizeof(SShowCreateViewStmt), &pNode); break;
code = makeNode(type, sizeof(SShowCreateViewStmt), &pNode);
break;
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
code = makeNode(type, sizeof(SShowTableDistributedStmt), &pNode); break;
code = makeNode(type, sizeof(SShowTableDistributedStmt), &pNode);
break;
case QUERY_NODE_SHOW_COMPACTS_STMT:
code = makeNode(type, sizeof(SShowCompactsStmt), &pNode); break;
code = makeNode(type, sizeof(SShowCompactsStmt), &pNode);
break;
case QUERY_NODE_SHOW_COMPACT_DETAILS_STMT:
code = makeNode(type, sizeof(SShowCompactDetailsStmt), &pNode); break;
code = makeNode(type, sizeof(SShowCompactDetailsStmt), &pNode);
break;
case QUERY_NODE_KILL_QUERY_STMT:
code = makeNode(type, sizeof(SKillQueryStmt), &pNode); break;
code = makeNode(type, sizeof(SKillQueryStmt), &pNode);
break;
case QUERY_NODE_KILL_TRANSACTION_STMT:
case QUERY_NODE_KILL_CONNECTION_STMT:
case QUERY_NODE_KILL_COMPACT_STMT:
code = makeNode(type, sizeof(SKillStmt), &pNode); break;
code = makeNode(type, sizeof(SKillStmt), &pNode);
break;
case QUERY_NODE_DELETE_STMT:
code = makeNode(type, sizeof(SDeleteStmt), &pNode); break;
code = makeNode(type, sizeof(SDeleteStmt), &pNode);
break;
case QUERY_NODE_INSERT_STMT:
code = makeNode(type, sizeof(SInsertStmt), &pNode); break;
code = makeNode(type, sizeof(SInsertStmt), &pNode);
break;
case QUERY_NODE_QUERY:
code = makeNode(type, sizeof(SQuery), &pNode); break;
code = makeNode(type, sizeof(SQuery), &pNode);
break;
case QUERY_NODE_RESTORE_DNODE_STMT:
case QUERY_NODE_RESTORE_QNODE_STMT:
case QUERY_NODE_RESTORE_MNODE_STMT:
case QUERY_NODE_RESTORE_VNODE_STMT:
code = makeNode(type, sizeof(SRestoreComponentNodeStmt), &pNode); break;
code = makeNode(type, sizeof(SRestoreComponentNodeStmt), &pNode);
break;
case QUERY_NODE_CREATE_VIEW_STMT:
code = makeNode(type, sizeof(SCreateViewStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateViewStmt), &pNode);
break;
case QUERY_NODE_DROP_VIEW_STMT:
code = makeNode(type, sizeof(SDropViewStmt), &pNode); break;
code = makeNode(type, sizeof(SDropViewStmt), &pNode);
break;
case QUERY_NODE_CREATE_TSMA_STMT:
code = makeNode(type, sizeof(SCreateTSMAStmt), &pNode); break;
code = makeNode(type, sizeof(SCreateTSMAStmt), &pNode);
break;
case QUERY_NODE_DROP_TSMA_STMT:
code = makeNode(type, sizeof(SDropTSMAStmt), &pNode); break;
code = makeNode(type, sizeof(SDropTSMAStmt), &pNode);
break;
case QUERY_NODE_TSMA_OPTIONS:
code = makeNode(type, sizeof(STSMAOptions), &pNode); break;
code = makeNode(type, sizeof(STSMAOptions), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_SCAN:
code = makeNode(type, sizeof(SScanLogicNode), &pNode); break;
code = makeNode(type, sizeof(SScanLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_JOIN:
code = makeNode(type, sizeof(SJoinLogicNode), &pNode); break;
code = makeNode(type, sizeof(SJoinLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_AGG:
code = makeNode(type, sizeof(SAggLogicNode), &pNode); break;
code = makeNode(type, sizeof(SAggLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_PROJECT:
code = makeNode(type, sizeof(SProjectLogicNode), &pNode); break;
code = makeNode(type, sizeof(SProjectLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY:
code = makeNode(type, sizeof(SVnodeModifyLogicNode), &pNode); break;
code = makeNode(type, sizeof(SVnodeModifyLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
code = makeNode(type, sizeof(SExchangeLogicNode), &pNode); break;
code = makeNode(type, sizeof(SExchangeLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_MERGE:
code = makeNode(type, sizeof(SMergeLogicNode), &pNode); break;
code = makeNode(type, sizeof(SMergeLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_WINDOW:
code = makeNode(type, sizeof(SWindowLogicNode), &pNode); break;
code = makeNode(type, sizeof(SWindowLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_FILL:
code = makeNode(type, sizeof(SFillLogicNode), &pNode); break;
code = makeNode(type, sizeof(SFillLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_SORT:
code = makeNode(type, sizeof(SSortLogicNode), &pNode); break;
code = makeNode(type, sizeof(SSortLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_PARTITION:
code = makeNode(type, sizeof(SPartitionLogicNode), &pNode); break;
code = makeNode(type, sizeof(SPartitionLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
code = makeNode(type, sizeof(SIndefRowsFuncLogicNode), &pNode); break;
code = makeNode(type, sizeof(SIndefRowsFuncLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
code = makeNode(type, sizeof(SInterpFuncLogicNode), &pNode); break;
code = makeNode(type, sizeof(SInterpFuncLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
code = makeNode(type, sizeof(SForecastFuncLogicNode), &pNode); break;
code = makeNode(type, sizeof(SForecastFuncLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
code = makeNode(type, sizeof(SGroupCacheLogicNode), &pNode); break;
code = makeNode(type, sizeof(SGroupCacheLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
code = makeNode(type, sizeof(SDynQueryCtrlLogicNode), &pNode); break;
code = makeNode(type, sizeof(SDynQueryCtrlLogicNode), &pNode);
break;
case QUERY_NODE_LOGIC_SUBPLAN:
code = makeNode(type, sizeof(SLogicSubplan), &pNode); break;
code = makeNode(type, sizeof(SLogicSubplan), &pNode);
break;
case QUERY_NODE_LOGIC_PLAN:
code = makeNode(type, sizeof(SQueryLogicPlan), &pNode); break;
code = makeNode(type, sizeof(SQueryLogicPlan), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
code = makeNode(type, sizeof(STagScanPhysiNode), &pNode); break;
code = makeNode(type, sizeof(STagScanPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
code = makeNode(type, sizeof(STableScanPhysiNode), &pNode); break;
code = makeNode(type, sizeof(STableScanPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
code = makeNode(type, sizeof(STableSeqScanPhysiNode), &pNode); break;
code = makeNode(type, sizeof(STableSeqScanPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
code = makeNode(type, sizeof(STableMergeScanPhysiNode), &pNode); break;
code = makeNode(type, sizeof(STableMergeScanPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
code = makeNode(type, sizeof(SStreamScanPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamScanPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
code = makeNode(type, sizeof(SSystemTableScanPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SSystemTableScanPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
code = makeNode(type, sizeof(SBlockDistScanPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SBlockDistScanPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
code = makeNode(type, sizeof(SLastRowScanPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SLastRowScanPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
code = makeNode(type, sizeof(STableCountScanPhysiNode), &pNode); break;
code = makeNode(type, sizeof(STableCountScanPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
code = makeNode(type, sizeof(SProjectPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SProjectPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
code = makeNode(type, sizeof(SSortMergeJoinPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SSortMergeJoinPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN:
code = makeNode(type, sizeof(SHashJoinPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SHashJoinPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
code = makeNode(type, sizeof(SAggPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SAggPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
code = makeNode(type, sizeof(SExchangePhysiNode), &pNode); break;
code = makeNode(type, sizeof(SExchangePhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
code = makeNode(type, sizeof(SMergePhysiNode), &pNode); break;
code = makeNode(type, sizeof(SMergePhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_SORT:
code = makeNode(type, sizeof(SSortPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SSortPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT:
code = makeNode(type, sizeof(SGroupSortPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SGroupSortPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
code = makeNode(type, sizeof(SIntervalPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SIntervalPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
code = makeNode(type, sizeof(SMergeIntervalPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SMergeIntervalPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
code = makeNode(type, sizeof(SMergeAlignedIntervalPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SMergeAlignedIntervalPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
code = makeNode(type, sizeof(SStreamIntervalPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamIntervalPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
code = makeNode(type, sizeof(SStreamFinalIntervalPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamFinalIntervalPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
code = makeNode(type, sizeof(SStreamSemiIntervalPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamSemiIntervalPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
code = makeNode(type, sizeof(SStreamMidIntervalPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamMidIntervalPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_FILL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
code = makeNode(type, sizeof(SFillPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SFillPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
code = makeNode(type, sizeof(SSessionWinodwPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SSessionWinodwPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
code = makeNode(type, sizeof(SStreamSessionWinodwPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamSessionWinodwPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
code = makeNode(type, sizeof(SStreamSemiSessionWinodwPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamSemiSessionWinodwPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
code = makeNode(type, sizeof(SStreamFinalSessionWinodwPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamFinalSessionWinodwPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
code = makeNode(type, sizeof(SStateWinodwPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStateWinodwPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
code = makeNode(type, sizeof(SStreamStateWinodwPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamStateWinodwPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
code = makeNode(type, sizeof(SEventWinodwPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SEventWinodwPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
code = makeNode(type, sizeof(SStreamEventWinodwPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamEventWinodwPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT:
code = makeNode(type, sizeof(SCountWinodwPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SCountWinodwPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY:
code = makeNode(type, sizeof(SAnomalyWindowPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SAnomalyWindowPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
code = makeNode(type, sizeof(SStreamCountWinodwPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamCountWinodwPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
code = makeNode(type, sizeof(SPartitionPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SPartitionPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
code = makeNode(type, sizeof(SStreamPartitionPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamPartitionPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
code = makeNode(type, sizeof(SIndefRowsFuncPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SIndefRowsFuncPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
code = makeNode(type, sizeof(SInterpFuncLogicNode), &pNode); break;
code = makeNode(type, sizeof(SInterpFuncLogicNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC:
code = makeNode(type, sizeof(SForecastFuncLogicNode), &pNode); break;
code = makeNode(type, sizeof(SForecastFuncLogicNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
code = makeNode(type, sizeof(SDataDispatcherNode), &pNode); break;
code = makeNode(type, sizeof(SDataDispatcherNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
code = makeNode(type, sizeof(SDataInserterNode), &pNode); break;
code = makeNode(type, sizeof(SDataInserterNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
code = makeNode(type, sizeof(SQueryInserterNode), &pNode); break;
code = makeNode(type, sizeof(SQueryInserterNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
code = makeNode(type, sizeof(SDataDeleterNode), &pNode); break;
code = makeNode(type, sizeof(SDataDeleterNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE:
code = makeNode(type, sizeof(SGroupCachePhysiNode), &pNode); break;
code = makeNode(type, sizeof(SGroupCachePhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL:
code = makeNode(type, sizeof(SDynQueryCtrlPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SDynQueryCtrlPhysiNode), &pNode);
break;
case QUERY_NODE_PHYSICAL_SUBPLAN:
code = makeNode(type, sizeof(SSubplan), &pNode); break;
code = makeNode(type, sizeof(SSubplan), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN:
code = makeNode(type, sizeof(SQueryPlan), &pNode); break;
code = makeNode(type, sizeof(SQueryPlan), &pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC:
code = makeNode(type, sizeof(SStreamInterpFuncPhysiNode), &pNode); break;
code = makeNode(type, sizeof(SStreamInterpFuncPhysiNode), &pNode);
break;
default:
break;
}
@ -972,6 +1156,9 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode((SNode*)pOptions->s3KeepLocalStr);
nodesDestroyList(pOptions->pKeep);
nodesDestroyList(pOptions->pRetentions);
nodesDestroyNode((SNode*)pOptions->pCompactIntervalNode);
nodesDestroyList(pOptions->pCompactTimeRangeList);
nodesDestroyNode((SNode*)pOptions->pCompactTimeOffsetNode);
break;
}
case QUERY_NODE_TABLE_OPTIONS: {
@ -1901,7 +2088,6 @@ int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc) {
return code;
}
int32_t nodesListMakeStrictAppendList(SNodeList** pTarget, SNodeList* pSrc) {
if (NULL == *pTarget) {
int32_t code = nodesMakeList(pTarget);
@ -2360,7 +2546,8 @@ static EDealRes doCollect(SCollectColumnsCxt* pCxt, SColumnNode* pCol, SNode* pN
static bool isCollectType(ECollectColType collectType, EColumnType colType) {
return COLLECT_COL_TYPE_ALL == collectType
? true
: (COLLECT_COL_TYPE_TAG == collectType ? COLUMN_TYPE_TAG == colType : (COLUMN_TYPE_TAG != colType && COLUMN_TYPE_TBNAME != colType));
: (COLLECT_COL_TYPE_TAG == collectType ? COLUMN_TYPE_TAG == colType
: (COLUMN_TYPE_TAG != colType && COLUMN_TYPE_TBNAME != colType));
}
static EDealRes collectColumns(SNode* pNode, void* pContext) {
@ -2380,7 +2567,9 @@ static EDealRes collectColumnsExt(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SColumnNode* pCol = (SColumnNode*)pNode;
if (isCollectType(pCxt->collectType, pCol->colType) && 0 != strcmp(pCol->colName, "*") &&
(NULL == pCxt->pMultiTableAlias || NULL != (pCxt->pTableAlias = tSimpleHashGet(pCxt->pMultiTableAlias, pCol->tableAlias, strlen(pCol->tableAlias))))) {
(NULL == pCxt->pMultiTableAlias ||
NULL != (pCxt->pTableAlias =
tSimpleHashGet(pCxt->pMultiTableAlias, pCol->tableAlias, strlen(pCol->tableAlias))))) {
return doCollect(pCxt, pCol, pNode);
}
}
@ -2392,7 +2581,7 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char*
if (NULL == pSelect || NULL == pCols) {
return TSDB_CODE_FAILED;
}
SNodeList * pList = NULL;
SNodeList* pList = NULL;
if (!*pCols) {
int32_t code = nodesMakeList(&pList);
if (TSDB_CODE_SUCCESS != code) {
@ -2424,13 +2613,13 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char*
return TSDB_CODE_SUCCESS;
}
int32_t nodesCollectColumnsExt(SSelectStmt* pSelect, ESqlClause clause, SSHashObj* pMultiTableAlias, ECollectColType type,
SNodeList** pCols) {
int32_t nodesCollectColumnsExt(SSelectStmt* pSelect, ESqlClause clause, SSHashObj* pMultiTableAlias,
ECollectColType type, SNodeList** pCols) {
if (NULL == pSelect || NULL == pCols) {
return TSDB_CODE_FAILED;
}
SNodeList * pList = NULL;
SNodeList* pList = NULL;
if (!*pCols) {
int32_t code = nodesMakeList(&pList);
if (TSDB_CODE_SUCCESS != code) {
@ -2468,7 +2657,7 @@ int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, EColle
if (NULL == pCols) {
return TSDB_CODE_FAILED;
}
SNodeList * pList = NULL;
SNodeList* pList = NULL;
if (!*pCols) {
int32_t code = nodesMakeList(&pList);
if (TSDB_CODE_SUCCESS != code) {
@ -2553,21 +2742,21 @@ static int32_t funcNodeEqual(const void* pLeft, const void* pRight, size_t len)
return nodesEqualNode(*(const SNode**)pLeft, *(const SNode**)pRight) ? 0 : 1;
}
int32_t nodesCollectSelectFuncs(SSelectStmt* pSelect, ESqlClause clause, char* tableAlias, FFuncClassifier classifier, SNodeList* pFuncs) {
int32_t nodesCollectSelectFuncs(SSelectStmt* pSelect, ESqlClause clause, char* tableAlias, FFuncClassifier classifier,
SNodeList* pFuncs) {
if (NULL == pSelect || NULL == pFuncs) {
return TSDB_CODE_FAILED;
}
SCollectFuncsCxt cxt = {.errCode = TSDB_CODE_SUCCESS,
.classifier = classifier,
.tableAlias = tableAlias,
.pFuncs = pFuncs};
SCollectFuncsCxt cxt = {
.errCode = TSDB_CODE_SUCCESS, .classifier = classifier, .tableAlias = tableAlias, .pFuncs = pFuncs};
nodesWalkSelectStmt(pSelect, clause, collectFuncs, &cxt);
return cxt.errCode;
}
int32_t nodesCollectFuncs(SSelectStmt* pSelect, ESqlClause clause, char* tableAlias, FFuncClassifier classifier, SNodeList** pFuncs) {
int32_t nodesCollectFuncs(SSelectStmt* pSelect, ESqlClause clause, char* tableAlias, FFuncClassifier classifier,
SNodeList** pFuncs) {
if (NULL == pSelect || NULL == pFuncs) {
return TSDB_CODE_FAILED;
}

View File

@ -70,6 +70,9 @@ typedef enum EDatabaseOptionType {
DB_OPTION_KEEP_TIME_OFFSET,
DB_OPTION_ENCRYPT_ALGORITHM,
DB_OPTION_DNODES,
DB_OPTION_COMPACT_INTERVAL,
DB_OPTION_COMPACT_TIME_RANGE,
DB_OPTION_COMPACT_TIME_OFFSET,
} EDatabaseOptionType;
typedef enum ETableOptionType {

View File

@ -288,6 +288,9 @@ db_options(A) ::= db_options(B) S3_COMPACT NK_INTEGER(C).
db_options(A) ::= db_options(B) KEEP_TIME_OFFSET NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_KEEP_TIME_OFFSET, &C); }
db_options(A) ::= db_options(B) ENCRYPT_ALGORITHM NK_STRING(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_ENCRYPT_ALGORITHM, &C); }
db_options(A) ::= db_options(B) DNODES NK_STRING(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_DNODES, &C); }
db_options(A) ::= db_options(B) COMPACT_INTERVAL NK_VARIABLE(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_COMPACT_INTERVAL, &C); }
db_options(A) ::= db_options(B) COMPACT_TIME_RANGE variable_list(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_COMPACT_TIME_RANGE, &C); }
db_options(A) ::= db_options(B) COMPACT_TIME_OFFSET NK_VARIABLE(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_COMPACT_TIME_OFFSET, &C); }
alter_db_options(A) ::= alter_db_option(B). { A = createAlterDatabaseOptions(pCxt); A = setAlterDatabaseOption(pCxt, A, &B); }
alter_db_options(A) ::= alter_db_options(B) alter_db_option(C). { A = setAlterDatabaseOption(pCxt, B, &C); }
@ -323,6 +326,9 @@ alter_db_option(A) ::= S3_KEEPLOCAL NK_VARIABLE(B).
alter_db_option(A) ::= S3_COMPACT NK_INTEGER(B). { A.type = DB_OPTION_S3_COMPACT, A.val = B; }
alter_db_option(A) ::= KEEP_TIME_OFFSET NK_INTEGER(B). { A.type = DB_OPTION_KEEP_TIME_OFFSET; A.val = B; }
alter_db_option(A) ::= ENCRYPT_ALGORITHM NK_STRING(B). { A.type = DB_OPTION_ENCRYPT_ALGORITHM; A.val = B; }
alter_db_option(A) ::= COMPACT_INTERVAL NK_VARIABLE(B). { A.type = DB_OPTION_COMPACT_INTERVAL; A.val = B; }
alter_db_option(A) ::= COMPACT_TIME_RANGE variable_list(B). { A.type = DB_OPTION_COMPACT_TIME_RANGE; A.pList = B; }
alter_db_option(A) ::= COMPACT_TIME_OFFSET NK_VARIABLE(B). { A.type = DB_OPTION_COMPACT_TIME_OFFSET; A.val = B; }
%type integer_list { SNodeList* }
%destructor integer_list { nodesDestroyList($$); }

View File

@ -1800,6 +1800,10 @@ SNode* createDefaultDatabaseOptions(SAstCreateContext* pCxt) {
pOptions->withArbitrator = TSDB_DEFAULT_DB_WITH_ARBITRATOR;
pOptions->encryptAlgorithm = TSDB_DEFAULT_ENCRYPT_ALGO;
pOptions->dnodeListStr[0] = 0;
pOptions->compactInterval = TSDB_DEFAULT_COMPACT_INTERVAL;
pOptions->compactStartTime = TSDB_DEFAULT_COMPACT_START_TIME;
pOptions->compactEndTime = TSDB_DEFAULT_COMPACT_END_TIME;
pOptions->compactTimeOffset = TSDB_DEFAULT_COMPACT_TIME_OFFSET;
return (SNode*)pOptions;
_err:
return NULL;
@ -1844,6 +1848,10 @@ SNode* createAlterDatabaseOptions(SAstCreateContext* pCxt) {
pOptions->withArbitrator = -1;
pOptions->encryptAlgorithm = -1;
pOptions->dnodeListStr[0] = 0;
pOptions->compactInterval = TSDB_DEFAULT_COMPACT_INTERVAL;
pOptions->compactStartTime = TSDB_DEFAULT_COMPACT_START_TIME;
pOptions->compactEndTime = TSDB_DEFAULT_COMPACT_END_TIME;
pOptions->compactTimeOffset = TSDB_DEFAULT_COMPACT_TIME_OFFSET;
return (SNode*)pOptions;
_err:
return NULL;
@ -1991,6 +1999,17 @@ static SNode* setDatabaseOptionImpl(SAstCreateContext* pCxt, SNode* pOptions, ED
} else {
COPY_STRING_FORM_STR_TOKEN(pDbOptions->dnodeListStr, (SToken*)pVal);
}
break;
case DB_OPTION_COMPACT_INTERVAL:
pDbOptions->pCompactIntervalNode = (SValueNode*)createDurationValueNode(pCxt, (SToken*)pVal);
break;
case DB_OPTION_COMPACT_TIME_RANGE:
pDbOptions->pCompactTimeRangeList = pVal;
break;
case DB_OPTION_COMPACT_TIME_OFFSET:
pDbOptions->pCompactTimeOffsetNode = (SValueNode*)createDurationValueNode(pCxt, (SToken*)pVal);
;
break;
default:
break;
}

View File

@ -1853,7 +1853,7 @@ static bool clauseSupportAlias(ESqlClause clause) {
return SQL_CLAUSE_GROUP_BY == clause || SQL_CLAUSE_PARTITION_BY == clause || SQL_CLAUSE_ORDER_BY == clause;
}
static EDealRes translateColumnInGroupByClause(STranslateContext* pCxt, SColumnNode** pCol, bool *translateAsAlias) {
static EDealRes translateColumnInGroupByClause(STranslateContext* pCxt, SColumnNode** pCol, bool* translateAsAlias) {
*translateAsAlias = false;
// count(*)/first(*)/last(*) and so on
if (0 == strcmp((*pCol)->colName, "*")) {
@ -1876,9 +1876,8 @@ static EDealRes translateColumnInGroupByClause(STranslateContext* pCxt, SColumnN
} else {
bool found = false;
res = translateColumnWithoutPrefix(pCxt, pCol);
if (!(*pCol)->node.asParam &&
res != DEAL_RES_CONTINUE &&
res != DEAL_RES_END && pCxt->errCode != TSDB_CODE_PAR_AMBIGUOUS_COLUMN) {
if (!(*pCol)->node.asParam && res != DEAL_RES_CONTINUE && res != DEAL_RES_END &&
pCxt->errCode != TSDB_CODE_PAR_AMBIGUOUS_COLUMN) {
res = translateColumnUseAlias(pCxt, pCol, &found);
*translateAsAlias = true;
}
@ -3321,9 +3320,11 @@ static int32_t selectCommonType(SDataType* commonType, const SDataType* newType)
return TSDB_CODE_SUCCESS;
}
if ((resultType == TSDB_DATA_TYPE_VARCHAR) && (IS_MATHABLE_TYPE(commonType->type) || IS_MATHABLE_TYPE(newType->type))) {
if ((resultType == TSDB_DATA_TYPE_VARCHAR) &&
(IS_MATHABLE_TYPE(commonType->type) || IS_MATHABLE_TYPE(newType->type))) {
commonType->bytes = TMAX(TMAX(commonType->bytes, newType->bytes), QUERY_NUMBER_MAX_DISPLAY_LEN);
} else if ((resultType == TSDB_DATA_TYPE_NCHAR) && (IS_MATHABLE_TYPE(commonType->type) || IS_MATHABLE_TYPE(newType->type))) {
} else if ((resultType == TSDB_DATA_TYPE_NCHAR) &&
(IS_MATHABLE_TYPE(commonType->type) || IS_MATHABLE_TYPE(newType->type))) {
commonType->bytes = TMAX(TMAX(commonType->bytes, newType->bytes), QUERY_NUMBER_MAX_DISPLAY_LEN * TSDB_NCHAR_SIZE);
} else {
commonType->bytes = TMAX(TMAX(commonType->bytes, newType->bytes), TYPE_BYTES[resultType]);
@ -5480,7 +5481,7 @@ static EDealRes translateGroupPartitionByImpl(SNode** pNode, void* pContext) {
int32_t code = TSDB_CODE_SUCCESS;
STranslateContext* pTransCxt = pCxt->pTranslateCxt;
if (QUERY_NODE_VALUE == nodeType(*pNode)) {
SValueNode* pVal = (SValueNode*) *pNode;
SValueNode* pVal = (SValueNode*)*pNode;
if (DEAL_RES_ERROR == translateValue(pTransCxt, pVal)) {
return DEAL_RES_CONTINUE;
}
@ -5528,8 +5529,7 @@ static int32_t translateGroupByList(STranslateContext* pCxt, SSelectStmt* pSelec
if (NULL == pSelect->pGroupByList) {
return TSDB_CODE_SUCCESS;
}
SReplaceGroupByAliasCxt cxt = {
.pTranslateCxt = pCxt, .pProjectionList = pSelect->pProjectionList};
SReplaceGroupByAliasCxt cxt = {.pTranslateCxt = pCxt, .pProjectionList = pSelect->pProjectionList};
nodesRewriteExprsPostOrder(pSelect->pGroupByList, translateGroupPartitionByImpl, &cxt);
return pCxt->errCode;
@ -5540,8 +5540,7 @@ static int32_t translatePartitionByList(STranslateContext* pCxt, SSelectStmt* pS
return TSDB_CODE_SUCCESS;
}
SReplaceGroupByAliasCxt cxt = {
.pTranslateCxt = pCxt, .pProjectionList = pSelect->pProjectionList};
SReplaceGroupByAliasCxt cxt = {.pTranslateCxt = pCxt, .pProjectionList = pSelect->pProjectionList};
nodesRewriteExprsPostOrder(pSelect->pPartitionByList, translateGroupPartitionByImpl, &cxt);
return pCxt->errCode;
@ -7559,6 +7558,12 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
pReq->encryptAlgorithm = pStmt->pOptions->encryptAlgorithm;
tstrncpy(pReq->dnodeListStr, pStmt->pOptions->dnodeListStr, TSDB_DNODE_LIST_LEN);
// auto-compact options
pReq->compactInterval = pStmt->pOptions->compactInterval;
pReq->compactStartTime = pStmt->pOptions->compactStartTime;
pReq->compactEndTime = pStmt->pOptions->compactEndTime;
pReq->compactTimeOffset = pStmt->pOptions->compactTimeOffset;
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
}
@ -7922,6 +7927,79 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa
return TSDB_CODE_SUCCESS;
}
static int32_t checkDbCompactIntervalOption(STranslateContext* pCxt, SDatabaseOptions* pOptions) {
if (NULL == pOptions->pCompactIntervalNode) return TSDB_CODE_SUCCESS;
if (DEAL_RES_ERROR == translateValue(pCxt, pOptions->pCompactIntervalNode)) {
return pCxt->errCode;
}
if (TIME_UNIT_MINUTE != pOptions->pCompactIntervalNode->unit &&
TIME_UNIT_HOUR != pOptions->pCompactIntervalNode->unit) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option compact_interval unit: %c, only %c, %c allowed",
pOptions->pCompactIntervalNode->unit, TIME_UNIT_MINUTE, TIME_UNIT_HOUR);
}
pOptions->compactInterval = getBigintFromValueNode(pOptions->pCompactIntervalNode);
// TODO: check the semantic of compact_interval
return TSDB_CODE_SUCCESS;
}
static int32_t checkDbCompactTimeRangeOption(STranslateContext* pCxt, SDatabaseOptions* pOptions) {
if (NULL == pOptions->pCompactTimeRangeList) {
return TSDB_CODE_SUCCESS;
}
if (LIST_LENGTH(pOptions->pCompactTimeRangeList) != 2) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option compact_time_range, should have 2 values");
}
SValueNode* pStart = (SValueNode*)nodesListGetNode(pOptions->pCompactTimeRangeList, 0);
SValueNode* pEnd = (SValueNode*)nodesListGetNode(pOptions->pCompactTimeRangeList, 1);
if (DEAL_RES_ERROR == translateValue(pCxt, pStart)) {
return pCxt->errCode;
}
if (DEAL_RES_ERROR == translateValue(pCxt, pEnd)) {
return pCxt->errCode;
}
if (TIME_UNIT_MINUTE != pStart->unit && TIME_UNIT_HOUR != pStart->unit && TIME_UNIT_DAY != pStart->unit) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option compact_time_range start unit: %c, only %c, %c, %c allowed",
pStart->unit, TIME_UNIT_MINUTE, TIME_UNIT_HOUR, TIME_UNIT_DAY);
}
if (TIME_UNIT_MINUTE != pEnd->unit && TIME_UNIT_HOUR != pEnd->unit && TIME_UNIT_DAY != pEnd->unit) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option compact_time_range end unit: %c, only %c, %c, %c allowed",
pEnd->unit, TIME_UNIT_MINUTE, TIME_UNIT_HOUR, TIME_UNIT_DAY);
}
pOptions->compactStartTime = getBigintFromValueNode(pStart);
pOptions->compactEndTime = getBigintFromValueNode(pEnd);
// TODO: check the semantic of compact_time_range
return TSDB_CODE_SUCCESS;
}
static int32_t checkDbCompactTimeOffsetOption(STranslateContext* pCxt, SDatabaseOptions* pOptions) {
if (NULL == pOptions->pCompactTimeOffsetNode) {
return TSDB_CODE_SUCCESS;
}
if (DEAL_RES_ERROR == translateValue(pCxt, pOptions->pCompactTimeOffsetNode)) {
return pCxt->errCode;
}
if (TIME_UNIT_MINUTE != pOptions->pCompactTimeOffsetNode->unit &&
TIME_UNIT_HOUR != pOptions->pCompactTimeOffsetNode->unit) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option compact_time_offset unit: %c, only %c, %c allowed",
pOptions->pCompactTimeOffsetNode->unit, TIME_UNIT_MINUTE, TIME_UNIT_HOUR);
}
pOptions->compactTimeOffset = getBigintFromValueNode(pOptions->pCompactTimeOffsetNode);
// TODO: check the semantic of compact_time_offset
return TSDB_CODE_SUCCESS;
}
static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions) {
int32_t code =
checkDbRangeOption(pCxt, "buffer", pOptions->buffer, TSDB_MIN_BUFFER_PER_VNODE, TSDB_MAX_BUFFER_PER_VNODE);
@ -8038,6 +8116,15 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
if (TSDB_CODE_SUCCESS == code) {
code = checkDbRangeOption(pCxt, "s3_compact", pOptions->s3Compact, TSDB_MIN_S3_COMPACT, TSDB_MAX_S3_COMPACT);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbCompactIntervalOption(pCxt, pOptions);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbCompactTimeRangeOption(pCxt, pOptions);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbCompactTimeOffsetOption(pCxt, pOptions);
}
return code;
}
@ -8274,6 +8361,10 @@ static int32_t buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStm
pReq->s3KeepLocal = pStmt->pOptions->s3KeepLocal;
pReq->s3Compact = pStmt->pOptions->s3Compact;
pReq->withArbitrator = pStmt->pOptions->withArbitrator;
pReq->compactInterval = pStmt->pOptions->compactInterval;
pReq->compactStartTime = pStmt->pOptions->compactStartTime;
pReq->compactEndTime = pStmt->pOptions->compactEndTime;
pReq->compactTimeOffset = pStmt->pOptions->compactTimeOffset;
return code;
}
@ -10601,7 +10692,8 @@ static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
(void)tNameGetFullDbName(&name, pDbFName);
}
static void getStreamQueryFirstProjectAliasName(SHashObj* pUserAliasSet, char* aliasName, int32_t len, char* defaultName[]) {
static void getStreamQueryFirstProjectAliasName(SHashObj* pUserAliasSet, char* aliasName, int32_t len,
char* defaultName[]) {
for (int32_t i = 0; defaultName[i] != NULL; i++) {
if (NULL == taosHashGet(pUserAliasSet, defaultName[i], strlen(defaultName[i]))) {
snprintf(aliasName, len, "%s", defaultName[i]);
@ -10627,8 +10719,8 @@ static int32_t setColumnDefNodePrimaryKey(SColumnDefNode* pNode, bool isPk) {
return code;
}
static int32_t addIrowTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSelectStmt* pSelect,
SHashObj* pUserAliasSet, SNodeList* pCols, SCMCreateStreamReq* pReq) {
static int32_t addIrowTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSelectStmt* pSelect, SHashObj* pUserAliasSet,
SNodeList* pCols, SCMCreateStreamReq* pReq) {
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
if (!pSelect->hasInterpFunc ||
(QUERY_NODE_FUNCTION == nodeType(pProj) && 0 == strcmp("_irowts", ((SFunctionNode*)pProj)->functionName))) {
@ -11070,20 +11162,17 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
if (pStmt->pOptions->fillHistory) {
return generateSyntaxErrMsgExt(
&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream unsupported Fill history");
}
if (pStmt->pOptions->ignoreExpired != 1) {
return generateSyntaxErrMsgExt(
&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream must not set ignore expired 0");
}
if (pStmt->pOptions->ignoreUpdate != 1) {
return generateSyntaxErrMsgExt(
&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream must not set ignore update 0");
}