fix(tmq): remove unnecessary error log.

This commit is contained in:
Haojun Liao 2023-05-01 14:47:01 +08:00
parent 465c595c3a
commit 563fca5c54
2 changed files with 58 additions and 60 deletions

View File

@ -31,9 +31,14 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer) {
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
if (pFile != NULL) { if (pFile == NULL) {
return TSDB_CODE_SUCCESS;
}
int32_t vgId = TD_VID(pStore->pTq->pVnode);
int64_t code = 0;
STqOffsetHead head = {0}; STqOffsetHead head = {0};
int64_t code;
while (1) { while (1) {
if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) {
@ -43,22 +48,27 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
return -1; return -1;
} }
} }
int32_t size = htonl(head.size); int32_t size = htonl(head.size);
void* memBuf = taosMemoryCalloc(1, size); void* pMemBuf = taosMemoryCalloc(1, size);
if (memBuf == NULL) { if (pMemBuf == NULL) {
tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
if ((code = taosReadFile(pFile, memBuf, size)) != size) {
taosMemoryFree(memBuf); if ((code = taosReadFile(pFile, pMemBuf, size)) != size) {
taosMemoryFree(pMemBuf);
return -1; return -1;
} }
STqOffset offset; STqOffset offset;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, memBuf, size); tDecoderInit(&decoder, pMemBuf, size);
if (tDecodeSTqOffset(&decoder, &offset) < 0) { if (tDecodeSTqOffset(&decoder, &offset) < 0) {
taosMemoryFree(memBuf); taosMemoryFree(pMemBuf);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return -1; return code;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -66,22 +76,22 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
return -1; return -1;
} }
// todo remove this
if (offset.val.type == TMQ_OFFSET__LOG) { if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) { if (pHandle) {
if (walRefVer(pHandle->pRef, offset.val.version) < 0) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, // tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey,
pHandle->subKey, offset.val.version); // offset.val.version);
} }
} }
} }
taosMemoryFree(memBuf); taosMemoryFree(pMemBuf);
} }
taosCloseFile(&pFile); taosCloseFile(&pFile);
} return TSDB_CODE_SUCCESS;
return 0;
} }
STqOffsetStore* tqOffsetOpen(STQ* pTq) { STqOffsetStore* tqOffsetOpen(STQ* pTq) {
@ -89,6 +99,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
if (pStore == NULL) { if (pStore == NULL) {
return NULL; return NULL;
} }
pStore->pTq = pTq; pStore->pTq = pTq;
pStore->needCommit = 0; pStore->needCommit = 0;
pTq->pOffsetStore = pStore; pTq->pOffsetStore = pStore;
@ -98,12 +109,14 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
taosMemoryFree(pStore); taosMemoryFree(pStore);
return NULL; return NULL;
} }
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
if (tqOffsetRestoreFromFile(pStore, fname) < 0) { if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
taosMemoryFree(fname); taosMemoryFree(fname);
taosMemoryFree(pStore); taosMemoryFree(pStore);
return NULL; return NULL;
} }
taosMemoryFree(fname); taosMemoryFree(fname);
return pStore; return pStore;
} }

View File

@ -335,21 +335,6 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
} }
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
#if 0
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", vgId);
return -1;
}
memcpy(data, pReq, len);
SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver};
tqDebug("vgId:%d tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", vgId, data, len, ver, pReq);
tqProcessSubmitReq(pTq, submit);
#endif
SPackedData submit = {0}; SPackedData submit = {0};
tqProcessSubmitReq(pTq, submit); tqProcessSubmitReq(pTq, submit);
} }