From 04c4135a389f1290bf6d4a4a44becdeba38b3693 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 27 Apr 2022 18:38:31 +0800 Subject: [PATCH 01/10] test: add unitest for sdb --- source/dnode/mnode/impl/src/mndUser.c | 18 +-- source/dnode/mnode/impl/test/sdb/sdbTest.cpp | 136 ++++++++++++------- 2 files changed, 96 insertions(+), 58 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 261d334de2..f4bbe8cf88 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -39,14 +39,16 @@ static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p static void mndCancelGetNextUser(SMnode *pMnode, void *pIter); int32_t mndInitUser(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_USER, - .keyType = SDB_KEY_BINARY, - .deployFp = (SdbDeployFp)mndCreateDefaultUsers, - .encodeFp = (SdbEncodeFp)mndUserActionEncode, - .decodeFp = (SdbDecodeFp)mndUserActionDecode, - .insertFp = (SdbInsertFp)mndUserActionInsert, - .updateFp = (SdbUpdateFp)mndUserActionUpdate, - .deleteFp = (SdbDeleteFp)mndUserActionDelete}; + SSdbTable table = { + .sdbType = SDB_USER, + .keyType = SDB_KEY_BINARY, + .deployFp = (SdbDeployFp)mndCreateDefaultUsers, + .encodeFp = (SdbEncodeFp)mndUserActionEncode, + .decodeFp = (SdbDecodeFp)mndUserActionDecode, + .insertFp = (SdbInsertFp)mndUserActionInsert, + .updateFp = (SdbUpdateFp)mndUserActionUpdate, + .deleteFp = (SdbDeleteFp)mndUserActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_USER, mndProcessCreateUserReq); mndSetMsgHandle(pMnode, TDMT_MND_ALTER_USER, mndProcessAlterUserReq); diff --git a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp index 998dca1401..21913375d6 100644 --- a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp +++ b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp @@ -9,75 +9,111 @@ * */ -#include "sut.h" +#include -class MndTestShow : public ::testing::Test { +#include "sdb.h" + +class MndTestSdb : public ::testing::Test { protected: - static void SetUpTestSuite() { test.Init("/tmp/mnode_test_show", 9021); } - static void TearDownTestSuite() { test.Cleanup(); } - - static Testbase test; + static void SetUpTestSuite() {} + static void TearDownTestSuite() {} public: void SetUp() override {} void TearDown() override {} }; -Testbase MndTestShow::test; +typedef struct SStrObj { + char key[24]; + int8_t v8; + int16_t v16; + int32_t v32; + int64_t v64; + char vstr[32]; + char unused[48]; +} SStrObj; -TEST_F(MndTestShow, 01_ShowMsg_InvalidMsgMax) { - SShowReq showReq = {0}; - showReq.type = TSDB_MGMT_TABLE_MAX; +typedef struct SI32Obj { + int32_t key; + int8_t v8; + int16_t v16; + int32_t v32; + int64_t v64; + char vstr[32]; + char unused[48]; +} SI32Obj; - int32_t contLen = tSerializeSShowReq(NULL, 0, &showReq); - void* pReq = rpcMallocCont(contLen); - tSerializeSShowReq(pReq, contLen, &showReq); - tFreeSShowReq(&showReq); +typedef struct SI64Obj { + int64_t key; + int8_t v8; + int16_t v16; + int32_t v32; + int64_t v64; + char vstr[32]; + char unused[48]; +} SI64Obj; - SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_NE(pRsp->code, 0); +SSdbRaw *strEncode(SStrObj *pObj) { + int32_t dataPos = 0; + SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, 1, sizeof(SStrObj)); + + sdbSetRawBinary(pRaw, dataPos, pObj->key, sizeof(pObj->key)); + dataPos += sizeof(pObj->key); + sdbSetRawInt8(pRaw, dataPos, pObj->v8); + dataPos += sizeof(pObj->v8); + sdbSetRawInt16(pRaw, dataPos, pObj->v16); + dataPos += sizeof(pObj->v16); + sdbSetRawInt32(pRaw, dataPos, pObj->v32); + dataPos += sizeof(pObj->v32); + sdbSetRawInt64(pRaw, dataPos, pObj->v64); + dataPos += sizeof(pObj->v64); + sdbSetRawBinary(pRaw, dataPos, pObj->key, sizeof(pObj->vstr)); + dataPos += sizeof(pObj->key); + sdbSetRawDataLen(pRaw, dataPos); + + return pRaw; } -TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { - SShowReq showReq = {0}; - showReq.type = TSDB_MGMT_TABLE_START; +SSdbRaw *strDecode(SStrObj *pObj) { + int32_t dataPos = 0; + SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, 1, sizeof(SStrObj)); - int32_t contLen = tSerializeSShowReq(NULL, 0, &showReq); - void* pReq = rpcMallocCont(contLen); - tSerializeSShowReq(pReq, contLen, &showReq); - tFreeSShowReq(&showReq); + sdbSetRawBinary(pRaw, dataPos, pObj->key, sizeof(pObj->key)); + dataPos += sizeof(pObj->key); + sdbSetRawInt8(pRaw, dataPos, pObj->v8); + dataPos += sizeof(pObj->v8); + sdbSetRawInt16(pRaw, dataPos, pObj->v16); + dataPos += sizeof(pObj->v16); + sdbSetRawInt32(pRaw, dataPos, pObj->v32); + dataPos += sizeof(pObj->v32); + sdbSetRawInt64(pRaw, dataPos, pObj->v64); + dataPos += sizeof(pObj->v64); + sdbSetRawBinary(pRaw, dataPos, pObj->key, sizeof(pObj->vstr)); + dataPos += sizeof(pObj->key); + sdbSetRawDataLen(pRaw, dataPos); - SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_NE(pRsp->code, 0); + return pRaw; } -TEST_F(MndTestShow, 03_ShowMsg_Conn) { - char passwd[] = "taosdata"; - char secretEncrypt[TSDB_PASSWORD_LEN] = {0}; - taosEncryptPass_c((uint8_t*)passwd, strlen(passwd), secretEncrypt); +TEST_F(MndTestSdb, 01_Basic) { + SSdbOpt opt = {0}; + opt.path = "/tmp/mnode_test_sdb"; - SConnectReq connectReq = {0}; - connectReq.pid = 1234; - strcpy(connectReq.app, "mnode_test_show"); - strcpy(connectReq.db, ""); - strcpy(connectReq.user, "root"); - strcpy(connectReq.passwd, secretEncrypt); + SSdb *pSdb = sdbInit(&opt); + EXPECT_NE(pSdb, nullptr); - int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); - void* pReq = rpcMallocCont(contLen); - tSerializeSConnectReq(pReq, contLen, &connectReq); + SSdbTable strTable = { + .sdbType = SDB_USER, + .keyType = SDB_KEY_BINARY, + .deployFp = (SdbDeployFp)strEncode, + .encodeFp = (SdbEncodeFp)strDecode, + .decodeFp = (SdbDecodeFp)NULL, + .insertFp = (SdbInsertFp)NULL, + .updateFp = (SdbUpdateFp)NULL, + .deleteFp = (SdbDeleteFp)NULL, + }; - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, 0); + sdbSetTable(pSdb, strTable); - test.SendShowReq(TSDB_MGMT_TABLE_CONNS, "connections", ""); - // EXPECT_EQ(test.GetShowRows(), 1); -} - -TEST_F(MndTestShow, 04_ShowMsg_Cluster) { - test.SendShowReq(TSDB_MGMT_TABLE_CLUSTER, "cluster", ""); - EXPECT_EQ(test.GetShowRows(), 1); + sdbCleanup(pSdb); } From 46af79b90cd1d229e1733d16fa31b8727d9fe812 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 28 Apr 2022 09:55:31 +0800 Subject: [PATCH 02/10] test: add unitest for sdb --- source/dnode/mnode/impl/test/sdb/sdbTest.cpp | 47 +++++++++++++++++--- source/dnode/mnode/sdb/src/sdb.c | 2 +- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp index 21913375d6..4da414e274 100644 --- a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp +++ b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp @@ -23,6 +23,12 @@ class MndTestSdb : public ::testing::Test { void TearDown() override {} }; +typedef struct SMnode { + int32_t v100; + int32_t v200; + SSdb *pSdb; +} SMnode; + typedef struct SStrObj { char key[24]; int8_t v8; @@ -95,8 +101,39 @@ SSdbRaw *strDecode(SStrObj *pObj) { return pRaw; } +int32_t strInsert(SSdb *pSdb, SStrObj *pObj) { return 0; } + +int32_t strDelete(SSdb *pSdb, SStrObj *pObj, bool callFunc) { return 0; } + +int32_t strUpdate(SSdb *pSdb, SStrObj *pOld, SStrObj *pNew) { + pOld->v8 = pNew->v8; + pOld->v16 = pNew->v16; + pOld->v32 = pNew->v32; + pOld->v64 = pNew->v64; + return 0; +} + +int32_t strDefault(SMnode *pMnode) { + SStrObj strObj = {0}; + strcpy(strObj.key, "k1000"); + strObj.v8 = 1; + strObj.v16 = 1; + strObj.v32 = 1000; + strObj.v64 = 1000; + strcpy(strObj.vstr, "v1000"); + + SSdbRaw *pRaw = strEncode(&strObj); + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + return sdbWrite(pMnode->pSdb, pRaw); +} + TEST_F(MndTestSdb, 01_Basic) { + SMnode mnode; + mnode.v100 = 100; + mnode.v200 = 200; + SSdbOpt opt = {0}; + opt.pMnode = &mnode; opt.path = "/tmp/mnode_test_sdb"; SSdb *pSdb = sdbInit(&opt); @@ -106,11 +143,11 @@ TEST_F(MndTestSdb, 01_Basic) { .sdbType = SDB_USER, .keyType = SDB_KEY_BINARY, .deployFp = (SdbDeployFp)strEncode, - .encodeFp = (SdbEncodeFp)strDecode, - .decodeFp = (SdbDecodeFp)NULL, - .insertFp = (SdbInsertFp)NULL, - .updateFp = (SdbUpdateFp)NULL, - .deleteFp = (SdbDeleteFp)NULL, + .encodeFp = (SdbEncodeFp)strEncode, + .decodeFp = (SdbDecodeFp)strDecode, + .insertFp = (SdbInsertFp)strInsert, + .updateFp = (SdbUpdateFp)strDelete, + .deleteFp = (SdbDeleteFp)strUpdate, }; sdbSetTable(pSdb, strTable); diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index f85ff7a26e..d180271175 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -141,7 +141,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { } static int32_t sdbCreateDir(SSdb *pSdb) { - if (taosMkDir(pSdb->currDir) != 0) { + if (taosMulMkDir(pSdb->currDir) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); mError("failed to create dir:%s since %s", pSdb->currDir, terrstr()); return -1; From 108b9ee181bacff17b812cb7b7cdb4750e0a75e6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 28 Apr 2022 13:36:43 +0800 Subject: [PATCH 03/10] feat(stream): support perf schema --- source/client/inc/clientInt.h | 1 - source/dnode/mnode/impl/inc/mndDef.h | 5 +- source/dnode/mnode/impl/src/mndConsumer.c | 1 + source/dnode/mnode/impl/src/mndDb.c | 4 +- source/dnode/mnode/impl/src/mndDef.c | 4 +- source/dnode/mnode/impl/src/mndInfoSchema.c | 11 +-- source/dnode/mnode/impl/src/mndPerfSchema.c | 12 +++ source/dnode/mnode/impl/src/mndScheduler.c | 2 +- source/dnode/mnode/impl/src/mndSma.c | 22 +++--- source/dnode/mnode/impl/src/mndStream.c | 81 ++++++++++----------- source/dnode/mnode/impl/src/mndSubscribe.c | 15 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 1 - 12 files changed, 82 insertions(+), 77 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 49a9d4e10d..ce5b101b4a 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -188,7 +188,6 @@ typedef struct SRequestSendRecvBody { typedef struct { int8_t resType; - int32_t code; char topic[TSDB_TOPIC_FNAME_LEN]; int32_t vgId; SSchemaWrapper schema; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 03f8a7cb2e..9fc43d601a 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -582,8 +582,9 @@ typedef struct { typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - char outputSTbName[TSDB_TABLE_FNAME_LEN]; + char sourceDb[TSDB_DB_FNAME_LEN]; + char targetDb[TSDB_DB_FNAME_LEN]; + char targetSTbName[TSDB_TABLE_FNAME_LEN]; int64_t createTime; int64_t updateTime; int64_t uid; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index be584848a3..025f61dc87 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -812,6 +812,7 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock colDataAppend(pColInfo, numOfRows, (const char *)status, false); // subscribed topics + // TODO: split into multiple rows char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0}; char *showStr = taosShowStrArray(pConsumer->assignedTopics); tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 15ebcf02db..c784f46541 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1501,8 +1501,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.singleSTable, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.streamMode, false); + /*pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);*/ + /*colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.streamMode, false);*/ pColInfo = taosArrayGet(pBlock->pDataBlock, cols); colDataAppend(pColInfo, rows, (const char *)b, false); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 2f167b72d9..54f411d71f 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -410,7 +410,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { int32_t sz = 0; /*int32_t outputNameSz = 0;*/ if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; - if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; @@ -456,7 +456,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pObj->db) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index 07f8228cda..2df23ad38b 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -89,7 +89,7 @@ static const SInfosTableSchema userDBSchema[] = { {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "single_stable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, - {.name = "stream_mode", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + /*{.name = "stream_mode", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},*/ {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, // {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update }; @@ -124,14 +124,6 @@ static const SInfosTableSchema userStbsSchema[] = { {.name = "table_comment", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, }; -static const SInfosTableSchema userStreamsSchema[] = { - {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "dest_table", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, - {.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_VARCHAR}, -}; - static const SInfosTableSchema userTblsSchema[] = { {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, @@ -259,7 +251,6 @@ static const SInfosTableMeta infosMeta[] = { {TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)}, {TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)}, {TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)}, - {TSDB_INS_TABLE_USER_STREAMS, userStreamsSchema, tListLen(userStreamsSchema)}, {TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)}, {TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)}, {TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)}, diff --git a/source/dnode/mnode/impl/src/mndPerfSchema.c b/source/dnode/mnode/impl/src/mndPerfSchema.c index 2cb8c4351e..ee17b28065 100644 --- a/source/dnode/mnode/impl/src/mndPerfSchema.c +++ b/source/dnode/mnode/impl/src/mndPerfSchema.c @@ -76,6 +76,18 @@ static const SPerfsTableSchema offsetSchema[] = { {.name = "skip_log_cnt", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, }; +static const SPerfsTableSchema streamSchema[] = { + {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, + {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, + {.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, +}; + static const SPerfsTableMeta perfsMeta[] = { {TSDB_PERFS_TABLE_CONNECTIONS, connectionsSchema, tListLen(connectionsSchema)}, {TSDB_PERFS_TABLE_QUERIES, queriesSchema, tListLen(queriesSchema)}, diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 4976bdefc7..3f4b2fe145 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -382,7 +382,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pTask->dispatchType = TASK_DISPATCH__SHUFFLE; pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; - SDbObj* pDb = mndAcquireDb(pMnode, pStream->db); + SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb); ASSERT(pDb); if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { sdbRelease(pSdb, pDb); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index a474ccc5b6..0e3e8b68cc 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -40,7 +40,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq); static int32_t mndProcessMDropSmaReq(SNodeMsg *pReq); static int32_t mndProcessVCreateSmaRsp(SNodeMsg *pRsp); static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp); -static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows); +static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); int32_t mndInitSma(SMnode *pMnode) { @@ -406,7 +406,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre SStreamObj streamObj = {0}; tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); - tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN); + tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN); streamObj.createTime = taosGetTimestampMs(); streamObj.updateTime = streamObj.createTime; streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); @@ -686,9 +686,9 @@ _OVER: return code; } -int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) { - int32_t code = -1; - SSmaObj *pSma = NULL; +int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) { + int32_t code = -1; + SSmaObj *pSma = NULL; pSma = mndAcquireSma(pMnode, indexReq->indexFName); if (pSma == NULL) { @@ -701,13 +701,14 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserI strcpy(rsp->indexType, TSDB_INDEX_TYPE_SMA); SNodeList *pList = NULL; - int32_t extOffset = 0; + int32_t extOffset = 0; code = nodesStringToList(pSma->expr, &pList); if (0 == code) { SNode *node = NULL; FOREACH(node, pList) { SFunctionNode *pFunc = (SFunctionNode *)node; - extOffset += snprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s", (extOffset ? ",":""), pFunc->functionName); + extOffset += snprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s", + (extOffset ? "," : ""), pFunc->functionName); } *exist = true; @@ -718,13 +719,12 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserI return code; } - static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) { +static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; @@ -758,8 +758,8 @@ static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlo char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(n1, (char *)tNameGetTableName(&stbName)); - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char*) n, false); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)n, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1062154e02..058e11f397 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -40,7 +40,7 @@ static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp); /*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/ static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq); static int32_t mndGetStreamMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); int32_t mndInitStream(SMnode *pMnode) { @@ -58,8 +58,8 @@ int32_t mndInitStream(SMnode *pMnode) { /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ - // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream); - /*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);*/ + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); return sdbSetTable(pMnode->pSdb, table); } @@ -294,8 +294,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe mDebug("stream:%s to create", pCreate->name); SStreamObj streamObj = {0}; tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); - tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN); - tstrncpy(streamObj.outputSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN); + tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN); + tstrncpy(streamObj.targetSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN); streamObj.createTime = taosGetTimestampMs(); streamObj.updateTime = streamObj.createTime; streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); @@ -424,58 +424,55 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS return 0; } -static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStreamObj *pStream = NULL; - int32_t cols = 0; - char *pWrite; - char prefix[TSDB_DB_FNAME_LEN] = {0}; - - SDbObj *pDb = mndAcquireDb(pMnode, pShow->db); - if (pDb == NULL) return 0; - - tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN); - strcat(prefix, TS_PATH_DELIMITER); - int32_t prefixLen = (int32_t)strlen(prefix); while (numOfRows < rows) { - pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); + pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pStream); if (pShow->pIter == NULL) break; - if (pStream->dbUid != pDb->uid) { - if (strncmp(pStream->name, prefix, prefixLen) != 0) { - mError("Inconsistent stream data, name:%s, db:%s, dbUid:%" PRIu64, pStream->name, pDb->name, pDb->uid); - } + SColumnInfoData *pColInfo; + SName n; + int32_t cols = 0; - sdbRelease(pSdb, pStream); - continue; - } + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + tNameFromString(&n, pStream->name, T_NAME_ACCT | T_NAME_DB); + tNameGetDbName(&n, varDataVal(streamName)); + varDataSetLen(streamName, strlen(varDataVal(streamName))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)streamName, false); - cols = 0; + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pStream->createTime, false); - char streamName[TSDB_TABLE_NAME_LEN] = {0}; - tstrncpy(streamName, pStream->name + prefixLen, TSDB_TABLE_NAME_LEN); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_TO_VARSTR(pWrite, streamName); - cols++; + char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(&sql[VARSTR_HEADER_SIZE], pStream->sql, TSDB_SHOW_SQL_LEN); + varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE])); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)sql, false); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pStream->createTime; - cols++; + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pStream->status, true); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pStream->sql, pShow->bytes[cols]); - cols++; + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pStream->sourceDb, true); - numOfRows++; - sdbRelease(pSdb, pStream); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetDb, true); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetSTbName, true); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pStream->waterMark, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false); } - - mndReleaseDb(pMnode, pDb); - pShow->numOfRows += numOfRows; - return numOfRows; + return 0; } static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index f271c1b565..aeecab34cb 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -309,9 +309,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR newConsumerEp.consumerId = consumerId; newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); - /*SMqConsumer* pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/ - /*ASSERT(pTestNew->consumerId == consumerId);*/ - /*ASSERT(pTestNew->vgs == newConsumerEp.vgs);*/ taosArrayPush(pOutput->newConsumers, &consumerId); } } @@ -369,7 +366,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } } - // 8. generate logs + // 8. TODO generate logs + mInfo("rebalance calculation completed, rebalanced vg:"); + for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) { + SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i); + mInfo("vg: %d moved from consumer %ld to consumer %ld", pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, + pOutputRebVg->newConsumerId); + } // 9. clear taosHashCleanup(pHash); @@ -447,7 +450,9 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO goto REB_FAIL; } } - // 4. commit log: modification log + // 4. TODO commit log: modification log + + // 5. execution if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL; mndTransDrop(pTrans); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7f7023fccd..90d93bd3f5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -61,7 +61,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); len = pMsg->contLen - sizeof(SMsgHead); - // todo: change the interface here if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { vError("vgId: %d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; From 7d3dd776f17f62999b8a5365a6ac059bd44f0d37 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 28 Apr 2022 13:56:10 +0800 Subject: [PATCH 04/10] test: add unitest for sdb --- include/dnode/mnode/sdb/sdb.h | 1 - source/dnode/mnode/impl/test/sdb/sdbTest.cpp | 238 ++++++++++++++++--- source/dnode/mnode/sdb/inc/sdbInt.h | 2 + source/os/src/osDir.c | 4 +- 4 files changed, 206 insertions(+), 39 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 1613339d10..4fab11e237 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -331,7 +331,6 @@ int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver); int32_t sdbGetRawTotalSize(SSdbRaw *pRaw); SSdbRow *sdbAllocRow(int32_t objSize); -void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); void *sdbGetRowObj(SSdbRow *pRow); typedef struct SSdb { diff --git a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp index 4da414e274..606db251d2 100644 --- a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp +++ b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp @@ -73,32 +73,39 @@ SSdbRaw *strEncode(SStrObj *pObj) { dataPos += sizeof(pObj->v32); sdbSetRawInt64(pRaw, dataPos, pObj->v64); dataPos += sizeof(pObj->v64); - sdbSetRawBinary(pRaw, dataPos, pObj->key, sizeof(pObj->vstr)); - dataPos += sizeof(pObj->key); + sdbSetRawBinary(pRaw, dataPos, pObj->vstr, sizeof(pObj->vstr)); + dataPos += sizeof(pObj->vstr); sdbSetRawDataLen(pRaw, dataPos); return pRaw; } -SSdbRaw *strDecode(SStrObj *pObj) { - int32_t dataPos = 0; - SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, 1, sizeof(SStrObj)); +SSdbRow *strDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sver != 1) return NULL; - sdbSetRawBinary(pRaw, dataPos, pObj->key, sizeof(pObj->key)); + SSdbRow *pRow = sdbAllocRow(sizeof(SStrObj)); + if (pRow == NULL) return NULL; + + SStrObj *pObj = (SStrObj *)sdbGetRowObj(pRow); + if (pObj == NULL) return NULL; + + int32_t dataPos = 0; + sdbGetRawBinary(pRaw, dataPos, pObj->key, sizeof(pObj->key)); dataPos += sizeof(pObj->key); - sdbSetRawInt8(pRaw, dataPos, pObj->v8); + sdbGetRawInt8(pRaw, dataPos, &pObj->v8); dataPos += sizeof(pObj->v8); - sdbSetRawInt16(pRaw, dataPos, pObj->v16); + sdbGetRawInt16(pRaw, dataPos, &pObj->v16); dataPos += sizeof(pObj->v16); - sdbSetRawInt32(pRaw, dataPos, pObj->v32); + sdbGetRawInt32(pRaw, dataPos, &pObj->v32); dataPos += sizeof(pObj->v32); - sdbSetRawInt64(pRaw, dataPos, pObj->v64); + sdbGetRawInt64(pRaw, dataPos, &pObj->v64); dataPos += sizeof(pObj->v64); - sdbSetRawBinary(pRaw, dataPos, pObj->key, sizeof(pObj->vstr)); - dataPos += sizeof(pObj->key); - sdbSetRawDataLen(pRaw, dataPos); + sdbGetRawBinary(pRaw, dataPos, pObj->vstr, sizeof(pObj->vstr)); + dataPos += sizeof(pObj->vstr); - return pRaw; + return pRow; } int32_t strInsert(SSdb *pSdb, SStrObj *pObj) { return 0; } @@ -110,39 +117,196 @@ int32_t strUpdate(SSdb *pSdb, SStrObj *pOld, SStrObj *pNew) { pOld->v16 = pNew->v16; pOld->v32 = pNew->v32; pOld->v64 = pNew->v64; + strcpy(pOld->vstr, pNew->vstr); return 0; } -int32_t strDefault(SMnode *pMnode) { - SStrObj strObj = {0}; - strcpy(strObj.key, "k1000"); - strObj.v8 = 1; - strObj.v16 = 1; - strObj.v32 = 1000; - strObj.v64 = 1000; - strcpy(strObj.vstr, "v1000"); - - SSdbRaw *pRaw = strEncode(&strObj); - sdbSetRawStatus(pRaw, SDB_STATUS_READY); - return sdbWrite(pMnode->pSdb, pRaw); +void strSetDefault(SStrObj *pObj, int32_t index) { + memset(pObj, 0, sizeof(SStrObj)); + snprintf(pObj->key, sizeof(pObj->key), "k%d", index * 1000); + pObj->v8 = index; + pObj->v16 = index; + pObj->v32 = index * 1000; + pObj->v64 = index * 1000; + snprintf(pObj->vstr, sizeof(pObj->vstr), "v%d", index * 1000); } -TEST_F(MndTestSdb, 01_Basic) { - SMnode mnode; +int32_t strDefault(SMnode *pMnode) { + SStrObj strObj; + SSdbRaw *pRaw = NULL; + + strSetDefault(&strObj, 1); + pRaw = strEncode(&strObj); + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + if (sdbWrite(pMnode->pSdb, pRaw) != 0) return -1; + + strSetDefault(&strObj, 2); + pRaw = strEncode(&strObj); + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + if (sdbWriteWithoutFree(pMnode->pSdb, pRaw) != 0) return -1; + sdbFreeRaw(pRaw); + + return 0; +} + +bool sdbTraverseSucc1(SMnode *pMnode, SStrObj *pObj, int32_t *p1, int32_t *p2, int32_t *p3) { + if (pObj->v8 == 1) { + *p1 = *p2 + *p3 + pObj->v8; + } + return true; +} + +bool sdbTraverseSucc2(SMnode *pMnode, SStrObj *pObj, int32_t *p1, int32_t *p2, int32_t *p3) { + *p1 = *p2 + *p3 + pObj->v8; + return true; +} + +bool sdbTraverseFail(SMnode *pMnode, SStrObj *pObj, int32_t *p1, int32_t *p2, int32_t *p3) { + *p1 = *p2 + *p3; + return false; +} + +TEST_F(MndTestSdb, 01_Write) { + void *pIter; + int32_t num; + SStrObj *pObj; + SMnode mnode; + SSdb *pSdb; + SSdbOpt opt = {0}; + int32_t p1 = 0; + int32_t p2 = 111; + int32_t p3 = 222; + mnode.v100 = 100; mnode.v200 = 200; - - SSdbOpt opt = {0}; opt.pMnode = &mnode; opt.path = "/tmp/mnode_test_sdb"; - - SSdb *pSdb = sdbInit(&opt); - EXPECT_NE(pSdb, nullptr); + taosRemoveDir(opt.path); SSdbTable strTable = { .sdbType = SDB_USER, .keyType = SDB_KEY_BINARY, - .deployFp = (SdbDeployFp)strEncode, + .deployFp = (SdbDeployFp)strDefault, + .encodeFp = (SdbEncodeFp)strEncode, + .decodeFp = (SdbDecodeFp)strDecode, + .insertFp = (SdbInsertFp)strInsert, + .updateFp = (SdbUpdateFp)strUpdate, + .deleteFp = (SdbDeleteFp)strDelete, + }; + + pSdb = sdbInit(&opt); + mnode.pSdb = pSdb; + + ASSERT_NE(pSdb, nullptr); + ASSERT_EQ(sdbSetTable(pSdb, strTable), 0); + ASSERT_EQ(sdbDeploy(pSdb), 0); +#if 0 + pObj = (SStrObj *)sdbAcquire(pSdb, SDB_USER, "k1000"); + ASSERT_NE(pObj, nullptr); + EXPECT_STREQ(pObj->key, "k1000"); + EXPECT_STREQ(pObj->vstr, "v1000"); + EXPECT_EQ(pObj->v8, 1); + EXPECT_EQ(pObj->v16, 1); + EXPECT_EQ(pObj->v32, 1000); + EXPECT_EQ(pObj->v64, 1000); + sdbRelease(pSdb, pObj); + + pObj = (SStrObj *)sdbAcquire(pSdb, SDB_USER, "k2000"); + ASSERT_NE(pObj, nullptr); + EXPECT_STREQ(pObj->key, "k2000"); + EXPECT_STREQ(pObj->vstr, "v2000"); + EXPECT_EQ(pObj->v8, 2); + EXPECT_EQ(pObj->v16, 2); + EXPECT_EQ(pObj->v32, 2000); + EXPECT_EQ(pObj->v64, 2000); + sdbRelease(pSdb, pObj); + + pObj = (SStrObj *)sdbAcquire(pSdb, SDB_USER, "k200"); + ASSERT_EQ(pObj, nullptr); + + pIter = NULL; + num = 0; + do { + pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pObj); + if (pIter == NULL) break; + ASSERT_NE(pObj, nullptr); + num++; + sdbRelease(pSdb, pObj); + } while (1); + EXPECT_EQ(num, 2); + + do { + pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pObj); + if (pIter == NULL) break; + if (strcmp(pObj->key, "k1000") == 0) { + sdbCancelFetch(pSdb, pIter); + break; + } + } while (1); + EXPECT_STREQ(pObj->key, "k1000"); + + p1 = 0; + p2 = 111; + p3 = 222; + sdbTraverse(pSdb, SDB_USER, (sdbTraverseFp)sdbTraverseSucc2, &p1, &p2, &p3); + EXPECT_EQ(p1, 334); + + p1 = 0; + p2 = 111; + p3 = 222; + sdbTraverse(pSdb, SDB_USER, (sdbTraverseFp)sdbTraverseSucc2, &p1, &p2, &p3); + EXPECT_EQ(p1, 669); + + p1 = 0; + p2 = 111; + p3 = 222; + sdbTraverse(pSdb, SDB_USER, (sdbTraverseFp)sdbTraverseFail, &p1, &p2, &p3); + EXPECT_EQ(p1, 333); + + EXPECT_EQ(sdbGetSize(pSdb, SDB_USER), 2); + EXPECT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1); + EXPECT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2); + EXPECT_EQ(sdbUpdateVer(pSdb, 0), 2); + EXPECT_EQ(sdbUpdateVer(pSdb, 1), 3); + EXPECT_EQ(sdbUpdateVer(pSdb, -1), 2); + + // insert, call func + + // update, call func + + // delete, call func 2 + + // write version + + // sdb Write ver + + // sdbRead +#endif + ASSERT_EQ(sdbWriteFile(pSdb), 0); + sdbCleanup(pSdb); +} + +TEST_F(MndTestSdb, 01_Read) { + void *pIter; + int32_t num; + SStrObj *pObj; + SMnode mnode; + SSdb *pSdb; + SSdbOpt opt = {0}; + int32_t p1 = 0; + int32_t p2 = 111; + int32_t p3 = 222; + + mnode.v100 = 100; + mnode.v200 = 200; + opt.pMnode = &mnode; + opt.path = "/tmp/mnode_test_sdb"; + taosRemoveDir(opt.path); + + SSdbTable strTable = { + .sdbType = SDB_USER, + .keyType = SDB_KEY_BINARY, + .deployFp = (SdbDeployFp)strDefault, .encodeFp = (SdbEncodeFp)strEncode, .decodeFp = (SdbDecodeFp)strDecode, .insertFp = (SdbInsertFp)strInsert, @@ -150,7 +314,9 @@ TEST_F(MndTestSdb, 01_Basic) { .deleteFp = (SdbDeleteFp)strUpdate, }; - sdbSetTable(pSdb, strTable); + pSdb = sdbInit(&opt); + mnode.pSdb = pSdb; + ASSERT_EQ(sdbReadFile(pSdb), 0); sdbCleanup(pSdb); -} +} \ No newline at end of file diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index de71ac47b6..563fc72d00 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -52,6 +52,8 @@ typedef struct SSdbRow { const char *sdbTableName(ESdbType type); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); +void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); + #ifdef __cplusplus } #endif diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 46fa2ecd3a..748128e34e 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -72,8 +72,8 @@ void taosRemoveDir(const char *dirname) { while ((de = taosReadDir(pDir)) != NULL) { if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue; - char filename[1024]; - snprintf(filename, sizeof(filename), "%s/%s", dirname, taosGetDirEntryName(de)); + char filename[1024] = {0}; + snprintf(filename, sizeof(filename), "%s%s%s", dirname, TD_DIRSEP, taosGetDirEntryName(de)); if (taosDirEntryIsDir(de)) { taosRemoveDir(filename); } else { From f0acbbb74bd917cfa149d8b6c4c3f6ee1e4d3dbe Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 28 Apr 2022 14:00:32 +0800 Subject: [PATCH 05/10] add flag back --- include/util/tdef.h | 1 + source/dnode/mnode/impl/src/mndDb.c | 4 ++-- source/dnode/mnode/impl/src/mndInfoSchema.c | 2 +- source/dnode/mnode/impl/src/mndPerfSchema.c | 1 + tests/script/jenkins/basic.txt | 2 +- 5 files changed, 6 insertions(+), 4 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index cf0c75e58f..b0520c9102 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -132,6 +132,7 @@ extern const int32_t TYPE_BYTES[15]; #define TSDB_PERFS_TABLE_CONSUMERS "consumers" #define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions" #define TSDB_PERFS_TABLE_OFFSETS "offsets" +#define TSDB_PERFS_TABLE_STREAMS "streams" #define TSDB_INDEX_TYPE_SMA "SMA" #define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT" diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c784f46541..15ebcf02db 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1501,8 +1501,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.singleSTable, false); - /*pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);*/ - /*colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.streamMode, false);*/ + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.streamMode, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols); colDataAppend(pColInfo, rows, (const char *)b, false); diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index 2df23ad38b..1e708ca321 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -89,7 +89,7 @@ static const SInfosTableSchema userDBSchema[] = { {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "single_stable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, - /*{.name = "stream_mode", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},*/ + {.name = "stream_mode", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, // {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update }; diff --git a/source/dnode/mnode/impl/src/mndPerfSchema.c b/source/dnode/mnode/impl/src/mndPerfSchema.c index ee17b28065..e5374330ad 100644 --- a/source/dnode/mnode/impl/src/mndPerfSchema.c +++ b/source/dnode/mnode/impl/src/mndPerfSchema.c @@ -95,6 +95,7 @@ static const SPerfsTableMeta perfsMeta[] = { {TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)}, {TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)}, {TSDB_PERFS_TABLE_OFFSETS, offsetSchema, tListLen(offsetSchema)}, + {TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)}, }; // connection/application/ diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index ba4296f9bf..a8e2c22d42 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -81,7 +81,7 @@ ./test.sh -f tsim/insert/backquote.sim -m ./test.sh -f tsim/parser/fourArithmetic-basic.sim -m ./test.sh -f tsim/query/interval-offset.sim -m -#./test.sh -f tsim/tmq/basic1.sim -m +./test.sh -f tsim/tmq/basic3.sim -m ./test.sh -f tsim/stable/vnode3.sim -m ./test.sh -f tsim/qnode/basic1.sim -m ./test.sh -f tsim/mnode/basic1.sim -m From ba5123ad2423a72541924ec811c0dc91fcccdab4 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 28 Apr 2022 14:07:24 +0800 Subject: [PATCH 06/10] agg proc func parameter change --- include/libs/function/tudf.h | 2 +- source/libs/function/src/udfd.c | 2 +- source/libs/function/test/udf2.c | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 985bf6fa6f..e49f5cac45 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -140,7 +140,7 @@ typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf); -typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf); +typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf); typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 896ebd3763..ae24a832c8 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -232,7 +232,7 @@ void udfdProcessRequest(uv_work_t *req) { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen= udf->bufSize, .numOfResult = 0}; - udf->aggProcFunc(&input, &outBuf); + udf->aggProcFunc(&input, &call->interBuf, &outBuf); subRsp->resultBuf = outBuf; break; diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index 250c20ba88..83187c5855 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -24,7 +24,7 @@ int32_t udf2_start(SUdfInterBuf *buf) { return 0; } -int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) { +int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { int64_t sumSquares = *(int64_t*)interBuf->buf; for (int32_t i = 0; i < block->numOfCols; ++i) { for (int32_t j = 0; j < block->numOfRows; ++i) { @@ -35,10 +35,10 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) { } } - *(int64_t*)interBuf = sumSquares; - interBuf->bufLen = sizeof(int64_t); + *(int64_t*)newInterBuf = sumSquares; + newInterBuf->bufLen = sizeof(int64_t); //TODO: if all null value, numOfResult = 0; - interBuf->numOfResult = 1; + newInterBuf->numOfResult = 1; return 0; } From d998e3e80483e601279506cd21790700ad4b8920 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 28 Apr 2022 14:20:32 +0800 Subject: [PATCH 07/10] fix: create stream --- include/common/tmsg.h | 5 +++-- source/client/src/tmq.c | 2 +- source/common/src/tmsg.c | 8 +++++--- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/libs/parser/src/parTranslater.c | 2 +- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index da48846a8f..89f7cd6c39 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1281,8 +1281,9 @@ typedef struct { #define STREAM_TRIGGER_WINDOW_CLOSE 2 typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; - char outputSTbName[TSDB_TABLE_FNAME_LEN]; + char name[TSDB_TABLE_FNAME_LEN]; + char sourceDB[TSDB_DB_FNAME_LEN]; + char targetStbFullName[TSDB_TABLE_FNAME_LEN]; int8_t igExists; char* sql; char* ast; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index b03947e2ca..a093b7449b 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -739,7 +739,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa .sql = (char*)sql, }; tNameExtractFullName(&name, req.name); - strcpy(req.outputSTbName, tbName); + strcpy(req.targetStbFullName, tbName); int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req); void* buf = taosMemoryMalloc(tlen); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index afab703e77..5400e6e08a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3569,10 +3569,12 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->outputSTbName) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->targetStbFullName) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1; + if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1; + if (tEncodeI64(&encoder, pReq->watermark) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; @@ -3592,7 +3594,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->outputSTbName) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->targetStbFullName) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1; @@ -3806,4 +3808,4 @@ int tDecodeSVCreateTbRsp(SCoder *pCoder, SVCreateTbRsp *pRsp) { tEndDecode(pCoder); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 058e11f397..db5c725f5e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -295,7 +295,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe SStreamObj streamObj = {0}; tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN); - tstrncpy(streamObj.targetSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN); + tstrncpy(streamObj.targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN); streamObj.createTime = taosGetTimestampMs(); streamObj.updateTime = streamObj.createTime; streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 463764e713..9af6aa787f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2734,7 +2734,7 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* if ('\0' != pStmt->targetTabName[0]) { strcpy(name.dbname, pStmt->targetDbName); strcpy(name.tname, pStmt->targetTabName); - tNameExtractFullName(&name, createReq.outputSTbName); + tNameExtractFullName(&name, createReq.targetStbFullName); } int32_t code = translateQuery(pCxt, pStmt->pQuery); From 00509d0edfa394e589f605e67b0795de90dc8da2 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 28 Apr 2022 14:43:54 +0800 Subject: [PATCH 08/10] udaf integrate into function framework --- include/libs/function/function.h | 2 +- include/libs/function/functionMgt.h | 1 + source/libs/executor/src/executorimpl.c | 9 ++++++++- source/libs/function/src/functionMgt.c | 5 ++++- 4 files changed, 14 insertions(+), 3 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 0580f3acba..094ce80106 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -207,7 +207,7 @@ typedef struct SqlFunctionCtx { struct SSDataBlock *pSrcBlock; int32_t curBufPage; - char* udfName[TSDB_FUNC_NAME_LEN]; + char udfName[TSDB_FUNC_NAME_LEN]; } SqlFunctionCtx; enum { diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 126a2ccf99..0572262bfc 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -162,6 +162,7 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet); +int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f1995b723d..943a016c27 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1897,7 +1897,14 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId; if (fmIsAggFunc(pCtx->functionId) || fmIsNonstandardSQLFunc(pCtx->functionId)) { - fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); + bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId); + if (!isUdaf) { + fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); + } else { + char *udfName = pExpr->pExpr->_function.pFunctNode->functionName; + strncpy(pCtx->udfName, udfName, strlen(udfName)); + fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet); + } pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); } else { fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 585eb57a56..0113da94eb 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -124,7 +124,10 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { return TSDB_CODE_SUCCESS; } -int32_t fmGetUdafExecFuncs(SFuncExecFuncs* pFpSet) { +int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { + if (!fmIsUserDefinedFunc(funcId)) { + return TSDB_CODE_FAILED; + } pFpSet->getEnv = udfAggGetEnv; pFpSet->init = udfAggInit; pFpSet->process = udfAggProcess; From 7df0fba51a56b442e7e34d3f71dd711ceb94ebb2 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 28 Apr 2022 15:17:40 +0800 Subject: [PATCH 09/10] [test: add test cases for taosshell] --- tests/system-test/0-others/taosShell.py | 86 ++++++++++++++++++++++++- 1 file changed, 83 insertions(+), 3 deletions(-) diff --git a/tests/system-test/0-others/taosShell.py b/tests/system-test/0-others/taosShell.py index 51b00bd66a..4e9a115500 100644 --- a/tests/system-test/0-others/taosShell.py +++ b/tests/system-test/0-others/taosShell.py @@ -57,12 +57,12 @@ def taos_command (buildPath, key, value, expectString, cfgDir, sqlString='', key else: return "TAOS_FAIL" else: - if key == 'A' or key1 == 'A' or key == 'C' or key1 == 'C': + if key == 'A' or key1 == 'A' or key == 'C' or key1 == 'C' or key == 'V' or key1 == 'V': return "TAOS_OK", retResult else: return "TAOS_OK" else: - if key == 'A' or key1 == 'A' or key == 'C' or key1 == 'C': + if key == 'A' or key1 == 'A' or key == 'C' or key1 == 'C' or key == 'V' or key1 == 'V': return "TAOS_OK", retResult else: return "TAOS_FAIL" @@ -311,7 +311,7 @@ class TDTestCase: tdSql.query('drop database %s'%newDbName) tdLog.printNoPrefix("================================ parameter: -C") - newDbName="dbcc" + #newDbName="dbcc" retCode, retVal = taos_command(buildPath, "C", keyDict['C'], "buildinfo", keyDict['c'], '', '', '') if retCode != "TAOS_OK": tdLog.exit("taos -C fail") @@ -336,6 +336,86 @@ class TDTestCase: if (totalCfgItem["numOfCores"][2] != count) and (totalCfgItem["numOfCores"][0] != 'default'): tdLog.exit("taos -C return numOfCores error!") + version = totalCfgItem["version"][2] + + tdLog.printNoPrefix("================================ parameter: -V") + #newDbName="dbvv" + retCode, retVal = taos_command(buildPath, "V", keyDict['V'], "", keyDict['c'], '', '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -V fail") + + version = 'version: ' + version + retVal = retVal.replace("\n", "") + retVal = retVal.replace("\r", "") + if retVal != version: + print ("return version: [%s]"%retVal) + print ("dict version: [%s]"%version) + tdLog.exit("taos -V version not match") + + tdLog.printNoPrefix("================================ parameter: -d") + newDbName="dbd" + sqlString = 'create database ' + newDbName + ';' + retCode = taos_command(buildPath, "d", keyDict['d'], "taos>", keyDict['c'], sqlString, '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -d %s fail"%(keyDict['d'])) + else: + tdSql.query("show databases") + for i in range(tdSql.queryRows): + if tdSql.getData(i, 0) == newDbName: + break + else: + tdLog.exit("create db fail after taos -d %s fail"%(keyDict['d'])) + + tdSql.query('drop database %s'%newDbName) + + retCode = taos_command(buildPath, "d", 'dbno', "taos>", keyDict['c'], sqlString, '', '') + if retCode != "TAOS_FAIL": + tdLog.exit("taos -d dbno fail") + + tdLog.printNoPrefix("================================ parameter: -w") + newDbName="dbw" + keyDict['s'] = "\"create database " + newDbName + "\"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -w fail") + + keyDict['s'] = "\"create table " + newDbName + ".ntb (ts timestamp, c binary(128))\"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -w create table fail") + + keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.001', 'abcd0123456789')('2021-04-01 08:00:00.002', 'abcd012345678901234567890123456789') \"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -w insert data fail") + + keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.003', 'aaaaaaaaaaaaaaaaaaaa')('2021-04-01 08:00:01.004', 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') \"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -w insert data fail") + + keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.005', 'cccccccccccccccccccc')('2021-04-01 08:00:01.006', 'dddddddddddddddddddddddddddddddddddddddd') \"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -w insert data fail") + + keyDict['s'] = "\"select * from " + newDbName + ".ntb \"" + retCode = taos_command(buildPath, "s", keyDict['s'], "aaaaaaaaaaaaaaaaaaaa", keyDict['c'], '', '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -w insert data fail") + + keyDict['s'] = "\"select * from " + newDbName + ".ntb \"" + retCode = taos_command(buildPath, "s", keyDict['s'], "dddddddddddddddddddddddddddddddddddddddd", keyDict['c'], '', '', '') + if retCode != "TAOS_FAIL": + tdLog.exit("taos -w insert data fail") + + keyDict['s'] = "\"select * from " + newDbName + ".ntb \"" + retCode = taos_command(buildPath, "s", keyDict['s'], "dddddddddddddddddddddddddddddddddddddddd", keyDict['c'], '', 'w', '60') + if retCode != "TAOS_OK": + tdLog.exit("taos -w insert data fail") + + tdSql.query('drop database %s'%newDbName) + def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") From 0260686faaa37ba2117b49e9358bd0aab72f28e8 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 28 Apr 2022 15:24:12 +0800 Subject: [PATCH 10/10] [test: add test case for db] --- tests/script/tsim/db/create_all_options.sim | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/script/tsim/db/create_all_options.sim b/tests/script/tsim/db/create_all_options.sim index ebb3716882..8a3826fe21 100644 --- a/tests/script/tsim/db/create_all_options.sim +++ b/tests/script/tsim/db/create_all_options.sim @@ -346,8 +346,8 @@ sql drop database db sql_error create database db PRECISION 'as' sql_error create database db PRECISION -1 -print ====> QUORUM value [1 | 2, default: 1] -#sql create database db QUORUM 2 +print ====> QUORUM value [1 | 2, default: 1] 3.0 not support this item +#sql_error create database db QUORUM 2 #sql show databases #print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db #if $data5_db != 2 then @@ -362,9 +362,11 @@ print ====> QUORUM value [1 | 2, default: 1] # return -1 #endi #sql drop database db -#sql_error create database db QUORUM 3 -#sql_error create database db QUORUM 0 -#sql_error create database db QUORUM -1 +sql_error create database db QUORUM 1 +sql_error create database db QUORUM 2 +sql_error create database db QUORUM 3 +sql_error create database db QUORUM 0 +sql_error create database db QUORUM -1 print ====> REPLICA value [1 | 3, default: 1] sql create database db REPLICA 3