Merge pull request #13167 from taosdata/feature/tq

fix(tmq): update tq for auto create table
This commit is contained in:
Liu Jicong 2022-05-28 20:01:15 +08:00 committed by GitHub
commit c47827527a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 77 additions and 1 deletions

View File

@ -940,7 +940,7 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
} }
// do not show for cleared subscription // do not show for cleared subscription
#if 0 #if 1
int32_t sz = taosArrayGetSize(pSub->unassignedVgs); int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);

View File

@ -235,6 +235,15 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
} }
} }
} }
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
ASSERT(code == 0);
}
}
return 0; return 0;
} }

View File

@ -678,6 +678,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
int32_t nRows; int32_t nRows;
int32_t tsize, ret; int32_t tsize, ret;
SEncoder encoder = {0}; SEncoder encoder = {0};
SArray *newTbUids = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
pRsp->code = 0; pRsp->code = 0;
@ -698,6 +699,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
} }
submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp)); submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp));
newTbUids = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(int64_t));
if (!submitRsp.pArray) { if (!submitRsp.pArray) {
pRsp->code = TSDB_CODE_OUT_OF_MEMORY; pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
@ -727,6 +729,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
goto _exit; goto _exit;
} }
} }
taosArrayPush(newTbUids, &createTbReq.uid);
submitBlkRsp.uid = createTbReq.uid; submitBlkRsp.uid = createTbReq.uid;
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
@ -754,8 +757,10 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
submitRsp.affectedRows += submitBlkRsp.affectedRows; submitRsp.affectedRows += submitBlkRsp.affectedRows;
taosArrayPush(submitRsp.pArray, &submitBlkRsp); taosArrayPush(submitRsp.pArray, &submitBlkRsp);
} }
tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
_exit: _exit:
taosArrayDestroy(newTbUids);
tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret); tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
pRsp->pCont = rpcMallocCont(tsize); pRsp->pCont = rpcMallocCont(tsize);
pRsp->contLen = tsize; pRsp->contLen = tsize;

View File

@ -35,6 +35,14 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
return (void*)buf; return (void*)buf;
} }
static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
SStreamDispatchReq req = {
.streamId = pTask->streamId,
.data = data,
};
return 0;
}
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
SStreamTaskExecReq req = { SStreamTaskExecReq req = {
.streamId = pTask->streamId, .streamId = pTask->streamId,
@ -407,6 +415,26 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
return 0; return 0;
} }
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1;
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) { int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->streamId); tlen += taosEncodeFixedI64(buf, pReq->streamId);

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.common import tdCom
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
#for i in range(100):
tdSql.prepare()
dbname = tdCom.getLongName(10, "letters")
tdSql.execute('create database if not exists djnhawvlgq vgroups 1')
tdSql.execute('use djnhawvlgq')
tdSql.execute('create table if not exists downsampling_stb (ts timestamp, c1 int, c2 double, c3 varchar(100), c4 bool) tags (t1 int, t2 double, t3 varchar(100), t4 bool);')
tdSql.execute('create table downsampling_ct1 using downsampling_stb tags(10, 10.1, "Beijing", True);')
tdSql.execute('create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 nchar(20), c5 nchar(20)) tags (t1 int);')
tdSql.execute('create table scalar_ct1 using scalar_stb tags(10);')
tdSql.execute('create table if not exists data_filter_stb (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 float, c6 double, c7 binary(100), c8 nchar(200), c9 bool, c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) tags (t1 tinyint, t2 smallint, t3 int, t4 bigint, t5 float, t6 double, t7 binary(100), t8 nchar(200), t9 bool, t10 tinyint unsigned, t11 smallint unsigned, t12 int unsigned, t13 bigint unsigned)')
tdSql.execute('create table if not exists data_filter_ct1 using data_filter_stb tags (1, 2, 3, 4, 5.5, 6.6, "binary7", "nchar8", true, 11, 12, 13, 14)')
tdSql.execute('create stream data_filter_stream into output_data_filter_stb as select * from data_filter_stb where ts >= 1653648072973+1s and c1 = 1 or c2 > 1 and c3 != 4 or c4 <= 3 and c5 <> 0 or c6 is not Null or c7 is Null or c8 between "na" and "nchar4" and c8 not between "bi" and "binary" and c8 match "nchar[19]" and c8 nmatch "nchar[25]" or c9 in (1, 2, 3) or c10 not in (6, 7) and c8 like "nch%" and c7 not like "bina_" and c11 <= 10 or c12 is Null or c13 >= 4;')
tdSql.execute('insert into data_filter_ct1 values (1653648072973, 1, 1, 1, 3, 1.1, 1.1, "binary1", "nchar1", true, 1, 2, 3, 4);')
tdSql.execute('insert into data_filter_ct1 values (1653648072973+1s, 2, 2, 1, 3, 1.1, 1.1, "binary2", "nchar2", true, 2, 3, 4, 5);')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())