feat: support writing streams to existing tables
This commit is contained in:
parent
aef9dc5908
commit
11306876fa
|
@ -352,7 +352,7 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa
|
||||||
code = catalogGetTableMeta(pParCxt->pCatalog, &conn, pName, pMeta);
|
code = catalogGetTableMeta(pParCxt->pCatalog, &conn, pName, pMeta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_TSC_INVALID_TABLE_NAME != code) {
|
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_PAR_TABLE_NOT_EXIST != code) {
|
||||||
parserError("0x%" PRIx64 " catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", pCxt->pParseCxt->requestId,
|
parserError("0x%" PRIx64 " catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", pCxt->pParseCxt->requestId,
|
||||||
tstrerror(code), pName->dbname, pName->tname);
|
tstrerror(code), pName->dbname, pName->tname);
|
||||||
}
|
}
|
||||||
|
@ -5842,11 +5842,11 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt
|
||||||
SCMCreateStreamReq* pReq) {
|
SCMCreateStreamReq* pReq) {
|
||||||
STableMeta* pMeta = NULL;
|
STableMeta* pMeta = NULL;
|
||||||
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, &pMeta);
|
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, &pMeta);
|
||||||
if (TSDB_CODE_TSC_INVALID_TABLE_NAME == code) {
|
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
||||||
if (NULL != pStmt->pCols) {
|
if (NULL != pStmt->pCols) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
||||||
}
|
}
|
||||||
pReq->createStb = 1;
|
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -5908,8 +5908,6 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
|
||||||
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
|
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
|
||||||
}
|
}
|
||||||
|
|
||||||
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -427,7 +427,7 @@ class MockCatalogServiceImpl {
|
||||||
int32_t copyTableSchemaMeta(const string& db, const string& tbname, std::unique_ptr<STableMeta>* dst) const {
|
int32_t copyTableSchemaMeta(const string& db, const string& tbname, std::unique_ptr<STableMeta>* dst) const {
|
||||||
STableMeta* src = getTableSchemaMeta(db, tbname);
|
STableMeta* src = getTableSchemaMeta(db, tbname);
|
||||||
if (nullptr == src) {
|
if (nullptr == src) {
|
||||||
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
|
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||||
}
|
}
|
||||||
int32_t len = sizeof(STableMeta) + sizeof(SSchema) * (src->tableInfo.numOfTags + src->tableInfo.numOfColumns);
|
int32_t len = sizeof(STableMeta) + sizeof(SSchema) * (src->tableInfo.numOfTags + src->tableInfo.numOfColumns);
|
||||||
dst->reset((STableMeta*)taosMemoryCalloc(1, len));
|
dst->reset((STableMeta*)taosMemoryCalloc(1, len));
|
||||||
|
|
|
@ -755,14 +755,23 @@ TEST_F(ParserInitialCTest, createStream) {
|
||||||
};
|
};
|
||||||
|
|
||||||
auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
|
auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
|
||||||
int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0,
|
int8_t createStb = STREAM_CREATE_STABLE_TRUE, int8_t igExists = 0) {
|
||||||
int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
|
|
||||||
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) {
|
|
||||||
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
|
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
|
||||||
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
|
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
|
||||||
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
|
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
|
||||||
expect.igExists = igExists;
|
expect.igExists = igExists;
|
||||||
expect.sql = strdup(pSql);
|
expect.sql = strdup(pSql);
|
||||||
|
expect.createStb = createStb;
|
||||||
|
expect.triggerType = STREAM_TRIGGER_AT_ONCE;
|
||||||
|
expect.maxDelay = 0;
|
||||||
|
expect.watermark = 0;
|
||||||
|
expect.fillHistory = STREAM_DEFAULT_FILL_HISTORY;
|
||||||
|
expect.igExpired = STREAM_DEFAULT_IGNORE_EXPIRED;
|
||||||
|
};
|
||||||
|
|
||||||
|
auto setStreamOptions = [&](int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0, int64_t watermark = 0,
|
||||||
|
int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
|
||||||
|
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) {
|
||||||
expect.triggerType = triggerType;
|
expect.triggerType = triggerType;
|
||||||
expect.maxDelay = maxDelay;
|
expect.maxDelay = maxDelay;
|
||||||
expect.watermark = watermark;
|
expect.watermark = watermark;
|
||||||
|
@ -813,6 +822,8 @@ TEST_F(ParserInitialCTest, createStream) {
|
||||||
ASSERT_EQ(pField->flags, pExpectField->flags);
|
ASSERT_EQ(pField->flags, pExpectField->flags);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ASSERT_EQ(req.checkpointFreq, expect.checkpointFreq);
|
||||||
|
ASSERT_EQ(req.createStb, expect.createStb);
|
||||||
tFreeSCMCreateStreamReq(&req);
|
tFreeSCMCreateStreamReq(&req);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -824,7 +835,8 @@ TEST_F(ParserInitialCTest, createStream) {
|
||||||
"s1", "test",
|
"s1", "test",
|
||||||
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st3 "
|
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st3 "
|
||||||
"as select count(*) from t1 interval(10s)",
|
"as select count(*) from t1 interval(10s)",
|
||||||
"st3", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1);
|
"st3", 1, 1);
|
||||||
|
setStreamOptions(STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1);
|
||||||
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st3 AS "
|
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st3 AS "
|
||||||
"SELECT COUNT(*) "
|
"SELECT COUNT(*) "
|
||||||
"FROM t1 INTERVAL(10S)");
|
"FROM t1 INTERVAL(10S)");
|
||||||
|
@ -840,7 +852,8 @@ TEST_F(ParserInitialCTest, createStream) {
|
||||||
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)");
|
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)");
|
||||||
clearCreateStreamReq();
|
clearCreateStreamReq();
|
||||||
|
|
||||||
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select max(c1), c2 from t1 interval(10s)", "st1");
|
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select max(c1), c2 from t1 interval(10s)", "st1",
|
||||||
|
STREAM_CREATE_STABLE_FALSE);
|
||||||
run("CREATE STREAM s1 INTO st1 AS SELECT MAX(c1), c2 FROM t1 INTERVAL(10S)");
|
run("CREATE STREAM s1 INTO st1 AS SELECT MAX(c1), c2 FROM t1 INTERVAL(10S)");
|
||||||
clearCreateStreamReq();
|
clearCreateStreamReq();
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ TEST_F(PlanOtherTest, createTopic) {
|
||||||
TEST_F(PlanOtherTest, createStream) {
|
TEST_F(PlanOtherTest, createStream) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("create stream if not exists s1 trigger window_close watermark 10s into st1 as select count(*) from t1 "
|
run("create stream if not exists s1 trigger window_close watermark 10s into st3 as select count(*) from t1 "
|
||||||
"interval(10s)");
|
"interval(10s)");
|
||||||
|
|
||||||
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
|
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
|
||||||
|
@ -43,9 +43,9 @@ TEST_F(PlanOtherTest, createStream) {
|
||||||
TEST_F(PlanOtherTest, createStreamUseSTable) {
|
TEST_F(PlanOtherTest, createStreamUseSTable) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("CREATE STREAM IF NOT EXISTS s1 into st1 as SELECT COUNT(*) FROM st1 INTERVAL(10s)");
|
run("CREATE STREAM IF NOT EXISTS s1 into st3 as SELECT COUNT(*) FROM st1 INTERVAL(10s)");
|
||||||
|
|
||||||
run("CREATE STREAM IF NOT EXISTS s1 into st1 as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)");
|
run("CREATE STREAM IF NOT EXISTS s1 into st3 as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(PlanOtherTest, createSmaIndex) {
|
TEST_F(PlanOtherTest, createSmaIndex) {
|
||||||
|
|
Loading…
Reference in New Issue