Merge pull request #19546 from taosdata/fix/TD-21873-3.0

fix: tsma transaction refactor
This commit is contained in:
dapan1121 2023-01-16 18:19:42 +08:00 committed by GitHub
commit 122afc414e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 75 additions and 43 deletions

View File

@ -3001,7 +3001,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
void tFreeSerializeSTableIndexRsp(STableIndexRsp *pRsp) {
if (pRsp->pIndex != NULL) {
taosArrayDestroy(pRsp->pIndex);
tFreeSTableIndexRsp(pRsp);
pRsp->pIndex = NULL;
}
}

View File

@ -157,6 +157,7 @@ static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
return 0;
}
#if 0
static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
if (pReq->isTsma) {
SMsgHead *smaMsg = pReq->pTsma;
@ -165,6 +166,7 @@ static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
}
return 0;
}
#endif
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SCreateVnodeReq req = {0};
@ -245,12 +247,14 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
goto _OVER;
}
#if 0
code = vmTsmaProcessCreate(pImpl, &req);
if (code != 0) {
dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr());
code = terrno;
goto _OVER;
}
#endif
code = vnodeStart(pImpl);
if (code != 0) {

View File

@ -459,8 +459,10 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
taosMemoryFreeClear(pSmaReq);
if (pReq == NULL) return -1;
if (pReq == NULL) {
taosMemoryFreeClear(pSmaReq);
return -1;
}
action.pCont = pReq;
action.contLen = contLen;
@ -468,10 +470,21 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFreeClear(pSmaReq);
taosMemoryFree(pReq);
return -1;
}
action.pCont = pSmaReq;
action.contLen = smaContLen;
action.msgType = TDMT_VND_CREATE_SMA;
action.acceptableCode = TSDB_CODE_TSMA_ALREADY_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFreeClear(pSmaReq);
return -1;
}
return 0;
}

View File

@ -25,14 +25,13 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
// TODO: Who is responsible for resource allocate and release?
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
@ -42,7 +41,6 @@ int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) {
if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) {
smaWarn("vgId:%d, create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
@ -258,28 +256,23 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t rows = pDataBlock->info.rows;
SSubmitTbData *pTbData = (SSubmitTbData *)taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (!pTbData) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
SSubmitTbData tbData = {0};
if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow *)))) {
taosMemoryFree(pTbData);
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) {
goto _end;
}
pTbData->suid = suid;
pTbData->uid = 0; // uid is assigned by vnode
pTbData->sver = pTSchema->version;
tbData.suid = suid;
tbData.uid = 0; // uid is assigned by vnode
tbData.sver = pTSchema->version;
if (createTb) {
pTbData->pCreateTbReq = taosArrayGetP(createTbArray, i);
if (pTbData->pCreateTbReq) pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
tbData.pCreateTbReq = taosArrayGetP(createTbArray, i);
if (tbData.pCreateTbReq) tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
}
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
taosArrayDestroy(pTbData->aRowP);
taosMemoryFree(pTbData);
taosArrayDestroy(tbData.aRowP);
goto _end;
}
@ -307,14 +300,13 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
}
SRow *pRow = NULL;
if ((terrno = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE);
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
ASSERT(pRow);
taosArrayPush(pTbData->aRowP, &pRow);
taosArrayPush(tbData.aRowP, &pRow);
}
taosArrayPush(pReq->aSubmitTbData, pTbData);
taosArrayPush(pReq->aSubmitTbData, &tbData);
}
// encode
@ -336,9 +328,13 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
tEncoderClear(&encoder);
}
_end:
taosArrayDestroy(createTbArray);
taosArrayDestroy(tagArray);
taosArrayDestroy(pVals);
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
if (pReq) {
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
if (terrno != 0) {
rpcFreeCont(pBuf);

View File

@ -65,7 +65,7 @@ class TDTestCase:
tdSql.query('select count(*),db_name, stable_name from information_schema.ins_tables group by db_name, stable_name;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 23)
tdSql.checkData(0, 0, 24)
tdSql.checkData(0, 1, 'information_schema')
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 0, 3)
@ -77,7 +77,7 @@ class TDTestCase:
tdSql.query('select count(1),db_name, stable_name from information_schema.ins_tables group by db_name, stable_name;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 23)
tdSql.checkData(0, 0, 24)
tdSql.checkData(0, 1, 'information_schema')
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 0, 5)
@ -93,7 +93,7 @@ class TDTestCase:
tdSql.checkData(0, 1, 'performance_schema')
tdSql.checkData(1, 0, 3)
tdSql.checkData(1, 1, 'tbl_count')
tdSql.checkData(2, 0, 23)
tdSql.checkData(2, 0, 24)
tdSql.checkData(2, 1, 'information_schema')
tdSql.query("select count(*) from information_schema.ins_tables where db_name='tbl_count'")
@ -106,7 +106,7 @@ class TDTestCase:
tdSql.query('select count(*) from information_schema.ins_tables')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 31)
tdSql.checkData(0, 0, 32)
tdSql.execute('create table stba (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);')
@ -182,7 +182,7 @@ class TDTestCase:
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 'tbl_count')
tdSql.checkData(0, 2, 'stba')
tdSql.checkData(1, 0, 23)
tdSql.checkData(1, 0, 24)
tdSql.checkData(1, 1, 'information_schema')
tdSql.checkData(1, 2, None)
tdSql.checkData(2, 0, 3)
@ -194,7 +194,7 @@ class TDTestCase:
tdSql.query('select count(1),db_name, stable_name from information_schema.ins_tables group by db_name, stable_name;')
tdSql.checkRows(4)
tdSql.checkData(0, 0, 23)
tdSql.checkData(0, 0, 24)
tdSql.checkData(0, 1, 'information_schema')
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 0, 5)
@ -213,7 +213,7 @@ class TDTestCase:
tdSql.checkData(0, 1, 'performance_schema')
tdSql.checkData(1, 0, 4)
tdSql.checkData(1, 1, 'tbl_count')
tdSql.checkData(2, 0, 23)
tdSql.checkData(2, 0, 24)
tdSql.checkData(2, 1, 'information_schema')
tdSql.query("select count(*) from information_schema.ins_tables where db_name='tbl_count'")
@ -226,7 +226,7 @@ class TDTestCase:
tdSql.query('select count(*) from information_schema.ins_tables')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 32)
tdSql.checkData(0, 0, 33)
tdSql.execute('drop database tbl_count')

View File

@ -7,9 +7,10 @@ sql connect
print =============== create database
sql create database d1 keep 36500d vgroups 1
sql use d1
sql alter local 'querySmaOptimize' '1';
print =============== create super table, include column type for count/sum/min/max/first
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 binary(10),c5 nchar(10)) tags (t1 int unsigned)
sql show stables
if $rows != 1 then
@ -25,16 +26,32 @@ if $rows != 1 then
endi
print =============== insert data, mode1: one row one table in sql
sql insert into ct1 values('2022-10-19 09:55:45.682', 10, 2.0, 3.0)
sql insert into ct1 values('2022-10-19 09:55:46.682', 11, 2.1, 3.1)('2022-10-19 09:55:47.682', -12, -2.2, -3.2)('2022-10-19 09:55:48.682', -13, -2.3, -3.3)
sql insert into ct1 values('2022-10-19 09:55:45.682', 10, 2.0, 3.0, "a", "n0")
sql insert into ct1 values('2022-10-19 09:55:46.682', 11, 2.1, 3.1,"b","n1")('2022-10-19 09:55:47.682', -12, -2.2, -3.2,"c","n2")('2022-10-19 09:55:48.682', -13, -2.3, -3.3,"d","n3")
print =============== create sma index from super table
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(5m,10s) sliding(5m) watermark 1s max_delay 1s
print $data00 $data01 $data02 $data03
sql create sma index sma_index_name2 on stb function(sum(c1),first(c1), last(c1), first(c4),last(c4),count(c4),first(c5),last(c5),count(c5),apercentile(c1,80,"t-digest"), avg(c2),count(c3), spread(c3), stddev(c2), hyperloglog(c2), hyperloglog(c4), hyperloglog(c5)) interval(5m,10s) sliding(5m);
# for varchar/binary
sql_error create sma index sma_index_name3 on stb function(sum(c4)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(min(c4)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(max(c4)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(avg(c4)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(apercentile(c4)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(spread(c4)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(stddev(c4)) interval(5m,10s) sliding(5m);
# for nchar
sql_error create sma index sma_index_name3 on stb function(sum(c5)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(min(c5)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(max(c5)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(avg(c5)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(apercentile(c5)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(spread(c5)) interval(5m,10s) sliding(5m);
sql_error create sma index sma_index_name3 on stb function(stddev(c5)) interval(5m,10s) sliding(5m);
print =============== trigger stream to execute sma aggr task and insert sma data into sma store
sql insert into ct1 values('2022-10-19 09:55:50.682', 20, 20.0, 30.0)
sql insert into ct1 values('2022-10-19 09:55:50.682', 20, 20.0, 30.0,"e","n5")
#==================== sleep 2s to wait for tsma result
sleep 2000
@ -42,9 +59,11 @@ print =============== show streams ================================
sql show streams;
print $data00 $data01 $data02
if $data00 != sma_index_name1 then
print $data00
return -1
if $data00 != sma_index_name1 then
if $data00 != sma_index_name2 then
print $data00
return -1
endi
endi
print =============== select * from ct1 from memory

View File

@ -56,7 +56,7 @@ class TDTestCase:
self.binary_str = 'taosdata'
self.nchar_str = '涛思数据'
self.ins_list = ['ins_dnodes','ins_mnodes','ins_modules','ins_qnodes','ins_snodes','ins_cluster','ins_databases','ins_functions',\
'ins_indexes','ins_stables','ins_tables','ins_tags','ins_users','ins_grants','ins_vgroups','ins_configs','ins_dnode_variables',\
'ins_indexes','ins_stables','ins_tables','ins_tags','ins_columns','ins_users','ins_grants','ins_vgroups','ins_configs','ins_dnode_variables',\
'ins_topics','ins_subscriptions','ins_streams','ins_stream_tasks','ins_vnodes','ins_user_privileges']
self.perf_list = ['perf_connections','perf_queries','perf_consumers','perf_trans','perf_apps']
def insert_data(self,column_dict,tbname,row_num):

View File

@ -22,7 +22,7 @@ class TDTestCase:
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
tdSql.query("select count(*) from information_schema.ins_columns")
tdSql.checkData(0, 0, 265)
tdSql.checkData(0, 0, 267)
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
tdSql.checkRows(14)