feat: support writing streams to existing tables
This commit is contained in:
parent
0fd305158d
commit
de418e8c8d
|
@ -401,6 +401,7 @@ typedef struct SCreateStreamStmt {
|
|||
SNode* pQuery;
|
||||
SNodeList* pTags;
|
||||
SNode* pSubtable;
|
||||
SNodeList* pCols;
|
||||
} SCreateStreamStmt;
|
||||
|
||||
typedef struct SDropStreamStmt {
|
||||
|
|
|
@ -216,7 +216,7 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool
|
|||
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName);
|
||||
SNode* createStreamOptions(SAstCreateContext* pCxt);
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable,
|
||||
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery);
|
||||
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols);
|
||||
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pStreamName);
|
||||
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId);
|
||||
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);
|
||||
|
|
|
@ -541,9 +541,15 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B).
|
|||
|
||||
/************************************************ create/drop stream **************************************************/
|
||||
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
|
||||
full_table_name(C) tags_def_opt(F) subtable_opt(G) AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D); }
|
||||
full_table_name(C) col_list_opt(H) tags_def_opt(F) subtable_opt(G)
|
||||
AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); }
|
||||
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
|
||||
|
||||
%type col_list_opt { SNodeList* }
|
||||
%destructor col_list_opt { nodesDestroyList($$); }
|
||||
col_list_opt(A) ::= . { A = NULL; }
|
||||
col_list_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
|
||||
|
||||
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
|
||||
|
|
|
@ -1742,21 +1742,20 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
|
|||
}
|
||||
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable,
|
||||
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery) {
|
||||
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)nodesMakeNode(QUERY_NODE_CREATE_STREAM_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
COPY_STRING_FORM_ID_TOKEN(pStmt->streamName, pStreamName);
|
||||
if (NULL != pRealTable) {
|
||||
strcpy(pStmt->targetDbName, ((SRealTableNode*)pRealTable)->table.dbName);
|
||||
strcpy(pStmt->targetTabName, ((SRealTableNode*)pRealTable)->table.tableName);
|
||||
nodesDestroyNode(pRealTable);
|
||||
}
|
||||
strcpy(pStmt->targetDbName, ((SRealTableNode*)pRealTable)->table.dbName);
|
||||
strcpy(pStmt->targetTabName, ((SRealTableNode*)pRealTable)->table.tableName);
|
||||
nodesDestroyNode(pRealTable);
|
||||
pStmt->ignoreExists = ignoreExists;
|
||||
pStmt->pOptions = (SStreamOptions*)pOptions;
|
||||
pStmt->pQuery = pQuery;
|
||||
pStmt->pTags = pTags;
|
||||
pStmt->pSubtable = pSubtable;
|
||||
pStmt->pCols = pCols;
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
|
|
|
@ -4979,10 +4979,10 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSchema* getColSchema(STableMeta* pTableMeta, const char* pColName) {
|
||||
static const SSchema* getColSchema(const STableMeta* pTableMeta, const char* pColName) {
|
||||
int32_t numOfFields = getNumOfTags(pTableMeta) + getNumOfColumns(pTableMeta);
|
||||
for (int32_t i = 0; i < numOfFields; ++i) {
|
||||
SSchema* pSchema = pTableMeta->schema + i;
|
||||
const SSchema* pSchema = pTableMeta->schema + i;
|
||||
if (0 == strcmp(pColName, pSchema->name)) {
|
||||
return pSchema;
|
||||
}
|
||||
|
@ -5002,7 +5002,8 @@ static SSchema* getTagSchema(STableMeta* pTableMeta, const char* pTagName) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta) {
|
||||
static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTableStmt* pStmt,
|
||||
const STableMeta* pTableMeta) {
|
||||
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
||||
if (getNumOfTags(pTableMeta) == 1 && pTagsSchema->type == TSDB_DATA_TYPE_JSON &&
|
||||
(pStmt->alterType == TSDB_ALTER_TABLE_ADD_TAG || pStmt->alterType == TSDB_ALTER_TABLE_DROP_TAG ||
|
||||
|
@ -5021,7 +5022,7 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
|
|||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Table is not super table");
|
||||
}
|
||||
|
||||
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
const SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
if (NULL == pSchema) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName);
|
||||
} else if (!IS_VAR_DATA_TYPE(pSchema->type) || pSchema->type != pStmt->dataType.type ||
|
||||
|
@ -5716,6 +5717,139 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STableMeta* pMeta, SNodeList* pProjections) {
|
||||
if (getNumOfColumns(pMeta) != LIST_LENGTH(pProjections)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
|
||||
}
|
||||
|
||||
SSchema* pSchemas = getTableColumnSchema(pMeta);
|
||||
int32_t index = 0;
|
||||
SNode* pProj = NULL;
|
||||
FOREACH(pProj, pProjections) {
|
||||
SSchema* pSchema = pSchemas + index;
|
||||
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
|
||||
if (!dataTypeEqual(&dt, &((SExprNode*)pProj)->resType)) {
|
||||
SNode* pFunc = NULL;
|
||||
int32_t code = createCastFunc(pCxt, pProj, dt, &pFunc);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
REPLACE_NODE(pFunc);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct SProjColPos {
|
||||
int32_t colId;
|
||||
SNode* pProj;
|
||||
} SProjColPos;
|
||||
|
||||
static int32_t projColPosCompar(const void* l, const void* r) {
|
||||
return ((SProjColPos*)l)->colId < ((SProjColPos*)r)->colId;
|
||||
}
|
||||
|
||||
static void projColPosDelete(void* p) { taosMemoryFree(((SProjColPos*)p)->pProj); }
|
||||
|
||||
static int32_t addProjToProjColPos(STranslateContext* pCxt, const SSchema* pSchema, SNode* pProj, SArray* pProjColPos) {
|
||||
SNode* pNewProj = nodesCloneNode(pProj);
|
||||
if (NULL == pNewProj) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
|
||||
if (!dataTypeEqual(&dt, &((SExprNode*)pNewProj)->resType)) {
|
||||
SNode* pFunc = NULL;
|
||||
code = createCastFunc(pCxt, pNewProj, dt, &pFunc);
|
||||
pNewProj = pFunc;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SProjColPos pos = {.colId = pSchema->colId, .pProj = pNewProj};
|
||||
code = (NULL == taosArrayPush(pProjColPos, &pos) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode(pNewProj);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t adjustOrderOfProjection(STranslateContext* pCxt, SNodeList* pCols, const STableMeta* pMeta,
|
||||
SNodeList** pProjections) {
|
||||
if (LIST_LENGTH(pCols) != LIST_LENGTH(*pProjections)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
|
||||
}
|
||||
|
||||
SArray* pProjColPos = taosArrayInit(LIST_LENGTH(pCols), sizeof(SProjColPos));
|
||||
if (NULL == pProjColPos) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNode* pCol = NULL;
|
||||
SNode* pProj = NULL;
|
||||
FORBOTH(pCol, pCols, pProj, *pProjections) {
|
||||
const SSchema* pSchema = getColSchema(pMeta, ((SColumnNode*)pCol)->colName);
|
||||
code = addProjToProjColPos(pCxt, pSchema, pProj, pProjColPos);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SNodeList* pNewProjections = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
taosArraySort(pProjColPos, projColPosCompar);
|
||||
int32_t num = taosArrayGetSize(pProjColPos);
|
||||
pNewProjections = nodesMakeList();
|
||||
if (NULL == pNewProjections) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < num; ++i) {
|
||||
SProjColPos* pPos = taosArrayGet(pProjColPos, i);
|
||||
code = nodesListStrictAppend(pNewProjections, pPos->pProj);
|
||||
pPos->pProj = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
taosArrayDestroy(pProjColPos);
|
||||
nodesDestroyList(*pProjections);
|
||||
*pProjections = pNewProjections;
|
||||
} else {
|
||||
taosArrayDestroyEx(pProjColPos, projColPosDelete);
|
||||
nodesDestroyList(pNewProjections);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t adjustStreamQueryForExistTableImpl(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
|
||||
const STableMeta* pMeta) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||
if (NULL == pStmt->pCols) {
|
||||
return adjustDataTypeOfProjections(pCxt, pMeta, pSelect->pProjectionList);
|
||||
}
|
||||
return adjustOrderOfProjection(pCxt, pStmt->pCols, pMeta, &pSelect->pProjectionList);
|
||||
}
|
||||
|
||||
static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
|
||||
SCMCreateStreamReq* pReq) {
|
||||
STableMeta* pMeta = NULL;
|
||||
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, &pMeta);
|
||||
if (TSDB_CODE_TSC_INVALID_TABLE_NAME == code) {
|
||||
if (NULL != pStmt->pCols) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
// todo pReq->create = true;
|
||||
code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||
pCxt->createStream = true;
|
||||
int32_t code = addWstartTsToCreateStreamQuery(pCxt, pStmt->pQuery);
|
||||
|
@ -5728,6 +5862,9 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkStreamQuery(pCxt, (SSelectStmt*)pStmt->pQuery);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = adjustStreamQueryForExistTable(pCxt, pStmt, pReq);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
||||
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
||||
|
@ -7351,12 +7488,12 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
|
||||
static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, const STableMeta* pTableMeta,
|
||||
SVAlterTbReq* pReq) {
|
||||
if (2 == getNumOfColumns(pTableMeta)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DROP_COL);
|
||||
}
|
||||
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
const SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
if (NULL == pSchema) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName);
|
||||
} else if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
|
||||
|
@ -7372,11 +7509,11 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
|
||||
static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, const STableMeta* pTableMeta,
|
||||
SVAlterTbReq* pReq) {
|
||||
pReq->colModBytes = calcTypeBytes(pStmt->dataType);
|
||||
pReq->colModType = pStmt->dataType.type;
|
||||
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
const SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
if (NULL == pSchema) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName);
|
||||
} else if (!IS_VAR_DATA_TYPE(pSchema->type) || pSchema->type != pStmt->dataType.type ||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue