From f893b656224b04fbc3a35fbdbab4663f504db14a Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 23 Mar 2022 23:48:00 -0400 Subject: [PATCH] reorganize physical plan code --- source/common/src/tmsg.c | 4 +++- source/dnode/mnode/impl/src/mndStb.c | 3 +++ source/libs/parser/src/parTranslater.c | 31 +++++++++++++++++++------- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1ea00566f4..cfa9486cab 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -287,6 +287,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->ver); + tlen += taosEncodeString(buf, pReq->dbFName); tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeFixedU32(buf, pReq->ttl); tlen += taosEncodeFixedU32(buf, pReq->keep); @@ -360,6 +361,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { buf = taosDecodeFixedI64(buf, &(pReq->ver)); + buf = taosDecodeString(buf, &(pReq->dbFName)); buf = taosDecodeString(buf, &(pReq->name)); buf = taosDecodeFixedU32(buf, &(pReq->ttl)); buf = taosDecodeFixedU32(buf, &(pReq->keep)); @@ -478,7 +480,7 @@ void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) { buf = taosDecodeFixedU32(buf, &nsize); pReq->pArray = taosArrayInit(nsize, sizeof(SVCreateTbReq)); for (size_t i = 0; i < nsize; i++) { - SVCreateTbReq req; + SVCreateTbReq req = {0}; buf = tDeserializeSVCreateTbReq(buf, &req); taosArrayPush(pReq->pArray, &req); } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 7b0de9d8bd..944572c86c 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -271,9 +271,12 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { SName name = {0}; tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(&name, dbFName); SVCreateTbReq req = {0}; req.ver = 0; + req.dbFName = dbFName; req.name = (char *)tNameGetTableName(&name); req.ttl = 0; req.keep = 0; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 100fc3f107..930aadd9ad 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1870,14 +1870,21 @@ static void toSchema(const SColumnDefNode* pCol, int32_t colId, SSchema* pSchema } static void destroyCreateTbReq(SVCreateTbReq* pReq) { + tfree(pReq->dbFName); tfree(pReq->name); tfree(pReq->ntbCfg.pSchema); } -static int32_t buildNormalTableBatchReq( - const char* pDbName, const char* pTableName, const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) { +static int32_t buildNormalTableBatchReq(int32_t acctId, const char* pDbName, const char* pTableName, + const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) { + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + SName name = { .type = TSDB_DB_NAME_T, .acctId = acctId }; + strcpy(name.dbname, pDbName); + tNameGetFullDbName(&name, dbFName); + SVCreateTbReq req = {0}; req.type = TD_NORMAL_TABLE; + req.dbFName = strdup(dbFName); req.name = strdup(pTableName); req.ntbCfg.nCols = LIST_LENGTH(pColumns); req.ntbCfg.pSchema = calloc(req.ntbCfg.nCols, sizeof(SSchema)); @@ -1904,7 +1911,7 @@ static int32_t buildNormalTableBatchReq( return TSDB_CODE_SUCCESS; } -static int32_t serializeVgroupTablesBatch(int32_t acctId, SVgroupTablesBatch* pTbBatch, SArray* pBufArray) { +static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) { int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req)); void* buf = malloc(tlen); if (NULL == buf) { @@ -1932,6 +1939,7 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) { size_t size = taosArrayGetSize(pTbBatch->req.pArray); for(int32_t i = 0; i < size; ++i) { SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i); + tfree(pTableReq->dbFName); tfree(pTableReq->name); if (pTableReq->type == TSDB_NORMAL_TABLE) { @@ -1973,9 +1981,9 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt* } SVgroupTablesBatch tbatch = {0}; - int32_t code = buildNormalTableBatchReq(pStmt->dbName, pStmt->tableName, pStmt->pCols, pInfo, &tbatch); + int32_t code = buildNormalTableBatchReq(acctId, pStmt->dbName, pStmt->tableName, pStmt->pCols, pInfo, &tbatch); if (TSDB_CODE_SUCCESS == code) { - code = serializeVgroupTablesBatch(acctId, &tbatch, *pBufArray); + code = serializeVgroupTablesBatch(&tbatch, *pBufArray); } destroyCreateTbReqBatch(&tbatch); @@ -2004,9 +2012,16 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { return code; } -static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const char* pDbName, const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) { +static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, + const char* pDbName, const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) { + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + SName name = { .type = TSDB_DB_NAME_T, .acctId = acctId }; + strcpy(name.dbname, pDbName); + tNameGetFullDbName(&name, dbFName); + struct SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; + req.dbFName = strdup(dbFName); req.name = strdup(pTableName); req.ctbCfg.suid = suid; req.ctbCfg.pTag = row; @@ -2159,7 +2174,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info); } if (TSDB_CODE_SUCCESS == code) { - addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, pSuperTableMeta->uid, &info); + addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, pSuperTableMeta->uid, &info); } tfree(pSuperTableMeta); @@ -2181,7 +2196,7 @@ static SArray* serializeVgroupsTablesBatch(int32_t acctId, SHashObj* pVgroupHash break; } - serializeVgroupTablesBatch(acctId, pTbBatch, pBufArray); + serializeVgroupTablesBatch(pTbBatch, pBufArray); destroyCreateTbReqBatch(pTbBatch); } while (true);