support drop ctb

This commit is contained in:
wangjiaming0909 2024-03-13 12:02:29 +08:00
parent 94f2f6ae22
commit 1a06dd684e
4 changed files with 53 additions and 4 deletions

View File

@ -228,6 +228,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_STB_DROP, "drop-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_TSMA, "get-table-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TSMA, "get-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8

View File

@ -63,6 +63,7 @@ static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq);
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq);
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq);
static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq);
int32_t mndInitStb(SMnode *pMnode) {
SSdbTable table = {
@ -91,6 +92,8 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP, mndProcessDropStbReqFromMNode);
mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA_RSP, mndTransProcessRsp);
// mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
// mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq);
@ -3703,3 +3706,7 @@ static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) {
}
return code;
}
static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) {
}

View File

@ -3673,7 +3673,7 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo
STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i);
SName tsmaTargetTbName = {0};
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetTbName);
int32_t len = snprintf(tsmaTargetTbName.tname, TSDB_TABLE_NAME_LEN, "%s.%s", pTsma->dbFName, pTsma->name);
int32_t len = snprintf(tsmaTargetTbName.tname, TSDB_TABLE_NAME_LEN, "%s.%s", pTsma->dbFName, pTsma->name); // TODO what if tsma name is too long
len = taosCreateMD5Hash(tsmaTargetTbName.tname, len);
sprintf(tsmaTargetTbName.tname + len, "_%s", pRealTable->table.tableName);
collectUseTable(&tsmaTargetTbName, pCxt->pTargetTables);
@ -7950,6 +7950,9 @@ static int32_t doTranslateDropCtbsWithTsma(STranslateContext* pCxt, SDropTableSt
FOREACH(pNode, pStmt->pTables) {
SDropTableClause* pClause = (SDropTableClause*)pNode;
if (pClause->pTsmas) {
for (int32_t i = 0; i < pClause->pTsmas->size; ++i) {
}
// generate tsma res ctb names and get it's vgInfo
}
}
@ -12376,6 +12379,44 @@ SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap) {
return pBufArray;
}
static int32_t dropTableAddTsmaResTb(STranslateContext* pCxt, SHashObj* pVgMap, SDropTableStmt* pStmt) {
char tsmaResTbName[TSDB_TABLE_NAME_LEN + TSDB_DB_FNAME_LEN + 1];
SName tbName = {0};
SNode* pNode;
int32_t code = 0;
STableMeta* pTableMeta = NULL;
FOREACH(pNode, pStmt->pTables) {
SDropTableClause* pClause = (SDropTableClause*)pNode;
if (pClause->pTsmas) {
for (int32_t i = 0; pClause->pTsmas->size; ++i) {
const STableTSMAInfo* pTsma = taosArrayGetP(pClause->pTsmas, i);
int32_t len = sprintf(tsmaResTbName, "%s.%s", pTsma->dbFName, pTsma->name);
len = taosCreateMD5Hash(tsmaResTbName, len);
sprintf(tsmaResTbName + len, "_%s", pClause->tableName);
toName(pCxt->pParseCxt->acctId, pClause->dbName, tsmaResTbName, &tbName);
code = getTargetMeta(pCxt, &tbName, &pTableMeta, false);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
code = TSDB_CODE_SUCCESS;
continue;
}
if (code) break;
collectUseTable(&tbName, pCxt->pTargetTables);
SVgroupInfo info = {0};
if (TSDB_CODE_SUCCESS == code) {
code = getTableHashVgroup(pCxt, pClause->dbName, tsmaResTbName, &info);
}
if (TSDB_CODE_SUCCESS == code) {
addDropTbReqIntoVgroup(pVgMap, pClause, &info, pTableMeta->suid);
}
}
}
if (code) break;
}
return code;
}
static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
SDropTableStmt* pStmt = (SDropTableStmt*)pQuery->pRoot;
bool isSuperTable = false;
@ -12415,8 +12456,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
}
}
if (pStmt->withTsma) {
taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_SUCCESS;
dropTableAddTsmaResTb(pCxt, pVgroupHashmap, pStmt);
}
SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);

View File

@ -751,6 +751,7 @@ class TDTestCase:
self.create_tsma('tsma5', 'test', 'norm_tb', [
'avg(c1)', 'avg(c2)'], '10m')
time.sleep(999999)
self.test_query_with_tsma_interval()
self.test_query_with_tsma_agg()
self.test_recursive_tsma()
@ -1000,7 +1001,7 @@ class TDTestCase:
def run(self):
self.init_data()
# time.sleep(999999)
#self.test_ddl()
self.test_ddl()
self.test_query_with_tsma()
# time.sleep(999999)