Merge pull request #13839 from taosdata/feature/TD-14481-3.0
enh: tsma code refactor
This commit is contained in:
commit
2383b2b76d
|
@ -44,6 +44,7 @@ static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
|
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
|
||||||
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
|
||||||
|
static void mndDestroySmaObj(SSmaObj *pSmaObj);
|
||||||
|
|
||||||
int32_t mndInitSma(SMnode *pMnode) {
|
int32_t mndInitSma(SMnode *pMnode) {
|
||||||
SSdbTable table = {
|
SSdbTable table = {
|
||||||
|
@ -390,7 +391,9 @@ static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
|
||||||
taosRLockLatch(&pStb->lock);
|
taosRLockLatch(&pStb->lock);
|
||||||
memcpy(&stbObj, pStb, sizeof(SStbObj));
|
memcpy(&stbObj, pStb, sizeof(SStbObj));
|
||||||
taosRUnLockLatch(&pStb->lock);
|
taosRUnLockLatch(&pStb->lock);
|
||||||
|
stbObj.numOfColumns = 0;
|
||||||
stbObj.pColumns = NULL;
|
stbObj.pColumns = NULL;
|
||||||
|
stbObj.numOfTags = 0;
|
||||||
stbObj.pTags = NULL;
|
stbObj.pTags = NULL;
|
||||||
stbObj.updateTime = taosGetTimestampMs();
|
stbObj.updateTime = taosGetTimestampMs();
|
||||||
stbObj.lock = 0;
|
stbObj.lock = 0;
|
||||||
|
@ -501,6 +504,13 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void mndDestroySmaObj(SSmaObj *pSmaObj) {
|
||||||
|
if (pSmaObj) {
|
||||||
|
taosMemoryFreeClear(pSmaObj->schemaRow.pSchema);
|
||||||
|
taosMemoryFreeClear(pSmaObj->schemaTag.pSchema);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) {
|
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) {
|
||||||
SSmaObj smaObj = {0};
|
SSmaObj smaObj = {0};
|
||||||
memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
|
@ -524,29 +534,17 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
smaObj.tagsFilterLen = pCreate->tagsFilterLen;
|
smaObj.tagsFilterLen = pCreate->tagsFilterLen;
|
||||||
smaObj.sqlLen = pCreate->sqlLen;
|
smaObj.sqlLen = pCreate->sqlLen;
|
||||||
smaObj.astLen = pCreate->astLen;
|
smaObj.astLen = pCreate->astLen;
|
||||||
|
|
||||||
if (smaObj.exprLen > 0) {
|
if (smaObj.exprLen > 0) {
|
||||||
smaObj.expr = taosMemoryMalloc(smaObj.exprLen);
|
smaObj.expr = pCreate->expr;
|
||||||
if (smaObj.expr == NULL) goto _OVER;
|
|
||||||
memcpy(smaObj.expr, pCreate->expr, smaObj.exprLen);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (smaObj.tagsFilterLen > 0) {
|
if (smaObj.tagsFilterLen > 0) {
|
||||||
smaObj.tagsFilter = taosMemoryMalloc(smaObj.tagsFilterLen);
|
smaObj.tagsFilter = pCreate->tagsFilter;
|
||||||
if (smaObj.tagsFilter == NULL) goto _OVER;
|
|
||||||
memcpy(smaObj.tagsFilter, pCreate->tagsFilter, smaObj.tagsFilterLen);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (smaObj.sqlLen > 0) {
|
if (smaObj.sqlLen > 0) {
|
||||||
smaObj.sql = taosMemoryMalloc(smaObj.sqlLen);
|
smaObj.sql = pCreate->sql;
|
||||||
if (smaObj.sql == NULL) goto _OVER;
|
|
||||||
memcpy(smaObj.sql, pCreate->sql, smaObj.sqlLen);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (smaObj.astLen > 0) {
|
if (smaObj.astLen > 0) {
|
||||||
smaObj.ast = taosMemoryMalloc(smaObj.astLen);
|
smaObj.ast = pCreate->ast;
|
||||||
if (smaObj.ast == NULL) goto _OVER;
|
|
||||||
memcpy(smaObj.ast, pCreate->ast, smaObj.astLen);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
|
@ -589,6 +587,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
mndDestroySmaObj(&smaObj);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1012,7 +1011,6 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
|
||||||
rsp->suid = pStb->uid;
|
rsp->suid = pStb->uid;
|
||||||
rsp->version = pStb->smaVer;
|
rsp->version = pStb->smaVer;
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||||
|
|
|
@ -132,7 +132,7 @@
|
||||||
#./test.sh -f tsim/mnode/basic1.sim -m
|
#./test.sh -f tsim/mnode/basic1.sim -m
|
||||||
|
|
||||||
# --- sma
|
# --- sma
|
||||||
#./test.sh -f tsim/sma/tsmaCreateInsertData.sim
|
./test.sh -f tsim/sma/tsmaCreateInsertData.sim
|
||||||
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
||||||
|
|
||||||
# --- valgrind
|
# --- valgrind
|
||||||
|
|
|
@ -37,6 +37,14 @@ print =============== trigger stream to execute sma aggr task and insert sma dat
|
||||||
sql insert into ct1 values(now+5s, 20, 20.0, 30.0)
|
sql insert into ct1 values(now+5s, 20, 20.0, 30.0)
|
||||||
#===================================================================
|
#===================================================================
|
||||||
|
|
||||||
|
print =============== show streams ================================
|
||||||
|
sql show streams;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $data00 != d1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
print =============== select * from ct1 from memory
|
print =============== select * from ct1 from memory
|
||||||
sql select * from ct1;
|
sql select * from ct1;
|
||||||
print $data00 $data01
|
print $data00 $data01
|
||||||
|
|
Loading…
Reference in New Issue