feat: support writing streams to existing tables
This commit is contained in:
parent
30381ec446
commit
ca43c0ab55
|
@ -1755,6 +1755,12 @@ typedef struct {
|
||||||
#define STREAM_CREATE_STABLE_TRUE 1
|
#define STREAM_CREATE_STABLE_TRUE 1
|
||||||
#define STREAM_CREATE_STABLE_FALSE 0
|
#define STREAM_CREATE_STABLE_FALSE 0
|
||||||
|
|
||||||
|
typedef struct SColLocation {
|
||||||
|
int16_t slotId;
|
||||||
|
col_id_t colId;
|
||||||
|
int8_t type;
|
||||||
|
} SColLocation;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_STREAM_FNAME_LEN];
|
char name[TSDB_STREAM_FNAME_LEN];
|
||||||
char sourceDB[TSDB_DB_FNAME_LEN];
|
char sourceDB[TSDB_DB_FNAME_LEN];
|
||||||
|
|
|
@ -207,12 +207,6 @@ typedef struct SQueryNodeStat {
|
||||||
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
|
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
|
||||||
} SQueryNodeStat;
|
} SQueryNodeStat;
|
||||||
|
|
||||||
typedef struct SColLocation {
|
|
||||||
int16_t slotId;
|
|
||||||
col_id_t colId;
|
|
||||||
int8_t type;
|
|
||||||
} SColLocation;
|
|
||||||
|
|
||||||
int32_t initTaskQueue();
|
int32_t initTaskQueue();
|
||||||
int32_t cleanupTaskQueue();
|
int32_t cleanupTaskQueue();
|
||||||
|
|
||||||
|
|
|
@ -547,7 +547,7 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B).
|
||||||
|
|
||||||
/************************************************ create/drop stream **************************************************/
|
/************************************************ create/drop stream **************************************************/
|
||||||
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
|
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
|
||||||
full_table_name(C) col_list_opt(H) tags_def_opt(F) subtable_opt(G)
|
full_table_name(C) col_list_opt(H) tag_def_or_ref_opt(F) subtable_opt(G)
|
||||||
AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); }
|
AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); }
|
||||||
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
|
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
|
||||||
|
|
||||||
|
@ -556,6 +556,12 @@ cmd ::= DROP STREAM exists_opt(A) stream_name(B).
|
||||||
col_list_opt(A) ::= . { A = NULL; }
|
col_list_opt(A) ::= . { A = NULL; }
|
||||||
col_list_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
|
col_list_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
|
||||||
|
|
||||||
|
%type tag_def_or_ref_opt { SNodeList* }
|
||||||
|
%destructor tag_def_or_ref_opt { nodesDestroyList($$); }
|
||||||
|
tag_def_or_ref_opt(A) ::= . { A = NULL; }
|
||||||
|
tag_def_or_ref_opt(A) ::= tags_def(B). { A = B; }
|
||||||
|
tag_def_or_ref_opt(A) ::= TAGS NK_LP col_name_list(B) NK_RP. { A = B; }
|
||||||
|
|
||||||
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
|
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
|
||||||
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
|
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
|
||||||
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
|
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
|
||||||
|
|
|
@ -4991,7 +4991,7 @@ static const SSchema* getColSchema(const STableMeta* pTableMeta, const char* pCo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSchema* getTagSchema(STableMeta* pTableMeta, const char* pTagName) {
|
static SSchema* getTagSchema(const STableMeta* pTableMeta, const char* pTagName) {
|
||||||
int32_t numOfTags = getNumOfTags(pTableMeta);
|
int32_t numOfTags = getNumOfTags(pTableMeta);
|
||||||
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
||||||
for (int32_t i = 0; i < numOfTags; ++i) {
|
for (int32_t i = 0; i < numOfTags; ++i) {
|
||||||
|
@ -5626,6 +5626,13 @@ static int32_t addWstartTsToCreateStreamQuery(STranslateContext* pCxt, SNode* pS
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const char* getTagNameForCreateStreamTag(SNode* pTag) {
|
||||||
|
if (QUERY_NODE_COLUMN_DEF == nodeType(pTag)) {
|
||||||
|
return ((SColumnDefNode*)pTag)->colName;
|
||||||
|
}
|
||||||
|
return ((SColumnNode*)pTag)->colName;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SSelectStmt* pSelect) {
|
static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SSelectStmt* pSelect) {
|
||||||
if (NULL == pStmt->pTags) {
|
if (NULL == pStmt->pTags) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -5636,7 +5643,7 @@ static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStream
|
||||||
bool found = false;
|
bool found = false;
|
||||||
SNode* pPart = NULL;
|
SNode* pPart = NULL;
|
||||||
FOREACH(pPart, pSelect->pPartitionByList) {
|
FOREACH(pPart, pSelect->pPartitionByList) {
|
||||||
if (0 == strcmp(((SColumnDefNode*)pTag)->colName, ((SExprNode*)pPart)->userAlias)) {
|
if (0 == strcmp(getTagNameForCreateStreamTag(pTag), ((SExprNode*)pPart)->userAlias)) {
|
||||||
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pSelect->pTags, nodesCloneNode(pPart))) {
|
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pSelect->pTags, nodesCloneNode(pPart))) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -5645,7 +5652,7 @@ static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStream
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!found) {
|
if (!found) {
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnDefNode*)pTag)->colName);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnDefNode*)pTag)->colName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -5789,8 +5796,30 @@ static int32_t addProjToProjColPos(STranslateContext* pCxt, const SSchema* pSche
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t adjustOrderOfProjection(STranslateContext* pCxt, SNodeList* pCols, const STableMeta* pMeta,
|
static int32_t setFillNullCols(SArray* pProjColPos, const STableMeta* pMeta, SCMCreateStreamReq* pReq) {
|
||||||
SNodeList** pProjections) {
|
int32_t numOfBoundCols = taosArrayGetSize(pProjColPos);
|
||||||
|
pReq->fillNullCols = taosArrayInit(pMeta->tableInfo.numOfColumns - numOfBoundCols, sizeof(SColLocation));
|
||||||
|
if (NULL == pReq->fillNullCols) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
int32_t indexOfSchema = 0;
|
||||||
|
const SSchema* pSchemas = getTableColumnSchema(pMeta);
|
||||||
|
for (int32_t i = 0; i < numOfBoundCols; ++i) {
|
||||||
|
SProjColPos* pPos = taosArrayGet(pProjColPos, i);
|
||||||
|
while (indexOfSchema < pMeta->tableInfo.numOfColumns) {
|
||||||
|
const SSchema* pSchema = pSchemas + indexOfSchema++;
|
||||||
|
if (pSchema->colId == pPos->colId) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
SColLocation colLoc = {.colId = pSchema->colId, .slotId = indexOfSchema - 1, .type = pSchema->type};
|
||||||
|
taosArrayPush(pReq->fillNullCols, &colLoc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCols, const STableMeta* pMeta,
|
||||||
|
SNodeList** pProjections, SCMCreateStreamReq* pReq) {
|
||||||
if (LIST_LENGTH(pCols) != LIST_LENGTH(*pProjections)) {
|
if (LIST_LENGTH(pCols) != LIST_LENGTH(*pProjections)) {
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
|
||||||
}
|
}
|
||||||
|
@ -5805,7 +5834,12 @@ static int32_t adjustOrderOfProjection(STranslateContext* pCxt, SNodeList* pCols
|
||||||
SNode* pProj = NULL;
|
SNode* pProj = NULL;
|
||||||
FORBOTH(pCol, pCols, pProj, *pProjections) {
|
FORBOTH(pCol, pCols, pProj, *pProjections) {
|
||||||
const SSchema* pSchema = getColSchema(pMeta, ((SColumnNode*)pCol)->colName);
|
const SSchema* pSchema = getColSchema(pMeta, ((SColumnNode*)pCol)->colName);
|
||||||
code = addProjToProjColPos(pCxt, pSchema, pProj, pProjColPos);
|
if (NULL == pSchema) {
|
||||||
|
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnNode*)pCol)->colName);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = addProjToProjColPos(pCxt, pSchema, pProj, pProjColPos);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -5826,6 +5860,10 @@ static int32_t adjustOrderOfProjection(STranslateContext* pCxt, SNodeList* pCols
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code && pMeta->tableInfo.numOfColumns > LIST_LENGTH(pCols)) {
|
||||||
|
code = setFillNullCols(pProjColPos, pMeta, pReq);
|
||||||
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
taosArrayDestroy(pProjColPos);
|
taosArrayDestroy(pProjColPos);
|
||||||
nodesDestroyList(*pProjections);
|
nodesDestroyList(*pProjections);
|
||||||
|
@ -5838,19 +5876,127 @@ static int32_t adjustOrderOfProjection(STranslateContext* pCxt, SNodeList* pCols
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t adjustStreamQueryForExistTableImpl(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
|
static int32_t adjustProjectionsForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
|
||||||
const STableMeta* pMeta) {
|
const STableMeta* pMeta, SCMCreateStreamReq* pReq) {
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
if (NULL == pStmt->pCols) {
|
if (NULL == pStmt->pCols) {
|
||||||
return adjustDataTypeOfProjections(pCxt, pMeta, pSelect->pProjectionList);
|
return adjustDataTypeOfProjections(pCxt, pMeta, pSelect->pProjectionList);
|
||||||
}
|
}
|
||||||
return adjustOrderOfProjection(pCxt, pStmt->pCols, pMeta, &pSelect->pProjectionList);
|
return adjustOrderOfProjections(pCxt, pStmt->pCols, pMeta, &pSelect->pProjectionList, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
|
static int32_t adjustDataTypeOfTags(STranslateContext* pCxt, const STableMeta* pMeta, SNodeList* pTags) {
|
||||||
SCMCreateStreamReq* pReq) {
|
if (getNumOfTags(pMeta) != LIST_LENGTH(pTags)) {
|
||||||
STableMeta* pMeta = NULL;
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of tags");
|
||||||
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, &pMeta);
|
}
|
||||||
|
|
||||||
|
SSchema* pSchemas = getTableTagSchema(pMeta);
|
||||||
|
int32_t index = 0;
|
||||||
|
SNode* pTag = NULL;
|
||||||
|
FOREACH(pTag, pTags) {
|
||||||
|
SSchema* pSchema = pSchemas + index++;
|
||||||
|
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
|
||||||
|
if (!dataTypeEqual(&dt, &((SExprNode*)pTag)->resType)) {
|
||||||
|
SNode* pFunc = NULL;
|
||||||
|
int32_t code = createCastFunc(pCxt, pTag, dt, &pFunc);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
REPLACE_NODE(pFunc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SNode* createNullValue() {
|
||||||
|
SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
|
if (NULL == pValue) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pValue->isNull = true;
|
||||||
|
pValue->node.resType.type = TSDB_DATA_TYPE_NULL;
|
||||||
|
pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
|
||||||
|
return (SNode*)pValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t adjustOrderOfTags(STranslateContext* pCxt, SNodeList* pTags, const STableMeta* pMeta,
|
||||||
|
SNodeList** pTagExprs, SCMCreateStreamReq* pReq) {
|
||||||
|
if (LIST_LENGTH(pTags) != LIST_LENGTH(*pTagExprs)) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of tags");
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* pTagPos = taosArrayInit(LIST_LENGTH(pTags), sizeof(SProjColPos));
|
||||||
|
if (NULL == pTagPos) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SNode* pTag = NULL;
|
||||||
|
SNode* pTagExpr = NULL;
|
||||||
|
FORBOTH(pTag, pTags, pTagExpr, *pTagExprs) {
|
||||||
|
const SSchema* pSchema = getTagSchema(pMeta, ((SColumnNode*)pTag)->colName);
|
||||||
|
if (NULL == pSchema) {
|
||||||
|
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, ((SColumnNode*)pTag)->colName);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = addProjToProjColPos(pCxt, pSchema, pTagExpr, pTagPos);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SNodeList* pNewTagExprs = NULL;
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
taosArraySort(pTagPos, projColPosCompar);
|
||||||
|
int32_t indexOfBoundTags = 0;
|
||||||
|
int32_t numOfBoundTags = taosArrayGetSize(pTagPos);
|
||||||
|
int32_t numOfTags = getNumOfTags(pMeta);
|
||||||
|
const SSchema* pTagsSchema = getTableTagSchema(pMeta);
|
||||||
|
pNewTagExprs = nodesMakeList();
|
||||||
|
if (NULL == pNewTagExprs) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) {
|
||||||
|
const SSchema* pTagSchema = pTagsSchema + i;
|
||||||
|
if (indexOfBoundTags < numOfBoundTags) {
|
||||||
|
SProjColPos* pPos = taosArrayGet(pTagPos, indexOfBoundTags);
|
||||||
|
if (pPos->colId == pTagSchema->colId) {
|
||||||
|
++indexOfBoundTags;
|
||||||
|
code = nodesListStrictAppend(pNewTagExprs, pPos->pProj);
|
||||||
|
pPos->pProj = NULL;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
code = nodesListStrictAppend(pNewTagExprs, createNullValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
taosArrayDestroy(pTagPos);
|
||||||
|
nodesDestroyList(*pTagExprs);
|
||||||
|
*pTagExprs = pNewTagExprs;
|
||||||
|
} else {
|
||||||
|
taosArrayDestroyEx(pTagPos, projColPosDelete);
|
||||||
|
nodesDestroyList(pNewTagExprs);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta,
|
||||||
|
SCMCreateStreamReq* pReq) {
|
||||||
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
|
if (NULL == pStmt->pTags) {
|
||||||
|
return adjustDataTypeOfTags(pCxt, pMeta, pSelect->pTags);
|
||||||
|
}
|
||||||
|
return adjustOrderOfTags(pCxt, pStmt->pTags, pMeta, &pSelect->pTags, pReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq,
|
||||||
|
STableMeta** pMeta) {
|
||||||
|
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, pMeta);
|
||||||
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
||||||
if (NULL != pStmt->pCols) {
|
if (NULL != pStmt->pCols) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
||||||
|
@ -5860,18 +6006,18 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
pReq->createStb = STREAM_CREATE_STABLE_FALSE;
|
pReq->createStb = STREAM_CREATE_STABLE_FALSE;
|
||||||
pReq->targetStbUid = pMeta->suid;
|
pReq->targetStbUid = (*pMeta)->suid;
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta);
|
|
||||||
}
|
|
||||||
taosMemoryFree(pMeta);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||||
pCxt->createStream = true;
|
pCxt->createStream = true;
|
||||||
int32_t code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
|
STableMeta* pMeta = NULL;
|
||||||
|
int32_t code = translateStreamTargetTable(pCxt, pStmt, pReq, &pMeta);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = translateQuery(pCxt, pStmt->pQuery);
|
code = translateQuery(pCxt, pStmt->pQuery);
|
||||||
}
|
}
|
||||||
|
@ -5881,13 +6027,17 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = checkStreamQuery(pCxt, pStmt);
|
code = checkStreamQuery(pCxt, pStmt);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
|
||||||
code = adjustStreamQueryForExistTable(pCxt, pStmt, pReq);
|
code = adjustProjectionsForExistTable(pCxt, pStmt, pMeta, pReq);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
|
||||||
|
code = adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
||||||
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(pMeta);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5918,8 +6068,10 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
|
||||||
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
|
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
|
||||||
pReq->fillHistory = pStmt->pOptions->fillHistory;
|
pReq->fillHistory = pStmt->pOptions->fillHistory;
|
||||||
pReq->igExpired = pStmt->pOptions->ignoreExpired;
|
pReq->igExpired = pStmt->pOptions->ignoreExpired;
|
||||||
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
|
if (pReq->createStb) {
|
||||||
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
|
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
|
||||||
|
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -852,9 +852,12 @@ TEST_F(ParserInitialCTest, createStream) {
|
||||||
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)");
|
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)");
|
||||||
clearCreateStreamReq();
|
clearCreateStreamReq();
|
||||||
|
|
||||||
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select max(c1), c2 from t1 interval(10s)", "st1",
|
// st1 already exists
|
||||||
STREAM_CREATE_STABLE_FALSE);
|
setCreateStreamReq(
|
||||||
run("CREATE STREAM s1 INTO st1 AS SELECT MAX(c1), c2 FROM t1 INTERVAL(10S)");
|
"s1", "test",
|
||||||
|
"create stream s1 into st1 tags(tag2) as select max(c1), c2 from t1 partition by tbname tag2 interval(10s)",
|
||||||
|
"st1", STREAM_CREATE_STABLE_FALSE);
|
||||||
|
run("CREATE STREAM s1 INTO st1 TAGS(tag2) AS SELECT MAX(c1), c2 FROM t1 PARTITION BY TBNAME tag2 INTERVAL(10S)");
|
||||||
clearCreateStreamReq();
|
clearCreateStreamReq();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,6 +86,7 @@ static void parseArg(int argc, char* argv[]) {
|
||||||
{"dump", no_argument, NULL, 'd'},
|
{"dump", no_argument, NULL, 'd'},
|
||||||
{"async", required_argument, NULL, 'a'},
|
{"async", required_argument, NULL, 'a'},
|
||||||
{"skipSql", required_argument, NULL, 's'},
|
{"skipSql", required_argument, NULL, 's'},
|
||||||
|
{"limitSql", required_argument, NULL, 'i'},
|
||||||
{"log", required_argument, NULL, 'l'},
|
{"log", required_argument, NULL, 'l'},
|
||||||
{0, 0, 0, 0}
|
{0, 0, 0, 0}
|
||||||
};
|
};
|
||||||
|
@ -101,6 +102,9 @@ static void parseArg(int argc, char* argv[]) {
|
||||||
case 's':
|
case 's':
|
||||||
setSkipSqlNum(optarg);
|
setSkipSqlNum(optarg);
|
||||||
break;
|
break;
|
||||||
|
case 'i':
|
||||||
|
setLimitSqlNum(optarg);
|
||||||
|
break;
|
||||||
case 'l':
|
case 'l':
|
||||||
setLogLevel(optarg);
|
setLogLevel(optarg);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -49,9 +49,11 @@ bool g_dump = false;
|
||||||
bool g_testAsyncApis = true;
|
bool g_testAsyncApis = true;
|
||||||
int32_t g_logLevel = 131;
|
int32_t g_logLevel = 131;
|
||||||
int32_t g_skipSql = 0;
|
int32_t g_skipSql = 0;
|
||||||
|
int32_t g_limitSql = 0;
|
||||||
|
|
||||||
void setAsyncFlag(const char* pFlag) { g_testAsyncApis = stoi(pFlag) > 0 ? true : false; }
|
void setAsyncFlag(const char* pArg) { g_testAsyncApis = stoi(pArg) > 0 ? true : false; }
|
||||||
void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(pNum); }
|
void setSkipSqlNum(const char* pArg) { g_skipSql = stoi(pArg); }
|
||||||
|
void setLimitSqlNum(const char* pArg) { g_limitSql = stoi(pArg); }
|
||||||
|
|
||||||
struct TerminateFlag : public exception {
|
struct TerminateFlag : public exception {
|
||||||
const char* what() const throw() { return "success and terminate"; }
|
const char* what() const throw() { return "success and terminate"; }
|
||||||
|
@ -63,22 +65,27 @@ int32_t getLogLevel() { return g_logLevel; }
|
||||||
|
|
||||||
class ParserTestBaseImpl {
|
class ParserTestBaseImpl {
|
||||||
public:
|
public:
|
||||||
ParserTestBaseImpl(ParserTestBase* pBase) : pBase_(pBase), sqlNo_(0) {}
|
ParserTestBaseImpl(ParserTestBase* pBase) : pBase_(pBase), sqlNo_(0), sqlNum_(0) {}
|
||||||
|
|
||||||
void login(const std::string& user) { caseEnv_.user_ = user; }
|
void login(const std::string& user) { caseEnv_.user_ = user; }
|
||||||
|
|
||||||
void useDb(const string& acctId, const string& db) {
|
void useDb(const string& acctId, const string& db) {
|
||||||
caseEnv_.acctId_ = acctId;
|
caseEnv_.acctId_ = acctId;
|
||||||
caseEnv_.db_ = db;
|
caseEnv_.db_ = db;
|
||||||
caseEnv_.nsql_ = g_skipSql;
|
caseEnv_.numOfSkipSql_ = g_skipSql;
|
||||||
|
caseEnv_.numOfLimitSql_ = g_limitSql;
|
||||||
}
|
}
|
||||||
|
|
||||||
void run(const string& sql, int32_t expect, ParserStage checkStage) {
|
void run(const string& sql, int32_t expect, ParserStage checkStage) {
|
||||||
++sqlNo_;
|
++sqlNo_;
|
||||||
if (caseEnv_.nsql_ > 0) {
|
if (caseEnv_.numOfSkipSql_ > 0) {
|
||||||
--(caseEnv_.nsql_);
|
--(caseEnv_.numOfSkipSql_);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (caseEnv_.numOfLimitSql_ > 0 && caseEnv_.numOfLimitSql_ == sqlNum_) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
++sqlNum_;
|
||||||
|
|
||||||
runInternalFuncs(sql, expect, checkStage);
|
runInternalFuncs(sql, expect, checkStage);
|
||||||
runApis(sql, expect, checkStage);
|
runApis(sql, expect, checkStage);
|
||||||
|
@ -94,9 +101,10 @@ class ParserTestBaseImpl {
|
||||||
string acctId_;
|
string acctId_;
|
||||||
string user_;
|
string user_;
|
||||||
string db_;
|
string db_;
|
||||||
int32_t nsql_;
|
int32_t numOfSkipSql_;
|
||||||
|
int32_t numOfLimitSql_;
|
||||||
|
|
||||||
caseEnv() : user_("wangxiaoyu"), nsql_(0) {}
|
caseEnv() : user_("wangxiaoyu"), numOfSkipSql_(0) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct stmtEnv {
|
struct stmtEnv {
|
||||||
|
@ -532,6 +540,7 @@ class ParserTestBaseImpl {
|
||||||
stmtRes res_;
|
stmtRes res_;
|
||||||
ParserTestBase* pBase_;
|
ParserTestBase* pBase_;
|
||||||
int32_t sqlNo_;
|
int32_t sqlNo_;
|
||||||
|
int32_t sqlNum_;
|
||||||
};
|
};
|
||||||
|
|
||||||
ParserTestBase::ParserTestBase() : impl_(new ParserTestBaseImpl(this)) {}
|
ParserTestBase::ParserTestBase() : impl_(new ParserTestBaseImpl(this)) {}
|
||||||
|
|
|
@ -65,10 +65,11 @@ class ParserDdlTest : public ParserTestBase {
|
||||||
|
|
||||||
extern bool g_dump;
|
extern bool g_dump;
|
||||||
|
|
||||||
extern void setAsyncFlag(const char* pFlag);
|
extern void setAsyncFlag(const char* pArg);
|
||||||
extern void setLogLevel(const char* pLogLevel);
|
extern void setLogLevel(const char* pArg);
|
||||||
extern int32_t getLogLevel();
|
extern int32_t getLogLevel();
|
||||||
extern void setSkipSqlNum(const char* pNum);
|
extern void setSkipSqlNum(const char* pArg);
|
||||||
|
extern void setLimitSqlNum(const char* pArg);
|
||||||
|
|
||||||
} // namespace ParserTest
|
} // namespace ParserTest
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue