From 59d23065fd469447717b2bc7da46ce2ec012c01e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 11 Jan 2022 02:54:11 -0800 Subject: [PATCH 1/8] fix invalid write in mnode --- source/dnode/mnode/impl/src/mndDb.c | 2 +- source/dnode/mnode/impl/src/mndDnode.c | 2 +- source/dnode/mnode/impl/src/mndVgroup.c | 8 ++++ tests/script/sh/exec.sh | 4 +- tests/script/sim/db/basic6.sim | 56 ++++++++++--------------- 5 files changed, 33 insertions(+), 39 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 85b7fbbb42..bc4d890257 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -740,7 +740,7 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj * if (pReq == NULL) return -1; action.pCont = pReq; - action.contLen = sizeof(SCreateVnodeReq); + action.contLen = sizeof(SDropVnodeReq); action.msgType = TDMT_DND_DROP_VNODE; action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 4bc570c11d..2a67e6a0f9 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -235,7 +235,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) { int64_t interval = ABS(pDnode->lastAccessTime - curMs); - if (interval > 3500 * pMnode->cfg.statusInterval) { + if (interval > 10000 * pMnode->cfg.statusInterval) { if (pDnode->rebootTime > 0) { pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT; } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e0d6d3dd42..93d6d104ff 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -319,6 +319,14 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); + int32_t size = taosArrayGetSize(pArray); + if (size < pVgroup->replica) { + mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size, + pVgroup->replica); + terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; + return -1; + } + for (int32_t v = 0; v < pVgroup->replica; ++v) { SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SDnodeObj *pDnode = taosArrayGet(pArray, v); diff --git a/tests/script/sh/exec.sh b/tests/script/sh/exec.sh index 2e95a740d0..05f756ebb6 100755 --- a/tests/script/sh/exec.sh +++ b/tests/script/sh/exec.sh @@ -30,7 +30,7 @@ do CLEAR_OPTION="clear" ;; v) - SHELL_OPTION="true" + VALGRIND_OPTION="true" ;; u) USERS=$OPTARG @@ -99,7 +99,7 @@ fi if [ "$EXEC_OPTON" = "start" ]; then echo "ExcuteCmd:" $EXE_DIR/taosd -c $CFG_DIR - if [ "$SHELL_OPTION" = "true" ]; then + if [ "$VALGRIND_OPTION" = "true" ]; then TT=`date +%s` mkdir ${LOG_DIR}/${TT} nohup valgrind --log-file=${LOG_DIR}/${TT}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & diff --git a/tests/script/sim/db/basic6.sim b/tests/script/sim/db/basic6.sim index 684ce825fe..8e7d4f5ec1 100644 --- a/tests/script/sim/db/basic6.sim +++ b/tests/script/sim/db/basic6.sim @@ -1,29 +1,24 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/cfg.sh -n dnode1 -c wallevel -v 0 -system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 4 -system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 1000 - system sh/exec.sh -n dnode1 -s start - -sleep 2000 sql connect + print ============================ dnode1 start $i = 0 -$dbPrefix = ob_db_db -$tbPrefix = ob_db_tb +$dbPrefix = db +$tbPrefix = tb $db = $dbPrefix . $i $tb = $tbPrefix . $i print =============== step1 -sql create database $db replica 1 days 20 keep 2000 cache 16 +sql create database $db replica 1 days 20 keep 2000 cache 16 vgroups 4 sql show databases print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 if $data00 != $db then return -1 endi -if $data02 != 0 then +if $data02 != 4 then return -1 endi if $data03 != 0 then @@ -63,9 +58,6 @@ print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 if $data00 != $db then return -1 endi -if $data02 != 0 then - return -1 -endi if $data03 != 0 then return -1 endi @@ -77,30 +69,14 @@ if $data06 != 15 then endi print =============== step6 -sql use $db -sql create table $tb (ts timestamp, speed int) -$i = 1 -while $i < 4 +$i = $i + 1 +while $i < 5 $db = $dbPrefix . $i - $tb = $tbPrefix . $i sql create database $db sql use $db - sql create table $tb (ts timestamp, speed int) $i = $i + 1 endw -sql show databases -if $rows != 4 then - return -1 -endi - -$i = 4 -$db = $dbPrefix . $i -$tb = $tbPrefix . $i -sql create database $db -sql use $db -sql create table $tb (ts timestamp, speed int) - print =============== step7 $i = 0 while $i < 5 @@ -115,7 +91,8 @@ $db = $dbPrefix . $i $tb = $tbPrefix . $i sql create database $db sql use $db -sql create table $tb (ts timestamp, speed int) +sql create table st (ts timestamp, i int) tags (j int) +sql create table $tb using st tags(1) sql show tables if $rows != 1 then return -1 @@ -133,7 +110,8 @@ if $rows != 0 then endi print =============== step11 -sql create table $tb (ts timestamp, speed int) +sql create table st (ts timestamp, i int) tags (j int) +sql create table $tb using st tags(1) sql show tables if $rows != 1 then return -1 @@ -149,16 +127,23 @@ sql show tables if $rows != 0 then return -1 endi -sql create table $tb (ts timestamp, speed int) + +sql create table st (ts timestamp, i int) tags (j int) +sql create table $tb using st tags(1) sql show tables if $rows != 1 then return -1 endi + sql insert into $tb values (now+1a, 0) sql insert into $tb values (now+2a, 1) sql insert into $tb values (now+3a, 2) sql insert into $tb values (now+4a, 3) sql insert into $tb values (now+5a, 4) + + +return + sql select * from $tb if $rows != 5 then return -1 @@ -176,7 +161,8 @@ if $rows != 0 then endi print =============== step16 -sql create table $tb (ts timestamp, speed int) +sql create table st (ts timestamp, i int) tags (j int) +sql create table $tb using st tags(1) sql show tables if $rows != 1 then return -1 From 0920bb05763b6f8d1e6b0cc508b6190d332779a5 Mon Sep 17 00:00:00 2001 From: lihui Date: Tue, 11 Jan 2022 18:55:09 +0800 Subject: [PATCH 2/8] [modify] --- tests/test/c/create_table.c | 117 ++++++++++++++++++++++++++---------- 1 file changed, 84 insertions(+), 33 deletions(-) diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index f2db9d0a0c..a5a55bbc91 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -40,6 +40,8 @@ typedef struct { float createTableSpeed; float insertDataSpeed; int64_t startMs; + int64_t maxDelay; + int64_t minDelay; pthread_t thread; } SThreadInfo; @@ -58,12 +60,30 @@ int32_t main(int32_t argc, char *argv[]) { pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo)); - int64_t numOfTablesPerThread = numOfTables / numOfThreads; - numOfTables = numOfTablesPerThread * numOfThreads; + //int64_t numOfTablesPerThread = numOfTables / numOfThreads; + //numOfTables = numOfTablesPerThread * numOfThreads; + + + if (numOfThreads < 1) { + numOfThreads = 1; + } + + int64_t a = numOfTables / numOfThreads; + if (a < 1) { + numOfThreads = numOfTables; + a = 1; + } + + int64_t b = 0; + b = numOfTables % numOfThreads; + + int64_t tableFrom = 0; for (int32_t i = 0; i < numOfThreads; ++i) { - pInfo[i].tableBeginIndex = i * numOfTablesPerThread; - pInfo[i].tableEndIndex = (i + 1) * numOfTablesPerThread; + pInfo[i].tableBeginIndex = tableFrom; + pInfo[i].tableEndIndex = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pInfo[i].tableEndIndex + 1; pInfo[i].threadIndex = i; + pInfo[i].minDelay = INT64_MAX; strcpy(pInfo[i].dbName, dbName); strcpy(pInfo[i].stbName, stbName); pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i)); @@ -74,9 +94,15 @@ int32_t main(int32_t argc, char *argv[]) { pthread_join(pInfo[i].thread, NULL); } + int64_t maxDelay = 0; + int64_t minDelay = INT64_MAX; + float createTableSpeed = 0; for (int32_t i = 0; i < numOfThreads; ++i) { createTableSpeed += pInfo[i].createTableSpeed; + + if (pInfo[i].maxDelay > maxDelay) maxDelay = pInfo[i].maxDelay; + if (pInfo[i].minDelay < minDelay) minDelay = pInfo[i].minDelay; } float insertDataSpeed = 0; @@ -84,10 +110,19 @@ int32_t main(int32_t argc, char *argv[]) { insertDataSpeed += pInfo[i].insertDataSpeed; } - pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d %s", GREEN, numOfTables, createTableSpeed, - numOfThreads, NC); - pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, + pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64 "us %s", + GREEN, + numOfTables, + createTableSpeed, + numOfThreads, + maxDelay, + minDelay, + NC); + + if (insertData) { + pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, numOfThreads, NC); + } pthread_attr_destroy(&thattr); free(pInfo); @@ -99,36 +134,36 @@ void createDbAndStb() { TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); if (con == NULL) { - pError("failed to connect to DB, reason:%s", taos_errstr(con)); + pError("failed to connect to DB, reason:%s", taos_errstr(NULL)); exit(1); } sprintf(qstr, "create database if not exists %s vgroups %d", dbName, numOfVgroups); - TAOS_RES *pSql = taos_query(con, qstr); - int32_t code = taos_errno(pSql); + TAOS_RES *pRes = taos_query(con, qstr); + int32_t code = taos_errno(pRes); if (code != 0) { - pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con)); + pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(pRes), taos_errstr(pRes)); exit(0); } - taos_free_result(pSql); + taos_free_result(pRes); sprintf(qstr, "use %s", dbName); - pSql = taos_query(con, qstr); - code = taos_errno(pSql); + pRes = taos_query(con, qstr); + code = taos_errno(pRes); if (code != 0) { - pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); + pError("failed to use db, code:%d reason:%s", taos_errno(pRes), taos_errstr(pRes)); exit(0); } - taos_free_result(pSql); + taos_free_result(pRes); sprintf(qstr, "create table %s (ts timestamp, i int) tags (j int)", stbName); - pSql = taos_query(con, qstr); - code = taos_errno(pSql); + pRes = taos_query(con, qstr); + code = taos_errno(pRes); if (code != 0) { - pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); + pError("failed to use db, code:%d reason:%s", taos_errno(pRes), taos_errstr(pRes)); exit(0); } - taos_free_result(pSql); + taos_free_result(pRes); taos_close(con); } @@ -160,16 +195,20 @@ void *threadFunc(void *param) { TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); if (con == NULL) { - pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con)); + pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(NULL)); exit(1); } + //printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex, pInfo->tableEndIndex); sprintf(qstr, "use %s", pInfo->dbName); - TAOS_RES *pSql = taos_query(con, qstr); - taos_free_result(pSql); + TAOS_RES *pRes = taos_query(con, qstr); + taos_free_result(pRes); + if (createTable) { - pInfo->startMs = taosGetTimestampMs(); + int64_t curMs = 0; + int64_t beginMs = taosGetTimestampMs(); + pInfo->startMs = beginMs; for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { int64_t batch = (pInfo->tableEndIndex - t); batch = MIN(batch, batchNum); @@ -179,14 +218,22 @@ void *threadFunc(void *param) { len += sprintf(qstr + len, " t%" PRId64 " using %s tags(%" PRId64 ")", t + i, stbName, t + i); } - TAOS_RES *pSql = taos_query(con, qstr); - code = taos_errno(pSql); + int64_t startTs = taosGetTimestampUs(); + TAOS_RES *pRes = taos_query(con, qstr); + code = taos_errno(pRes); if (code != 0) { pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code)); } - taos_free_result(pSql); + taos_free_result(pRes); + int64_t endTs = taosGetTimestampUs(); + int64_t delay = endTs - startTs; + //printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay); + if (delay > pInfo->maxDelay) pInfo->maxDelay = delay; + if (delay < pInfo->minDelay) pInfo->minDelay = delay; - if (t % 100000 == 0) { + curMs = taosGetTimestampMs(); + if (curMs - beginMs > 10000) { + beginMs = curMs; printCreateProgress(pInfo, t); } t += (batch - 1); @@ -195,6 +242,9 @@ void *threadFunc(void *param) { } if (insertData) { + int64_t curMs = 0; + int64_t beginMs = taosGetTimestampMs();; + pInfo->startMs = taosGetTimestampMs(); for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { int64_t batch = (pInfo->tableEndIndex - t); @@ -205,14 +255,15 @@ void *threadFunc(void *param) { len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i); } - TAOS_RES *pSql = taos_query(con, qstr); - code = taos_errno(pSql); + TAOS_RES *pRes = taos_query(con, qstr); + code = taos_errno(pRes); if (code != 0) { pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code)); } - taos_free_result(pSql); + taos_free_result(pRes); - if (t % 100000 == 0) { + curMs = taosGetTimestampMs(); + if (curMs - beginMs > 10000) { printInsertProgress(pInfo, t); } t += (batch - 1); @@ -266,7 +317,7 @@ void parseArgument(int32_t argc, char *argv[]) { numOfThreads = atoi(argv[++i]); } else if (strcmp(argv[i], "-n") == 0) { numOfTables = atoll(argv[++i]); - } else if (strcmp(argv[i], "-n") == 0) { + } else if (strcmp(argv[i], "-v") == 0) { numOfVgroups = atoi(argv[++i]); } else if (strcmp(argv[i], "-a") == 0) { createTable = atoi(argv[++i]); From f94f8aadc0b00e15a0869eb7291ae53b8bec8d49 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 11 Jan 2022 03:13:26 -0800 Subject: [PATCH 3/8] dnode may offline while create vnode --- source/dnode/mgmt/impl/inc/dndInt.h | 1 + source/dnode/mgmt/impl/src/dndMgmt.c | 13 ++++++++++++- source/dnode/mnode/impl/src/mndDnode.c | 2 +- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index afdd678213..e637b38815 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -94,6 +94,7 @@ typedef struct { pthread_t *threadId; SRWLatch latch; SDnodeWorker mgmtWorker; + SDnodeWorker statusWorker; } SDnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index f252bffbbf..d9edf39b73 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -536,6 +536,11 @@ int32_t dndInitMgmt(SDnode *pDnode) { return -1; } + if (dndInitWorker(pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dndProcessMgmtQueue) != 0) { + dError("failed to start dnode mgmt worker since %s", terrstr()); + return -1; + } + pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); if (pMgmt->threadId == NULL) { dError("failed to init dnode thread"); @@ -550,6 +555,7 @@ int32_t dndInitMgmt(SDnode *pDnode) { void dndStopMgmt(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; dndCleanupWorker(&pMgmt->mgmtWorker); + dndCleanupWorker(&pMgmt->statusWorker); if (pMgmt->threadId != NULL) { taosDestoryThread(pMgmt->threadId); @@ -587,7 +593,12 @@ void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndUpdateMnodeEpSet(pDnode, pEpSet); } - if (dndWriteMsgToWorker(&pMgmt->mgmtWorker, pMsg, sizeof(SRpcMsg)) != 0) { + SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + if (pMsg->msgType == TDMT_MND_STATUS_RSP) { + pWorker = &pMgmt->statusWorker; + } + + if (dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)) != 0) { if (pMsg->msgType & 1u) { SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; rpcSendResponse(&rsp); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 2a67e6a0f9..4bc570c11d 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -235,7 +235,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) { int64_t interval = ABS(pDnode->lastAccessTime - curMs); - if (interval > 10000 * pMnode->cfg.statusInterval) { + if (interval > 3500 * pMnode->cfg.statusInterval) { if (pDnode->rebootTime > 0) { pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT; } From 07a1b4acf21d25321819781d1ec9a2214d98e175 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 11 Jan 2022 03:41:38 -0800 Subject: [PATCH 4/8] minor changes --- source/dnode/mnode/impl/test/profile/profile.cpp | 10 +++++----- tests/script/jenkins/basic.txt | 5 +++-- tests/script/sim/db/basic6.sim | 4 ++++ 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index bdffb6c72a..bf047517d3 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -13,7 +13,7 @@ class MndTestProfile : public ::testing::Test { protected: - static void SetUpTestSuite() { test.Init("/tmp/mnode_test_profile", 9022); } + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_profile", 9031); } static void TearDownTestSuite() { test.Cleanup(); } static Testbase test; @@ -53,7 +53,7 @@ TEST_F(MndTestProfile, 01_ConnectMsg) { EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.port[0], 9022); + EXPECT_EQ(pRsp->epSet.port[0], 9031); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); connId = pRsp->connId; @@ -127,7 +127,7 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.port[0], 9022); + EXPECT_EQ(pRsp->epSet.port[0], 9031); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); } @@ -185,7 +185,7 @@ TEST_F(MndTestProfile, 05_KillConnMsg) { EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.port[0], 9022); + EXPECT_EQ(pRsp->epSet.port[0], 9031); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); connId = pRsp->connId; @@ -249,7 +249,7 @@ TEST_F(MndTestProfile, 07_KillQueryMsg) { EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.port[0], 9022); + EXPECT_EQ(pRsp->epSet.port[0], 9031); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); } } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index bc0c3a4f64..a84e473d9f 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -5,8 +5,9 @@ ./test.sh -f sim/user/basic1.sim # ---- db -./test.sh -f sim/db/basic1.sim -./test.sh -f sim/db/error1.sim +./test.sh -f sim/db/basic1.sim +./test.sh -f sim/db/error6.sim +./test.sh -f sim/db/error1.sim # ---- table ./test.sh -f sim/table/basic1.sim diff --git a/tests/script/sim/db/basic6.sim b/tests/script/sim/db/basic6.sim index 8e7d4f5ec1..a688b4c2f3 100644 --- a/tests/script/sim/db/basic6.sim +++ b/tests/script/sim/db/basic6.sim @@ -93,6 +93,10 @@ sql create database $db sql use $db sql create table st (ts timestamp, i int) tags (j int) sql create table $tb using st tags(1) + +return +system sh/exec.sh -n dnode1 -s stop -x SIGINT + sql show tables if $rows != 1 then return -1 From 530ffb6cbc6fc4b14a07c96388bc6e7fcf22c1fc Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 11 Jan 2022 03:46:05 -0800 Subject: [PATCH 5/8] minor changes --- tests/script/jenkins/basic.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index a84e473d9f..bb6569deb0 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -6,7 +6,7 @@ # ---- db ./test.sh -f sim/db/basic1.sim -./test.sh -f sim/db/error6.sim +./test.sh -f sim/db/basic6.sim ./test.sh -f sim/db/error1.sim # ---- table From f1f666e780238cbd39373ddb98fc6a07df6d6b3a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 11 Jan 2022 23:22:08 +0800 Subject: [PATCH 6/8] add message queue for vnode --- include/dnode/vnode/vnode.h | 17 ++++++++++++++--- source/dnode/mgmt/impl/src/dnode.c | 15 +++++++++++++-- source/dnode/vnode/impl/inc/vnodeDef.h | 4 ++++ source/dnode/vnode/impl/src/vnodeMgr.c | 18 +++++++++++++----- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 2212a8c29a..b83ca53859 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -66,15 +66,26 @@ typedef struct SVnodeCfg { SWalCfg walCfg; } SVnodeCfg; +typedef struct SDnode SDnode; +typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); +typedef struct { + int32_t sver; + SDnode *pDnode; + char *timezone; + char *locale; + char *charset; + PutReqToVQueryQFp putReqToVQueryQFp; + uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) +} SVnodeOpt; + /* ------------------------ SVnode ------------------------ */ /** * @brief Initialize the vnode module * - * @param nthreads number of commit threads. 0 for no threads and - * a schedule queue should be given (TODO) + * @param pOption Option of the vnode mnodule * @return int 0 for success and -1 for failure */ -int vnodeInit(uint16_t nthreads); +int vnodeInit(const SVnodeOpt *pOption); /** * @brief clear a vnode diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index cd27781df3..f3016feda5 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -22,8 +22,8 @@ #include "dndTransport.h" #include "dndVnodes.h" #include "sync.h" -#include "wal.h" #include "tfs.h" +#include "wal.h" EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; } @@ -153,6 +153,8 @@ static void dndCleanupEnv(SDnode *pDnode) { taosStopCacheRefreshWorker(); } +static void dndPutMsgToVQueryQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { dndProcessVnodeQueryMsg(pDnode, pRpcMsg, NULL); } + SDnode *dndInit(SDnodeOpt *pOption) { taosIgnSIGPIPE(); taosBlockSIGPIPE(); @@ -196,7 +198,16 @@ SDnode *dndInit(SDnodeOpt *pOption) { return NULL; } - if (vnodeInit(pDnode->opt.numOfCommitThreads) != 0) { + SVnodeOpt vnodeOpt = { + .sver = pDnode->opt.sver, + .pDnode = pDnode, + .timezone = pDnode->opt.timezone, + .locale = pDnode->opt.locale, + .charset = pDnode->opt.charset, + .putReqToVQueryQFp = dndPutMsgToVQueryQ, + .nthreads = pDnode->opt.numOfCommitThreads, + }; + if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode env"); dndCleanup(pDnode); return NULL; diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index e70e891794..361fdd10e0 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -57,6 +57,8 @@ typedef struct SVnodeMgr { pthread_cond_t hasTask; TD_DLIST(SVnodeTask) queue; // For vnode Mgmt + SDnode* pDnode; + PutReqToVQueryQFp putReqToVQueryQFp; } SVnodeMgr; extern SVnodeMgr vnodeMgr; @@ -79,6 +81,8 @@ struct SVnode { int vnodeScheduleTask(SVnodeTask* task); +void vnodePutReqToVQueryQ(struct SRpcMsg *pReq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c index 784d1abb60..43527c13fe 100644 --- a/source/dnode/vnode/impl/src/vnodeMgr.c +++ b/source/dnode/vnode/impl/src/vnodeMgr.c @@ -19,17 +19,19 @@ SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED}; static void* loop(void* arg); -int vnodeInit(uint16_t nthreads) { +int vnodeInit(const SVnodeOpt *pOption) { if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) { return 0; } vnodeMgr.stop = false; + vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; + vnodeMgr.pDnode = pOption->pDnode; // Start commit handers - if (nthreads > 0) { - vnodeMgr.nthreads = nthreads; - vnodeMgr.threads = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); + if (pOption->nthreads > 0) { + vnodeMgr.nthreads = pOption->nthreads; + vnodeMgr.threads = (pthread_t*)calloc(pOption->nthreads, sizeof(pthread_t)); if (vnodeMgr.threads == NULL) { return -1; } @@ -38,7 +40,7 @@ int vnodeInit(uint16_t nthreads) { pthread_cond_init(&(vnodeMgr.hasTask), NULL); TD_DLIST_INIT(&(vnodeMgr.queue)); - for (uint16_t i = 0; i < nthreads; i++) { + for (uint16_t i = 0; i < pOption->nthreads; i++) { pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL); pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread"); } @@ -89,6 +91,12 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } +void vnodePutReqToVQueryQ(struct SRpcMsg* pReq) { + if (vnodeMgr.putReqToVQueryQFp) { + (*vnodeMgr.putReqToVQueryQFp)(vnodeMgr.pDnode, pReq); + } +} + /* ------------------------ STATIC METHODS ------------------------ */ static void* loop(void* arg) { SVnodeTask* pTask; From ab3378e090100a98451f4fa606ca85c2a8fc9ec8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 11 Jan 2022 16:44:11 -0800 Subject: [PATCH 7/8] add msg queue --- include/dnode/vnode/vnode.h | 11 ++++++----- source/dnode/mgmt/impl/src/dndVnodes.c | 6 ++++-- source/dnode/mgmt/impl/src/dnode.c | 3 +-- source/dnode/vnode/impl/inc/vnodeDef.h | 3 ++- source/dnode/vnode/impl/src/vnodeMain.c | 21 ++++++++++++--------- source/dnode/vnode/impl/src/vnodeMgr.c | 9 ++++----- 6 files changed, 29 insertions(+), 24 deletions(-) diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index b83ca53859..03a59b01e6 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -31,8 +31,12 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SVnode SVnode; +typedef struct SDnode SDnode; +typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); + typedef struct SVnodeCfg { int32_t vgId; + SDnode *pDnode; /** vnode buffer pool options */ struct { @@ -66,16 +70,13 @@ typedef struct SVnodeCfg { SWalCfg walCfg; } SVnodeCfg; -typedef struct SDnode SDnode; -typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); typedef struct { int32_t sver; - SDnode *pDnode; char *timezone; char *locale; char *charset; - PutReqToVQueryQFp putReqToVQueryQFp; uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) + PutReqToVQueryQFp putReqToVQueryQFp; } SVnodeOpt; /* ------------------------ SVnode ------------------------ */ @@ -100,7 +101,7 @@ void vnodeClear(); * @param pVnodeCfg options of the vnode * @return SVnode* The vnode object */ -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); /** * @brief Close a VNODE diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index bf27a542ae..9f585859a8 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -381,7 +381,8 @@ static void *dnodeOpenVnodeFunc(void *param) { pMgmt->openVnodes, pMgmt->totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SVnode *pImpl = vnodeOpen(pCfg->path, NULL, pCfg->vgId); + SVnodeCfg cfg = {.pDnode = pDnode, .vgId = pCfg->vgId}; + SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; @@ -581,7 +582,8 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/, pCreate->vgId); + vnodeCfg.pDnode = pDnode; + SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); if (pImpl == NULL) { dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); return -1; diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index f3016feda5..ea42db96ab 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -200,12 +200,11 @@ SDnode *dndInit(SDnodeOpt *pOption) { SVnodeOpt vnodeOpt = { .sver = pDnode->opt.sver, - .pDnode = pDnode, .timezone = pDnode->opt.timezone, .locale = pDnode->opt.locale, .charset = pDnode->opt.charset, - .putReqToVQueryQFp = dndPutMsgToVQueryQ, .nthreads = pDnode->opt.numOfCommitThreads, + .putReqToVQueryQFp = dndPutMsgToVQueryQ, }; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode env"); diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index 361fdd10e0..4f53dcd899 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -77,11 +77,12 @@ struct SVnode { SVnodeFS* pFs; tsem_t canCommit; SQHandle* pQuery; + SDnode* pDnode; }; int vnodeScheduleTask(SVnodeTask* task); -void vnodePutReqToVQueryQ(struct SRpcMsg *pReq); +void vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index eb4b45bc20..85ccc9879e 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -15,27 +15,29 @@ #include "vnodeDef.h" -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); static void vnodeFree(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode); -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; // Set default options - //if (pVnodeCfg == NULL) { - pVnodeCfg = &defaultVnodeOptions; - //} + SVnodeCfg cfg = defaultVnodeOptions; + if (pVnodeCfg != NULL) { + cfg.vgId = pVnodeCfg->vgId; + cfg.pDnode = pVnodeCfg->pDnode; + } // Validate options - if (vnodeValidateOptions(pVnodeCfg) < 0) { + if (vnodeValidateOptions(&cfg) < 0) { // TODO return NULL; } // Create the handle - pVnode = vnodeNew(path, pVnodeCfg, vid); + pVnode = vnodeNew(path, &cfg); if (pVnode == NULL) { // TODO: handle error return NULL; @@ -62,7 +64,7 @@ void vnodeClose(SVnode *pVnode) { void vnodeDestroy(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; pVnode = (SVnode *)calloc(1, sizeof(*pVnode)); @@ -71,7 +73,8 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vi return NULL; } - pVnode->vgId = vid; + pVnode->vgId = pVnodeCfg->vgId; + pVnode->pDnode = pVnodeCfg->pDnode; pVnode->path = strdup(path); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c index 43527c13fe..51f33031ac 100644 --- a/source/dnode/vnode/impl/src/vnodeMgr.c +++ b/source/dnode/vnode/impl/src/vnodeMgr.c @@ -26,7 +26,6 @@ int vnodeInit(const SVnodeOpt *pOption) { vnodeMgr.stop = false; vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; - vnodeMgr.pDnode = pOption->pDnode; // Start commit handers if (pOption->nthreads > 0) { @@ -91,10 +90,10 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } -void vnodePutReqToVQueryQ(struct SRpcMsg* pReq) { - if (vnodeMgr.putReqToVQueryQFp) { - (*vnodeMgr.putReqToVQueryQFp)(vnodeMgr.pDnode, pReq); - } +void vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { + assert(vnodeMgr.putReqToVQueryQFp); + assert(pVnode->pDnode); + (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); } /* ------------------------ STATIC METHODS ------------------------ */ From 53b4c06d9b1442c0063dc6520fa8982d913f8da3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 11 Jan 2022 17:07:34 -0800 Subject: [PATCH 8/8] add msg queue --- include/dnode/vnode/vnode.h | 2 +- source/dnode/mgmt/impl/inc/dndVnodes.h | 2 ++ source/dnode/mgmt/impl/src/dndVnodes.c | 26 ++++++++++++++++++++------ source/dnode/mgmt/impl/src/dnode.c | 4 +--- source/dnode/vnode/impl/inc/vnodeDef.h | 2 +- source/dnode/vnode/impl/src/vnodeMgr.c | 10 ++++++---- 6 files changed, 31 insertions(+), 15 deletions(-) diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 03a59b01e6..4332fc8e58 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -32,7 +32,7 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SVnode SVnode; typedef struct SDnode SDnode; -typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); +typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); typedef struct SVnodeCfg { int32_t vgId; diff --git a/source/dnode/mgmt/impl/inc/dndVnodes.h b/source/dnode/mgmt/impl/inc/dndVnodes.h index b5fae62959..a78db602fe 100644 --- a/source/dnode/mgmt/impl/inc/dndVnodes.h +++ b/source/dnode/mgmt/impl/inc/dndVnodes.h @@ -36,6 +36,8 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 9f585859a8..2d7999fe5a 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -802,7 +802,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t } } -static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) { +static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) { int32_t code = 0; if (pQueue == NULL) { @@ -819,13 +819,15 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) } } - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && sendRsp) { if (pRpcMsg->msgType & 1u) { SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; rpcSendResponse(&rsp); } rpcFreeCont(pRpcMsg->pCont); } + + return code; } static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { @@ -848,7 +850,7 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -856,7 +858,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -864,7 +866,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -872,11 +874,23 @@ void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } +int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) { + SMsgHead *pHead = pMsg->pCont; + // pHead->vgId = htonl(pHead->vgId); + + SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); + if (pVnode == NULL) return -1; + + int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, false); + dndReleaseVnode(pDnode, pVnode); + return code; +} + static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) return -1; diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index ea42db96ab..f3d5e09564 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -153,8 +153,6 @@ static void dndCleanupEnv(SDnode *pDnode) { taosStopCacheRefreshWorker(); } -static void dndPutMsgToVQueryQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { dndProcessVnodeQueryMsg(pDnode, pRpcMsg, NULL); } - SDnode *dndInit(SDnodeOpt *pOption) { taosIgnSIGPIPE(); taosBlockSIGPIPE(); @@ -204,7 +202,7 @@ SDnode *dndInit(SDnodeOpt *pOption) { .locale = pDnode->opt.locale, .charset = pDnode->opt.charset, .nthreads = pDnode->opt.numOfCommitThreads, - .putReqToVQueryQFp = dndPutMsgToVQueryQ, + .putReqToVQueryQFp = dndPutReqToVQueryQ, }; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode env"); diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index 4f53dcd899..f9172dd351 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -82,7 +82,7 @@ struct SVnode { int vnodeScheduleTask(SVnodeTask* task); -void vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); +int32_t vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c index 51f33031ac..cc369a0d15 100644 --- a/source/dnode/vnode/impl/src/vnodeMgr.c +++ b/source/dnode/vnode/impl/src/vnodeMgr.c @@ -90,10 +90,12 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } -void vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { - assert(vnodeMgr.putReqToVQueryQFp); - assert(pVnode->pDnode); - (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); +int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { + if (pVnode == NULL || pVnode->pDnode == NULL || vnodeMgr.putReqToVQueryQFp == NULL) { + terrno = TSDB_CODE_VND_APP_ERROR; + return -1; + } + return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); } /* ------------------------ STATIC METHODS ------------------------ */