diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6a59d66380..2519f4247d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -935,13 +935,13 @@ typedef struct { typedef struct { int32_t dnodeId; -} SMCreateMnodeMsg, SMDropMnodeMsg, SDDropMnodeMsg; +} SMCreateMnodeReq, SMDropMnodeReq, SDDropMnodeReq; typedef struct { int32_t dnodeId; int8_t replica; SReplica replicas[TSDB_MAX_REPLICA]; -} SDCreateMnodeMsg, SDAlterMnodeMsg; +} SDCreateMnodeReq, SDAlterMnodeReq; typedef struct { int32_t dnodeId; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 923a726bf8..70cff7ed1a 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -87,14 +87,14 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, /** * Get a DB's all vgroup info. * @param pCatalog (input, got with catalogGetHandle) - * @param pRpc (input, rpc object) + * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pDBName (input, full db name) * @param forceUpdate (input, force update db vgroup info from mnode) * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList); +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList); int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); @@ -149,13 +149,13 @@ int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, con /** * Get a table's actual vgroup, for stable it's all possible vgroup list. * @param pCatalog (input, got with catalogGetHandle) - * @param pRpc (input, rpc object) + * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name, NOT including db name) * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList); +int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList); /** * Get a table's vgroup from its name's hash value. @@ -172,16 +172,16 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter /** * Get all meta data required in pReq. * @param pCatalog (input, got with catalogGetHandle) - * @param pRpc (input, rpc object) + * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pReq (input, reqest info) * @param pRsp (output, response data) * @return error code */ -int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); +int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); -int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList); +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList); int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num); diff --git a/include/libs/parser/parsenodes.h b/include/libs/parser/parsenodes.h index 18596a9e18..ac8a10067d 100644 --- a/include/libs/parser/parsenodes.h +++ b/include/libs/parser/parsenodes.h @@ -135,9 +135,8 @@ typedef struct SQueryStmtInfo { SArray *pUdfInfo; struct SQueryStmtInfo *sibling; // sibling - struct SQueryStmtInfo *pDownstream; SMultiFunctionsDesc info; - SArray *pUpstream; // SArray + SArray *pDownstream; // SArray int32_t havingFieldNum; int32_t exprListLevelIndex; } SQueryStmtInfo; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 5bd18641bf..edf9cf461f 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -24,7 +24,6 @@ extern "C" { typedef struct SParseContext { SParseBasicCtx ctx; - int8_t schemaAttached; // denote if submit block is built with table schema or not const char *pSql; // sql string size_t sqlLen; // length of the sql string char *pMsg; // extended error message if exists to help identifying the problem in sql statement. @@ -41,8 +40,17 @@ typedef struct SParseContext { */ int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery); -bool qIsDdlQuery(const SQueryNode* pQuery); +/** + * Return true if it is a ddl/dcl sql statement + * @param pQuery + * @return + */ +bool qIsDdlQuery(const SQueryNode* pQueryNode); +/** + * Destroy logic query plan + * @param pQueryNode + */ void qDestroyQuery(SQueryNode* pQueryNode); /** @@ -62,8 +70,8 @@ void columnListDestroy(SArray* pColumnList); void dropAllExprInfo(SArray** pExprInfo, int32_t numOfLevel); typedef struct SSourceParam { - SArray *pExprNodeList; //Array - SArray *pColumnList; //Array + SArray *pExprNodeList; //Array + SArray *pColumnList; //Array int32_t num; } SSourceParam; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index a55a4fedab..9e3cfbb0b0 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -50,8 +50,10 @@ struct SQueryStmtInfo; typedef SSchema SSlotSchema; typedef struct SDataBlockSchema { - SSlotSchema *pSchema; - int32_t numOfCols; // number of columns + SSlotSchema *pSchema; + int32_t numOfCols; // number of columns + int32_t resultRowSize; + int16_t precision; } SDataBlockSchema; typedef struct SQueryNodeBasicInfo { @@ -61,6 +63,7 @@ typedef struct SQueryNodeBasicInfo { typedef struct SDataSink { SQueryNodeBasicInfo info; + SDataBlockSchema schema; } SDataSink; typedef struct SDataDispatcher { @@ -139,9 +142,13 @@ typedef struct SQueryDag { struct SQueryNode; -/** - * Create the physical plan for the query, according to the AST. - */ + /** + * Create the physical plan for the query, according to the AST. + * @param pQueryInfo + * @param pDag + * @param requestId + * @return + */ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, uint64_t requestId); // Set datasource of this subplan, multiple calls may be made to a subplan. diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index e1eef1c3f5..0d5792fd91 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -87,7 +87,7 @@ typedef struct SUseDbOutput { SDBVgroupInfo dbVgroup; } SUseDbOutput; -typedef enum { +enum { META_TYPE_NON_TABLE = 1, META_TYPE_CTABLE, META_TYPE_TABLE, diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index b2ba7acebf..1114b66a80 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -75,6 +75,12 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void */ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob); +/** + * Fetch query result from the remote query executor + * @param pJob + * @param data + * @return + */ int32_t scheduleFetchRows(void *pJob, void **data); @@ -85,6 +91,10 @@ int32_t scheduleFetchRows(void *pJob, void **data); */ int32_t scheduleCancelJob(void *pJob); +/** + * Free the query job + * @param pJob + */ void scheduleFreeJob(void *pJob); void schedulerDestroy(void); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4fcdae2496..56f05ec0bf 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -259,28 +259,24 @@ int32_t* taosGetErrno(); #define TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0411) #define TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0420) #define TSDB_CODE_DND_MNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0421) -#define TSDB_CODE_DND_MNODE_ID_INVALID TAOS_DEF_ERROR_CODE(0, 0x0422) -#define TSDB_CODE_DND_MNODE_ID_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0423) -#define TSDB_CODE_DND_MNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0424) -#define TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0425) +#define TSDB_CODE_DND_MNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0422) +#define TSDB_CODE_DND_MNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0423) +#define TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0424) #define TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0430) #define TSDB_CODE_DND_QNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0431) -#define TSDB_CODE_DND_QNODE_ID_INVALID TAOS_DEF_ERROR_CODE(0, 0x0432) -#define TSDB_CODE_DND_QNODE_ID_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0433) -#define TSDB_CODE_DND_QNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0434) -#define TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0435) +#define TSDB_CODE_DND_QNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0432) +#define TSDB_CODE_DND_QNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0433) +#define TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0434) #define TSDB_CODE_DND_SNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0440) #define TSDB_CODE_DND_SNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0441) -#define TSDB_CODE_DND_SNODE_ID_INVALID TAOS_DEF_ERROR_CODE(0, 0x0442) -#define TSDB_CODE_DND_SNODE_ID_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0443) -#define TSDB_CODE_DND_SNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0444) -#define TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0445) +#define TSDB_CODE_DND_SNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0442) +#define TSDB_CODE_DND_SNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0443) +#define TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0444) #define TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0450) #define TSDB_CODE_DND_BNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0451) -#define TSDB_CODE_DND_BNODE_ID_INVALID TAOS_DEF_ERROR_CODE(0, 0x0452) -#define TSDB_CODE_DND_BNODE_ID_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0453) -#define TSDB_CODE_DND_BNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0454) -#define TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0455) +#define TSDB_CODE_DND_BNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0452) +#define TSDB_CODE_DND_BNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0453) +#define TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0454) #define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0460) #define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0461) #define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0462) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 705d6ef786..26afe237c9 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -62,6 +62,7 @@ typedef struct SAppInstInfo { SList *pConnList; // STscObj linked list int64_t clusterId; void *pTransporter; + SHeartBeatInfo hb; } SAppInstInfo; typedef struct SAppInfo { @@ -70,7 +71,7 @@ typedef struct SAppInfo { char *ep; int32_t pid; int32_t numOfThreads; - SHeartBeatInfo hb; + SHashObj *pInstMap; } SAppInfo; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 3205524e3c..a6b04624d7 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -291,10 +291,10 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { nPrintTsc("%s", sql) - SRequestObj* pRequest = NULL; - SQueryNode* pQuery = NULL; - SQueryDag* pDag = NULL; - void* pJob = NULL; + SRequestObj *pRequest = NULL; + SQueryNode *pQuery = NULL; + SQueryDag *pDag = NULL; + void *pJob = NULL; terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index dffe178524..1238976b97 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -1,11 +1,12 @@ #include "os.h" +#include "tref.h" +#include "trpc.h" #include "clientInt.h" #include "clientLog.h" #include "query.h" #include "tmsg.h" #include "tglobal.h" -#include "tref.h" -#include "trpc.h" +#include "catalog.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 0ef96e657d..73c9fc5e9f 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -485,6 +485,22 @@ TEST(testCase, drop_stable_Test) { taos_close(pConn); } +TEST(testCase, generated_request_id_test) { + SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + + for (int32_t i = 0; i < 50000; ++i) { + uint64_t v = generateRequestId(); + void* result = taosHashGet(phash, &v, sizeof(v)); + if (result != nullptr) { + printf("0x%lx, index:%d\n", v, i); + } + assert(result == nullptr); + taosHashPut(phash, &v, sizeof(v), NULL, 0); + } + + taosHashCleanup(phash); +} + // TEST(testCase, create_topic_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -518,46 +534,55 @@ TEST(testCase, drop_stable_Test) { // tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql)); // taos_close(pConn); //} -TEST(testCase, generated_request_id_test) { - SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - - for (int32_t i = 0; i < 50000; ++i) { - uint64_t v = generateRequestId(); - void* result = taosHashGet(phash, &v, sizeof(v)); - if (result != nullptr) { - printf("0x%lx, index:%d\n", v, i); - } - assert(result == nullptr); - taosHashPut(phash, &v, sizeof(v), NULL, 0); - } - - taosHashCleanup(phash); -} - -// TEST(testCase, projection_query_tables) { +//TEST(testCase, insert_test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // ASSERT_EQ(pConn, nullptr); // // TAOS_RES* pRes = taos_query(pConn, "use abc1"); // taos_free_result(pRes); // -// pRes = taos_query(pConn, "select * from t_2"); +// pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); // if (taos_errno(pRes) != 0) { // printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); // taos_free_result(pRes); // ASSERT_TRUE(false); // } // -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// // taos_free_result(pRes); // taos_close(pConn); //} +//#endif + +TEST(testCase, projection_query_tables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use test1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s", taos_errstr(pRes)); + taos_free_result(pRes); + return; + } + + taos_free_result(pRes); + + pRes = taos_query(pConn, "select * from tm0"); + if (taos_errno(pRes) != 0) { + printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c index f4774c00df..61b220158f 100644 --- a/source/dnode/mgmt/impl/src/dndBnode.c +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -268,7 +268,7 @@ int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { pMsg->dnodeId = htonl(pMsg->dnodeId); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_BNODE_ID_INVALID; + terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION; dError("failed to create bnode since %s", terrstr()); return -1; } else { @@ -281,7 +281,7 @@ int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { pMsg->dnodeId = htonl(pMsg->dnodeId); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_BNODE_ID_INVALID; + terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION; dError("failed to drop bnode since %s", terrstr()); return -1; } else { diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 5f2d90123c..ae37967b3d 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -305,25 +305,24 @@ static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) { memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); } -static int32_t dndBuildMnodeOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pReq) { +static int32_t dndBuildMnodeOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { dndInitMnodeOption(pDnode, pOption); pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); - pOption->replica = pReq->replica; + pOption->replica = pCreate->replica; pOption->selfIndex = -1; - for (int32_t i = 0; i < pReq->replica; ++i) { + for (int32_t i = 0; i < pCreate->replica; ++i) { SReplica *pReplica = &pOption->replicas[i]; - pReplica->id = pReq->replicas[i].id; - pReplica->port = pReq->replicas[i].port; - memcpy(pReplica->fqdn, pReq->replicas[i].fqdn, TSDB_FQDN_LEN); + pReplica->id = pCreate->replicas[i].id; + pReplica->port = pCreate->replicas[i].port; + memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); if (pReplica->id == pOption->dnodeId) { pOption->selfIndex = i; } } if (pOption->selfIndex == -1) { - terrno = TSDB_CODE_DND_MNODE_ID_NOT_FOUND; dError("failed to build mnode options since %s", terrstr()); return -1; } @@ -423,8 +422,8 @@ static int32_t dndDropMnode(SDnode *pDnode) { return 0; } -static SDCreateMnodeMsg *dndParseCreateMnodeReq(SRpcMsg *pReq) { - SDCreateMnodeMsg *pCreate = pReq->pCont; +static SDCreateMnodeReq *dndParseCreateMnodeReq(SRpcMsg *pReq) { + SDCreateMnodeReq *pCreate = pReq->pCont; pCreate->dnodeId = htonl(pCreate->dnodeId); for (int32_t i = 0; i < pCreate->replica; ++i) { pCreate->replicas[i].id = htonl(pCreate->replicas[i].id); @@ -435,51 +434,85 @@ static SDCreateMnodeMsg *dndParseCreateMnodeReq(SRpcMsg *pReq) { } int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { - SDCreateMnodeMsg *pCreate = dndParseCreateMnodeReq(pReq); + SDCreateMnodeReq *pCreate = dndParseCreateMnodeReq(pReq); - if (pCreate->dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_MNODE_ID_INVALID; + if (pCreate->replica <= 1 || pCreate->dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; + dError("failed to create mnode since %s", terrstr()); return -1; - } else { - SMnodeOpt option = {0}; - if (dndBuildMnodeOptionFromReq(pDnode, &option, pCreate) != 0) { - return -1; - } - - return dndOpenMnode(pDnode, &option); } + + SMnodeOpt option = {0}; + if (dndBuildMnodeOptionFromReq(pDnode, &option, pCreate) != 0) { + terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; + dError("failed to create mnode since %s", terrstr()); + return -1; + } + + SMnode *pMnode = dndAcquireMnode(pDnode); + if (pMnode != NULL) { + dndReleaseMnode(pDnode, pMnode); + terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; + dError("failed to create mnode since %s", terrstr()); + return -1; + } + + dDebug("start to create mnode"); + return dndOpenMnode(pDnode, &option); } int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { - SDAlterMnodeMsg *pAlter = dndParseCreateMnodeReq(pReq); + SDAlterMnodeReq *pAlter = dndParseCreateMnodeReq(pReq); if (pAlter->dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_MNODE_ID_INVALID; + terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; + dError("failed to alter mnode since %s", terrstr()); return -1; } SMnodeOpt option = {0}; if (dndBuildMnodeOptionFromReq(pDnode, &option, pAlter) != 0) { + terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; + dError("failed to alter mnode since %s", terrstr()); return -1; } - if (dndAlterMnode(pDnode, &option) != 0) { + SMnode *pMnode = dndAcquireMnode(pDnode); + if (pMnode == NULL) { + terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; + dError("failed to alter mnode since %s", terrstr()); return -1; } - return dndWriteMnodeFile(pDnode); + dDebug("start to alter mnode"); + int32_t code = dndAlterMnode(pDnode, &option); + dndReleaseMnode(pDnode, pMnode); + + return code; } int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { - SDDropMnodeMsg *pDrop = pReq->pCont; + SDDropMnodeReq *pDrop = pReq->pCont; pDrop->dnodeId = htonl(pDrop->dnodeId); if (pDrop->dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_MNODE_ID_INVALID; + terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; + dError("failed to drop mnode since %s", terrstr()); return -1; - } else { - return dndDropMnode(pDnode); } + + SMnode *pMnode = dndAcquireMnode(pDnode); + if (pMnode == NULL) { + terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; + dError("failed to drop mnode since %s", terrstr()); + return -1; + } + + dDebug("start to drop mnode"); + int32_t code = dndDropMnode(pDnode); + dndReleaseMnode(pDnode, pMnode); + + return code; } static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg) { @@ -506,6 +539,7 @@ static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc code = TSDB_CODE_OUT_OF_MEMORY; } else { code = dndWriteMsgToWorker(pWorker, pMsg, 0); + if (code != 0) code = terrno; } if (code != 0) { diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 31bf5d9b99..3deee93e29 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -274,7 +274,7 @@ int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { pMsg->dnodeId = htonl(pMsg->dnodeId); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_QNODE_ID_INVALID; + terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION; dError("failed to create qnode since %s", terrstr()); return -1; } else { @@ -287,7 +287,7 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { pMsg->dnodeId = htonl(pMsg->dnodeId); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_QNODE_ID_INVALID; + terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION; dError("failed to drop qnode since %s", terrstr()); return -1; } else { diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index 05f3611386..ab4e38bfb2 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -268,7 +268,7 @@ int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { pMsg->dnodeId = htonl(pMsg->dnodeId); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_SNODE_ID_INVALID; + terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION; dError("failed to create snode since %s", terrstr()); return -1; } else { @@ -281,7 +281,7 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { pMsg->dnodeId = htonl(pMsg->dnodeId); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_SNODE_ID_INVALID; + terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION; dError("failed to drop snode since %s", terrstr()); return -1; } else { diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index bc80b6056f..6427ab080a 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -197,6 +197,10 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) { dndFreeVnodeWriteQueue(pDnode, pVnode); dndFreeVnodeApplyQueue(pDnode, pVnode); dndFreeVnodeSyncQueue(pDnode, pVnode); + + vnodeClose(pVnode->pImpl); + pVnode->pImpl = NULL; + free(pVnode->path); free(pVnode->db); free(pVnode); diff --git a/source/dnode/mgmt/impl/test/bnode/dbnode.cpp b/source/dnode/mgmt/impl/test/bnode/dbnode.cpp index bafe8242a6..398d530648 100644 --- a/source/dnode/mgmt/impl/test/bnode/dbnode.cpp +++ b/source/dnode/mgmt/impl/test/bnode/dbnode.cpp @@ -34,7 +34,7 @@ TEST_F(DndTestBnode, 01_Create_Bnode) { SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_BNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_BNODE_ID_INVALID); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_BNODE_INVALID_OPTION); } { @@ -82,7 +82,7 @@ TEST_F(DndTestBnode, 01_Drop_Bnode) { SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_BNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_BNODE_ID_INVALID); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_BNODE_INVALID_OPTION); } { diff --git a/source/dnode/mgmt/impl/test/mnode/qmnode.cpp b/source/dnode/mgmt/impl/test/mnode/dmnode.cpp similarity index 54% rename from source/dnode/mgmt/impl/test/mnode/qmnode.cpp rename to source/dnode/mgmt/impl/test/mnode/dmnode.cpp index 00098a856a..841d00d14d 100644 --- a/source/dnode/mgmt/impl/test/mnode/qmnode.cpp +++ b/source/dnode/mgmt/impl/test/mnode/dmnode.cpp @@ -16,26 +16,29 @@ class DndTestMnode : public ::testing::Test { static void SetUpTestSuite() { test.Init("/tmp/dnode_test_mnode", 9113); } static void TearDownTestSuite() { test.Cleanup(); } - static Testbase test; + static Testbase test; public: void SetUp() override {} void TearDown() override {} }; -Testbase DndTestMnode::test; +Testbase DndTestMnode::test; -#if 0 TEST_F(DndTestMnode, 01_Create_Mnode) { { int32_t contLen = sizeof(SDCreateMnodeReq); SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); + pReq->replica = 1; + pReq->replicas[0].id = htonl(1); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); } { @@ -43,10 +46,14 @@ TEST_F(DndTestMnode, 01_Create_Mnode) { SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); + pReq->replica = 1; + pReq->replicas[0].id = htonl(2); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, 0); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); } { @@ -54,19 +61,13 @@ TEST_F(DndTestMnode, 01_Create_Mnode) { SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); - - SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); - } - - test.Restart(); - - { - int32_t contLen = sizeof(SDCreateMnodeReq); - - SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(1); + pReq->replica = 2; + pReq->replicas[0].id = htonl(1); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); + pReq->replicas[1].id = htonl(1); + pReq->replicas[1].port = htonl(9114); + strcpy(pReq->replicas[1].fqdn, "localhost"); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -75,51 +76,50 @@ TEST_F(DndTestMnode, 01_Create_Mnode) { } TEST_F(DndTestMnode, 02_Alter_Mnode) { - { - int32_t contLen = sizeof(SDCreateMnodeReq); + { + int32_t contLen = sizeof(SDAlterMnodeReq); - SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); + pReq->replica = 1; + pReq->replicas[0].id = htonl(1); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); - SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); } { - int32_t contLen = sizeof(SDCreateMnodeReq); + int32_t contLen = sizeof(SDAlterMnodeReq); - SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); + pReq->replica = 1; + pReq->replicas[0].id = htonl(2); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); - SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); + } + + { + int32_t contLen = sizeof(SDAlterMnodeReq); + + SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + pReq->replica = 1; + pReq->replicas[0].id = htonl(1); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); } - - { - int32_t contLen = sizeof(SDCreateMnodeReq); - - SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(1); - - SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); - } - - test.Restart(); - - { - int32_t contLen = sizeof(SDCreateMnodeReq); - - SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(1); - - SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); - } } TEST_F(DndTestMnode, 03_Drop_Mnode) { @@ -131,7 +131,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); } { @@ -156,28 +156,34 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); } - test.Restart(); - { - int32_t contLen = sizeof(SDDropMnodeReq); + int32_t contLen = sizeof(SDAlterMnodeReq); - SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); + SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); + pReq->replica = 1; + pReq->replicas[0].id = htonl(1); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); - SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); } + { int32_t contLen = sizeof(SDCreateMnodeReq); SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); + pReq->replica = 2; + pReq->replicas[0].id = htonl(1); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); } -} -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/source/dnode/mgmt/impl/test/qnode/dqnode.cpp b/source/dnode/mgmt/impl/test/qnode/dqnode.cpp index e64a0543fc..19fd6b4b12 100644 --- a/source/dnode/mgmt/impl/test/qnode/dqnode.cpp +++ b/source/dnode/mgmt/impl/test/qnode/dqnode.cpp @@ -34,7 +34,7 @@ TEST_F(DndTestQnode, 01_Create_Qnode) { SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_QNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_QNODE_ID_INVALID); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_QNODE_INVALID_OPTION); } { @@ -82,7 +82,7 @@ TEST_F(DndTestQnode, 02_Drop_Qnode) { SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_QNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_QNODE_ID_INVALID); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_QNODE_INVALID_OPTION); } { diff --git a/source/dnode/mgmt/impl/test/snode/dsnode.cpp b/source/dnode/mgmt/impl/test/snode/dsnode.cpp index 1e6bcdb3fb..f51f4a0268 100644 --- a/source/dnode/mgmt/impl/test/snode/dsnode.cpp +++ b/source/dnode/mgmt/impl/test/snode/dsnode.cpp @@ -34,7 +34,7 @@ TEST_F(DndTestSnode, 01_Create_Snode) { SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_SNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_SNODE_ID_INVALID); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_SNODE_INVALID_OPTION); } { @@ -82,7 +82,7 @@ TEST_F(DndTestSnode, 01_Drop_Snode) { SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_SNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_SNODE_ID_INVALID); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_SNODE_INVALID_OPTION); } { diff --git a/source/dnode/mgmt/impl/test/sut/src/sut.cpp b/source/dnode/mgmt/impl/test/sut/src/sut.cpp index 72a6416e22..65c7d67254 100644 --- a/source/dnode/mgmt/impl/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/sut.cpp @@ -16,7 +16,7 @@ #include "sut.h" void Testbase::InitLog(const char* path) { - dDebugFlag = 0; + dDebugFlag = 143; vDebugFlag = 0; mDebugFlag = 143; cDebugFlag = 0; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index df1848f2f1..c14d1f51f8 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -27,14 +27,14 @@ static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj); static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw); static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj); -static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode); -static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg); -static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg); -static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg); -static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg); -static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg); -static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); -static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew); +static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq); +static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq); +static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp); +static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pRsp); +static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp); +static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveMnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); int32_t mndInitMnode(SMnode *pMnode) { @@ -65,7 +65,7 @@ void mndCleanupMnode(SMnode *pMnode) {} static SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) { SSdb *pSdb = pMnode->pSdb; SMnodeObj *pObj = sdbAcquire(pSdb, SDB_MNODE, &mnodeId); - if (pObj == NULL) { + if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_MNODE_NOT_EXIST; } return pObj; @@ -207,9 +207,9 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) { return 0; } -static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode) { - mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOldMnode->id, pOldMnode, pNewMnode); - pOldMnode->updateTime = pNewMnode->updateTime; +static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) { + mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); + pOld->updateTime = pNew->updateTime; return 0; } @@ -277,13 +277,13 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno void *pIter = NULL; int32_t numOfReplicas = 0; - SDCreateMnodeMsg createMsg = {0}; + SDCreateMnodeReq createReq = {0}; while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); if (pIter == NULL) break; - SReplica *pReplica = &createMsg.replicas[numOfReplicas]; + SReplica *pReplica = &createReq.replicas[numOfReplicas]; pReplica->id = htonl(pMObj->id); pReplica->port = htons(pMObj->pDnode->port); memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); @@ -292,13 +292,13 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno sdbRelease(pSdb, pMObj); } - SReplica *pReplica = &createMsg.replicas[numOfReplicas]; + SReplica *pReplica = &createReq.replicas[numOfReplicas]; pReplica->id = htonl(pDnode->id); pReplica->port = htons(pDnode->port); memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); numOfReplicas++; - createMsg.replica = numOfReplicas; + createReq.replica = numOfReplicas; while (1) { SMnodeObj *pMObj = NULL; @@ -307,22 +307,23 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno STransAction action = {0}; - SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg)); - if (pMsg == NULL) { + SDAlterMnodeReq *pReq = malloc(sizeof(SDAlterMnodeReq)); + if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; } - memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg)); + memcpy(pReq, &createReq, sizeof(SDAlterMnodeReq)); - pMsg->dnodeId = htonl(pMObj->id); + pReq->dnodeId = htonl(pMObj->id); action.epSet = mndGetDnodeEpset(pMObj->pDnode); - action.pCont = pMsg; - action.contLen = sizeof(SDAlterMnodeMsg); + action.pCont = pReq; + action.contLen = sizeof(SDAlterMnodeReq); action.msgType = TDMT_DND_ALTER_MNODE; + action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; @@ -335,17 +336,18 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); - SDCreateMnodeMsg *pMsg = malloc(sizeof(SDCreateMnodeMsg)); - if (pMsg == NULL) return -1; - memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg)); - pMsg->dnodeId = htonl(pObj->id); + SDCreateMnodeReq *pReq = malloc(sizeof(SDCreateMnodeReq)); + if (pReq == NULL) return -1; + memcpy(pReq, &createReq, sizeof(SDAlterMnodeReq)); + pReq->dnodeId = htonl(pObj->id); action.epSet = mndGetDnodeEpset(pDnode); - action.pCont = pMsg; - action.contLen = sizeof(SDCreateMnodeMsg); + action.pCont = pReq; + action.contLen = sizeof(SDCreateMnodeReq); action.msgType = TDMT_DND_CREATE_MNODE; + action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); return -1; } } @@ -353,39 +355,23 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno return 0; } -static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateMnodeMsg *pCreate) { +static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) { + int32_t code = -1; + SMnodeObj mnodeObj = {0}; mnodeObj.id = pDnode->id; mnodeObj.createdTime = taosGetTimestampMs(); mnodeObj.updateTime = mnodeObj.createdTime; - int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); - goto CREATE_MNODE_OVER; - } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + if (pTrans == NULL) goto CREATE_MNODE_OVER; + mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); + if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto CREATE_MNODE_OVER; + if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto CREATE_MNODE_OVER; + if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto CREATE_MNODE_OVER; - if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto CREATE_MNODE_OVER; - } - - if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto CREATE_MNODE_OVER; - } - - if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_MNODE_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto CREATE_MNODE_OVER; - } + if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_MNODE_OVER; code = 0; @@ -394,9 +380,9 @@ CREATE_MNODE_OVER: return code; } -static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SMCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont; +static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SMCreateMnodeReq *pCreate = pReq->rpcMsg.pCont; pCreate->dnodeId = htonl(pCreate->dnodeId); @@ -408,6 +394,9 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { mError("mnode:%d, mnode already exist", pObj->id); terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST; return -1; + } else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) { + mError("qnode:%d, failed to create mnode since %s", pCreate->dnodeId, terrstr()); + return -1; } SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); @@ -417,7 +406,7 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { return -1; } - int32_t code = mndCreateMnode(pMnode, pMsg, pDnode, pCreate); + int32_t code = mndCreateMnode(pMnode, pReq, pDnode, pCreate); mndReleaseDnode(pMnode, pDnode); if (code != 0) { @@ -449,14 +438,14 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode void *pIter = NULL; int32_t numOfReplicas = 0; - SDAlterMnodeMsg alterMsg = {0}; + SDAlterMnodeReq alterReq = {0}; while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); if (pIter == NULL) break; if (pMObj->id != pObj->id) { - SReplica *pReplica = &alterMsg.replicas[numOfReplicas]; + SReplica *pReplica = &alterReq.replicas[numOfReplicas]; pReplica->id = htonl(pMObj->id); pReplica->port = htons(pMObj->pDnode->port); memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); @@ -466,7 +455,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode sdbRelease(pSdb, pMObj); } - alterMsg.replica = numOfReplicas; + alterReq.replica = numOfReplicas; while (1) { SMnodeObj *pMObj = NULL; @@ -475,22 +464,23 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode if (pMObj->id != pObj->id) { STransAction action = {0}; - SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg)); - if (pMsg == NULL) { + SDAlterMnodeReq *pReq = malloc(sizeof(SDAlterMnodeReq)); + if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; } - memcpy(pMsg, &alterMsg, sizeof(SDAlterMnodeMsg)); + memcpy(pReq, &alterReq, sizeof(SDAlterMnodeReq)); - pMsg->dnodeId = htonl(pMObj->id); + pReq->dnodeId = htonl(pMObj->id); action.epSet = mndGetDnodeEpset(pMObj->pDnode); - action.pCont = pMsg; - action.contLen = sizeof(SDAlterMnodeMsg); + action.pCont = pReq; + action.contLen = sizeof(SDAlterMnodeReq); action.msgType = TDMT_DND_ALTER_MNODE; + action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; @@ -504,19 +494,20 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); - SDDropMnodeMsg *pMsg = malloc(sizeof(SDDropMnodeMsg)); - if (pMsg == NULL) { + SDDropMnodeReq *pReq = malloc(sizeof(SDDropMnodeReq)); + if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pMsg->dnodeId = htonl(pObj->id); + pReq->dnodeId = htonl(pObj->id); action.epSet = mndGetDnodeEpset(pDnode); - action.pCont = pMsg; - action.contLen = sizeof(SDDropMnodeMsg); + action.pCont = pReq; + action.contLen = sizeof(SDDropMnodeReq); action.msgType = TDMT_DND_DROP_MNODE; + action.acceptableCode = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); return -1; } } @@ -524,35 +515,18 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode return 0; } -static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) { +static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pReq, SMnodeObj *pObj) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("mnode:%d, failed to drop since %s", pObj->id, terrstr()); - goto DROP_MNODE_OVER; - } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + if (pTrans == NULL) goto DROP_MNODE_OVER; mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); - if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto DROP_MNODE_OVER; - } - - if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto DROP_MNODE_OVER; - } - - if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_MNODE_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto DROP_MNODE_OVER; - } + if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto DROP_MNODE_OVER; + if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto DROP_MNODE_OVER; + if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto DROP_MNODE_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_MNODE_OVER; code = 0; @@ -561,9 +535,9 @@ DROP_MNODE_OVER: return code; } -static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SMDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont; +static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SMDropMnodeReq *pDrop = pReq->rpcMsg.pCont; pDrop->dnodeId = htonl(pDrop->dnodeId); mDebug("mnode:%d, start to drop", pDrop->dnodeId); @@ -577,12 +551,10 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { SMnodeObj *pObj = mndAcquireMnode(pMnode, pDrop->dnodeId); if (pObj == NULL) { mError("mnode:%d, not exist", pDrop->dnodeId); - terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; return -1; } - int32_t code = mndDropMnode(pMnode, pMsg, pObj); - + int32_t code = mndDropMnode(pMnode, pReq, pObj); if (code != 0) { mError("mnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); return -1; @@ -592,23 +564,23 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } -static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -660,8 +632,8 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * return 0; } -static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndRetrieveMnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; int32_t cols = 0; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 47d0ce4105..f030398144 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -86,10 +86,6 @@ static int32_t mndRestoreWal(SMnode *pMnode) { mndTransPullup(pMnode); - if (walBeginSnapshot(pWal, sdbVer) < 0) { - goto WAL_RESTORE_OVER; - } - if (sdbVer != lastSdbVer) { mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer); if (sdbWriteFile(pSdb) != 0) { @@ -97,6 +93,10 @@ static int32_t mndRestoreWal(SMnode *pMnode) { } } + if (walBeginSnapshot(pWal, sdbVer) < 0) { + goto WAL_RESTORE_OVER; + } + if (walEndSnapshot(pWal) < 0) { goto WAL_RESTORE_OVER; } diff --git a/source/dnode/mnode/impl/test/mnode/mnode.cpp b/source/dnode/mnode/impl/test/mnode/mnode.cpp index 6095eb5370..f56b864cad 100644 --- a/source/dnode/mnode/impl/test/mnode/mnode.cpp +++ b/source/dnode/mnode/impl/test/mnode/mnode.cpp @@ -18,37 +18,25 @@ class MndTestMnode : public ::testing::Test { public: static void SetUpTestSuite() { - test.Init("/tmp/mnode_test_mnode1", 9031); + test.Init("/tmp/mnode_test_mnode1", 9028); const char* fqdn = "localhost"; - const char* firstEp = "localhost:9031"; + const char* firstEp = "localhost:9028"; - server2.Start("/tmp/mnode_test_mnode2", fqdn, 9032, firstEp); - server3.Start("/tmp/mnode_test_mnode3", fqdn, 9033, firstEp); - server4.Start("/tmp/mnode_test_mnode4", fqdn, 9034, firstEp); - server5.Start("/tmp/mnode_test_mnode5", fqdn, 9035, firstEp); + server2.Start("/tmp/mnode_test_mnode2", fqdn, 9029, firstEp); taosMsleep(300); } static void TearDownTestSuite() { server2.Stop(); - server3.Stop(); - server4.Stop(); - server5.Stop(); test.Cleanup(); } static Testbase test; static TestServer server2; - static TestServer server3; - static TestServer server4; - static TestServer server5; }; Testbase MndTestMnode::test; TestServer MndTestMnode::server2; -TestServer MndTestMnode::server3; -TestServer MndTestMnode::server4; -TestServer MndTestMnode::server5; TEST_F(MndTestMnode, 01_ShowDnode) { test.SendShowMetaReq(TSDB_MGMT_TABLE_MNODE, ""); @@ -64,7 +52,7 @@ TEST_F(MndTestMnode, 01_ShowDnode) { EXPECT_EQ(test.GetShowRows(), 1); CheckInt16(1); - CheckBinary("localhost:9031", TSDB_EP_LEN); + CheckBinary("localhost:9028", TSDB_EP_LEN); CheckBinary("master", 12); CheckInt64(0); CheckTimestamp(); @@ -72,9 +60,9 @@ TEST_F(MndTestMnode, 01_ShowDnode) { TEST_F(MndTestMnode, 02_Create_Mnode_Invalid_Id) { { - int32_t contLen = sizeof(SMCreateMnodeMsg); + int32_t contLen = sizeof(SMCreateMnodeReq); - SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen); + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); @@ -85,9 +73,9 @@ TEST_F(MndTestMnode, 02_Create_Mnode_Invalid_Id) { TEST_F(MndTestMnode, 03_Create_Mnode_Invalid_Id) { { - int32_t contLen = sizeof(SMCreateMnodeMsg); + int32_t contLen = sizeof(SMCreateMnodeReq); - SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen); + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); @@ -103,7 +91,7 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); strcpy(pReq->fqdn, "localhost"); - pReq->port = htonl(9032); + pReq->port = htonl(9029); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -117,9 +105,9 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { { // create mnode - int32_t contLen = sizeof(SMCreateMnodeMsg); + int32_t contLen = sizeof(SMCreateMnodeReq); - SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen); + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); @@ -132,8 +120,8 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { CheckInt16(1); CheckInt16(2); - CheckBinary("localhost:9031", TSDB_EP_LEN); - CheckBinary("localhost:9032", TSDB_EP_LEN); + CheckBinary("localhost:9028", TSDB_EP_LEN); + CheckBinary("localhost:9029", TSDB_EP_LEN); CheckBinary("master", 12); CheckBinary("slave", 12); CheckInt64(0); @@ -144,9 +132,9 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { { // drop mnode - int32_t contLen = sizeof(SMDropMnodeMsg); + int32_t contLen = sizeof(SMDropMnodeReq); - SMDropMnodeMsg* pReq = (SMDropMnodeMsg*)rpcMallocCont(contLen); + SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); @@ -158,144 +146,145 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { EXPECT_EQ(test.GetShowRows(), 1); CheckInt16(1); - CheckBinary("localhost:9031", TSDB_EP_LEN); + CheckBinary("localhost:9028", TSDB_EP_LEN); CheckBinary("master", 12); CheckInt64(0); CheckTimestamp(); } + + { + // drop mnode + int32_t contLen = sizeof(SMDropMnodeReq); + + SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_MNODE_NOT_EXIST); + } } -// { -// int32_t contLen = sizeof(SDropDnodeReq); -// SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen); -// pReq->dnodeId = htonl(2); +TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) { + { + // send message first, then dnode2 crash, result is returned, and rollback is started + int32_t contLen = sizeof(SMCreateMnodeReq); -// SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen); -// ASSERT_NE(pRsp, nullptr); -// ASSERT_EQ(pRsp->code, 0); -// } + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); -// test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, ""); -// CHECK_META("show dnodes", 7); -// test.SendShowRetrieveReq(); -// EXPECT_EQ(test.GetShowRows(), 1); + server2.Stop(); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } -// CheckInt16(1); -// CheckBinary("localhost:9031", TSDB_EP_LEN); -// CheckInt16(0); -// CheckInt16(1); -// CheckBinary("ready", 10); -// CheckTimestamp(); -// CheckBinary("", 24); + { + // continue send message, mnode is creating + int32_t contLen = sizeof(SMCreateMnodeReq); -// { -// int32_t contLen = sizeof(SCreateDnodeReq); + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); -// SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); -// strcpy(pReq->ep, "localhost:9033"); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_CREATING); + } -// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); -// ASSERT_NE(pRsp, nullptr); -// ASSERT_EQ(pRsp->code, 0); -// } + { + // continue send message, mnode is creating + int32_t contLen = sizeof(SMDropMnodeReq); -// { -// int32_t contLen = sizeof(SCreateDnodeReq); + SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); -// SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); -// strcpy(pReq->ep, "localhost:9034"); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_CREATING); + } -// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); -// ASSERT_NE(pRsp, nullptr); -// ASSERT_EQ(pRsp->code, 0); -// } + { + // server start, wait until the rollback finished + server2.DoStart(); + taosMsleep(1000); -// { -// int32_t contLen = sizeof(SCreateDnodeReq); + int32_t retry = 0; + int32_t retryMax = 20; -// SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); -// strcpy(pReq->ep, "localhost:9035"); + for (retry = 0; retry < retryMax; retry++) { + int32_t contLen = sizeof(SMCreateMnodeReq); -// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); -// ASSERT_NE(pRsp, nullptr); -// ASSERT_EQ(pRsp->code, 0); -// } + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); -// taosMsleep(1300); -// test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, ""); -// CHECK_META("show dnodes", 7); -// test.SendShowRetrieveReq(); -// EXPECT_EQ(test.GetShowRows(), 4); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + if (pRsp->code == TSDB_CODE_MND_MNODE_ALREADY_EXIST) break; + taosMsleep(1000); + } -// CheckInt16(1); -// CheckInt16(3); -// CheckInt16(4); -// CheckInt16(5); -// CheckBinary("localhost:9031", TSDB_EP_LEN); -// CheckBinary("localhost:9033", TSDB_EP_LEN); -// CheckBinary("localhost:9034", TSDB_EP_LEN); -// CheckBinary("localhost:9035", TSDB_EP_LEN); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(1); -// CheckInt16(1); -// CheckInt16(1); -// CheckInt16(1); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckBinary("", 24); -// CheckBinary("", 24); -// CheckBinary("", 24); -// CheckBinary("", 24); + ASSERT_NE(retry, retryMax); + } +} -// // restart -// uInfo("stop all server"); -// test.Restart(); -// server2.Restart(); -// server3.Restart(); -// server4.Restart(); -// server5.Restart(); +TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) { + { + // send message first, then dnode2 crash, result is returned, and rollback is started + int32_t contLen = sizeof(SMDropMnodeReq); -// taosMsleep(1300); -// test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, ""); -// CHECK_META("show dnodes", 7); -// test.SendShowRetrieveReq(); -// EXPECT_EQ(test.GetShowRows(), 4); + SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); -// CheckInt16(1); -// CheckInt16(3); -// CheckInt16(4); -// CheckInt16(5); -// CheckBinary("localhost:9031", TSDB_EP_LEN); -// CheckBinary("localhost:9033", TSDB_EP_LEN); -// CheckBinary("localhost:9034", TSDB_EP_LEN); -// CheckBinary("localhost:9035", TSDB_EP_LEN); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(1); -// CheckInt16(1); -// CheckInt16(1); -// CheckInt16(1); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckBinary("", 24); -// CheckBinary("", 24); -// CheckBinary("", 24); -// CheckBinary("", 24); -// } + server2.Stop(); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + { + // continue send message, mnode is dropping + int32_t contLen = sizeof(SMCreateMnodeReq); + + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_DROPPING); + } + + { + // continue send message, mnode is dropping + int32_t contLen = sizeof(SMDropMnodeReq); + + SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_DROPPING); + } + + { + // server start, wait until the rollback finished + server2.DoStart(); + taosMsleep(1000); + + int32_t retry = 0; + int32_t retryMax = 20; + + for (retry = 0; retry < retryMax; retry++) { + int32_t contLen = sizeof(SMCreateMnodeReq); + + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + if (pRsp->code == 0) break; + taosMsleep(1000); + } + + ASSERT_NE(retry, retryMax); + } +} \ No newline at end of file diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 45019256a6..da0951fe1f 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1288,8 +1288,8 @@ _return: } -int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { +int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) { + if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1312,7 +1312,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p SName *name = taosArrayGet(pReq->pTableName, i); STableMeta *pTableMeta = NULL; - CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, name, false, &pTableMeta, -1)); + CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, name, false, &pTableMeta, -1)); if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) { ctgError("taosArrayPush failed, idx:%d", i); @@ -1325,7 +1325,6 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p return TSDB_CODE_SUCCESS; _return: - if (pRsp->pTableMeta) { int32_t aSize = taosArrayGetSize(pRsp->pTableMeta); for (int32_t i = 0; i < aSize; ++i) { diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index a6f70b9e83..ba941ab22d 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -8,5 +8,5 @@ target_include_directories( target_link_libraries( executor - PRIVATE os util common function parser + PRIVATE os util common function parser planner qcom ) \ No newline at end of file diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h new file mode 100644 index 0000000000..3f0b150c8e --- /dev/null +++ b/source/libs/executor/inc/dataSinkInt.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _DATA_SINK_INT_H +#define _DATA_SINK_INT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "common.h" +#include "dataSinkMgt.h" + +struct SDataSink; +struct SDataSinkHandle; + +typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SDataResult* pRes); +typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, char* pData, int32_t* pLen); +typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); + +typedef struct SDataSinkHandle { + FPutDataBlock fPut; + FGetDataBlock fGet; + FDestroyDataSinker fDestroy; +} SDataSinkHandle; + +int32_t createDataDispatcher(const struct SDataSink* pDataSink, DataSinkHandle* pHandle); + +#ifdef __cplusplus +} +#endif + +#endif /*_DATA_SINK_INT_H*/ diff --git a/source/libs/executor/inc/dataSinkMgt.h b/source/libs/executor/inc/dataSinkMgt.h new file mode 100644 index 0000000000..fab5958107 --- /dev/null +++ b/source/libs/executor/inc/dataSinkMgt.h @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _DATA_SINK_MGT_H +#define _DATA_SINK_MGT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" +#include "executorimpl.h" + +#define DS_CAPACITY_ENOUGH 1 +#define DS_CAPACITY_FULL 2 +#define DS_NEED_SCHEDULE 3 + +struct SDataSink; +struct SSDataBlock; + +typedef struct SDataSinkMgtCfg { + uint32_t maxDataBlockNum; + uint32_t maxDataBlockNumPerQuery; +} SDataSinkMgtCfg; + +int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); + +typedef void* DataSinkHandle; + +typedef struct SDataResult { + SQueryCostInfo profile; + const SSDataBlock* pData; + SHashObj* pTableRetrieveTsMap; +} SDataResult; + +/** + * Create a subplan's datasinker handle for all later operations. + * @param pDataSink + * @param pHandle output + * @return error code + */ +int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle); + +/** + * Put the result set returned by the executor into datasinker. + * @param handle + * @param pRes + * @return error code + */ +int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes); + +/** + * Get the length of the data returned by the next call to dsGetDataBlock. + * @param handle + * @return data length + */ +int32_t dsGetDataLength(DataSinkHandle handle); + +/** + * Get data, the caller needs to allocate data memory. + * @param handle + * @param pData output + * @param pLen output + * @return error code + */ +int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen); + +/** + * Get the datasinker state, after each dsPutDataBlock and dsGetDataBlock call. + * @param handle + * @return datasinker status + */ +int32_t dsGetStatus(DataSinkHandle handle); + +/** + * After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue. + * @param ahandle + * @param pItem + */ +void dsScheduleProcess(void* ahandle, void* pItem); + +/** + * Destroy the datasinker handle. + * @param handle + */ +void dsDestroyDataSinker(DataSinkHandle handle); + +#ifdef __cplusplus +} +#endif + +#endif /*_DATA_SINK_MGT_H*/ diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c new file mode 100644 index 0000000000..b2c135e96d --- /dev/null +++ b/source/libs/executor/src/dataDispatcher.c @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "dataSinkInt.h" +#include "dataSinkMgt.h" +#include "planner.h" +#include "tcompression.h" +#include "tglobal.h" +#include "tqueue.h" + +#define GET_BUF_DATA(buf) (buf)->pData + (buf)->pos +#define GET_BUF_REMAIN(buf) (buf)->remain + +typedef struct SBuf { + int32_t size; + int32_t pos; + int32_t remain; + char* pData; +} SBuf; + +typedef struct SDataDispatchHandle { + SDataSinkHandle sink; + SDataBlockSchema schema; + STaosQueue* pDataBlocks; + SBuf buf; +} SDataDispatchHandle; + +static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) { + if (tsCompressColData < 0 || 0 == pData->info.rows) { + return false; + } + + for (int32_t col = 0; col < pSchema->numOfCols; ++col) { + SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col); + int32_t colSize = pColRes->info.bytes * pData->info.rows; + if (NEEDTO_COMPRESS_QUERY(colSize)) { + return true; + } + } + + return false; +} + +static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { + int32_t colSize = pColRes->info.bytes * numOfRows; + return (*(tDataTypes[pColRes->info.type].compFunc))( + pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); +} + +static void doCopyQueryResultToMsg(const SDataResult* pRes, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) { + int32_t *compSizes = (int32_t*)data; + if (compressed) { + data += pSchema->numOfCols * sizeof(int32_t); + } + + for (int32_t col = 0; col < pSchema->numOfCols; ++col) { + SColumnInfoData* pColRes = taosArrayGet(pRes->pData->pDataBlock, col); + if (compressed) { + compSizes[col] = compressQueryColData(pColRes, pRes->pData->info.rows, data, compressed); + data += compSizes[col]; + *compLen += compSizes[col]; + compSizes[col] = htonl(compSizes[col]); + } else { + memmove(data, pColRes->pData, pColRes->info.bytes * pRes->pData->info.rows); + data += pColRes->info.bytes * pRes->pData->info.rows; + } + } + + int32_t numOfTables = (int32_t) taosHashGetSize(pRes->pTableRetrieveTsMap); + *(int32_t*)data = htonl(numOfTables); + data += sizeof(int32_t); + + STableIdInfo* item = taosHashIterate(pRes->pTableRetrieveTsMap, NULL); + while (item) { + STableIdInfo* pDst = (STableIdInfo*)data; + pDst->uid = htobe64(item->uid); + pDst->key = htobe64(item->key); + data += sizeof(STableIdInfo); + item = taosHashIterate(pRes->pTableRetrieveTsMap, item); + } +} + +static void toRetrieveResult(SDataDispatchHandle* pHandle, const SDataResult* pRes, char* pData, int32_t* pContLen) { + SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pData; + pRsp->useconds = htobe64(pRes->profile.elapsedTime); + pRsp->precision = htons(pHandle->schema.precision); + pRsp->compressed = (int8_t)needCompress(pRes->pData, &(pHandle->schema)); + pRsp->numOfRows = htonl(pRes->pData->info.rows); + + *pContLen = sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(pRes->pTableRetrieveTsMap) + sizeof(SRetrieveTableRsp); + doCopyQueryResultToMsg(pRes, &pHandle->schema, pRsp->data, pRsp->compressed, &pRsp->compLen); + *pContLen += (pRsp->compressed ? pRsp->compLen : pHandle->schema.resultRowSize * pRes->pData->info.rows); + + pRsp->compLen = htonl(pRsp->compLen); + // todo completed +} + +static int32_t putDataBlock(SDataSinkHandle* pHandle, const SDataResult* pRes) { + SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; + int32_t useSize = 0; + toRetrieveResult(pDispatcher, pRes, GET_BUF_DATA(&pDispatcher->buf), &useSize); +} + +static int32_t getDataBlock(SDataSinkHandle* pHandle, char* pData, int32_t* pLen) { + +} + +static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { + +} + +int32_t createDataDispatcher(const SDataSink* pDataSink, DataSinkHandle* pHandle) { + SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle)); + if (NULL == dispatcher) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + dispatcher->sink.fPut = putDataBlock; + dispatcher->sink.fGet = getDataBlock; + dispatcher->sink.fDestroy = destroyDataSinker; + dispatcher->pDataBlocks = taosOpenQueue(); + if (NULL == dispatcher->pDataBlocks) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + *pHandle = dispatcher; + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c new file mode 100644 index 0000000000..2193babc76 --- /dev/null +++ b/source/libs/executor/src/dataSinkMgt.c @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "dataSinkMgt.h" +#include "dataSinkInt.h" +#include "planner.h" + +int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) { + // todo +} + +int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle) { + if (DSINK_Dispatch == pDataSink->info.type) { + return createDataDispatcher(pDataSink, pHandle); + } + return TSDB_CODE_FAILED; +} + +int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes) { + SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; + return pHandleImpl->fPut(pHandleImpl, pRes); +} + +int32_t dsGetDataLength(DataSinkHandle handle) { + // todo +} + +int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen) { + SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; + return pHandleImpl->fGet(pHandleImpl, pData, pLen); +} + +int32_t dsGetStatus(DataSinkHandle handle) { + // todo +} + +void dsScheduleProcess(void* ahandle, void* pItem) { + // todo +} + +void dsDestroyDataSinker(DataSinkHandle handle) { + SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; + pHandleImpl->fDestroy(pHandleImpl); +} diff --git a/source/libs/parser/inc/parserInt.h b/source/libs/parser/inc/parserInt.h index d18934d5f5..10ec335fc8 100644 --- a/source/libs/parser/inc/parserInt.h +++ b/source/libs/parser/inc/parserInt.h @@ -44,14 +44,14 @@ void clearAllTableMetaInfo(SQueryStmtInfo* pQueryInfo, bool removeMeta, uint64_t /** * Validate the sql info, according to the corresponding metadata info from catalog. - * @param pCatalog - * @param pSqlInfo - * @param pQueryInfo a bounded AST with essential meta data from local buffer or mgmt node - * @param id - * @param msg + * @param pCtx + * @param pInfo + * @param pQueryInfo + * @param msgBuf + * @param msgBufLen * @return */ -int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQueryStmtInfo* pQueryInfo, int64_t id, char* msg, int32_t msgLen); +int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen); /** * validate the ddl ast, and convert the ast to the corresponding message format @@ -62,6 +62,14 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ */ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); +/** + * + * @param pInfo + * @param pCtx + * @param msgBuf + * @param msgBufLen + * @return + */ SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); /** @@ -90,7 +98,7 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf); * @param msgBufLen * @return */ -int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen); +int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, SParseBasicCtx *pCtx, char* msg, int32_t msgBufLen); /** * Destroy the meta data request structure. diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index daa960c17a..3ca3d87a79 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -213,7 +213,7 @@ SQueryStmtInfo *createQueryInfo() { pQueryInfo->slimit.limit = -1; pQueryInfo->slimit.offset = 0; - pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); + pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES); pQueryInfo->window = TSWINDOW_INITIALIZER; pQueryInfo->exprList = calloc(10, POINTER_BYTES); @@ -247,8 +247,8 @@ static void destroyQueryInfoImpl(SQueryStmtInfo* pQueryInfo) { tfree(pQueryInfo->fillVal); tfree(pQueryInfo->buf); - taosArrayDestroy(pQueryInfo->pUpstream); - pQueryInfo->pUpstream = NULL; + taosArrayDestroy(pQueryInfo->pDownstream); + pQueryInfo->pDownstream = NULL; pQueryInfo->bufLen = 0; } @@ -256,9 +256,9 @@ void destroyQueryInfo(SQueryStmtInfo* pQueryInfo) { while (pQueryInfo != NULL) { SQueryStmtInfo* p = pQueryInfo->sibling; - size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pUpstream); + size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pDownstream); for (int32_t i = 0; i < numOfUpstream; ++i) { - SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pUpstream, i); + SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pDownstream, i); destroyQueryInfoImpl(pUpQueryInfo); clearAllTableMetaInfo(pUpQueryInfo, false, 0); tfree(pUpQueryInfo); @@ -288,7 +288,6 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SQueryStmtI } pSub->pUdfInfo = pUdfInfo; - pSub->pDownstream = pQueryInfo; int32_t code = validateSqlNode(p, pSub, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { return code; @@ -311,7 +310,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SQueryStmtI tstrncpy(pTableMetaInfo1->aliasName, subInfo->aliasName.z, subInfo->aliasName.n + 1); } - taosArrayPush(pQueryInfo->pUpstream, &pSub); + taosArrayPush(pQueryInfo->pDownstream, &pSub); // NOTE: order mix up in subquery not support yet. pQueryInfo->order = pSub->order; @@ -600,7 +599,7 @@ int32_t checkForUnsupportedQuery(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) { return buildInvalidOperationMsg(pMsgBuf, msg1); } - if (f == FUNCTION_BLKINFO && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { + if (f == FUNCTION_BLKINFO && taosArrayGetSize(pQueryInfo->pDownstream) > 0) { return buildInvalidOperationMsg(pMsgBuf, msg1); } @@ -1584,7 +1583,6 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* } pushDownAggFuncExprInfo(pQueryInfo); -// addColumnNodeFromLowerLevel(pQueryInfo); for(int32_t i = 0; i < 1; ++i) { SArray* functionList = extractFunctionList(pQueryInfo->exprList[i]); @@ -3630,12 +3628,12 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf) return TSDB_CODE_SUCCESS; } -int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, int64_t id, char* msgBuf, int32_t msgBufLen) { - assert(pCatalog != NULL && pInfo != NULL); +int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen) { + assert(pCtx != NULL && pInfo != NULL); int32_t code = 0; - SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; - SMsgBuf *pMsgBuf = &m; + SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; + SMsgBuf* pMsgBuf = &m; switch (pInfo->type) { #if 0 @@ -3682,22 +3680,6 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer break; } - case TSDB_SQL_USE_DB: { - const char* msg = "invalid db name"; - SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0); - - if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg); - } - - int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pSql), pToken); - if (ret != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg); - } - - break; - } - case TSDB_SQL_RESET_CACHE: { return TSDB_CODE_SUCCESS; } @@ -3712,55 +3694,6 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer break; } - case TSDB_SQL_CREATE_DNODE: { - const char* msg = "invalid host name (ip address)"; - - if (taosArrayGetSize(pInfo->pMiscInfo->a) > 1) { - return buildInvalidOperationMsg(pMsgBuf, msg); - } - - SToken* id = taosArrayGet(pInfo->pMiscInfo->a, 0); - if (id->type == TK_STRING) { - id->n = strdequote(id->z); - } - break; - } - - case TSDB_SQL_CREATE_ACCT: - case TSDB_SQL_ALTER_ACCT: { - const char* msg1 = "invalid state option, available options[no, r, w, all]"; - const char* msg2 = "invalid user/account name"; - const char* msg3 = "name too long"; - - SToken* pName = &pInfo->pMiscInfo->user.user; - SToken* pPwd = &pInfo->pMiscInfo->user.passwd; - - if (handlePassword(pCmd, pPwd) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; - } - - if (pName->n >= TSDB_USER_LEN) { - return buildInvalidOperationMsg(pMsgBuf, msg3); - } - - if (tscValidateName(pName) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg2); - } - - SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt; - if (pAcctOpt->stat.n > 0) { - if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) { - } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) { - } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) { - } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) { - } else { - return buildInvalidOperationMsg(pMsgBuf, msg1); - } - } - - break; - } - case TSDB_SQL_DESCRIBE_TABLE: { const char* msg1 = "invalid table name"; @@ -3865,29 +3798,6 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer return TSDB_CODE_SUCCESS; } - case TSDB_SQL_CREATE_TABLE: { - SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; - - if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) { - if ((code = doCheckForCreateTable(pSql, 0, pInfo)) != TSDB_CODE_SUCCESS) { - return code; - } - - } else if (pCreateTable->type == TSQL_CREATE_TABLE_FROM_STABLE) { - assert(pCmd->numOfCols == 0); - if ((code = doCheckForCreateFromStable(pSql, pInfo)) != TSDB_CODE_SUCCESS) { - return code; - } - - } else if (pCreateTable->type == TSQL_CREATE_STREAM) { - if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) { - return code; - } - } - - break; - } - case TSDB_SQL_SELECT: { const char * msg1 = "no nested query supported in union clause"; code = loadAllTableMeta(pSql, pInfo); @@ -3981,27 +3891,41 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer } break; } - #endif default: return buildInvalidOperationMsg(pMsgBuf, "not support sql expression"); } +#endif + } - SCatalogReq req = {0}; - SMetaData data = {0}; + SCatalogReq req = {0}; + SMetaData data = {0}; // TODO: check if the qnode info has been cached already req.qNodeRequired = true; - code = qParserExtractRequestedMetaInfo(pInfo, &req, msgBuf, msgBufLen); + code = qParserExtractRequestedMetaInfo(pInfo, &req, pCtx, msgBuf, msgBufLen); if (code != TSDB_CODE_SUCCESS) { return code; } // load the meta data from catalog - code = catalogGetAllMeta(pCatalog, NULL, NULL, &req, &data); +// code = catalogGetAllMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &req, &data); + STableMeta* pmt = NULL; + + SName* name = taosArrayGet(req.pTableName, 0); + code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &pmt); if (code != TSDB_CODE_SUCCESS) { return code; } + data.pTableMeta = taosArrayInit(1, POINTER_BYTES); + taosArrayPush(data.pTableMeta, &pmt); + + pQueryInfo->pTableMetaInfo = calloc(1, POINTER_BYTES); + pQueryInfo->pTableMetaInfo[0] = calloc(1, sizeof(STableMetaInfo)); + pQueryInfo->pTableMetaInfo[0]->pTableMeta = pmt; + pQueryInfo->pTableMetaInfo[0]->name = *name; + pQueryInfo->numOfTables = 1; + // evaluate the sqlnode STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0); assert(pTableMeta != NULL); diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 60f4d4835b..06729813c1 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -1,12 +1,11 @@ -#include -#include -#include "astToMsg.h" +#include "tmsg.h" +#include "tglobal.h" #include "parserInt.h" +#include "ttime.h" +#include "astToMsg.h" +#include "astGenerator.h" #include "parserUtil.h" #include "queryInfoUtil.h" -#include "tglobal.h" -#include "tmsg.h" -#include "ttime.h" /* is contained in pFieldList or not */ static bool has(SArray* pFieldList, int32_t startIndex, const char* name) { @@ -195,6 +194,18 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) { TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); } + val = htonl(pCreate->maxRows); + if (val < TSDB_MIN_MAX_ROW_FBLOCK || val > TSDB_MAX_MAX_ROW_FBLOCK) { + snprintf(msg, tListLen(msg), "invalid number of max rows in file block for DB:%d valid range: [%d, %d]", val, + TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK); + } + + val = htonl(pCreate->minRows); + if (val < TSDB_MIN_MIN_ROW_FBLOCK || val > TSDB_MAX_MIN_ROW_FBLOCK) { + snprintf(msg, tListLen(msg), "invalid number of min rows in file block for DB:%d valid range: [%d, %d]", val, + TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 8b3c328cce..04c287baf1 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -624,12 +624,11 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) { if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + return TSDB_CODE_TSC_OUT_OF_MEMORY; } *pInfo = context.pOutput; context.pOutput->nodeType = TSDB_SQL_INSERT; - context.pOutput->schemaAttache = pContext->schemaAttached; context.pOutput->payloadType = PAYLOAD_TYPE_KV; int32_t code = skipInsertInto(&context); @@ -638,5 +637,5 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) { } destroyInsertParseContext(&context); terrno = code; - return (TSDB_CODE_SUCCESS == code ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED); + return code; } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 3971f90ac4..d22f92517d 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -31,8 +31,8 @@ bool isInsertSql(const char* pStr, size_t length) { } while (1); } -bool qIsDdlQuery(const SQueryNode* pQuery) { - return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type && TSDB_SQL_CREATE_TABLE != pQuery->type; +bool qIsDdlQuery(const SQueryNode* pQueryNode) { + return TSDB_SQL_INSERT != pQueryNode->type && TSDB_SQL_SELECT != pQueryNode->type && TSDB_SQL_CREATE_TABLE != pQueryNode->type; } int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { @@ -61,13 +61,13 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { pDcl->nodeType = info.type; } } else { - SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); + SQueryStmtInfo* pQueryInfo = createQueryInfo(); if (pQueryInfo == NULL) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code. return terrno; } - int32_t code = qParserValidateSqlNode(pCxt->ctx.pCatalog, &info, pQueryInfo, pCxt->ctx.requestId, pCxt->pMsg, pCxt->msgLen); + int32_t code = qParserValidateSqlNode(&pCxt->ctx, &info, pQueryInfo, pCxt->pMsg, pCxt->msgLen); if (code == TSDB_CODE_SUCCESS) { *pQuery = (SQueryNode*)pQueryInfo; } @@ -89,7 +89,7 @@ int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) { return 0; } -static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf); +static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf); static int32_t tnameComparFn(const void* p1, const void* p2) { SName* pn1 = (SName*)p1; @@ -113,7 +113,7 @@ static int32_t tnameComparFn(const void* p1, const void* p2) { } } -static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf) { +static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) { int32_t numOfSub = (int32_t)taosArrayGetSize(pSqlNode->from->list); for (int32_t j = 0; j < numOfSub; ++j) { @@ -123,12 +123,12 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis for (int32_t i = 0; i < num; ++i) { SSqlNode* p = taosArrayGetP(sub->pSubquery->node, i); if (p->from->type == SQL_FROM_NODE_TABLES) { - int32_t code = getTableNameFromSqlNode(p, tableNameList, pMsgBuf); + int32_t code = getTableNameFromSqlNode(p, tableNameList, pCtx, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { return code; } } else { - getTableNameFromSubquery(p, tableNameList, pMsgBuf); + getTableNameFromSubquery(p, tableNameList, pCtx, pMsgBuf); } } } @@ -136,7 +136,7 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis return TSDB_CODE_SUCCESS; } -int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf) { +int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pParseCtx, SMsgBuf* pMsgBuf) { const char* msg1 = "invalid table name"; int32_t numOfTables = (int32_t) taosArrayGetSize(pSqlNode->from->list); @@ -155,7 +155,11 @@ int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgB } SName name = {0}; - strndequote(name.tname, t->z, t->n); + int32_t code = createSName(&name, t, pParseCtx, pMsgBuf); + if (code != TSDB_CODE_SUCCESS) { + return buildInvalidOperationMsg(pMsgBuf, msg1); + } + taosArrayPush(tableNameList, &name); } @@ -166,7 +170,7 @@ static void freePtrElem(void* p) { tfree(*(char**)p); } -int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen) { +int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, SParseBasicCtx *pCtx, char* msg, int32_t msgBufLen) { int32_t code = TSDB_CODE_SUCCESS; SMsgBuf msgBuf = {.buf = msg, .len = msgBufLen}; @@ -182,12 +186,12 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* p // load the table meta in the FROM clause if (pSqlNode->from->type == SQL_FROM_NODE_TABLES) { - code = getTableNameFromSqlNode(pSqlNode, pMetaInfo->pTableName, &msgBuf); + code = getTableNameFromSqlNode(pSqlNode, pMetaInfo->pTableName, pCtx, &msgBuf); if (code != TSDB_CODE_SUCCESS) { return code; } } else { - code = getTableNameFromSubquery(pSqlNode, pMetaInfo->pTableName, &msgBuf); + code = getTableNameFromSubquery(pSqlNode, pMetaInfo->pTableName, pCtx, &msgBuf); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/parser/test/parserTests.cpp b/source/libs/parser/test/parserTests.cpp index fe430c5f5e..8758fdbc71 100644 --- a/source/libs/parser/test/parserTests.cpp +++ b/source/libs/parser/test/parserTests.cpp @@ -77,12 +77,15 @@ void sqlCheck(const char* sql, bool valid) { buf.len = 128; buf.buf = msg; + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; SSqlNode* pNode = (SSqlNode*)taosArrayGetP(((SArray*)info1.sub.node), 0); int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -119,7 +122,11 @@ TEST(testCase, validateAST_test) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -177,7 +184,11 @@ TEST(testCase, function_Test) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -223,7 +234,11 @@ TEST(testCase, function_Test2) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -269,7 +284,11 @@ TEST(testCase, function_Test3) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -314,7 +333,11 @@ TEST(testCase, function_Test4) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -362,7 +385,11 @@ TEST(testCase, function_Test5) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -447,7 +474,11 @@ TEST(testCase, function_Test6) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -525,7 +556,11 @@ TEST(testCase, function_Test6) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -587,7 +622,11 @@ TEST(testCase, function_Test6) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -636,7 +675,7 @@ TEST(testCase, function_Test6) { code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -666,7 +705,10 @@ TEST(testCase, function_Test6) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -688,7 +730,7 @@ TEST(testCase, function_Test6) { code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); diff --git a/source/libs/parser/test/plannerTest.cpp b/source/libs/parser/test/plannerTest.cpp index 04c3a7d81a..8d9fbadfad 100644 --- a/source/libs/parser/test/plannerTest.cpp +++ b/source/libs/parser/test/plannerTest.cpp @@ -81,7 +81,8 @@ void generateLogicplan(const char* sql) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -121,7 +122,9 @@ TEST(testCase, planner_test) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); diff --git a/source/libs/parser/test/tokenizerTest.cpp b/source/libs/parser/test/tokenizerTest.cpp index 3ab6a6531c..ee01a50148 100644 --- a/source/libs/parser/test/tokenizerTest.cpp +++ b/source/libs/parser/test/tokenizerTest.cpp @@ -710,7 +710,11 @@ TEST(testCase, extractMeta_test) { char msg[128] = {0}; SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 31e057f4c0..63d2e9b855 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -53,8 +53,8 @@ typedef struct SQueryDistPlanNodeInfo { typedef struct SQueryTableInfo { char *tableName; // to be deleted uint64_t uid; // to be deleted - STableMetaInfo* pMeta; - STimeWindow window; + STableMetaInfo *pMeta; + STimeWindow window; } SQueryTableInfo; typedef struct SQueryPlanNode { diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index fa7b3776dc..9a9b40473b 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -64,10 +64,11 @@ static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode* } int32_t createSelectPlan(const SQueryStmtInfo* pSelect, SQueryPlanNode** pQueryPlan) { - SArray* upstream = createQueryPlanImpl(pSelect); - assert(taosArrayGetSize(upstream) == 1); - *pQueryPlan = taosArrayGetP(upstream, 0); - taosArrayDestroy(upstream); + SArray* pDownstream = createQueryPlanImpl(pSelect); + assert(taosArrayGetSize(pDownstream) == 1); + + *pQueryPlan = taosArrayGetP(pDownstream, 0); + taosArrayDestroy(pDownstream); return TSDB_CODE_SUCCESS; } @@ -100,23 +101,21 @@ void destroyQueryPlan(SQueryPlanNode* pQueryNode) { //====================================================================================================================== -static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev, +static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** pChildrenNode, int32_t numOfChildren, SExprInfo** pExpr, int32_t numOfOutput, const void* pExtInfo) { SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode)); pNode->info.type = type; pNode->info.name = strdup(name); - pNode->numOfExpr = numOfOutput; - pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES); - for(int32_t i = 0; i < numOfOutput; ++i) { - taosArrayPush(pNode->pExpr, &pExpr[i]); - } + pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES); + taosArrayAddBatch(pNode->pExpr, pExpr, numOfOutput); + assert(pNode->numOfExpr == numOfOutput); pNode->pChildren = taosArrayInit(4, POINTER_BYTES); - for(int32_t i = 0; i < numOfPrev; ++i) { - taosArrayPush(pNode->pChildren, &prev[i]); + for(int32_t i = 0; i < numOfChildren; ++i) { + taosArrayPush(pNode->pChildren, &pChildrenNode[i]); } switch(type) { @@ -184,8 +183,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla return pNode; } -static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info, - SArray* pExprs, SArray* tableCols) { +static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQueryTableInfo* info, SArray* pExprs, SArray* tableCols) { if (pQueryInfo->info.onlyTagQuery) { int32_t num = (int32_t) taosArrayGetSize(pExprs); SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info); @@ -193,16 +191,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, ST if (pQueryInfo->info.distinct) { pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL); } - return pNode; } SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); - if (pQueryInfo->info.projectionQuery) { - int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs); - pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, NULL); - } else { + if (!pQueryInfo->info.projectionQuery) { STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0); // table source column projection, generate the projection expr @@ -262,7 +256,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo* pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL); } } else { - pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL); + // here we can push down the projection to tablescan operator. + pNode->numOfExpr = num; + pNode->pExpr = taosArrayInit(num, POINTER_BYTES); + taosArrayAddAll(pNode->pExpr, p); +// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL); } } @@ -299,9 +297,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTable(const SQueryStmtInfo* pQu tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN); SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,}; + info.window = pQueryInfo->window; + info.pMeta = pTableMetaInfo; // handle the only tag query - SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, pExprs, tableCols); + SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, pExprs, tableCols); if (pQueryInfo->info.onlyTagQuery) { tfree(info.tableName); return pNode; @@ -326,23 +326,23 @@ static bool isAllAggExpr(SArray* pList) { } SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { - SArray* upstream = NULL; + SArray* pDownstream = NULL; - if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // subquery in the from clause - upstream = taosArrayInit(4, POINTER_BYTES); + if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) { // subquery in the from clause + pDownstream = taosArrayInit(4, POINTER_BYTES); - size_t size = taosArrayGetSize(pQueryInfo->pUpstream); + size_t size = taosArrayGetSize(pQueryInfo->pDownstream); for(int32_t i = 0; i < size; ++i) { - SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i); + SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pDownstream, i); SArray* p = createQueryPlanImpl(pq); - taosArrayAddBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p)); + taosArrayAddBatch(pDownstream, p->pData, (int32_t) taosArrayGetSize(p)); } } if (pQueryInfo->numOfTables > 1) { // it is a join query // 1. separate the select clause according to table - taosArrayDestroy(upstream); - upstream = taosArrayInit(5, POINTER_BYTES); + taosArrayDestroy(pDownstream); + pDownstream = taosArrayInit(5, POINTER_BYTES); for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[i]; @@ -365,30 +365,30 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { columnListCopy(tableColumnList, pQueryInfo->colList, uid); // 4. add the projection query node - SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, exprList, tableColumnList); + SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, exprList, tableColumnList); columnListDestroy(tableColumnList); // dropAllExprInfo(exprList); - taosArrayPush(upstream, &pNode); + taosArrayPush(pDownstream, &pNode); } // 3. add the join node here SQueryTableInfo info = {0}; int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]); - SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables, + SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", pDownstream->pData, pQueryInfo->numOfTables, pQueryInfo->exprList[0]->pData, num, NULL); // 4. add the aggregation or projection execution node pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info); - upstream = taosArrayInit(5, POINTER_BYTES); - taosArrayPush(upstream, &pNode); + pDownstream = taosArrayInit(5, POINTER_BYTES); + taosArrayPush(pDownstream, &pNode); } else { // only one table, normal query process STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; SQueryPlanNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList); - upstream = taosArrayInit(5, POINTER_BYTES); - taosArrayPush(upstream, &pNode); + pDownstream = taosArrayInit(5, POINTER_BYTES); + taosArrayPush(pDownstream, &pNode); } - return upstream; + return pDownstream; } static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { @@ -434,22 +434,23 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, switch(pQueryNode->info.type) { case QNODE_TABLESCAN: { SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo; - len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64, pInfo->tableName, pInfo->uid, - pInfo->window.skey, pInfo->window.ekey); + len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid); assert(len1 > 0); len += len1; - for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { - SColumn* pCol = taosArrayGetP(pQueryNode->pExpr, i); - len1 = sprintf(buf + len, " [%s #%d] ", pCol->name, pCol->info.colId); + len1 = sprintf(buf + len, " , cols:"); + assert(len1 > 0); + len += len1; - assert(len1 > 0); - len += len1; - } - - len1 = sprintf(buf + len, "\n"); + len = printExprInfo(buf, pQueryNode, len); + len1 = sprintf(buf + len, ")"); assert(len1 > 0); + // todo print filter info + len1 = sprintf(buf + len, ") filters:(nil)"); + len += len1; + + len1 = sprintf(buf + len, " time_range: %" PRId64 " - %" PRId64"\n", pInfo->window.skey, pInfo->window.ekey); len += len1; break; } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index e7468e44eb..461f16cdf0 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -187,7 +187,8 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan))); subplan->id = pCxt->nextId; ++(pCxt->nextId.subplanId); - subplan->type = type; + + subplan->type = type; subplan->level = 0; if (NULL != pCxt->pCurrentSubplan) { subplan->level = pCxt->pCurrentSubplan->level + 1; @@ -275,6 +276,8 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { case QNODE_TABLESCAN: node = createTableScanNode(pCxt, pPlanNode); break; + case QNODE_PROJECT: +// node = create case QNODE_MODIFY: // Insert is not an operator in a physical plan. break; @@ -335,7 +338,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD .pCatalog = pCatalog, .pDag = validPointer(calloc(1, sizeof(SQueryDag))), .pCurrentSubplan = NULL, - .nextId = {0} // todo queryid + .nextId = {.queryId = requestId}, }; *pDag = context.pDag; diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index b25d9a3627..faeed74f1a 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -230,9 +230,11 @@ static bool columnInfoToJson(const void* obj, cJSON* jCol) { if (res) { res = cJSON_AddNumberToObject(jCol, jkColumnInfoBytes, col->bytes); } - if (res) { - res = addRawArray(jCol, jkColumnInfoFilterList, columnFilterInfoToJson, col->flist.filterInfo, sizeof(SColumnFilterInfo), col->flist.numOfFilters); + + if (res) { // TODO: temporarily disable it +// res = addRawArray(jCol, jkColumnInfoFilterList, columnFilterInfoToJson, col->flist.filterInfo, sizeof(SColumnFilterInfo), col->flist.numOfFilters); } + return res; } @@ -794,7 +796,6 @@ static cJSON* subplanToJson(const SSubplan* subplan) { } // The 'type', 'level', 'execEpSet', 'pChildren' and 'pParents' fields do not need to be serialized. - bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id); if (res) { res = addObject(jSubplan, jkSubplanNode, phyNodeToJson, subplan->pNode); @@ -807,6 +808,7 @@ static cJSON* subplanToJson(const SSubplan* subplan) { cJSON_Delete(jSubplan); return NULL; } + return jSubplan; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index b61c7c390f..54bddd0e3f 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -64,6 +64,13 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, return code; } + // + if (logicPlan->info.type != QNODE_MODIFY) { +// char* str = NULL; +// queryPlanToString(logicPlan, &str); +// printf("%s\n", str); + } + code = optimizeQueryPlan(logicPlan); if (TSDB_CODE_SUCCESS != code) { destroyQueryPlan(logicPlan); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index df4f121773..a07620f102 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -88,7 +88,6 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - return TSDB_CODE_SUCCESS; } @@ -248,19 +247,20 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { } int32_t addNum = 0; - int32_t nodeNum = taosArrayGetSize(job->nodeList); - - for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { - SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); - - if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { - qError("taosArrayPush failed"); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - ++addNum; - } + if (job->nodeList) { + int32_t nodeNum = (int32_t) taosArrayGetSize(job->nodeList); + for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { + SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); + + if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + ++addNum; + } + } /* for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); @@ -279,8 +279,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), - pJob->queryId); + qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), pJob->queryId); return TSDB_CODE_SUCCESS; } @@ -997,7 +996,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void } int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) { - if (NULL == transport || NULL == nodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { + if (NULL == transport || /*NULL == nodeList || */NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0bf56dbaaf..eaaaf28297 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -259,26 +259,22 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_READ_FILE_ERROR, "Read dnode.json error TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR, "Write dnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED, "Mnode already deployed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_NOT_DEPLOYED, "Mnode not deployed") -TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ID_INVALID, "Mnode Id invalid") -TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ID_NOT_FOUND, "Mnode Id not found") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_INVALID_OPTION, "Mnode option invalid") TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_READ_FILE_ERROR, "Read mnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR, "Write mnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED, "Qnode already deployed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_NOT_DEPLOYED, "Qnode not deployed") -TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_ID_INVALID, "Qnode Id invalid") -TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_ID_NOT_FOUND, "Qnode Id not found") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_INVALID_OPTION, "Qnode option invalid") TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_READ_FILE_ERROR, "Read qnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR, "Write qnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_ALREADY_DEPLOYED, "Snode already deployed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_NOT_DEPLOYED, "Snode not deployed") -TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_ID_INVALID, "Snode Id invalid") -TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_ID_NOT_FOUND, "Snode Id not found") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_INVALID_OPTION, "Snode option invalid") TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_READ_FILE_ERROR, "Read snode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR, "Write snode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED, "Bnode already deployed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_NOT_DEPLOYED, "Bnode not deployed") -TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_ID_INVALID, "Bnode Id invalid") -TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_ID_NOT_FOUND, "Bnode Id not found") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_INVALID_OPTION, "Bnode option invalid") TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_READ_FILE_ERROR, "Read bnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR, "Write bnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_TOO_MANY_VNODES, "Too many vnode directories") diff --git a/tests/script/general/table/basic1.sim b/tests/script/general/table/basic1.sim index c46378beb9..b5393a03dc 100644 --- a/tests/script/general/table/basic1.sim +++ b/tests/script/general/table/basic1.sim @@ -62,13 +62,12 @@ print $data00 $data01 $data02 print $data10 $data11 $data22 print $data20 $data11 $data22 -return - print =============== insert data sql insert into c1 values(now+1s, 1) sql insert into c1 values(now+2s, 2) sql insert into c1 values(now+3s, 3) +return print =============== query data sql select * from c1 if $rows != 3 then