From 19dedc226accda1655633aa1dd66d5ed47282445 Mon Sep 17 00:00:00 2001 From: zyyang Date: Tue, 24 Nov 2020 18:34:02 +0800 Subject: [PATCH 01/12] [TD-2132]: add memory leak test case --- .../jdbc/cases/AppMemoryLeakTest.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/AppMemoryLeakTest.java diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/AppMemoryLeakTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/AppMemoryLeakTest.java new file mode 100644 index 0000000000..8a3dbb2bc1 --- /dev/null +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/AppMemoryLeakTest.java @@ -0,0 +1,23 @@ +package com.taosdata.jdbc.cases; + +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class AppMemoryLeakTest { + + @Test + public void testAppMemoryLeak() { + try { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + + while (true) { + DriverManager.getConnection("jdbc:TAOS://localhost:6030/?user=root&password=taosdata"); + } + } catch (ClassNotFoundException | SQLException e) { + e.printStackTrace(); + } + } +} From d1cd620696bbfe7f88efe04caf060d1688301d7b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 25 Nov 2020 16:29:02 +0800 Subject: [PATCH 02/12] TD-1455 TD-2196 --- src/dnode/src/dnodeVWrite.c | 4 ++-- src/inc/tsync.h | 6 +++--- src/inc/vnode.h | 6 ++++-- src/sync/src/syncRetrieve.c | 5 ++--- src/vnode/inc/vnodeInt.h | 2 +- src/vnode/src/vnodeMain.c | 10 ++++----- src/vnode/src/vnodeWrite.c | 42 ++++++++++++++++++++++++++++++++----- 7 files changed, 53 insertions(+), 22 deletions(-) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 959eb3c9c5..6d4b50ee54 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -176,7 +176,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) { if (count <= 1) return; SRpcMsg rpcRsp = { - .handle = pWrite->rpcHandle, + .handle = pWrite->rpcMsg.handle, .pCont = pWrite->rspRet.rsp, .contLen = pWrite->rspRet.len, .code = pWrite->code, @@ -206,7 +206,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite, - pWrite->rpcAhandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version); + pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version); pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); if (pWrite->code <= 0) pWrite->processedCount = 1; diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 77a3b36e73..398e1bf97c 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -23,7 +23,7 @@ extern "C" { #define TAOS_SYNC_MAX_REPLICA 5 #define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF -typedef enum _TAOS_SYNC_ROLE { +typedef enum { TAOS_SYNC_ROLE_OFFLINE = 0, TAOS_SYNC_ROLE_UNSYNCED = 1, TAOS_SYNC_ROLE_SYNCING = 2, @@ -31,7 +31,7 @@ typedef enum _TAOS_SYNC_ROLE { TAOS_SYNC_ROLE_MASTER = 4 } ESyncRole; -typedef enum _TAOS_SYNC_STATUS { +typedef enum { TAOS_SYNC_STATUS_INIT = 0, TAOS_SYNC_STATUS_START = 1, TAOS_SYNC_STATUS_FILE = 2, @@ -80,7 +80,7 @@ typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code); typedef void (*FNotifyRole)(int32_t vgId, int8_t role); // if a number of retrieving data failed, call this to start flow control -typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t mseconds); +typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level); // when data file is synced successfully, notity app typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 4e8389498b..5f643295d6 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -20,6 +20,7 @@ extern "C" { #endif +#include "trpc.h" #include "twal.h" typedef enum _VN_STATUS { @@ -51,8 +52,9 @@ typedef struct { typedef struct { int32_t code; int32_t processedCount; - void * rpcHandle; - void * rpcAhandle; + int32_t qtype; + void * pVnode; + SRpcMsg rpcMsg; SRspRet rspRet; char reserveForSync[16]; SWalHead pHead[]; diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 348f91820b..060badba9d 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -504,6 +504,8 @@ void *syncRetrieveData(void *param) { SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); + if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves); + pPeer->fileChanged = 0; pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (pPeer->syncFd < 0) { @@ -520,10 +522,7 @@ void *syncRetrieveData(void *param) { } if (pPeer->fileChanged) { - // if file is changed 3 times continuously, start flow control pPeer->numOfRetrieves++; - if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl) - (*pNode->notifyFlowCtrl)(pNode->vgId, 4 << (pPeer->numOfRetrieves - 2)); } else { pPeer->numOfRetrieves = 0; if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0); diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 021831a644..cf9a1baa80 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -39,7 +39,7 @@ typedef struct { int32_t refCount; // reference count int32_t queuedWMsg; int32_t queuedRMsg; - int32_t delayMs; + int32_t flowctlLevel; int8_t status; int8_t role; int8_t accessState; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index b94fea52bd..1ee968e7cb 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -34,7 +34,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId); static void vnodeNotifyRole(int32_t vgId, int8_t role); -static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds); +static void vnodeCtrlFlow(int32_t vgId, int32_t level); static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); @@ -677,17 +677,15 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) { vnodeRelease(pVnode); } -static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds) { +static void vnodeCtrlFlow(int32_t vgId, int32_t level) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { vError("vgId:%d, vnode not found while ctrl flow", vgId); return; } - if (pVnode->delayMs != mseconds) { - pVnode->delayMs = mseconds; - vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds); - } + pVnode->flowctlLevel = level; + vDebug("vgId:%d, set flowctl level:%d", pVnode->vgId, level); vnodeRelease(pVnode); } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index a0227d84ba..cada188591 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -22,14 +22,17 @@ #include "tsdb.h" #include "twal.h" #include "tsync.h" +#include "ttimer.h" #include "tdataformat.h" #include "vnode.h" #include "vnodeInt.h" #include "syncInt.h" #include "tcq.h" +#include "dnode.h" #define MAX_QUEUED_MSG_NUM 10000 +extern void * tsDnodeTmr; static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); @@ -47,6 +50,32 @@ void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; } +static void vnodeFlowCtlMsgToWQueue(void *param, void *tmrId) { + SVWriteMsg *pWrite = param; + SVnodeObj * pVnode = pWrite->pVnode; + + int32_t code = vnodeWriteToWQueue(pVnode, pWrite->pHead, pWrite->qtype, &pWrite->rpcMsg); + if (code != 0 && pWrite->qtype == TAOS_QTYPE_RPC) { + vDebug("vgId:%d, failed to reprocess msg after perform flowctl since %s", pVnode->vgId, tstrerror(code)); + dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); + } + + tfree(pWrite); + vnodeRelease(pWrite->pVnode); +} + +static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { + SVnodeObj *pVnode = pWrite->pVnode; + if (pVnode->flowctlLevel <= 0) return 0; + + int32_t ms = pVnode->flowctlLevel * 5; + void * unUsed = NULL; + taosTmrReset(vnodeFlowCtlMsgToWQueue, ms, pWrite, tsDnodeTmr, &unUsed); + + vDebug("vgId:%d, perform flowctl for %d ms", pVnode->vgId, ms); + return TSDB_CODE_RPC_ACTION_IN_PROGRESS; +} + int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) { int32_t code = 0; SVnodeObj *pVnode = vparam; @@ -77,8 +106,6 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara // assign version pHead->version = pVnode->version + 1; - if (pVnode->delayMs) taosMsleep(pVnode->delayMs); - } else { // from wal or forward // for data from WAL or forward, version may be smaller if (pHead->version <= pVnode->version) return 0; @@ -218,9 +245,10 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { SVnodeObj *pVnode = vparam; SWalHead * pHead = wparam; + int32_t code = 0; if (qtype == TAOS_QTYPE_RPC) { - int32_t code = vnodeCheckWrite(pVnode); + code = vnodeCheckWrite(pVnode); if (code != TSDB_CODE_SUCCESS) return code; } @@ -237,14 +265,18 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar if (rparam != NULL) { SRpcMsg *pRpcMsg = rparam; - pWrite->rpcHandle = pRpcMsg->handle; - pWrite->rpcAhandle = pRpcMsg->ahandle; + pWrite->rpcMsg = *pRpcMsg; } memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len); + pWrite->pVnode = pVnode; + pWrite->qtype = qtype; atomic_add_fetch_32(&pVnode->refCount, 1); + code = vnodePerformFlowCtrl(pWrite); + if (code != 0) return code; + int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); if (queued > MAX_QUEUED_MSG_NUM) { vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued); From ffbbb0cc44768e97698355bca586016598964b27 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 25 Nov 2020 20:36:44 +0800 Subject: [PATCH 03/12] TD-1455 TD-2196 --- src/common/inc/tglobal.h | 1 + src/common/src/tglobal.c | 12 ++++++++ src/vnode/src/vnodeMain.c | 6 ++-- src/vnode/src/vnodeWrite.c | 59 +++++++++++++++++++++----------------- 4 files changed, 49 insertions(+), 29 deletions(-) diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 4087f638a9..efe3d7678a 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -97,6 +97,7 @@ extern int32_t tsAlternativeRole; extern int32_t tsBalanceInterval; extern int32_t tsOfflineThreshold; extern int32_t tsMnodeEqualVnodeNum; +extern int32_t tsFlowCtrl; // restful extern int32_t tsEnableHttpModule; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 4495c3d928..f8bb965d28 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -133,6 +133,7 @@ int32_t tsAlternativeRole = 0; int32_t tsBalanceInterval = 300; // seconds int32_t tsOfflineThreshold = 86400*100; // seconds 10days int32_t tsMnodeEqualVnodeNum = 4; +int32_t tsFlowCtrl = 1; // restful int32_t tsEnableHttpModule = 1; @@ -971,6 +972,17 @@ static void doInitGlobalConfig(void) { cfg.maxValue = 1000; cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + // module configs + cfg.option = "flowctrl"; + cfg.ptr = &tsFlowCtrl; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); cfg.option = "http"; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 1ee968e7cb..df4376d5d0 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -660,7 +660,7 @@ static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { static void vnodeNotifyRole(int32_t vgId, int8_t role) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { - vError("vgId:%d, vnode not found while notify role", vgId); + vTrace("vgId:%d, vnode not found while notify role", vgId); return; } @@ -680,12 +680,12 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) { static void vnodeCtrlFlow(int32_t vgId, int32_t level) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { - vError("vgId:%d, vnode not found while ctrl flow", vgId); + vTrace("vgId:%d, vnode not found while flow ctrl", vgId); return; } pVnode->flowctlLevel = level; - vDebug("vgId:%d, set flowctl level:%d", pVnode->vgId, level); + vDebug("vgId:%d, set flowctrl level:%d", pVnode->vgId, level); vnodeRelease(pVnode); } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index cada188591..4bf7b30ebd 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -17,6 +17,7 @@ #include "os.h" #include "taosmsg.h" #include "taoserror.h" +#include "tglobal.h" #include "tqueue.h" #include "trpc.h" #include "tsdb.h" @@ -40,6 +41,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); +static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite); void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; @@ -50,32 +52,6 @@ void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; } -static void vnodeFlowCtlMsgToWQueue(void *param, void *tmrId) { - SVWriteMsg *pWrite = param; - SVnodeObj * pVnode = pWrite->pVnode; - - int32_t code = vnodeWriteToWQueue(pVnode, pWrite->pHead, pWrite->qtype, &pWrite->rpcMsg); - if (code != 0 && pWrite->qtype == TAOS_QTYPE_RPC) { - vDebug("vgId:%d, failed to reprocess msg after perform flowctl since %s", pVnode->vgId, tstrerror(code)); - dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); - } - - tfree(pWrite); - vnodeRelease(pWrite->pVnode); -} - -static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { - SVnodeObj *pVnode = pWrite->pVnode; - if (pVnode->flowctlLevel <= 0) return 0; - - int32_t ms = pVnode->flowctlLevel * 5; - void * unUsed = NULL; - taosTmrReset(vnodeFlowCtlMsgToWQueue, ms, pWrite, tsDnodeTmr, &unUsed); - - vDebug("vgId:%d, perform flowctl for %d ms", pVnode->vgId, ms); - return TSDB_CODE_RPC_ACTION_IN_PROGRESS; -} - int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) { int32_t code = 0; SVnodeObj *pVnode = vparam; @@ -298,3 +274,34 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { taosFreeQitem(pWrite); vnodeRelease(pVnode); } + +static void vnodeFlowCtlMsgToWQueue(void *param, void *tmrId) { + SVWriteMsg *pWrite = param; + SVnodeObj * pVnode = pWrite->pVnode; + + int32_t code = vnodeWriteToWQueue(pVnode, pWrite->pHead, pWrite->qtype, pWrite->rpcMsg.handle == NULL ? NULL : &pWrite->rpcMsg); + if (code != 0 && pWrite->qtype == TAOS_QTYPE_RPC) { + vDebug("vgId:%d, failed to reprocess msg after perform flowctrl since %s", pVnode->vgId, tstrerror(code)); + dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); + } + + tfree(pWrite); + vnodeRelease(pWrite->pVnode); +} + +static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { + SVnodeObj *pVnode = pWrite->pVnode; + if (pVnode->flowctlLevel <= 0) return 0; + + if (tsFlowCtrl == 0) { + int32_t ms = pVnode->flowctlLevel * 2; + if (ms > 60000) ms = 60000; + vDebug("vgId:%d, perform flowctrl for %d ms", pVnode->vgId, ms); + taosMsleep(ms); + return 0; + } else { + void *unUsed = NULL; + taosTmrReset(vnodeFlowCtlMsgToWQueue, 5, pWrite, tsDnodeTmr, &unUsed); + return TSDB_CODE_RPC_ACTION_IN_PROGRESS; + } +} From 060299548e69b2eed1eb611429e96f5a922f8d16 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 26 Nov 2020 11:09:46 +0800 Subject: [PATCH 04/12] change log print --- src/dnode/src/dnodeMPeer.c | 2 +- src/dnode/src/dnodeMRead.c | 2 +- src/dnode/src/dnodeMWrite.c | 2 +- src/dnode/src/dnodeVRead.c | 2 +- src/os/inc/osAlpine.h | 1 + src/os/inc/osArm32.h | 1 + src/os/inc/osArm64.h | 1 + src/os/inc/osDarwin.h | 1 + src/os/inc/osLinux32.h | 1 + src/os/inc/osLinux64.h | 1 + src/os/inc/osWindows.h | 1 + src/vnode/inc/vnodeInt.h | 2 +- src/vnode/src/vnodeMain.c | 2 +- 13 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index 05b37bd338..ee6dc5212e 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -162,7 +162,7 @@ static void *dnodeProcessMPeerQueue(void *param) { break; } - dDebug("msg:%s will be processed in mpeer queue", taosMsg[pPeerMsg->rpcMsg.msgType]); + dTrace("msg:%s will be processed in mpeer queue", taosMsg[pPeerMsg->rpcMsg.msgType]); int32_t code = mnodeProcessPeerReq(pPeerMsg); dnodeSendRpcMPeerRsp(pPeerMsg, code); } diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index ee9da72e4d..65f3af7b3b 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -168,7 +168,7 @@ static void *dnodeProcessMReadQueue(void *param) { break; } - dDebug("msg:%p, app:%p type:%s will be processed in mread queue", pRead->rpcMsg.ahandle, pRead, + dTrace("msg:%p, app:%p type:%s will be processed in mread queue", pRead->rpcMsg.ahandle, pRead, taosMsg[pRead->rpcMsg.msgType]); int32_t code = mnodeProcessRead(pRead); dnodeSendRpcMReadRsp(pRead, code); diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 65c0d53819..ef2d49ef42 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -174,7 +174,7 @@ static void *dnodeProcessMWriteQueue(void *param) { break; } - dDebug("msg:%p, app:%p type:%s will be processed in mwrite queue", pWrite, pWrite->rpcMsg.ahandle, + dTrace("msg:%p, app:%p type:%s will be processed in mwrite queue", pWrite, pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); int32_t code = mnodeProcessWrite(pWrite); diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 4cce54bf59..b42a627a3a 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -175,7 +175,7 @@ static void *dnodeProcessReadQueue(void *pWorker) { break; } - dDebug("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle, + dTrace("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle, taosMsg[pRead->msgType], qtype); int32_t code = vnodeProcessRead(pVnode, pRead); diff --git a/src/os/inc/osAlpine.h b/src/os/inc/osAlpine.h index d939adfb6d..eba9459395 100644 --- a/src/os/inc/osAlpine.h +++ b/src/os/inc/osAlpine.h @@ -77,6 +77,7 @@ extern "C" { #include #include #include +#include typedef int(*__compar_fn_t)(const void *, const void *); void error (int, int, const char *); diff --git a/src/os/inc/osArm32.h b/src/os/inc/osArm32.h index 17b4d2dbd5..24ff95522e 100644 --- a/src/os/inc/osArm32.h +++ b/src/os/inc/osArm32.h @@ -76,6 +76,7 @@ extern "C" { #include #include #include +#include #define TAOS_OS_FUNC_LZ4 #define BUILDIN_CLZL(val) __builtin_clzll(val) diff --git a/src/os/inc/osArm64.h b/src/os/inc/osArm64.h index 3ae08b45f4..22f0000e96 100644 --- a/src/os/inc/osArm64.h +++ b/src/os/inc/osArm64.h @@ -77,6 +77,7 @@ extern "C" { #include #include #include +#include #ifdef __cplusplus } diff --git a/src/os/inc/osDarwin.h b/src/os/inc/osDarwin.h index 7bb844831e..1461ec6d3b 100644 --- a/src/os/inc/osDarwin.h +++ b/src/os/inc/osDarwin.h @@ -70,6 +70,7 @@ extern "C" { #include #include #include +#include #define TAOS_OS_FUNC_FILE_SENDIFLE diff --git a/src/os/inc/osLinux32.h b/src/os/inc/osLinux32.h index 93e917e797..cfef05368f 100644 --- a/src/os/inc/osLinux32.h +++ b/src/os/inc/osLinux32.h @@ -76,6 +76,7 @@ extern "C" { #include #include #include +#include #define TAOS_OS_FUNC_LZ4 #define BUILDIN_CLZL(val) __builtin_clzll(val) diff --git a/src/os/inc/osLinux64.h b/src/os/inc/osLinux64.h index c0841c41bd..a2febd51b7 100644 --- a/src/os/inc/osLinux64.h +++ b/src/os/inc/osLinux64.h @@ -79,6 +79,7 @@ extern "C" { #include #endif #include +#include #ifdef __cplusplus } diff --git a/src/os/inc/osWindows.h b/src/os/inc/osWindows.h index 5003e48c44..5a1e642572 100644 --- a/src/os/inc/osWindows.h +++ b/src/os/inc/osWindows.h @@ -40,6 +40,7 @@ #include #include #include +#include #include "msvcProcess.h" #include "msvcDirect.h" #include "msvcFcntl.h" diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index cf9a1baa80..7fc9b100ef 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -39,7 +39,7 @@ typedef struct { int32_t refCount; // reference count int32_t queuedWMsg; int32_t queuedRMsg; - int32_t flowctlLevel; + int32_t flowctrlLevel; int8_t status; int8_t role; int8_t accessState; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index df4376d5d0..cd6d2ea7c0 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -684,7 +684,7 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level) { return; } - pVnode->flowctlLevel = level; + pVnode->flowctrlLevel = level; vDebug("vgId:%d, set flowctrl level:%d", pVnode->vgId, level); vnodeRelease(pVnode); From 810023ca6894c2c220c20344246c16a7558a8c1d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 26 Nov 2020 13:13:21 +0800 Subject: [PATCH 05/12] TD-1455 TD-2196 --- src/inc/taoserror.h | 1 + src/vnode/src/vnodeWrite.c | 46 ++++---- tests/script/unique/cluster/flowctrl.sim | 131 +++++++++++++++++++++++ 3 files changed, 160 insertions(+), 18 deletions(-) create mode 100644 tests/script/unique/cluster/flowctrl.sim diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index f1e736130c..77ec5350ba 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -205,6 +205,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid ve TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_SYNCING, 0, 0x0513, "Database is syncing") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID") diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 4bf7b30ebd..57bd407cd1 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -250,15 +250,15 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar atomic_add_fetch_32(&pVnode->refCount, 1); - code = vnodePerformFlowCtrl(pWrite); - if (code != 0) return code; - int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); if (queued > MAX_QUEUED_MSG_NUM) { vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued); taosMsleep(1); } + code = vnodePerformFlowCtrl(pWrite); + if (code != 0) return 0; + vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg); taosWriteQitem(pVnode->wqueue, qtype, pWrite); @@ -268,40 +268,50 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { SVnodeObj *pVnode = vparam; - atomic_sub_fetch_32(&pVnode->queuedWMsg, 1); - vTrace("vgId:%d, free from vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg); + int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1); + vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued); taosFreeQitem(pWrite); vnodeRelease(pVnode); } -static void vnodeFlowCtlMsgToWQueue(void *param, void *tmrId) { +static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { SVWriteMsg *pWrite = param; SVnodeObj * pVnode = pWrite->pVnode; + int32_t code = TSDB_CODE_VND_SYNCING; - int32_t code = vnodeWriteToWQueue(pVnode, pWrite->pHead, pWrite->qtype, pWrite->rpcMsg.handle == NULL ? NULL : &pWrite->rpcMsg); - if (code != 0 && pWrite->qtype == TAOS_QTYPE_RPC) { - vDebug("vgId:%d, failed to reprocess msg after perform flowctrl since %s", pVnode->vgId, tstrerror(code)); + pWrite->processedCount++; + if (pWrite->processedCount > 100) { + vError("vgId:%d, msg:%p, failed to process since %s", pVnode->vgId, pWrite, tstrerror(code)); + pWrite->processedCount = 1; dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); + } else { + code = vnodePerformFlowCtrl(pWrite); + if (code == 0) { + vTrace("vgId:%d, write into vwqueue after flowctrl", pVnode->vgId); + pWrite->processedCount = 0; + taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite); + } } - - tfree(pWrite); - vnodeRelease(pWrite->pVnode); } static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { SVnodeObj *pVnode = pWrite->pVnode; - if (pVnode->flowctlLevel <= 0) return 0; + if (pVnode->flowctrlLevel <= 0) return 0; + if (pWrite->qtype != TAOS_QTYPE_RPC) return 0; if (tsFlowCtrl == 0) { - int32_t ms = pVnode->flowctlLevel * 2; - if (ms > 60000) ms = 60000; - vDebug("vgId:%d, perform flowctrl for %d ms", pVnode->vgId, ms); + int32_t ms = pow(2, pVnode->flowctrlLevel + 2); + if (ms > 100) ms = 100; + vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl for %d ms", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, ms); taosMsleep(ms); return 0; } else { void *unUsed = NULL; - taosTmrReset(vnodeFlowCtlMsgToWQueue, 5, pWrite, tsDnodeTmr, &unUsed); - return TSDB_CODE_RPC_ACTION_IN_PROGRESS; + taosTmrReset(vnodeFlowCtrlMsgToWQueue, 100, pWrite, tsDnodeTmr, &unUsed); + + vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl, count:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, + pWrite->processedCount); + return TSDB_CODE_VND_ACTION_IN_PROGRESS; } } diff --git a/tests/script/unique/cluster/flowctrl.sim b/tests/script/unique/cluster/flowctrl.sim new file mode 100644 index 0000000000..6dc60d9fba --- /dev/null +++ b/tests/script/unique/cluster/flowctrl.sim @@ -0,0 +1,131 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 + +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3 +system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3 + +system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4 + +system sh/cfg.sh -n dnode1 -c http -v 0 +system sh/cfg.sh -n dnode2 -c http -v 0 +system sh/cfg.sh -n dnode3 -c http -v 0 + +system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000 +system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000 +system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000 + +system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 20 +system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 20 +system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 20 + +system sh/cfg.sh -n dnode1 -c replica -v 3 +system sh/cfg.sh -n dnode2 -c replica -v 3 +system sh/cfg.sh -n dnode3 -c replica -v 3 + +print ============== deploy + +system sh/exec.sh -n dnode1 -s start +sleep 5001 +sql connect + +sql create dnode $hostname2 +sql create dnode $hostname3 +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start + +print =============== step1 +$x = 0 +show1: + $x = $x + 1 + sleep 2000 + if $x == 5 then + return -1 + endi +sql show mnodes -x show1 +$mnode1Role = $data2_1 +print mnode1Role $mnode1Role +$mnode2Role = $data2_2 +print mnode2Role $mnode2Role +$mnode3Role = $data2_3 +print mnode3Role $mnode3Role + +if $mnode1Role != master then + goto show1 +endi +if $mnode2Role != slave then + goto show1 +endi +if $mnode3Role != slave then + goto show1 +endi + +print =============== step2 + +sql create database db replica 3 +sql use db +sql create table tb (ts timestamp, test int) + +$x = 0 +while $x < 100 + $ms = $x . s + sql insert into tb values (now + $ms , $x ) + $x = $x + 1 +endw + +print =============== step3 +sleep 3000 + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT + +print =============== step4 +sleep 5000 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start + +print =============== step5 +sleep 8000 +while $x < 200 + $ms = $x . s + sql insert into tb values (now + $ms , $x ) + $x = $x + 1 +endw + +print =============== step6 +system sh/exec.sh -n dnode2 -s stop -x SIGINT +sleep 3000 +while $x < 300 + $ms = $x . s + sql insert into tb values (now + $ms , $x ) + $x = $x + 1 +endw + +system sh/exec.sh -n dnode2 -s start + +sleep 6000 +print =============== step7 +while $x < 400 + $ms = $x . s + sql insert into tb values (now + $ms , $x ) + $x = $x + 1 + sleep 1 +endw + +print =============== step8 +sql select * from tb +print rows $rows +if $rows != 400 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT \ No newline at end of file From e9c83f8251545936fe424a9004ddb4c6003c8612 Mon Sep 17 00:00:00 2001 From: zyyang Date: Thu, 26 Nov 2020 14:33:53 +0800 Subject: [PATCH 06/12] [TD-2132]: add memory leak test case --- .../jdbc/cases/AppMemoryLeakTest.java | 41 ++++++++++++++----- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/AppMemoryLeakTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/AppMemoryLeakTest.java index 8a3dbb2bc1..8de2e3b442 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/AppMemoryLeakTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/AppMemoryLeakTest.java @@ -5,19 +5,40 @@ import org.junit.Test; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.sql.Statement; public class AppMemoryLeakTest { - @Test - public void testAppMemoryLeak() { - try { - Class.forName("com.taosdata.jdbc.TSDBDriver"); - - while (true) { - DriverManager.getConnection("jdbc:TAOS://localhost:6030/?user=root&password=taosdata"); - } - } catch (ClassNotFoundException | SQLException e) { - e.printStackTrace(); + @Test(expected = SQLException.class) + public void testCreateTooManyConnection() throws ClassNotFoundException, SQLException { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + int conCnt = 0; + while (true) { + Connection conn = DriverManager.getConnection("jdbc:TAOS://localhost:6030/?user=root&password=taosdata"); + System.out.println(conCnt++ + " : " + conn); } } + + @Test + public void testCreateTooManyStatement() throws ClassNotFoundException, SQLException { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + int stmtCnt = 0; + Connection conn = DriverManager.getConnection("jdbc:TAOS://localhost:6030/?user=root&password=taosdata"); + while (true) { + Statement stmt = conn.createStatement(); + System.out.println(++stmtCnt + " : " + stmt); + } + } + + public static void main(String[] args) throws ClassNotFoundException, SQLException { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + int stmtCnt = 0; + Connection conn = DriverManager.getConnection("jdbc:TAOS://localhost:6030/?user=root&password=taosdata"); + while (true) { + Statement stmt = conn.createStatement(); + System.out.println(++stmtCnt + " : " + stmt); + } + } + + } From d3729d273b5b1f200df77b2f1fc137148b2e5ff9 Mon Sep 17 00:00:00 2001 From: liuyq-617 Date: Thu, 26 Nov 2020 15:09:41 +0800 Subject: [PATCH 07/12] add test for column which not exist --- tests/pytest/concurrent_inquiry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pytest/concurrent_inquiry.py b/tests/pytest/concurrent_inquiry.py index 5d1e9a7537..6f4258312d 100644 --- a/tests/pytest/concurrent_inquiry.py +++ b/tests/pytest/concurrent_inquiry.py @@ -146,7 +146,7 @@ class ConcurrentInquiry: col_list=self.stb_stru_list[tbi-1] tag_list=self.stb_tag_list[tbi-1] is_stb=1 - tlist=col_list+tag_list + tlist=col_list+tag_list+['abc'] #增加不存在的域'abc',是否会引起新bug con_rand=random.randint(0,len(condition_list)) func_rand=random.randint(0,len(func_list)) col_rand=random.randint(0,len(col_list)) From 651672957d470f715b265c29892f495d62c94da0 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Thu, 26 Nov 2020 16:32:10 +0800 Subject: [PATCH 08/12] [TD-2240] update restfulInsert --- tests/pytest/insert/restfulInsert.py | 37 ++++++++++++++++++---------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/tests/pytest/insert/restfulInsert.py b/tests/pytest/insert/restfulInsert.py index c39d1cbe48..e3a963f1d4 100644 --- a/tests/pytest/insert/restfulInsert.py +++ b/tests/pytest/insert/restfulInsert.py @@ -18,10 +18,10 @@ import time import argparse class RestfulInsert: - def __init__(self, host, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder): + def __init__(self, host, startTimestamp, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder): self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='} self.url = "http://%s:6041/rest/sql" % host - self.ts = 1500000000000 + self.ts = startTimestamp self.dbname = dbname self.numOfThreads = threads self.numOfTables = tables @@ -36,8 +36,10 @@ class RestfulInsert: for i in range(tablesPerThread): tableID = threadID * tablesPerThread name = 'beijing' if tableID % 2 == 0 else 'shanghai' - data = "create table %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name) - requests.post(self.url, data, headers = self.header) + data = "create table if not exists %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name) + response = requests.post(self.url, data, headers = self.header) + if response.status_code != 200: + print(response.content) def insertData(self, threadID): print("thread %d started" % threadID) @@ -50,7 +52,9 @@ class RestfulInsert: values = [] for k in range(self.batchSize): data += "(%d, %d, %d, %d)" % (start + j * self.batchSize + k, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100)) - requests.post(self.url, data, headers = self.header) + response = requests.post(self.url, data, headers = self.header) + if response.status_code != 200: + print(response.content) def insertUnlimitedData(self, threadID): print("thread %d started" % threadID) @@ -76,15 +80,15 @@ class RestfulInsert: else: random.shuffle(values) for k in range(len(values)): - data += values[k] - requests.post(self.url, data, headers = self.header) + data += values[k] + response = requests.post(self.url, data, headers = self.header) + if response.status_code != 200: + print(response.content) - def run(self): - data = "drop database if exists %s" % self.dbname + def run(self): + data = "create database if not exists %s" % self.dbname requests.post(self.url, data, headers = self.header) - data = "create database %s" % self.dbname - requests.post(self.url, data, headers = self.header) - data = "create table %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname + data = "create table if not exists %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname requests.post(self.url, data, headers = self.header) threads = [] @@ -120,6 +124,13 @@ parser.add_argument( default='127.0.0.1', type=str, help='host name to be connected (default: 127.0.0.1)') +parser.add_argument( + '-S', + '--start-timestamp', + action='store', + default=1500000000000, + type=int, + help='insert data from timestamp (default: 1500000000000)') parser.add_argument( '-d', '--db-name', @@ -169,5 +180,5 @@ parser.add_argument( help='The order of test data (default: False)') args = parser.parse_args() -ri = RestfulInsert(args.host_name, args.db_name, args.number_of_threads, args.number_of_tables, args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order) +ri = RestfulInsert(args.host_name, args.start_timestamp, args.db_name, args.number_of_threads, args.number_of_tables, args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order) ri.run() \ No newline at end of file From fa25d38dea8ec9e310c85fac1da6aa36c70104ef Mon Sep 17 00:00:00 2001 From: liuyq-617 Date: Thu, 26 Nov 2020 16:37:11 +0800 Subject: [PATCH 09/12] full test --- tests/test-all.sh | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test-all.sh b/tests/test-all.sh index ff47cbfd71..5897978bce 100755 --- a/tests/test-all.sh +++ b/tests/test-all.sh @@ -17,9 +17,9 @@ function runSimCaseOneByOne { echo -e "${GREEN}$case success${NC}" | tee -a out.log || \ echo -e "${RED}$case failed${NC}" | tee -a out.log out_log=`tail -1 out.log ` - if [[ $out_log =~ 'failed' ]];then - exit 8 - fi + # if [[ $out_log =~ 'failed' ]];then + # exit 8 + # fi end_time=`date +%s` echo execution time of $case was `expr $end_time - $start_time`s. | tee -a out.log fi @@ -42,9 +42,9 @@ function runPyCaseOneByOne { echo -e "${RED}$case failed${NC}" | tee -a pytest-out.log end_time=`date +%s` out_log=`tail -1 pytest-out.log ` - if [[ $out_log =~ 'failed' ]];then - exit 8 - fi + # if [[ $out_log =~ 'failed' ]];then + # exit 8 + # fi echo execution time of $case was `expr $end_time - $start_time`s. | tee -a pytest-out.log else $line > /dev/null 2>&1 From 19eccc86ade72ded17cd4c4033b44b275137dc0f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 26 Nov 2020 08:44:20 +0000 Subject: [PATCH 10/12] fix TD-2184 --- src/tsdb/src/tsdbMemTable.c | 4 ++-- src/util/src/tskiplist.c | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 71944c87c6..4c51608b5d 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -871,10 +871,10 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); if (pBufBlock->offset == 0) { // return the block to buffer pool - tsdbLockRepo(pRepo); + if (tsdbLockRepo(pRepo) < 0) return; SListNode *pNode = tdListPopTail(pRepo->mem->bufBlockList); tdListPrependNode(pBufPool->bufBlockList, pNode); - tsdbUnlockRepo(pRepo); + if (tsdbUnlockRepo(pRepo) < 0) return; } } else { ASSERT(listNEles(pRepo->mem->extraBuffList) > 0); diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index a36f7f0261..20bec16954 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -291,7 +291,10 @@ bool tSkipListIterNext(SSkipListIterator *iter) { iter->next = SL_NODE_GET_FORWARD_POINTER(iter->cur, 0); iter->step++; } else { - if (iter->cur == pSkipList->pHead) return false; + if (iter->cur == pSkipList->pHead) { + tSkipListUnlock(pSkipList); + return false; + } iter->cur = SL_NODE_GET_BACKWARD_POINTER(iter->cur, 0); // a new node is inserted into between iter->cur and iter->next, ignore it From 11230382920a402fe2c9010a2b16e4b544e22b07 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 26 Nov 2020 14:24:18 +0000 Subject: [PATCH 11/12] TD-2245 --- src/dnode/src/dnodePeer.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 4c44924cd0..6bf22cee4e 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -151,9 +151,9 @@ void dnodeCleanupClient() { } static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) { + if (dnodeGetRunStatus() == TSDB_RUN_STATUS_STOPPED) { if (pMsg == NULL || pMsg->pCont == NULL) return; - dDebug("msg:%p is ignored since dnode not running", pMsg); + dDebug("msg:%p is ignored since dnode is stopping", pMsg); rpcFreeCont(pMsg->pCont); return; } From 86479c1f6a42a97d0ef5d880e18565eae1660ded Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 27 Nov 2020 11:33:44 +0800 Subject: [PATCH 12/12] TD-2246 --- tests/test/c/CMakeLists.txt | 7 ++-- tests/test/c/hashIterator.c | 72 +++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 tests/test/c/hashIterator.c diff --git a/tests/test/c/CMakeLists.txt b/tests/test/c/CMakeLists.txt index 26aa20e647..11480a8ba2 100644 --- a/tests/test/c/CMakeLists.txt +++ b/tests/test/c/CMakeLists.txt @@ -31,8 +31,8 @@ IF (TD_LINUX) #add_executable(createTablePerformance createTablePerformance.c) #target_link_libraries(createTablePerformance taos_static tutil common pthread) - #add_executable(createNormalTable createNormalTable.c) - #target_link_libraries(createNormalTable taos_static tutil common pthread) + add_executable(createNormalTable createNormalTable.c) + target_link_libraries(createNormalTable taos_static tutil common pthread) #add_executable(queryPerformance queryPerformance.c) #target_link_libraries(queryPerformance taos_static tutil common pthread) @@ -45,5 +45,8 @@ IF (TD_LINUX) #add_executable(invalidTableId invalidTableId.c) #target_link_libraries(invalidTableId taos_static tutil common pthread) + + add_executable(hashIterator hashIterator.c) + target_link_libraries(hashIterator taos_static tutil common pthread) ENDIF() diff --git a/tests/test/c/hashIterator.c b/tests/test/c/hashIterator.c new file mode 100644 index 0000000000..cbd8a0895e --- /dev/null +++ b/tests/test/c/hashIterator.c @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taos.h" +#include "tulog.h" +#include "tutil.h" +#include "hash.h" + +typedef struct HashTestRow { + int32_t keySize; + char key[100]; +} HashTestRow; + +int main(int argc, char *argv[]) { + _hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + void * hashHandle = taosHashInit(100, hashFp, true, HASH_ENTRY_LOCK); + + pPrint("insert 3 rows to hash"); + for (int32_t t = 0; t < 3; ++t) { + HashTestRow row = {0}; + row.keySize = sprintf(row.key, "0.db.st%d", t); + + taosHashPut(hashHandle, row.key, row.keySize, &row, sizeof(HashTestRow)); + } + + pPrint("start iterator"); + HashTestRow *row = taosHashIterate(hashHandle, NULL); + while (row) { + pPrint("drop key:%s", row->key); + taosHashRemove(hashHandle, row->key, row->keySize); + + pPrint("get rows from hash"); + for (int32_t t = 0; t < 3; ++t) { + HashTestRow r = {0}; + r.keySize = sprintf(r.key, "0.db.st%d", t); + + void *result = taosHashGet(hashHandle, r.key, r.keySize); + pPrint("get key:%s result:%p", r.key, result); + } + + //Before getting the next iterator, the object just deleted can be obtained + row = taosHashIterate(hashHandle, row); + } + + pPrint("stop iterator"); + taosHashCancelIterate(hashHandle, row); + + pPrint("get rows from hash"); + for (int32_t t = 0; t < 3; ++t) { + HashTestRow r = {0}; + r.keySize = sprintf(r.key, "0.db.st%d", t); + + void *result = taosHashGet(hashHandle, r.key, r.keySize); + pPrint("get key:%s result:%p", r.key, result); + } + + return 0; +} \ No newline at end of file