fix: close wal ref
This commit is contained in:
parent
5b8cc2234b
commit
a6fff95795
|
@ -33,16 +33,16 @@ extern "C" {
|
||||||
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
|
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
#define WAL_PROTO_VER 0
|
#define WAL_PROTO_VER 0
|
||||||
#define WAL_NOSUFFIX_LEN 20
|
#define WAL_NOSUFFIX_LEN 20
|
||||||
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
||||||
#define WAL_LOG_SUFFIX "log"
|
#define WAL_LOG_SUFFIX "log"
|
||||||
#define WAL_INDEX_SUFFIX "idx"
|
#define WAL_INDEX_SUFFIX "idx"
|
||||||
#define WAL_REFRESH_MS 1000
|
#define WAL_REFRESH_MS 1000
|
||||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||||
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
||||||
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_WAL_WRITE = 1,
|
TAOS_WAL_WRITE = 1,
|
||||||
|
|
|
@ -725,9 +725,15 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
|
|
||||||
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (code != 0) {
|
if (pHandle) {
|
||||||
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
if (pHandle->pRef) {
|
||||||
|
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
|
||||||
|
}
|
||||||
|
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
|
if (code != 0) {
|
||||||
|
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
|
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
|
||||||
|
@ -736,7 +742,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
|
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
|
||||||
ASSERT(0);
|
tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -223,6 +223,7 @@ void walClose(SWal *pWal) {
|
||||||
taosMemoryFree(pRef);
|
taosMemoryFree(pRef);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pWal->pRefHash);
|
taosHashCleanup(pWal->pRefHash);
|
||||||
|
pWal->pRefHash = NULL;
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
|
||||||
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
||||||
|
|
|
@ -32,15 +32,18 @@ SWalRef *walOpenRef(SWal *pWal) {
|
||||||
return pRef;
|
return pRef;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 1
|
|
||||||
void walCloseRef(SWal *pWal, int64_t refId) {
|
void walCloseRef(SWal *pWal, int64_t refId) {
|
||||||
SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
|
SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
|
||||||
if (ppRef == NULL) return;
|
if (ppRef == NULL) return;
|
||||||
SWalRef *pRef = *ppRef;
|
SWalRef *pRef = *ppRef;
|
||||||
|
if (pRef) {
|
||||||
|
wDebug("vgId:%d, wal close ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId);
|
||||||
|
} else {
|
||||||
|
wDebug("vgId:%d, wal close ref null, refId %" PRId64, pWal->cfg.vgId, refId);
|
||||||
|
}
|
||||||
taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
|
taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
|
||||||
taosMemoryFree(pRef);
|
taosMemoryFree(pRef);
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
||||||
SWal *pWal = pRef->pWal;
|
SWal *pWal = pRef->pWal;
|
||||||
|
|
Loading…
Reference in New Issue