Merge pull request #19543 from taosdata/feat/ly_stream
create stream and use existing super table
This commit is contained in:
commit
94e8aafcf6
|
@ -344,6 +344,7 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp);
|
|||
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
|
||||
|
||||
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
|
||||
#define IS_SET_NULL(s) (((s)->flags & COL_SET_NULL) == COL_SET_NULL)
|
||||
|
||||
#define SSCHMEA_TYPE(s) ((s)->type)
|
||||
#define SSCHMEA_FLAGS(s) ((s)->flags)
|
||||
|
@ -1772,6 +1773,8 @@ typedef struct {
|
|||
int64_t checkpointFreq; // ms
|
||||
// 3.0.2.3
|
||||
int8_t createStb;
|
||||
uint64_t targetStbUid;
|
||||
SArray* fillNullCols;
|
||||
} SCMCreateStreamReq;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -207,6 +207,12 @@ typedef struct SQueryNodeStat {
|
|||
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
|
||||
} SQueryNodeStat;
|
||||
|
||||
typedef struct SColLocation {
|
||||
int16_t slotId;
|
||||
col_id_t colId;
|
||||
int8_t type;
|
||||
} SColLocation;
|
||||
|
||||
int32_t initTaskQueue();
|
||||
int32_t cleanupTaskQueue();
|
||||
|
||||
|
|
|
@ -5425,6 +5425,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
|
|||
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -5486,6 +5487,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
|
|||
}
|
||||
}
|
||||
if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pReq->targetStbUid) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
|
|
@ -314,7 +314,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
}
|
||||
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
|
||||
|
||||
if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
|
||||
pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||
} else {
|
||||
pObj->targetStbUid = pCreate->targetStbUid;
|
||||
}
|
||||
pObj->targetDbUid = pTargetDb->uid;
|
||||
mndReleaseDb(pMnode, pTargetDb);
|
||||
|
||||
|
@ -334,6 +338,38 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
|
||||
if(numOfNULL > 0) {
|
||||
pObj->outputSchema.nCols += numOfNULL;
|
||||
SSchema* pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
|
||||
if (!pFullSchema) {
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
int32_t nullIndex = 0;
|
||||
int32_t dataIndex = 0;
|
||||
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
|
||||
SColLocation* pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
|
||||
if (i < pos->slotId) {
|
||||
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
|
||||
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
|
||||
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
|
||||
strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
|
||||
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
|
||||
dataIndex++;
|
||||
} else {
|
||||
pFullSchema[i].bytes = 0;
|
||||
pFullSchema[i].colId = pos->colId;
|
||||
pFullSchema[i].flags = COL_SET_NULL;
|
||||
memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
|
||||
pFullSchema[i].type = pos->type;
|
||||
nullIndex++;
|
||||
}
|
||||
}
|
||||
taosMemoryFree(pObj->outputSchema.pSchema);
|
||||
pObj->outputSchema.pSchema = pFullSchema;
|
||||
}
|
||||
|
||||
SPlanContext cxt = {
|
||||
.pAstRoot = pAst,
|
||||
.topicQuery = false,
|
||||
|
|
|
@ -323,19 +323,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
|||
taosArrayDestroy(tagArray);
|
||||
}
|
||||
|
||||
static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, void** pBuf, int32_t* contLen) {
|
||||
static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
|
||||
int32_t ret = 0;
|
||||
SVCreateTbBatchReq reqs = {0};
|
||||
|
||||
reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
|
||||
if (NULL == reqs.pArray) {
|
||||
ret = -1;
|
||||
goto end;
|
||||
}
|
||||
taosArrayPush(reqs.pArray, req);
|
||||
reqs.nReqs = 1;
|
||||
|
||||
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
|
||||
tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
|
||||
if (ret < 0) {
|
||||
ret = -1;
|
||||
goto end;
|
||||
|
@ -350,7 +341,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, vo
|
|||
((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
|
||||
SEncoder coder = {0};
|
||||
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) );
|
||||
if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
|
||||
if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
|
||||
rpcFreeCont(*pBuf);
|
||||
*pBuf = NULL;
|
||||
*contLen = 0;
|
||||
|
@ -361,14 +352,13 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, vo
|
|||
tEncoderClear(&coder);
|
||||
|
||||
end:
|
||||
taosArrayDestroy(reqs.pArray);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbReq* pReq) {
|
||||
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
|
||||
void* buf = NULL;
|
||||
int32_t tlen = 0;
|
||||
encodeCreateChildTableForRPC(pReq, TD_VID(pVnode), &buf, &tlen);
|
||||
encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
|
||||
|
||||
SRpcMsg msg = {
|
||||
.msgType = TDMT_VND_CREATE_TABLE,
|
||||
|
@ -387,6 +377,7 @@ _error:
|
|||
tqError("failed to encode submit req since %s", terrstr());
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||
const SArray* pBlocks = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
|
@ -402,6 +393,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
void* pBuf = NULL;
|
||||
SArray* tagArray = NULL;
|
||||
SArray* pVals = NULL;
|
||||
SArray* crTblArray = NULL;
|
||||
|
||||
for (int32_t i = 0; i < blockSz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
|
@ -442,8 +434,14 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
tqDebug("failed to put delete req into write-queue since %s", terrstr());
|
||||
}
|
||||
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
SVCreateTbBatchReq reqs = {0};
|
||||
crTblArray = reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
|
||||
if (NULL == reqs.pArray) {
|
||||
goto _end;
|
||||
}
|
||||
for (int32_t rowId = 0; rowId < rows; rowId++) {
|
||||
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq));
|
||||
SVCreateTbReq createTbReq = {0};
|
||||
SVCreateTbReq* pCreateTbReq = &createTbReq;
|
||||
if (!pCreateTbReq) {
|
||||
goto _end;
|
||||
}
|
||||
|
@ -511,6 +509,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
|
||||
|
||||
// set table name
|
||||
|
@ -524,13 +523,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1);
|
||||
memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData));
|
||||
}
|
||||
|
||||
if (tqPutReqToQueue(pVnode, pCreateTbReq) != TSDB_CODE_SUCCESS) {
|
||||
taosArrayPush(reqs.pArray, pCreateTbReq);
|
||||
}
|
||||
reqs.nReqs = taosArrayGetSize(reqs.pArray);
|
||||
if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||
taosMemoryFreeClear(pCreateTbReq);
|
||||
}
|
||||
tagArray = taosArrayDestroy(tagArray);
|
||||
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
|
||||
crTblArray = NULL;
|
||||
} else {
|
||||
SSubmitTbData tbData = {0};
|
||||
tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows);
|
||||
|
@ -579,7 +580,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
goto _end;
|
||||
}
|
||||
STagVal tagVal = {
|
||||
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
||||
.cid = pTSchema->numOfCols + 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
.i64 = (int64_t)pDataBlock->info.id.groupId,
|
||||
};
|
||||
|
@ -638,16 +639,23 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
taosArrayClear(pVals);
|
||||
int32_t dataIndex = 0;
|
||||
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
||||
const STColumn* pCol = &pTSchema->columns[k];
|
||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
if (k == 0) {
|
||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
||||
void* colData = colDataGetData(pColData, j);
|
||||
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
|
||||
}
|
||||
if (IS_SET_NULL(pCol)) {
|
||||
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
|
||||
taosArrayPush(pVals, &cv);
|
||||
} else{
|
||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
||||
if (colDataIsNull_s(pColData, j)) {
|
||||
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
|
||||
taosArrayPush(pVals, &cv);
|
||||
dataIndex++;
|
||||
} else {
|
||||
void* colData = colDataGetData(pColData, j);
|
||||
if (IS_STR_DATA_TYPE(pCol->type)) {
|
||||
|
@ -661,6 +669,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
|
||||
taosArrayPush(pVals, &cv);
|
||||
}
|
||||
dataIndex++;
|
||||
}
|
||||
}
|
||||
}
|
||||
SRow* pRow = NULL;
|
||||
|
@ -716,5 +726,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
|||
_end:
|
||||
taosArrayDestroy(tagArray);
|
||||
taosArrayDestroy(pVals);
|
||||
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
|
||||
// TODO: change
|
||||
}
|
||||
|
|
|
@ -5740,7 +5740,7 @@ static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STable
|
|||
int32_t index = 0;
|
||||
SNode* pProj = NULL;
|
||||
FOREACH(pProj, pProjections) {
|
||||
SSchema* pSchema = pSchemas + index;
|
||||
SSchema* pSchema = pSchemas + index++;
|
||||
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
|
||||
if (!dataTypeEqual(&dt, &((SExprNode*)pProj)->resType)) {
|
||||
SNode* pFunc = NULL;
|
||||
|
@ -5761,7 +5761,7 @@ typedef struct SProjColPos {
|
|||
} SProjColPos;
|
||||
|
||||
static int32_t projColPosCompar(const void* l, const void* r) {
|
||||
return ((SProjColPos*)l)->colId < ((SProjColPos*)r)->colId;
|
||||
return ((SProjColPos*)l)->colId > ((SProjColPos*)r)->colId;
|
||||
}
|
||||
|
||||
static void projColPosDelete(void* p) { taosMemoryFree(((SProjColPos*)p)->pProj); }
|
||||
|
@ -5856,7 +5856,11 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt
|
|||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
||||
}
|
||||
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
|
||||
pReq->targetStbUid = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
pReq->createStb = STREAM_CREATE_STABLE_FALSE;
|
||||
pReq->targetStbUid = pMeta->suid;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta);
|
||||
|
|
|
@ -247,8 +247,8 @@
|
|||
,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim
|
||||
,,y,script,./test.sh -f tsim/stream/tableAndTag0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/tableAndTag1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim
|
||||
,,y,script,./test.sh -f tsim/trans/lossdata1.sim
|
||||
,,y,script,./test.sh -f tsim/trans/create_db.sim
|
||||
,,y,script,./test.sh -f tsim/tmq/basic1.sim
|
||||
|
|
|
@ -0,0 +1,310 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
|
||||
print ===== step1
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print ===== step2
|
||||
|
||||
sql create database result vgroups 1;
|
||||
|
||||
sql create database test vgroups 4;
|
||||
sql use test;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb int,tc int);
|
||||
|
||||
sql create stream streams0 trigger at_once into result.streamt0 as select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s);
|
||||
sql insert into t1 values(1648791213000,1,2,3);
|
||||
sql insert into t2 values(1648791213000,2,2,3);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
sql select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s);
|
||||
print $data00, $data01, $data02
|
||||
print $data10, $data11, $data12
|
||||
print $data20, $data21, $data22
|
||||
|
||||
loop0:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result.streamt0 order by ta;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00, $data01, $data02
|
||||
print $data10, $data11, $data12
|
||||
print $data20, $data21, $data22
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data02 != 1 then
|
||||
print =====data02=$data02
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data11 != 1 then
|
||||
print =====data11=$data11
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data12 != 2 then
|
||||
print =====data12=$data12
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
print ===== step3
|
||||
|
||||
sql create database result1 vgroups 1;
|
||||
|
||||
sql create database test1 vgroups 4;
|
||||
sql use test1;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta bigint unsigned,tb int,tc int);
|
||||
|
||||
sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s);
|
||||
sql insert into t1 values(1648791213000,10,20,30);
|
||||
sql insert into t2 values(1648791213000,40,50,60);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s);
|
||||
print $data00, $data01, $data02, $data03
|
||||
print $data10, $data11, $data12, $data13
|
||||
print $data20, $data21, $data22, $data23
|
||||
|
||||
loop1:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result1.streamt1 order by ta;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00, $data01, $data02, $data03
|
||||
print $data10, $data11, $data12, $data13
|
||||
print $data20, $data21, $data22, $data23
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data01 != 10 then
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 20 then
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data03 != 1 then
|
||||
print =====data03=$data03
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data11 != 40 then
|
||||
print =====data11=$data11
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data12 != 50 then
|
||||
print =====data12=$data12
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data13 != 1 then
|
||||
print =====data13=$data13
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
|
||||
print ===== step4
|
||||
|
||||
sql create database result2 vgroups 1;
|
||||
|
||||
sql create database test2 vgroups 4;
|
||||
sql use test2;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stable result2.streamt2(ts timestamp, a int , b int) tags(ta varchar(20));
|
||||
|
||||
# tag dest 1, source 2
|
||||
##sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s);
|
||||
|
||||
# column dest 3, source 4
|
||||
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s);
|
||||
|
||||
# column dest 3, source 4
|
||||
sql_error create stream streams2 trigger at_once into result2.streamt2(ts, a, b) as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s);
|
||||
|
||||
# column dest 3, source 2
|
||||
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1 from st partition by tbname interval(10s);
|
||||
|
||||
# column dest 3, source 2
|
||||
sql create stream streams2 trigger at_once into result2.streamt2(ts, a) as select _wstart, count(*) c1 from st partition by tbname interval(10s);
|
||||
|
||||
|
||||
print ===== step5
|
||||
|
||||
sql create database result3 vgroups 1;
|
||||
|
||||
sql create database test3 vgroups 4;
|
||||
sql use test3;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,2,3);
|
||||
sql create table t2 using st tags(4,5,6);
|
||||
|
||||
sql create stable result3.streamt3(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int);
|
||||
|
||||
sql create stream streams3 trigger at_once into result3.streamt3(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
|
||||
|
||||
sql insert into t1 values(1648791213000,10,20,30);
|
||||
sql insert into t2 values(1648791213000,40,50,60);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
|
||||
print $data00, $data01, $data02, $data03, $data04
|
||||
print $data10, $data11, $data12, $data13, $data14
|
||||
print $data20, $data21, $data22, $data23, $data24
|
||||
|
||||
loop2:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result3.streamt3;
|
||||
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
print $data00, $data01, $data02, $data03
|
||||
print $data10, $data11, $data12, $data13
|
||||
print $data20, $data21, $data22, $data23
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data01 != 40 then
|
||||
print =====data01=$data01
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data02 != 20 then
|
||||
print =====data02=$data02
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data03 != 2 then
|
||||
print =====data03=$data03
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data04 != NULL then
|
||||
print =====data04=$data04
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
print ===== step6
|
||||
|
||||
sql create database result4 vgroups 1;
|
||||
|
||||
sql create database test4 vgroups 4;
|
||||
sql use test4;
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,2,3);
|
||||
sql create table t2 using st tags(4,5,6);
|
||||
|
||||
sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int);
|
||||
|
||||
sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2 int, tg3 varchar(100), tg1 bigint) subtable(concat("tbl-", tg1)) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s);
|
||||
|
||||
sql insert into t1 values(1648791213000,10,20,30);
|
||||
sql insert into t2 values(1648791213000,40,50,60);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
|
||||
print $data00, $data01, $data02, $data03
|
||||
print $data10, $data11, $data12, $data13
|
||||
print $data20, $data21, $data22, $data23
|
||||
|
||||
loop2:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result4.streamt4;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
print $data00, $data01, $data02, $data03
|
||||
print $data10, $data11, $data12, $data13
|
||||
print $data20, $data21, $data22, $data23
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data01 != 40 then
|
||||
print =====data01=$data01
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data02 != 20 then
|
||||
print =====data02=$data02
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data03 != 2 then
|
||||
print =====data03=$data03
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data04 != NULL then
|
||||
print =====data04=$data04
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
print ======over
|
||||
|
||||
system sh/stop_dnodes.sh
|
|
@ -268,6 +268,105 @@ if $data10 != tbn-t2 then
|
|||
endi
|
||||
|
||||
|
||||
print ===== step5
|
||||
print ===== tag name + table name
|
||||
|
||||
sql create database result4 vgroups 1;
|
||||
|
||||
sql create database test4 vgroups 4;
|
||||
sql use test4;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create table t3 using st tags(3,3,3);
|
||||
|
||||
sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", tbname)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as dd, tbname interval(10s);
|
||||
sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
loop7:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select table_name from information_schema.ins_tables where db_name="result4" order by 1;
|
||||
|
||||
if $rows != 3 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
if $data00 != tbn-t1 then
|
||||
print =====data00=$data00
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
if $data10 != tbn-t2 then
|
||||
print =====data10=$data10
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
if $data20 != tbn-t3 then
|
||||
print =====data20=$data20
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop8:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result4.streamt4 order by 3;
|
||||
|
||||
if $rows != 3 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data02 != tag-t1 then
|
||||
print =====data02=$data02
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data11 != 1 then
|
||||
print =====data11=$data11
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data12 != tag-t2 then
|
||||
print =====data12=$data12
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data21 != 1 then
|
||||
print =====data21=$data21
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data22 != tag-t3 then
|
||||
print =====data22=$data22
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
print ======over
|
||||
|
||||
system sh/stop_dnodes.sh
|
|
@ -269,6 +269,104 @@ if $data10 != tbn-2 then
|
|||
goto loop6
|
||||
endi
|
||||
|
||||
print ===== step5
|
||||
print ===== tag name + table name
|
||||
|
||||
sql create database result4 vgroups 1;
|
||||
|
||||
sql create database test4 vgroups 4;
|
||||
sql use test4;
|
||||
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create table t3 using st tags(3,3,3);
|
||||
|
||||
sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", dd)) as select _wstart, count(*) c1 from st partition by concat("t", cast(a as varchar(10) ) ) as dd interval(10s);
|
||||
sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
loop7:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select table_name from information_schema.ins_tables where db_name="result4" order by 1;
|
||||
|
||||
if $rows != 3 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
if $data00 != tbn-t1 then
|
||||
print =====data00=$data00
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
if $data10 != tbn-t2 then
|
||||
print =====data10=$data10
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
if $data20 != tbn-t3 then
|
||||
print =====data20=$data20
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop8:
|
||||
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from result4.streamt4 order by 3;
|
||||
|
||||
if $rows != 3 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data02 != t1 then
|
||||
print =====data02=$data02
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data11 != 1 then
|
||||
print =====data11=$data11
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data12 != t2 then
|
||||
print =====data12=$data12
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data21 != 1 then
|
||||
print =====data21=$data21
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data22 != t3 then
|
||||
print =====data22=$data22
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
print ======over
|
||||
|
Loading…
Reference in New Issue