diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index e380206696..6ab4eeaa8a 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -54,7 +54,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const return; } - strtolower(pSql->sqlstr, sqlstr); + strntolower(pSql->sqlstr, sqlstr, sqlLen); tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); pSql->cmd.curSql = pSql->sqlstr; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5d5e546943..0677463d8d 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -263,12 +263,29 @@ TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) { return pSql; } TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { - char* buf = malloc(sqlLen + 1); - buf[sqlLen] = 0; - strncpy(buf, sqlstr, sqlLen); - TAOS_RES *res = taos_query(taos, buf); - free(buf); - return res; + STscObj *pObj = (STscObj *)taos; + if (pObj == NULL || pObj->signature != pObj) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + + if (sqlLen > tsMaxSQLStringLen) { + tscError("sql string exceeds max length:%d", tsMaxSQLStringLen); + terrno = TSDB_CODE_TSC_INVALID_SQL; + return NULL; + } + + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + if (pSql == NULL) { + tscError("failed to malloc sqlObj"); + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + + doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); + + tsem_wait(&pSql->rspSem); + return pSql; } int taos_result_precision(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 12ea4ad78d..719d80aa77 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -318,7 +318,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { pCols->maxPoints = maxRows; pCols->bufSize = maxRowSize * maxRows; - pCols->buf = malloc(pCols->bufSize); + pCols->buf = calloc(1, pCols->bufSize); if (pCols->buf == NULL) { free(pCols); return NULL; diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 86a222b3e6..8e1696c802 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -399,7 +399,7 @@ static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg); - void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId); + void *pVnode = vnodeAcquire(pCreate->cfg.vgId); if (pVnode != NULL) { dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId); vnodeRelease(pVnode); @@ -413,7 +413,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg); - void *pVnode = vnodeAcquireVnode(pAlter->cfg.vgId); + void *pVnode = vnodeAcquire(pAlter->cfg.vgId); if (pVnode != NULL) { dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId); int32_t code = vnodeAlter(pVnode, pAlter); diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 66135a93e9..bbea1a5e0b 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -91,23 +91,21 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - void *pVnode; while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - pVnode = vnodeAcquireVnode(pHead->vgId); + taos_queue queue = vnodeAcquireRqueue(pHead->vgId); - if (pVnode == NULL) { + if (queue == NULL) { leftLen -= pHead->contLen; pCont -= pHead->contLen; continue; } // put message into queue - taos_queue queue = vnodeGetRqueue(pVnode); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg = *pMsg; pRead->pCont = pCont; @@ -175,18 +173,6 @@ void dnodeFreeVnodeRqueue(void *rqueue) { // dynamically adjust the number of threads } -void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) { - SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); - pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - pRead->pCont = qhandle; - pRead->contLen = 0; - - assert(pVnode != NULL); - taos_queue queue = vnodeAcquireRqueue(pVnode); - - taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead); -} - void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { SRpcMsg rpcRsp = { .handle = pRead->rpcMsg.handle, diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index ba36e537a6..dc09a03e14 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -104,7 +104,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - taos_queue queue = vnodeGetWqueue(pHead->vgId); + taos_queue queue = vnodeAcquireWqueue(pHead->vgId); if (queue) { // put message into queue SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 096aae58f2..b561c407a3 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -53,7 +53,6 @@ void *dnodeAllocateVnodeWqueue(void *pVnode); void dnodeFreeVnodeWqueue(void *queue); void *dnodeAllocateVnodeRqueue(void *pVnode); void dnodeFreeVnodeRqueue(void *rqueue); -void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle); void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); int32_t dnodeAllocateMnodePqueue(); diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 972db294f6..65b91d87e4 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -79,7 +79,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); typedef void (*FNotifyRole)(void *ahandle, int8_t role); // when data file is synced successfully, notity app -typedef void (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); +typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); typedef struct { int32_t vgId; // vgroup ID diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 77c72c2451..15ddb6afee 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -22,10 +22,10 @@ extern "C" { typedef enum _VN_STATUS { TAOS_VN_STATUS_INIT, - TAOS_VN_STATUS_UPDATING, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING, - TAOS_VN_STATUS_DELETING, + TAOS_VN_STATUS_UPDATING, + TAOS_VN_STATUS_RESET, } EVnStatus; typedef struct { @@ -47,13 +47,10 @@ int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); -void vnodeRelease(void *pVnode); -void* vnodeAcquireVnode(int32_t vgId); // add refcount -void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged - -void* vnodeAcquireRqueue(void *); -void* vnodeGetRqueue(void *); -void* vnodeGetWqueue(int32_t vgId); +void* vnodeAcquire(int32_t vgId); // add refcount +void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue +void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue +void vnodeRelease(void *pVnode); // dec refCount void* vnodeGetWal(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index 1ba57bbaaa..f7c69e3973 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -133,6 +133,8 @@ char **strsplit(char *src, const char *delim, int32_t *num); char* strtolower(char *dst, const char *src); +char* strntolower(char *dst, const char *src, int32_t n); + int64_t strnatoi(char *num, int32_t len); //char* strreplace(const char* str, const char* pattern, const char* rep); diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index b225dfa36a..5de61a3d57 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -270,6 +270,14 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI return -1; } + /* set REUSEADDR option, so the portnumber can be re-used */ + int reuse = 1; + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); + close(sockFd); + return -1; + }; + if ( clientIp != 0) { memset((char *)&clientAddr, 0, sizeof(clientAddr)); clientAddr.sin_family = AF_INET; diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 1a74359f47..c8df34e1cd 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -234,6 +234,32 @@ char* strtolower(char *dst, const char *src) { *p = 0; return dst; } +char* strntolower(char *dst, const char *src, int32_t n) { + int esc = 0; + char quote = 0, *p = dst, c; + + assert(dst != NULL); + + for (c = *src++; n-- > 0; c = *src++) { + if (esc) { + esc = 0; + } else if (quote) { + if (c == '\\') { + esc = 1; + } else if (c == quote) { + quote = 0; + } + } else if (c >= 'A' && c <= 'Z') { + c -= 'A' - 'a'; + } else if (c == '\'' || c == '"') { + quote = c; + } + *p++ = c; + } + + *p = 0; + return dst; +} char *paGetToken(char *string, char **token, int32_t *tokenLen) { char quote = 0; diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 77db4fd04c..74cfbf1e73 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -37,7 +37,7 @@ extern int32_t vDebugFlag; typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count - int status; + int8_t status; int8_t role; int8_t accessState; int64_t version; // current version @@ -55,6 +55,8 @@ typedef struct { SWalCfg walCfg; void *qMgmt; char *rootDir; + tsem_t sem; + int8_t dropped; char db[TSDB_DB_NAME_LEN]; } SVnodeObj; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 539f9c6851..bf98824570 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -44,7 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); -static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); +static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); #ifndef _SYNC tsync_h syncStart(const SSyncInfo *info) { return NULL; } @@ -153,7 +153,7 @@ int32_t vnodeDrop(int32_t vgId) { SVnodeObj *pVnode = *ppVnode; vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount); - pVnode->status = TAOS_VN_STATUS_DELETING; + pVnode->dropped = 1; vnodeCleanUp(pVnode); return TSDB_CODE_SUCCESS; @@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // cfgVersion can be corrected by status msg - if (pVnode->status != TAOS_VN_STATUS_READY) { + if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_UPDATING) != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); return TSDB_CODE_SUCCESS; } - // the vnode may always fail to synchronize because of it in low cfgVersion - // so cannot use the following codes - // if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED) - // return TSDB_CODE_VND_NOT_SYNCED; - - pVnode->status = TAOS_VN_STATUS_UPDATING; - int32_t code = vnodeSaveCfg(pVnodeCfg); if (code != TSDB_CODE_SUCCESS) { pVnode->status = TAOS_VN_STATUS_READY; @@ -194,10 +187,12 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { return code; } - code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); - if (code != TSDB_CODE_SUCCESS) { - pVnode->status = TAOS_VN_STATUS_READY; - return code; + if (pVnode->tsdb) { + code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); + if (code != TSDB_CODE_SUCCESS) { + pVnode->status = TAOS_VN_STATUS_READY; + return code; + } } pVnode->status = TAOS_VN_STATUS_READY; @@ -223,6 +218,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->tsdbCfg.tsdbId = pVnode->vgId; pVnode->rootDir = strdup(rootDir); pVnode->accessState = TSDB_VN_ALL_ACCCESS; + tsem_init(&pVnode->sem, 0, 0); int32_t code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { @@ -319,7 +315,6 @@ int32_t vnodeClose(int32_t vgId) { SVnodeObj *pVnode = *ppVnode; vDebug("vgId:%d, vnode will be closed", pVnode->vgId); - pVnode->status = TAOS_VN_STATUS_CLOSING; vnodeCleanUp(pVnode); return 0; @@ -334,6 +329,8 @@ void vnodeRelease(void *pVnodeRaw) { if (refCount > 0) { vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount); + if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) + tsem_post(&pVnode->sem); return; } @@ -344,11 +341,6 @@ void vnodeRelease(void *pVnodeRaw) { tsdbCloseRepo(pVnode->tsdb, 1); pVnode->tsdb = NULL; - // stop continuous query - if (pVnode->cq) - cqClose(pVnode->cq); - pVnode->cq = NULL; - if (pVnode->wal) walClose(pVnode->wal); pVnode->wal = NULL; @@ -363,20 +355,21 @@ void vnodeRelease(void *pVnodeRaw) { tfree(pVnode->rootDir); - if (pVnode->status == TAOS_VN_STATUS_DELETING) { + if (pVnode->dropped) { char rootDir[TSDB_FILENAME_LEN] = {0}; sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId); taosMvDir(tsVnodeBakDir, rootDir); taosRemoveDir(rootDir); } + tsem_destroy(&pVnode->sem); free(pVnode); int32_t count = taosHashGetSize(tsDnodeVnodesHash); vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count); } -void *vnodeGetVnode(int32_t vgId) { +void *vnodeAcquire(int32_t vgId) { SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); if (ppVnode == NULL || *ppVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; @@ -384,35 +377,38 @@ void *vnodeGetVnode(int32_t vgId) { return NULL; } - return *ppVnode; -} - -void *vnodeAcquireVnode(int32_t vgId) { - SVnodeObj *pVnode = vnodeGetVnode(vgId); - if (pVnode == NULL) return pVnode; - + SVnodeObj *pVnode = *ppVnode; atomic_add_fetch_32(&pVnode->refCount, 1); vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); return pVnode; } -void *vnodeAcquireRqueue(void *param) { - SVnodeObj *pVnode = param; +void *vnodeAcquireRqueue(int32_t vgId) { + SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) return NULL; - atomic_add_fetch_32(&pVnode->refCount, 1); - vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount); - return ((SVnodeObj *)pVnode)->rqueue; + if (pVnode->status == TAOS_VN_STATUS_RESET) { + terrno = TSDB_CODE_VND_INVALID_STATUS; + vInfo("vgId:%d, status is in reset", vgId); + vnodeRelease(pVnode); + return NULL; + } + + return pVnode->rqueue; } -void *vnodeGetRqueue(void *pVnode) { - return ((SVnodeObj *)pVnode)->rqueue; -} - -void *vnodeGetWqueue(int32_t vgId) { - SVnodeObj *pVnode = vnodeAcquireVnode(vgId); +void *vnodeAcquireWqueue(int32_t vgId) { + SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) return NULL; + + if (pVnode->status == TAOS_VN_STATUS_RESET) { + terrno = TSDB_CODE_VND_INVALID_STATUS; + vInfo("vgId:%d, status is in reset", vgId); + vnodeRelease(pVnode); + return NULL; + } + return pVnode->wqueue; } @@ -484,7 +480,7 @@ void vnodeBuildStatusMsg(void *param) { void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { for (int32_t i = 0; i < numOfVnodes; ++i) { pAccess[i].vgId = htonl(pAccess[i].vgId); - SVnodeObj *pVnode = vnodeAcquireVnode(pAccess[i].vgId); + SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId); if (pVnode != NULL) { pVnode->accessState = pAccess[i].accessState; if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { @@ -498,11 +494,29 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { static void vnodeCleanUp(SVnodeObj *pVnode) { // remove from hash, so new messages wont be consumed taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + int i = 0; + + if (pVnode->status != TAOS_VN_STATUS_INIT) { + // it may be in updateing or reset state, then it shall wait + while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) { + if (++i % 1000 == 0) { + sched_yield(); + } + } + } // stop replication module if (pVnode->sync) { - syncStop(pVnode->sync); + void *sync = pVnode->sync; pVnode->sync = NULL; + syncStop(sync); + } + + // stop continuous query + if (pVnode->cq) { + void *cq = pVnode->cq; + pVnode->cq = NULL; + cqClose(cq); } vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); @@ -549,18 +563,25 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { cqStop(pVnode->cq); } -static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { - SVnodeObj *pVnode = ahandle; - vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion); - - pVnode->fversion = fversion; - pVnode->version = fversion; - vnodeSaveVersion(pVnode); - +static int vnodeResetTsdb(SVnodeObj *pVnode) +{ char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); - // clsoe tsdb, then open tsdb - tsdbCloseRepo(pVnode->tsdb, 0); + + if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) + return -1; + + void *tsdb = pVnode->tsdb; + pVnode->tsdb = NULL; + + // acquire vnode + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + + if (refCount > 2) + tsem_wait(&pVnode->sem); + + // close tsdb, then open tsdb + tsdbCloseRepo(tsdb, 0); STsdbAppH appH = {0}; appH.appH = (void *)pVnode; appH.notifyStatus = vnodeProcessTsdbStatus; @@ -569,6 +590,22 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { appH.cqDropFunc = cqDrop; appH.configFunc = dnodeSendCfgTableToRecv; pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); + + pVnode->status = TAOS_VN_STATUS_READY; + vnodeRelease(pVnode); + + return 0; +} + +static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { + SVnodeObj *pVnode = ahandle; + vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion); + + pVnode->fversion = fversion; + pVnode->version = fversion; + vnodeSaveVersion(pVnode); + + return vnodeResetTsdb(pVnode); } static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index f529b713cf..973df7c5a1 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -26,6 +26,7 @@ #include "tsdb.h" #include "vnode.h" #include "vnodeInt.h" +#include "tqueue.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); @@ -51,6 +52,11 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return TSDB_CODE_VND_INVALID_STATUS; } + // tsdb may be in reset state + if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; + if (pVnode->status == TAOS_VN_STATUS_CLOSING) + return TSDB_CODE_RPC_NOT_READY; + // TODO: Later, let slave to support query if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); @@ -60,6 +66,16 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } +static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) { + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; + pRead->pCont = qhandle; + pRead->contLen = 0; + + atomic_add_fetch_32(&pVnode->refCount, 1); + taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); +} + static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void *pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; @@ -131,7 +147,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle); - dnodePutItemIntoReadQueue(pVnode, *handle); + vnodePutItemIntoReadQueue(pVnode, *handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } @@ -208,7 +224,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { } else { // if failed to dump result, free qhandle immediately if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) { if (qHasMoreResultsToRetrieve(*handle)) { - dnodePutItemIntoReadQueue(pVnode, *handle); + vnodePutItemIntoReadQueue(pVnode, *handle); pRet->qhandle = *handle; freeHandle = false; } else { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 09e4b43ed3..6b9b8ca4fd 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -59,13 +59,18 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { return TSDB_CODE_VND_NO_WRITE_AUTH; } + // tsdb may be in reset state + if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; + if (pVnode->status == TAOS_VN_STATUS_CLOSING) + return TSDB_CODE_RPC_NOT_READY; + if (pHead->version == 0) { // from client or CQ if (pVnode->status != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status); return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state } - if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { + if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role); return TSDB_CODE_RPC_NOT_READY; } diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 445baa9e45..b9a9e4f024 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -136,6 +136,8 @@ echo "defaultPass taosdata" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG echo "mnodeEqualVnodeNum 0" >> $TAOS_CFG echo "clog 2" >> $TAOS_CFG +#echo "cache 1" >> $TAOS_CFG +#echo "block 2" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG echo "numOfTotalVnodes 4" >> $TAOS_CFG echo "maxVgroupsPerDb 4" >> $TAOS_CFG diff --git a/tests/script/sh/exec.sh b/tests/script/sh/exec.sh index 6928039be1..2f294075a1 100755 --- a/tests/script/sh/exec.sh +++ b/tests/script/sh/exec.sh @@ -88,7 +88,9 @@ if [ "$EXEC_OPTON" = "start" ]; then echo "ExcuteCmd:" $EXE_DIR/taosd -c $CFG_DIR if [ "$SHELL_OPTION" = "true" ]; then - nohup valgrind --log-file=${LOG_DIR}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & + TT=`date +%s` + mkdir ${LOG_DIR}/${TT} + nohup valgrind --log-file=${LOG_DIR}/${TT}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & else nohup $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & fi diff --git a/tests/script/unique/vnode/replica2_a_large.sim b/tests/script/unique/vnode/replica2_a_large.sim new file mode 100644 index 0000000000..801570dd9c --- /dev/null +++ b/tests/script/unique/vnode/replica2_a_large.sim @@ -0,0 +1,103 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 +system sh/cfg.sh -n dnode1 -c wallevel -v 2 +system sh/cfg.sh -n dnode2 -c wallevel -v 2 +system sh/cfg.sh -n dnode3 -c wallevel -v 2 +system sh/cfg.sh -n dnode4 -c wallevel -v 2 +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode1 -c debugFlag -v 131 +system sh/cfg.sh -n dnode2 -c debugFlag -v 131 +system sh/cfg.sh -n dnode3 -c debugFlag -v 131 +system sh/cfg.sh -n dnode4 -c debugFlag -v 131 +system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator + +print ============== step0: start tarbitrator +system sh/exec_tarbitrator.sh -s start + +system sh/exec.sh -n dnode1 -s start +sleep 3000 +sql connect + +sql create dnode $hostname2 +sql create dnode $hostname3 +system sh/exec.sh -n dnode2 -s start -t +system sh/exec.sh -n dnode3 -s start -t +sleep 3000 + +print ========= step1 +sql create database db replica 2 +#sql create table db.tb1 (ts timestamp, i int) +#sql create table db.tb2 (ts timestamp, i int) +#sql create table db.tb3 (ts timestamp, i int) +#sql create table db.tb4 (ts timestamp, i int) +#sql insert into db.tb1 values(now, 1) +#sql select count(*) from db.tb1 + +sql create database db replica 2 +sql create table db.tb (ts timestamp, i int) +sql insert into db.tb values(now, 1) +sql select count(*) from db.tb +$lastRows = $rows + +print ======== step2 +#run_back unique/vnode/back_insert_many.sim +run_back unique/vnode/back_insert.sim +sleep 3000 + +print ======== step3 + +$x = 0 +loop: + +print ======== step4 +system sh/exec.sh -n dnode2 -s stop -x SIGINT +sleep 10000 +system sh/exec.sh -n dnode2 -s start -t +sleep 10000 + +print ======== step5 +system sh/exec.sh -n dnode3 -s stop -x SIGINT +sleep 10000 +system sh/exec.sh -n dnode3 -s start -t +sleep 10000 + +print ======== step6 +#sql select count(*) from db.tb1 +#print select count(*) from db.tb1 ==> $data00 $lastRows +sql select count(*) from db.tb +print select count(*) from db.tb ==> $data00 $lastRows +if $data00 <= $lastRows then + return -1 +endi + +print ======== step7 +$lastRows = $data00 +print ======== loop Times $x + +if $x < 10 then + $x = $x + 1 + goto loop +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT +system sh/exec.sh -n dnode5 -s stop -x SIGINT +system sh/exec.sh -n dnode6 -s stop -x SIGINT +system sh/exec.sh -n dnode7 -s stop -x SIGINT +system sh/exec.sh -n dnode8 -s stop -x SIGINT \ No newline at end of file