From 5ef5af88659c782aa5dd227f994de05df8dcb36c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Dec 2021 18:48:07 +0800 Subject: [PATCH 1/4] [td-11818] support create dnode/drop dnode. --- include/util/tdef.h | 2 +- source/client/src/clientImpl.c | 10 ++++++- source/client/src/clientMsgHandler.c | 11 ++++--- source/client/test/clientTests.cpp | 40 ++++++++++++++++++++++++-- source/libs/parser/inc/astToMsg.h | 2 ++ source/libs/parser/src/astToMsg.c | 40 ++++++++++++++++++++++++++ source/libs/parser/src/dCDAstProcess.c | 23 +++++++++++++-- source/libs/parser/src/parser.c | 3 +- 8 files changed, 120 insertions(+), 11 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 64a169b4f1..5edd0549f0 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -204,7 +204,7 @@ do { \ #define TSDB_CLUSTER_ID_LEN 40 #define TSDB_FQDN_LEN 128 -#define TSDB_EP_LEN (TSDB_FQDN_LEN+6) +#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6) #define TSDB_IPv4ADDR_LEN 16 #define TSDB_FILENAME_LEN 128 #define TSDB_SHOW_SQL_LEN 512 diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0ee99f77aa..e584a6685a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -420,7 +420,15 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); } - SDataBuf buf = {.pData = pMsg->pCont, .len = pMsg->contLen}; + SDataBuf buf = {.len = pMsg->contLen}; + buf.pData = calloc(1, pMsg->contLen); + if (buf.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pMsg->code = TSDB_CODE_OUT_OF_MEMORY; + } else { + memcpy(buf.pData, pMsg->pCont, pMsg->contLen); + } + pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); rpcFreeCont(pMsg->pCont); } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index bdf54eb21c..b46304e622 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -99,10 +99,10 @@ SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) { } else { assert(pRequest != NULL); pMsgSendInfo->requestObjRefId = pRequest->self; - pMsgSendInfo->msgInfo = pRequest->body.requestMsg; - pMsgSendInfo->msgType = pRequest->type; + pMsgSendInfo->msgInfo = pRequest->body.requestMsg; + pMsgSendInfo->msgType = pRequest->type; pMsgSendInfo->requestId = pRequest->requestId; - pMsgSendInfo->param = pRequest; + pMsgSendInfo->param = pRequest; pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericRspCallback:handleRequestRspFp[pRequest->type]; } @@ -165,8 +165,11 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) pRetrieve->precision = htons(pRetrieve->precision); SReqResultInfo* pResInfo = &pRequest->body.resInfo; + + tfree(pResInfo->pRspMsg); + pResInfo->pRspMsg = pMsg->pData; pResInfo->numOfRows = pRetrieve->numOfRows; - pResInfo->pData = pRetrieve->data; // todo fix this in async model + pResInfo->pData = pRetrieve->data; // todo fix this in async model pResInfo->current = 0; setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index d14719fffb..f5943fd01a 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -169,6 +169,44 @@ TEST(testCase, create_db_Test) { taos_close(pConn); } +TEST(testCase, create_dnode_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create dnode abc1"); + if (taos_errno(pRes) != 0) { + printf("error in create dnode, reason:%s\n", taos_errstr(pRes)); + } + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, drop_dnode_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "drop dnode 2"); + if (taos_errno(pRes) != 0) { + printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); + } + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + taos_close(pConn); +} + TEST(testCase, use_db_test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -273,7 +311,6 @@ TEST(testCase, show_stable_Test) { } TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); int32_t numOfFields = taos_num_fields(pRes); @@ -284,7 +321,6 @@ TEST(testCase, show_stable_Test) { } taos_free_result(pRes); - taos_close(pConn); } diff --git a/source/libs/parser/inc/astToMsg.h b/source/libs/parser/inc/astToMsg.h index 848bbc346e..a2843b5871 100644 --- a/source/libs/parser/inc/astToMsg.h +++ b/source/libs/parser/inc/astToMsg.h @@ -13,5 +13,7 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* ms SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf); SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); +SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); +SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); #endif // TDENGINE_ASTTOMSG_H diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index 6b92357748..bafa9b25d4 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -376,3 +376,43 @@ SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* return pDropTableMsg; } +SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) { + const char* msg1 = "invalid host name (name too long, maximum length 128)"; + const char* msg2 = "dnode name can not be string"; + + if (taosArrayGetSize(pInfo->pMiscInfo->a) > 1) { + buildInvalidOperationMsg(pMsgBuf, msg1); + return NULL; + } + + SToken* id = taosArrayGet(pInfo->pMiscInfo->a, 0); + if (id->type != TK_ID) { + buildInvalidOperationMsg(pMsgBuf, msg2); + return NULL; + } + + SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *) calloc(1, sizeof(SCreateDnodeMsg)); + strncpy(pCreate->ep, id->z, id->n); + *len = sizeof(SCreateDnodeMsg); + + return pCreate; +} + +SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) { + SToken* pzName = taosArrayGet(pInfo->pMiscInfo->a, 0); + + + char* end = NULL; + SDropDnodeMsg * pDrop = (SDropDnodeMsg *)calloc(1, sizeof(SDropDnodeMsg)); + pDrop->dnodeId = strtoll(pzName->z, &end, 10); + *len = sizeof(SDropDnodeMsg); + + if (end - pzName->z != pzName->n) { + buildInvalidOperationMsg(pMsgBuf, "invalid dnode id"); + tfree(pDrop); + return NULL; + } + + return pDrop; +} + diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index ddbd6eb58c..abf78f4f85 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -712,11 +712,30 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_DROP_TABLE: { pDcl->pMsg = (char*)buildDropTableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); if (pDcl->pMsg == NULL) { - return terrno; + code = terrno; } pDcl->msgType = TDMT_MND_DROP_STB; - return TSDB_CODE_SUCCESS; + break; + } + + case TSDB_SQL_CREATE_DNODE: { + pDcl->pMsg = (char*) buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); + if (pDcl->pMsg == NULL) { + code = terrno; + } + + pDcl->msgType = TDMT_MND_CREATE_DNODE; + break; + } + + case TSDB_SQL_DROP_DNODE: { + pDcl->pMsg = (char*) buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); + if (pDcl->pMsg == NULL) { + code = terrno; + } + + pDcl->msgType = TDMT_MND_DROP_DNODE; break; } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 0f77135ec1..5c9a48e52f 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -44,11 +44,12 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { } if (!isDqlSqlStatement(&info)) { - SDclStmtInfo* pDcl = calloc(1, sizeof(SQueryStmtInfo)); + SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo)); if (NULL == pDcl) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code. return terrno; } + pDcl->nodeType = info.type; int32_t code = qParserValidateDclSqlNode(&info, &pCxt->ctx, pDcl, pCxt->pMsg, pCxt->msgLen); if (code == TSDB_CODE_SUCCESS) { From f3de50620c1de9a348d47e38199b0c8eff0ff8b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Dec 2021 18:59:34 +0800 Subject: [PATCH 2/4] [td-11818]fix bug in drop stable. --- source/client/src/clientImpl.c | 1 + source/libs/parser/inc/astToMsg.h | 2 +- source/libs/parser/src/astToMsg.c | 6 +++--- source/libs/parser/src/dCDAstProcess.c | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index e584a6685a..64cbc8a5d0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -152,6 +152,7 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { .pMsg = pRequest->msgBuf, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE }; + int32_t code = qParseQuerySql(&cxt, pQuery); tfree(cxt.ctx.db); return code; diff --git a/source/libs/parser/inc/astToMsg.h b/source/libs/parser/inc/astToMsg.h index a2843b5871..5358a523fa 100644 --- a/source/libs/parser/inc/astToMsg.h +++ b/source/libs/parser/inc/astToMsg.h @@ -12,7 +12,7 @@ SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* msgBuf, int32_t msgLen); SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf); SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); -SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); +SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index bafa9b25d4..025ec7fb0a 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -356,7 +356,7 @@ SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* le return pCreateTableMsg; } -SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) { +SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) { SToken* tableName = taosArrayGet(pInfo->pMiscInfo->a, 0); SName name = {0}; @@ -366,12 +366,12 @@ SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* return NULL; } - SDropTableMsg *pDropTableMsg = (SDropTableMsg*) calloc(1, sizeof(SDropTableMsg)); + SDropStbMsg *pDropTableMsg = (SDropStbMsg*) calloc(1, sizeof(SDropStbMsg)); code = tNameExtractFullName(&name, pDropTableMsg->name); assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T); - pDropTableMsg->ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; + pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; *len = sizeof(SDropTableMsg); return pDropTableMsg; } diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index abf78f4f85..bdddbec327 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -710,7 +710,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm } case TSDB_SQL_DROP_TABLE: { - pDcl->pMsg = (char*)buildDropTableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); + pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); if (pDcl->pMsg == NULL) { code = terrno; } From 4ebabfcf304326a61d20a67dcdb4b59c1baffa2f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Dec 2021 19:04:04 +0800 Subject: [PATCH 3/4] [td-11818] refactor sim script. --- source/client/test/clientTests.cpp | 36 +++++++++++++++--------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index f5943fd01a..2a3b38b828 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -50,13 +50,13 @@ int main(int argc, char** argv) { TEST(testCase, driverInit_Test) { taos_init(); } TEST(testCase, connect_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); taos_close(pConn); } TEST(testCase, create_user_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); @@ -69,7 +69,7 @@ TEST(testCase, create_user_Test) { } TEST(testCase, create_account_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); @@ -82,7 +82,7 @@ TEST(testCase, create_account_Test) { } TEST(testCase, drop_account_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); @@ -95,7 +95,7 @@ TEST(testCase, drop_account_Test) { } TEST(testCase, show_user_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "show users"); @@ -114,7 +114,7 @@ TEST(testCase, show_user_Test) { } TEST(testCase, drop_user_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "drop user abc"); @@ -127,7 +127,7 @@ TEST(testCase, drop_user_Test) { } TEST(testCase, show_db_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "show databases"); @@ -146,7 +146,7 @@ TEST(testCase, show_db_Test) { } TEST(testCase, create_db_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "create database abc1"); @@ -170,7 +170,7 @@ TEST(testCase, create_db_Test) { } TEST(testCase, create_dnode_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "create dnode abc1"); @@ -189,7 +189,7 @@ TEST(testCase, create_dnode_Test) { } TEST(testCase, drop_dnode_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "drop dnode 2"); @@ -208,7 +208,7 @@ TEST(testCase, drop_dnode_Test) { } TEST(testCase, use_db_test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "use abc1"); @@ -226,7 +226,7 @@ TEST(testCase, use_db_test) { } TEST(testCase, drop_db_test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); showDB(pConn); @@ -248,7 +248,7 @@ TEST(testCase, drop_db_test) { } TEST(testCase, create_stable_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "create database abc1"); @@ -279,7 +279,7 @@ TEST(testCase, drop_db_test) { } TEST(testCase, create_table_Test) { - // TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); // // TAOS_RES* pRes = taos_query(pConn, "use abc1"); @@ -294,7 +294,7 @@ TEST(testCase, create_table_Test) { TEST(testCase, create_ctable_Test) {} TEST(testCase, show_stable_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "use abc1"); @@ -325,7 +325,7 @@ TEST(testCase, show_stable_Test) { } TEST(testCase, show_vgroup_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "use abc1"); @@ -358,7 +358,7 @@ TEST(testCase, show_vgroup_Test) { } TEST(testCase, drop_stable_Test) { - TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "create database abc1"); @@ -383,7 +383,7 @@ TEST(testCase, drop_stable_Test) { } //TEST(testCase, show_table_Test) { -// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); // // TAOS_RES* pRes = taos_query(pConn, "use abc1"); From 7e3e6022db74290fa243cce37766745d63ac75b5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 27 Dec 2021 19:06:26 +0800 Subject: [PATCH 4/4] complete index write/search --- source/libs/index/inc/indexInt.h | 17 ++++- source/libs/index/inc/index_cache.h | 6 ++ source/libs/index/inc/index_fst.h | 2 + source/libs/index/inc/index_tfile.h | 10 +++ source/libs/index/src/index.c | 87 +++++++++++++++++++++- source/libs/index/src/index_cache.c | 52 +++++++++++++ source/libs/index/src/index_tfile.c | 111 ++++++++++++++++++++++++++++ 7 files changed, 282 insertions(+), 3 deletions(-) diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index a8f231da0a..048c9e804e 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -51,6 +51,8 @@ struct SIndex { int64_t suid; // current super table id, -1 is normal table int32_t cVersion; // current version allocated to cache + char* path; + SIndexStat stat; pthread_mutex_t mtx; }; @@ -87,12 +89,23 @@ typedef struct SIndexTermQuery { EIndexQueryType qType; } SIndexTermQuery; -typedef struct Iterate { - void* iter; +typedef struct Iterate Iterate; + +typedef struct IterateValue { int8_t type; char* colVal; SArray* val; +} IterateValue; + +typedef struct Iterate { + void* iter; + IterateValue val; + bool (*next)(Iterate* iter); + IterateValue* (*getValue)(Iterate* iter); } Iterate; + +void iterateValueDestroy(IterateValue* iv, bool destroy); + extern void* indexQhandle; int indexFlushCacheTFile(SIndex* sIdx, void*); diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 07b5b8d564..27f5a6fb20 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -41,6 +41,7 @@ typedef struct IndexCache { } IndexCache; +#define CACHE_VERSION(cache) atomic_load_32(&cache->version) typedef struct CacheTerm { // key int32_t nColVal; @@ -57,6 +58,9 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type); void indexCacheDestroy(void* cache); +Iterate* indexCacheIteratorCreate(IndexCache* cache); +void indexCacheIteratorDestroy(Iterate* iiter); + int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid); // int indexCacheGet(void *cache, uint64_t *rst); @@ -66,6 +70,8 @@ void indexCacheRef(IndexCache* cache); void indexCacheUnRef(IndexCache* cache); void indexCacheDebug(IndexCache* cache); + +void indexCacheDestroySkiplist(SSkipList* slt); #ifdef __cplusplus } #endif diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 3c572787fc..73c79b2619 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -319,6 +319,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min); StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallback callback); FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut); + +void fstStreamBuilderDestroy(FstStreamBuilder* b); // set up bound range // refator, simple code by marco diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index 550492ba50..f97a3126c8 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -113,6 +113,8 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArr void tfileReaderRef(TFileReader* reader); void tfileReaderUnRef(TFileReader* reader); +TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t type); +void tfileWriteClose(TFileWriter* tw); TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header); void tfileWriterDestroy(TFileWriter* tw); int tfileWriterPut(TFileWriter* tw, void* data); @@ -123,6 +125,14 @@ IndexTFile* indexTFileCreate(const char* path); int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid); int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result); +Iterate* tfileIteratorCreate(TFileReader* reader); +void tfileIteratorDestroy(Iterate* iterator); + +TFileValue* tfileValueCreate(char* val); + +int tfileValuePush(TFileValue* tf, uint64_t val); +void tfileValueDestroy(TFileValue* tf); + #ifdef __cplusplus } diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 3f871af01d..23df5f9f9a 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -75,9 +75,12 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { sIdx->tindex = indexTFileCreate(path); sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->cVersion = 1; + sIdx->path = calloc(1, strlen(path) + 1); + memcpy(sIdx->path, path, strlen(path)); pthread_mutex_init(&sIdx->mtx, NULL); *index = sIdx; + return 0; #endif @@ -361,14 +364,96 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType } return 0; } + int indexFlushCacheTFile(SIndex* sIdx, void* cache) { if (sIdx == NULL) { return -1; } indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); - IndexCache* pCache = (IndexCache*)cache; + IndexCache* pCache = (IndexCache*)cache; TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName); + // handle flush + Iterate* cacheIter = indexCacheIteratorCreate(pCache); + Iterate* tfileIter = tfileIteratorCreate(pReader); + + SArray* result = taosArrayInit(1024, sizeof(void*)); + + bool cn = cacheIter->next(cacheIter); + bool tn = tfileIter->next(tfileIter); + while (cn == true && tn == true) { + IterateValue* cv = cacheIter->getValue(cacheIter); + IterateValue* tv = tfileIter->getValue(tfileIter); + + // dump value + int comp = strcmp(cv->colVal, tv->colVal); + if (comp == 0) { + TFileValue* tfv = tfileValueCreate(cv->colVal); + taosArrayAddAll(tfv->tableId, cv->val); + taosArrayAddAll(tfv->tableId, tv->val); + taosArrayPush(result, &tfv); + + cn = cacheIter->next(cacheIter); + tn = tfileIter->next(tfileIter); + continue; + } else if (comp < 0) { + TFileValue* tfv = tfileValueCreate(cv->colVal); + taosArrayAddAll(tfv->tableId, cv->val); + taosArrayPush(result, &tfv); + + // copy to final Result; + cn = cacheIter->next(cacheIter); + } else { + TFileValue* tfv = tfileValueCreate(tv->colVal); + taosArrayPush(result, &tfv); + taosArrayAddAll(tfv->tableId, tv->val); + // copy to final result + tn = tfileIter->next(tfileIter); + } + } + while (cn == true) { + IterateValue* cv = cacheIter->getValue(cacheIter); + TFileValue* tfv = tfileValueCreate(cv->colVal); + taosArrayAddAll(tfv->tableId, cv->val); + taosArrayPush(result, &tfv); + cn = cacheIter->next(cacheIter); + } + while (tn == true) { + IterateValue* tv = tfileIter->getValue(tfileIter); + TFileValue* tfv = tfileValueCreate(tv->colVal); + taosArrayAddAll(tfv->tableId, tv->val); + taosArrayPush(result, &tfv); + tn = tfileIter->next(tfileIter); + } + + int32_t version = CACHE_VERSION(pCache); + uint8_t colType = pCache->type; + + TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, pCache->colName, colType); + if (tw == NULL) { + indexError("faile to open file to write"); + } else { + int ret = tfileWriterPut(tw, result); + if (ret != 0) { indexError("faile to write into tindex "); } + } + // not free later, just put int table cache + SSkipList* timm = (SSkipList*)pCache->imm; + pCache->imm = NULL; // or throw int bg thread + indexCacheDestroySkiplist(timm); + + tfileWriteClose(tw); + indexCacheIteratorDestroy(cacheIter); + tfileIteratorDestroy(tfileIter); + tfileReaderUnRef(pReader); indexCacheUnRef(pCache); return 0; } +void iterateValueDestroy(IterateValue* value, bool destroy) { + if (destroy) { + taosArrayDestroy(value->val); + } else { + taosArrayClear(value->val); + } + free(value->colVal); + value->colVal = NULL; +} diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 8181c17505..54aee8858b 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -94,6 +94,16 @@ void indexCacheDebug(IndexCache* cache) { tSkipListDestroyIter(iter); } +void indexCacheDestroySkiplist(SSkipList* slt) { + SSkipListIterator* iter = tSkipListCreateIter(slt); + while (tSkipListIterNext(iter)) { + SSkipListNode* node = tSkipListIterGet(iter); + CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); + if (ct != NULL) {} + } + tSkipListDestroyIter(iter); +} + void indexCacheDestroy(void* cache) { IndexCache* pCache = cache; if (pCache == NULL) { return; } @@ -108,6 +118,48 @@ static void doMergeWork(SSchedMsg* msg) { SIndex* sidx = (SIndex*)pCache->index; indexFlushCacheTFile(sidx, pCache); } +static bool indexCacheIteratorNext(Iterate* itera) { + SSkipListIterator* iter = itera->iter; + if (iter == NULL) { return false; } + + IterateValue* iv = &itera->val; + iterateValueDestroy(iv, false); + + bool next = tSkipListIterNext(iter); + if (next) { + SSkipListNode* node = tSkipListIterGet(iter); + CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); + + iv->type = ct->operaType; + iv->colVal = ct->colVal; + + taosArrayPush(iv->val, &ct->uid); + } + + return next; +} + +static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { + return &iter->val; +} +Iterate* indexCacheIteratorCreate(IndexCache* cache) { + Iterate* iiter = calloc(1, sizeof(Iterate)); + if (iiter == NULL) { return NULL; } + + iiter->val.val = taosArrayInit(1, sizeof(uint64_t)); + iiter->iter = cache->imm != NULL ? tSkipListCreateIter(cache->imm) : NULL; + iiter->next = indexCacheIteratorNext; + iiter->getValue = indexCacheIteratorGetValue; + + return iiter; +} +void indexCacheIteratorDestroy(Iterate* iter) { + if (iter == NULL) { return; } + + tSkipListDestroyIter(iter->iter); + iterateValueDestroy(&iter->val, true); + free(iter); +} int indexCacheSchedToMerge(IndexCache* pCache) { SSchedMsg schedMsg = {0}; diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 0dfb14cc8d..fc31ff3c29 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -23,6 +23,13 @@ #include "taosdef.h" #include "tcompare.h" +typedef struct TFileFstIter { + FstStreamBuilder* fb; + StreamWithState* st; + AutomationCtx* ctx; + TFileReader* rdr; +} TFileFstIter; + #define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + sz * sizeof(uint64_t)) static int tfileStrCompare(const void* a, const void* b); @@ -184,6 +191,23 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul return ret; } +TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) { + char filename[128] = {0}; + int32_t coldId = 1; + tfileGenFileName(filename, suid, coldId, version); + + char fullname[256] = {0}; + snprintf(fullname, sizeof(fullname), "%s/%s", path, filename); + WriterCtx* wcx = writerCtxCreate(TFile, fullname, true, 1024 * 1024); + + TFileHeader tfh = {0}; + tfh.suid = suid; + tfh.version = version; + memcpy(tfh.colName, colName, strlen(colName)); + tfh.colType = colType; + + return tfileWriterCreate(wcx, &tfh); +} TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { // char pathBuf[128] = {0}; // sprintf(pathBuf, "%s/% " PRIu64 "-%d-%d.tindex", path, suid, colId, version); @@ -279,6 +303,11 @@ int tfileWriterPut(TFileWriter* tw, void* data) { tw->fb = NULL; return 0; } +void tfileWriteClose(TFileWriter* tw) { + if (tw == NULL) { return; } + writerCtxDestroy(tw->ctx); + free(tw); +} void tfileWriterDestroy(TFileWriter* tw) { if (tw == NULL) { return; } @@ -314,6 +343,71 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { return 0; } +static bool tfileIteratorNext(Iterate* iiter) { + IterateValue* iv = &iiter->val; + iterateValueDestroy(iv, false); + // SArray* tblIds = iv->val; + + char* colVal = NULL; + uint64_t offset = 0; + + TFileFstIter* tIter = iiter->iter; + StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL); + if (rt == NULL) { return false; } + + int32_t sz = 0; + char* ch = (char*)fstSliceData(&rt->data, &sz); + colVal = calloc(1, sz + 1); + memcpy(colVal, ch, sz); + + offset = (uint64_t)(rt->out.out); + + swsResultDestroy(rt); + // set up iterate value + if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; } + + iv->colVal = colVal; + + // std::string key(ch, sz); +} + +static IterateValue* tifileIterateGetValue(Iterate* iter) { + return &iter->val; +} + +static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { + TFileFstIter* tIter = calloc(1, sizeof(Iterate)); + if (tIter == NULL) { return NULL; } + tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS); + tIter->fb = fstSearch(reader->fst, tIter->ctx); + tIter->st = streamBuilderIntoStream(tIter->fb); + tIter->rdr = reader; + return tIter; +} + +Iterate* tfileIteratorCreate(TFileReader* reader) { + Iterate* iter = calloc(1, sizeof(Iterate)); + + iter->iter = tfileFstIteratorCreate(reader); + if (iter->iter == NULL) { return NULL; } + + iter->next = tfileIteratorNext; + iter->getValue = tifileIterateGetValue; + return iter; +} +void tfileIteratorDestroy(Iterate* iter) { + if (iter == NULL) { return; } + IterateValue* iv = &iter->val; + iterateValueDestroy(iv, true); + + TFileFstIter* tIter = iter->iter; + streamWithStateDestroy(tIter->st); + fstStreamBuilderDestroy(tIter->fb); + automCtxDestroy(tIter->ctx); + + free(iter); +} + TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) { if (tf == NULL) { return NULL; } TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; @@ -334,6 +428,23 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) { return fn(av->colVal, bv->colVal); } + +TFileValue* tfileValueCreate(char* val) { + TFileValue* tf = calloc(1, sizeof(TFileValue)); + if (tf == NULL) { return NULL; } + + tf->tableId = taosArrayInit(32, sizeof(uint64_t)); + return tf; +} +int tfileValuePush(TFileValue* tf, uint64_t val) { + if (tf == NULL) { return -1; } + taosArrayPush(tf->tableId, &val); + return 0; +} +void tfileValueDestroy(TFileValue* tf) { + taosArrayDestroy(tf->tableId); + free(tf); +} static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) { int sz = taosArrayGetSize(ids); SERIALIZE_VAR_TO_BUF(buf, sz, int32_t);