From 53dd2050fd5786776083b435158587e580f70609 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Tue, 24 Nov 2020 17:05:20 +0800 Subject: [PATCH 01/14] [TD-2140]: use tref to manage sql objects --- src/client/inc/tsclient.h | 6 ++--- src/client/src/tscLocal.c | 7 ++++-- src/client/src/tscServer.c | 49 ++++++++++++++------------------------ src/client/src/tscSql.c | 18 +++++--------- src/client/src/tscSub.c | 4 ++-- src/client/src/tscSystem.c | 14 +++++++---- src/client/src/tscUtil.c | 22 ++++++++--------- 7 files changed, 54 insertions(+), 66 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index ff36cf0f5a..f38a8e6c18 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -333,7 +333,7 @@ typedef struct STscObj { char superAuth : 1; uint32_t connId; uint64_t rid; // ref ID returned by taosAddRef - struct SSqlObj * pHb; + int64_t hbrid; struct SSqlObj * sqlList; struct SSqlStream *streamList; void* pDnodeConn; @@ -373,7 +373,7 @@ typedef struct SSqlObj { struct SSqlObj **pSubs; struct SSqlObj *prev, *next; - struct SSqlObj **self; + int64_t self; } SSqlObj; typedef struct SSqlStream { @@ -507,7 +507,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField } extern SCacheObj* tscMetaCache; -extern SCacheObj* tscObjCache; +extern int tscObjRef; extern void * tscTmr; extern void * tscQhandle; extern int tscKeepConn[]; diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 538e652f3c..4c28adc261 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -825,8 +825,11 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) { static int32_t tscProcessServStatus(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; - if (pObj->pHb != NULL) { - if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); + if (pHb != NULL) { + int32_t code = pHb->res.code; + taosReleaseRef(tscObjRef, pObj->hbrid); + if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; return pSql->res.code; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cbc5604a27..b964a4f15d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -175,10 +175,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); } } else { - tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code)); + tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code)); } - if (pObj->pHb != NULL) { + if (pObj->hbrid != 0) { int32_t waitingDuring = tsShellActivityTimer * 500; tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); @@ -193,20 +193,12 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { STscObj *pObj = taosAcquireRef(tscRefId, rid); if (pObj == NULL) return; - SSqlObj* pHB = pObj->pHb; - - void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { - tscWarn("%p HB object has been released already", pHB); - taosReleaseRef(tscRefId, pObj->rid); - return; - } - - assert(*pHB->self == pHB); + SSqlObj* pHB = taosAcquireRef(tscObjRef, pObj->hbrid); + assert(pHB->self == pObj->hbrid); pHB->retry = 0; int32_t code = tscProcessSql(pHB); - taosCacheRelease(tscObjCache, (void**) &p, false); + taosReleaseRef(tscObjRef, pObj->hbrid); if (code != TSDB_CODE_SUCCESS) { tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); @@ -236,7 +228,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .msgType = pSql->cmd.msgType, .pCont = pMsg, .contLen = pSql->cmd.payloadLen, - .ahandle = pSql, + .ahandle = (void*)pSql->self, .handle = NULL, .code = 0 }; @@ -247,26 +239,24 @@ int tscSendMsgToServer(SSqlObj *pSql) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; - void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { + SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); + if (pSql == NULL) { rpcFreeCont(rpcMsg->pCont); return; } - - SSqlObj* pSql = *p; - assert(pSql != NULL); + assert(pSql->self == handle); STscObj *pObj = pSql->pTscObj; SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - assert(*pSql->self == pSql); pSql->rpcRid = -1; if (pObj->signature != pObj) { tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); - taosCacheRelease(tscObjCache, (void**) &p, true); + taosRemoveRef(tscObjRef, pSql->self); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -276,10 +266,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature); - void** p1 = p; - taosCacheRelease(tscObjCache, (void**) &p1, false); - - taosCacheRelease(tscObjCache, (void**) &p, true); + taosRemoveRef(tscObjRef, pSql->self); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -322,7 +310,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // if there is an error occurring, proceed to the following error handling procedure. if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosCacheRelease(tscObjCache, (void**) &p, false); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -390,11 +378,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { (*pSql->fp)(pSql->param, pSql, rpcMsg->code); } - void** p1 = p; - taosCacheRelease(tscObjCache, (void**) &p1, false); + taosReleaseRef(tscObjRef, pSql->self); if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it - taosCacheRelease(tscObjCache, (void **)&p, true); + taosRemoveRef(tscObjRef, pSql->self); tscDebug("%p sqlObj is automatically freed", pSql); } @@ -2019,7 +2006,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { // TODO multithread problem static void createHBObj(STscObj* pObj) { - if (pObj->pHb != NULL) { + if (pObj->hbrid != 0) { return; } @@ -2049,7 +2036,7 @@ static void createHBObj(STscObj* pObj) { registerSqlObj(pSql); tscDebug("%p HB is allocated, pObj:%p", pSql, pObj); - pObj->pHb = pSql; + pObj->hbrid = pSql->self; } int tscProcessConnectRsp(SSqlObj *pSql) { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 020305a0a8..50dfbcbec9 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -276,8 +276,8 @@ void taos_close(TAOS *taos) { pObj->signature = NULL; taosTmrStopA(&(pObj->pTimer)); - SSqlObj* pHb = pObj->pHb; - if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { + SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); + if (pHb != NULL) { if (pHb->rpcRid > 0) { // wait for rsp from dnode rpcCancelRequest(pHb->rpcRid); pHb->rpcRid = -1; @@ -285,6 +285,7 @@ void taos_close(TAOS *taos) { tscDebug("%p HB is freed", pHb); taos_free_result(pHb); + taosReleaseRef(tscObjRef, pHb->self); } int32_t ref = T_REF_DEC(pObj); @@ -597,8 +598,7 @@ void taos_free_result(TAOS_RES *res) { bool freeNow = tscKillQueryInDnode(pSql); if (freeNow) { tscDebug("%p free sqlObj in cache", pSql); - SSqlObj** p = pSql->self; - taosCacheRelease(tscObjCache, (void**) &p, true); + taosReleaseRef(tscObjRef, pSql->self); } } @@ -691,13 +691,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { continue; } - void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { - continue; - } - - SSqlObj* pSubObj = (SSqlObj*) (*p); - assert(pSubObj->self == (SSqlObj**) p); + SSqlObj* pSubObj = pSub; pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; if (pSubObj->rpcRid > 0) { @@ -706,7 +700,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { } tscQueueAsyncRes(pSubObj); - taosCacheRelease(tscObjCache, (void**) &p, false); + taosReleaseRef(tscObjRef, pSubObj->self); } tscDebug("%p super table query cancelled", pSql); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 2c81bd7c7c..3bce7bee77 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -179,8 +179,8 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* fail: tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code)); if (pSql != NULL) { - if (pSql->self != NULL) { - taos_free_result(pSql); + if (pSql->self != 0) { + taosReleaseRef(tscObjRef, pSql->self); } else { tscFreeSqlObj(pSql); } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 9e9a00550a..672d87e0c8 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -15,7 +15,7 @@ #include "os.h" #include "taosmsg.h" -#include "tcache.h" +#include "tref.h" #include "trpc.h" #include "tsystem.h" #include "ttimer.h" @@ -31,7 +31,7 @@ // global, not configurable SCacheObj* tscMetaCache; -SCacheObj* tscObjCache; +int tscObjRef = -1; void * tscTmr; void * tscQhandle; void * tscCheckDiskUsageTmr; @@ -144,7 +144,11 @@ void taos_init_imp(void) { int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); +#ifndef SQLOBJ_USE_CACHE + tscObjRef = taosOpenRef(4096, tscFreeRegisteredSqlObj); +#else tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); +#endif } tscRefId = taosOpenRef(200, tscCloseTscObj); @@ -167,9 +171,9 @@ void taos_cleanup(void) { taosCacheCleanup(m); } - m = tscObjCache; - if (m != NULL && atomic_val_compare_exchange_ptr(&tscObjCache, m, 0) == m) { - taosCacheCleanup(m); + int refId = atomic_exchange_32(&tscObjRef, -1); + if (refId != -1) { + taosCloseRef(refId); } m = tscQhandle; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7a82bcaaab..b35dbb47e6 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -447,18 +447,18 @@ static void tscFreeSubobj(SSqlObj* pSql) { void tscFreeRegisteredSqlObj(void *pSql) { assert(pSql != NULL); - SSqlObj** p = (SSqlObj**)pSql; - STscObj* pTscObj = (*p)->pTscObj; + SSqlObj* p = *(SSqlObj**)pSql; + STscObj* pTscObj = p->pTscObj; - assert((*p)->self != 0 && (*p)->self == (p)); - tscFreeSqlObj(*p); + assert(p->self != 0); + tscFreeSqlObj(p); int32_t ref = T_REF_DEC(pTscObj); assert(ref >= 0); - tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref); + tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref); if (ref == 0) { - tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj); + tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj); taosRemoveRef(tscRefId, pTscObj->rid); } } @@ -1560,6 +1560,8 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) { } } +#ifndef SQLOBJ_USE_CACHE +#else void tscSetFreeHeatBeat(STscObj* pObj) { if (pObj == NULL || pObj->signature != pObj || pObj->pHb == NULL) { return; @@ -1572,6 +1574,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHeatBeat->cmd, 0); pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; } +#endif /* * the following four kinds of SqlObj should not be freed @@ -1591,7 +1594,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) { } STscObj* pTscObj = pSql->pTscObj; - if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) { + if (pSql->pStream != NULL || pTscObj->hbrid == pSql->self || pSql->pSubscription != NULL) { return false; } @@ -1883,13 +1886,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { } void registerSqlObj(SSqlObj* pSql) { - int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec - int32_t ref = T_REF_INC(pSql->pTscObj); tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref); - TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql; - pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME); + pSql->self = taosAddRef(tscObjRef, pSql); } SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { From d3352f551140bd73f56afcb5f956daa83ba0d1b5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Nov 2020 18:27:24 +0800 Subject: [PATCH 02/14] [TD-225] reduce memory consumption. --- src/client/src/tscSubquery.c | 3 +++ src/client/src/tscUtil.c | 7 ++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 6ebbeeef41..819a323db5 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2198,6 +2198,9 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) { STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); + // free the data block created from insert sql string + pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); + if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { tscQueueAsyncRes(pSql); return code; // here the pSql may have been released already. diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b35dbb47e6..703e266eb8 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -833,9 +833,14 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { dataBuf->size += (finalLen + sizeof(SSubmitBlk)); assert(dataBuf->size <= dataBuf->nAllocSize); + // free unnecessary memory resource ASAP. + char* p = realloc(dataBuf->pData, dataBuf->size); + if (p != NULL) { + dataBuf->pData = p; + } + // the length does not include the SSubmitBlk structure pBlocks->dataLen = htonl(finalLen); - dataBuf->numOfTables += 1; } From 1386c05504ad369aaa60519c91b9829847454d55 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Wed, 25 Nov 2020 10:29:25 +0800 Subject: [PATCH 03/14] [TD-2140]: remove temporary code --- src/client/src/tscSystem.c | 4 ---- src/client/src/tscUtil.c | 16 ---------------- 2 files changed, 20 deletions(-) diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 672d87e0c8..91eabda78b 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -144,11 +144,7 @@ void taos_init_imp(void) { int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); -#ifndef SQLOBJ_USE_CACHE tscObjRef = taosOpenRef(4096, tscFreeRegisteredSqlObj); -#else - tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); -#endif } tscRefId = taosOpenRef(200, tscCloseTscObj); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 703e266eb8..950e904c4f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1565,22 +1565,6 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) { } } -#ifndef SQLOBJ_USE_CACHE -#else -void tscSetFreeHeatBeat(STscObj* pObj) { - if (pObj == NULL || pObj->signature != pObj || pObj->pHb == NULL) { - return; - } - - SSqlObj* pHeatBeat = pObj->pHb; - assert(pHeatBeat == pHeatBeat->signature); - - // to denote the heart-beat timer close connection and free all allocated resources - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHeatBeat->cmd, 0); - pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; -} -#endif - /* * the following four kinds of SqlObj should not be freed * 1. SqlObj for stream computing From a432168ae5399366bd947b3be1aeae3309ab776f Mon Sep 17 00:00:00 2001 From: stephenkgu Date: Mon, 30 Nov 2020 14:25:54 +0800 Subject: [PATCH 04/14] [TD-2265]: fix TSDB_MAX_ALLOWED_SQL_LEN from 8MB to 1MB --- src/inc/taosdef.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 20c7af6a21..ec1e1fc330 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -257,7 +257,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_SQL_SHOW_LEN 512 -#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb +#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024U) // sql length should be less than 1mb #define TSDB_APPNAME_LEN TSDB_UNI_LEN From eebed5c2100a81077c99d17e7b924eb2a6ae4373 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 30 Nov 2020 08:16:58 +0000 Subject: [PATCH 05/14] fix TD-2279 --- src/tsdb/src/tsdbRWHelper.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 5b65b2185a..8e57066d27 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -917,6 +917,8 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst); } + ASSERT((blkIdx == pIdx->numOfBlocks -1) || (!pCompBlock->last)); + tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); @@ -1042,6 +1044,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; + ASSERT((blkIdx == pIdx->numOfBlocks-1) || (!pCompBlock->last)); + tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); @@ -1622,11 +1626,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, pCfg->update); if (pDataCols->numOfRows == 0) break; - if (tblkIdx == pIdx->numOfBlocks - 1) { - if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; - } else { - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; - } + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; if (round == 0) { if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; From 2b11f6ab4e094c20bc525811a257c724b0a6e323 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Mon, 30 Nov 2020 08:42:42 +0000 Subject: [PATCH 06/14] [TD-2140]: fix invalid write --- src/client/src/tscUtil.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 893c3f4c39..af17f9b457 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -836,12 +836,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { dataBuf->size += (finalLen + sizeof(SSubmitBlk)); assert(dataBuf->size <= dataBuf->nAllocSize); - // free unnecessary memory resource ASAP. - char* p = realloc(dataBuf->pData, dataBuf->size); - if (p != NULL) { - dataBuf->pData = p; - } - // the length does not include the SSubmitBlk structure pBlocks->dataLen = htonl(finalLen); dataBuf->numOfTables += 1; From e13de8ce2c41af6cdf389a7ee74b4287d9fc998d Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Mon, 30 Nov 2020 09:12:42 +0000 Subject: [PATCH 07/14] return mnode initialization stage to client --- src/dnode/src/dnodeMPeer.c | 11 +++-- src/dnode/src/dnodeMRead.c | 11 +++-- src/dnode/src/dnodeMWrite.c | 18 +++++---- src/dnode/src/dnodeMain.c | 2 +- src/dnode/src/dnodeShell.c | 2 +- src/inc/mnode.h | 3 ++ src/inc/taoserror.h | 3 ++ src/mnode/src/mnodeMain.c | 80 +++++++++++++++++++++++++++---------- src/mnode/src/mnodeSdb.c | 2 +- src/mnode/src/mnodeUser.c | 15 ++++++- src/rpc/src/rpcMain.c | 5 +-- 11 files changed, 111 insertions(+), 41 deletions(-) diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index ee6dc5212e..adeccd1f9c 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -122,11 +122,16 @@ void dnodeFreeMPeerQueue() { } void dnodeDispatchToMPeerQueue(SRpcMsg *pMsg) { - if (!mnodeIsRunning() || tsMPeerQueue == NULL) { + if (!mnodeIsRunning()) { dnodeSendRedirectMsg(pMsg, false); } else { - SMnodeMsg *pPeer = mnodeCreateMsg(pMsg); - taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer); + if (!mnodeIsReady()) { + SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = mnodeInitCode(), .pCont = NULL}; + rpcSendResponse(&rpcRsp); + } else { + SMnodeMsg *pPeer = mnodeCreateMsg(pMsg); + taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer); + } } rpcFreeCont(pMsg->pCont); diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index 65f3af7b3b..bb0d6f94f2 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -123,11 +123,16 @@ void dnodeFreeMReadQueue() { } void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) { - if (!mnodeIsRunning() || tsMReadQueue == NULL) { + if (!mnodeIsRunning()) { dnodeSendRedirectMsg(pMsg, true); } else { - SMnodeMsg *pRead = mnodeCreateMsg(pMsg); - taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); + if (!mnodeIsReady()) { + SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = mnodeInitCode(), .pCont = NULL}; + rpcSendResponse(&rpcRsp); + } else { + SMnodeMsg *pRead = mnodeCreateMsg(pMsg); + taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); + } } rpcFreeCont(pMsg->pCont); diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index ef2d49ef42..3c539ceefa 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -123,13 +123,18 @@ void dnodeFreeMWritequeue() { } void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) { - if (!mnodeIsRunning() || tsMWriteQueue == NULL) { + if (!mnodeIsRunning()) { dnodeSendRedirectMsg(pMsg, true); } else { - SMnodeMsg *pWrite = mnodeCreateMsg(pMsg); - dDebug("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle, - taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); - taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); + if (!mnodeIsReady()) { + SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = mnodeInitCode(), .pCont = NULL}; + rpcSendResponse(&rpcRsp); + } else { + SMnodeMsg *pWrite = mnodeCreateMsg(pMsg); + dDebug("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle, + taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); + taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); + } } rpcFreeCont(pMsg->pCont); @@ -187,7 +192,7 @@ static void *dnodeProcessMWriteQueue(void *param) { void dnodeReprocessMWriteMsg(void *pMsg) { SMnodeMsg *pWrite = pMsg; - if (!mnodeIsRunning() || tsMWriteQueue == NULL) { + if (!mnodeIsRunning()) { dDebug("msg:%p, app:%p type:%s is redirected for mnode not running, retry times:%d", pWrite, pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType], pWrite->retry); @@ -196,7 +201,6 @@ void dnodeReprocessMWriteMsg(void *pMsg) { } else { dDebug("msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d", pWrite, pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry); - taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } } diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 9f52dbd331..ce45105fb3 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -63,6 +63,7 @@ static const SDnodeComponent tsDnodeComponents[] = { {"dnodeeps", dnodeInitEps, dnodeCleanupEps}, {"globalcfg" ,taosCheckGlobalCfg, NULL}, {"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos}, + {"shell", dnodeInitShell, dnodeCleanupShell}, {"wal", walInit, walCleanUp}, {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"vread", dnodeInitVRead, dnodeCleanupVRead}, @@ -75,7 +76,6 @@ static const SDnodeComponent tsDnodeComponents[] = { {"mgmt", dnodeInitMgmt, dnodeCleanupMgmt}, {"modules", dnodeInitModules, dnodeCleanupModules}, {"mgmt-tmr", dnodeInitMgmtTimer, dnodeCleanupMgmtTimer}, - {"shell", dnodeInitShell, dnodeCleanupShell}, {"telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry}, }; diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 89f657f789..fa5f9c24e8 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -144,7 +144,7 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); - if (code != TSDB_CODE_APP_NOT_READY) return code; + if (code != TSDB_CODE_RPC_REDIRECT) return code; SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); tstrncpy(pMsg->user, user, sizeof(pMsg->user)); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index bdc30b0c46..453e8ab3f6 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -65,12 +65,15 @@ void mnodeStopSystem(); void sdbUpdateAsync(); void sdbUpdateSync(void *pMnodes); bool mnodeIsRunning(); +bool mnodeIsReady(); +int32_t mnodeInitCode(); int32_t mnodeProcessRead(SMnodeMsg *pMsg); int32_t mnodeProcessWrite(SMnodeMsg *pMsg); int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg); void mnodeProcessPeerRsp(SRpcMsg *pMsg); int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); + #ifdef __cplusplus } #endif diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 5fde6d40d5..59f25a0765 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -125,6 +125,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SHOWOBJ, 0, 0x030B, "Data expir TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, 0, 0x030C, "Invalid query id") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_ID, 0, 0x030D, "Invalid stream id") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONN_ID, 0, 0x030E, "Invalid connection id") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INIT, 0, 0x030F, "Mnode is initializing") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INIT_SDB, 0, 0x0310, "Mnode is initializing meta data, it takes a while if many tables exists") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INIT_OTHER, 0, 0x0311, "Mnode is initializing other data") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "Object already there") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "Unexpected generic error in sdb") diff --git a/src/mnode/src/mnodeMain.c b/src/mnode/src/mnodeMain.c index d15b32da54..05ea0b93a1 100644 --- a/src/mnode/src/mnodeMain.c +++ b/src/mnode/src/mnodeMain.c @@ -17,6 +17,7 @@ #include "os.h" #include "taosdef.h" #include "tsched.h" +#include "taoserror.h" #include "tbalance.h" #include "tgrant.h" #include "ttimer.h" @@ -37,30 +38,41 @@ #include "mnodeShow.h" #include "mnodeProfile.h" +typedef enum { + TSDB_MND_STATUS_NOT_RUNNING, + TSDB_MND_STATUS_INIT, + TSDB_MND_STATUS_INIT_SDB, + TSDB_MND_STATUS_INIT_OTHER, + TSDB_MND_STATUS_READY, + TSDB_MND_STATUS_CLEANING, +} EMndStatus; + typedef struct { const char *const name; int (*init)(); void (*cleanup)(); + EMndStatus status; } SMnodeComponent; -void *tsMnodeTmr = NULL; -static bool tsMgmtIsRunning = false; +void *tsMnodeTmr = NULL; +static bool tsMgmtIsRunning = false; +static EMndStatus tsMgmtStatus = TSDB_MND_STATUS_NOT_RUNNING; static const SMnodeComponent tsMnodeComponents[] = { - {"sdbref", sdbInitRef, sdbCleanUpRef}, - {"profile", mnodeInitProfile, mnodeCleanupProfile}, - {"cluster", mnodeInitCluster, mnodeCleanupCluster}, - {"accts", mnodeInitAccts, mnodeCleanupAccts}, - {"users", mnodeInitUsers, mnodeCleanupUsers}, - {"dnodes", mnodeInitDnodes, mnodeCleanupDnodes}, - {"dbs", mnodeInitDbs, mnodeCleanupDbs}, - {"vgroups", mnodeInitVgroups, mnodeCleanupVgroups}, - {"tables", mnodeInitTables, mnodeCleanupTables}, - {"mnodes", mnodeInitMnodes, mnodeCleanupMnodes}, - {"sdb", sdbInit, sdbCleanUp}, - {"balance", balanceInit, balanceCleanUp}, - {"grant", grantInit, grantCleanUp}, - {"show", mnodeInitShow, mnodeCleanUpShow} + {"sdbref", sdbInitRef, sdbCleanUpRef, TSDB_MND_STATUS_INIT}, + {"profile", mnodeInitProfile, mnodeCleanupProfile, TSDB_MND_STATUS_INIT}, + {"cluster", mnodeInitCluster, mnodeCleanupCluster, TSDB_MND_STATUS_INIT}, + {"accts", mnodeInitAccts, mnodeCleanupAccts, TSDB_MND_STATUS_INIT}, + {"users", mnodeInitUsers, mnodeCleanupUsers, TSDB_MND_STATUS_INIT}, + {"dnodes", mnodeInitDnodes, mnodeCleanupDnodes, TSDB_MND_STATUS_INIT}, + {"dbs", mnodeInitDbs, mnodeCleanupDbs, TSDB_MND_STATUS_INIT}, + {"vgroups", mnodeInitVgroups, mnodeCleanupVgroups, TSDB_MND_STATUS_INIT}, + {"tables", mnodeInitTables, mnodeCleanupTables, TSDB_MND_STATUS_INIT}, + {"mnodes", mnodeInitMnodes, mnodeCleanupMnodes, TSDB_MND_STATUS_INIT}, + {"sdb", sdbInit, sdbCleanUp, TSDB_MND_STATUS_INIT_SDB}, + {"balance", balanceInit, balanceCleanUp, TSDB_MND_STATUS_INIT_OTHER}, + {"grant", grantInit, grantCleanUp, TSDB_MND_STATUS_INIT_OTHER}, + {"show", mnodeInitShow, mnodeCleanUpShow, TSDB_MND_STATUS_INIT_OTHER}, }; static void mnodeInitTimer(); @@ -76,21 +88,24 @@ static void mnodeCleanupComponents(int32_t stepId) { static int32_t mnodeInitComponents() { int32_t code = 0; for (int32_t i = 0; i < sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]); i++) { + tsMgmtStatus = tsMnodeComponents[i].status; if (tsMnodeComponents[i].init() != 0) { mnodeCleanupComponents(i); code = -1; break; } + // sleep(3); } return code; } int32_t mnodeStartSystem() { - if (tsMgmtIsRunning) { + if (tsMgmtStatus != TSDB_MND_STATUS_NOT_RUNNING) { mInfo("mnode module already started..."); return 0; } + tsMgmtStatus = TSDB_MND_STATUS_INIT; mInfo("starting to initialize mnode ..."); if (mkdir(tsMnodeDir, 0755) != 0 && errno != EEXIST) { mError("failed to init mnode dir:%s, reason:%s", tsMnodeDir, strerror(errno)); @@ -106,7 +121,7 @@ int32_t mnodeStartSystem() { } grantReset(TSDB_GRANT_ALL, 0); - tsMgmtIsRunning = true; + tsMgmtStatus = TSDB_MND_STATUS_READY; mInfo("mnode is initialized successfully"); @@ -126,7 +141,7 @@ int32_t mnodeInitSystem() { void mnodeCleanupSystem() { if (tsMgmtIsRunning) { mInfo("starting to clean up mnode"); - tsMgmtIsRunning = false; + tsMgmtStatus = TSDB_MND_STATUS_CLEANING; dnodeFreeMWritequeue(); dnodeFreeMReadQueue(); @@ -134,6 +149,7 @@ void mnodeCleanupSystem() { mnodeCleanupTimer(); mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1); + tsMgmtStatus = TSDB_MND_STATUS_NOT_RUNNING; mInfo("mnode is cleaned up"); } } @@ -184,5 +200,29 @@ static bool mnodeNeedStart() { } bool mnodeIsRunning() { - return tsMgmtIsRunning; + return (tsMgmtStatus != TSDB_MND_STATUS_NOT_RUNNING && tsMgmtStatus != TSDB_MND_STATUS_CLEANING); +} + +bool mnodeIsReady() { + return (tsMgmtStatus == TSDB_MND_STATUS_READY); +} + +int32_t mnodeInitCode() { + int32_t code = -1; + + switch (tsMgmtStatus) { + case TSDB_MND_STATUS_INIT: + code = TSDB_CODE_MND_INIT; + break; + case TSDB_MND_STATUS_INIT_SDB: + code = TSDB_CODE_MND_INIT_SDB; + break; + case TSDB_MND_STATUS_INIT_OTHER: + code = TSDB_CODE_MND_INIT_OTHER; + break; + default: + code = TSDB_CODE_MND_INIT; + } + + return code; } diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 6670e87a7c..2e6ba8175d 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -299,7 +299,7 @@ void sdbUpdateAsync() { void sdbUpdateSync(void *pMnodes) { SMnodeInfos *mnodes = pMnodes; - if (!mnodeIsRunning()) { + if (!mnodeIsReady()) { mDebug("vgId:1, mnode not start yet, update sync config later"); return; } diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index fb26086d04..8d57ad30dc 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -585,10 +585,21 @@ void mnodeDropAllUsers(SAcctObj *pAcct) { } int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { + *secret = 0; + + if (!mnodeIsRunning()) { + mDebug("user:%s, mnode is not running, fail to auth", user); + return TSDB_CODE_RPC_REDIRECT; + } + + if (!mnodeIsReady()) { + mDebug("user:%s, failed to auth user, mnode is not ready", user); + return mnodeInitCode(); + } + if (!sdbIsMaster()) { - *secret = 0; mDebug("user:%s, failed to auth user, mnode is not master", user); - return TSDB_CODE_APP_NOT_READY; + return TSDB_CODE_RPC_REDIRECT; } SUserObj *pUser = mnodeGetUser(user); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index acceaf9d7a..2b7271459e 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1551,10 +1551,9 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { if ( !rpcIsReq(pHead->msgType) ) { // for response, if code is auth failure, it shall bypass the auth process code = htonl(pHead->code); - if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || - code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) { - pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + if (code != 0) { // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); + pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); return 0; } } From 2e69b78fcd054ec8f8570148b4a798fc93a96dcf Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 30 Nov 2020 10:32:12 +0000 Subject: [PATCH 08/14] TD-2283 --- src/cq/src/cqMain.c | 8 +++----- src/cq/test/cqtest.c | 2 +- src/inc/tcq.h | 2 +- src/vnode/src/vnodeMain.c | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 84b2c297d0..b1977fd5d9 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -40,15 +40,14 @@ typedef struct { int32_t vgId; + int32_t master; + int32_t num; // number of continuous streams char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; char db[TSDB_DB_NAME_LEN]; FCqWrite cqWrite; - void *ahandle; - int32_t num; // number of continuous streams struct SCqObj *pHead; void *dbConn; - int32_t master; void *tmrCtrl; pthread_mutex_t mutex; } SCqContext; @@ -90,7 +89,6 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { tstrncpy(pContext->db, db, sizeof(pContext->db)); pContext->vgId = pCfg->vgId; pContext->cqWrite = pCfg->cqWrite; - pContext->ahandle = ahandle; tscEmbedded = 1; pthread_mutex_init(&pContext->mutex, NULL); @@ -342,7 +340,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { pHead->version = 0; // write into vnode write queue - pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ, NULL); + pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL); free(buffer); } diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index e1114fc024..41380f0d86 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -24,7 +24,7 @@ int64_t ver = 0; void *pCq = NULL; -int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { +int writeToQueue(int32_t vgId, void *data, int type, void *pMsg) { return 0; } diff --git a/src/inc/tcq.h b/src/inc/tcq.h index 7a0727f1b8..afa744a9c4 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -21,7 +21,7 @@ extern "C" { #include "tdataformat.h" -typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); +typedef int32_t (*FCqWrite)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg); typedef struct { int32_t vgId; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 3d3abde3c7..0b29fcc908 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -272,7 +272,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.db, pVnode->db); cqCfg.vgId = vnode; - cqCfg.cqWrite = vnodeWriteToWQueue; + cqCfg.cqWrite = vnodeWriteToCache; pVnode->cq = cqOpen(pVnode, &cqCfg); if (pVnode->cq == NULL) { vnodeCleanUp(pVnode); From ece8c475147e81a0db2e617205202728d3a0cd87 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 30 Nov 2020 19:01:25 +0800 Subject: [PATCH 09/14] TD-1926 --- src/inc/taoserror.h | 3 - src/inc/tsync.h | 4 +- src/mnode/src/mnodeSdb.c | 15 +- src/sync/inc/syncInt.h | 8 +- src/sync/src/syncMain.c | 3 +- src/sync/src/syncRetrieve.c | 311 +++++++++++++++++------------------- src/vnode/src/vnodeMain.c | 9 +- 7 files changed, 171 insertions(+), 182 deletions(-) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 5fde6d40d5..dd6d88e88e 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -261,9 +261,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sy TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired") -TAOS_DEFINE_ERROR(TSDB_CODE_SYN_VND_COMMITING, 0, 0x0904, "Vnode is commiting") -TAOS_DEFINE_ERROR(TSDB_CODE_SYN_FILE_CHNAGED, 0, 0x0905, "Vnode file is changed") -TAOS_DEFINE_ERROR(TSDB_CODE_SYN_APP_ERROR, 0, 0x1000, "Unexpected generic error in sync") // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 293e382519..1303195ef1 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -86,7 +86,7 @@ typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level); typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion); // get file version -typedef int32_t (*FGetFileVersion)(int32_t vgId, uint64_t *fver); +typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver); typedef struct { int32_t vgId; // vgroup ID @@ -100,7 +100,7 @@ typedef struct { FNotifyRole notifyRole; FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; - FGetFileVersion getFileVersion; + FGetVersion getVersion; } SSyncInfo; typedef void *tsync_h; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 6670e87a7c..40e2e1cfcc 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -251,6 +251,16 @@ static void sdbNotifyRole(int32_t vgId, int8_t role) { sdbUpdateMnodeRoles(); } +static int32_t sdbNotifyFileSynced(int32_t vgId, uint64_t fversion) { return 0; } + +static void sdbNotifyFlowCtrl(int32_t vgId, int32_t level) {} + +static int32_t sdbGetSyncVersion(int32_t vgId, uint64_t *fver, uint64_t *vver) { + *fver = 0; + *vver = 0; + return 0; +} + // failed to forward, need revert insert static void sdbHandleFailedConfirm(SSdbRow *pRow) { SWalHead *pHead = pRow->pHead; @@ -372,11 +382,14 @@ void sdbUpdateSync(void *pMnodes) { syncInfo.version = sdbGetVersion(); syncInfo.syncCfg = syncCfg; sprintf(syncInfo.path, "%s", tsMnodeDir); - syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getFileInfo = sdbGetFileInfo; + syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.writeToCache = sdbWriteFwdToQueue; syncInfo.confirmForward = sdbConfirmForward; syncInfo.notifyRole = sdbNotifyRole; + syncInfo.notifyFileSynced = sdbNotifyFileSynced; + syncInfo.notifyFlowCtrl = sdbNotifyFlowCtrl; + syncInfo.getVersion = sdbGetSyncVersion; tsSdbMgmt.cfg = syncCfg; if (tsSdbMgmt.sync) { diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 7dc04d70d0..6d0c52284f 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -139,16 +139,14 @@ typedef struct SsyncPeer { char id[TSDB_EP_LEN + 32]; // peer vgId + end point uint64_t version; uint64_t sversion; // track the peer version in retrieve process - uint64_t lastVer; // track the file version while retrieve + uint64_t lastFileVer; // track the file version while retrieve + uint64_t lastWalVer; // track the wal version while retrieve int32_t syncFd; int32_t peerFd; // forward FD int32_t numOfRetrieves; // number of retrieves tried int32_t fileChanged; // a flag to indicate file is changed during retrieving process void * timer; void * pConn; - int32_t notifyFd; - int32_t watchNum; - int32_t *watchFd; int32_t refCount; // reference count struct SSyncNode *pSyncNode; } SSyncPeer; @@ -173,7 +171,7 @@ typedef struct SSyncNode { FNotifyRole notifyRole; FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; - FGetFileVersion getFileVersion; + FGetVersion getVersion; pthread_mutex_t mutex; } SSyncNode; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index ce85c940ff..adac532f2d 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -196,7 +196,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { pNode->confirmForward = pInfo->confirmForward; pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl; pNode->notifyFileSynced = pInfo->notifyFileSynced; - pNode->getFileVersion = pInfo->getFileVersion; + pNode->getVersion = pInfo->getVersion; pNode->selfIndex = -1; pNode->vgId = pInfo->vgId; @@ -498,7 +498,6 @@ int32_t syncDecPeerRef(SSyncPeer *pPeer) { taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid); sDebug("%s, resource is freed", pPeer->id); - tfree(pPeer->watchFd); tfree(pPeer); return 0; } diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 305ec492b9..44d2680b1a 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -26,35 +26,73 @@ #include "tsync.h" #include "syncInt.h" -static int32_t syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) { - if (pNode->getFileVersion == NULL) return TSDB_CODE_SUCCESS; - - uint64_t fver = 0; - int32_t code = (*pNode->getFileVersion)(pNode->vgId, &fver); +static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) { + uint64_t fver, wver; + int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); if (code != 0) { - sInfo("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastVer); - pPeer->fileChanged = 1; - return TSDB_CODE_SYN_VND_COMMITING; + sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer); + return -1; } - if (fver != pPeer->lastVer) { - sInfo("%s, files are modified while retrieve, fver:%" PRIu64 ", last fver:%" PRIu64, pPeer->id, fver, pPeer->lastVer); + pPeer->lastWalVer = wver; + return code; +} + +static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) { + uint64_t fver, wver; + int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); + if (code != 0) { + sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer); + return true; + } + + if (wver != pPeer->lastWalVer) { + sDebug("%s, wal is modified while retrieve, wver:%" PRIu64 ", last:%" PRIu64, pPeer->id, wver, pPeer->lastWalVer); + return true; + } + + return false; +} + +static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) { + uint64_t fver, wver; + int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); + if (code != 0) { + sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer); + return -1; + } + + pPeer->lastFileVer = fver; + return code; +} + +static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) { + uint64_t fver, wver; + int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); + if (code != 0) { + sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer); pPeer->fileChanged = 1; - return TSDB_CODE_SYN_FILE_CHNAGED; + return true; + } + + if (fver != pPeer->lastFileVer) { + sDebug("%s, files are modified while retrieve, fver:%" PRIu64 ", last:%" PRIu64, pPeer->id, fver, pPeer->lastFileVer); + pPeer->fileChanged = 1; + return true; } pPeer->fileChanged = 0; - return TSDB_CODE_SUCCESS; + return false; } static int32_t syncRetrieveFile(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo)); SFileAck fileAck = {0}; - int32_t code = TSDB_CODE_SYN_APP_ERROR; + int32_t code = -1; char name[TSDB_FILENAME_LEN * 2] = {0}; - if (pNode->getFileVersion) (*pNode->getFileVersion)(pNode->vgId, &pPeer->lastVer); + if (syncGetFileVersion(pNode, pPeer) < 0) return -1; while (1) { // retrieve file info @@ -120,8 +158,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { fileInfo.index++; // check if processed files are modified - code = syncAreFilesModified(pNode, pPeer); - if (code != TSDB_CODE_SUCCESS) break; + if (syncAreFilesModified(pNode, pPeer)) { + code = -1; + break; + } } if (code != TSDB_CODE_SUCCESS) { @@ -131,117 +171,90 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { return code; } -/* if only a partial record is read out, set the IN_MODIFY flag in event, - so upper layer will reload the file to get a complete record */ -static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead, uint32_t *pEvent) { - int32_t ret; +// if only a partial record is read out, upper layer will reload the file to get a complete record +static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { + int32_t ret = read(sfd, pHead, sizeof(SWalHead)); + if (ret < 0) { + sError("sfd:%d, failed to read wal head since %s, ret:%d", sfd, strerror(errno), ret); + return -1; + } - ret = read(sfd, pHead, sizeof(SWalHead)); - if (ret < 0) return -1; - if (ret == 0) return 0; + if (ret == 0) { + sDebug("sfd:%d, read to end of the wal, ret:%d", sfd, ret); + return 0; + } if (ret != sizeof(SWalHead)) { // file is not at end yet, it shall be reloaded - *pEvent = *pEvent | IN_MODIFY; + sDebug("sfd:%d, a partial wal head is read out, ret:%d", sfd, ret); return 0; } assert(pHead->len <= TSDB_MAX_WAL_SIZE); ret = read(sfd, pHead->cont, pHead->len); - if (ret < 0) return -1; + if (ret < 0) { + sError("sfd:%d, failed to read wal content since %s, ret:%d", sfd, strerror(errno), ret); + return -1; + } if (ret != pHead->len) { // file is not at end yet, it shall be reloaded - *pEvent = *pEvent | IN_MODIFY; + sDebug("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret); return 0; } return sizeof(SWalHead) + pHead->len; } -static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) { - pPeer->watchNum = 0; - taosClose(pPeer->notifyFd); - pPeer->notifyFd = inotify_init1(IN_NONBLOCK); - if (pPeer->notifyFd < 0) { - sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno)); - return -1; - } - - if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int32_t) * tsMaxWatchFiles); - if (pPeer->watchFd == NULL) { - sError("%s, failed to allocate watchFd", pPeer->id); - return -1; - } - - memset(pPeer->watchFd, -1, sizeof(int32_t) * tsMaxWatchFiles); - int32_t *wd = pPeer->watchFd; - - *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE); - if (*wd == -1) { - sError("%s, failed to watch last wal since %s", pPeer->id, strerror(errno)); - return -1; - } - - return 0; -} - -static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { - char buf[2048]; - int32_t len = read(pPeer->notifyFd, buf, sizeof(buf)); - if (len < 0 && errno != EAGAIN) { - sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno)); - return -1; - } - - if (len == 0) return 0; - - struct inotify_event *event; - for (char *ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) { - event = (struct inotify_event *)ptr; - if (event->mask & IN_MODIFY) *pEvent = *pEvent | IN_MODIFY; - if (event->mask & IN_CLOSE_WRITE) *pEvent = *pEvent | IN_CLOSE_WRITE; - } - - if (pEvent != 0) sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent); - - return 0; -} - -static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { - SWalHead *pHead = malloc(SYNC_MAX_SIZE); - int32_t code = -1; - int32_t bytes = 0; - int32_t sfd; - - sfd = open(name, O_RDONLY); +static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) { + int32_t sfd = open(name, O_RDONLY); if (sfd < 0) { - free(pHead); + sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno)); return -1; } - (void)lseek(sfd, offset, SEEK_SET); - sDebug("%s, retrieve last wal, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, offset, fversion); + int32_t code = taosLSeek(sfd, offset, SEEK_SET); + if (code < 0) { + sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno)); + close(sfd); + return -1; + } + + sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion); + + SWalHead *pHead = malloc(SYNC_MAX_SIZE); + int32_t bytes = 0; while (1) { - int32_t wsize = syncReadOneWalRecord(sfd, pHead, pEvent); - if (wsize < 0) break; - if (wsize == 0) { - code = 0; + code = syncReadOneWalRecord(sfd, pHead); + if (code < 0) { + sError("%s, failed to read one record from wal:%s", pPeer->id, name); + break; + } + + if (code == 0) { + code = bytes; + sDebug("%s, read to the end of wal, bytes:%d", pPeer->id, bytes); break; } sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version); - int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); - if (ret != wsize) break; - pPeer->sversion = pHead->version; + int32_t wsize = code; + int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); + if (ret != wsize) { + code = -1; + sError("%s, failed to forward wal since %s, hver:%" PRIu64, pPeer->id, strerror(errno), pHead->version); + break; + } + + pPeer->sversion = pHead->version; bytes += wsize; if (pHead->version >= fversion && fversion > 0) { code = 0; - bytes = 0; + sDebug("%s, retrieve wal finished, fver:%" PRIu64, pPeer->id, fversion); break; } } @@ -249,92 +262,61 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi free(pHead); close(sfd); - if (code == 0) return bytes; - return -1; + return code; } static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { SSyncNode *pNode = pPeer->pSyncNode; - int32_t code = -1; + int32_t once = 0; // last WAL has once ever been processed + int64_t offset = 0; + uint64_t fversion = 0; char fname[TSDB_FILENAME_LEN * 2] = {0}; // full path to wal file - if (syncAreFilesModified(pNode, pPeer) != 0) return -1; + // get full path to wal file + snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); + sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname); while (1) { - int32_t once = 0; // last WAL has once ever been processed - int64_t offset = 0; - uint64_t fversion = 0; - uint32_t event = 0; + if (syncAreFilesModified(pNode, pPeer)) return -1; + if (syncGetWalVersion(pNode, pPeer) < 0) return -1; - // get full path to wal file - snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); - sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname); - - // monitor last wal - if (syncMonitorLastWal(pPeer, fname) < 0) break; - - while (1) { - int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset, &event); - if (bytes < 0) break; - - // check file changes - if (syncCheckLastWalChanges(pPeer, &event) < 0) break; - - // if file is not updated or updated once, set the fversion and sstatus - if (((event & IN_MODIFY) == 0) || once) { - if (fversion == 0) { - pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt - sDebug("%s, fversion is 0 then set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); - fversion = nodeVersion; // must read data to fversion - } - } - - // if all data up to fversion is read out, it is over - if (pPeer->sversion >= fversion && fversion > 0) { - code = 0; - sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d", pPeer->id, fversion, bytes); - break; - } - - // if all data are read out, and no update - if ((bytes == 0) && ((event & IN_MODIFY) == 0)) { - // wal file is closed, break - if (event & IN_CLOSE_WRITE) { - code = 0; - sDebug("%s, current wal is closed", pPeer->id); - break; - } - - // wal not closed, it means some data not flushed to disk, wait for a while - usleep(10000); - } - - // if bytes>0, file is updated, or fversion is not reached but file still open, read again - once = 1; - offset += bytes; - sDebug("%s, retrieve last wal, bytes:%d", pPeer->id, bytes); - event = event & (~IN_MODIFY); // clear IN_MODIFY flag + int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset); + if (bytes < 0) { + sDebug("%s, failed to retrieve last wal", pPeer->id); + return bytes; } - if (code < 0) break; - if (pPeer->sversion >= fversion && fversion > 0) break; + // check file changes + bool walModified = syncIsWalModified(pNode, pPeer); - index++; - wname[0] = 0; - code = (*pNode->getWalInfo)(pNode->vgId, wname, &index); - if (code < 0) break; - if (wname[0] == 0) { - code = 0; - break; + // if file is not updated or updated once, set the fversion and sstatus + if (!walModified || once) { + if (fversion == 0) { + pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt + fversion = nodeVersion; // must read data to fversion + sDebug("%s, fversion is 0 and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); + } } - // current last wal is closed, there is a new one - sDebug("%s, last wal is closed, try new one", pPeer->id); + // if all data up to fversion is read out, it is over + if (pPeer->sversion >= fversion && fversion > 0) { + sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d", pPeer->id, fversion, bytes); + return 0; + } + + // if all data are read out, and no update + if (bytes == 0 && !walModified) { + // wal not closed, it means some data not flushed to disk, wait for a while + usleep(10000); + } + + // if bytes > 0, file is updated, or fversion is not reached but file still open, read again + once = 1; + offset += bytes; + sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64, pPeer->id, bytes, offset); } - taosClose(pPeer->notifyFd); - - return code; + return -1; } static int32_t syncRetrieveWal(SSyncPeer *pPeer) { @@ -377,7 +359,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { close(sfd); if (code < 0) break; - if (syncAreFilesModified(pNode, pPeer) != 0) break; + if (syncAreFilesModified(pNode, pPeer)) break; } if (code == 0) { @@ -474,7 +456,6 @@ void *syncRetrieveData(void *param) { } pPeer->fileChanged = 0; - taosClose(pPeer->notifyFd); taosClose(pPeer->syncFd); syncDecPeerRef(pPeer); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 3d3abde3c7..a046f1a50b 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -38,7 +38,7 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level); static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); -static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver); +static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver); #ifndef _SYNC int64_t syncStart(const SSyncInfo *info) { return NULL; } @@ -353,7 +353,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFileSynced = vnodeNotifyFileSynced; - syncInfo.getFileVersion = vnodeGetFileVersion; + syncInfo.getVersion = vnodeGetVersion; pVnode->sync = syncStart(&syncInfo); #ifndef _SYNC @@ -771,7 +771,7 @@ static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void return code; } -static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) { +static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { vError("vgId:%d, vnode not found while write to cache", vgId); @@ -780,10 +780,11 @@ static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) { int32_t code = 0; if (pVnode->isCommiting) { - vDebug("vgId:%d, vnode is commiting while get file version", vgId); + vDebug("vgId:%d, vnode is commiting while get version", vgId); code = -1; } else { *fver = pVnode->fversion; + *wver = pVnode->version; } vnodeRelease(pVnode); From 892e2599ecd0112413372ac422d7d98a27f0019a Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Tue, 1 Dec 2020 11:20:14 +0800 Subject: [PATCH 10/14] Revert "[TD-2268]: cancel all related timers in taosTmrCleanUp" --- src/util/src/ttimer.c | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index baf396f030..0222a6d80a 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -560,37 +560,6 @@ void taosTmrCleanUp(void* handle) { tmrDebug("%s timer controller is cleaned up.", ctrl->label); ctrl->label[0] = 0; - // cancel all timers of this controller - for (size_t i = 0; i < timerMap.size; i++) { - timer_list_t* list = timerMap.slots + i; - lockTimerList(list); - - tmr_obj_t* t = list->timers; - tmr_obj_t* prev = NULL; - while (t != NULL) { - tmr_obj_t* next = t->mnext; - if (t->ctrl != ctrl) { - prev = t; - t = next; - continue; - } - - uint8_t state = atomic_val_compare_exchange_8(&t->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); - if (state == TIMER_STATE_WAITING) { - removeFromWheel(t); - } - timerDecRef(t); - if (prev == NULL) { - list->timers = next; - } else { - prev->mnext = next; - } - t = next; - } - - unlockTimerList(list); - } - pthread_mutex_lock(&tmrCtrlMutex); ctrl->next = unusedTmrCtrl; numOfTmrCtrl--; From 4e0f1055dce5aaf60d70b122307f5026ba37e2c7 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 1 Dec 2020 03:44:37 +0000 Subject: [PATCH 11/14] remove a potential rpcSendRecv deadlock --- src/rpc/src/rpcMain.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index acceaf9d7a..4ccdec2bc7 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -630,7 +630,14 @@ static void rpcReleaseConn(SRpcConn *pConn) { } else { // if there is an outgoing message, free it if (pConn->outType && pConn->pReqMsg) { - if (pConn->pContext) pConn->pContext->pConn = NULL; + if (pContext->pRsp) { + // for synchronous API, post semaphore to unblock app + pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR; + pContext->pRsp->pCont = NULL; + pContext->pRsp->contLen = 0; + tsem_post(pContext->pSem); + } + pConn->pContext->pConn = NULL; taosRemoveRef(tsRpcRefId, pConn->pContext->rid); } } From 7fadd4cb3aec7b30c99572586b3a1179f87a4838 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 1 Dec 2020 03:48:24 +0000 Subject: [PATCH 12/14] remove a potential rpcSendRecv deadlock if rpcClose is called by another thread --- src/rpc/src/rpcMain.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 4ccdec2bc7..00a97d7bc2 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -630,6 +630,7 @@ static void rpcReleaseConn(SRpcConn *pConn) { } else { // if there is an outgoing message, free it if (pConn->outType && pConn->pReqMsg) { + SRpcReqContext *pContext = pConn->pContext; if (pContext->pRsp) { // for synchronous API, post semaphore to unblock app pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR; @@ -637,8 +638,8 @@ static void rpcReleaseConn(SRpcConn *pConn) { pContext->pRsp->contLen = 0; tsem_post(pContext->pSem); } - pConn->pContext->pConn = NULL; - taosRemoveRef(tsRpcRefId, pConn->pContext->rid); + pContext->pConn = NULL; + taosRemoveRef(tsRpcRefId, pContext->rid); } } From bbafebfbeef323b9308e685baee4eaa900d4276e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 1 Dec 2020 11:52:43 +0800 Subject: [PATCH 13/14] TD-1926 --- src/sync/src/syncRetrieve.c | 72 ++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 25 deletions(-) diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 44d2680b1a..36b197dd46 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -97,6 +97,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { while (1) { // retrieve file info fileInfo.name[0] = 0; + fileInfo.size = 0; fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); // fileInfo.size = htonl(size); @@ -105,14 +106,14 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // send the file info int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); if (ret < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } // if no file anymore, break if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { - code = TSDB_CODE_SUCCESS; + code = 0; sDebug("%s, no more files to sync", pPeer->id); break; } @@ -120,7 +121,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // wait for the ack from peer ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); if (ret < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -141,7 +142,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // send the file to peer int32_t sfd = open(name, O_RDONLY); if (sfd < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -149,7 +150,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); close(sfd); if (ret < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -165,7 +166,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { } if (code != TSDB_CODE_SUCCESS) { - sError("%s, failed to retrieve file since %s", pPeer->id, tstrerror(code)); + sError("%s, failed to retrieve file, code:0x%x", pPeer->id, code); } return code; @@ -180,7 +181,7 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { } if (ret == 0) { - sDebug("sfd:%d, read to end of the wal, ret:%d", sfd, ret); + sTrace("sfd:%d, read to the end of file, ret:%d", sfd, ret); return 0; } @@ -254,7 +255,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi if (pHead->version >= fversion && fversion > 0) { code = 0; - sDebug("%s, retrieve wal finished, fver:%" PRIu64, pPeer->id, fversion); + sDebug("%s, retrieve wal finished, hver:%" PRIu64 " fver:%" PRIu64, pPeer->id, pHead->version, fversion); break; } } @@ -294,13 +295,14 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) if (fversion == 0) { pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt fversion = nodeVersion; // must read data to fversion - sDebug("%s, fversion is 0 and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); + sDebug("%s, set sstatus:%s and fver:%" PRIu64, pPeer->id, syncStatus[pPeer->sstatus], fversion); } } // if all data up to fversion is read out, it is over if (pPeer->sversion >= fversion && fversion > 0) { - sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d", pPeer->id, fversion, bytes); + sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d sver:%" PRIu64, pPeer->id, fversion, bytes, + pPeer->sversion); return 0; } @@ -324,7 +326,6 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { char fname[TSDB_FILENAME_LEN * 3]; char wname[TSDB_FILENAME_LEN * 2]; int32_t size; - struct stat fstat; int32_t code = -1; int64_t index = 0; @@ -332,9 +333,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { // retrieve wal info wname[0] = 0; code = (*pNode->getWalInfo)(pNode->vgId, wname, &index); - if (code < 0) break; // error + if (code < 0) { + sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code); + break; + } + if (wname[0] == 0) { // no wal file - sDebug("%s, no wal file", pPeer->id); + code = 0; + sDebug("%s, no wal file anymore", pPeer->id); break; } @@ -346,20 +352,35 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { // get the full path to wal file snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); - // send wal file, - // inotify is not required, old wal file won't be modified, even remove is ok - if (stat(fname, &fstat) < 0) break; - size = fstat.st_size; + // send wal file, old wal file won't be modified, even remove is ok + struct stat fstat; + if (stat(fname, &fstat) < 0) { + code = -1; + sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + break; + } + size = fstat.st_size; sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); + int32_t sfd = open(fname, O_RDONLY); - if (sfd < 0) break; + if (sfd < 0) { + code = -1; + sError("%s, failed to open wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + break; + } code = taosSendFile(pPeer->syncFd, sfd, NULL, size); close(sfd); - if (code < 0) break; + if (code < 0) { + sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + break; + } - if (syncAreFilesModified(pNode, pPeer)) break; + if (syncAreFilesModified(pNode, pPeer)) { + code = -1; + break; + } } if (code == 0) { @@ -368,9 +389,9 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { SWalHead walHead; memset(&walHead, 0, sizeof(walHead)); - code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); + taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); } else { - sError("%s, failed to send wal since %s", pPeer->id, strerror(errno)); + sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code); } return code; @@ -410,7 +431,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { pPeer->sversion = 0; pPeer->sstatus = TAOS_SYNC_STATUS_FILE; sInfo("%s, start to retrieve files, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); - if (syncRetrieveFile(pPeer) < 0) { + if (syncRetrieveFile(pPeer) != 0) { sError("%s, failed to retrieve files", pPeer->id); return -1; } @@ -419,8 +440,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { if (pPeer->sversion == 0) pPeer->sversion = 1; sInfo("%s, start to retrieve wals", pPeer->id); - if (syncRetrieveWal(pPeer) < 0) { - sError("%s, failed to retrieve wals", pPeer->id); + int32_t code = syncRetrieveWal(pPeer); + if (code != 0) { + sError("%s, failed to retrieve wals, code:0x%x", pPeer->id, code); return -1; } From c1b46153509597ea30d52873759f714550306653 Mon Sep 17 00:00:00 2001 From: stephenkgu Date: Tue, 1 Dec 2020 13:19:32 +0800 Subject: [PATCH 14/14] [TD-2255]: make db options days and keep error message more clear --- src/inc/taoserror.h | 3 +++ src/mnode/src/mnodeDb.c | 12 +++++------- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 5fde6d40d5..b526fe2b09 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -184,6 +184,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_DATABASES, 0, 0x0385, "Too many d TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_DROPPING, 0, 0x0386, "Database not available") TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_READY, 0, 0x0387, "Database unsynced") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION_DAYS, 0, 0x0390, "Invalid database option: days out of range") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION_KEEP, 0, 0x0391, "Invalid database option: keep >= keep2 >= keep1 >= days") + // dnode TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, 0, 0x0400, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, 0, 0x0401, "Dnode out of memory") diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 999c606284..39a0c03008 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -236,30 +236,28 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) { if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) { mError("invalid db option daysPerFile:%d valid range: [%d, %d]", pCfg->daysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE); - return TSDB_CODE_MND_INVALID_DB_OPTION; + return TSDB_CODE_MND_INVALID_DB_OPTION_DAYS; } if (pCfg->daysToKeep < TSDB_MIN_KEEP || pCfg->daysToKeep > TSDB_MAX_KEEP) { mError("invalid db option daysToKeep:%d valid range: [%d, %d]", pCfg->daysToKeep, TSDB_MIN_KEEP, TSDB_MAX_KEEP); - return TSDB_CODE_MND_INVALID_DB_OPTION; + return TSDB_CODE_MND_INVALID_DB_OPTION_KEEP; } if (pCfg->daysToKeep < pCfg->daysPerFile) { mError("invalid db option daysToKeep:%d should larger than daysPerFile:%d", pCfg->daysToKeep, pCfg->daysPerFile); - return TSDB_CODE_MND_INVALID_DB_OPTION; + return TSDB_CODE_MND_INVALID_DB_OPTION_KEEP; } -#if 0 if (pCfg->daysToKeep2 < TSDB_MIN_KEEP || pCfg->daysToKeep2 > pCfg->daysToKeep) { mError("invalid db option daysToKeep2:%d valid range: [%d, %d]", pCfg->daysToKeep, TSDB_MIN_KEEP, pCfg->daysToKeep); - return TSDB_CODE_MND_INVALID_DB_OPTION; + return TSDB_CODE_MND_INVALID_DB_OPTION_KEEP; } if (pCfg->daysToKeep1 < TSDB_MIN_KEEP || pCfg->daysToKeep1 > pCfg->daysToKeep2) { mError("invalid db option daysToKeep1:%d valid range: [%d, %d]", pCfg->daysToKeep1, TSDB_MIN_KEEP, pCfg->daysToKeep2); - return TSDB_CODE_MND_INVALID_DB_OPTION; + return TSDB_CODE_MND_INVALID_DB_OPTION_KEEP; } -#endif if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK) { mError("invalid db option maxRowsPerFileBlock:%d valid range: [%d, %d]", pCfg->maxRowsPerFileBlock,