From 1e17d776f905e998ea69e13fbde9726156d39954 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 10 Jan 2022 16:13:05 +0800 Subject: [PATCH] fix crash --- source/client/inc/clientHb.h | 48 +++++++-- source/client/src/clientHb.c | 142 ++++++++++++++------------ source/common/src/tmsg.c | 16 +-- source/dnode/mnode/impl/src/mndSync.c | 4 +- source/libs/wal/inc/walInt.h | 1 + source/libs/wal/src/walMeta.c | 1 + source/libs/wal/src/walWrite.c | 8 ++ 7 files changed, 137 insertions(+), 83 deletions(-) diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index 2624c8f833..7bc4311b29 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -18,7 +18,7 @@ #include "thash.h" #include "tmsg.h" -#define HEARTBEAT_INTERVAL 1500 //ms +#define HEARTBEAT_INTERVAL 1500 // ms typedef enum { HEARTBEAT_TYPE_MQ = 0, @@ -29,20 +29,50 @@ typedef enum { typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); -//TODO: embed param into function -//return type: SArray +typedef struct SAppHbMgr { + // statistics + int32_t reportCnt; + int32_t connKeyCnt; + int64_t reportBytes; // not implemented + int64_t startTime; + // ctl + SRWLatch lock; // lock is used in serialization + // connection + void* transporter; + SEpSet epSet; + // info + SHashObj* activeInfo; // hash + SHashObj* getInfoFuncs; // hash +} SAppHbMgr; + +typedef struct SClientHbMgr { + int8_t inited; + // ctl + int8_t threadStop; + pthread_t thread; + pthread_mutex_t lock; // used when app init and cleanup + SArray* appHbMgrs; // SArray one for each cluster + FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; +} SClientHbMgr; + +// TODO: embed param into function +// return type: SArray typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param); -// called by mgmt -int hbMgrInit(void* transporter, SEpSet epSet); +// global, called by mgmt +int hbMgrInit(); void hbMgrCleanUp(); int hbHandleRsp(SClientHbBatchRsp* hbRsp); -//called by user -int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func); -void hbDeregisterConn(SClientHbKey connKey); +// cluster level +SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet); +void appHbMgrCleanup(SAppHbMgr* pAppHbMgr); -int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); +// conn level +int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func); +void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); + +int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); // mq void hbMgrInitMqHbRspHandle(); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 1d50f7574a..9bbd62c1d9 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -16,27 +16,6 @@ #include "clientHb.h" #include "trpc.h" -typedef struct SClientHbMgr { - int8_t inited; - - // statistics - int32_t reportCnt; - int32_t connKeyCnt; - int64_t reportBytes; // not implemented - int64_t startTime; - // ctl - int8_t threadStop; - pthread_t thread; - SRWLatch lock; // lock is used in serialization - // connection - void* transporter; - SEpSet epSet; - - SHashObj* activeInfo; // hash - SHashObj* getInfoFuncs; // hash - FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; -} SClientHbMgr; - static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); @@ -55,32 +34,32 @@ static FORCE_INLINE void hbMgrInitHandle() { hbMgrInitMqHbRspHandle(); } -static SClientHbBatchReq* hbGatherAllInfo() { +SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SClientHbBatchReq* pReq = malloc(sizeof(SClientHbBatchReq)); - if(pReq == NULL) { + if (pReq == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; } - int32_t connKeyCnt = atomic_load_32(&clientHbMgr.connKeyCnt); + int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); - void *pIter = taosHashIterate(clientHbMgr.activeInfo, NULL); + void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); while (pIter != NULL) { taosArrayPush(pReq->reqs, pIter); SClientHbReq* pOneReq = pIter; taosHashClear(pOneReq->info); - pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } - pIter = taosHashIterate(clientHbMgr.getInfoFuncs, NULL); + pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL); while (pIter != NULL) { FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter; SClientHbKey connKey; taosHashCopyKey(pIter, &connKey); getConnInfoFp(connKey, NULL); - pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } return pReq; @@ -93,22 +72,24 @@ static void* hbThreadFunc(void* param) { if(threadStop) { break; } - - SClientHbBatchReq* pReq = hbGatherAllInfo(); - void* reqStr = NULL; - int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq); - SMsgSendInfo info; - /*info.fp = hbHandleRsp;*/ - int64_t transporterId = 0; - asyncSendMsgToServer(clientHbMgr.transporter, &clientHbMgr.epSet, &transporterId, &info); - tFreeClientHbBatchReq(pReq); + int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); + for(int i = 0; i < sz; i++) { + SAppHbMgr* pAppHbMgr = taosArrayGet(clientHbMgr.appHbMgrs, i); + SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr); + void* reqStr = NULL; + int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq); + SMsgSendInfo info; + /*info.fp = hbHandleRsp;*/ - atomic_add_fetch_32(&clientHbMgr.reportCnt, 1); - taosMsleep(HEARTBEAT_INTERVAL); + int64_t transporterId = 0; + asyncSendMsgToServer(pAppHbMgr->transporter, &pAppHbMgr->epSet, &transporterId, &info); + tFreeClientHbBatchReq(pReq); + atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); + taosMsleep(HEARTBEAT_INTERVAL); + } } - return NULL; } @@ -129,27 +110,55 @@ static void hbStopThread() { atomic_store_8(&clientHbMgr.threadStop, 1); } -int hbMgrInit(void* transporter, SEpSet epSet) { +SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet) { + SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); + if (pAppHbMgr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + // init stat + pAppHbMgr->startTime = taosGetTimestampMs(); + + // init connection info + pAppHbMgr->transporter = transporter; + pAppHbMgr->epSet = epSet; + + // init hash info + pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq; + // init getInfoFunc + pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + + taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); + return pAppHbMgr; +} + +void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) { + pthread_mutex_lock(&clientHbMgr.lock); + + int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); + for (int i = 0; i < sz; i++) { + SAppHbMgr* pTarget = taosArrayGet(clientHbMgr.appHbMgrs, i); + if (pAppHbMgr == pTarget) { + taosHashCleanup(pTarget->activeInfo); + taosHashCleanup(pTarget->getInfoFuncs); + } + } + + pthread_mutex_unlock(&clientHbMgr.lock); +} + +int hbMgrInit() { // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; - // init stat - clientHbMgr.startTime = taosGetTimestampMs(); + clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*)); + pthread_mutex_init(&clientHbMgr.lock, NULL); // init handle funcs hbMgrInitHandle(); - // init hash info - clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - clientHbMgr.activeInfo->freeFp = tFreeClientHbReq; - // init getInfoFunc - clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - - //init connection info - clientHbMgr.transporter = transporter; - clientHbMgr.epSet = epSet; - // init backgroud thread hbCreateThread(); @@ -157,11 +166,12 @@ int hbMgrInit(void* transporter, SEpSet epSet) { } void hbMgrCleanUp() { + // destroy all appHbMgr int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); if (old == 0) return; - taosHashCleanup(clientHbMgr.activeInfo); - taosHashCleanup(clientHbMgr.getInfoFuncs); + taosArrayDestroy(clientHbMgr.appHbMgrs); + } int hbHandleRsp(SClientHbBatchRsp* hbRsp) { @@ -181,34 +191,34 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp) { return 0; } -int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { +int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) { // init hash in activeinfo - void* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (data != NULL) { return 0; } SClientHbReq hbReq; hbReq.connKey = connKey; hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - taosHashPut(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); + taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); // init hash if (func != NULL) { - taosHashPut(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo)); + taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo)); } - atomic_add_fetch_32(&clientHbMgr.connKeyCnt, 1); + atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1); return 0; } -void hbDeregisterConn(SClientHbKey connKey) { - taosHashRemove(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); - taosHashRemove(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey)); - atomic_sub_fetch_32(&clientHbMgr.connKeyCnt, 1); +void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { + taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey)); + atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } -int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { +int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { // find req by connection id - SClientHbReq* pReq = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); ASSERT(pReq != NULL); taosHashPut(pReq->info, key, keyLen, value, valueLen); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 09523da965..b94bd6f715 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -31,7 +31,9 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int tlen = 0; tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); - SKv kv; + int kvNum = taosHashGetSize(pReq->info); + tlen += taosEncodeFixedI32(buf, kvNum); + SKv kv; void* pIter = taosHashIterate(pReq->info, pIter); while (pIter != NULL) { taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); @@ -49,12 +51,14 @@ void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) { buf = taosDecodeSClientHbKey(buf, &pReq->connKey); // TODO: error handling - if (pReq->info == NULL) { - pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + int kvNum; + taosDecodeFixedI32(buf, &kvNum); + pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + for(int i = 0; i < kvNum; i++) { + SKv kv; + buf = taosDecodeSKv(buf, &kv); + taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen); } - SKv kv; - buf = taosDecodeSKv(buf, &kv); - taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen); return buf; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 47d0ce4105..e55da73f62 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -95,11 +95,11 @@ static int32_t mndRestoreWal(SMnode *pMnode) { if (sdbWriteFile(pSdb) != 0) { goto WAL_RESTORE_OVER; } - } if (walEndSnapshot(pWal) < 0) { goto WAL_RESTORE_OVER; } + } code = 0; @@ -181,4 +181,4 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { bool mndIsMaster(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; return pMgmt->state == TAOS_SYNC_STATE_LEADER; -} \ No newline at end of file +} diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 871c95193f..7631593dd8 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -131,6 +131,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes); // seek section int walChangeFile(SWal* pWal, int64_t ver); +int walChangeFileToLast(SWal* pWal); // seek section end int64_t walGetSeq(); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 6164ec2baf..270a26bf80 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -184,6 +184,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { } taosArraySetSize(pArray, sz); pWal->fileInfoSet = pArray; + pWal->writeCur = sz - 1; cJSON_Delete(pRoot); return 0; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 5eda16b061..975f232e3d 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -254,6 +254,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { SWalIdxEntry entry = {.ver = ver, .offset = offset}; int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(SWalIdxEntry)); if (size != sizeof(SWalIdxEntry)) { + terrno = TAOS_SYSTEM_ERROR(errno); // TODO truncate return -1; } @@ -287,7 +288,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i } /*if (!tfValid(pWal->writeLogTfd)) return -1;*/ + ASSERT(pWal->writeCur >= 0); + pthread_mutex_lock(&pWal->mutex); + if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) { + walChangeFileToLast(pWal); + } + pWal->writeHead.head.version = index; int64_t offset = walGetCurFileOffset(pWal); @@ -309,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } + code = walWriteIndex(pWal, index, offset); if (code != 0) { // TODO