From 563fca5c54f1c0fb5fca22f993d3563efb3b612e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 1 May 2023 14:47:01 +0800 Subject: [PATCH] fix(tmq): remove unnecessary error log. --- source/dnode/vnode/src/tq/tqOffset.c | 103 +++++++++++++++------------ source/dnode/vnode/src/tq/tqPush.c | 15 ---- 2 files changed, 58 insertions(+), 60 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index e8051a1406..34e93cec2d 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -31,57 +31,67 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer) { int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); - if (pFile != NULL) { - STqOffsetHead head = {0}; - int64_t code; + if (pFile == NULL) { + return TSDB_CODE_SUCCESS; + } - while (1) { - if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { - if (code == 0) { - break; - } else { - return -1; - } - } - int32_t size = htonl(head.size); - void* memBuf = taosMemoryCalloc(1, size); - if (memBuf == NULL) { + int32_t vgId = TD_VID(pStore->pTq->pVnode); + int64_t code = 0; + + STqOffsetHead head = {0}; + + while (1) { + if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { + if (code == 0) { + break; + } else { return -1; } - if ((code = taosReadFile(pFile, memBuf, size)) != size) { - taosMemoryFree(memBuf); - return -1; - } - STqOffset offset; - SDecoder decoder; - tDecoderInit(&decoder, memBuf, size); - if (tDecodeSTqOffset(&decoder, &offset) < 0) { - taosMemoryFree(memBuf); - tDecoderClear(&decoder); - return -1; - } - - tDecoderClear(&decoder); - if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { - return -1; - } - - if (offset.val.type == TMQ_OFFSET__LOG) { - STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); - if (pHandle) { - if (walRefVer(pHandle->pRef, offset.val.version) < 0) { - tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, - pHandle->subKey, offset.val.version); - } - } - } - - taosMemoryFree(memBuf); } - taosCloseFile(&pFile); + int32_t size = htonl(head.size); + void* pMemBuf = taosMemoryCalloc(1, size); + if (pMemBuf == NULL) { + tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if ((code = taosReadFile(pFile, pMemBuf, size)) != size) { + taosMemoryFree(pMemBuf); + return -1; + } + + STqOffset offset; + SDecoder decoder; + tDecoderInit(&decoder, pMemBuf, size); + if (tDecodeSTqOffset(&decoder, &offset) < 0) { + taosMemoryFree(pMemBuf); + tDecoderClear(&decoder); + return code; + } + + tDecoderClear(&decoder); + if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { + return -1; + } + + // todo remove this + if (offset.val.type == TMQ_OFFSET__LOG) { + STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); + if (pHandle) { + if (walRefVer(pHandle->pRef, offset.val.version) < 0) { +// tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey, +// offset.val.version); + } + } + } + + taosMemoryFree(pMemBuf); } - return 0; + + taosCloseFile(&pFile); + return TSDB_CODE_SUCCESS; } STqOffsetStore* tqOffsetOpen(STQ* pTq) { @@ -89,6 +99,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { if (pStore == NULL) { return NULL; } + pStore->pTq = pTq; pStore->needCommit = 0; pTq->pOffsetStore = pStore; @@ -98,12 +109,14 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { taosMemoryFree(pStore); return NULL; } + char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); if (tqOffsetRestoreFromFile(pStore, fname) < 0) { taosMemoryFree(fname); taosMemoryFree(pStore); return NULL; } + taosMemoryFree(fname); return pStore; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 7a1a6b7454..ce222fcda7 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -335,21 +335,6 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v } if (msgType == TDMT_VND_SUBMIT) { -#if 0 - void* data = taosMemoryMalloc(len); - if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", vgId); - return -1; - } - - memcpy(data, pReq, len); - SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver}; - - tqDebug("vgId:%d tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", vgId, data, len, ver, pReq); - tqProcessSubmitReq(pTq, submit); -#endif SPackedData submit = {0}; tqProcessSubmitReq(pTq, submit); }