From 527f075b1d30a1076b834bc769a1341fdc9696c8 Mon Sep 17 00:00:00 2001 From: tomchon Date: Mon, 16 May 2022 09:47:08 +0800 Subject: [PATCH 01/18] test:modify testcase that test qnode --- .../1-insert/insertWithMoreVgroup.py | 89 ++++++++++++------- tests/system-test/1-insert/manyVgroups.json | 2 +- 2 files changed, 57 insertions(+), 34 deletions(-) diff --git a/tests/system-test/1-insert/insertWithMoreVgroup.py b/tests/system-test/1-insert/insertWithMoreVgroup.py index d3da4f2c59..6759ece33e 100644 --- a/tests/system-test/1-insert/insertWithMoreVgroup.py +++ b/tests/system-test/1-insert/insertWithMoreVgroup.py @@ -30,7 +30,10 @@ class TDTestCase: # # --------------- main frame ------------------- # - + clientCfgDict = {'queryproxy': '1'} + clientCfgDict["queryproxy"] = '2' + updatecfgDict = {'clientCfg': {}} + updatecfgDict["clientCfg"] = clientCfgDict def caseDescription(self): ''' limit and offset keyword function test cases; @@ -75,8 +78,13 @@ class TDTestCase: # self.test_case2() # tdLog.debug(" LIMIT test_case2 ............ [OK]") - # test case - self.test_case3() + # # test case + # self.test_case3() + # tdLog.debug(" LIMIT test_case3 ............ [OK]") + + + # test qnode + self.test_case4() tdLog.debug(" LIMIT test_case3 ............ [OK]") @@ -199,16 +207,10 @@ class TDTestCase: os.system("%s -f %s -y " %(taosBenchbin,jsonFile)) return - def taosBenchCreate(self,host,dropdb,dbname,stbname,vgroups,threadNumbers,count): + def taosBenchCreate(self,host,dropdb,dbname,stbname,vgroups,processNumbers,count): # count=50000 buildPath = self.getBuildPath() - if (buildPath == ""): - tdLog.exit("taosd not found!") - else: - tdLog.info("taosd found in %s" % buildPath) - taosBenchbin = buildPath+ "/build/bin/taosBenchmark" - buildPath = self.getBuildPath() config = buildPath+ "../sim/dnode1/cfg/" tsql=self.newcur(host,config) @@ -222,8 +224,7 @@ class TDTestCase: tsql.execute("use %s" %dbname) threads = [] - # threadNumbers=2 - for i in range(threadNumbers): + for i in range(processNumbers): jsonfile="1-insert/Vgroups%d%d.json"%(vgroups,i) os.system("cp -f 1-insert/manyVgroups.json %s"%(jsonfile)) os.system("sed -i 's/\"name\": \"db\",/\"name\": \"%s\",/g' %s"%(dbname,jsonfile)) @@ -247,29 +248,8 @@ class TDTestCase: # test case1 base def test_case1(self): tdLog.debug("-----create database and tables test------- ") - tdSql.execute("drop database if exists db1") - tdSql.execute("drop database if exists db4") - tdSql.execute("drop database if exists db6") - tdSql.execute("drop database if exists db8") - tdSql.execute("drop database if exists db12") - tdSql.execute("drop database if exists db16") #create database and tables; - - # tdSql.execute("create database db11 vgroups 1") - # # self.create_tables("db1", "stb1", 30*10000) - # tdSql.execute("use db1") - # tdSql.execute("create stable stb1(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)") - - # tdSql.execute("create database db12 vgroups 1") - # # self.create_tables("db1", "stb1", 30*10000) - # tdSql.execute("use db1") - - # t1 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(1,)) - # t2 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(2,)) - # t1 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", 0,count/2,)) - # t2 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", count/2,count,)) - count=50000 vgroups=1 threads = [] @@ -356,6 +336,49 @@ class TDTestCase: return + def test_case4(self): + self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 2, 1*10) + tdSql.execute("use db1;") + tdSql.query("show dnodes;") + dnodeId=tdSql.getData(0,0) + print(dnodeId) + tdSql.execute("create qnode on dnode %s"%dnodeId) + tdSql.query("select max(c1) from stb10;") + maxQnode=tdSql.getData(0,0) + tdSql.query("select min(c1) from stb11;") + minQnode=tdSql.getData(0,0) + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") + unionQnode=tdSql.queryResult + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") + unionallQnode=tdSql.queryResult + + # tdSql.query("show qnodes;") + # qnodeId=tdSql.getData(0,0) + tdSql.execute("drop qnode on dnode %s"%dnodeId) + tdSql.execute("reset query cache") + tdSql.query("select max(c1) from stb10;") + tdSql.checkData(0, 0, "%s"%maxQnode) + tdSql.query("select min(c1) from stb11;") + tdSql.checkData(0, 0, "%s"%minQnode) + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") + unionVnode=tdSql.queryResult + assert unionQnode == unionVnode + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") + unionallVnode=tdSql.queryResult + assert unionallQnode == unionallVnode + + + # tdSql.execute("create qnode on dnode %s"%dnodeId) + + + # self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000) + + # self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000) + + # self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000) + # self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000) + + return # # add case with filename # diff --git a/tests/system-test/1-insert/manyVgroups.json b/tests/system-test/1-insert/manyVgroups.json index e6719aedc9..1c9aa1f28c 100644 --- a/tests/system-test/1-insert/manyVgroups.json +++ b/tests/system-test/1-insert/manyVgroups.json @@ -11,7 +11,7 @@ "confirm_parameter_prompt": "no", "insert_interval": 0, "interlace_rows": 100000, - "num_of_records_per_req": 100000, + "num_of_records_per_req": 100, "databases": [ { "dbinfo": { From 6acc51dbb517a4b341716da3c50a4e15235da683 Mon Sep 17 00:00:00 2001 From: tomchon Date: Tue, 17 May 2022 20:28:46 +0800 Subject: [PATCH 02/18] modify testcase of threeReplica --- .../sync/threeReplica1VgElectWihtInsert.sim | 70 +++++++++---------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/tests/script/tsim/sync/threeReplica1VgElectWihtInsert.sim b/tests/script/tsim/sync/threeReplica1VgElectWihtInsert.sim index 1e12e8565f..f6996f1291 100644 --- a/tests/script/tsim/sync/threeReplica1VgElectWihtInsert.sim +++ b/tests/script/tsim/sync/threeReplica1VgElectWihtInsert.sim @@ -31,7 +31,7 @@ if $data[0][4] != ready then goto check_dnode_ready endi -#sql connect +sql connect sql create dnode $hostname port 7200 sql create dnode $hostname port 7300 sql create dnode $hostname port 7400 @@ -83,7 +83,7 @@ print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $dat if $rows != 3 then return -1 endi -if $data(db)[19] != ready then +if $data(db)[19] != nostrict then goto check_db_ready endi @@ -93,49 +93,48 @@ $loop_cnt = 0 check_vg_ready: $loop_cnt = $loop_cnt + 1 sleep 200 -if $loop_cnt == 10 then +if $loop_cnt == 40 then print ====> vgroups not ready! return -1 endi sql show vgroups print ===> rows: $rows -print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13] print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] if $rows != $vgroups then return -1 endi if $data[0][4] == LEADER then - if $data[0][6] != FLLOWER then + if $data[0][6] != FOLLOWER then goto check_vg_ready endi - if $data[0][8] != FLLOWER then + if $data[0][8] != FOLLOWER then goto check_vg_ready endi print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] - goto vg_ready -endi -if $data[0][6] == LEADER then - if $data[0][4] != FLLOWER then + goto vg_ready +elif $data[0][6] == LEADER then + if $data[0][4] != FOLLOWER then goto check_vg_ready endi - if $data[0][8] != FLLOWER then + if $data[0][8] != FOLLOWER then goto check_vg_ready endi print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] - goto vg_ready -endi -if $data[0][8] == LEADER then - if $data[0][4] != FLLOWER then + goto vg_ready +elif $data[0][8] == LEADER then + if $data[0][4] != FOLLOWER then goto check_vg_ready endi - if $data[0][6] != FLLOWER then + if $data[0][6] != FOLLOWER then goto check_vg_ready endi print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] - goto vg_ready + goto vg_ready +else + goto check_vg_ready endi -vg_ready: +vg_ready: print ====> create stable/child table sql create table stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) @@ -185,7 +184,7 @@ print ====> create a normal table for interaction between main and back threads sql create table interaction (ts timestamp, flag binary(10), childrows int, stbrows int) print ====> start to run_back to insert data -run_back tsim/tmq/insertDataByRunBack.sim +run_back tsim/sync/insertDataByRunBack.sim print ====> waiting insert thread starting insert data @@ -239,34 +238,34 @@ if $rows != $vgroups then return -1 endi if $data[0][4] == LEADER then - if $data[0][6] != FLLOWER then + if $data[0][6] != FOLLOWER then goto check_vg_ready_2 endi - if $data[0][8] != FLLOWER then + if $data[0][8] != FOLLOWER then goto check_vg_ready_2 endi print ---- vgroup $data[0][0] leader switch to dnode $data[0][3] goto vg_ready_2 -endi -if $data[0][6] == LEADER then - if $data[0][4] != FLLOWER then +elif $data[0][6] == LEADER then + if $data[0][4] != FOLLOWER then goto check_vg_ready_2 endi - if $data[0][8] != FLLOWER then + if $data[0][8] != FOLLOWER then goto check_vg_ready_2 endi print ---- vgroup $data[0][0] leader switch to dnode $data[0][5] goto vg_ready_2 -endi -if $data[0][8] == LEADER then - if $data[0][4] != FLLOWER then +elif $data[0][8] == LEADER then + if $data[0][4] != FOLLOWER then goto check_vg_ready_2 endi - if $data[0][6] != FLLOWER then + if $data[0][6] != FOLLOWER then goto check_vg_ready_2 endi print ---- vgroup $data[0][0] leader switch to dnode $data[0][7] goto vg_ready_2 +else + goto check_vg_ready_2 endi vg_ready_2: @@ -344,28 +343,28 @@ if $rows != $vgroups then return -1 endi if $data[0][4] == LEADER then - if $data[0][6] != FLLOWER then + if $data[0][6] != FOLLOWER then goto check_vg_ready_1 endi - if $data[0][8] != FLLOWER then + if $data[0][8] != FOLLOWER then goto check_vg_ready_1 endi goto vg_ready_1 endi if $data[0][6] == LEADER then - if $data[0][4] != FLLOWER then + if $data[0][4] != FOLLOWER then goto check_vg_ready_1 endi - if $data[0][8] != FLLOWER then + if $data[0][8] != FOLLOWER then goto check_vg_ready_1 endi goto vg_ready_1 endi if $data[0][8] == LEADER then - if $data[0][4] != FLLOWER then + if $data[0][4] != FOLLOWER then goto check_vg_ready_1 endi - if $data[0][6] != FLLOWER then + if $data[0][6] != FOLLOWER then goto check_vg_ready_1 endi goto vg_ready_1 @@ -420,7 +419,6 @@ elif $data[0][6] == LEADER then goto check_vg_ready_3 endi print ---- vgroup $data[0][0] leader locating dnode $data[0][7] -endi elif $data[0][8] == LEADER then if $data[0][4] == LEADER then goto check_vg_ready_3 From 47e6e95866f760e9787a51fad36c7fe846822470 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 18 May 2022 20:39:00 +0800 Subject: [PATCH 03/18] enh(query): record and return the schema version and tags version of current queried table. --- include/libs/executor/executor.h | 9 ++++++ source/libs/executor/inc/executorimpl.h | 10 ++++++- source/libs/executor/src/executor.c | 13 ++++++++ source/libs/executor/src/executorimpl.c | 40 +++++++++++++++++++++---- source/libs/executor/src/scanoperator.c | 2 +- 5 files changed, 67 insertions(+), 7 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 8f300c96c5..9cafb4ee04 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -95,6 +95,15 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model); +/** + * + * @param tinfo + * @param sversion + * @param tversion + * @return + */ +int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion); + /** * The main task execution function, including query on both table and multiple tables, * which are decided according to the tag or table name query conditions diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index bf178612ba..89bbb1f48c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -217,6 +217,13 @@ typedef struct SExecTaskInfo { int64_t owner; // if it is in execution int32_t code; uint64_t totalRows; // total number of rows + struct { + char *tablename; + char *dbname; + int32_t sversion; + int32_t tversion; + } schemaVer; + STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure char* sql; // query sql string jmp_buf env; // jump to this position when error happens. @@ -670,7 +677,8 @@ SSDataBlock* loadNextDataBlock(void* param); void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, - int32_t type); + SExecTaskInfo* pTaskInfo, int32_t type); + SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 66073b70eb..314ed97cfc 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,6 +14,7 @@ */ #include "executor.h" +#include #include #include "executorimpl.h" #include "planner.h" @@ -171,3 +172,15 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo return TSDB_CODE_SUCCESS; } + +int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion) { + ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; + + *sversion = pTaskInfo->schemaVer.sversion; + *tversion = pTaskInfo->schemaVer.tversion; + strcpy(dbName, pTaskInfo->schemaVer.dbname); + strcpy(tableName, pTaskInfo->schemaVer.tablename); + + return 0; +} \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d0a1840d72..bd93284da2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -4653,6 +4654,26 @@ static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* createSortInfo(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList); +void extractTableSchemaVersion(SReadHandle *pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) { + SMetaReader mr = {0}; + metaReaderInit(&mr, pHandle->meta, 0); + metaGetTableEntryByUid(&mr, uid); + + pTaskInfo->schemaVer.tablename = strdup(mr.me.name); + + if (mr.me.type == TSDB_SUPER_TABLE) { + pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver; + pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver; + } else if (mr.me.type == TSDB_CHILD_TABLE) { + tb_uid_t suid = mr.me.ctbEntry.suid; + metaGetTableEntryByUid(&mr, suid); + pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver; + pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver; + } else { + pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schema.sver; + } +} + SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { int32_t type = nodeType(pPhyNode); @@ -4666,6 +4687,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } + extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; @@ -4703,7 +4725,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SArray* tableIdList = extractTableIdList(pTableGroupInfo); SSDataBlock* pResBlock = createResDataBlock(pDescNode); - SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, tableIdList, pTaskInfo, pScanPhyNode->node.pConditions, pOperatorDumy); @@ -4718,7 +4740,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pDescNode); int32_t numOfOutputCols = 0; - SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID); + SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID); SOperatorInfo* pOperator = createSysTableScanOperatorInfo( pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList, pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId); @@ -4741,7 +4763,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfOutputCols = 0; SArray* colList = - extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID); + extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID); SOperatorInfo* pOperator = createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo); @@ -4823,7 +4845,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); int32_t numOfOutputCols = 0; - SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID); pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { @@ -5014,7 +5036,7 @@ SArray* createSortInfo(SNodeList* pNodeList) { } SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, - int32_t type) { + SExecTaskInfo* pTaskInfo, int32_t type) { size_t numOfCols = LIST_LENGTH(pNodeList); SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo)); if (pList == NULL) { @@ -5022,10 +5044,16 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod return NULL; } + const char* tname = pTaskInfo->schemaVer.tablename; for (int32_t i = 0; i < numOfCols; ++i) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; + if (tname != NULL && (pTaskInfo->schemaVer.dbname == NULL) && + strncmp(pColNode->tableName, tname, tListLen(pColNode->tableName)) == 0) { + pTaskInfo->schemaVer.dbname = strdup(pColNode->dbName); + } + SColMatchInfo c = {0}; c.output = true; c.colId = pColNode->colId; @@ -5219,6 +5247,8 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { // taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosHashCleanup(pTaskInfo->summary.operatorProfResults); + taosMemoryFree(pTaskInfo->schemaVer.dbname); + taosMemoryFree(pTaskInfo->schemaVer.tablename); taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d042e463f0..470c9d6569 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -455,7 +455,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; int32_t numOfCols = 0; - SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { From 003963f8bb7846993f5415f447b71567a2a0e701 Mon Sep 17 00:00:00 2001 From: tomchon Date: Thu, 19 May 2022 01:25:49 +0800 Subject: [PATCH 04/18] test:modify testcase of multi-Thread creating tables --- .../1-insert/insertWithMoreVgroup.py | 238 +++++++++--------- 1 file changed, 122 insertions(+), 116 deletions(-) diff --git a/tests/system-test/1-insert/insertWithMoreVgroup.py b/tests/system-test/1-insert/insertWithMoreVgroup.py index 6759ece33e..d8720e8045 100644 --- a/tests/system-test/1-insert/insertWithMoreVgroup.py +++ b/tests/system-test/1-insert/insertWithMoreVgroup.py @@ -13,7 +13,7 @@ import sys import os -import threading +import threading as thd import multiprocessing as mp from numpy.lib.function_base import insert import taos @@ -66,58 +66,13 @@ class TDTestCase: # self.create_tables(); self.ts = 1500000000000 - - # run case - def run(self): - - # # test base case - # self.test_case1() - # tdLog.debug(" LIMIT test_case1 ............ [OK]") - - # test case - # self.test_case2() - # tdLog.debug(" LIMIT test_case2 ............ [OK]") - - # # test case - # self.test_case3() - # tdLog.debug(" LIMIT test_case3 ............ [OK]") - - - # test qnode - self.test_case4() - tdLog.debug(" LIMIT test_case3 ............ [OK]") - - # stop def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) - # --------------- case ------------------- - # create tables - def create_tables(self,dbname,stbname,count): - tdSql.execute("use %s" %dbname) - tdSql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname) - pre_create = "create table" - sql = pre_create - tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) - # print(time.time()) - exeStartTime=time.time() - for i in range(count): - sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1) - if i >0 and i%3000 == 0: - tdSql.execute(sql) - sql = pre_create - # print(time.time()) - # end sql - if sql != pre_create: - tdSql.execute(sql) - exeEndTime=time.time() - spendTime=exeEndTime-exeStartTime - speedCreate=count/spendTime - tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate)) - return + # --------------- case ------------------- def newcur(self,host,cfg): user = "root" @@ -128,28 +83,23 @@ class TDTestCase: print(cur) return cur - def new_create_tables(self,dbname,vgroups,stbname,tcountStart,tcountStop): - host = "localhost" + # create tables + def create_tables(self,host,dbname,stbname,count): buildPath = self.getBuildPath() config = buildPath+ "../sim/dnode1/cfg/" tsql=self.newcur(host,config) - tsql.execute("drop database if exists %s"%dbname) - tsql.execute("create database %s vgroups %d"%(dbname,vgroups)) tsql.execute("use %s" %dbname) - tsql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname) pre_create = "create table" sql = pre_create - tcountStop=int(tcountStop) - tcountStart=int(tcountStart) - count=tcountStop-tcountStart + count=int(count) tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) # print(time.time()) exeStartTime=time.time() # print(type(tcountStop),type(tcountStart)) - for i in range(tcountStart,tcountStop): + for i in range(0,count): sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1) if i >0 and i%20000 == 0: # print(sql) @@ -166,11 +116,78 @@ class TDTestCase: # tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate)) return + def mutiThread_create_tables(self,host,dbname,stbname,vgroups,threadNumbers,count): + buildPath = self.getBuildPath() + config = buildPath+ "../sim/dnode1/cfg/" + + tsql=self.newcur(host,config) + tdLog.debug("create database %s"%dbname) + tsql.execute("drop database if exists %s"%dbname) + tsql.execute("create database %s vgroups %d"%(dbname,vgroups)) + tsql.execute("use %s" %dbname) + count=int(count) + threads = [] + for i in range(threadNumbers): + tsql.execute("create stable %s%d(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%(stbname,i)) + threads.append(thd.Thread(target=self.create_tables, args=(host, dbname, stbname+"%d"%i, count,))) + start_time = time.time() + for tr in threads: + tr.start() + for tr in threads: + tr.join() + end_time = time.time() + spendTime=end_time-start_time + speedCreate=count/spendTime + tdLog.debug("spent %.2fs to create %d stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,threadNumbers,threadNumbers*count,speedCreate)) + + return + + # def create_tables(self,host,dbname,stbname,vgroups,tcountStart,tcountStop): # insert data - def insert_data(self, dbname, stbname, ts_start, tcountStart,tcountStop,rowCount): - tdSql.execute("use %s" %dbname) + def insert_data(self, host, dbname, stbname, ts_start,rowCount): + buildPath = self.getBuildPath() + config = buildPath+ "../sim/dnode1/cfg/" + + tsql=self.newcur(host,config) + tdLog.debug("ready to inser data") + + tsql.execute("use %s" %dbname) + pre_insert = "insert into " + sql = pre_insert + tcount=int(tcount) + allRows=tcount*rowCount + tdLog.debug("doing insert data into stable-index:%s rows:%d ..."%(stbname, allRows)) + exeStartTime=time.time() + for i in range(0,tcount): + sql += " %s_%d values "%(stbname,i) + for j in range(rowCount): + sql += "(%d, %d, 'taos_%d') "%(ts_start + j*1000, j, j) + if j >0 and j%5000 == 0: + # print(sql) + tdSql.execute(sql) + sql = "insert into %s_%d values " %(stbname,i) + # end sql + if sql != pre_insert: + # print(sql) + tdSql.execute(sql) + exeEndTime=time.time() + spendTime=exeEndTime-exeStartTime + speedInsert=allRows/spendTime + # tdLog.debug("spent %.2fs to INSERT %d rows , insert rate is %.2f rows/s... [OK]"% (spendTime,allRows,speedInsert)) + + tdLog.debug("INSERT TABLE DATA ............ [OK]") + return + + def mutiThread_insert_data(self, host, dbname, stbname, threadNumbers, ts_start, tcountStart,tcountStop,rowCount): + buildPath = self.getBuildPath() + config = buildPath+ "../sim/dnode1/cfg/" + + tsql=self.newcur(host,config) + tdLog.debug("ready to inser data") + + tsql.execute("use %s" %dbname) pre_insert = "insert into " sql = pre_insert tcount=tcountStop-tcountStart @@ -195,8 +212,30 @@ class TDTestCase: # tdLog.debug("spent %.2fs to INSERT %d rows , insert rate is %.2f rows/s... [OK]"% (spendTime,allRows,speedInsert)) tdLog.debug("INSERT TABLE DATA ............ [OK]") + + + buildPath = self.getBuildPath() + config = buildPath+ "../sim/dnode1/cfg/" + + tsql=self.newcur(host,config) + tsql.execute("use %s" %dbname) + count=int(count) + threads = [] + for i in range(threadNumbers): + tsql.execute("create stable %s%d(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%(stbname,i)) + threads.append(thd.Thread(target=self.create_tables, args=(host, dbname, stbname+"%d"%i, count,))) + start_time = time.time() + for tr in threads: + tr.start() + for tr in threads: + tr.join() + end_time = time.time() + spendTime=end_time-start_time + speedCreate=count/spendTime + tdLog.debug("spent %.2fs to create %d stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,threadNumbers,threadNumbers*count,speedCreate)) return + def taosBench(self,jsonFile): buildPath = self.getBuildPath() if (buildPath == ""): @@ -247,47 +286,15 @@ class TDTestCase: return # test case1 base def test_case1(self): - tdLog.debug("-----create database and tables test------- ") - - #create database and tables; - count=50000 - vgroups=1 - threads = [] - threadNumbers=2 - for i in range(threadNumbers): - threads.append(mp.Process(target=self.new_create_tables, args=("db1%d"%i, vgroups, "stb1", 0,count,))) - start_time = time.time() - for tr in threads: - tr.start() - for tr in threads: - tr.join() - end_time = time.time() - spendTime=end_time-start_time - speedCreate=count/spendTime - tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate)) - # self.new_create_tables("db1", "stb1", 15*10000) - # self.new_create_tables("db1", "stb1", 15*10000) - - # tdSql.execute("create database db4 vgroups 4") - # self.create_tables("db4", "stb4", 30*10000) - - # tdSql.execute("create database db6 vgroups 6") - # self.create_tables("db6", "stb6", 30*10000) - - # tdSql.execute("create database db8 vgroups 8") - # self.create_tables("db8", "stb8", 30*10000) - - # tdSql.execute("create database db12 vgroups 12") - # self.create_tables("db12", "stb12", 30*10000) - - # tdSql.execute("create database db16 vgroups 16") - # self.create_tables("db16", "stb16", 30*10000) + tdLog.debug("-----create database and muti-thread create tables test------- ") + #host,dbname,stbname,vgroups,threadNumbers,tcountStart,tcountStop + self.mutiThread_create_tables(host="localhost",dbname="db2",stbname="stb2", vgroups=1, threadNumbers=5, count=10000) return # test case2 base:insert data def test_case2(self): - tdLog.debug("-----insert data test------- ") + tdLog.debug("-----muti-thread insert data test------- ") # drop database tdSql.execute("drop database if exists db1") tdSql.execute("drop database if exists db4") @@ -301,28 +308,6 @@ class TDTestCase: tdSql.execute("create database db1 vgroups 1") self.create_tables("db1", "stb1", 1*100) self.insert_data("db1", "stb1", self.ts, 1*50,1*10000) - - - tdSql.execute("create database db4 vgroups 4") - self.create_tables("db4", "stb4", 1*100) - self.insert_data("db4", "stb4", self.ts, 1*100,1*10000) - - tdSql.execute("create database db6 vgroups 6") - self.create_tables("db6", "stb6", 1*100) - self.insert_data("db6", "stb6", self.ts, 1*100,1*10000) - - tdSql.execute("create database db8 vgroups 8") - self.create_tables("db8", "stb8", 1*100) - self.insert_data("db8", "stb8", self.ts, 1*100,1*10000) - - tdSql.execute("create database db12 vgroups 12") - self.create_tables("db12", "stb12", 1*100) - self.insert_data("db12", "stb12", self.ts, 1*100,1*10000) - - tdSql.execute("create database db16 vgroups 16") - self.create_tables("db16", "stb16", 1*100) - self.insert_data("db16", "stb16", self.ts, 1*100,1*10000) - return def test_case3(self): @@ -378,6 +363,27 @@ class TDTestCase: # self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000) # self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000) + # run case + def run(self): + + # # test base case + # self.test_case1() + # tdLog.debug(" LIMIT test_case1 ............ [OK]") + + # test case + # self.test_case2() + # tdLog.debug(" LIMIT test_case2 ............ [OK]") + + # test case + self.test_case3() + tdLog.debug(" LIMIT test_case3 ............ [OK]") + + + # # test qnode + # self.test_case4() + # tdLog.debug(" LIMIT test_case3 ............ [OK]") + + return # # add case with filename From f81f0978434efead48baf05c06dffd7e993db8bd Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Thu, 19 May 2022 14:31:59 +0800 Subject: [PATCH 05/18] fix(os): file path format error --- source/libs/tfs/src/tfs.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 18a3a28bab..0054a94b80 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -160,7 +160,12 @@ bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2) { if (pFile1 == NULL || pFile2 == NULL || pFile1->pTfs != pFile2->pTfs) return false; if (pFile1->did.level != pFile2->did.level) return false; if (pFile1->did.id != pFile2->did.id) return false; - if (strncmp(pFile1->rname, pFile2->rname, TSDB_FILENAME_LEN) != 0) return false; + char nameBuf1[TMPNAME_LEN], nameBuf2[TMPNAME_LEN]; + memset(nameBuf1, 0, TMPNAME_LEN); + memset(nameBuf2, 0, TMPNAME_LEN); + taosRealPath(pFile1->rname, nameBuf1, TMPNAME_LEN); + taosRealPath(pFile2->rname, nameBuf2, TMPNAME_LEN); + if (strncmp(nameBuf1, nameBuf2, TMPNAME_LEN) != 0) return false; return true; } From 5d1b09be03ccd83c4bf8d4cee67a857ada13d46c Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Thu, 19 May 2022 14:41:42 +0800 Subject: [PATCH 06/18] fix(os): file path format error --- source/libs/tfs/src/tfs.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 0054a94b80..92beeffa0c 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -161,10 +161,12 @@ bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2) { if (pFile1->did.level != pFile2->did.level) return false; if (pFile1->did.id != pFile2->did.id) return false; char nameBuf1[TMPNAME_LEN], nameBuf2[TMPNAME_LEN]; - memset(nameBuf1, 0, TMPNAME_LEN); - memset(nameBuf2, 0, TMPNAME_LEN); - taosRealPath(pFile1->rname, nameBuf1, TMPNAME_LEN); - taosRealPath(pFile2->rname, nameBuf2, TMPNAME_LEN); + strncpy(nameBuf1, pFile1->rname, TMPNAME_LEN); + strncpy(nameBuf2, pFile2->rname, TMPNAME_LEN); + nameBuf1[TMPNAME_LEN - 1] = 0; + nameBuf2[TMPNAME_LEN - 1] = 0; + taosRealPath(nameBuf1, NULL, TMPNAME_LEN); + taosRealPath(nameBuf2, NULL, TMPNAME_LEN); if (strncmp(nameBuf1, nameBuf2, TMPNAME_LEN) != 0) return false; return true; } From 568c6269dcbb57483f47a400d576b3934d76d7f8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 May 2022 14:59:55 +0800 Subject: [PATCH 07/18] fix: acquire vnode on restart may deadlock --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 8c3b8576a8..41c0b3086b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -334,19 +334,23 @@ static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) { } static int32_t vmStart(SVnodeMgmt *pMgmt) { - taosRLockLatch(&pMgmt->latch); + int32_t numOfVnodes = 0; + SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); - void *pIter = taosHashIterate(pMgmt->hash, NULL); - while (pIter) { - SVnodeObj **ppVnode = pIter; - if (ppVnode == NULL || *ppVnode == NULL) continue; - - SVnodeObj *pVnode = *ppVnode; + for (int32_t i = 0; i < numOfVnodes; ++i) { + SVnodeObj *pVnode = pVnodes[i]; vnodeStart(pVnode->pImpl); - pIter = taosHashIterate(pMgmt->hash, pIter); } - taosRUnLockLatch(&pMgmt->latch); + for (int32_t i = 0; i < numOfVnodes; ++i) { + SVnodeObj *pVnode = pVnodes[i]; + vmReleaseVnode(pMgmt, pVnode); + } + + if (pVnodes != NULL) { + taosMemoryFree(pVnodes); + } + return 0; } From c5d18b5afd0af45ac0cad023f3e3c496a55c86ac Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 May 2022 15:08:32 +0800 Subject: [PATCH 08/18] refactor: changel lockfree to rwlock --- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 30 ++++++++++----------- source/dnode/mgmt/mgmt_vnode/src/vmFile.c | 4 +-- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 4 +-- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 19 ++++++------- 4 files changed, 29 insertions(+), 28 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 7fc10c4237..022d6204ef 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -26,21 +26,21 @@ extern "C" { #endif typedef struct SVnodeMgmt { - SDnodeData *pData; - SMsgCb msgCb; - const char *path; - const char *name; - SQWorkerPool queryPool; - SQWorkerPool fetchPool; - SWWorkerPool syncPool; - SWWorkerPool writePool; - SWWorkerPool mergePool; - SSingleWorker mgmtWorker; - SSingleWorker monitorWorker; - SHashObj *hash; - SRWLatch latch; - SVnodesStat state; - STfs *pTfs; + SDnodeData *pData; + SMsgCb msgCb; + const char *path; + const char *name; + SQWorkerPool queryPool; + SQWorkerPool fetchPool; + SWWorkerPool syncPool; + SWWorkerPool writePool; + SWWorkerPool mergePool; + SSingleWorker mgmtWorker; + SSingleWorker monitorWorker; + SHashObj *hash; + TdThreadRwlock lock; + SVnodesStat state; + STfs *pTfs; } SVnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 7a6c5f982e..cf5a7ad885 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -17,7 +17,7 @@ #include "vmInt.h" SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { - taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); int32_t num = 0; int32_t size = taosHashGetSize(pMgmt->hash); @@ -38,7 +38,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { } } - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); *numOfVnodes = num; return pVnodes; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index a4da6d089c..602922feeb 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -20,7 +20,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); if (pInfo->pVloads == NULL) return; - taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { @@ -34,7 +34,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { pIter = taosHashIterate(pMgmt->hash, pIter); } - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); } void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 41c0b3086b..0c8d492ef4 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -20,14 +20,14 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { SVnodeObj *pVnode = NULL; int32_t refCount = 0; - taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); if (pVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; } else { refCount = atomic_add_fetch_32(&pVnode->refCount, 1); } - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); if (pVnode != NULL) { dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); @@ -39,9 +39,9 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { if (pVnode == NULL) return; - taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } @@ -70,9 +70,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { return -1; } - taosWLockLatch(&pMgmt->latch); + taosThreadRwlockWrlock(&pMgmt->lock); int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); - taosWUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); return code; } @@ -80,9 +80,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { char path[TSDB_FILENAME_LEN] = {0}; - taosWLockLatch(&pMgmt->latch); + taosThreadRwlockWrlock(&pMgmt->lock); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); - taosWUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); vmReleaseVnode(pMgmt, pVnode); while (pVnode->refCount > 0) taosMsleep(10); @@ -239,6 +239,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) { vmStopWorker(pMgmt); vnodeCleanup(); tfsClose(pMgmt->pTfs); + taosThreadRwlockDestroy(&pMgmt->lock); taosMemoryFree(pMgmt); } @@ -260,7 +261,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; pMgmt->msgCb.mgmt = pMgmt; - taosInitRWLatch(&pMgmt->latch); + taosThreadRwlockInit(&pMgmt->lock, NULL); SDiskCfg dCfg = {0}; tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN); From 36229739ff8db69042b2db6f2a4f3b35902c3d1c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 19 May 2022 15:09:17 +0800 Subject: [PATCH 09/18] enh(stream): trigger after processed by tsdb --- source/dnode/vnode/src/vnd/vnodeSvr.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5c8cd362fd..bcce8326c9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -62,11 +62,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); len = pMsg->contLen - sizeof(SMsgHead); - if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { - vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - switch (pMsg->msgType) { /* META */ case TDMT_VND_CREATE_STB: @@ -123,6 +118,11 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg break; } + if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { + vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); + return -1; + } + vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); // commit if need From 3c4b91a796fd14019e568a791bdd0831ff7f33e9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 19 May 2022 07:15:00 +0000 Subject: [PATCH 10/18] fix: tdb concurrent w/r --- source/libs/tdb/src/db/tdbPager.c | 3 +- source/libs/tdb/test/tdbTest.cpp | 127 ++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 1 deletion(-) diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 6b5a3af347..4024cfe745 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -214,6 +214,8 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { } } + pPager->dbOrigSize = pPager->dbFileSize; + // release the page for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) { pPager->pDirty = pPage->pDirtyNext; @@ -230,7 +232,6 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { // remote the journal file tdbOsClose(pPager->jfd); tdbOsRemove(pPager->jFileName); - pPager->dbOrigSize = pPager->dbFileSize; pPager->inTran = 0; return 0; diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index a161f1c589..24ef8c0c83 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -1,9 +1,12 @@ #include +#define ALLOW_FORBID_FUNC #include "os.h" #include "tdb.h" #include +#include +#include typedef struct SPoolMem { int64_t size; @@ -480,4 +483,128 @@ TEST(tdb_test, simple_upsert1) { tdbTbClose(pDb); tdbClose(pEnv); +} + +TEST(tdb_test, multi_thread_query) { + int ret; + TDB *pEnv; + TTB *pDb; + tdb_cmpr_fn_t compFunc; + int nData = 20000; + TXN txn; + + taosRemoveDir("tdb"); + + // Open Env + ret = tdbOpen("tdb", 512, 1, &pEnv); + GTEST_ASSERT_EQ(ret, 0); + + // Create a database + compFunc = tKeyCmpr; + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + GTEST_ASSERT_EQ(ret, 0); + + char key[64]; + char val[64]; + int64_t poolLimit = 4096; // 1M pool limit + int64_t txnid = 0; + SPoolMem *pPool; + + // open the pool + pPool = openPool(); + + // start a transaction + txnid++; + txn = {.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED, + .txnId = -1, + .xMalloc = poolMalloc, + .xFree = poolFree, + .xArg = pPool}; + // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, ); + tdbBegin(pEnv, &txn); + + for (int iData = 1; iData <= nData; iData++) { + sprintf(key, "key%d", iData); + sprintf(val, "value%d", iData); + ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn); + GTEST_ASSERT_EQ(ret, 0); + + // if pool is full, commit the transaction and start a new one + // if (pPool->size >= poolLimit) { + // break; + // // commit current transaction + // tdbCommit(pEnv, &txn); + // tdbTxnClose(&txn); + + // // start a new transaction + // clearPool(pPool); + // txnid++; + // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + // tdbBegin(pEnv, &txn); + // } + } + + auto f = [](TTB *pDb, int nData) { + TBC *pDBC; + void *pKey = NULL; + void *pVal = NULL; + int vLen, kLen; + int count = 0; + int ret; + TXN txn; + + SPoolMem *pPool = openPool(); + txn = {.flags = 0, .txnId = 0, .xMalloc = poolMalloc, .xFree = poolFree, .xArg = pPool}; + + ret = tdbTbcOpen(pDb, &pDBC, &txn); + GTEST_ASSERT_EQ(ret, 0); + + tdbTbcMoveToFirst(pDBC); + + for (;;) { + ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen); + if (ret < 0) break; + + // std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " "; + // std::cout.write((char *)pVal, vLen) /* << " " << vLen */; + // std::cout << std::endl; + + count++; + } + + GTEST_ASSERT_EQ(count, nData); + + tdbTbcClose(pDBC); + + tdbFree(pKey); + tdbFree(pVal); + }; + + // tdbCommit(pEnv, &txn); + + // multi-thread query + int nThreads = 20; + std::vector threads; + for (int i = 0; i < nThreads; i++) { + if (i == 0) { + threads.push_back(std::thread(tdbCommit, pEnv, &txn)); + } else { + threads.push_back(std::thread(f, pDb, nData)); + } + } + + for (auto &th : threads) { + th.join(); + } + + // commit the transaction + tdbCommit(pEnv, &txn); + tdbTxnClose(&txn); + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); } \ No newline at end of file From c6f23145f3be3c72ca700fab54c5a41e8d6374f4 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Thu, 19 May 2022 15:18:18 +0800 Subject: [PATCH 11/18] fix(os): file path format error --- source/dnode/vnode/src/meta/metaOpen.c | 1 + source/dnode/vnode/src/tsdb/tsdbOpen.c | 1 + source/dnode/vnode/src/vnd/vnodeOpen.c | 2 ++ tests/tsim/src/simParse.c | 1 + 4 files changed, 5 insertions(+) diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 135f44e939..9a97357b97 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -44,6 +44,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { pMeta->path = (char *)&pMeta[1]; sprintf(pMeta->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, VNODE_META_DIR); + taosRealPath(pMeta->path, NULL, slen); pMeta->pVnode = pVnode; // create path if not created yet diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 180eea3237..fa54c811ff 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -55,6 +55,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee memcpy(pTsdb->dir, dir, strlen(dir)); pTsdb->path = (char *)&pTsdb[1]; sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, dir); + taosRealPath(pTsdb->path, NULL, slen); pTsdb->pVnode = pVnode; pTsdb->repoLocked = false; taosThreadMutexInit(&pTsdb->mutex, NULL); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 739f7f9fa3..9412c39b27 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -109,6 +109,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open wal sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR); + taosRealPath(tdir, NULL, sizeof(tdir)); pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg)); if (pVnode->pWal == NULL) { vError("vgId:%d failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno)); @@ -117,6 +118,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open tq sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR); + taosRealPath(tdir, NULL, sizeof(tdir)); pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal); if (pVnode->pTq == NULL) { vError("vgId:%d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); diff --git a/tests/tsim/src/simParse.c b/tests/tsim/src/simParse.c index a0721941e3..638c4a1ccb 100644 --- a/tests/tsim/src/simParse.c +++ b/tests/tsim/src/simParse.c @@ -183,6 +183,7 @@ SScript *simParseScript(char *fileName) { strcpy(name, fileName); } else { sprintf(name, "%s" TD_DIRSEP "%s", simScriptDir, fileName); + taosRealPath(name, NULL, sizeof(name)); } // if ((fd = fopen(name, "r")) == NULL) { From 380780331da64917d7e9754214ca485cd8343c9f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 May 2022 15:19:19 +0800 Subject: [PATCH 12/18] refactor: change lockfree to rwlock --- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 8 +++---- source/dnode/mgmt/mgmt_snode/inc/smInt.h | 1 - source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 2 +- source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 22 +++++++++--------- source/dnode/mgmt/node_mgmt/src/dmNodes.c | 4 ++-- source/dnode/mgmt/node_util/inc/dmUtil.h | 24 ++++++++++---------- source/dnode/mgmt/node_util/src/dmEps.c | 25 ++++++++++----------- 7 files changed, 43 insertions(+), 43 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 7c1162ec10..f7337f482f 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -19,11 +19,11 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) { dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); - taosWLockLatch(&pMgmt->pData->latch); + taosThreadRwlockWrlock(&pMgmt->pData->lock); pMgmt->pData->dnodeId = pCfg->dnodeId; pMgmt->pData->clusterId = pCfg->clusterId; dmWriteEps(pMgmt->pData); - taosWUnLockLatch(&pMgmt->pData->latch); + taosThreadRwlockUnlock(&pMgmt->pData->lock); } } @@ -50,7 +50,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { void dmSendStatusReq(SDnodeMgmt *pMgmt) { SStatusReq req = {0}; - taosRLockLatch(&pMgmt->pData->latch); + taosThreadRwlockRdlock(&pMgmt->pData->lock); req.sver = tsVersion; req.dnodeVer = pMgmt->pData->dnodeVer; req.dnodeId = pMgmt->pData->dnodeId; @@ -69,7 +69,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN); memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); - taosRUnLockLatch(&pMgmt->pData->latch); + taosThreadRwlockUnlock(&pMgmt->pData->lock); SMonVloadInfo vinfo = {0}; (*pMgmt->getVnodeLoadsFp)(&vinfo); diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index 6d0bea9590..68b6ef659e 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -30,7 +30,6 @@ typedef struct SSnodeMgmt { SMsgCb msgCb; const char *path; const char *name; - SRWLatch latch; int8_t uniqueWorkerInUse; SArray *uniqueWorkers; // SArray SSingleWorker sharedWorker; diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 9d092a93bc..5818b58801 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -70,7 +70,7 @@ typedef struct SMgmtWrapper { const char *name; char *path; int32_t refCount; - SRWLatch latch; + TdThreadRwlock lock; EDndNodeType ntype; bool deployed; bool required; diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 3cbb9ff046..96285bbe1c 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -91,7 +91,7 @@ static int32_t dmInitVars(SDnode *pDnode, EDndNodeType rtype) { return -1; } - taosInitRWLatch(&pData->latch); + taosThreadRwlockInit(&pData->lock, NULL); taosThreadMutexInit(&pDnode->mutex, NULL); return 0; } @@ -100,6 +100,7 @@ static void dmClearVars(SDnode *pDnode) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; taosMemoryFreeClear(pWrapper->path); + taosThreadRwlockDestroy(&pWrapper->lock); } if (pDnode->lockfile != NULL) { taosUnLockFile(pDnode->lockfile); @@ -108,7 +109,7 @@ static void dmClearVars(SDnode *pDnode) { } SDnodeData *pData = &pDnode->data; - taosWLockLatch(&pData->latch); + taosThreadRwlockWrlock(&pData->lock); if (pData->dnodeEps != NULL) { taosArrayDestroy(pData->dnodeEps); pData->dnodeEps = NULL; @@ -117,8 +118,9 @@ static void dmClearVars(SDnode *pDnode) { taosHashCleanup(pData->dnodeHash); pData->dnodeHash = NULL; } - taosWUnLockLatch(&pData->latch); + taosThreadRwlockUnlock(&pData->lock); + taosThreadRwlockDestroy(&pData->lock); taosThreadMutexDestroy(&pDnode->mutex); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); } @@ -151,7 +153,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { if (ntype == DNODE) { pWrapper->proc.ptype = DND_PROC_SINGLE; } - taosInitRWLatch(&pWrapper->latch); + taosThreadRwlockInit(&pWrapper->lock, NULL); snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name); pWrapper->path = strdup(path); @@ -223,7 +225,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pRetWrapper = pWrapper; - taosRLockLatch(&pWrapper->latch); + taosThreadRwlockRdlock(&pWrapper->lock); if (pWrapper->deployed) { int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount); @@ -231,7 +233,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { terrno = TSDB_CODE_NODE_NOT_DEPLOYED; pRetWrapper = NULL; } - taosRUnLockLatch(&pWrapper->latch); + taosThreadRwlockUnlock(&pWrapper->lock); return pRetWrapper; } @@ -239,7 +241,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { int32_t code = 0; - taosRLockLatch(&pWrapper->latch); + taosThreadRwlockRdlock(&pWrapper->lock); if (pWrapper->deployed || (InParentProc(pWrapper) && pWrapper->required)) { int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount); @@ -247,7 +249,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { terrno = TSDB_CODE_NODE_NOT_DEPLOYED; code = -1; } - taosRUnLockLatch(&pWrapper->latch); + taosThreadRwlockUnlock(&pWrapper->lock); return code; } @@ -255,9 +257,9 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { void dmReleaseWrapper(SMgmtWrapper *pWrapper) { if (pWrapper == NULL) return; - taosRLockLatch(&pWrapper->latch); + taosThreadRwlockRdlock(&pWrapper->lock); int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); - taosRUnLockLatch(&pWrapper->latch); + taosThreadRwlockUnlock(&pWrapper->lock); dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index ecfa37725a..ab9d3f67e7 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -186,12 +186,12 @@ void dmCloseNode(SMgmtWrapper *pWrapper) { } } - taosWLockLatch(&pWrapper->latch); + taosThreadRwlockWrlock(&pWrapper->lock); if (pWrapper->pMgmt != NULL) { (*pWrapper->func.closeFp)(pWrapper->pMgmt); pWrapper->pMgmt = NULL; } - taosWUnLockLatch(&pWrapper->latch); + taosThreadRwlockUnlock(&pWrapper->lock); if (!OnlyInSingleProc(pWrapper)) { dmCleanupProc(pWrapper); diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index b0e764bf8e..e7256a3a87 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -94,18 +94,18 @@ typedef void (*GetVnodeLoadsFp)(); typedef void (*GetMnodeLoadsFp)(); typedef struct { - int32_t dnodeId; - int64_t clusterId; - int64_t dnodeVer; - int64_t updateTime; - int64_t rebootTime; - bool dropped; - bool stopped; - SEpSet mnodeEps; - SArray *dnodeEps; - SHashObj *dnodeHash; - SRWLatch latch; - SMsgCb msgCb; + int32_t dnodeId; + int64_t clusterId; + int64_t dnodeVer; + int64_t updateTime; + int64_t rebootTime; + bool dropped; + bool stopped; + SEpSet mnodeEps; + SArray *dnodeEps; + SHashObj *dnodeHash; + TdThreadRwlock lock; + SMsgCb msgCb; } SDnodeData; typedef struct { diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index a6c9fda64d..94fa569557 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -21,7 +21,7 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep); static void dmResetEps(SDnodeData *pData, SArray *dnodeEps); static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { - taosRLockLatch(&pData->latch); + taosThreadRwlockRdlock(&pData->lock); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { @@ -36,7 +36,7 @@ static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pF } } - taosRUnLockLatch(&pData->latch); + taosThreadRwlockUnlock(&pData->lock); } int32_t dmReadEps(SDnodeData *pData) { @@ -232,7 +232,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) { int32_t numOfEps = taosArrayGetSize(eps); if (numOfEps <= 0) return; - taosWLockLatch(&pData->latch); + taosThreadRwlockWrlock(&pData->lock); int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps); if (numOfEps != numOfEpsOld) { @@ -246,7 +246,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) { } } - taosWUnLockLatch(&pData->latch); + taosThreadRwlockUnlock(&pData->lock); } static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) { @@ -292,7 +292,7 @@ static void dmPrintEps(SDnodeData *pData) { static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) { bool changed = false; if (dnodeId == 0) return changed; - taosRLockLatch(&pData->latch); + taosThreadRwlockRdlock(&pData->lock); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { @@ -304,24 +304,23 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) { } } - taosRUnLockLatch(&pData->latch); + taosThreadRwlockUnlock(&pData->lock); return changed; } void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { - taosRLockLatch(&pData->latch); + taosThreadRwlockRdlock(&pData->lock); *pEpSet = pData->mnodeEps; - taosRUnLockLatch(&pData->latch); + taosThreadRwlockUnlock(&pData->lock); } void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { - dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); - - taosWLockLatch(&pData->latch); + taosThreadRwlockWrlock(&pData->lock); pData->mnodeEps = *pEpSet; + taosThreadRwlockUnlock(&pData->lock); + + dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); } - - taosWUnLockLatch(&pData->latch); } From 2f9c63ae3fd513da4ab8a9c603f143674f73638e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 19 May 2022 15:31:20 +0800 Subject: [PATCH 13/18] enh(tq): update tb uid when droping table --- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 4 ++-- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 9 ++++++++- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 4fe7d8a399..57f7b341d4 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -848,7 +848,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); - // app id + // client id char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN); varDataSetLen(clientId, strlen(varDataVal(clientId))); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b1760e6dae..0051312908 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep int tsdbClose(STsdb** pTsdb); int tsdbBegin(STsdb* pTsdb); int tsdbCommit(STsdb* pTsdb); -int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg); +int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, const SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, @@ -118,7 +118,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqCommit(STQ*); -int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList); +int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 30de2dcfa0..eb1e6e4b83 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -104,7 +104,7 @@ static void tdSRowDemo() { taosMemoryFree(pTSChema); } -int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) { +int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { void* pIter = NULL; STqExec* pExec = NULL; while (1) { @@ -113,7 +113,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) { pExec = (STqExec*)pIter; if (pExec->subType == TOPIC_SUB_TYPE__DB) continue; for (int32_t i = 0; i < 5; i++) { - int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true); + int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd); ASSERT(code == 0); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a1a8e7bc88..09c644bd79 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -386,7 +386,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, tDecoderClear(&decoder); - tqUpdateTbUidList(pVnode->pTq, tbUids); + tqUpdateTbUidList(pVnode->pTq, tbUids, true); tdUpdateTbUidList(pVnode->pSma, pStore); tdUidStoreFree(pStore); @@ -517,6 +517,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in SDecoder decoder = {0}; SEncoder encoder = {0}; int ret; + SArray *tbUids; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->pCont = NULL; @@ -533,7 +534,10 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in } // process req + tbUids = taosArrayInit(req.nReqs, sizeof(int64_t)); rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp)); + if (tbUids == NULL || rsp.pArray == NULL) goto _exit; + for (int iReq = 0; iReq < req.nReqs; iReq++) { SVDropTbReq *pDropTbReq = req.pReqs + iReq; SVDropTbRsp dropTbRsp = {0}; @@ -553,7 +557,10 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in taosArrayPush(rsp.pArray, &dropTbRsp); } + tqUpdateTbUidList(pVnode->pTq, tbUids, false); + _exit: + taosArrayDestroy(tbUids); tDecoderClear(&decoder); tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret); pRsp->pCont = rpcMallocCont(pRsp->contLen); From f30e6be9e035b5860109f4425e545a3c60414ae5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 19 May 2022 07:46:36 +0000 Subject: [PATCH 14/18] fix windows compile error --- source/libs/tdb/test/tdbTest.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index 24ef8c0c83..bbeeaa3faf 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -515,11 +515,11 @@ TEST(tdb_test, multi_thread_query) { // start a transaction txnid++; - txn = {.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED, - .txnId = -1, - .xMalloc = poolMalloc, - .xFree = poolFree, - .xArg = pPool}; + txn = (TXN){.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED, + .txnId = -1, + .xMalloc = poolMalloc, + .xFree = poolFree, + .xArg = pPool}; // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, ); tdbBegin(pEnv, &txn); @@ -554,7 +554,7 @@ TEST(tdb_test, multi_thread_query) { TXN txn; SPoolMem *pPool = openPool(); - txn = {.flags = 0, .txnId = 0, .xMalloc = poolMalloc, .xFree = poolFree, .xArg = pPool}; + txn = (TXN){.flags = 0, .txnId = 0, .xMalloc = poolMalloc, .xFree = poolFree, .xArg = pPool}; ret = tdbTbcOpen(pDb, &pDBC, &txn); GTEST_ASSERT_EQ(ret, 0); From 2ee38b94fd530421490925dc0b1552ef5893cabf Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 May 2022 15:52:35 +0800 Subject: [PATCH 15/18] refactor: change lockfree to rwlock --- include/dnode/mnode/sdb/sdb.h | 34 ++++++------ source/dnode/mnode/sdb/src/sdb.c | 6 ++- source/dnode/mnode/sdb/src/sdbFile.c | 6 +-- source/dnode/mnode/sdb/src/sdbHash.c | 80 ++++++++++++++-------------- 4 files changed, 64 insertions(+), 62 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index a56c6ca16d..2abe0e5c73 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -333,23 +333,23 @@ SSdbRow *sdbAllocRow(int32_t objSize); void *sdbGetRowObj(SSdbRow *pRow); typedef struct SSdb { - SMnode *pMnode; - char *currDir; - char *syncDir; - char *tmpDir; - int64_t lastCommitVer; - int64_t curVer; - int64_t tableVer[SDB_MAX]; - int64_t maxId[SDB_MAX]; - EKeyType keyTypes[SDB_MAX]; - SHashObj *hashObjs[SDB_MAX]; - SRWLatch locks[SDB_MAX]; - SdbInsertFp insertFps[SDB_MAX]; - SdbUpdateFp updateFps[SDB_MAX]; - SdbDeleteFp deleteFps[SDB_MAX]; - SdbDeployFp deployFps[SDB_MAX]; - SdbEncodeFp encodeFps[SDB_MAX]; - SdbDecodeFp decodeFps[SDB_MAX]; + SMnode *pMnode; + char *currDir; + char *syncDir; + char *tmpDir; + int64_t lastCommitVer; + int64_t curVer; + int64_t tableVer[SDB_MAX]; + int64_t maxId[SDB_MAX]; + EKeyType keyTypes[SDB_MAX]; + SHashObj *hashObjs[SDB_MAX]; + TdThreadRwlock locks[SDB_MAX]; + SdbInsertFp insertFps[SDB_MAX]; + SdbUpdateFp updateFps[SDB_MAX]; + SdbDeleteFp deleteFps[SDB_MAX]; + SdbDeployFp deployFps[SDB_MAX]; + SdbEncodeFp encodeFps[SDB_MAX]; + SdbDecodeFp decodeFps[SDB_MAX]; } SSdb; #ifdef __cplusplus diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 51f40c12cd..1f11a77e6c 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -48,7 +48,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { } for (ESdbType i = 0; i < SDB_MAX; ++i) { - taosInitRWLatch(&pSdb->locks[i]); + taosThreadRwlockInit(&pSdb->locks[i], NULL); pSdb->maxId[i] = 0; pSdb->tableVer[i] = 0; pSdb->keyTypes[i] = SDB_KEY_INT32; @@ -98,7 +98,10 @@ void sdbCleanup(SSdb *pSdb) { taosHashClear(hash); taosHashCleanup(hash); + taosThreadRwlockDestroy(&pSdb->locks[i]); pSdb->hashObjs[i] = NULL; + memset(&pSdb->locks[i], 0, sizeof(pSdb->locks[i])); + mDebug("sdb table:%s is cleaned up", sdbTableName(i)); } @@ -134,7 +137,6 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { pSdb->maxId[sdbType] = 0; pSdb->hashObjs[sdbType] = hash; - taosInitRWLatch(&pSdb->locks[sdbType]); mDebug("sdb table:%s is initialized", sdbTableName(sdbType)); return 0; diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index e9037a7b11..ad1429f667 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -257,8 +257,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); SHashObj *hash = pSdb->hashObjs[i]; - SRWLatch *pLock = &pSdb->locks[i]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[i]; + taosThreadRwlockWrlock(pLock); SSdbRow **ppRow = taosHashIterate(hash, NULL); while (ppRow != NULL) { @@ -303,7 +303,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { sdbFreeRaw(pRaw); ppRow = taosHashIterate(hash, ppRow); } - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); } if (code == 0) { diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 94008b2f7c..a25c7a5233 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -129,12 +129,12 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) { } static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { - SRWLatch *pLock = &pSdb->locks[pRow->type]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; + taosThreadRwlockWrlock(pLock); SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize); if (pOldRow != NULL) { - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; return terrno; @@ -145,13 +145,13 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * sdbPrintOper(pSdb, pRow, "insert"); if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); int32_t code = 0; SdbInsertFp insertFp = pSdb->insertFps[pRow->type]; @@ -159,9 +159,9 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * code = (*insertFp)(pSdb, pRow->pObj); if (code != 0) { code = terrno; - taosWLockLatch(pLock); + taosThreadRwlockWrlock(pLock); taosHashRemove(hash, pRow->pObj, keySize); - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); sdbFreeRow(pSdb, pRow, false); terrno = code; return terrno; @@ -180,19 +180,19 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * } static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) { - SRWLatch *pLock = &pSdb->locks[pNewRow->type]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pNewRow->type]; + taosThreadRwlockWrlock(pLock); SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize); if (ppOldRow == NULL || *ppOldRow == NULL) { - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize); } SSdbRow *pOldRow = *ppOldRow; pOldRow->status = pRaw->status; sdbPrintOper(pSdb, pOldRow, "update"); - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); int32_t code = 0; SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; @@ -207,12 +207,12 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * } static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { - SRWLatch *pLock = &pSdb->locks[pRow->type]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; + taosThreadRwlockWrlock(pLock); SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); if (ppOldRow == NULL || *ppOldRow == NULL) { - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; return terrno; @@ -223,7 +223,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * sdbPrintOper(pSdb, pOldRow, "delete"); taosHashRemove(hash, pOldRow->pObj, keySize); - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); pSdb->tableVer[pOldRow->type]++; sdbFreeRow(pSdb, pRow, false); @@ -278,12 +278,12 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { void *pRet = NULL; int32_t keySize = sdbGetkeySize(pSdb, type, pKey); - SRWLatch *pLock = &pSdb->locks[type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[type]; + taosThreadRwlockRdlock(pLock); SSdbRow **ppRow = taosHashGet(hash, pKey, keySize); if (ppRow == NULL || *ppRow == NULL) { - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; return NULL; } @@ -306,13 +306,13 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { break; } - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); return pRet; } static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { - SRWLatch *pLock = &pSdb->locks[pRow->type]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; + taosThreadRwlockWrlock(pLock); int32_t ref = atomic_load_32(&pRow->refCount); sdbPrintOper(pSdb, pRow, "check"); @@ -320,7 +320,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { sdbFreeRow(pSdb, pRow, true); } - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); } void sdbRelease(SSdb *pSdb, void *pObj) { @@ -329,8 +329,8 @@ void sdbRelease(SSdb *pSdb, void *pObj) { SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); if (pRow->type >= SDB_MAX) return; - SRWLatch *pLock = &pSdb->locks[pRow->type]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; + taosThreadRwlockWrlock(pLock); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); sdbPrintOper(pSdb, pRow, "release"); @@ -338,7 +338,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) { sdbFreeRow(pSdb, pRow, true); } - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); } void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { @@ -347,8 +347,8 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return NULL; - SRWLatch *pLock = &pSdb->locks[type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[type]; + taosThreadRwlockRdlock(pLock); SSdbRow **ppRow = taosHashIterate(hash, pIter); while (ppRow != NULL) { @@ -363,7 +363,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { *ppObj = pRow->pObj; break; } - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); return ppRow; } @@ -374,18 +374,18 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) { SHashObj *hash = sdbGetHash(pSdb, pRow->type); if (hash == NULL) return; - SRWLatch *pLock = &pSdb->locks[pRow->type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; + taosThreadRwlockRdlock(pLock); taosHashCancelIterate(hash, pIter); - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); } void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return; - SRWLatch *pLock = &pSdb->locks[type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[type]; + taosThreadRwlockRdlock(pLock); SSdbRow **ppRow = taosHashIterate(hash, NULL); while (ppRow != NULL) { @@ -401,17 +401,17 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2 ppRow = taosHashIterate(hash, ppRow); } - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); } int32_t sdbGetSize(SSdb *pSdb, ESdbType type) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return 0; - SRWLatch *pLock = &pSdb->locks[type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[type]; + taosThreadRwlockRdlock(pLock); int32_t size = taosHashGetSize(hash); - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); return size; } @@ -424,8 +424,8 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) { int32_t maxId = 0; - SRWLatch *pLock = &pSdb->locks[type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[type]; + taosThreadRwlockRdlock(pLock); SSdbRow **ppRow = taosHashIterate(hash, NULL); while (ppRow != NULL) { @@ -435,7 +435,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) { ppRow = taosHashIterate(hash, ppRow); } - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); maxId = TMAX(maxId, pSdb->maxId[type]); return maxId + 1; From c49b3f39d9a19fc4c9a19490ad4fa04528380e6c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 19 May 2022 08:16:06 +0000 Subject: [PATCH 16/18] make windows compile pass --- source/libs/tdb/test/tdbTest.cpp | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index bbeeaa3faf..fee3447884 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -515,11 +515,11 @@ TEST(tdb_test, multi_thread_query) { // start a transaction txnid++; - txn = (TXN){.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED, - .txnId = -1, - .xMalloc = poolMalloc, - .xFree = poolFree, - .xArg = pPool}; + txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED; + txn.txnId = -1; + txn.xMalloc = poolMalloc; + txn.xFree = poolFree; + txn.xArg = pPool; // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, ); tdbBegin(pEnv, &txn); @@ -528,20 +528,6 @@ TEST(tdb_test, multi_thread_query) { sprintf(val, "value%d", iData); ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn); GTEST_ASSERT_EQ(ret, 0); - - // if pool is full, commit the transaction and start a new one - // if (pPool->size >= poolLimit) { - // break; - // // commit current transaction - // tdbCommit(pEnv, &txn); - // tdbTxnClose(&txn); - - // // start a new transaction - // clearPool(pPool); - // txnid++; - // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - // tdbBegin(pEnv, &txn); - // } } auto f = [](TTB *pDb, int nData) { @@ -554,7 +540,11 @@ TEST(tdb_test, multi_thread_query) { TXN txn; SPoolMem *pPool = openPool(); - txn = (TXN){.flags = 0, .txnId = 0, .xMalloc = poolMalloc, .xFree = poolFree, .xArg = pPool}; + txn.flags = 0; + txn.txnId = 0; + txn.xMalloc = poolMalloc; + txn.xFree = poolFree; + txn.xArg = pPool; ret = tdbTbcOpen(pDb, &pDBC, &txn); GTEST_ASSERT_EQ(ret, 0); From 90a6ec0917c2ab2bc7789319aea068b61b3e24ec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 May 2022 16:16:16 +0800 Subject: [PATCH 17/18] fix(query): release meta reader after initializing it. --- source/libs/executor/src/executorimpl.c | 8 ++------ tests/system-test/2-query/abs.py | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index bd93284da2..08521f91e8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1850,12 +1850,6 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t pCtx[i].resultInfo = pEntry; pCtx[i].scanFlag = stage; - - // set the timestamp output buffer for top/bottom/diff query - // int32_t fid = pCtx[i].functionId; - // if (fid == FUNCTION_TOP || fid == FUNCTION_BOTTOM || fid == FUNCTION_DIFF || fid == FUNCTION_DERIVATIVE) { - // if (i > 0) pCtx[i].pTsOutput = pCtx[i-1].pOutput; - // } } initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); @@ -4672,6 +4666,8 @@ void extractTableSchemaVersion(SReadHandle *pHandle, uint64_t uid, SExecTaskInfo } else { pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schema.sver; } + + metaReaderClear(&mr); } SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, diff --git a/tests/system-test/2-query/abs.py b/tests/system-test/2-query/abs.py index ccf83df952..a3e976b490 100644 --- a/tests/system-test/2-query/abs.py +++ b/tests/system-test/2-query/abs.py @@ -13,7 +13,7 @@ class TDTestCase: "wDebugFlag":143,"sDebugFlag":143,"tsdbDebugFlag":143,"tqDebugFlag":143 ,"fsDebugFlag":143 ,"fnDebugFlag":143} def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) def prepare_datas(self): tdSql.execute( From 2174b5e48eb335f31bf125458d3e576653bf6c10 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 19 May 2022 16:01:20 +0800 Subject: [PATCH 18/18] enh(tq): update tb uid when droping table --- source/client/src/tmq.c | 2 +- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/meta/metaTable.c | 5 +++-- source/dnode/vnode/src/tq/tq.c | 28 ++++++++++++++++++------- source/dnode/vnode/src/tq/tqRead.c | 26 ++++++++++++++--------- source/dnode/vnode/src/vnd/vnodeSvr.c | 14 ++++++------- 8 files changed, 51 insertions(+), 28 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 2a2fdea537..36c0d8156c 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1435,7 +1435,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { while (1) { tmqHandleAllDelayedTask(tmq); - tmqPollImpl(tmq, wait_time); + if (tmqPollImpl(tmq, wait_time) < 0) return NULL; rspObj = tmqHandleAllRsp(tmq, wait_time, false); if (rspObj) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ecb022426b..b48a8775ce 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -128,6 +128,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); +bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid, int32_t *pNumOfRows, int16_t *pNumOfCols); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index d0420e1b84..a8a3e4f601 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -163,6 +163,7 @@ typedef struct { int8_t withSchema; int8_t withTag; char* qmsg; + SHashObj* pDropTbUid; STqPushHandle pushHandle; // SRWLatch lock; SWalReadHandle* pWalReader; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0051312908..1c89649313 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -82,7 +82,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); -int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq); +int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 619458af24..2bcfd01904 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -255,7 +255,7 @@ _err: return -1; } -int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { +int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) { TBC *pTbDbc = NULL; TBC *pUidIdxc = NULL; TBC *pNameIdxc = NULL; @@ -336,6 +336,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { if (type == TSDB_CHILD_TABLE) { ctime = me.ctbEntry.ctime; suid = me.ctbEntry.suid; + taosArrayPush(tbUids, &me.uid); } else if (type == TSDB_NORMAL_TABLE) { ctime = me.ntbEntry.ctime; suid = 0; @@ -906,4 +907,4 @@ static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) { _err: metaULock(pMeta); return -1; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index eb1e6e4b83..212834f7d7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -111,7 +111,17 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { pIter = taosHashIterate(pTq->execs, pIter); if (pIter == NULL) break; pExec = (STqExec*)pIter; - if (pExec->subType == TOPIC_SUB_TYPE__DB) continue; + if (pExec->subType == TOPIC_SUB_TYPE__DB) { + if (isAdd) { + continue; + } else { + int32_t sz = taosArrayGetSize(tbUidList); + for (int32_t i = 0; i < sz; i++) { + int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); + taosHashPut(pExec->pDropTbUid, &tbUid, sizeof(int64_t), NULL, 0); + } + } + } for (int32_t i = 0; i < 5; i++) { int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd); ASSERT(code == 0); @@ -582,7 +592,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { rsp.withSchema = 1; STqReadHandle* pReader = pExec->pExecReader[workerId]; tqReadHandleSetMsg(pReader, pCont, 0); - while (tqNextDataBlock(pReader)) { + while (tqNextDataBlockFilterOut(pReader, pExec->pDropTbUid)) { SSDataBlock block = {0}; if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows, &block.info.numOfCols) < 0) { @@ -915,9 +925,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { req.qmsg = NULL; pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); - for (int32_t i = 0; i < 5; i++) { - pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); - if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { + for (int32_t i = 0; i < 5; i++) { + pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + SReadHandle handle = { .reader = pExec->pExecReader[i], .meta = pTq->pVnode->pMeta, @@ -925,9 +936,12 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { }; pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); ASSERT(pExec->task[i]); - } else { - pExec->task[i] = NULL; } + } else { + for (int32_t i = 0; i < 5; i++) { + pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + } + pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); return 0; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 8fbd1e24e1..28344de897 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -64,22 +64,28 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { } if (pHandle->pBlock == NULL) return false; - /*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/ - /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/ if (pHandle->tbIdHash == NULL) { return true; } void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t)); if (ret != NULL) { - /*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/ - /*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/ - /*pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);*/ - /*pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);*/ - /*pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);*/ - /*pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);*/ return true; - /*} else {*/ - /*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/ + } + } + return false; +} + +bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) { + while (1) { + if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { + return false; + } + if (pHandle->pBlock == NULL) return false; + + ASSERT(pHandle->tbIdHash == NULL); + void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t)); + if (ret == NULL) { + return true; } } return false; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 09c644bd79..6db548285c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -62,11 +62,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); len = pMsg->contLen - sizeof(SMsgHead); - if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { - vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - switch (pMsg->msgType) { /* META */ case TDMT_VND_CREATE_STB: @@ -125,6 +120,11 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); + if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { + vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); + return -1; + } + // commit if need if (vnodeShouldCommit(pVnode)) { vInfo("vgId:%d commit at version %" PRId64, TD_VID(pVnode), version); @@ -517,7 +517,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in SDecoder decoder = {0}; SEncoder encoder = {0}; int ret; - SArray *tbUids; + SArray *tbUids = NULL; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->pCont = NULL; @@ -543,7 +543,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in SVDropTbRsp dropTbRsp = {0}; /* code */ - ret = metaDropTable(pVnode->pMeta, version, pDropTbReq); + ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids); if (ret < 0) { if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) { dropTbRsp.code = TSDB_CODE_SUCCESS;