diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5cf027591f..af250bff03 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -174,6 +174,7 @@ typedef enum _mgmt_table { typedef struct SBuildTableMetaInput { int32_t vgId; + char* dbName; char* tableFullName; } SBuildTableMetaInput; @@ -355,9 +356,9 @@ typedef struct SEpSet { } SEpSet; static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { - if(buf == NULL) return sizeof(SEpSet); + if (buf == NULL) return sizeof(SEpSet); memcpy(buf, pEp, sizeof(SEpSet)); - //TODO: endian conversion + // TODO: endian conversion return sizeof(SEpSet); } @@ -776,6 +777,7 @@ typedef struct { typedef struct { SMsgHead header; + char dbFname[TSDB_DB_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN]; } STableInfoMsg; @@ -810,6 +812,7 @@ typedef struct { typedef struct { char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name char stbFname[TSDB_TABLE_FNAME_LEN]; + char dbFname[TSDB_DB_FNAME_LEN]; int32_t numOfTags; int32_t numOfColumns; int8_t precision; @@ -1122,10 +1125,10 @@ typedef struct STaskDropRsp { } STaskDropRsp; typedef struct { - int8_t igExists; - char* name; - char* physicalPlan; - char* logicalPlan; + int8_t igExists; + char* name; + char* physicalPlan; + char* logicalPlan; } SCMCreateTopicReq; static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { @@ -1161,8 +1164,8 @@ static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopi } typedef struct { - char* topicName; - char* consumerGroup; + char* topicName; + char* consumerGroup; int64_t consumerId; } SCMSubscribeReq; @@ -1183,7 +1186,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq typedef struct { int32_t vgId; - SEpSet pEpSet; + SEpSet pEpSet; } SCMSubscribeRsp; static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { @@ -1252,9 +1255,9 @@ typedef struct SVCreateTbReq { char* name; uint32_t ttl; uint32_t keep; -#define TD_SUPER_TABLE 0 -#define TD_CHILD_TABLE 1 -#define TD_NORMAL_TABLE 2 +#define TD_SUPER_TABLE TSDB_SUPER_TABLE +#define TD_CHILD_TABLE TSDB_CHILD_TABLE +#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE uint8_t type; union { struct { @@ -1282,8 +1285,10 @@ typedef struct { int tmsgSVCreateTbReqEncode(SMsgEncoder* pCoder, SVCreateTbReq* pReq); int tmsgSVCreateTbReqDecode(SMsgDecoder* pCoder, SVCreateTbReq* pReq); -int tSerializeSVCreateTbReq(void** buf, const SVCreateTbReq* pReq); +int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); +int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq); +void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq); typedef struct SVCreateTbRsp { } SVCreateTbRsp; diff --git a/include/common/tmsgtype.h b/include/common/tmsgtype.h index 8e7ad87a0a..ebbf99b942 100644 --- a/include/common/tmsgtype.h +++ b/include/common/tmsgtype.h @@ -40,8 +40,9 @@ enum { // the SQL below is for mgmt node TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_STABLE, "create-stable" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" ) - TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_FUNCTION, "drop-function" ) diff --git a/include/dnode/vnode/meta/meta.h b/include/dnode/vnode/meta/meta.h index d587107270..86ebb643a4 100644 --- a/include/dnode/vnode/meta/meta.h +++ b/include/dnode/vnode/meta/meta.h @@ -25,20 +25,20 @@ extern "C" { #endif +#define META_SUPER_TABLE TD_SUPER_TABLE +#define META_CHILD_TABLE TD_CHILD_TABLE +#define META_NORMAL_TABLE TD_NORMAL_TABLE + // Types exported typedef struct SMeta SMeta; -#define META_SUPER_TABLE 0 -#define META_CHILD_TABLE 1 -#define META_NORMAL_TABLE 2 - typedef struct SMetaCfg { /// LRU cache size uint64_t lruSize; } SMetaCfg; typedef struct { - int32_t nCols; + uint32_t nCols; SSchema *pSchema; } SSchemaWrapper; diff --git a/include/libs/parser/parsenodes.h b/include/libs/parser/parsenodes.h index 041adbb582..b326ac032c 100644 --- a/include/libs/parser/parsenodes.h +++ b/include/libs/parser/parsenodes.h @@ -27,7 +27,7 @@ extern "C" { #include "tname.h" #include "tvariant.h" -/* +/** * The first field of a node of any type is guaranteed to be the int16_t. * Hence the type of any node can be gotten by casting it to SQueryNode. */ @@ -157,7 +157,7 @@ typedef struct SVgDataBlocks { typedef struct SInsertStmtInfo { int16_t nodeType; SArray* pDataBlocks; // data block for each vgroup, SArray. - int8_t schemaAttache; // denote if submit block is built with table schema or not + int8_t schemaAttache; // denote if submit block is built with table schema or not uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint32_t insertType; // insert data from [file|sql statement| bound statement] const char* sql; // current sql statement position diff --git a/include/util/freelist.h b/include/util/freelist.h new file mode 100644 index 0000000000..497a6d58c3 --- /dev/null +++ b/include/util/freelist.h @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_FREELIST_H_ +#define _TD_UTIL_FREELIST_H_ + +#include "os.h" +#include "tlist.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct SFreeListNode { + TD_SLIST_NODE(SFreeListNode); + char payload[]; +}; + +typedef TD_SLIST(SFreeListNode) SFreeList; + +#define TFL_MALLOC(SIZE, LIST) \ + ({ \ + void *ptr = malloc((SIZE) + sizeof(struct SFreeListNode)); \ + if (ptr) { \ + TD_SLIST_PUSH((LIST), (struct SFreeListNode *)ptr); \ + ptr = ((struct SFreeListNode *)ptr)->payload; \ + } \ + ptr; \ + }) + +#define tFreeListInit(pFL) TD_SLIST_INIT(pFL) + +static FORCE_INLINE void tFreeListClear(SFreeList *pFL) { + struct SFreeListNode *pNode; + for (;;) { + pNode = TD_SLIST_HEAD(pFL); + if (pNode == NULL) break; + TD_SLIST_POP(pFL); + free(pNode); + } +} + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_UTIL_FREELIST_H_*/ \ No newline at end of file diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 050e763919..b0b2c57ee4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -202,7 +202,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { } int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { - if (TSDB_SQL_INSERT == pRequest->type) { + if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); } return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 97692d71b4..7a87acfc36 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -435,42 +435,14 @@ TEST(testCase, connect_Test) { // taos_close(pConn); //} -TEST(testCase, show_table_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "show tables"); - if (taos_errno(pRes) != 0) { - printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -//TEST(testCase, create_multiple_tables) { +//TEST(testCase, show_table_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); // // TAOS_RES* pRes = taos_query(pConn, "use abc1"); // taos_free_result(pRes); // -// pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st2 tags(2)"); +// pRes = taos_query(pConn, "show tables"); // if (taos_errno(pRes) != 0) { // printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); // taos_free_result(pRes); @@ -490,3 +462,31 @@ TEST(testCase, show_table_Test) { // taos_free_result(pRes); // taos_close(pConn); //} + +TEST(testCase, create_multiple_tables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st1 tags(2)"); + if (taos_errno(pRes) != 0) { + printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b81143ee62..a18a472dba 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -98,7 +98,7 @@ int tmsgSVCreateTbReqDecode(SMsgDecoder *pCoder, SVCreateTbReq *pReq) { return 0; } -int tSerializeSVCreateTbReq(void **buf, const SVCreateTbReq *pReq) { +int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tlen = 0; tlen += taosEncodeFixedU64(buf, pReq->ver); @@ -193,6 +193,33 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { return buf; } +int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) { + int tlen = 0; + + tlen += taosEncodeFixedU64(buf, pReq->ver); + tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray)); + for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) { + SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i); + tlen += tSerializeSVCreateTbReq(buf, pCreateTbReq); + } + + return tlen; +} + +void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) { + uint32_t nsize = 0; + + buf = taosDecodeFixedU64(buf, &pReq->ver); + buf = taosDecodeFixedU32(buf, &nsize); + for (size_t i = 0; i < nsize; i++) { + SVCreateTbReq req; + buf = tDeserializeSVCreateTbReq(buf, &req); + taosArrayPush(pReq->pArray, &req); + } + + return buf; +} + /* ------------------------ STATIC METHODS ------------------------ */ static int tmsgStartEncode(SMsgEncoder *pME) { struct SMEListNode *pNode = (struct SMEListNode *)malloc(sizeof(*pNode)); diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index daa1e964de..cbc4d75e8b 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -17,7 +17,7 @@ #include "vnodeDef.h" static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); -static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); } @@ -43,7 +43,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg); case TDMT_VND_SHOW_TABLES_FETCH: return vnodeGetTableList(pVnode, pMsg); -// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); + // return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); case TDMT_VND_TABLE_META: return vnodeGetTableMeta(pVnode, pMsg, pRsp); default: @@ -60,18 +60,21 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t nCols; int32_t nTagCols; SSchemaWrapper *pSW; - STableMetaMsg * pTbMetaMsg; + STableMetaMsg * pTbMetaMsg = NULL; SSchema * pTagSchema; + SRpcMsg rpcMsg; + int msgLen = 0; + int32_t code = TSDB_CODE_VND_APP_ERROR; pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid); if (pTbCfg == NULL) { - return -1; + goto _exit; } if (pTbCfg->type == META_CHILD_TABLE) { pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid); if (pStbCfg == NULL) { - return -1; + goto _exit; } pSW = metaGetTableSchema(pVnode->pMeta, pTbCfg->ctbCfg.suid, 0, true); @@ -91,12 +94,13 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pTagSchema = NULL; } - int msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols); + msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols); pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen); if (pTbMetaMsg == NULL) { - return -1; + goto _exit; } + memcpy(pTbMetaMsg->dbFname, pReq->dbFname, sizeof(pTbMetaMsg->dbFname)); strcpy(pTbMetaMsg->tbFname, pTbCfg->name); if (pTbCfg->type == META_CHILD_TABLE) { strcpy(pTbMetaMsg->stbFname, pStbCfg->name); @@ -119,13 +123,15 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pSch->bytes = htonl(pSch->bytes); } - SRpcMsg rpcMsg = { - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, - .pCont = pTbMetaMsg, - .contLen = msgLen, - .code = 0, - }; + code = 0; + +_exit: + + rpcMsg.handle = pMsg->handle; + rpcMsg.ahandle = pMsg->ahandle; + rpcMsg.pCont = pTbMetaMsg; + rpcMsg.contLen = msgLen; + rpcMsg.code = code; rpcSendResponse(&rpcMsg); @@ -138,10 +144,10 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { * @param pRsp */ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { - SMTbCursor* pCur = metaOpenTbCursor(pVnode->pMeta); - SArray* pArray = taosArrayInit(10, POINTER_BYTES); + SMTbCursor *pCur = metaOpenTbCursor(pVnode->pMeta); + SArray * pArray = taosArrayInit(10, POINTER_BYTES); - char* name = NULL; + char * name = NULL; int32_t totalLen = 0; while ((name = metaTbCursorNext(pCur)) != NULL) { taosArrayPush(pArray, &name); @@ -150,18 +156,19 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { metaCloseTbCursor(pCur); - int32_t rowLen = (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 2 + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 4; - int32_t numOfTables = (int32_t) taosArrayGetSize(pArray); + int32_t rowLen = + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 2 + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 4; + int32_t numOfTables = (int32_t)taosArrayGetSize(pArray); int32_t payloadLen = rowLen * numOfTables; -// SVShowTablesFetchReq *pFetchReq = pMsg->pCont; + // SVShowTablesFetchReq *pFetchReq = pMsg->pCont; SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp) + payloadLen); memset(pFetchRsp, 0, sizeof(struct SVShowTablesFetchRsp) + payloadLen); - char* p = pFetchRsp->data; - for(int32_t i = 0; i < numOfTables; ++i) { - char* n = taosArrayGetP(pArray, i); + char *p = pFetchRsp->data; + for (int32_t i = 0; i < numOfTables; ++i) { + char *n = taosArrayGetP(pArray, i); STR_TO_VARSTR(p, n); p += (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); @@ -171,11 +178,11 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { pFetchRsp->precision = 0; SRpcMsg rpcMsg = { - .handle = pMsg->handle, + .handle = pMsg->handle, .ahandle = pMsg->ahandle, - .pCont = pFetchRsp, + .pCont = pFetchRsp, .contLen = sizeof(SVShowTablesFetchRsp) + payloadLen, - .code = 0, + .code = 0, }; rpcSendResponse(&rpcMsg); diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 3b1442a02c..88a73ca174 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -27,7 +27,7 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { } int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { - SRpcMsg * pMsg; + SRpcMsg *pMsg; for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); @@ -50,8 +50,9 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { } int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - SVCreateTbReq vCreateTbReq; - void * ptr = vnodeMalloc(pVnode, pMsg->contLen); + SVCreateTbReq vCreateTbReq; + SVCreateTbBatchReq vCreateTbBatchReq; + void * ptr = vnodeMalloc(pVnode, pMsg->contLen); if (ptr == NULL) { // TODO: handle error } @@ -68,7 +69,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { switch (pMsg->msgType) { case TDMT_VND_CREATE_STB: - case TDMT_VND_CREATE_TABLE: tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { // TODO: handle error @@ -76,6 +76,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: maybe need to clear the requst struct break; + case TDMT_VND_CREATE_TABLE: + tSVCreateTbBatchReqDeserialize(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq); + for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) { + SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); + if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { + // TODO: handle error + } + } + case TDMT_VND_DROP_STB: case TDMT_VND_DROP_TABLE: // if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { diff --git a/source/dnode/vnode/meta/inc/metaTbCfg.h b/source/dnode/vnode/meta/inc/metaTbCfg.h index b4ee095967..b7b3924d14 100644 --- a/source/dnode/vnode/meta/inc/metaTbCfg.h +++ b/source/dnode/vnode/meta/inc/metaTbCfg.h @@ -22,10 +22,6 @@ extern "C" { #endif -#define META_SUPER_TABLE TD_SUPER_TABLE -#define META_CHILD_TABLE TD_CHILD_TABLE -#define META_NORMAL_TABLE TD_NORMAL_TABLE - int metaValidateTbCfg(SMeta *pMeta, const STbCfg *); size_t metaEncodeTbObjFromTbOptions(const STbCfg *, void *pBuf, size_t bsize); diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index af8af6a052..4254ad0acd 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -23,6 +23,7 @@ typedef struct { tb_uid_t uid; int32_t sver; + int32_t padding; } SSchemaKey; struct SMetaDB { @@ -55,6 +56,8 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT * static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); static void metaClearTbCfg(STbCfg *pTbCfg); +static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); +static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW); #define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code)) @@ -169,18 +172,13 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { pBuf = buf; memset(&key, 0, sizeof(key)); memset(&value, 0, sizeof(key)); - SSchemaKey schemaKey = {uid, 0 /*TODO*/}; + SSchemaKey schemaKey = {uid, 0 /*TODO*/, 0}; key.data = &schemaKey; key.size = sizeof(schemaKey); - taosEncodeFixedU32(&pBuf, ncols); - for (size_t i = 0; i < ncols; i++) { - taosEncodeFixedI8(&pBuf, pSchema[i].type); - taosEncodeFixedI32(&pBuf, pSchema[i].colId); - taosEncodeFixedI32(&pBuf, pSchema[i].bytes); - taosEncodeString(&pBuf, pSchema[i].name); - } + SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema}; + metaEncodeSchema(&pBuf, &sw); value.data = buf; value.size = POINTER_DISTANCE(pBuf, buf); @@ -197,6 +195,38 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { } /* ------------------------ STATIC METHODS ------------------------ */ +static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) { + int tlen = 0; + SSchema *pSchema; + + tlen += taosEncodeFixedU32(buf, pSW->nCols); + for (int i = 0; i < pSW->nCols; i++) { + pSchema = pSW->pSchema + i; + tlen += taosEncodeFixedI8(buf, pSchema->type); + tlen += taosEncodeFixedI32(buf, pSchema->colId); + tlen += taosEncodeFixedI32(buf, pSchema->bytes); + tlen += taosEncodeString(buf, pSchema->name); + } + + return tlen; +} + +static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) { + SSchema *pSchema; + + buf = taosDecodeFixedU32(buf, &pSW->nCols); + pSW->pSchema = (SSchema *)malloc(sizeof(SSchema) * pSW->nCols); + for (int i = 0; i < pSW->nCols; i++) { + pSchema = pSW->pSchema + i; + buf = taosDecodeFixedI8(buf, &pSchema->type); + buf = taosDecodeFixedI32(buf, &pSchema->colId); + buf = taosDecodeFixedI32(buf, &pSchema->bytes); + buf = taosDecodeStringTo(buf, pSchema->name); + } + + return buf; +} + static SMetaDB *metaNewDB() { SMetaDB *pDB = NULL; pDB = (SMetaDB *)calloc(1, sizeof(*pDB)); @@ -376,15 +406,8 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { tsize += taosEncodeFixedU8(buf, pTbCfg->type); if (pTbCfg->type == META_SUPER_TABLE) { - tsize += taosEncodeVariantU32(buf, pTbCfg->stbCfg.nTagCols); - for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) { - tsize += taosEncodeFixedI8(buf, pTbCfg->stbCfg.pTagSchema[i].type); - tsize += taosEncodeFixedI32(buf, pTbCfg->stbCfg.pTagSchema[i].colId); - tsize += taosEncodeFixedI32(buf, pTbCfg->stbCfg.pTagSchema[i].bytes); - tsize += taosEncodeString(buf, pTbCfg->stbCfg.pTagSchema[i].name); - } - - // tsize += tdEncodeSchema(buf, pTbCfg->stbCfg.pTagSchema); + SSchemaWrapper sw = {.nCols = pTbCfg->stbCfg.nTagCols, .pSchema = pTbCfg->stbCfg.pTagSchema}; + tsize += metaEncodeSchema(buf, &sw); } else if (pTbCfg->type == META_CHILD_TABLE) { tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid); tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag); @@ -403,14 +426,10 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) { buf = taosDecodeFixedU8(buf, &(pTbCfg->type)); if (pTbCfg->type == META_SUPER_TABLE) { - buf = taosDecodeVariantU32(buf, &(pTbCfg->stbCfg.nTagCols)); - pTbCfg->stbCfg.pTagSchema = (SSchema *)malloc(sizeof(SSchema) * pTbCfg->stbCfg.nTagCols); - for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) { - buf = taosDecodeFixedI8(buf, &(pTbCfg->stbCfg.pTagSchema[i].type)); - buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pTagSchema[i].colId); - buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pTagSchema[i].bytes); - buf = taosDecodeStringTo(buf, pTbCfg->stbCfg.pTagSchema[i].name); - } + SSchemaWrapper sw; + buf = metaDecodeSchema(buf, &sw); + pTbCfg->stbCfg.nTagCols = sw.nCols; + pTbCfg->stbCfg.pTagSchema = sw.pSchema; } else if (pTbCfg->type == META_CHILD_TABLE) { buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid)); buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag)); @@ -496,7 +515,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo int ret; void * pBuf; SSchema * pSchema; - SSchemaKey schemaKey = {uid, sver}; + SSchemaKey schemaKey = {uid, sver, 0}; DBT key = {0}; DBT value = {0}; @@ -507,38 +526,14 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo // Query ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0); if (ret != 0) { + printf("failed to query schema DB since %s================\n", db_strerror(ret)); return NULL; } // Decode the schema pBuf = value.data; - taosDecodeFixedI32(&pBuf, &nCols); - if (isinline) { - pSW = (SSchemaWrapper *)malloc(sizeof(*pSW) + sizeof(SSchema) * nCols); - if (pSW == NULL) { - return NULL; - } - pSW->pSchema = POINTER_SHIFT(pSW, sizeof(*pSW)); - } else { - pSW = (SSchemaWrapper *)malloc(sizeof(*pSW)); - if (pSW == NULL) { - return NULL; - } - - pSW->pSchema = (SSchema *)malloc(sizeof(SSchema) * nCols); - if (pSW->pSchema == NULL) { - free(pSW); - return NULL; - } - } - - for (int i = 0; i < nCols; i++) { - pSchema = pSW->pSchema + i; - taosDecodeFixedI8(&pBuf, &(pSchema->type)); - taosDecodeFixedI32(&pBuf, &(pSchema->colId)); - taosDecodeFixedI32(&pBuf, &(pSchema->bytes)); - taosDecodeStringTo(&pBuf, pSchema->name); - } + pSW = malloc(sizeof(*pSW)); + metaDecodeSchema(pBuf, pSW); return pSW; } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 68ff1b8557..236264873e 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -161,7 +161,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE char tbFullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFullName); - SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName}; + SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; @@ -194,10 +194,10 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - char tbFullName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(pTableName, tbFullName); + char dbFullName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pTableName, dbFullName); - SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName}; + SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = pTableName->tname}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; @@ -355,19 +355,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out if (output->metaNum != 1 && output->metaNum != 2) { ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == output->tbMeta) { ctgError("no valid table meta got from meta rsp"); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == pCatalog->tableCache.cache) { pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.cache) { ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } @@ -375,19 +375,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.stableCache) { ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } if (output->metaNum == 2) { if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { ctgError("push ctable[%s] to table cache failed", output->ctbFname); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } } @@ -398,26 +398,23 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); ctgError("push table[%s] to table cache failed", output->tbFname); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname)); if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) { CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); } else { if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { ctgError("push table[%s] to table cache failed", output->tbFname); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } -_return: - tfree(output->tbMeta); - CTG_RET(code); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 5979d3a147..1d8a48dfcb 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -557,6 +557,8 @@ void *ctgTestGetCtableMetaThread(void *param) { assert(0); } + tfree(tbMeta); + if (ctgTestEnableSleep) { usleep(rand()%5); } @@ -592,6 +594,8 @@ void *ctgTestSetCtableMetaThread(void *param) { } } + tfree(output.tbMeta); + return NULL; } @@ -944,7 +948,6 @@ TEST(dbVgroup, getSetDbVgroupCase) { catalogDestroy(); } -#endif TEST(multiThread, getSetDbVgroupCase) { struct SCatalog* pCtg = NULL; @@ -996,6 +999,9 @@ TEST(multiThread, getSetDbVgroupCase) { catalogDestroy(); } +#endif + + TEST(multiThread, ctableMeta) { struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; @@ -1024,8 +1030,9 @@ TEST(multiThread, ctableMeta) { pthread_attr_init(&thattr); pthread_t thread1, thread2; - pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg); pthread_create(&(thread1), &thattr, ctgTestSetCtableMetaThread, pCtg); + sleep(1); + pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg); while (true) { if (ctgTestDeadLoop) { diff --git a/source/libs/parser/inc/parserInt.h b/source/libs/parser/inc/parserInt.h index 4bbe6ab907..346bd0cbe4 100644 --- a/source/libs/parser/inc/parserInt.h +++ b/source/libs/parser/inc/parserInt.h @@ -68,7 +68,9 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ * @param type * @return */ -int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, int32_t msgBufLen); +SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); + +SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); /** * Evaluate the numeric and timestamp arithmetic expression in the WHERE clause. diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index fd99cb6f66..76e26c159b 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -35,7 +35,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); SArray* array = NULL; - SName name = {0}; + SName name = {0}; tNameSetDbName(&name, pCtx->acctId, pCtx->db, strlen(pCtx->db)); char dbFname[TSDB_DB_FNAME_LEN] = {0}; @@ -48,7 +48,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou pEpSet->numOfEps = info->numOfEps; pEpSet->inUse = info->inUse; - for(int32_t i = 0; i < pEpSet->numOfEps; ++i) { + for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { strncpy(pEpSet->fqdn[i], info->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); pEpSet->port[i] = info->epAddr[i].port; } @@ -190,7 +190,7 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) { val = htonl(pCreate->numOfVgroups); if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) { snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val, - TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); + TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); } return TSDB_CODE_SUCCESS; @@ -321,8 +321,12 @@ int32_t doCheckForCreateTable(SSqlInfo* pInfo, SMsgBuf* pMsgBuf) { return TSDB_CODE_SUCCESS; } -int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len, - SEpSet* pEpSet) { +typedef struct SVgroupTablesBatch { + SVCreateTbBatchReq req; + SVgroupInfo info; +} SVgroupTablesBatch; + +int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) { const char* msg1 = "invalid table name"; const char* msg2 = "tags number not matched"; const char* msg3 = "tag value too long"; @@ -330,17 +334,14 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; + SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + // super table name, create table by using dst - int32_t numOfTables = (int32_t)taosArrayGetSize(pCreateTable->childTableInfo); + size_t numOfTables = taosArrayGetSize(pCreateTable->childTableInfo); for (int32_t j = 0; j < numOfTables; ++j) { SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j); SToken* pSTableNameToken = &pCreateTableInfo->stbName; - - char buf[TSDB_TABLE_FNAME_LEN]; - SToken sTblToken; - sTblToken.z = buf; - int32_t code = parserValidateNameToken(pSTableNameToken); if (code != TSDB_CODE_SUCCESS) { return buildInvalidOperationMsg(pMsgBuf, msg1); @@ -357,7 +358,11 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p size_t numOfInputTag = taosArrayGetSize(pValList); STableMeta* pSuperTableMeta = NULL; - catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); + code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + assert(pSuperTableMeta != NULL); // too long tag values will return invalid sql, not be truncated automatically @@ -460,12 +465,13 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p for (int32_t i = 0; i < numOfInputTag; ++i) { SSchema* pSchema = &pTagSchema[i]; - SToken* pItem = taosArrayGet(pValList, i); - - char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0}; - SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema}; char* endPtr = NULL; + char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0}; + + SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema}; + + SToken* pItem = taosArrayGet(pValList, i); code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, ¶m, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { @@ -478,7 +484,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder); if (row == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; + return TSDB_CODE_QRY_OUT_OF_MEMORY; } tdSortKVRowByColIdx(row); @@ -489,40 +495,73 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p return code; } + SVgroupInfo info = {0}; + catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); + struct SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; req.name = strdup(tNameGetTableName(&tableName)); req.ctbCfg.suid = pSuperTableMeta->suid; req.ctbCfg.pTag = row; - int32_t serLen = sizeof(SMsgHead) + tSerializeSVCreateTbReq(NULL, &req); - char* buf1 = calloc(1, serLen); - *pOutput = buf1; - buf1 += sizeof(SMsgHead); - tSerializeSVCreateTbReq((void*)&buf1, &req); - *len = serLen; + SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId)); + if (pTableBatch == NULL) { + SVgroupTablesBatch tBatch = {0}; + tBatch.info = info; - SVgroupInfo info = {0}; - catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); + tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq)); + taosArrayPush(tBatch.req.pArray, &req); - pEpSet->inUse = info.inUse; - pEpSet->numOfEps = info.numOfEps; - for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { - pEpSet->port[i] = info.epAddr[i].port; - tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); + taosHashPut(pVgroupHashmap, &info.vgId, sizeof(info.vgId), &tBatch, sizeof(tBatch)); + } else { // add to the correct vgroup + assert(info.vgId == pTableBatch->info.vgId); + taosArrayPush(pTableBatch->req.pArray, &req); + } + } + + // TODO: serialize and + SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); + + SVgroupTablesBatch* pTbBatch = NULL; + do { + pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch); + if (pTbBatch == NULL) break; + + int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req)); + void* buf = malloc(tlen); + if (buf == NULL) { + // TODO: handle error } - ((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId); - ((SMsgHead*)(*pOutput))->contLen = htonl(serLen); - } + ((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId); + ((SMsgHead*)buf)->contLen = htonl(tlen); + + void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req)); + + SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks)); + pVgData->vg = pTbBatch->info; + pVgData->pData = buf; + pVgData->size = tlen; + pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray); + + taosArrayPush(pBufArray, &pVgData); + } while (true); + + SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo)); + pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE; + pStmtInfo->pDataBlocks = pBufArray; + *pOutput = pStmtInfo; + *len = sizeof(SInsertStmtInfo); return TSDB_CODE_SUCCESS; } -int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, - int32_t msgBufLen) { +SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { int32_t code = 0; + SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo)); + SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf* pMsgBuf = &m; @@ -539,21 +578,25 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SToken* pPwd = &pUser->passwd; if (pName->n >= TSDB_USER_LEN) { - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } if (parserValidateIdToken(pName) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } if (pInfo->type == TSDB_SQL_CREATE_USER) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } } else { if (pUser->type == TSDB_ALTER_USER_PASSWD) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } } else if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) { assert(pPwd->type == TSDB_DATA_TYPE_NULL); @@ -564,10 +607,12 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm } else if (strncasecmp(pPrivilege->z, "normal", 4) == 0 && pPrivilege->n == 4) { // pCmd->count = 2; } else { - return buildInvalidOperationMsg(pMsgBuf, msg4); + code = buildInvalidOperationMsg(pMsgBuf, msg4); + goto _error; } } else { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } } @@ -586,15 +631,18 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SToken* pPwd = &pInfo->pMiscInfo->user.passwd; if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } if (pName->n >= TSDB_USER_LEN) { - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt; @@ -604,7 +652,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) { } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) { } else { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } } @@ -623,7 +672,11 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_SHOW: { SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt; code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf); - pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE)? TDMT_VND_SHOW_TABLES:TDMT_MND_SHOW; + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE) ? TDMT_VND_SHOW_TABLES : TDMT_MND_SHOW; break; } @@ -632,13 +685,15 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0); if (parserValidateNameToken(pToken) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg); + code = buildInvalidOperationMsg(pMsgBuf, msg); + goto _error; } SName n = {0}; int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n); if (ret != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg); + code = buildInvalidOperationMsg(pMsgBuf, msg); + goto _error; } SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg)); @@ -657,19 +712,22 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SCreateDbInfo* pCreateDB = &(pInfo->pMiscInfo->dbOpt); if (pCreateDB->dbname.n >= TSDB_DB_NAME_LEN) { - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } char buf[TSDB_DB_NAME_LEN] = {0}; SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf)); if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf); if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } pDcl->pMsg = (char*)pCreateMsg; @@ -687,7 +745,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SName name = {0}; code = tNameSetDbName(&name, pCtx->acctId, dbName->z, dbName->n); if (code != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg)); @@ -699,7 +758,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm pDcl->msgType = TDMT_MND_DROP_DB; pDcl->msgLen = sizeof(SDropDbMsg); pDcl->pMsg = (char*)pDropDbMsg; - return TSDB_CODE_SUCCESS; + break; } case TSDB_SQL_CREATE_TABLE: { @@ -707,14 +766,16 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) { if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) { - return code; + terrno = code; + goto _error; } + pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB; } else if (pCreateTable->type == TSQL_CREATE_CTABLE) { - if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet)) != + if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen)) != TSDB_CODE_SUCCESS) { - return code; + goto _error; } pDcl->msgType = TDMT_VND_CREATE_TABLE; @@ -729,7 +790,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_DROP_TABLE: { pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); if (pDcl->pMsg == NULL) { - code = terrno; + goto _error; } pDcl->msgType = TDMT_MND_DROP_STB; @@ -739,7 +800,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_CREATE_DNODE: { pDcl->pMsg = (char*)buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); if (pDcl->pMsg == NULL) { - code = terrno; + goto _error; } pDcl->msgType = TDMT_MND_CREATE_DNODE; @@ -749,7 +810,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_DROP_DNODE: { pDcl->pMsg = (char*)buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); if (pDcl->pMsg == NULL) { - code = terrno; + goto _error; } pDcl->msgType = TDMT_MND_DROP_DNODE; @@ -760,5 +821,29 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm break; } - return code; + return pDcl; + + _error: + terrno = code; + tfree(pDcl); + return NULL; } + +SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { + SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; + assert(pCreateTable->type == TSQL_CREATE_CTABLE); + + SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; + SMsgBuf* pMsgBuf = &m; + + SInsertStmtInfo* pInsertStmt = NULL; + + int32_t msgLen = 0; + int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen); + if (code != TSDB_CODE_SUCCESS) { + tfree(pInsertStmt); + return NULL; + } + + return pInsertStmt; +} \ No newline at end of file diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 710cf4b5d0..1b4d05808c 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -32,7 +32,7 @@ bool isInsertSql(const char* pStr, size_t length) { } bool qIsDdlQuery(const SQueryNode* pQuery) { - return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type; + return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type && TSDB_SQL_CREATE_TABLE != pQuery->type; } int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { @@ -44,16 +44,29 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { } if (!isDqlSqlStatement(&info)) { - SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo)); - if (NULL == pDcl) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code. - return terrno; + bool toVnode = false; + if (info.type == TSDB_SQL_CREATE_TABLE) { + SCreateTableSql* pCreateSql = info.pCreateTableInfo; + if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) { + toVnode = true; + } } - pDcl->nodeType = info.type; - int32_t code = qParserValidateDclSqlNode(&info, &pCxt->ctx, pDcl, pCxt->pMsg, pCxt->msgLen); - if (code == TSDB_CODE_SUCCESS) { + if (toVnode) { + SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); + if (pInsertInfo == NULL) { + return terrno; + } + + *pQuery = (SQueryNode*) pInsertInfo; + } else { + SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); + if (pDcl == NULL) { + return terrno; + } + *pQuery = (SQueryNode*)pDcl; + pDcl->nodeType = info.type; } } else { SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); diff --git a/source/libs/parser/test/parserTests.cpp b/source/libs/parser/test/parserTests.cpp index a67a9a8be8..fe430c5f5e 100644 --- a/source/libs/parser/test/parserTests.cpp +++ b/source/libs/parser/test/parserTests.cpp @@ -714,10 +714,9 @@ TEST(testCase, show_user_Test) { SSqlInfo info1 = doGenerateAST(sql1); ASSERT_EQ(info1.valid, true); - SDclStmtInfo output; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc", .pTransporter = NULL}; - int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); - ASSERT_EQ(code, 0); + SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len); + ASSERT_NE(output, nullptr); // convert the show command to be the select query // select name, privilege, create_time, account from information_schema.users; @@ -735,10 +734,9 @@ TEST(testCase, create_user_Test) { ASSERT_EQ(info1.valid, true); ASSERT_EQ(isDclSqlStatement(&info1), true); - SDclStmtInfo output; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc"}; - int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); - ASSERT_EQ(code, 0); + SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len); + ASSERT_NE(output, nullptr); destroySqlInfo(&info1); } \ No newline at end of file diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 35c6d59ffe..a68102ea6e 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -40,7 +40,7 @@ extern "C" { #define QNODE_SESSIONWINDOW 12 #define QNODE_STATEWINDOW 13 #define QNODE_FILL 14 -#define QNODE_INSERT 15 +#define QNODE_MODIFY 15 typedef struct SQueryDistPlanNodeInfo { bool stableQuery; // super table query or not diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index 136073aa60..5f95f86d4a 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -37,15 +37,19 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { return 0; } -int32_t createInsertPlan(const SInsertStmtInfo* pInsert, SQueryPlanNode** pQueryPlan) { +static int32_t createInsertPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) { + SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode; + *pQueryPlan = calloc(1, sizeof(SQueryPlanNode)); SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES); if (NULL == *pQueryPlan || NULL == blocks) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - (*pQueryPlan)->info.type = QNODE_INSERT; + + (*pQueryPlan)->info.type = QNODE_MODIFY; taosArrayAddAll(blocks, pInsert->pDataBlocks); (*pQueryPlan)->pExtInfo = blocks; + return TSDB_CODE_SUCCESS; } @@ -62,13 +66,14 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) { case TSDB_SQL_SELECT: { return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan); } + case TSDB_SQL_INSERT: - return createInsertPlan((const SInsertStmtInfo*)pNode, pQueryPlan); + case TSDB_SQL_CREATE_TABLE: + return createInsertPlan(pNode, pQueryPlan); + default: return TSDB_CODE_FAILED; } - - return TSDB_CODE_SUCCESS; } int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 97c9cec7c7..978b1554f3 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -34,7 +34,7 @@ static const char* gOpName[] = { #undef INCLUDE_AS_NAME }; -static void* vailidPointer(void* p) { +static void* validPointer(void* p) { if (NULL == p) { THROW(TSDB_CODE_TSC_OUT_OF_MEMORY); } @@ -76,7 +76,7 @@ int32_t dsinkNameToDsinkType(const char* name) { } static SDataSink* initDataSink(int32_t type, int32_t size) { - SDataSink* sink = (SDataSink*)vailidPointer(calloc(1, size)); + SDataSink* sink = (SDataSink*)validPointer(calloc(1, size)); sink->info.type = type; sink->info.name = dsinkTypeToDsinkName(type); return sink; @@ -121,7 +121,7 @@ static bool cloneExprArray(SArray** dst, SArray* src) { } static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) { - SPhyNode* node = (SPhyNode*)vailidPointer(calloc(1, size)); + SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size)); node->info.type = type; node->info.name = opTypeToOpName(type); if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) { @@ -184,7 +184,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable } static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { - SSubplan* subplan = vailidPointer(calloc(1, sizeof(SSubplan))); + SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan))); subplan->id = pCxt->nextId; ++(pCxt->nextId.subplanId); subplan->type = type; @@ -192,15 +192,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { if (NULL != pCxt->pCurrentSubplan) { subplan->level = pCxt->pCurrentSubplan->level + 1; if (NULL == pCxt->pCurrentSubplan->pChildern) { - pCxt->pCurrentSubplan->pChildern = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + pCxt->pCurrentSubplan->pChildern = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); } taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan); - subplan->pParents = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan); } SArray* currentLevel; if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) { - currentLevel = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); taosArrayPush(pCxt->pDag->pSubplans, ¤tLevel); } else { currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level); @@ -272,7 +272,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { case QNODE_TABLESCAN: node = createTableScanNode(pCxt, pPlanNode); break; - case QNODE_INSERT: + case QNODE_MODIFY: // Insert is not an operator in a physical plan. break; default: @@ -306,7 +306,7 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { } static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { - if (QNODE_INSERT == pRoot->info.type) { + if (QNODE_MODIFY == pRoot->info.type) { splitInsertSubplan(pCxt, pRoot); } else { SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); @@ -321,12 +321,12 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD TRY(TSDB_MAX_TAG_CONDITIONS) { SPlanContext context = { .pCatalog = pCatalog, - .pDag = vailidPointer(calloc(1, sizeof(SQueryDag))), + .pDag = validPointer(calloc(1, sizeof(SQueryDag))), .pCurrentSubplan = NULL, .nextId = {0} // todo queryid }; *pDag = context.pDag; - context.pDag->pSubplans = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); createSubplanByLevel(&context, pQueryNode); } CATCH(code) { CLEANUP_EXECUTE(); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 117297b9ff..b50eb2c92d 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -42,6 +42,11 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 bMsg->header.vgId = htonl(bInput->vgId); + if (bInput->dbName) { + strncpy(bMsg->dbFname, bInput->dbName, sizeof(bMsg->dbFname)); + bMsg->dbFname[sizeof(bMsg->dbFname) - 1] = 0; + } + strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; @@ -243,9 +248,14 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { if (pMetaMsg->tableType == TSDB_CHILD_TABLE) { pOut->metaNum = 2; - - memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname)); - memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname)); + + if (pMetaMsg->dbFname[0]) { + snprintf(pOut->ctbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname); + snprintf(pOut->tbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname); + } else { + memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname)); + memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname)); + } pOut->ctbMeta.vgId = pMetaMsg->vgId; pOut->ctbMeta.tableType = pMetaMsg->tableType; @@ -256,7 +266,11 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { } else { pOut->metaNum = 1; - memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname)); + if (pMetaMsg->dbFname[0]) { + snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname); + } else { + memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname)); + } code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3f1799507d..20eb94c2ff 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -664,7 +664,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SSubQueryMsg *pMsg = msg; - + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 79aaa1beb0..bfc3906f79 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -33,4 +33,12 @@ ENDIF() INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +# freelistTest +add_executable(freelistTest "") +target_sources(freelistTest + PRIVATE + "freelistTest.cpp" +) +target_link_libraries(freelistTest os util gtest gtest_main) + diff --git a/source/util/test/freelistTest.cpp b/source/util/test/freelistTest.cpp new file mode 100644 index 0000000000..7a4e8be5b7 --- /dev/null +++ b/source/util/test/freelistTest.cpp @@ -0,0 +1,16 @@ +#include "gtest/gtest.h" + +#include "freelist.h" + +TEST(TD_UTIL_FREELIST_TEST, simple_test) { + SFreeList fl; + + tFreeListInit(&fl); + + for (size_t i = 0; i < 1000; i++) { + void *ptr = TFL_MALLOC(1024, &fl); + GTEST_ASSERT_NE(ptr, nullptr); + } + + tFreeListClear(&fl); +} \ No newline at end of file