diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8f199c72f7..ae6f034df5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -441,6 +441,25 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW STSchema* tdGetSTSChemaFromSSChema(SSchema* pSchema, int32_t nCols, int32_t sver); + +typedef struct { + char tbName[TSDB_TABLE_NAME_LEN]; + char stbName[TSDB_TABLE_NAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; + int64_t dbId; + int32_t numOfTags; + int32_t numOfColumns; + int8_t precision; + int8_t tableType; + int32_t sversion; + int32_t tversion; + uint64_t suid; + uint64_t tuid; + int32_t vgId; + SSchema* pSchemas; +} STableMetaRsp; + + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igExists; @@ -472,6 +491,14 @@ int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); void tFreeSMCreateStbReq(SMCreateStbReq* pReq); +typedef struct { + STableMetaRsp* pMeta; +} SMCreateStbRsp; + +int32_t tEncodeSMCreateStbRsp(SEncoder* pEncoder, const SMCreateStbRsp* pRsp); +int32_t tDecodeSMCreateStbRsp(SDecoder* pDecoder, SMCreateStbRsp* pRsp); +void tFreeSMCreateStbRsp(SMCreateStbRsp* pRsp); + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; @@ -1239,23 +1266,6 @@ typedef struct { SVgroupInfo vgroups[]; } SVgroupsInfo; -typedef struct { - char tbName[TSDB_TABLE_NAME_LEN]; - char stbName[TSDB_TABLE_NAME_LEN]; - char dbFName[TSDB_DB_FNAME_LEN]; - int64_t dbId; - int32_t numOfTags; - int32_t numOfColumns; - int8_t precision; - int8_t tableType; - int32_t sversion; - int32_t tversion; - uint64_t suid; - uint64_t tuid; - int32_t vgId; - SSchema* pSchemas; -} STableMetaRsp; - typedef struct { STableMetaRsp* pMeta; } SMAlterStbRsp; @@ -2028,11 +2038,13 @@ int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq); int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq); typedef struct { - int32_t code; + int32_t code; + STableMetaRsp* pMeta; } SVCreateTbRsp, SVUpdateTbRsp; int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp); int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp); +void tFreeSVCreateTbRsp(void* param); int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index f275ae0885..de91703f82 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -363,8 +363,9 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList); void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta); -int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clientImpl.c and become a static function -int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx +int32_t removeMeta(STscObj* pTscObj, SArray* tbList); +int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); +int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog); bool qnodeRequired(SRequestObj* pRequest); #ifdef __cplusplus diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5f0af55d13..df3e82a05e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -780,6 +780,10 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) { return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res); } +int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) { + return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res); +} + int32_t handleQueryExecRsp(SRequestObj* pRequest) { if (NULL == pRequest->body.resInfo.execRes.res) { return TSDB_CODE_SUCCESS; @@ -802,6 +806,19 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { code = handleAlterTbExecRes(pRes->res, pCatalog); break; } + case TDMT_VND_CREATE_TABLE: { + SArray* pList = (SArray*)pRes->res; + int32_t num = taosArrayGetSize(pList); + for (int32_t i = 0; i < num; ++i) { + void* res = taosArrayGetP(pList, i); + code = handleCreateTbExecRes(res, pCatalog); + } + break; + } + case TDMT_MND_CREATE_STB: { + code = handleCreateTbExecRes(pRes->res, pCatalog); + break; + } case TDMT_VND_SUBMIT: { atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes); @@ -859,17 +876,13 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { return; } - if (code == TSDB_CODE_SUCCESS) { - code = handleQueryExecRsp(pRequest); - ASSERT(pRequest->code == TSDB_CODE_SUCCESS); - pRequest->code = code; - } - tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type)); - if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { + if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) { removeMeta(pTscObj, pRequest->targetTableList); } + handleQueryExecRsp(pRequest); + // return to client pRequest->body.queryFp(pRequest->body.param, pRequest, code); } @@ -930,6 +943,10 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue qDestroyQuery(pQuery); } + if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) { + removeMeta(pRequest->pTscObj, pRequest->targetTableList); + } + handleQueryExecRsp(pRequest); if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { @@ -1127,10 +1144,6 @@ SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool valida inRetry = true; } while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES); - if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { - removeMeta(pRequest->pTscObj, pRequest->targetTableList); - } - return pRequest; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 0c4cf23c4e..68aeb68ee0 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -232,13 +232,36 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { assert(pMsg != NULL && param != NULL); SRequestObj* pRequest = param; - taosMemoryFree(pMsg->pData); if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); + } else { + SMCreateStbRsp createRsp = {0}; + SDecoder coder = {0}; + tDecoderInit(&coder, pMsg->pData, pMsg->len); + tDecodeSMCreateStbRsp(&coder, &createRsp); + tDecoderClear(&coder); + + pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB; + pRequest->body.resInfo.execRes.res = createRsp.pMeta; } + taosMemoryFree(pMsg->pData); + if (pRequest->body.queryFp != NULL) { - removeMeta(pRequest->pTscObj, pRequest->tableList); + SExecResult* pRes = &pRequest->body.resInfo.execRes; + + if (code == TSDB_CODE_SUCCESS) { + SCatalog* pCatalog = NULL; + int32_t ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (pRes->res != NULL) { + ret = handleCreateTbExecRes(pRes->res, pCatalog); + } + + if (ret != TSDB_CODE_SUCCESS) { + code = ret; + } + } + pRequest->body.queryFp(pRequest->body.param, pRequest, code); } else { tsem_post(&pRequest->body.rspSem); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 533d924546..3ceb9ca192 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3196,12 +3196,16 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) { if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1; int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns; - pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols); - if (pRsp->pSchemas == NULL) return -1; + if (totalCols > 0) { + pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols); + if (pRsp->pSchemas == NULL) return -1; - for (int32_t i = 0; i < totalCols; ++i) { - SSchema *pSchema = &pRsp->pSchemas[i]; - if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1; + for (int32_t i = 0; i < totalCols; ++i) { + SSchema *pSchema = &pRsp->pSchemas[i]; + if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1; + } + } else { + pRsp->pSchemas = NULL; } return 0; @@ -5090,6 +5094,10 @@ int tEncodeSVCreateTbRsp(SEncoder *pCoder, const SVCreateTbRsp *pRsp) { if (tStartEncode(pCoder) < 0) return -1; if (tEncodeI32(pCoder, pRsp->code) < 0) return -1; + if (tEncodeI32(pCoder, pRsp->pMeta ? 1 : 0) < 0) return -1; + if (pRsp->pMeta) { + if (tEncodeSTableMetaRsp(pCoder, pRsp->pMeta) < 0) return -1; + } tEndEncode(pCoder); return 0; @@ -5100,10 +5108,25 @@ int tDecodeSVCreateTbRsp(SDecoder *pCoder, SVCreateTbRsp *pRsp) { if (tDecodeI32(pCoder, &pRsp->code) < 0) return -1; + int32_t meta = 0; + if (tDecodeI32(pCoder, &meta) < 0) return -1; + if (meta) { + pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp)); + if (NULL == pRsp->pMeta) return -1; + if (tDecodeSTableMetaRsp(pCoder, pRsp->pMeta) < 0) return -1; + } else { + pRsp->pMeta = NULL; + } + tEndDecode(pCoder); return 0; } +void tFreeSVCreateTbRsp(void* param) { + SVCreateTbRsp* pRsp = (SVCreateTbRsp*)param; + taosMemoryFree(pRsp->pMeta); +} + // TDMT_VND_DROP_TABLE ================= static int32_t tEncodeSVDropTbReq(SEncoder *pCoder, const SVDropTbReq *pReq) { if (tStartEncode(pCoder) < 0) return -1; @@ -5558,6 +5581,60 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) { } } + +int32_t tEncodeSMCreateStbRsp(SEncoder *pEncoder, const SMCreateStbRsp *pRsp) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->pMeta->pSchemas ? 1 : 0) < 0) return -1; + if (pRsp->pMeta->pSchemas) { + if (tEncodeSTableMetaRsp(pEncoder, pRsp->pMeta) < 0) return -1; + } + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeSMCreateStbRsp(SDecoder *pDecoder, SMCreateStbRsp *pRsp) { + int32_t meta = 0; + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI32(pDecoder, &meta) < 0) return -1; + if (meta) { + pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp)); + if (NULL == pRsp->pMeta) return -1; + if (tDecodeSTableMetaRsp(pDecoder, pRsp->pMeta) < 0) return -1; + } + tEndDecode(pDecoder); + return 0; +} + +int32_t tDeserializeSMCreateStbRsp(void *buf, int32_t bufLen, SMCreateStbRsp *pRsp) { + int32_t meta = 0; + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &meta) < 0) return -1; + if (meta) { + pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp)); + if (NULL == pRsp->pMeta) return -1; + if (tDecodeSTableMetaRsp(&decoder, pRsp->pMeta) < 0) return -1; + } + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + +void tFreeSMCreateStbRsp(SMCreateStbRsp *pRsp) { + if (NULL == pRsp) { + return; + } + + if (pRsp->pMeta) { + taosMemoryFree(pRsp->pMeta->pSchemas); + taosMemoryFree(pRsp->pMeta); + } +} + + + int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) { if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1; if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { diff --git a/source/dnode/mnode/impl/inc/mndStb.h b/source/dnode/mnode/impl/inc/mndStb.h index 010199a89f..8f0d55e100 100644 --- a/source/dnode/mnode/impl/inc/mndStb.h +++ b/source/dnode/mnode/impl/inc/mndStb.h @@ -35,6 +35,7 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName); int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb); int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb); void mndFreeStb(SStbObj *pStb); +int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, void **pCont, int32_t *pLen); void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst); void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index ebec3d5ea6..b29271e455 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1774,6 +1774,67 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i return 0; } +int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, void **pCont, int32_t *pLen) { + int32_t ret = -1; + SDbObj *pDb = mndAcquireDb(pMnode, dbFName); + if (NULL == pDb) { + return -1; + } + + SStbObj *pObj = mndAcquireStb(pMnode, stbFName); + if (NULL == pObj) { + goto _OVER; + } + + SEncoder ec = {0}; + uint32_t contLen = 0; + SMCreateStbRsp stbRsp = {0}; + SName name = {0}; + tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + + stbRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp)); + if (NULL == stbRsp.pMeta) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, stbRsp.pMeta); + if (ret) { + tFreeSMCreateStbRsp(&stbRsp); + goto _OVER; + } + + tEncodeSize(tEncodeSMCreateStbRsp, &stbRsp, contLen, ret); + if (ret) { + tFreeSMCreateStbRsp(&stbRsp); + goto _OVER; + } + + void *cont = taosMemoryMalloc(contLen); + tEncoderInit(&ec, cont, contLen); + tEncodeSMCreateStbRsp(&ec, &stbRsp); + tEncoderClear(&ec); + + tFreeSMCreateStbRsp(&stbRsp); + + *pCont = cont; + *pLen = contLen; + + ret = 0; + +_OVER: + if (pObj) { + mndReleaseStb(pMnode, pObj); + } + + if (pDb) { + mndReleaseDb(pMnode, pDb); + } + + return ret; +} + + static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void *alterOriData, int32_t alterOriDataLen) { int32_t code = -1; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 17b4336465..e610fa6d27 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -17,6 +17,7 @@ #include "mndTrans.h" #include "mndConsumer.h" #include "mndDb.h" +#include "mndStb.h" #include "mndPrivilege.h" #include "mndShow.h" #include "mndSync.h" @@ -900,15 +901,6 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } SRpcMsg rspMsg = {.code = code, .info = *pInfo}; - if (pTrans->rpcRspLen != 0) { - void *rpcCont = rpcMallocCont(pTrans->rpcRspLen); - if (rpcCont != NULL) { - memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen); - rspMsg.pCont = rpcCont; - rspMsg.contLen = pTrans->rpcRspLen; - } - } - if (pTrans->originRpcType == TDMT_MND_CREATE_DB) { mDebug("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType)); SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname1); @@ -924,6 +916,21 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } } mndReleaseDb(pMnode, pDb); + } else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) { + void *pCont = NULL; + int32_t contLen = 0; + if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname1, pTrans->dbname2, &pCont, &contLen) != 0) { + mndTransSetRpcRsp(pTrans, pCont, contLen); + } + } + + if (pTrans->rpcRspLen != 0) { + void *rpcCont = rpcMallocCont(pTrans->rpcRspLen); + if (rpcCont != NULL) { + memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen); + rspMsg.pCont = rpcCont; + rspMsg.contLen = pTrans->rpcRspLen; + } } tmsgSendRsp(&rspMsg); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 39c5f3873e..927c314a4c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -102,7 +102,7 @@ int metaCommit(SMeta* pMeta); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); -int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); +int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp **pMetaRsp); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index aa107ab253..6b18e1b48d 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -367,7 +367,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { return 0; } -int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { +int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) { SMetaEntry me = {0}; SMetaReader mr = {0}; @@ -427,6 +427,21 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { if (metaHandleEntry(pMeta, &me) < 0) goto _err; + if (pMetaRsp) { + *pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp)); + + if (*pMetaRsp) { + if (me.type == TSDB_CHILD_TABLE) { + (*pMetaRsp)->tableType = TSDB_CHILD_TABLE; + (*pMetaRsp)->tuid = pReq->uid; + (*pMetaRsp)->suid = pReq->ctb.suid; + strcpy((*pMetaRsp)->tbName, pReq->name); + } else { + metaUpdateMetaRsp(pReq->uid, pReq->name, pReq->ntb.schemaRow, *pMetaRsp); + } + } + } + metaDebug("vgId:%d, table:%s uid %" PRId64 " is created, type:%" PRId8, TD_VID(pMeta->pVnode), pReq->name, pReq->uid, pReq->type); return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7a8d168f4f..947fb845cd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -370,6 +370,10 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { } void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { + if (NULL == pMetaRsp) { + return; + } + strcpy(pMetaRsp->dbFName, pVnode->config.dbname); pMetaRsp->dbId = pVnode->config.dbId; pMetaRsp->vgId = TD_VID(pVnode); @@ -514,7 +518,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR } // do create table - if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) { + if (metaCreateTable(pVnode->pMeta, version, pCreateReq, &cRsp.pMeta) < 0) { if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { cRsp.code = TSDB_CODE_SUCCESS; } else { @@ -524,6 +528,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR cRsp.code = TSDB_CODE_SUCCESS; tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid); taosArrayPush(tbUids, &pCreateReq->uid); + vnodeUpdateMetaRsp(pVnode, cRsp.pMeta); } taosArrayPush(rsp.pArray, &cRsp); @@ -552,7 +557,7 @@ _exit: pCreateReq = req.pReqs + iReq; taosArrayDestroy(pCreateReq->ctb.tagName); } - taosArrayDestroy(rsp.pArray); + taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp); taosArrayDestroy(tbUids); tDecoderClear(&decoder); tEncoderClear(&encoder); @@ -864,7 +869,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq goto _exit; } - if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) { + if (metaCreateTable(pVnode->pMeta, version, &createTbReq, NULL) < 0) { if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { submitBlkRsp.code = terrno; pRsp->code = terrno; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 5143aa4af1..3c8b019d81 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -219,6 +219,11 @@ void destroyQueryExecRes(SExecResult* pRes) { } switch (pRes->msgType) { + case TDMT_VND_CREATE_TABLE: { + taosArrayDestroyEx((SArray*)pRes->res, tFreeSTableMetaRsp); + break; + } + case TDMT_MND_CREATE_STB: case TDMT_VND_ALTER_TABLE: case TDMT_MND_ALTER_STB: { tFreeSTableMetaRsp((STableMetaRsp*)pRes->res); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index ecd9daf1bc..bd0c3009b0 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -102,15 +102,26 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa tDecoderInit(&coder, msg, msgSize); code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp); if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { + pJob->execRes.res = taosArrayInit(batchRsp.nRsps, POINTER_BYTES); + pJob->execRes.msgType = TDMT_VND_CREATE_TABLE; + for (int32_t i = 0; i < batchRsp.nRsps; ++i) { SVCreateTbRsp *rsp = batchRsp.pRsps + i; + if (rsp->pMeta) { + taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta); + } + if (TSDB_CODE_SUCCESS != rsp->code) { code = rsp->code; - tDecoderClear(&coder); - SCH_ERR_JRET(code); } } + + if (taosArrayGetSize((SArray*)pJob->execRes.res) <= 0) { + taosArrayDestroy((SArray*)pJob->execRes.res); + pJob->execRes.res = NULL; + } } + tDecoderClear(&coder); SCH_ERR_JRET(code); }