Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
2ece388a02
|
@ -177,6 +177,7 @@ typedef struct SField {
|
||||||
char name[TSDB_COL_NAME_LEN];
|
char name[TSDB_COL_NAME_LEN];
|
||||||
uint8_t type;
|
uint8_t type;
|
||||||
int32_t bytes;
|
int32_t bytes;
|
||||||
|
int8_t flags;
|
||||||
} SField;
|
} SField;
|
||||||
|
|
||||||
typedef struct SRetention {
|
typedef struct SRetention {
|
||||||
|
@ -302,13 +303,11 @@ typedef struct {
|
||||||
int32_t ttl;
|
int32_t ttl;
|
||||||
int32_t numOfColumns;
|
int32_t numOfColumns;
|
||||||
int32_t numOfTags;
|
int32_t numOfTags;
|
||||||
int32_t numOfSmas;
|
|
||||||
int32_t commentLen;
|
int32_t commentLen;
|
||||||
int32_t ast1Len;
|
int32_t ast1Len;
|
||||||
int32_t ast2Len;
|
int32_t ast2Len;
|
||||||
SArray* pColumns; // array of SField
|
SArray* pColumns; // array of SField
|
||||||
SArray* pTags; // array of SField
|
SArray* pTags; // array of SField
|
||||||
SArray* pSmas; // array of SField
|
|
||||||
char* comment;
|
char* comment;
|
||||||
char* pAst1;
|
char* pAst1;
|
||||||
char* pAst2;
|
char* pAst2;
|
||||||
|
|
|
@ -20,8 +20,8 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "querynodes.h"
|
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
|
#include "querynodes.h"
|
||||||
|
|
||||||
typedef enum EFunctionType {
|
typedef enum EFunctionType {
|
||||||
// aggregate function
|
// aggregate function
|
||||||
|
@ -123,7 +123,7 @@ struct SCatalog;
|
||||||
|
|
||||||
typedef struct SFmGetFuncInfoParam {
|
typedef struct SFmGetFuncInfoParam {
|
||||||
struct SCatalog* pCtg;
|
struct SCatalog* pCtg;
|
||||||
void *pRpc;
|
void* pRpc;
|
||||||
const SEpSet* pMgmtEps;
|
const SEpSet* pMgmtEps;
|
||||||
char* pErrBuf;
|
char* pErrBuf;
|
||||||
int32_t errBufLen;
|
int32_t errBufLen;
|
||||||
|
@ -143,6 +143,7 @@ bool fmIsDatetimeFunc(int32_t funcId);
|
||||||
bool fmIsTimelineFunc(int32_t funcId);
|
bool fmIsTimelineFunc(int32_t funcId);
|
||||||
bool fmIsTimeorderFunc(int32_t funcId);
|
bool fmIsTimeorderFunc(int32_t funcId);
|
||||||
bool fmIsPseudoColumnFunc(int32_t funcId);
|
bool fmIsPseudoColumnFunc(int32_t funcId);
|
||||||
|
bool fmIsScanPseudoColumnFunc(int32_t funcId);
|
||||||
bool fmIsWindowPseudoColumnFunc(int32_t funcId);
|
bool fmIsWindowPseudoColumnFunc(int32_t funcId);
|
||||||
bool fmIsWindowClauseFunc(int32_t funcId);
|
bool fmIsWindowClauseFunc(int32_t funcId);
|
||||||
bool fmIsSpecialDataRequiredFunc(int32_t funcId);
|
bool fmIsSpecialDataRequiredFunc(int32_t funcId);
|
||||||
|
|
|
@ -20,8 +20,8 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "querynodes.h"
|
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
#include "querynodes.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
typedef struct SLogicNode {
|
typedef struct SLogicNode {
|
||||||
|
@ -33,16 +33,12 @@ typedef struct SLogicNode {
|
||||||
int32_t optimizedFlag;
|
int32_t optimizedFlag;
|
||||||
} SLogicNode;
|
} SLogicNode;
|
||||||
|
|
||||||
typedef enum EScanType {
|
typedef enum EScanType { SCAN_TYPE_TAG = 1, SCAN_TYPE_TABLE, SCAN_TYPE_SYSTEM_TABLE, SCAN_TYPE_STREAM } EScanType;
|
||||||
SCAN_TYPE_TAG,
|
|
||||||
SCAN_TYPE_TABLE,
|
|
||||||
SCAN_TYPE_SYSTEM_TABLE,
|
|
||||||
SCAN_TYPE_STREAM
|
|
||||||
} EScanType;
|
|
||||||
|
|
||||||
typedef struct SScanLogicNode {
|
typedef struct SScanLogicNode {
|
||||||
SLogicNode node;
|
SLogicNode node;
|
||||||
SNodeList* pScanCols;
|
SNodeList* pScanCols;
|
||||||
|
SNodeList* pScanPseudoCols;
|
||||||
struct STableMeta* pMeta;
|
struct STableMeta* pMeta;
|
||||||
SVgroupsInfo* pVgroupList;
|
SVgroupsInfo* pVgroupList;
|
||||||
EScanType scanType;
|
EScanType scanType;
|
||||||
|
@ -95,11 +91,7 @@ typedef struct SExchangeLogicNode {
|
||||||
uint8_t precision;
|
uint8_t precision;
|
||||||
} SExchangeLogicNode;
|
} SExchangeLogicNode;
|
||||||
|
|
||||||
typedef enum EWindowType {
|
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
|
||||||
WINDOW_TYPE_INTERVAL = 1,
|
|
||||||
WINDOW_TYPE_SESSION,
|
|
||||||
WINDOW_TYPE_STATE
|
|
||||||
} EWindowType;
|
|
||||||
|
|
||||||
typedef struct SWindowLogicNode {
|
typedef struct SWindowLogicNode {
|
||||||
SLogicNode node;
|
SLogicNode node;
|
||||||
|
@ -187,6 +179,7 @@ typedef struct SPhysiNode {
|
||||||
typedef struct SScanPhysiNode {
|
typedef struct SScanPhysiNode {
|
||||||
SPhysiNode node;
|
SPhysiNode node;
|
||||||
SNodeList* pScanCols;
|
SNodeList* pScanCols;
|
||||||
|
SNodeList* pScanPseudoCols;
|
||||||
uint64_t uid; // unique id of the table
|
uint64_t uid; // unique id of the table
|
||||||
int8_t tableType;
|
int8_t tableType;
|
||||||
SName tableName;
|
SName tableName;
|
||||||
|
@ -315,7 +308,7 @@ typedef struct SDataInserterNode {
|
||||||
SDataSinkNode sink;
|
SDataSinkNode sink;
|
||||||
int32_t numOfTables;
|
int32_t numOfTables;
|
||||||
uint32_t size;
|
uint32_t size;
|
||||||
char *pData;
|
char* pData;
|
||||||
} SDataInserterNode;
|
} SDataInserterNode;
|
||||||
|
|
||||||
typedef struct SSubplan {
|
typedef struct SSubplan {
|
||||||
|
@ -333,11 +326,7 @@ typedef struct SSubplan {
|
||||||
SDataSinkNode* pDataSink; // data of the subplan flow into the datasink
|
SDataSinkNode* pDataSink; // data of the subplan flow into the datasink
|
||||||
} SSubplan;
|
} SSubplan;
|
||||||
|
|
||||||
typedef enum EExplainMode {
|
typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode;
|
||||||
EXPLAIN_MODE_DISABLE = 1,
|
|
||||||
EXPLAIN_MODE_STATIC,
|
|
||||||
EXPLAIN_MODE_ANALYZE
|
|
||||||
} EExplainMode;
|
|
||||||
|
|
||||||
typedef struct SExplainInfo {
|
typedef struct SExplainInfo {
|
||||||
EExplainMode mode;
|
EExplainMode mode;
|
||||||
|
|
|
@ -293,7 +293,10 @@ typedef struct SExplainStmt {
|
||||||
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
|
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
|
||||||
void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext);
|
void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext);
|
||||||
|
|
||||||
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, SNodeList** pCols);
|
typedef enum ECollectColType { COLLECT_COL_TYPE_COL = 1, COLLECT_COL_TYPE_TAG, COLLECT_COL_TYPE_ALL } ECollectColType;
|
||||||
|
|
||||||
|
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type,
|
||||||
|
SNodeList** pCols);
|
||||||
|
|
||||||
typedef bool (*FFuncClassifier)(int32_t funcId);
|
typedef bool (*FFuncClassifier)(int32_t funcId);
|
||||||
int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNodeList** pFuncs);
|
int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNodeList** pFuncs);
|
||||||
|
|
|
@ -311,6 +311,8 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
|
||||||
memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
|
memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pColumnInfoData->hasNull = pSource->hasNull;
|
||||||
|
pColumnInfoData->info = pSource->info;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -517,7 +517,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
||||||
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->numOfSmas) < 0) return -1;
|
|
||||||
if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->ast1Len) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->ast1Len) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->ast2Len) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->ast2Len) < 0) return -1;
|
||||||
|
@ -527,6 +526,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
||||||
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
|
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
|
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
|
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
|
||||||
|
@ -534,13 +534,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
||||||
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
|
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
|
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
||||||
}
|
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->numOfSmas; ++i) {
|
|
||||||
SField *pField = taosArrayGet(pReq->pSmas, i);
|
|
||||||
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
|
|
||||||
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
|
|
||||||
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReq->commentLen > 0) {
|
if (pReq->commentLen > 0) {
|
||||||
|
@ -571,15 +565,13 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
||||||
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->numOfSmas) < 0) return -1;
|
|
||||||
if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->ast1Len) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->ast1Len) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->ast2Len) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->ast2Len) < 0) return -1;
|
||||||
|
|
||||||
pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField));
|
pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField));
|
||||||
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
|
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
|
||||||
pReq->pSmas = taosArrayInit(pReq->numOfSmas, sizeof(SField));
|
if (pReq->pColumns == NULL || pReq->pTags == NULL) {
|
||||||
if (pReq->pColumns == NULL || pReq->pTags == NULL || pReq->pSmas == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -589,6 +581,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
||||||
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
|
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
|
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
|
||||||
if (taosArrayPush(pReq->pColumns, &field) == NULL) {
|
if (taosArrayPush(pReq->pColumns, &field) == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -600,23 +593,13 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
||||||
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
|
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
|
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
|
||||||
if (taosArrayPush(pReq->pTags, &field) == NULL) {
|
if (taosArrayPush(pReq->pTags, &field) == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->numOfSmas; ++i) {
|
|
||||||
SField field = {0};
|
|
||||||
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
|
|
||||||
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
|
|
||||||
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
|
|
||||||
if (taosArrayPush(pReq->pSmas, &field) == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pReq->commentLen > 0) {
|
if (pReq->commentLen > 0) {
|
||||||
pReq->comment = taosMemoryMalloc(pReq->commentLen);
|
pReq->comment = taosMemoryMalloc(pReq->commentLen);
|
||||||
if (pReq->comment == NULL) return -1;
|
if (pReq->comment == NULL) return -1;
|
||||||
|
@ -644,13 +627,11 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
||||||
void tFreeSMCreateStbReq(SMCreateStbReq *pReq) {
|
void tFreeSMCreateStbReq(SMCreateStbReq *pReq) {
|
||||||
taosArrayDestroy(pReq->pColumns);
|
taosArrayDestroy(pReq->pColumns);
|
||||||
taosArrayDestroy(pReq->pTags);
|
taosArrayDestroy(pReq->pTags);
|
||||||
taosArrayDestroy(pReq->pSmas);
|
|
||||||
taosMemoryFreeClear(pReq->comment);
|
taosMemoryFreeClear(pReq->comment);
|
||||||
taosMemoryFreeClear(pReq->pAst1);
|
taosMemoryFreeClear(pReq->pAst1);
|
||||||
taosMemoryFreeClear(pReq->pAst2);
|
taosMemoryFreeClear(pReq->pAst2);
|
||||||
pReq->pColumns = NULL;
|
pReq->pColumns = NULL;
|
||||||
pReq->pTags = NULL;
|
pReq->pTags = NULL;
|
||||||
pReq->pSmas = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
|
int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
|
||||||
|
|
|
@ -358,13 +358,11 @@ typedef struct {
|
||||||
int32_t ttl;
|
int32_t ttl;
|
||||||
int32_t numOfColumns;
|
int32_t numOfColumns;
|
||||||
int32_t numOfTags;
|
int32_t numOfTags;
|
||||||
int32_t numOfSmas;
|
|
||||||
int32_t commentLen;
|
int32_t commentLen;
|
||||||
int32_t ast1Len;
|
int32_t ast1Len;
|
||||||
int32_t ast2Len;
|
int32_t ast2Len;
|
||||||
SSchema* pColumns;
|
SSchema* pColumns;
|
||||||
SSchema* pTags;
|
SSchema* pTags;
|
||||||
SSchema* pSmas;
|
|
||||||
char* comment;
|
char* comment;
|
||||||
char* pAst1;
|
char* pAst1;
|
||||||
char* pAst2;
|
char* pAst2;
|
||||||
|
|
|
@ -45,7 +45,7 @@ extern "C" {
|
||||||
typedef int32_t (*MndMsgFp)(SNodeMsg *pMsg);
|
typedef int32_t (*MndMsgFp)(SNodeMsg *pMsg);
|
||||||
typedef int32_t (*MndInitFp)(SMnode *pMnode);
|
typedef int32_t (*MndInitFp)(SMnode *pMnode);
|
||||||
typedef void (*MndCleanupFp)(SMnode *pMnode);
|
typedef void (*MndCleanupFp)(SMnode *pMnode);
|
||||||
typedef int32_t (*ShowRetrieveFp)(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
|
typedef int32_t (*ShowRetrieveFp)(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
|
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
|
||||||
typedef struct SQWorkerMgmt SQHandle;
|
typedef struct SQWorkerMgmt SQHandle;
|
||||||
|
|
||||||
|
@ -84,7 +84,7 @@ typedef struct {
|
||||||
int64_t timeseriesAllowed;
|
int64_t timeseriesAllowed;
|
||||||
} SGrantInfo;
|
} SGrantInfo;
|
||||||
|
|
||||||
struct SMnode {
|
typedef struct SMnode {
|
||||||
int32_t selfId;
|
int32_t selfId;
|
||||||
int64_t clusterId;
|
int64_t clusterId;
|
||||||
int8_t replica;
|
int8_t replica;
|
||||||
|
@ -109,7 +109,7 @@ struct SMnode {
|
||||||
SGrantInfo grant;
|
SGrantInfo grant;
|
||||||
MndMsgFp msgFp[TDMT_MAX];
|
MndMsgFp msgFp[TDMT_MAX];
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
};
|
} SMnode;
|
||||||
|
|
||||||
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
|
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
|
||||||
int64_t mndGenerateUid(char *name, int32_t len);
|
int64_t mndGenerateUid(char *name, int32_t len);
|
||||||
|
|
|
@ -72,8 +72,8 @@ void mndCleanupStb(SMnode *pMnode) {}
|
||||||
SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags + pStb->numOfSmas) * sizeof(SSchema) +
|
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + +pStb->commentLen +
|
||||||
+pStb->commentLen + pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE;
|
pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE;
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size);
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size);
|
||||||
if (pRaw == NULL) goto _OVER;
|
if (pRaw == NULL) goto _OVER;
|
||||||
|
|
||||||
|
@ -91,7 +91,6 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
SDB_SET_INT32(pRaw, dataPos, pStb->ttl, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pStb->ttl, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pStb->numOfSmas, _OVER)
|
|
||||||
SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pStb->ast1Len, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pStb->ast1Len, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pStb->ast2Len, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pStb->ast2Len, _OVER)
|
||||||
|
@ -112,14 +111,6 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pStb->numOfSmas; ++i) {
|
|
||||||
SSchema *pSchema = &pStb->pSmas[i];
|
|
||||||
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
|
|
||||||
SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
|
|
||||||
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
|
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pStb->commentLen > 0) {
|
if (pStb->commentLen > 0) {
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, _OVER)
|
||||||
}
|
}
|
||||||
|
@ -178,15 +169,13 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfSmas, _OVER)
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pStb->ast1Len, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pStb->ast1Len, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pStb->ast2Len, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pStb->ast2Len, _OVER)
|
||||||
|
|
||||||
pStb->pColumns = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
|
pStb->pColumns = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
|
||||||
pStb->pTags = taosMemoryCalloc(pStb->numOfTags, sizeof(SSchema));
|
pStb->pTags = taosMemoryCalloc(pStb->numOfTags, sizeof(SSchema));
|
||||||
pStb->pSmas = taosMemoryCalloc(pStb->numOfSmas, sizeof(SSchema));
|
if (pStb->pColumns == NULL || pStb->pTags == NULL) {
|
||||||
if (pStb->pColumns == NULL || pStb->pTags == NULL || pStb->pSmas == NULL) {
|
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -206,14 +195,6 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pStb->numOfSmas; ++i) {
|
|
||||||
SSchema *pSchema = &pStb->pSmas[i];
|
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
|
|
||||||
SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
|
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pStb->commentLen > 0) {
|
if (pStb->commentLen > 0) {
|
||||||
pStb->comment = taosMemoryCalloc(pStb->commentLen, 1);
|
pStb->comment = taosMemoryCalloc(pStb->commentLen, 1);
|
||||||
if (pStb->comment == NULL) goto _OVER;
|
if (pStb->comment == NULL) goto _OVER;
|
||||||
|
@ -291,18 +272,6 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOld->numOfSmas < pNew->numOfSmas) {
|
|
||||||
void *pSmas = taosMemoryMalloc(pNew->numOfSmas * sizeof(SSchema));
|
|
||||||
if (pSmas != NULL) {
|
|
||||||
taosMemoryFree(pOld->pSmas);
|
|
||||||
pOld->pSmas = pSmas;
|
|
||||||
} else {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
|
||||||
taosWUnLockLatch(&pOld->lock);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pOld->commentLen < pNew->commentLen) {
|
if (pOld->commentLen < pNew->commentLen) {
|
||||||
void *comment = taosMemoryMalloc(pNew->commentLen);
|
void *comment = taosMemoryMalloc(pNew->commentLen);
|
||||||
if (comment != NULL) {
|
if (comment != NULL) {
|
||||||
|
@ -411,11 +380,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
||||||
req.schemaTag.nCols = pStb->numOfTags;
|
req.schemaTag.nCols = pStb->numOfTags;
|
||||||
req.schemaTag.pSchema = pStb->pTags;
|
req.schemaTag.pSchema = pStb->pTags;
|
||||||
|
|
||||||
// TODO: remove here
|
|
||||||
for (int iCol = 0; iCol < req.schema.nCols; iCol++) {
|
|
||||||
req.schema.pSchema[iCol].flags = SCHEMA_SMA_ON;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (req.rollup) {
|
if (req.rollup) {
|
||||||
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
|
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
|
||||||
req.pRSmaParam.delay = pStb->delay;
|
req.pRSmaParam.delay = pStb->delay;
|
||||||
|
@ -674,7 +638,6 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
|
||||||
stbObj.ttl = pCreate->ttl;
|
stbObj.ttl = pCreate->ttl;
|
||||||
stbObj.numOfColumns = pCreate->numOfColumns;
|
stbObj.numOfColumns = pCreate->numOfColumns;
|
||||||
stbObj.numOfTags = pCreate->numOfTags;
|
stbObj.numOfTags = pCreate->numOfTags;
|
||||||
stbObj.numOfSmas = pCreate->numOfSmas;
|
|
||||||
stbObj.commentLen = pCreate->commentLen;
|
stbObj.commentLen = pCreate->commentLen;
|
||||||
if (stbObj.commentLen > 0) {
|
if (stbObj.commentLen > 0) {
|
||||||
stbObj.comment = taosMemoryCalloc(stbObj.commentLen, 1);
|
stbObj.comment = taosMemoryCalloc(stbObj.commentLen, 1);
|
||||||
|
@ -707,8 +670,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
|
||||||
|
|
||||||
stbObj.pColumns = taosMemoryMalloc(stbObj.numOfColumns * sizeof(SSchema));
|
stbObj.pColumns = taosMemoryMalloc(stbObj.numOfColumns * sizeof(SSchema));
|
||||||
stbObj.pTags = taosMemoryMalloc(stbObj.numOfTags * sizeof(SSchema));
|
stbObj.pTags = taosMemoryMalloc(stbObj.numOfTags * sizeof(SSchema));
|
||||||
stbObj.pSmas = taosMemoryMalloc(stbObj.numOfSmas * sizeof(SSchema));
|
if (stbObj.pColumns == NULL || stbObj.pTags == NULL) {
|
||||||
if (stbObj.pColumns == NULL || stbObj.pTags == NULL || stbObj.pSmas == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -718,6 +680,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
|
||||||
SSchema *pSchema = &stbObj.pColumns[i];
|
SSchema *pSchema = &stbObj.pColumns[i];
|
||||||
pSchema->type = pField->type;
|
pSchema->type = pField->type;
|
||||||
pSchema->bytes = pField->bytes;
|
pSchema->bytes = pField->bytes;
|
||||||
|
pSchema->flags = pField->flags;
|
||||||
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
|
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
|
||||||
pSchema->colId = stbObj.nextColId;
|
pSchema->colId = stbObj.nextColId;
|
||||||
stbObj.nextColId++;
|
stbObj.nextColId++;
|
||||||
|
@ -733,18 +696,6 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
|
||||||
stbObj.nextColId++;
|
stbObj.nextColId++;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < stbObj.numOfSmas; ++i) {
|
|
||||||
SField *pField = taosArrayGet(pCreate->pSmas, i);
|
|
||||||
SSchema *pSchema = &stbObj.pSmas[i];
|
|
||||||
SSchema *pColSchema = mndFindStbColumns(&stbObj, pField->name);
|
|
||||||
if (pColSchema == NULL) {
|
|
||||||
mError("stb:%s, sma:%s not found in columns", stbObj.name, pField->name);
|
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
memcpy(pSchema, pColSchema, sizeof(SSchema));
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
|
|
@ -22,13 +22,15 @@ static int32_t mndInitWal(SMnode *pMnode) {
|
||||||
|
|
||||||
char path[PATH_MAX] = {0};
|
char path[PATH_MAX] = {0};
|
||||||
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
|
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
|
||||||
SWalCfg cfg = {.vgId = 1,
|
SWalCfg cfg = {
|
||||||
|
.vgId = 1,
|
||||||
.fsyncPeriod = 0,
|
.fsyncPeriod = 0,
|
||||||
.rollPeriod = -1,
|
.rollPeriod = -1,
|
||||||
.segSize = -1,
|
.segSize = -1,
|
||||||
.retentionPeriod = -1,
|
.retentionPeriod = -1,
|
||||||
.retentionSize = -1,
|
.retentionSize = -1,
|
||||||
.level = TAOS_WAL_FSYNC};
|
.level = TAOS_WAL_FSYNC,
|
||||||
|
};
|
||||||
pMgmt->pWal = walOpen(path, &cfg);
|
pMgmt->pWal = walOpen(path, &cfg);
|
||||||
if (pMgmt->pWal == NULL) return -1;
|
if (pMgmt->pWal == NULL) return -1;
|
||||||
|
|
||||||
|
@ -54,62 +56,62 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
|
||||||
|
|
||||||
int64_t first = walGetFirstVer(pWal);
|
int64_t first = walGetFirstVer(pWal);
|
||||||
int64_t last = walGetLastVer(pWal);
|
int64_t last = walGetLastVer(pWal);
|
||||||
mDebug("start to restore sdb wal, sdb ver:%" PRId64 ", wal first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);
|
mDebug("start to restore wal, sdbver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);
|
||||||
|
|
||||||
first = TMAX(lastSdbVer + 1, first);
|
first = TMAX(lastSdbVer + 1, first);
|
||||||
for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
|
for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
|
||||||
if (walReadWithHandle(pHandle, ver) < 0) {
|
if (walReadWithHandle(pHandle, ver) < 0) {
|
||||||
mError("failed to read by wal handle since %s, ver:%" PRId64, terrstr(), ver);
|
mError("ver:%" PRId64 ", failed to read from wal since %s", ver, terrstr());
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalHead *pHead = pHandle->pHead;
|
SWalHead *pHead = pHandle->pHead;
|
||||||
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
if (sdbVer + 1 != ver) {
|
if (sdbVer + 1 != ver) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
|
terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
|
||||||
mError("failed to read wal from sdb, sdbVer:%" PRId64 " inconsistent with ver:%" PRId64, sdbVer, ver);
|
mError("ver:%" PRId64 ", failed to write to sdb, since inconsistent with sdbver:%" PRId64, ver, sdbVer);
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("wal:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body);
|
mTrace("ver:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body);
|
||||||
if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) {
|
if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) {
|
||||||
mError("failed to read wal from sdb since %s, ver:%" PRId64, terrstr(), ver);
|
mError("ver:%" PRId64 ", failed to write to sdb since %s", ver, terrstr());
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbUpdateVer(pSdb, 1);
|
sdbUpdateVer(pSdb, 1);
|
||||||
mDebug("wal:%" PRId64 ", is restored", ver);
|
mDebug("ver:%" PRId64 ", is restored", ver);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer);
|
mDebug("restore wal finished, sdbver:%" PRId64, sdbVer);
|
||||||
|
|
||||||
mndTransPullup(pMnode);
|
mndTransPullup(pMnode);
|
||||||
sdbVer = sdbUpdateVer(pSdb, 0);
|
sdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
mDebug("pullup trans finished, sdb ver:%" PRId64, sdbVer);
|
mDebug("pullup trans finished, sdbver:%" PRId64, sdbVer);
|
||||||
|
|
||||||
if (sdbVer != lastSdbVer) {
|
if (sdbVer != lastSdbVer) {
|
||||||
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
|
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
|
||||||
if (sdbWriteFile(pSdb) != 0) {
|
if (sdbWriteFile(pSdb) != 0) {
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walCommit(pWal, sdbVer) != 0) {
|
if (walCommit(pWal, sdbVer) != 0) {
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walBeginSnapshot(pWal, sdbVer) < 0) {
|
if (walBeginSnapshot(pWal, sdbVer) < 0) {
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walEndSnapshot(pWal) < 0) {
|
if (walEndSnapshot(pWal) < 0) {
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
WAL_RESTORE_OVER:
|
_OVER:
|
||||||
walCloseReadHandle(pHandle);
|
walCloseReadHandle(pHandle);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -158,11 +160,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
||||||
int64_t ver = sdbUpdateVer(pSdb, 1);
|
int64_t ver = sdbUpdateVer(pSdb, 1);
|
||||||
if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
|
if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
|
||||||
sdbUpdateVer(pSdb, -1);
|
sdbUpdateVer(pSdb, -1);
|
||||||
mError("failed to write raw:%p since %s, ver:%" PRId64, pRaw, terrstr(), ver);
|
mError("ver:%" PRId64 ", failed to write raw:%p to wal since %s", ver, pRaw, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("raw:%p, write to wal, ver:%" PRId64, pRaw, ver);
|
mTrace("ver:%" PRId64 ", write to wal, raw:%p", ver, pRaw);
|
||||||
walCommit(pWal, ver);
|
walCommit(pWal, ver);
|
||||||
walFsync(pWal, true);
|
walFsync(pWal, true);
|
||||||
|
|
||||||
|
|
|
@ -146,7 +146,6 @@ int32_t mndInitTelem(SMnode* pMnode) {
|
||||||
taosGetEmail(pMgmt->email, sizeof(pMgmt->email));
|
taosGetEmail(pMgmt->email, sizeof(pMgmt->email));
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer);
|
mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer);
|
||||||
|
|
||||||
mDebug("mnode telemetry is initialized");
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,8 +21,8 @@
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "tbase64.h"
|
#include "tbase64.h"
|
||||||
|
|
||||||
#define TSDB_USER_VER_NUMBER 1
|
#define USER_VER_NUMBER 1
|
||||||
#define TSDB_USER_RESERVE_SIZE 64
|
#define USER_RESERVE_SIZE 64
|
||||||
|
|
||||||
static int32_t mndCreateDefaultUsers(SMnode *pMnode);
|
static int32_t mndCreateDefaultUsers(SMnode *pMnode);
|
||||||
static SSdbRaw *mndUserActionEncode(SUserObj *pUser);
|
static SSdbRaw *mndUserActionEncode(SUserObj *pUser);
|
||||||
|
@ -35,7 +35,7 @@ static int32_t mndProcessCreateUserReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessAlterUserReq(SNodeMsg *pReq);
|
static int32_t mndProcessAlterUserReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessDropUserReq(SNodeMsg *pReq);
|
static int32_t mndProcessDropUserReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq);
|
static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq);
|
||||||
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
|
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitUser(SMnode *pMnode) {
|
int32_t mndInitUser(SMnode *pMnode) {
|
||||||
|
@ -93,9 +93,9 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
|
||||||
|
|
||||||
int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs);
|
int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs);
|
||||||
int32_t numOfWriteDbs = taosHashGetSize(pUser->writeDbs);
|
int32_t numOfWriteDbs = taosHashGetSize(pUser->writeDbs);
|
||||||
int32_t size = sizeof(SUserObj) + TSDB_USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN;
|
int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN;
|
||||||
|
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, TSDB_USER_VER_NUMBER, size);
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, USER_VER_NUMBER, size);
|
||||||
if (pRaw == NULL) goto USER_ENCODE_OVER;
|
if (pRaw == NULL) goto USER_ENCODE_OVER;
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
|
@ -120,7 +120,7 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
|
||||||
db = taosHashIterate(pUser->writeDbs, db);
|
db = taosHashIterate(pUser->writeDbs, db);
|
||||||
}
|
}
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, TSDB_USER_RESERVE_SIZE, USER_ENCODE_OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, USER_ENCODE_OVER)
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, USER_ENCODE_OVER)
|
SDB_SET_DATALEN(pRaw, dataPos, USER_ENCODE_OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
@ -142,7 +142,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
|
||||||
int8_t sver = 0;
|
int8_t sver = 0;
|
||||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto USER_DECODE_OVER;
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto USER_DECODE_OVER;
|
||||||
|
|
||||||
if (sver != TSDB_USER_VER_NUMBER) {
|
if (sver != USER_VER_NUMBER) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
goto USER_DECODE_OVER;
|
goto USER_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
@ -184,7 +184,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
|
||||||
taosHashPut(pUser->writeDbs, db, len, db, TSDB_DB_FNAME_LEN);
|
taosHashPut(pUser->writeDbs, db, len, db, TSDB_DB_FNAME_LEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, TSDB_USER_RESERVE_SIZE, USER_DECODE_OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, USER_DECODE_OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
|
@ -639,7 +639,7 @@ GET_AUTH_OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
|
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
@ -652,29 +652,29 @@ static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pB
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
cols = 0;
|
cols = 0;
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
||||||
|
|
||||||
char name[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
char name[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->user, pShow->bytes[cols]);
|
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->user, pShow->bytes[cols]);
|
||||||
|
|
||||||
colDataAppend(pColInfo, numOfRows, (const char*) name, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)name, false);
|
||||||
|
|
||||||
cols++;
|
cols++;
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
||||||
|
|
||||||
const char* src = pUser->superUser? "super":"normal";
|
const char *src = pUser->superUser ? "super" : "normal";
|
||||||
char b[10+VARSTR_HEADER_SIZE] = {0};
|
char b[10 + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src));
|
STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src));
|
||||||
colDataAppend(pColInfo, numOfRows, (const char*) b, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)b, false);
|
||||||
|
|
||||||
cols++;
|
cols++;
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char*) &pUser->createdTime, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pUser->createdTime, false);
|
||||||
|
|
||||||
cols++;
|
cols++;
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->acct, pShow->bytes[cols]);
|
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->acct, pShow->bytes[cols]);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char*) name, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)name, false);
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pUser);
|
sdbRelease(pSdb, pUser);
|
||||||
|
|
|
@ -114,18 +114,15 @@ void* MndTestSma::BuildCreateBSmaStbReq(const char* stbname, int32_t* pContLen)
|
||||||
SMCreateStbReq createReq = {0};
|
SMCreateStbReq createReq = {0};
|
||||||
createReq.numOfColumns = 3;
|
createReq.numOfColumns = 3;
|
||||||
createReq.numOfTags = 1;
|
createReq.numOfTags = 1;
|
||||||
createReq.numOfSmas = 1;
|
|
||||||
createReq.igExists = 0;
|
createReq.igExists = 0;
|
||||||
createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
|
createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
|
||||||
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
|
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
|
||||||
createReq.pSmas = taosArrayInit(createReq.numOfSmas, sizeof(SField));
|
|
||||||
strcpy(createReq.name, stbname);
|
strcpy(createReq.name, stbname);
|
||||||
|
|
||||||
PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_TIMESTAMP, "ts");
|
PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_TIMESTAMP, "ts");
|
||||||
PushField(createReq.pColumns, 2, TSDB_DATA_TYPE_TINYINT, "col1");
|
PushField(createReq.pColumns, 2, TSDB_DATA_TYPE_TINYINT, "col1");
|
||||||
PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_BIGINT, "col2");
|
PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_BIGINT, "col2");
|
||||||
PushField(createReq.pTags, 2, TSDB_DATA_TYPE_TINYINT, "tag1");
|
PushField(createReq.pTags, 2, TSDB_DATA_TYPE_TINYINT, "tag1");
|
||||||
PushField(createReq.pSmas, 2, TSDB_DATA_TYPE_TINYINT, "col1");
|
|
||||||
|
|
||||||
int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq);
|
int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq);
|
||||||
void* pHead = rpcMallocCont(tlen);
|
void* pHead = rpcMallocCont(tlen);
|
||||||
|
@ -190,7 +187,7 @@ void* MndTestSma::BuildDropTSmaReq(const char* smaname, int8_t igNotExists, int3
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||||
#if 0
|
#if 0
|
||||||
const char* dbname = "1.d1";
|
const char* dbname = "1.d1";
|
||||||
const char* stbname = "1.d1.stb";
|
const char* stbname = "1.d1.stb";
|
||||||
const char* smaname = "1.d1.sma";
|
const char* smaname = "1.d1.sma";
|
||||||
|
@ -264,7 +261,7 @@ TEST_F(MndTestSma, 02_Create_Show_Meta_Drop_Restart_BSma) {
|
||||||
pReq = BuildCreateBSmaStbReq(stbname, &contLen);
|
pReq = BuildCreateBSmaStbReq(stbname, &contLen);
|
||||||
pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
test.SendShowReq(TSDB_MGMT_TABLE_STB, "user_stables",dbname);
|
test.SendShowReq(TSDB_MGMT_TABLE_STB, "user_stables", dbname);
|
||||||
EXPECT_EQ(test.GetShowRows(), 1);
|
EXPECT_EQ(test.GetShowRows(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -280,7 +277,7 @@ TEST_F(MndTestSma, 02_Create_Show_Meta_Drop_Restart_BSma) {
|
||||||
pReq = BuildDropStbReq(stbname, &contLen);
|
pReq = BuildDropStbReq(stbname, &contLen);
|
||||||
pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
|
pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
test.SendShowReq(TSDB_MGMT_TABLE_STB, "user_stables",dbname);
|
test.SendShowReq(TSDB_MGMT_TABLE_STB, "user_stables", dbname);
|
||||||
EXPECT_EQ(test.GetShowRows(), 0);
|
EXPECT_EQ(test.GetShowRows(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -162,7 +162,4 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) {
|
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) { return atomic_add_fetch_64(&pSdb->curVer, val); }
|
||||||
pSdb->curVer += val;
|
|
||||||
return pSdb->curVer;
|
|
||||||
}
|
|
|
@ -28,7 +28,7 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) {
|
||||||
if (fp == NULL) continue;
|
if (fp == NULL) continue;
|
||||||
|
|
||||||
if ((*fp)(pSdb->pMnode) != 0) {
|
if ((*fp)(pSdb->pMnode) != 0) {
|
||||||
mError("failed to deploy sdb:%d since %s", i, terrstr());
|
mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,7 +220,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// The first row of a new block does not belongs to the previous existed group
|
// The first row of a new block does not belongs to the previous existed group
|
||||||
if (!equal && j == 0) {
|
if (j == 0) {
|
||||||
num++;
|
num++;
|
||||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
|
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -38,6 +38,7 @@ extern "C" {
|
||||||
#define FUNC_MGT_SPECIAL_DATA_REQUIRED FUNC_MGT_FUNC_CLASSIFICATION_MASK(9)
|
#define FUNC_MGT_SPECIAL_DATA_REQUIRED FUNC_MGT_FUNC_CLASSIFICATION_MASK(9)
|
||||||
#define FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED FUNC_MGT_FUNC_CLASSIFICATION_MASK(10)
|
#define FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED FUNC_MGT_FUNC_CLASSIFICATION_MASK(10)
|
||||||
#define FUNC_MGT_MULTI_RES_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(11)
|
#define FUNC_MGT_MULTI_RES_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(11)
|
||||||
|
#define FUNC_MGT_SCAN_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(12)
|
||||||
|
|
||||||
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||||
|
|
||||||
|
|
|
@ -807,7 +807,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.finalizeFunc = NULL},
|
.finalizeFunc = NULL},
|
||||||
{.name = "tbname",
|
{.name = "tbname",
|
||||||
.type = FUNCTION_TYPE_TBNAME,
|
.type = FUNCTION_TYPE_TBNAME,
|
||||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
|
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
|
||||||
.translateFunc = translateTbnameColumn,
|
.translateFunc = translateTbnameColumn,
|
||||||
.getEnvFunc = NULL,
|
.getEnvFunc = NULL,
|
||||||
.initFunc = NULL,
|
.initFunc = NULL,
|
||||||
|
|
|
@ -15,12 +15,12 @@
|
||||||
|
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
|
||||||
|
#include "builtins.h"
|
||||||
|
#include "catalog.h"
|
||||||
#include "functionMgtInt.h"
|
#include "functionMgtInt.h"
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "builtins.h"
|
|
||||||
#include "catalog.h"
|
|
||||||
|
|
||||||
typedef struct SFuncMgtService {
|
typedef struct SFuncMgtService {
|
||||||
SHashObj* pFuncNameHashTable;
|
SHashObj* pFuncNameHashTable;
|
||||||
|
@ -36,14 +36,16 @@ static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT;
|
||||||
static int32_t initFunctionCode = 0;
|
static int32_t initFunctionCode = 0;
|
||||||
|
|
||||||
static void doInitFunctionTable() {
|
static void doInitFunctionTable() {
|
||||||
gFunMgtService.pFuncNameHashTable = taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
gFunMgtService.pFuncNameHashTable =
|
||||||
|
taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
if (NULL == gFunMgtService.pFuncNameHashTable) {
|
if (NULL == gFunMgtService.pFuncNameHashTable) {
|
||||||
initFunctionCode = TSDB_CODE_FAILED;
|
initFunctionCode = TSDB_CODE_FAILED;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
|
for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
|
||||||
if (TSDB_CODE_SUCCESS != taosHashPut(gFunMgtService.pFuncNameHashTable, funcMgtBuiltins[i].name, strlen(funcMgtBuiltins[i].name), &i, sizeof(int32_t))) {
|
if (TSDB_CODE_SUCCESS != taosHashPut(gFunMgtService.pFuncNameHashTable, funcMgtBuiltins[i].name,
|
||||||
|
strlen(funcMgtBuiltins[i].name), &i, sizeof(int32_t))) {
|
||||||
initFunctionCode = TSDB_CODE_FAILED;
|
initFunctionCode = TSDB_CODE_FAILED;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -52,8 +54,9 @@ static void doInitFunctionTable() {
|
||||||
|
|
||||||
static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
|
static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
|
||||||
if (fmIsUserDefinedFunc(funcId)) {
|
if (fmIsUserDefinedFunc(funcId)) {
|
||||||
return FUNC_MGT_AGG_FUNC == classification ? FUNC_AGGREGATE_UDF_ID == funcId :
|
return FUNC_MGT_AGG_FUNC == classification
|
||||||
(FUNC_MGT_SCALAR_FUNC == classification ? FUNC_SCALAR_UDF_ID == funcId : false);
|
? FUNC_AGGREGATE_UDF_ID == funcId
|
||||||
|
: (FUNC_MGT_SCALAR_FUNC == classification ? FUNC_SCALAR_UDF_ID == funcId : false);
|
||||||
}
|
}
|
||||||
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -126,29 +129,19 @@ int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool fmIsAggFunc(int32_t funcId) {
|
bool fmIsAggFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_AGG_FUNC); }
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_AGG_FUNC);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool fmIsScalarFunc(int32_t funcId) {
|
bool fmIsScalarFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCALAR_FUNC); }
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_SCALAR_FUNC);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool fmIsPseudoColumnFunc(int32_t funcId) {
|
bool fmIsPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PSEUDO_COLUMN_FUNC); }
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_PSEUDO_COLUMN_FUNC);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool fmIsWindowPseudoColumnFunc(int32_t funcId) {
|
bool fmIsScanPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCAN_PC_FUNC); }
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_WINDOW_PC_FUNC);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool fmIsWindowClauseFunc(int32_t funcId) {
|
bool fmIsWindowPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_WINDOW_PC_FUNC); }
|
||||||
return fmIsAggFunc(funcId) || fmIsWindowPseudoColumnFunc(funcId);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool fmIsNonstandardSQLFunc(int32_t funcId) {
|
bool fmIsWindowClauseFunc(int32_t funcId) { return fmIsAggFunc(funcId) || fmIsWindowPseudoColumnFunc(funcId); }
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_NONSTANDARD_SQL_FUNC);
|
|
||||||
}
|
bool fmIsNonstandardSQLFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_NONSTANDARD_SQL_FUNC); }
|
||||||
|
|
||||||
bool fmIsSpecialDataRequiredFunc(int32_t funcId) {
|
bool fmIsSpecialDataRequiredFunc(int32_t funcId) {
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_SPECIAL_DATA_REQUIRED);
|
return isSpecificClassifyFunc(funcId, FUNC_MGT_SPECIAL_DATA_REQUIRED);
|
||||||
|
@ -158,13 +151,9 @@ bool fmIsDynamicScanOptimizedFunc(int32_t funcId) {
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED);
|
return isSpecificClassifyFunc(funcId, FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool fmIsMultiResFunc(int32_t funcId) {
|
bool fmIsMultiResFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_RES_FUNC); }
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_RES_FUNC);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool fmIsUserDefinedFunc(int32_t funcId) {
|
bool fmIsUserDefinedFunc(int32_t funcId) { return funcId > FUNC_UDF_ID_START; }
|
||||||
return funcId > FUNC_UDF_ID_START;
|
|
||||||
}
|
|
||||||
|
|
||||||
void fmFuncMgtDestroy() {
|
void fmFuncMgtDestroy() {
|
||||||
void* m = gFunMgtService.pFuncNameHashTable;
|
void* m = gFunMgtService.pFuncNameHashTable;
|
||||||
|
|
|
@ -224,6 +224,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
||||||
COPY_ALL_SCALAR_FIELDS;
|
COPY_ALL_SCALAR_FIELDS;
|
||||||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||||
CLONE_NODE_LIST_FIELD(pScanCols);
|
CLONE_NODE_LIST_FIELD(pScanCols);
|
||||||
|
CLONE_NODE_LIST_FIELD(pScanPseudoCols);
|
||||||
CLONE_OBJECT_FIELD(pMeta, tableMetaClone);
|
CLONE_OBJECT_FIELD(pMeta, tableMetaClone);
|
||||||
CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone);
|
CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone);
|
||||||
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
|
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
|
||||||
|
|
|
@ -467,6 +467,7 @@ static int32_t jsonToLogicPlanNode(const SJson* pJson, void* pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* jkScanLogicPlanScanCols = "ScanCols";
|
static const char* jkScanLogicPlanScanCols = "ScanCols";
|
||||||
|
static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
|
||||||
static const char* jkScanLogicPlanTableMetaSize = "TableMetaSize";
|
static const char* jkScanLogicPlanTableMetaSize = "TableMetaSize";
|
||||||
static const char* jkScanLogicPlanTableMeta = "TableMeta";
|
static const char* jkScanLogicPlanTableMeta = "TableMeta";
|
||||||
|
|
||||||
|
@ -477,6 +478,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodeListToJson(pJson, jkScanLogicPlanScanCols, pNode->pScanCols);
|
code = nodeListToJson(pJson, jkScanLogicPlanScanCols, pNode->pScanCols);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodeListToJson(pJson, jkScanLogicPlanScanPseudoCols, pNode->pScanPseudoCols);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanTableMetaSize, TABLE_META_SIZE(pNode->pMeta));
|
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanTableMetaSize, TABLE_META_SIZE(pNode->pMeta));
|
||||||
}
|
}
|
||||||
|
@ -495,6 +499,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeList(pJson, jkScanLogicPlanScanCols, &pNode->pScanCols);
|
code = jsonToNodeList(pJson, jkScanLogicPlanScanCols, &pNode->pScanCols);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeList(pJson, jkScanLogicPlanScanPseudoCols, &pNode->pScanPseudoCols);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetIntValue(pJson, jkScanLogicPlanTableMetaSize, &objSize);
|
code = tjsonGetIntValue(pJson, jkScanLogicPlanTableMetaSize, &objSize);
|
||||||
}
|
}
|
||||||
|
@ -670,6 +677,7 @@ static int32_t jsonToName(const SJson* pJson, void* pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* jkScanPhysiPlanScanCols = "ScanCols";
|
static const char* jkScanPhysiPlanScanCols = "ScanCols";
|
||||||
|
static const char* jkScanPhysiPlanScanPseudoCols = "ScanPseudoCols";
|
||||||
static const char* jkScanPhysiPlanTableId = "TableId";
|
static const char* jkScanPhysiPlanTableId = "TableId";
|
||||||
static const char* jkScanPhysiPlanTableType = "TableType";
|
static const char* jkScanPhysiPlanTableType = "TableType";
|
||||||
static const char* jkScanPhysiPlanTableName = "TableName";
|
static const char* jkScanPhysiPlanTableName = "TableName";
|
||||||
|
@ -681,6 +689,9 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodeListToJson(pJson, jkScanPhysiPlanScanCols, pNode->pScanCols);
|
code = nodeListToJson(pJson, jkScanPhysiPlanScanCols, pNode->pScanCols);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodeListToJson(pJson, jkScanPhysiPlanScanPseudoCols, pNode->pScanPseudoCols);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid);
|
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid);
|
||||||
}
|
}
|
||||||
|
@ -701,6 +712,9 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeList(pJson, jkScanPhysiPlanScanCols, &pNode->pScanCols);
|
code = jsonToNodeList(pJson, jkScanPhysiPlanScanCols, &pNode->pScanCols);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeList(pJson, jkScanPhysiPlanScanPseudoCols, &pNode->pScanPseudoCols);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanTableId, &pNode->uid);
|
code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanTableId, &pNode->uid);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1043,6 +1043,7 @@ bool nodesIsTimelineQuery(const SNode* pQuery) { return false; }
|
||||||
typedef struct SCollectColumnsCxt {
|
typedef struct SCollectColumnsCxt {
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
const char* pTableAlias;
|
const char* pTableAlias;
|
||||||
|
ECollectColType collectType;
|
||||||
SNodeList* pCols;
|
SNodeList* pCols;
|
||||||
SHashObj* pColHash;
|
SHashObj* pColHash;
|
||||||
} SCollectColumnsCxt;
|
} SCollectColumnsCxt;
|
||||||
|
@ -1057,25 +1058,33 @@ static EDealRes doCollect(SCollectColumnsCxt* pCxt, SColumnNode* pCol, SNode* pN
|
||||||
if (NULL == taosHashGet(pCxt->pColHash, name, len)) {
|
if (NULL == taosHashGet(pCxt->pColHash, name, len)) {
|
||||||
pCxt->errCode = taosHashPut(pCxt->pColHash, name, len, NULL, 0);
|
pCxt->errCode = taosHashPut(pCxt->pColHash, name, len, NULL, 0);
|
||||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||||
pCxt->errCode = nodesListAppend(pCxt->pCols, pNode);
|
pCxt->errCode = nodesListStrictAppend(pCxt->pCols, nodesCloneNode(pNode));
|
||||||
}
|
}
|
||||||
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
|
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
|
||||||
}
|
}
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool isCollectType(ECollectColType collectType, EColumnType colType) {
|
||||||
|
return COLLECT_COL_TYPE_ALL == collectType
|
||||||
|
? true
|
||||||
|
: (COLLECT_COL_TYPE_TAG == collectType ? COLUMN_TYPE_TAG == colType : COLUMN_TYPE_TAG != colType);
|
||||||
|
}
|
||||||
|
|
||||||
static EDealRes collectColumns(SNode* pNode, void* pContext) {
|
static EDealRes collectColumns(SNode* pNode, void* pContext) {
|
||||||
SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext;
|
SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext;
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
if (NULL == pCxt->pTableAlias || 0 == strcmp(pCxt->pTableAlias, pCol->tableAlias)) {
|
if (isCollectType(pCxt->collectType, pCol->colType) &&
|
||||||
|
(NULL == pCxt->pTableAlias || 0 == strcmp(pCxt->pTableAlias, pCol->tableAlias))) {
|
||||||
return doCollect(pCxt, pCol, pNode);
|
return doCollect(pCxt, pCol, pNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, SNodeList** pCols) {
|
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type,
|
||||||
|
SNodeList** pCols) {
|
||||||
if (NULL == pSelect || NULL == pCols) {
|
if (NULL == pSelect || NULL == pCols) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1083,6 +1092,7 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char*
|
||||||
SCollectColumnsCxt cxt = {
|
SCollectColumnsCxt cxt = {
|
||||||
.errCode = TSDB_CODE_SUCCESS,
|
.errCode = TSDB_CODE_SUCCESS,
|
||||||
.pTableAlias = pTableAlias,
|
.pTableAlias = pTableAlias,
|
||||||
|
.collectType = type,
|
||||||
.pCols = nodesMakeList(),
|
.pCols = nodesMakeList(),
|
||||||
.pColHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)};
|
.pColHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)};
|
||||||
if (NULL == cxt.pCols || NULL == cxt.pColHash) {
|
if (NULL == cxt.pCols || NULL == cxt.pColHash) {
|
||||||
|
@ -1092,11 +1102,11 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char*
|
||||||
nodesWalkSelectStmt(pSelect, clause, collectColumns, &cxt);
|
nodesWalkSelectStmt(pSelect, clause, collectColumns, &cxt);
|
||||||
taosHashCleanup(cxt.pColHash);
|
taosHashCleanup(cxt.pColHash);
|
||||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||||
nodesClearList(cxt.pCols);
|
nodesDestroyList(cxt.pCols);
|
||||||
return cxt.errCode;
|
return cxt.errCode;
|
||||||
}
|
}
|
||||||
if (0 == LIST_LENGTH(cxt.pCols)) {
|
if (0 == LIST_LENGTH(cxt.pCols)) {
|
||||||
nodesClearList(cxt.pCols);
|
nodesDestroyList(cxt.pCols);
|
||||||
cxt.pCols = NULL;
|
cxt.pCols = NULL;
|
||||||
}
|
}
|
||||||
*pCols = cxt.pCols;
|
*pCols = cxt.pCols;
|
||||||
|
@ -1123,10 +1133,12 @@ int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNod
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCollectFuncsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .classifier = classifier, .pFuncs = nodesMakeList()};
|
SCollectFuncsCxt cxt = {
|
||||||
|
.errCode = TSDB_CODE_SUCCESS, .classifier = classifier, .pFuncs = (NULL == *pFuncs ? nodesMakeList() : *pFuncs)};
|
||||||
if (NULL == cxt.pFuncs) {
|
if (NULL == cxt.pFuncs) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
*pFuncs = NULL;
|
||||||
nodesWalkSelectStmt(pSelect, SQL_CLAUSE_GROUP_BY, collectFuncs, &cxt);
|
nodesWalkSelectStmt(pSelect, SQL_CLAUSE_GROUP_BY, collectFuncs, &cxt);
|
||||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||||
nodesDestroyList(cxt.pFuncs);
|
nodesDestroyList(cxt.pFuncs);
|
||||||
|
@ -1136,7 +1148,6 @@ int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNod
|
||||||
*pFuncs = cxt.pFuncs;
|
*pFuncs = cxt.pFuncs;
|
||||||
} else {
|
} else {
|
||||||
nodesDestroyList(cxt.pFuncs);
|
nodesDestroyList(cxt.pFuncs);
|
||||||
*pFuncs = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -1905,18 +1905,9 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray) {
|
||||||
SColumnDefNode* pCol = (SColumnDefNode*)pNode;
|
SColumnDefNode* pCol = (SColumnDefNode*)pNode;
|
||||||
SField field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)};
|
SField field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)};
|
||||||
strcpy(field.name, pCol->colName);
|
strcpy(field.name, pCol->colName);
|
||||||
taosArrayPush(*pArray, &field);
|
if (pCol->sma) {
|
||||||
|
field.flags |= SCHEMA_SMA_ON;
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t columnNodeToField(SNodeList* pList, SArray** pArray) {
|
|
||||||
*pArray = taosArrayInit(LIST_LENGTH(pList), sizeof(SField));
|
|
||||||
SNode* pNode;
|
|
||||||
FOREACH(pNode, pList) {
|
|
||||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
|
||||||
SField field = {.type = pCol->node.resType.type, .bytes = calcTypeBytes(pCol->node.resType)};
|
|
||||||
strcpy(field.name, pCol->colName);
|
|
||||||
taosArrayPush(*pArray, &field);
|
taosArrayPush(*pArray, &field);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2249,13 +2240,6 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
|
||||||
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
|
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
|
||||||
pReq->numOfColumns = LIST_LENGTH(pStmt->pCols);
|
pReq->numOfColumns = LIST_LENGTH(pStmt->pCols);
|
||||||
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
|
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
|
||||||
if (NULL == pStmt->pOptions->pSma) {
|
|
||||||
columnDefNodeToField(pStmt->pCols, &pReq->pSmas);
|
|
||||||
pReq->numOfSmas = pReq->numOfColumns;
|
|
||||||
} else {
|
|
||||||
columnNodeToField(pStmt->pOptions->pSma, &pReq->pSmas);
|
|
||||||
pReq->numOfSmas = LIST_LENGTH(pStmt->pOptions->pSma);
|
|
||||||
}
|
|
||||||
|
|
||||||
SName tableName;
|
SName tableName;
|
||||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pReq->name);
|
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pReq->name);
|
||||||
|
|
|
@ -129,14 +129,66 @@ static int32_t createChildLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelec
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanCols, STableMeta* pMeta) {
|
typedef struct SCreateColumnCxt {
|
||||||
|
int32_t errCode;
|
||||||
|
SNodeList* pList;
|
||||||
|
} SCreateColumnCxt;
|
||||||
|
|
||||||
|
static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
|
||||||
|
SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext;
|
||||||
|
switch (nodeType(pNode)) {
|
||||||
|
case QUERY_NODE_COLUMN: {
|
||||||
|
SNode* pCol = nodesCloneNode(pNode);
|
||||||
|
if (NULL == pCol) {
|
||||||
|
return DEAL_RES_ERROR;
|
||||||
|
}
|
||||||
|
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
|
||||||
|
}
|
||||||
|
case QUERY_NODE_OPERATOR:
|
||||||
|
case QUERY_NODE_LOGIC_CONDITION:
|
||||||
|
case QUERY_NODE_FUNCTION: {
|
||||||
|
SExprNode* pExpr = (SExprNode*)pNode;
|
||||||
|
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||||
|
if (NULL == pCol) {
|
||||||
|
return DEAL_RES_ERROR;
|
||||||
|
}
|
||||||
|
pCol->node.resType = pExpr->resType;
|
||||||
|
strcpy(pCol->colName, pExpr->aliasName);
|
||||||
|
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t createColumnByRewriteExps(SLogicPlanContext* pCxt, SNodeList* pExprs, SNodeList** pList) {
|
||||||
|
SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)};
|
||||||
|
if (NULL == cxt.pList) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
nodesWalkExprs(pExprs, doCreateColumn, &cxt);
|
||||||
|
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||||
|
nodesDestroyList(cxt.pList);
|
||||||
|
return cxt.errCode;
|
||||||
|
}
|
||||||
|
if (NULL == *pList) {
|
||||||
|
*pList = cxt.pList;
|
||||||
|
}
|
||||||
|
return cxt.errCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols,
|
||||||
|
STableMeta* pMeta) {
|
||||||
if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) {
|
if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) {
|
||||||
return SCAN_TYPE_STREAM;
|
return SCAN_TYPE_STREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == pScanCols) {
|
if (NULL == pScanCols) {
|
||||||
// select count(*) from t
|
// select count(*) from t
|
||||||
return SCAN_TYPE_TABLE;
|
return NULL == pScanPseudoCols ? SCAN_TYPE_TABLE : SCAN_TYPE_TAG;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_SYSTEM_TABLE == pMeta->tableType) {
|
if (TSDB_SYSTEM_TABLE == pMeta->tableType) {
|
||||||
|
@ -186,7 +238,6 @@ static int32_t addPrimaryKeyCol(uint64_t tableId, SNodeList** pCols) {
|
||||||
|
|
||||||
if (!found) {
|
if (!found) {
|
||||||
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(*pCols, createPrimaryKeyCol(tableId))) {
|
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(*pCols, createPrimaryKeyCol(tableId))) {
|
||||||
nodesDestroyList(*pCols);
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -214,31 +265,32 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
||||||
pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
|
pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
|
|
||||||
// set columns to scan
|
// set columns to scan
|
||||||
SNodeList* pCols = NULL;
|
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pRealTable->table.tableAlias, COLLECT_COL_TYPE_COL,
|
||||||
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pRealTable->table.tableAlias, &pCols);
|
&pScan->pScanCols);
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = addPrimaryKeyCol(pScan->pMeta->uid, &pCols);
|
code = nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pRealTable->table.tableAlias, COLLECT_COL_TYPE_TAG,
|
||||||
|
&pScan->pScanPseudoCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pScan->pScanCols = nodesCloneList(pCols);
|
code = nodesCollectFuncs(pSelect, fmIsScanPseudoColumnFunc, &pScan->pScanPseudoCols);
|
||||||
if (NULL == pScan) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pScan->scanType = getScanType(pCxt, pCols, pScan->pMeta);
|
pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->pMeta);
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = addPrimaryKeyCol(pScan->pMeta->uid, &pScan->pScanCols);
|
||||||
|
}
|
||||||
|
|
||||||
// set output
|
// set output
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pScan->node.pTargets = nodesCloneList(pCols);
|
code = createColumnByRewriteExps(pCxt, pScan->pScanCols, &pScan->node.pTargets);
|
||||||
if (NULL == pScan) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = createColumnByRewriteExps(pCxt, pScan->pScanPseudoCols, &pScan->node.pTargets);
|
||||||
}
|
}
|
||||||
|
|
||||||
nodesClearList(pCols);
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
*pLogicNode = (SLogicNode*)pScan;
|
*pLogicNode = (SLogicNode*)pScan;
|
||||||
} else {
|
} else {
|
||||||
|
@ -362,57 +414,6 @@ static SColumnNode* createColumnByExpr(const char* pStmtName, SExprNode* pExpr)
|
||||||
return pCol;
|
return pCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SCreateColumnCxt {
|
|
||||||
int32_t errCode;
|
|
||||||
SNodeList* pList;
|
|
||||||
} SCreateColumnCxt;
|
|
||||||
|
|
||||||
static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
|
|
||||||
SCreateColumnCxt* pCxt = (SCreateColumnCxt*)pContext;
|
|
||||||
switch (nodeType(pNode)) {
|
|
||||||
case QUERY_NODE_COLUMN: {
|
|
||||||
SNode* pCol = nodesCloneNode(pNode);
|
|
||||||
if (NULL == pCol) {
|
|
||||||
return DEAL_RES_ERROR;
|
|
||||||
}
|
|
||||||
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
|
|
||||||
}
|
|
||||||
case QUERY_NODE_OPERATOR:
|
|
||||||
case QUERY_NODE_LOGIC_CONDITION:
|
|
||||||
case QUERY_NODE_FUNCTION: {
|
|
||||||
SExprNode* pExpr = (SExprNode*)pNode;
|
|
||||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
|
||||||
if (NULL == pCol) {
|
|
||||||
return DEAL_RES_ERROR;
|
|
||||||
}
|
|
||||||
pCol->node.resType = pExpr->resType;
|
|
||||||
strcpy(pCol->colName, pExpr->aliasName);
|
|
||||||
return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return DEAL_RES_CONTINUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t createColumnByRewriteExps(SLogicPlanContext* pCxt, SNodeList* pExprs, SNodeList** pList) {
|
|
||||||
SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)};
|
|
||||||
if (NULL == cxt.pList) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
nodesWalkExprs(pExprs, doCreateColumn, &cxt);
|
|
||||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
|
||||||
nodesDestroyList(cxt.pList);
|
|
||||||
return cxt.errCode;
|
|
||||||
}
|
|
||||||
if (NULL == *pList) {
|
|
||||||
*pList = cxt.pList;
|
|
||||||
}
|
|
||||||
return cxt.errCode;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
||||||
if (!pSelect->hasAggFuncs && NULL == pSelect->pGroupByList) {
|
if (!pSelect->hasAggFuncs && NULL == pSelect->pGroupByList) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -598,14 +599,7 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNodeList* pCols = NULL;
|
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, COLLECT_COL_TYPE_ALL, &pSort->node.pTargets);
|
||||||
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, &pCols);
|
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pCols) {
|
|
||||||
pSort->node.pTargets = nodesCloneList(pCols);
|
|
||||||
if (NULL == pSort->node.pTargets) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pSort->pSortKeys = nodesCloneList(pSelect->pOrderByList);
|
pSort->pSortKeys = nodesCloneList(pSelect->pOrderByList);
|
||||||
|
@ -695,14 +689,8 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNodeList* pCols = NULL;
|
int32_t code =
|
||||||
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, NULL, &pCols);
|
nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, NULL, COLLECT_COL_TYPE_ALL, &pPartition->node.pTargets);
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pCols) {
|
|
||||||
pPartition->node.pTargets = nodesCloneList(pCols);
|
|
||||||
if (NULL == pPartition->node.pTargets) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pPartition->pPartitionKeys = nodesCloneList(pSelect->pPartitionByList);
|
pPartition->pPartitionKeys = nodesCloneList(pSelect->pPartitionByList);
|
||||||
|
|
|
@ -380,6 +380,10 @@ static int32_t sortScanCols(SNodeList* pScanCols) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
|
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
|
||||||
|
if (NULL == pScanCols) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
pScanPhysiNode->pScanCols = nodesCloneList(pScanCols);
|
pScanPhysiNode->pScanCols = nodesCloneList(pScanCols);
|
||||||
if (NULL == pScanPhysiNode->pScanCols) {
|
if (NULL == pScanPhysiNode->pScanCols) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -394,9 +398,22 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNo
|
||||||
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
|
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
|
||||||
code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
|
code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pScanLogicNode->pScanPseudoCols) {
|
||||||
|
pScanPhysiNode->pScanPseudoCols = nodesCloneList(pScanLogicNode->pScanPseudoCols);
|
||||||
|
if (NULL == pScanPhysiNode->pScanPseudoCols) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanPseudoCols, pScanPhysiNode->node.pOutputDataBlockDesc);
|
||||||
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
|
code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
|
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
|
||||||
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
|
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
|
||||||
|
@ -1190,6 +1207,11 @@ static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubp
|
||||||
++(pQueryPlan->numOfSubplans);
|
++(pQueryPlan->numOfSubplans);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
nodesDestroyNode(pSubplan);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pParent) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pParent) {
|
||||||
code = nodesListMakeAppend(&pParent->pChildren, pSubplan);
|
code = nodesListMakeAppend(&pParent->pChildren, pSubplan);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -1207,10 +1229,6 @@ static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
nodesDestroyNode(pSubplan);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,7 @@ using namespace std;
|
||||||
|
|
||||||
class PlanSuperTableTest : public PlannerTestBase {};
|
class PlanSuperTableTest : public PlannerTestBase {};
|
||||||
|
|
||||||
TEST_F(PlanSuperTableTest, unionAll) {
|
TEST_F(PlanSuperTableTest, tbname) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("select tbname from st1");
|
run("select tbname from st1");
|
||||||
|
|
|
@ -85,7 +85,6 @@ typedef struct {
|
||||||
TAOS* conn;
|
TAOS* conn;
|
||||||
TdThread pid;
|
TdThread pid;
|
||||||
tsem_t cancelSem;
|
tsem_t cancelSem;
|
||||||
int64_t result;
|
|
||||||
} SShellObj;
|
} SShellObj;
|
||||||
|
|
||||||
// shellArguments.c
|
// shellArguments.c
|
||||||
|
|
|
@ -213,13 +213,10 @@ void shellRunSingleCommandImp(char *command) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t oresult = atomic_load_64(&shell.result);
|
|
||||||
|
|
||||||
if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
|
if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
|
||||||
fprintf(stdout, "Database changed.\n\n");
|
fprintf(stdout, "Database changed.\n\n");
|
||||||
fflush(stdout);
|
fflush(stdout);
|
||||||
|
|
||||||
atomic_store_64(&shell.result, 0);
|
|
||||||
taos_free_result(pSql);
|
taos_free_result(pSql);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
@ -230,10 +227,7 @@ void shellRunSingleCommandImp(char *command) {
|
||||||
int32_t error_no = 0;
|
int32_t error_no = 0;
|
||||||
|
|
||||||
int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
|
int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
|
||||||
if (numOfRows < 0) {
|
if (numOfRows < 0) return;
|
||||||
atomic_store_64(&shell.result, 0);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
et = taosGetTimestampUs();
|
et = taosGetTimestampUs();
|
||||||
if (error_no == 0) {
|
if (error_no == 0) {
|
||||||
|
@ -250,8 +244,6 @@ void shellRunSingleCommandImp(char *command) {
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("\n");
|
printf("\n");
|
||||||
|
|
||||||
atomic_store_64(&shell.result, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
|
char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
|
||||||
|
@ -398,7 +390,6 @@ int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) {
|
||||||
row = taos_fetch_row(tres);
|
row = taos_fetch_row(tres);
|
||||||
} while (row != NULL);
|
} while (row != NULL);
|
||||||
|
|
||||||
atomic_store_64(&shell.result, 0);
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
|
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
|
@ -766,7 +757,6 @@ void shellWriteHistory() {
|
||||||
|
|
||||||
void shellPrintError(TAOS_RES *tres, int64_t st) {
|
void shellPrintError(TAOS_RES *tres, int64_t st) {
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
atomic_store_ptr(&shell.result, 0);
|
|
||||||
fprintf(stderr, "\nDB error: %s (%.6fs)\n", taos_errstr(tres), (et - st) / 1E6);
|
fprintf(stderr, "\nDB error: %s (%.6fs)\n", taos_errstr(tres), (et - st) / 1E6);
|
||||||
taos_free_result(tres);
|
taos_free_result(tres);
|
||||||
}
|
}
|
||||||
|
@ -872,7 +862,6 @@ void shellGetGrantInfo() {
|
||||||
fprintf(stdout, "Server is Enterprise %s Edition, %s and will expire at %s.\n", serverVersion, sinfo, expiretime);
|
fprintf(stdout, "Server is Enterprise %s Edition, %s and will expire at %s.\n", serverVersion, sinfo, expiretime);
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_64(&shell.result, 0);
|
|
||||||
taos_free_result(tres);
|
taos_free_result(tres);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue