From 74ea5ff492af8a8a264ac599b609eb072556ab8a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 3 Jul 2020 19:44:15 +0800 Subject: [PATCH 01/12] add func dnodePutQhandleIntoReadQueue to simplify the refCount of vnode --- src/dnode/src/dnodeVRead.c | 19 +++++++++++++++++-- src/inc/dnode.h | 1 + src/inc/taosdef.h | 1 + src/inc/vnode.h | 1 + src/vnode/src/vnodeMain.c | 9 +++++++++ 5 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 6bbb291b6a..947d0fa501 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -179,6 +179,16 @@ void dnodeFreeVnodeRqueue(void *rqueue) { // dynamically adjust the number of threads } +void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle) { + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; + pRead->pCont = qhandle; + pRead->contLen = 0; + + taos_queue queue = vnodeAccquireRqueue(pVnode); + taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead); +} + static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg = pMsg->rpcMsg; @@ -219,9 +229,14 @@ static void *dnodeProcessReadQueue(void *param) { break; } - dDebug("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); + dDebug("%p, msg:%s will be processed in vread queue, qtype:%d", pReadMsg->rpcMsg.ahandle, + taosMsg[pReadMsg->rpcMsg.msgType], type); int32_t code = vnodeProcessRead(pVnode, pReadMsg); - dnodeSendRpcReadRsp(pVnode, pReadMsg, code); + + if (type == TAOS_QTYPE_RPC) { + dnodeSendRpcReadRsp(pVnode, pReadMsg, code); + } + taosFreeQitem(pReadMsg); } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index b561c407a3..1d33dafbaa 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -53,6 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode); void dnodeFreeVnodeWqueue(void *queue); void *dnodeAllocateVnodeRqueue(void *pVnode); void dnodeFreeVnodeRqueue(void *rqueue); +void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle); void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); int32_t dnodeAllocateMnodePqueue(); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 76ca99c9ad..e4ee058cef 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -365,6 +365,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TAOS_QTYPE_FWD 1 #define TAOS_QTYPE_WAL 2 #define TAOS_QTYPE_CQ 3 +#define TAOS_QTYPE_QUERY 4 typedef enum { TSDB_SUPER_TABLE = 0, // super table diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 9f0c8cc241..49bd67a04f 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -52,6 +52,7 @@ void vnodeRelease(void *pVnode); void* vnodeAccquireVnode(int32_t vgId); // add refcount void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged +void* vnodeAccquireRqueue(void *); void* vnodeGetRqueue(void *); void* vnodeGetWqueue(int32_t vgId); void* vnodeGetWal(void *pVnode); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 5eb78fda52..4c446a78ec 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -393,6 +393,15 @@ void *vnodeAccquireVnode(int32_t vgId) { return pVnode; } +void *vnodeAccquireRqueue(void *param) { + SVnodeObj *pVnode = param; + if (pVnode == NULL) return NULL; + + atomic_add_fetch_32(&pVnode->refCount, 1); + vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount); + return ((SVnodeObj *)pVnode)->rqueue; +} + void *vnodeGetRqueue(void *pVnode) { return ((SVnodeObj *)pVnode)->rqueue; } From 0c91ffae759e8a6b38b386bed1f63ed61dc898ea Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 4 Jul 2020 14:44:25 +0800 Subject: [PATCH 02/12] [TD-825] vnodehash may be null while close all vnodes --- src/dnode/src/dnodeMgmt.c | 17 ++++++++++------ src/inc/vnode.h | 4 +++- src/plugins/mqtt/src/mqttSystem.c | 2 +- src/vnode/src/vnodeMain.c | 34 +++++++++++++++++++++++++------ 4 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 9cf024ba83..10eb77058b 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -176,6 +176,7 @@ void dnodeCleanupMgmt() { tsMgmtQset = NULL; tsMgmtQueue = NULL; + vnodeCleanupResources(); } void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) { @@ -242,8 +243,14 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { int32_t vnode = atoi(de->d_name + 5); if (vnode == 0) continue; - vnodeList[*numOfVnodes] = vnode; (*numOfVnodes)++; + + if (*numOfVnodes >= TSDB_MAX_VNODES) { + dError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES); + continue; + } else { + vnodeList[*numOfVnodes - 1] = vnode; + } } } closedir(dir); @@ -337,7 +344,7 @@ static int32_t dnodeOpenVnodes() { void dnodeStartStream() { int32_t vnodeList[TSDB_MAX_VNODES]; int32_t numOfVnodes = 0; - int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes); + int32_t status = vnodeGetVnodeList(vnodeList, &numOfVnodes); if (status != TSDB_CODE_SUCCESS) { dInfo("get dnode list failed"); @@ -352,15 +359,14 @@ void dnodeStartStream() { } static void dnodeCloseVnodes() { - int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES); + int32_t vnodeList[TSDB_MAX_VNODES]; int32_t numOfVnodes; int32_t status; - status = dnodeGetVnodeList(vnodeList, &numOfVnodes); + status = vnodeGetVnodeList(vnodeList, &numOfVnodes); if (status != TSDB_CODE_SUCCESS) { dInfo("get dnode list failed"); - free(vnodeList); return; } @@ -368,7 +374,6 @@ static void dnodeCloseVnodes() { vnodeClose(vnodeList[i]); } - free(vnodeList); dInfo("total vnodes:%d are all closed", numOfVnodes); } diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 9f0c8cc241..971f341258 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -57,8 +57,10 @@ void* vnodeGetWqueue(int32_t vgId); void* vnodeGetWal(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); -void vnodeBuildStatusMsg(void * param); +int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); +void vnodeBuildStatusMsg(void *param); void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes); +void vnodeCleanupResources(); int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg); diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 3df62f8bf4..2687106124 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -111,7 +111,7 @@ void mqttStopSystem() { } void mqttCleanUpSystem() { - mqttInfo("starting to clean up mqtt"); + mqttInfo("starting to cleanup mqtt"); free(recntStatus.user_name); free(recntStatus.password); free(recntStatus.hostname); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 5eb78fda52..ec97cbc48c 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -69,6 +69,12 @@ static void vnodeInit() { } } +void vnodeCleanupResources() { + taosHashCleanup(tsDnodeVnodesHash); + vnodeModuleInit = PTHREAD_ONCE_INIT; + tsDnodeVnodesHash = NULL; +} + int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t code; pthread_once(&vnodeModuleInit, vnodeInit); @@ -362,12 +368,6 @@ void vnodeRelease(void *pVnodeRaw) { int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1); vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count); - - if (count <= 0) { - taosHashCleanup(tsDnodeVnodesHash); - vnodeModuleInit = PTHREAD_ONCE_INIT; - tsDnodeVnodesHash = NULL; - } } void *vnodeGetVnode(int32_t vgId) { @@ -424,6 +424,28 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) { pLoad->replica = pVnode->syncCfg.replica; } +int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { + if (tsDnodeVnodesHash == NULL) return TSDB_CODE_SUCCESS; + + SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash); + while (taosHashIterNext(pIter)) { + SVnodeObj **pVnode = taosHashIterGet(pIter); + if (pVnode == NULL) continue; + if (*pVnode == NULL) continue; + + (*numOfVnodes)++; + if (*numOfVnodes >= TSDB_MAX_VNODES) { + vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES); + continue; + } else { + vnodeList[*numOfVnodes - 1] = (*pVnode)->vgId; + } + } + + taosHashDestroyIter(pIter); + return TSDB_CODE_SUCCESS; +} + void vnodeBuildStatusMsg(void *param) { SDMStatusMsg *pStatus = param; SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash); From a744e419f4d1e0ae237a4602c4a785197bf252e2 Mon Sep 17 00:00:00 2001 From: Hui Li Date: Sat, 4 Jul 2020 15:50:46 +0800 Subject: [PATCH 03/12] [TD-687] --- tests/script/jenkins/basic.txt | 4 + tests/script/jenkins/unique.txt | 4 + .../migrate/mn2_vn2_repl2_rmMnodeDir.sim | 272 +++++++++++++++++ .../migrate/mn2_vn2_repl2_rmMnodeVnodeDir.sim | 274 ++++++++++++++++++ ..._repl2_rmMnodeVnodeDir_stopAll_starAll.sim | 210 ++++++++++++++ .../migrate/mn2_vn2_repl2_rmVnodeDir.sim | 272 +++++++++++++++++ 6 files changed, 1036 insertions(+) create mode 100644 tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeDir.sim create mode 100644 tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir.sim create mode 100644 tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir_stopAll_starAll.sim create mode 100644 tests/script/unique/migrate/mn2_vn2_repl2_rmVnodeDir.sim diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index e2ebd9af63..3c4733a25b 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -365,3 +365,7 @@ cd ../../../debug; make ./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim ./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim +./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeDir.sim +./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir.sim +./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir_stopAll_starAll.sim +./test.sh -f unique/migrate/mn2_vn2_repl2_rmVnodeDir.sim diff --git a/tests/script/jenkins/unique.txt b/tests/script/jenkins/unique.txt index afd0ea55c0..06edb8890a 100644 --- a/tests/script/jenkins/unique.txt +++ b/tests/script/jenkins/unique.txt @@ -133,3 +133,7 @@ cd ../../../debug; make ./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim ./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim +./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeDir.sim +./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir.sim +./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir_stopAll_starAll.sim +./test.sh -f unique/migrate/mn2_vn2_repl2_rmVnodeDir.sim diff --git a/tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeDir.sim b/tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeDir.sim new file mode 100644 index 0000000000..e0b5e9b931 --- /dev/null +++ b/tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeDir.sim @@ -0,0 +1,272 @@ +# Test case describe: dnode1/dnode2 include mnode and vnode roles +# step 1: start dnode1/dnode2, and added into cluster +# step 2: create db(repl = 2), table, insert data, +# step 4: stop dnode1, remove its mnode dir, and copy mnode dir of dnode2 to dnode1 +# step 5: restart dnode1, waiting sync end +# step 6: stop dnode2, reset query cache, and query + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +#system sh/deploy.sh -n dnode3 -i 3 +#system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2 +#system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1 +#system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1 + +system sh/cfg.sh -n dnode1 -c walLevel -v 2 +system sh/cfg.sh -n dnode2 -c walLevel -v 2 +#system sh/cfg.sh -n dnode3 -c walLevel -v 2 +#system sh/cfg.sh -n dnode4 -c walLevel -v 2 + +system sh/cfg.sh -n dnode1 -c balanceInterval -v 10 +system sh/cfg.sh -n dnode2 -c balanceInterval -v 10 +#system sh/cfg.sh -n dnode3 -c balanceInterval -v 10 +#system sh/cfg.sh -n dnode4 -c balanceInterval -v 10 + +system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4 +system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4 +#system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4 +#system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4 + +system sh/cfg.sh -n dnode1 -c alternativeRole -v 0 +system sh/cfg.sh -n dnode2 -c alternativeRole -v 0 +#system sh/cfg.sh -n dnode3 -c alternativeRole -v 2 +#system sh/cfg.sh -n dnode4 -c alternativeRole -v 2 + +system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4 +system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode5 -c maxtablesPerVnode -v 4 + +system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator +#system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator + +print ============== step0: start tarbitrator +system sh/exec_tarbitrator.sh -s start + +print ============== step1: start dnode1/dnode2 and add into cluster +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sleep 1000 +sql connect +sleep 1000 +sql create dnode $hostname2 +sleep 1000 + +print ============== step2: create database with replica 2, and create table, insert data +$totalTableNum = 10 +$sleepTimer = 3000 + +$db = db +sql create database $db replica 2 cache 1 +sql use $db + +# create table , insert data +$stb = stb +sql create table $stb (ts timestamp, c1 double) tags(t1 int) +$rowNum = 1200 +$tblNum = $totalTableNum +$totalRows = 0 +$tsStart = 1577808000000 # 2020-01-01 00:00:00.000 + +$i = 0 +while $i < $tblNum + $tb = tb . $i + sql create table $tb using $stb tags( $i ) + + $x = 0 + while $x < $rowNum + $ts = $tsStart + $x + sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x ) ( $ts + 10a , $x ) ( $ts + 11a , $x ) ( $ts + 12a , $x ) ( $ts + 13a , $x ) ( $ts + 14a , $x ) ( $ts + 15a , $x ) ( $ts + 16a , $x ) ( $ts + 17a , $x ) ( $ts + 18a , $x ) ( $ts + 19a , $x ) ( $ts + 20a , $x ) ( $ts + 21a , $x ) ( $ts + 22a , $x ) ( $ts + 23a , $x ) ( $ts + 24a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 35a , $x ) ( $ts + 36a , $x ) ( $ts + 37a , $x ) ( $ts + 38a , $x ) ( $ts + 39a , $x ) ( $ts + 40a , $x ) ( $ts + 41a , $x ) ( $ts + 42a , $x ) ( $ts + 43a , $x ) ( $ts + 44a , $x ) ( $ts + 45a , $x ) ( $ts + 46a , $x ) ( $ts + 47a , $x ) ( $ts + 48a , $x ) ( $ts + 49a , $x ) ( $ts + 50a , $x ) ( $ts + 51a , $x ) ( $ts + 52a , $x ) ( $ts + 53a , $x ) ( $ts + 54a , $x ) ( $ts + 55a , $x ) ( $ts + 56a , $x ) ( $ts + 57a , $x ) ( $ts + 58a , $x ) ( $ts + 59a , $x ) + $x = $x + 60 + endw + $totalRows = $totalRows + $x + print info: inserted $x rows into $tb and totalRows: $totalRows + $i = $i + 1 +endw + +sql select count(*) from $stb +print rows:$rows data00:$data00 totalRows:$totalRows +if $rows != 1 then + return -1 +endi + +if $data00 != $totalRows then + return -1 +endi + + +print ============== step3: insert old data(now-15d) and new data(now+15d), control data rows in order to save in cache, not falling disc +sql insert into $tb values ( now - 20d , -20 ) +sql insert into $tb values ( now - 40d , -40 ) +$totalRows = $totalRows + 2 + +print ============== step4: stop dnode1 +system sh/exec.sh -n dnode1 -s stop -x SIGINT + +$loopCnt = 0 +wait_dnode1_offline: +$loopCnt = $loopCnt + 1 +if $loopCnt == 10 then + return -1 +endi + +sql show dnodes +if $rows != 2 then + sleep 2000 + goto wait_dnode1_offline +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 + +if $dnode1Status != offline then + sleep 2000 + goto wait_dnode1_offline +endi +if $dnode2Status != ready then + sleep 2000 + goto wait_dnode1_offline +endi + +# check using select +sql select count(*) from $stb +print data00 $data00 +if $data00 != $totalRows then + return -1 +endi + +#sql show vgroups +#print show vgroups: +#print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1 +#print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2 +#print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3 + +print ============== step5: remove the mnode dir of dnode1, then copy the monde dir of dnode2 +system_content rm -rf ../../../sim/dnode1/data/mnode +system_content cp -rf ../../../sim/dnode2/data/mnode ../../../sim/dnode1/data/ + +print ============== step6: restart dnode1, waiting sync end +system sh/exec.sh -n dnode1 -s start +sleep 1000 + +$loopCnt = 0 +wait_dnode1_ready: +$loopCnt = $loopCnt + 1 +if $loopCnt == 20 then + return -1 +endi + +sql show dnodes -x wait_dnode1_ready +if $rows != 2 then + sleep 2000 + goto wait_dnode1_ready +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 + +if $dnode1Status != ready then + sleep 2000 + goto wait_dnode1_ready +endi +if $dnode2Status != ready then + sleep 2000 + goto wait_dnode1_ready +endi + +$loopCnt = 0 +wait_dnode1_vgroup_slave: +$loopCnt = $loopCnt + 1 +if $loopCnt == 10 then + return -1 +endi + +sql show vgroups +if $rows != 3 then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +print show vgroups: +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 $data5_4 $data6_4 $data7_4 $data8_4 $data9_4 +$d2v2status = $data4_4 +$d2v3status = $data4_2 +$d2v4status = $data4_3 + +$d1v2status = $data7_4 +$d1v3status = $data7_2 +$d1v4status = $data7_3 + +if $d2v2status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d2v3status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d2v4status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi + +if $d1v2status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d1v3status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d1v4status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi + +print ============== step7: stop dnode2 +system sh/exec.sh -n dnode2 -s stop -x SIGINT + +$loopCnt = 0 +wait_dnode2_offline: +$loopCnt = $loopCnt + 1 +if $loopCnt == 10 then + return -1 +endi + +sql show dnodes +if $rows != 2 then + sleep 2000 + goto wait_dnode2_offline +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 + +if $dnode1Status != ready then + sleep 2000 + goto wait_dnode2_offline +endi +if $dnode2Status != offline then + sleep 2000 + goto wait_dnode2_offline +endi + +sql reset query cache + +# check using select +sql select count(*) from $stb +print data00 $data00 +if $data00 != $totalRows then + return -1 +endi \ No newline at end of file diff --git a/tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir.sim b/tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir.sim new file mode 100644 index 0000000000..ae7fc6af17 --- /dev/null +++ b/tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir.sim @@ -0,0 +1,274 @@ +# Test case describe: dnode1/dnode2 include mnode and vnode roles +# step 1: start dnode1/dnode2, and added into cluster +# step 2: create db(repl = 2), table, insert data, +# step 4: stop dnode1, remove its mnode and vnode dir, and copy mnode and vnode dir of dnode2 to dnode1 +# step 5: restart dnode1, waiting sync end +# step 6: stop dnode2, reset query cache, and query + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +#system sh/deploy.sh -n dnode3 -i 3 +#system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2 +#system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1 +#system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1 + +system sh/cfg.sh -n dnode1 -c walLevel -v 2 +system sh/cfg.sh -n dnode2 -c walLevel -v 2 +#system sh/cfg.sh -n dnode3 -c walLevel -v 2 +#system sh/cfg.sh -n dnode4 -c walLevel -v 2 + +system sh/cfg.sh -n dnode1 -c balanceInterval -v 10 +system sh/cfg.sh -n dnode2 -c balanceInterval -v 10 +#system sh/cfg.sh -n dnode3 -c balanceInterval -v 10 +#system sh/cfg.sh -n dnode4 -c balanceInterval -v 10 + +system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4 +system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4 +#system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4 +#system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4 + +system sh/cfg.sh -n dnode1 -c alternativeRole -v 0 +system sh/cfg.sh -n dnode2 -c alternativeRole -v 0 +#system sh/cfg.sh -n dnode3 -c alternativeRole -v 2 +#system sh/cfg.sh -n dnode4 -c alternativeRole -v 2 + +system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4 +system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode5 -c maxtablesPerVnode -v 4 + +system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator +#system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator + +print ============== step0: start tarbitrator +system sh/exec_tarbitrator.sh -s start + +print ============== step1: start dnode1/dnode2 and add into cluster +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sleep 1000 +sql connect +sleep 1000 +sql create dnode $hostname2 +sleep 1000 + +print ============== step2: create database with replica 2, and create table, insert data +$totalTableNum = 10 +$sleepTimer = 3000 + +$db = db +sql create database $db replica 2 cache 1 +sql use $db + +# create table , insert data +$stb = stb +sql create table $stb (ts timestamp, c1 double) tags(t1 int) +$rowNum = 1200 +$tblNum = $totalTableNum +$totalRows = 0 +$tsStart = 1577808000000 # 2020-01-01 00:00:00.000 + +$i = 0 +while $i < $tblNum + $tb = tb . $i + sql create table $tb using $stb tags( $i ) + + $x = 0 + while $x < $rowNum + $ts = $tsStart + $x + sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x ) ( $ts + 10a , $x ) ( $ts + 11a , $x ) ( $ts + 12a , $x ) ( $ts + 13a , $x ) ( $ts + 14a , $x ) ( $ts + 15a , $x ) ( $ts + 16a , $x ) ( $ts + 17a , $x ) ( $ts + 18a , $x ) ( $ts + 19a , $x ) ( $ts + 20a , $x ) ( $ts + 21a , $x ) ( $ts + 22a , $x ) ( $ts + 23a , $x ) ( $ts + 24a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 35a , $x ) ( $ts + 36a , $x ) ( $ts + 37a , $x ) ( $ts + 38a , $x ) ( $ts + 39a , $x ) ( $ts + 40a , $x ) ( $ts + 41a , $x ) ( $ts + 42a , $x ) ( $ts + 43a , $x ) ( $ts + 44a , $x ) ( $ts + 45a , $x ) ( $ts + 46a , $x ) ( $ts + 47a , $x ) ( $ts + 48a , $x ) ( $ts + 49a , $x ) ( $ts + 50a , $x ) ( $ts + 51a , $x ) ( $ts + 52a , $x ) ( $ts + 53a , $x ) ( $ts + 54a , $x ) ( $ts + 55a , $x ) ( $ts + 56a , $x ) ( $ts + 57a , $x ) ( $ts + 58a , $x ) ( $ts + 59a , $x ) + $x = $x + 60 + endw + $totalRows = $totalRows + $x + print info: inserted $x rows into $tb and totalRows: $totalRows + $i = $i + 1 +endw + +sql select count(*) from $stb +print rows:$rows data00:$data00 totalRows:$totalRows +if $rows != 1 then + return -1 +endi + +if $data00 != $totalRows then + return -1 +endi + + +print ============== step3: insert old data(now-15d) and new data(now+15d), control data rows in order to save in cache, not falling disc +sql insert into $tb values ( now - 20d , -20 ) +sql insert into $tb values ( now - 40d , -40 ) +$totalRows = $totalRows + 2 + +print ============== step4: stop dnode1 +system sh/exec.sh -n dnode1 -s stop -x SIGINT + +$loopCnt = 0 +wait_dnode1_offline: +$loopCnt = $loopCnt + 1 +if $loopCnt == 10 then + return -1 +endi + +sql show dnodes +if $rows != 2 then + sleep 2000 + goto wait_dnode1_offline +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 + +if $dnode1Status != offline then + sleep 2000 + goto wait_dnode1_offline +endi +if $dnode2Status != ready then + sleep 2000 + goto wait_dnode1_offline +endi + +# check using select +sql select count(*) from $stb +print data00 $data00 +if $data00 != $totalRows then + return -1 +endi + +#sql show vgroups +#print show vgroups: +#print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1 +#print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2 +#print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3 + +print ============== step5: remove the mnode dir of dnode1, then copy the monde dir of dnode2 +system_content rm -rf ../../../sim/dnode1/data/vnode +system_content rm -rf ../../../sim/dnode1/data/mnode +system_content cp -rf ../../../sim/dnode2/data/vnode ../../../sim/dnode1/data/ +system_content cp -rf ../../../sim/dnode2/data/mnode ../../../sim/dnode1/data/ + +print ============== step6: restart dnode1, waiting sync end +system sh/exec.sh -n dnode1 -s start +sleep 1000 + +$loopCnt = 0 +wait_dnode1_ready: +$loopCnt = $loopCnt + 1 +if $loopCnt == 20 then + return -1 +endi + +sql show dnodes -x wait_dnode1_ready +if $rows != 2 then + sleep 2000 + goto wait_dnode1_ready +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 + +if $dnode1Status != ready then + sleep 2000 + goto wait_dnode1_ready +endi +if $dnode2Status != ready then + sleep 2000 + goto wait_dnode1_ready +endi + +$loopCnt = 0 +wait_dnode1_vgroup_slave: +$loopCnt = $loopCnt + 1 +if $loopCnt == 10 then + return -1 +endi + +sql show vgroups +if $rows != 3 then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +print show vgroups: +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 $data5_4 $data6_4 $data7_4 $data8_4 $data9_4 +$d2v2status = $data4_4 +$d2v3status = $data4_2 +$d2v4status = $data4_3 + +$d1v2status = $data7_4 +$d1v3status = $data7_2 +$d1v4status = $data7_3 + +if $d2v2status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d2v3status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d2v4status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi + +if $d1v2status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d1v3status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d1v4status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi + +print ============== step7: stop dnode2 +system sh/exec.sh -n dnode2 -s stop -x SIGINT + +$loopCnt = 0 +wait_dnode2_offline: +$loopCnt = $loopCnt + 1 +if $loopCnt == 10 then + return -1 +endi + +sql show dnodes +if $rows != 2 then + sleep 2000 + goto wait_dnode2_offline +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 + +if $dnode1Status != ready then + sleep 2000 + goto wait_dnode2_offline +endi +if $dnode2Status != offline then + sleep 2000 + goto wait_dnode2_offline +endi + +sql reset query cache + +# check using select +sql select count(*) from $stb +print data00 $data00 +if $data00 != $totalRows then + return -1 +endi \ No newline at end of file diff --git a/tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir_stopAll_starAll.sim b/tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir_stopAll_starAll.sim new file mode 100644 index 0000000000..dc9bc62696 --- /dev/null +++ b/tests/script/unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir_stopAll_starAll.sim @@ -0,0 +1,210 @@ +# Test case describe: dnode1/dnode2 include mnode and vnode roles +# step 1: start dnode1/dnode2, and added into cluster +# step 2: create db(repl = 2), table, insert data, +# step 4: stop dnode1, remove its mnode and vnode dir, and copy mnode and vnode dir of dnode2 to dnode1 +# step 5: restart dnode1, waiting sync end +# step 6: stop dnode2, reset query cache, and query + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +#system sh/deploy.sh -n dnode3 -i 3 +#system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2 +#system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1 +#system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1 + +system sh/cfg.sh -n dnode1 -c walLevel -v 2 +system sh/cfg.sh -n dnode2 -c walLevel -v 2 +#system sh/cfg.sh -n dnode3 -c walLevel -v 2 +#system sh/cfg.sh -n dnode4 -c walLevel -v 2 + +system sh/cfg.sh -n dnode1 -c balanceInterval -v 10 +system sh/cfg.sh -n dnode2 -c balanceInterval -v 10 +#system sh/cfg.sh -n dnode3 -c balanceInterval -v 10 +#system sh/cfg.sh -n dnode4 -c balanceInterval -v 10 + +system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4 +system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4 +#system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4 +#system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4 + +system sh/cfg.sh -n dnode1 -c alternativeRole -v 0 +system sh/cfg.sh -n dnode2 -c alternativeRole -v 0 +#system sh/cfg.sh -n dnode3 -c alternativeRole -v 2 +#system sh/cfg.sh -n dnode4 -c alternativeRole -v 2 + +system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4 +system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode5 -c maxtablesPerVnode -v 4 + +system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator +#system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator + +print ============== step0: start tarbitrator +system sh/exec_tarbitrator.sh -s start + +print ============== step1: start dnode1/dnode2 and add into cluster +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sleep 1000 +sql connect +sleep 1000 +sql create dnode $hostname2 +sleep 1000 + +print ============== step2: create database with replica 2, and create table, insert data +$totalTableNum = 10 +$sleepTimer = 3000 + +$db = db +sql create database $db replica 2 cache 1 +sql use $db + +# create table , insert data +$stb = stb +sql create table $stb (ts timestamp, c1 double) tags(t1 int) +$rowNum = 1200 +$tblNum = $totalTableNum +$totalRows = 0 +$tsStart = 1577808000000 # 2020-01-01 00:00:00.000 + +$i = 0 +while $i < $tblNum + $tb = tb . $i + sql create table $tb using $stb tags( $i ) + + $x = 0 + while $x < $rowNum + $ts = $tsStart + $x + sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x ) ( $ts + 10a , $x ) ( $ts + 11a , $x ) ( $ts + 12a , $x ) ( $ts + 13a , $x ) ( $ts + 14a , $x ) ( $ts + 15a , $x ) ( $ts + 16a , $x ) ( $ts + 17a , $x ) ( $ts + 18a , $x ) ( $ts + 19a , $x ) ( $ts + 20a , $x ) ( $ts + 21a , $x ) ( $ts + 22a , $x ) ( $ts + 23a , $x ) ( $ts + 24a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 35a , $x ) ( $ts + 36a , $x ) ( $ts + 37a , $x ) ( $ts + 38a , $x ) ( $ts + 39a , $x ) ( $ts + 40a , $x ) ( $ts + 41a , $x ) ( $ts + 42a , $x ) ( $ts + 43a , $x ) ( $ts + 44a , $x ) ( $ts + 45a , $x ) ( $ts + 46a , $x ) ( $ts + 47a , $x ) ( $ts + 48a , $x ) ( $ts + 49a , $x ) ( $ts + 50a , $x ) ( $ts + 51a , $x ) ( $ts + 52a , $x ) ( $ts + 53a , $x ) ( $ts + 54a , $x ) ( $ts + 55a , $x ) ( $ts + 56a , $x ) ( $ts + 57a , $x ) ( $ts + 58a , $x ) ( $ts + 59a , $x ) + $x = $x + 60 + endw + $totalRows = $totalRows + $x + print info: inserted $x rows into $tb and totalRows: $totalRows + $i = $i + 1 +endw + +sql select count(*) from $stb +print rows:$rows data00:$data00 totalRows:$totalRows +if $rows != 1 then + return -1 +endi + +if $data00 != $totalRows then + return -1 +endi + + +print ============== step3: insert old data(now-15d) and new data(now+15d), control data rows in order to save in cache, not falling disc +sql insert into $tb values ( now - 20d , -20 ) +sql insert into $tb values ( now - 40d , -40 ) +$totalRows = $totalRows + 2 + +print ============== step4: stop dnode1 +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT + +print ============== step5: remove the mnode dir of dnode1, then copy the monde dir of dnode2 +system_content rm -rf ../../../sim/dnode1/data/vnode +system_content rm -rf ../../../sim/dnode1/data/mnode +system_content cp -rf ../../../sim/dnode2/data/vnode ../../../sim/dnode1/data/ +system_content cp -rf ../../../sim/dnode2/data/mnode ../../../sim/dnode1/data/ + +print ============== step6: restart dnode1/dnode2 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sleep 1000 +sql connect +sql use $db + +$loopCnt = 0 +wait_dnode1_ready: +$loopCnt = $loopCnt + 1 +if $loopCnt == 20 then + return -1 +endi + +sql show dnodes -x wait_dnode1_ready +if $rows != 2 then + sleep 2000 + goto wait_dnode1_ready +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 + +if $dnode1Status != ready then + sleep 2000 + goto wait_dnode1_ready +endi +if $dnode2Status != ready then + sleep 2000 + goto wait_dnode1_ready +endi + +$loopCnt = 0 +wait_dnode1_vgroup_slave: +$loopCnt = $loopCnt + 1 +if $loopCnt == 10 then + return -1 +endi + +sql show vgroups +if $rows != 3 then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +print show vgroups: +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 $data5_4 $data6_4 $data7_4 $data8_4 $data9_4 +$d2v2status = $data4_4 +$d2v3status = $data4_2 +$d2v4status = $data4_3 + +$d1v2status = $data7_4 +$d1v3status = $data7_2 +$d1v4status = $data7_3 + +if $d2v2status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d2v3status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d2v4status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi + +if $d1v2status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d1v3status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d1v4status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi + +sql reset query cache + +# check using select +sql select count(*) from $stb +print data00 $data00 +if $data00 != $totalRows then + return -1 +endi \ No newline at end of file diff --git a/tests/script/unique/migrate/mn2_vn2_repl2_rmVnodeDir.sim b/tests/script/unique/migrate/mn2_vn2_repl2_rmVnodeDir.sim new file mode 100644 index 0000000000..b754dc7a49 --- /dev/null +++ b/tests/script/unique/migrate/mn2_vn2_repl2_rmVnodeDir.sim @@ -0,0 +1,272 @@ +# Test case describe: dnode1/dnode2 include mnode and vnode roles +# step 1: start dnode1/dnode2, and added into cluster +# step 2: create db(repl = 2), table, insert data, +# step 4: stop dnode1, remove its vnode dir, and copy vnode dir of dnode2 to dnode1 +# step 5: restart dnode1, waiting sync end +# step 6: stop dnode2, reset query cache, and query + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +#system sh/deploy.sh -n dnode3 -i 3 +#system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2 +#system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1 +#system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1 + +system sh/cfg.sh -n dnode1 -c walLevel -v 2 +system sh/cfg.sh -n dnode2 -c walLevel -v 2 +#system sh/cfg.sh -n dnode3 -c walLevel -v 2 +#system sh/cfg.sh -n dnode4 -c walLevel -v 2 + +system sh/cfg.sh -n dnode1 -c balanceInterval -v 10 +system sh/cfg.sh -n dnode2 -c balanceInterval -v 10 +#system sh/cfg.sh -n dnode3 -c balanceInterval -v 10 +#system sh/cfg.sh -n dnode4 -c balanceInterval -v 10 + +system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4 +system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4 +#system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4 +#system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4 + +system sh/cfg.sh -n dnode1 -c alternativeRole -v 0 +system sh/cfg.sh -n dnode2 -c alternativeRole -v 0 +#system sh/cfg.sh -n dnode3 -c alternativeRole -v 2 +#system sh/cfg.sh -n dnode4 -c alternativeRole -v 2 + +system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4 +system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4 +#system sh/cfg.sh -n dnode5 -c maxtablesPerVnode -v 4 + +system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator +#system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator + +print ============== step0: start tarbitrator +system sh/exec_tarbitrator.sh -s start + +print ============== step1: start dnode1/dnode2 and add into cluster +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sleep 1000 +sql connect +sleep 1000 +sql create dnode $hostname2 +sleep 1000 + +print ============== step2: create database with replica 2, and create table, insert data +$totalTableNum = 10 +$sleepTimer = 3000 + +$db = db +sql create database $db replica 2 cache 1 +sql use $db + +# create table , insert data +$stb = stb +sql create table $stb (ts timestamp, c1 double) tags(t1 int) +$rowNum = 1200 +$tblNum = $totalTableNum +$totalRows = 0 +$tsStart = 1577808000000 # 2020-01-01 00:00:00.000 + +$i = 0 +while $i < $tblNum + $tb = tb . $i + sql create table $tb using $stb tags( $i ) + + $x = 0 + while $x < $rowNum + $ts = $tsStart + $x + sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x ) ( $ts + 10a , $x ) ( $ts + 11a , $x ) ( $ts + 12a , $x ) ( $ts + 13a , $x ) ( $ts + 14a , $x ) ( $ts + 15a , $x ) ( $ts + 16a , $x ) ( $ts + 17a , $x ) ( $ts + 18a , $x ) ( $ts + 19a , $x ) ( $ts + 20a , $x ) ( $ts + 21a , $x ) ( $ts + 22a , $x ) ( $ts + 23a , $x ) ( $ts + 24a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 35a , $x ) ( $ts + 36a , $x ) ( $ts + 37a , $x ) ( $ts + 38a , $x ) ( $ts + 39a , $x ) ( $ts + 40a , $x ) ( $ts + 41a , $x ) ( $ts + 42a , $x ) ( $ts + 43a , $x ) ( $ts + 44a , $x ) ( $ts + 45a , $x ) ( $ts + 46a , $x ) ( $ts + 47a , $x ) ( $ts + 48a , $x ) ( $ts + 49a , $x ) ( $ts + 50a , $x ) ( $ts + 51a , $x ) ( $ts + 52a , $x ) ( $ts + 53a , $x ) ( $ts + 54a , $x ) ( $ts + 55a , $x ) ( $ts + 56a , $x ) ( $ts + 57a , $x ) ( $ts + 58a , $x ) ( $ts + 59a , $x ) + $x = $x + 60 + endw + $totalRows = $totalRows + $x + print info: inserted $x rows into $tb and totalRows: $totalRows + $i = $i + 1 +endw + +sql select count(*) from $stb +print rows:$rows data00:$data00 totalRows:$totalRows +if $rows != 1 then + return -1 +endi + +if $data00 != $totalRows then + return -1 +endi + + +print ============== step3: insert old data(now-15d) and new data(now+15d), control data rows in order to save in cache, not falling disc +sql insert into $tb values ( now - 20d , -20 ) +sql insert into $tb values ( now - 40d , -40 ) +$totalRows = $totalRows + 2 + +print ============== step4: stop dnode1 +system sh/exec.sh -n dnode1 -s stop -x SIGINT + +$loopCnt = 0 +wait_dnode1_offline: +$loopCnt = $loopCnt + 1 +if $loopCnt == 10 then + return -1 +endi + +sql show dnodes +if $rows != 2 then + sleep 2000 + goto wait_dnode1_offline +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 + +if $dnode1Status != offline then + sleep 2000 + goto wait_dnode1_offline +endi +if $dnode2Status != ready then + sleep 2000 + goto wait_dnode1_offline +endi + +# check using select +sql select count(*) from $stb +print data00 $data00 +if $data00 != $totalRows then + return -1 +endi + +#sql show vgroups +#print show vgroups: +#print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1 +#print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2 +#print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3 + +print ============== step5: remove the mnode dir of dnode1, then copy the monde dir of dnode2 +system_content rm -rf ../../../sim/dnode1/data/vnode +system_content cp -rf ../../../sim/dnode2/data/vnode ../../../sim/dnode1/data/ + +print ============== step6: restart dnode1, waiting sync end +system sh/exec.sh -n dnode1 -s start +sleep 1000 + +$loopCnt = 0 +wait_dnode1_ready: +$loopCnt = $loopCnt + 1 +if $loopCnt == 20 then + return -1 +endi + +sql show dnodes -x wait_dnode1_ready +if $rows != 2 then + sleep 2000 + goto wait_dnode1_ready +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 + +if $dnode1Status != ready then + sleep 2000 + goto wait_dnode1_ready +endi +if $dnode2Status != ready then + sleep 2000 + goto wait_dnode1_ready +endi + +$loopCnt = 0 +wait_dnode1_vgroup_slave: +$loopCnt = $loopCnt + 1 +if $loopCnt == 10 then + return -1 +endi + +sql show vgroups +if $rows != 3 then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +print show vgroups: +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 $data5_4 $data6_4 $data7_4 $data8_4 $data9_4 +$d2v2status = $data4_4 +$d2v3status = $data4_2 +$d2v4status = $data4_3 + +$d1v2status = $data7_4 +$d1v3status = $data7_2 +$d1v4status = $data7_3 + +if $d2v2status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d2v3status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d2v4status != master then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi + +if $d1v2status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d1v3status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi +if $d1v4status != slave then + sleep 2000 + goto wait_dnode1_vgroup_slave +endi + +print ============== step7: stop dnode2 +system sh/exec.sh -n dnode2 -s stop -x SIGINT + +$loopCnt = 0 +wait_dnode2_offline: +$loopCnt = $loopCnt + 1 +if $loopCnt == 10 then + return -1 +endi + +sql show dnodes +if $rows != 2 then + sleep 2000 + goto wait_dnode2_offline +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 + +if $dnode1Status != ready then + sleep 2000 + goto wait_dnode2_offline +endi +if $dnode2Status != offline then + sleep 2000 + goto wait_dnode2_offline +endi + +sql reset query cache + +# check using select +sql select count(*) from $stb +print data00 $data00 +if $data00 != $totalRows then + return -1 +endi \ No newline at end of file From dfb07ae09c697393a8f9c2aff57ba47b99b67aee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 4 Jul 2020 16:35:01 +0800 Subject: [PATCH 04/12] [td-225] refactor cache module --- src/util/inc/tcache.h | 5 ++-- src/util/src/tcache.c | 68 ++++++++++++++++++++++--------------------- 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index 87bb1e41f7..b026ad4386 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -65,7 +65,7 @@ typedef struct { int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. int64_t refreshTime; STrashElem * pTrash; - const char * cacheName; + char* name; // void * tmrCtrl; // void * pTimer; SCacheStatis statistics; @@ -163,8 +163,9 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove); /** * move all data node into trash, clear node in trash can if it is not referenced by any clients * @param handle + * @param _remove remove the data or not if refcount is greater than 0 */ -void taosCacheEmpty(SCacheObj *pCacheObj); +void taosCacheEmpty(SCacheObj *pCacheObj, bool _remove); /** * release all allocated memory and destroy the cache object. diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 016b352188..d546970868 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -119,9 +119,8 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo int32_t size = pNode->size; taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); - uDebug("key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes, cacheName:%s", - pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size, - pCacheObj->cacheName); + uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes", + pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size); if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); free(pNode); } @@ -226,7 +225,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj); */ static void* taosCacheRefresh(void *handle); -SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) { +SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) { if (refreshTimeInSeconds <= 0) { return NULL; } @@ -238,7 +237,7 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo } pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false); - pCacheObj->cacheName = cacheName; + pCacheObj->name = strdup(cacheName); if (pCacheObj->pHashTable == NULL) { free(pCacheObj); uError("failed to allocate memory, reason:%s", strerror(errno)); @@ -268,10 +267,6 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo return pCacheObj; } -SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) { - return taosCacheInitWithCb(keyType, refreshTimeInSeconds, extendLifespan, fn, cacheName); -} - void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) { SCacheDataNode *pNode; @@ -288,16 +283,16 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v if (NULL != pNode) { pCacheObj->totalSize += pNode->size; - uDebug("key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64 - "bytes size:%" PRId64 "bytes, cacheName:%s", - key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime), - (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize, pCacheObj->cacheName); + uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64 + "bytes size:%" PRId64 "bytes", + pCacheObj->name, key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime), + (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize); } else { - uError("key:%p, failed to added into cache, out of memory, cacheName:%s", key, pCacheObj->cacheName); + uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key); } } else { // old data exists, update the node pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L); - uDebug("key:%p, %p exist in cache, updated, cacheName:%s", key, pNode->data, pCacheObj->cacheName); + uDebug("cache:%s, key:%p, %p exist in cache, updated", pCacheObj->name, key, pNode->data); } __cache_unlock(pCacheObj); @@ -332,10 +327,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen if (ptNode != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); - uDebug("key:%p, %p is retrieved from cache, refcnt:%d, cacheName:%s", key, (*ptNode)->data, ref, pCacheObj->cacheName); + uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, (*ptNode)->data, ref); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); - uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName); + uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); } atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); @@ -360,11 +355,11 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t ke if (ptNode != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); - uDebug("key:%p, %p expireTime is updated in cache, refcnt:%d, cacheName:%s", key, (*ptNode)->data, - T_REF_VAL_GET(*ptNode), pCacheObj->cacheName); + uDebug("cache:%s, key:%p, %p expireTime is updated in cache, refcnt:%d", pCacheObj->name, key, + (*ptNode)->data, T_REF_VAL_GET(*ptNode)); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); - uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName); + uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); } atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); @@ -383,7 +378,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { } int32_t ref = T_REF_INC(ptNode); - uDebug("%p acquired by data in cache, refcnt:%d, cacheName:%s", ptNode->data, ref, pCacheObj->cacheName); + uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref); // if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan if (pCacheObj->extendLifespan) { @@ -391,7 +386,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) { ptNode->extendFactor += 1; - uDebug("%p extend life time to %" PRId64, ptNode->data, + uDebug("cache:%s, %p extend life time to %" PRId64, pCacheObj->name, ptNode->data, ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime); } } @@ -437,7 +432,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { *data = NULL; int16_t ref = T_REF_DEC(pNode); - uDebug("key:%p, %p is released, refcnt:%d, cacheName:%s", pNode->key, pNode->data, ref, pCacheObj->cacheName); + uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref); if (_remove && (!pNode->inTrashCan)) { __cache_wr_lock(pCacheObj); @@ -455,7 +450,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } } -void taosCacheEmpty(SCacheObj *pCacheObj) { +void taosCacheEmpty(SCacheObj *pCacheObj, bool _remove) { SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); __cache_wr_lock(pCacheObj); @@ -465,12 +460,16 @@ void taosCacheEmpty(SCacheObj *pCacheObj) { } SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); - taosCacheMoveToTrash(pCacheObj, pNode); + if (T_REF_VAL_GET(pNode) == 0 || _remove) { + taosCacheReleaseNode(pCacheObj, pNode); + } else { + taosCacheMoveToTrash(pCacheObj, pNode); + } } __cache_unlock(pCacheObj); taosHashDestroyIter(pIter); - taosTrashCanEmpty(pCacheObj, false); + taosTrashCanEmpty(pCacheObj, _remove); } void taosCacheCleanup(SCacheObj *pCacheObj) { @@ -481,7 +480,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { pCacheObj->deleting = 1; pthread_join(pCacheObj->refreshWorker, NULL); - uInfo("cacheName:%p, will be cleanuped", pCacheObj->cacheName); + uInfo("cache:%s will be cleaned up", pCacheObj->name); doCleanupDataCache(pCacheObj); } @@ -601,22 +600,25 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); while (taosHashIterNext(pIter)) { SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); - // if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { - if (T_REF_VAL_GET(pNode) <= 0) { + + int32_t c = T_REF_VAL_GET(pNode); + if (c <= 0) { taosCacheReleaseNode(pCacheObj, pNode); } else { - uDebug("key:%p, %p will not remove from cache, refcnt:%d, cacheName:%s", pNode->key, pNode->data, - T_REF_VAL_GET(pNode), pCacheObj->cacheName); + uDebug("cache:%s key:%p, %p will not remove from cache, refcnt:%d", pCacheObj->name, pNode->key, + pNode->data, T_REF_VAL_GET(pNode)); } } taosHashDestroyIter(pIter); - taosHashCleanup(pCacheObj->pHashTable); + // todo memory leak if there are object with refcount greater than 0 in hash table? + taosHashCleanup(pCacheObj->pHashTable); __cache_unlock(pCacheObj); taosTrashCanEmpty(pCacheObj, true); __cache_lock_destroy(pCacheObj); - + + tfree(pCacheObj->name); memset(pCacheObj, 0, sizeof(SCacheObj)); free(pCacheObj); } From 66805aaf18c013201995adb7ac644bf39d5dc193 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 4 Jul 2020 16:35:46 +0800 Subject: [PATCH 05/12] [td-225] refactor --- src/client/src/tscLocal.c | 2 +- src/client/src/tscServer.c | 4 ++-- src/mnode/src/mnodeProfile.c | 2 +- src/mnode/src/mnodeShow.c | 2 +- src/plugins/http/src/httpContext.c | 2 +- src/plugins/http/src/httpSession.c | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 83700ce0a5..1d66fb0467 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -406,7 +406,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) { pSql->res.qhandle = 0x1; pSql->res.numOfRows = 0; } else if (pCmd->command == TSDB_SQL_RESET_CACHE) { - taosCacheEmpty(tscCacheHandle); + taosCacheEmpty(tscCacheHandle,false); } else if (pCmd->command == TSDB_SQL_SERV_VERSION) { tscProcessServerVer(pSql); } else if (pCmd->command == TSDB_SQL_CLI_VERSION) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1b2334b998..917959505a 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1943,7 +1943,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) { } int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) { - taosCacheEmpty(tscCacheHandle); + taosCacheEmpty(tscCacheHandle, false); return 0; } @@ -1989,7 +1989,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { if (isSuperTable) { // if it is a super table, reset whole query cache tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name); - taosCacheEmpty(tscCacheHandle); + taosCacheEmpty(tscCacheHandle, false); } } diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index d4765d4c9e..af4a09a45a 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -68,7 +68,7 @@ int32_t mnodeInitProfile() { mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); - tsMnodeConnCache = taosCacheInitWithCb(TSDB_DATA_TYPE_INT, CONN_CHECK_TIME, false, mnodeFreeConn, "conn"); + tsMnodeConnCache = taosCacheInit(TSDB_DATA_TYPE_INT, CONN_CHECK_TIME, false, mnodeFreeConn, "conn"); return 0; } diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index f601f391ce..72abafb1d5 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -65,7 +65,7 @@ int32_t mnodeInitShow() { mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); - tsMnodeShowCache = taosCacheInitWithCb(TSDB_DATA_TYPE_INT, 5, false, mnodeFreeShowObj, "show"); + tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, false, mnodeFreeShowObj, "show"); return 0; } diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index ae331a7d44..cdaee53c38 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -58,7 +58,7 @@ static void httpDestroyContext(void *data) { } bool httpInitContexts() { - tsHttpServer.contextCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BIGINT, 2, false, httpDestroyContext, "restc"); + tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, false, httpDestroyContext, "restc"); if (tsHttpServer.contextCache == NULL) { httpError("failed to init context cache"); return false; diff --git a/src/plugins/http/src/httpSession.c b/src/plugins/http/src/httpSession.c index 14bb6da983..256b0c9549 100644 --- a/src/plugins/http/src/httpSession.c +++ b/src/plugins/http/src/httpSession.c @@ -115,7 +115,7 @@ void httpCleanUpSessions() { } bool httpInitSessions() { - tsHttpServer.sessionCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession, "rests"); + tsHttpServer.sessionCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession, "rests"); if (tsHttpServer.sessionCache == NULL) { httpError("failed to init session cache"); return false; From 1f172151b27edd92bb39e430fba23f54912f6358 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 4 Jul 2020 16:37:01 +0800 Subject: [PATCH 06/12] [td-225] add query mgmt module & refactor the vnode ref management during query processing --- src/dnode/src/dnodeVRead.c | 31 +++------- src/inc/query.h | 7 +++ src/query/src/qExecutor.c | 123 ++++++++++++++++++++++++++++++++++++- src/vnode/inc/vnodeInt.h | 2 +- src/vnode/src/vnodeMain.c | 19 ++---- src/vnode/src/vnodeRead.c | 69 ++++++++++++--------- 6 files changed, 181 insertions(+), 70 deletions(-) diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 947d0fa501..acd92db598 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -98,11 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - if (pMsg->msgType == TSDB_MSG_TYPE_FETCH) { - pVnode = vnodeGetVnode(pHead->vgId); - } else { - pVnode = vnodeAccquireVnode(pHead->vgId); - } + pVnode = vnodeAccquireVnode(pHead->vgId); if (pVnode == NULL) { leftLen -= pHead->contLen; @@ -189,24 +185,7 @@ void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle) { taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead); } -static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { - SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); - pRead->rpcMsg = pMsg->rpcMsg; - pRead->pCont = qhandle; - pRead->contLen = 0; - pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - - taos_queue queue = vnodeGetRqueue(pVnode); - taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); -} - void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { - if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return; - if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) { - dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead); - code = TSDB_CODE_SUCCESS; - } - SRpcMsg rpcRsp = { .handle = pRead->rpcMsg.handle, .pCont = pRead->rspRet.rsp, @@ -216,6 +195,12 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { rpcSendResponse(&rpcRsp); rpcFreeCont(pRead->rpcMsg.pCont); + vnodeRelease(pVnode); +} + +void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) { + vnodeRelease(pVnode); + return; } static void *dnodeProcessReadQueue(void *param) { @@ -235,6 +220,8 @@ static void *dnodeProcessReadQueue(void *param) { if (type == TAOS_QTYPE_RPC) { dnodeSendRpcReadRsp(pVnode, pReadMsg, code); + } else { + dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); } taosFreeQitem(pReadMsg); diff --git a/src/inc/query.h b/src/inc/query.h index eb8abace62..88badc2d7b 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -84,6 +84,13 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo); */ int32_t qKillQuery(qinfo_t qinfo); +void* qOpenQueryMgmt(int32_t vgId); +void qSetQueryMgmtClosed(void* pExecutor); +void qCleanupQueryMgmt(void* pExecutor); +void** qRegisterQInfo(void* pMgmt, void* qInfo); +void** qAcquireQInfo(void* pMgmt, void** key); +void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree); + #ifdef __cplusplus } #endif diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 8156967d5d..06578916bb 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -13,8 +13,10 @@ * along with this program. If not, see . */ #include "os.h" -#include "taosmsg.h" +#include "tcache.h" +#include "tglobal.h" #include "qfill.h" +#include "taosmsg.h" #include "hash.h" #include "qExecutor.h" @@ -1520,7 +1522,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { } static bool isQueryKilled(SQInfo *pQInfo) { - return false; return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5910,9 +5911,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { return TSDB_CODE_SUCCESS; } +typedef struct SQueryMgmt { + SCacheObj *qinfoPool; // query handle pool + int32_t vgId; + bool closed; + pthread_mutex_t lock; +} SQueryMgmt; + int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn, qinfo_t* pQInfo) { - assert(pQueryMsg != NULL); + assert(pQueryMsg != NULL && tsdb != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -6356,3 +6364,112 @@ static void buildTagQueryResult(SQInfo* pQInfo) { setQueryStatus(pQuery, QUERY_COMPLETED); } +void freeqinfoFn(void *qhandle) { + void** handle = qhandle; + if (handle == NULL || *handle == NULL) { + return; + } + + qKillQuery(*handle); +} + +void* qOpenQueryMgmt(int32_t vgId) { + const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, refresh handle pool + + char cacheName[128] = {0}; + sprintf(cacheName, "qhandle_%d", vgId); + + SQueryMgmt* pQueryHandle = calloc(1, sizeof(SQueryMgmt)); + + pQueryHandle->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName); + pQueryHandle->closed = false; + pthread_mutex_init(&pQueryHandle->lock, NULL); + + qDebug("vgId:%d, open querymgmt success", vgId); + return pQueryHandle; +} + +void qSetQueryMgmtClosed(void* pQMgmt) { + if (pQMgmt == NULL) { + return; + } + + SQueryMgmt* pQueryMgmt = pQMgmt; + qDebug("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId); + + pthread_mutex_lock(&pQueryMgmt->lock); + pQueryMgmt->closed = true; + pthread_mutex_unlock(&pQueryMgmt->lock); + + taosCacheEmpty(pQueryMgmt->qinfoPool, true); +} + +void qCleanupQueryMgmt(void* pQMgmt) { + if (pQMgmt == NULL) { + return; + } + + SQueryMgmt* pQueryMgmt = pQMgmt; + int32_t vgId = pQueryMgmt->vgId; + + assert(pQueryMgmt->closed); + + SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool; + pQueryMgmt->qinfoPool = NULL; + + taosCacheCleanup(pqinfoPool); + pthread_mutex_destroy(&pQueryMgmt->lock); + tfree(pQueryMgmt); + + qDebug("vgId:%d querymgmt cleanup completed", vgId); +} + +void** qRegisterQInfo(void* pMgmt, void* qInfo) { + if (pMgmt == NULL) { + return NULL; + } + + SQueryMgmt *pQueryMgmt = pMgmt; + if (pQueryMgmt->qinfoPool == NULL) { + return NULL; + } + + pthread_mutex_lock(&pQueryMgmt->lock); + if (pQueryMgmt->closed) { + pthread_mutex_unlock(&pQueryMgmt->lock); + + return NULL; + } else { + void** handle = taosCachePut(pQueryMgmt->qinfoPool, qInfo, POINTER_BYTES, &qInfo, POINTER_BYTES, tsShellActivityTimer*2); + pthread_mutex_unlock(&pQueryMgmt->lock); + + return handle; + } +} + +void** qAcquireQInfo(void* pMgmt, void** key) { + SQueryMgmt *pQueryMgmt = pMgmt; + + if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) { + return NULL; + } + + void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, key, POINTER_BYTES); + if (handle == NULL || *handle == NULL) { + return NULL; + } else { + return handle; + } +} + +void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree) { + SQueryMgmt *pQueryMgmt = pMgmt; + + if (pQueryMgmt->qinfoPool == NULL) { + return NULL; + } + + taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, needFree); + return 0; +} + diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 76e53f3962..4f22c7784d 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -53,7 +53,7 @@ typedef struct { STsdbCfg tsdbCfg; SSyncCfg syncCfg; SWalCfg walCfg; - void *qHandlePool; // query handle pool + void *qMgmt; char *rootDir; char db[TSDB_DB_NAME_LEN]; } SVnodeObj; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 4c446a78ec..6ccdc02acf 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -46,7 +46,6 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); -static void vnodeFreeqHandle(void* phandle); static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; @@ -283,9 +282,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); - const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, rfresh handle pool - pVnode->qHandlePool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, vnodeFreeqHandle, "qhandle"); - + pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId); pVnode->events = NULL; pVnode->status = TAOS_VN_STATUS_READY; vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); @@ -328,6 +325,9 @@ void vnodeRelease(void *pVnodeRaw) { return; } + qCleanupQueryMgmt(pVnode->qMgmt); + pVnode->qMgmt = NULL; + if (pVnode->tsdb) tsdbCloseRepo(pVnode->tsdb, 1); pVnode->tsdb = NULL; @@ -475,7 +475,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); // release local resources only after cutting off outside connections - taosCacheCleanup(pVnode->qHandlePool); + qSetQueryMgmtClosed(pVnode->qMgmt); vnodeRelease(pVnode); } @@ -881,12 +881,3 @@ PARSE_OVER: if(fp) fclose(fp); return terrno; } - -void vnodeFreeqHandle(void *qHandle) { - void** handle = qHandle; - if (handle == NULL || *handle == NULL) { - return; - } - - qKillQuery(*handle); -} \ No newline at end of file diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 48d2fa6878..3105d58aea 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -14,6 +14,7 @@ */ #define _DEFAULT_SOURCE +#include #include "os.h" #include "tglobal.h" @@ -73,18 +74,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); - vWarn("QInfo:%p connection %p broken, kill query", (void*)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); + void* handle = NULL; + if ((void**) killQueryMsg->qhandle != NULL) { + handle = *(void**) killQueryMsg->qhandle; + } + + vWarn("QInfo:%p connection %p broken, kill query", handle, pReadMsg->rpcMsg.handle); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); - // this message arrived here by means of the *query* message, so release the vnode is necessary - void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle)); + void** qhandle = qAcquireQInfo(pVnode->qMgmt, (void**) killQueryMsg->qhandle); if (qhandle == NULL || *qhandle == NULL) { vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); } else { - taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true); + assert(qhandle == (void**) killQueryMsg->qhandle); + qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true); } - vnodeRelease(pVnode); return TSDB_CODE_TSC_QUERY_CANCELLED; } @@ -93,7 +98,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void** handle = NULL; if (contLen != 0) { - code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo); + code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->code = code; @@ -104,25 +109,30 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { // current connect is broken if (code == TSDB_CODE_SUCCESS) { - if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo, - pReadMsg->rpcMsg.handle); - pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + // add lock here + handle = qRegisterQInfo(pVnode->qMgmt, pQInfo); + if (handle == NULL) { // failed to register qhandle + pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE; - // NOTE: there two refcount, needs to kill twice, todo refactor - // query has not been put into qhandle pool, kill it directly. qKillQuery(pQInfo); qKillQuery(pQInfo); - - return pRsp->code; + } else { + assert(*handle == pQInfo); + pRsp->qhandle = htobe64((uint64_t) (handle)); } - handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2); - assert(*handle == pQInfo); - pRsp->qhandle = htobe64((uint64_t) (handle)); + if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { + vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle); + pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + + // NOTE: there two refcount, needs to kill twice + // query has not been put into qhandle pool, kill it directly. + qKillQuery(pQInfo); + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); + return pRsp->code; + } } else { assert(pQInfo == NULL); - vnodeRelease(pVnode); } vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); @@ -137,9 +147,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (pQInfo != NULL) { qTableQuery(pQInfo); // do execute query - assert(handle != NULL); - taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false); + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } return code; @@ -158,7 +167,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { memset(pRet, 0, sizeof(SRspRet)); int32_t ret = 0; - void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, pQInfo, sizeof(pQInfo)); + void** handle = qAcquireQInfo(pVnode->qMgmt, pQInfo); if (handle == NULL || handle != pQInfo) { ret = TSDB_CODE_QRY_INVALID_QHANDLE; } @@ -166,8 +175,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (pRetrieve->free == 1) { if (ret == TSDB_CODE_SUCCESS) { vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); - taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->len = sizeof(SRetrieveTableRsp); @@ -177,30 +186,30 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRsp->completed = true; pRsp->useconds = 0; } else { // todo handle error - + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); } return ret; } - vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, *pQInfo); - int32_t code = qRetrieveQueryResultInfo(*pQInfo); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || ret != TSDB_CODE_SUCCESS) { //TODO pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + } else { // todo check code and handle error in build result set code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); - if (qHasMoreResultsToRetrieve(*pQInfo)) { + if (qHasMoreResultsToRetrieve(*handle)) { + dnodePutQhandleIntoReadQueue(pVnode, handle); pRet->qhandle = handle; - code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; + code = TSDB_CODE_SUCCESS; } else { // no further execution invoked, release the ref to vnode - taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true); + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); } } - + return code; } From 8beba3a325d192dc9d63638fe7e4986aeab6ce06 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 4 Jul 2020 16:39:52 +0800 Subject: [PATCH 07/12] [td-225] remove u_malloc --- src/query/src/qExecutor.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 06578916bb..1882aa1850 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -89,16 +89,18 @@ typedef struct { STSCursor cur; } SQueryStatusInfo; +#if 0 static UNUSED_FUNC void *u_malloc (size_t __size) { -// uint32_t v = rand(); -// if (v % 5 <= 1) { -// return NULL; -// } else { + uint32_t v = rand(); + if (v % 5 <= 1) { + return NULL; + } else { return malloc(__size); -// } + } } #define malloc u_malloc +#endif #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) From 1405d825be8091c759af7fdd29889213498eb5e6 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sat, 4 Jul 2020 17:45:38 +0800 Subject: [PATCH 08/12] emulate file operation random fail. [TD-792] --- cmake/define.inc | 4 ++++ cmake/input.inc | 7 +++++- src/util/inc/tfile.h | 29 ++++++++++++++++++++++++ src/util/src/tfile.c | 53 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 src/util/inc/tfile.h create mode 100644 src/util/src/tfile.c diff --git a/cmake/define.inc b/cmake/define.inc index da100f4260..8d6a398709 100755 --- a/cmake/define.inc +++ b/cmake/define.inc @@ -24,3 +24,7 @@ ENDIF () IF (TD_MEM_CHECK) ADD_DEFINITIONS(-DTAOS_MEM_CHECK) ENDIF () + +IF (TD_RANDOM_FILE_FAIL) + ADD_DEFINITIONS(-DTAOS_RANDOM_FILE_FAIL) +ENDIF () diff --git a/cmake/input.inc b/cmake/input.inc index 5a17e0319c..574eac5b45 100755 --- a/cmake/input.inc +++ b/cmake/input.inc @@ -30,4 +30,9 @@ ENDIF () IF (${MEM_CHECK} MATCHES "true") SET(TD_MEM_CHECK TRUE) MESSAGE(STATUS "build with memory check") -ENDIF () \ No newline at end of file +ENDIF () + +IF (${RANDOM_FILE_FAIL} MATCHES "true") + SET(TD_RANDOM_FILE_FAIL TRUE) + MESSAGE(STATUS "build with random-file-fail enabled") +ENDIF () diff --git a/src/util/inc/tfile.h b/src/util/inc/tfile.h new file mode 100644 index 0000000000..566a429d32 --- /dev/null +++ b/src/util/inc/tfile.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TFILE_H +#define TDENGINE_TFILE_H + +#ifdef TAOS_RANDOM_FILE_FAIL + +ssize_t taos_tread(int fd, void *buf, size_t count); +ssize_t taos_twrite(int fd, void *buf, size_t count); + +#define tread(fd, buf, count) taos_tread(fd, buf, count) +#define twrite(fd, buf, count) taos_twrite(fd, buf, count) + +#endif // TAOS_RANDOM_FILE_FAIL + +#endif // TDENGINE_TFILE_H diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c new file mode 100644 index 0000000000..200d0f8af5 --- /dev/null +++ b/src/util/src/tfile.c @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "os.h" + +#define RANDOM_FACTOR 5 + +ssize_t taos_tread(int fd, void *buf, size_t count) +{ +#ifdef TAOS_RANDOM_FILE_FAIL + if (rand() % RANDOM_FACTOR == 0) { + errno = EIO; + return -1; + } +#endif + + return tread(fd, buf, count); +} + +ssize_t taos_twrite(int fd, void *buf, size_t count) +{ +#ifdef TAOS_RANDOM_FILE_FAIL + if (rand() % RANDOM_FACTOR == 0) { + errno = EIO; + return -1; + } +#endif + + return twrite(fd, buf, count); +} From 88dd38aac11f9b83312c582583cb5731e31fbd39 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sat, 4 Jul 2020 18:10:42 +0800 Subject: [PATCH 09/12] add lseek random failure. --- src/query/src/qExecutor.c | 1 + src/tsdb/src/tsdbFile.c | 1 + src/tsdb/src/tsdbRWHelper.c | 1 + src/util/inc/tfile.h | 2 ++ src/util/src/tfile.c | 12 ++++++++++++ src/util/src/tkvstore.c | 3 ++- 6 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 8156967d5d..9f0b0b67b1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -27,6 +27,7 @@ #include "exception.h" #include "tscompression.h" #include "ttime.h" +#include "tfile.h" /** * check if the primary column is load by default, otherwise, the program will diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 33eae639b8..95cc47292b 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -28,6 +28,7 @@ #include "tsdbMain.h" #include "tutil.h" #include "ttime.h" +#include "tfile.h" const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"}; diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 934fa8e733..eab9a5e056 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -19,6 +19,7 @@ #include "tcoding.h" #include "tscompression.h" #include "tsdbMain.h" +#include "tfile.h" #define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM)) diff --git a/src/util/inc/tfile.h b/src/util/inc/tfile.h index 566a429d32..5bddc76266 100644 --- a/src/util/inc/tfile.h +++ b/src/util/inc/tfile.h @@ -20,9 +20,11 @@ ssize_t taos_tread(int fd, void *buf, size_t count); ssize_t taos_twrite(int fd, void *buf, size_t count); +off_t taos_lseek(int fd, off_t offset, int whence); #define tread(fd, buf, count) taos_tread(fd, buf, count) #define twrite(fd, buf, count) taos_twrite(fd, buf, count) +#define lseek(fd, offset, whence) taos_lseek(fd, offset, whence) #endif // TAOS_RANDOM_FILE_FAIL diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c index 200d0f8af5..97eeda010e 100644 --- a/src/util/src/tfile.c +++ b/src/util/src/tfile.c @@ -51,3 +51,15 @@ ssize_t taos_twrite(int fd, void *buf, size_t count) return twrite(fd, buf, count); } + +off_t taos_lseek(int fd, off_t offset, int whence) +{ +#ifdef TAOS_RANDOM_FILE_FAIL + if (rand() % RANDOM_FACTOR == 0) { + errno = EIO; + return -1; + } +#endif + + return lseek(fd, offset, whence); +} diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index f33941376f..2a24a59742 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -27,6 +27,7 @@ #include "tcoding.h" #include "tkvstore.h" #include "tulog.h" +#include "tfile.h" #define TD_KVSTORE_HEADER_SIZE 512 #define TD_KVSTORE_MAJOR_VERSION 1 @@ -581,4 +582,4 @@ _err: taosHashDestroyIter(pIter); tfree(buf); return -1; -} \ No newline at end of file +} From a4ede13384a6381823190935d5d3a1bba8e5a337 Mon Sep 17 00:00:00 2001 From: Hui Li Date: Sat, 4 Jul 2020 18:11:43 +0800 Subject: [PATCH 10/12] [TD-842] --- src/kit/taosdemo/taosdemo.c | 253 +++++++++++++++++++++++++++++++----- 1 file changed, 218 insertions(+), 35 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 3265285cca..9a5aedcdb7 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -32,6 +32,7 @@ #include #include #include +#include #include "taos.h" #include "tutil.h" @@ -54,6 +55,7 @@ static struct argp_option options[] = { {0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3}, {0, 'd', "database", 0, "Destination database. Default is 'test'.", 3}, {0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3}, + {0, 's', "sql file", 0, "The select sql file.", 3}, {0, 'M', 0, 0, "Use metric flag.", 13}, {0, 'o', "outputfile", 0, "Direct output to the named file. Default is './output.txt'.", 14}, {0, 'q', "query_mode", 0, "Query mode--0: SYNC, 1: ASYNC. Default is SYNC.", 6}, @@ -79,6 +81,7 @@ typedef struct DemoArguments { char *password; char *database; char *tb_prefix; + char *sqlFile; bool use_metric; bool insert_only; char *output_file; @@ -120,6 +123,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'o': arguments->output_file = arg; break; + case 's': + arguments->sqlFile = arg; + break; case 'q': arguments->mode = atoi(arg); break; @@ -179,10 +185,10 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { arguments->tb_prefix = arg; break; case 'M': - arguments->use_metric = true; + arguments->use_metric = false; break; case 'x': - arguments->insert_only = true; + arguments->insert_only = false; break; case 'c': if (wordexp(arg, &full_path, 0) != 0) { @@ -253,6 +259,9 @@ typedef struct { int data_of_rate; int64_t start_time; bool do_aggreFunc; + + char* cols; + bool use_metric; sem_t mutex_sem; int notFinished; @@ -305,6 +314,8 @@ void rand_string(char *str, int size); double getCurrentTime(); void callBack(void *param, TAOS_RES *res, int code); +void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntables, char* db_name, char* tb_prefix, char *ip_addr, uint16_t port, char *user, char *pass); +void querySqlFile(TAOS* taos, char* sqlFile); int main(int argc, char *argv[]) { SDemoArguments arguments = { NULL, // host @@ -313,6 +324,7 @@ int main(int argc, char *argv[]) { "taosdata", // password "test", // database "t", // tb_prefix + NULL, false, // use_metric false, // insert_only "./output.txt", // output_file @@ -361,7 +373,7 @@ int main(int argc, char *argv[]) { abort(); #endif } - + enum MODE query_mode = arguments.mode; char *ip_addr = arguments.host; uint16_t port = arguments.port; @@ -385,6 +397,13 @@ int main(int argc, char *argv[]) { char dataString[STRING_LEN]; bool do_aggreFunc = true; + if (NULL != arguments.sqlFile) { + TAOS* qtaos = taos_connect(ip_addr, user, pass, db_name, port); + querySqlFile(qtaos, arguments.sqlFile); + taos_close(qtaos); + return 0; + } + memset(dataString, 0, STRING_LEN); int len = 0; @@ -495,47 +514,19 @@ int main(int argc, char *argv[]) { len += snprintf(cols + len, STRING_LEN - len, ",f%d %s(%d))", colIndex + 1, data_type[colIndex % count_data_type], len_of_binary); } - if (!use_metric) { - /* Create all the tables; */ - printf("Creating %d table(s)......\n", ntables); - for (int i = 0; i < ntables; i++) { - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols); - queryDB(taos, command); - } - - printf("Table(s) created!\n"); - taos_close(taos); - - } else { + if (use_metric) { /* Create metric table */ printf("Creating meters super table...\n"); snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols); queryDB(taos, command); printf("meters created!\n"); - /* Create all the tables; */ - printf("Creating %d table(s)......\n", ntables); - for (int i = 0; i < ntables; i++) { - int j; - if (i % 10 == 0) { - j = 10; - } else { - j = i % 10; - } - if (j % 2 == 0) { - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai"); - } else { - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing"); - } - queryDB(taos, command); - } - - printf("Table(s) created!\n"); taos_close(taos); } - /* Wait for table to create */ - + /* Wait for table to create */ + multiThreadCreateTable(cols, use_metric, threads, ntables, db_name, tb_prefix, ip_addr, port, user, pass); + /* Insert data */ double ts = getCurrentTime(); printf("Inserting data......\n"); @@ -685,6 +676,198 @@ int main(int argc, char *argv[]) { return 0; } +#define MAX_SQL_SIZE 65536 +void selectSql(TAOS* taos, char* sqlcmd) +{ + TAOS_RES *pSql = taos_query(taos, sqlcmd); + int32_t code = taos_errno(pSql); + + if (code != 0) { + printf("Failed to sqlcmd:%s, reason:%s\n", sqlcmd, taos_errstr(pSql)); + taos_free_result(pSql); + exit(1); + } + + int count = 0; + while (taos_fetch_row(pSql) != NULL) { + count++; + } + + taos_free_result(pSql); + return; +} + + +/* Function to do regular expression check */ +static int regexMatch(const char *s, const char *reg, int cflags) { + regex_t regex; + char msgbuf[100] = {0}; + + /* Compile regular expression */ + if (regcomp(®ex, reg, cflags) != 0) { + printf("Fail to compile regex\n"); + exit(-1); + } + + /* Execute regular expression */ + int reti = regexec(®ex, s, 0, NULL, 0); + if (!reti) { + regfree(®ex); + return 1; + } else if (reti == REG_NOMATCH) { + regfree(®ex); + return 0; + } else { + regerror(reti, ®ex, msgbuf, sizeof(msgbuf)); + printf("Regex match failed: %s\n", msgbuf); + regfree(®ex); + exit(-1); + } + + return 0; +} + +static int isCommentLine(char *line) { + if (line == NULL) return 1; + + return regexMatch(line, "^\\s*#.*", REG_EXTENDED); +} + +void querySqlFile(TAOS* taos, char* sqlFile) +{ + FILE *fp = fopen(sqlFile, "r"); + if (fp == NULL) { + printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno)); + exit(-1); + } + + int read_len = 0; + char * cmd = calloc(1, MAX_SQL_SIZE); + size_t cmd_len = 0; + char * line = NULL; + size_t line_len = 0; + + double t = getCurrentTime(); + + while ((read_len = getline(&line, &line_len, fp)) != -1) { + if (read_len >= MAX_SQL_SIZE) continue; + line[--read_len] = '\0'; + + if (read_len == 0 || isCommentLine(line)) { // line starts with # + continue; + } + + if (line[read_len - 1] == '\\') { + line[read_len - 1] = ' '; + memcpy(cmd + cmd_len, line, read_len); + cmd_len += read_len; + continue; + } + + memcpy(cmd + cmd_len, line, read_len); + selectSql(taos, cmd); + memset(cmd, 0, MAX_SQL_SIZE); + cmd_len = 0; + } + + t = getCurrentTime() - t; + printf("run %s took %.6f second(s)\n\n", sqlFile, t); + + free(cmd); + if (line) free(line); + fclose(fp); + return; +} + +void * createTable(void *sarg) +{ + char command[BUFFER_SIZE] = "\0"; + + info *winfo = (info *)sarg; + + if (!winfo->use_metric) { + /* Create all the tables; */ + printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); + for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols); + queryDB(winfo->taos, command); + } + + taos_close(winfo->taos); + + } else { + /* Create all the tables; */ + printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); + for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { + int j; + if (i % 10 == 0) { + j = 10; + } else { + j = i % 10; + } + if (j % 2 == 0) { + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", winfo->db_name, winfo->tb_prefix, i, winfo->db_name, j, "shanghai"); + } else { + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", winfo->db_name, winfo->tb_prefix, i, winfo->db_name, j, "beijing"); + } + queryDB(winfo->taos, command); + } + taos_close(winfo->taos); + } + + return NULL; +} + +void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntables, char* db_name, char* tb_prefix, char *ip_addr, uint16_t port, char *user, char *pass) { + double ts = getCurrentTime(); + printf("create table......\n"); + pthread_t *pids = malloc(threads * sizeof(pthread_t)); + info *infos = malloc(threads * sizeof(info)); + + int a = ntables / threads; + if (a < 1) { + threads = ntables; + a = 1; + } + + int b = 0; + if (threads != 0) + b = ntables % threads; + int last = 0; + for (int i = 0; i < threads; i++) { + info *t_info = infos + i; + t_info->threadID = i; + tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE); + tstrncpy(t_info->tb_prefix, tb_prefix, MAX_TB_NAME_SIZE); + t_info->taos = taos_connect(ip_addr, user, pass, db_name, port); + t_info->start_table_id = last; + t_info->end_table_id = i < b ? last + a : last + a - 1; + last = t_info->end_table_id + 1; + t_info->use_metric = use_metric; + t_info->cols = cols; + pthread_create(pids + i, NULL, createTable, t_info); + } + + for (int i = 0; i < threads; i++) { + pthread_join(pids[i], NULL); + } + + double t = getCurrentTime() - ts; + printf("Spent %.4f seconds to create %d tables with %d connections\n", t, ntables, threads); + + for (int i = 0; i < threads; i++) { + info *t_info = infos + i; + taos_close(t_info->taos); + sem_destroy(&(t_info->mutex_sem)); + sem_destroy(&(t_info->lock_sem)); + } + + free(pids); + free(infos); + + return ; +} + void *readTable(void *sarg) { info *rinfo = (info *)sarg; TAOS *taos = rinfo->taos; From c067db1f6b43d8ab179cfd4d3256a5f56affa731 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 4 Jul 2020 18:24:14 +0800 Subject: [PATCH 11/12] [td-225] refactor code. --- src/dnode/src/dnodeMgmt.c | 2 +- src/dnode/src/dnodeVRead.c | 8 +++++--- src/inc/dnode.h | 2 +- src/inc/vnode.h | 4 ++-- src/vnode/src/vnodeMain.c | 10 +++++----- src/vnode/src/vnodeRead.c | 2 +- 6 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 9cf024ba83..8a8c5b3162 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -391,7 +391,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId); } - void *pVnode = vnodeAccquireVnode(pCreate->cfg.vgId); + void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId); if (pVnode != NULL) { int32_t code = vnodeAlter(pVnode, pCreate); vnodeRelease(pVnode); diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index acd92db598..66135a93e9 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -98,7 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - pVnode = vnodeAccquireVnode(pHead->vgId); + pVnode = vnodeAcquireVnode(pHead->vgId); if (pVnode == NULL) { leftLen -= pHead->contLen; @@ -175,13 +175,15 @@ void dnodeFreeVnodeRqueue(void *rqueue) { // dynamically adjust the number of threads } -void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle) { +void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->pCont = qhandle; pRead->contLen = 0; - taos_queue queue = vnodeAccquireRqueue(pVnode); + assert(pVnode != NULL); + taos_queue queue = vnodeAcquireRqueue(pVnode); + taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead); } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 1d33dafbaa..096aae58f2 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -53,7 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode); void dnodeFreeVnodeWqueue(void *queue); void *dnodeAllocateVnodeRqueue(void *pVnode); void dnodeFreeVnodeRqueue(void *rqueue); -void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle); +void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle); void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); int32_t dnodeAllocateMnodePqueue(); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 49bd67a04f..fd6b980687 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -49,10 +49,10 @@ int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); void vnodeRelease(void *pVnode); -void* vnodeAccquireVnode(int32_t vgId); // add refcount +void* vnodeAcquireVnode(int32_t vgId); // add refcount void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged -void* vnodeAccquireRqueue(void *); +void* vnodeAcquireRqueue(void *); void* vnodeGetRqueue(void *); void* vnodeGetWqueue(int32_t vgId); void* vnodeGetWal(void *pVnode); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 6ccdc02acf..c0e2c40599 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -293,7 +293,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { } int32_t vnodeStartStream(int32_t vnode) { - SVnodeObj* pVnode = vnodeAccquireVnode(vnode); + SVnodeObj* pVnode = vnodeAcquireVnode(vnode); if (pVnode != NULL) { tsdbStartStream(pVnode->tsdb); vnodeRelease(pVnode); @@ -383,7 +383,7 @@ void *vnodeGetVnode(int32_t vgId) { return *ppVnode; } -void *vnodeAccquireVnode(int32_t vgId) { +void *vnodeAcquireVnode(int32_t vgId) { SVnodeObj *pVnode = vnodeGetVnode(vgId); if (pVnode == NULL) return pVnode; @@ -393,7 +393,7 @@ void *vnodeAccquireVnode(int32_t vgId) { return pVnode; } -void *vnodeAccquireRqueue(void *param) { +void *vnodeAcquireRqueue(void *param) { SVnodeObj *pVnode = param; if (pVnode == NULL) return NULL; @@ -407,7 +407,7 @@ void *vnodeGetRqueue(void *pVnode) { } void *vnodeGetWqueue(int32_t vgId) { - SVnodeObj *pVnode = vnodeAccquireVnode(vgId); + SVnodeObj *pVnode = vnodeAcquireVnode(vgId); if (pVnode == NULL) return NULL; return pVnode->wqueue; } @@ -451,7 +451,7 @@ void vnodeBuildStatusMsg(void *param) { void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { for (int32_t i = 0; i < numOfVnodes; ++i) { pAccess[i].vgId = htonl(pAccess[i].vgId); - SVnodeObj *pVnode = vnodeAccquireVnode(pAccess[i].vgId); + SVnodeObj *pVnode = vnodeAcquireVnode(pAccess[i].vgId); if (pVnode != NULL) { pVnode->accessState = pAccess[i].accessState; if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 2ca69a3ddb..354caf2af5 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -203,7 +203,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); if (qHasMoreResultsToRetrieve(*handle)) { - dnodePutQhandleIntoReadQueue(pVnode, handle); + dnodePutItemIntoReadQueue(pVnode, handle); pRet->qhandle = handle; code = TSDB_CODE_SUCCESS; } else { // no further execution invoked, release the ref to vnode From 3e632cf7e148ab650aa140b392b0200bcc169adc Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 4 Jul 2020 21:12:44 +0800 Subject: [PATCH 12/12] fix crash while close vnodes --- src/dnode/src/dnodeMgmt.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index ece5aeb341..1ae1287888 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -283,7 +283,7 @@ static void *dnodeOpenVnode(void *param) { static int32_t dnodeOpenVnodes() { int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t)); - int32_t numOfVnodes; + int32_t numOfVnodes = 0; int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes); if (status != TSDB_CODE_SUCCESS) { @@ -360,7 +360,7 @@ void dnodeStartStream() { static void dnodeCloseVnodes() { int32_t vnodeList[TSDB_MAX_VNODES]; - int32_t numOfVnodes; + int32_t numOfVnodes = 0; int32_t status; status = vnodeGetVnodeList(vnodeList, &numOfVnodes);