diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index bfd6908e16..b331e68b73 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1127,7 +1127,7 @@ TEST(clientCase, tmq_commit) { printf("position vgId:%d, position:%lld\n", pAssign[i].vgId, position); tmq_offset_seek(tmq, topicName, pAssign[i].vgId, 1); position = tmq_position(tmq, topicName, pAssign[i].vgId); - printf("after seek 100, position vgId:%d, position:%lld\n", pAssign[i].vgId, position); + printf("after seek 1, position vgId:%d, position:%lld\n", pAssign[i].vgId, position); } while (1) { @@ -1143,6 +1143,12 @@ TEST(clientCase, tmq_commit) { for(int i = 0; i < numOfAssign; i++) { int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId); printf("committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed); + if(committed > 0){ + int32_t code = tmq_commit_offset_sync(tmq, topicName, pAssign[i].vgId, 4); + printf("tmq_commit_offset_sync vgId:%d, offset:4, code:%d\n", pAssign[i].vgId, code); + int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId); + printf("after tmq_commit_offset_sync, committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed); + } } if (pRes != NULL) { taos_free_result(pRes); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bf0067b128..03d6932578 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -85,9 +85,9 @@ void tqDestroyTqHandle(void* data) { } } -static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { +static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) { return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && - pLeft->val.version <= pRight->val.version; + pLeft->val.version == pRight->val.version; } STQ* tqOpen(const char* path, SVnode* pVnode) { @@ -302,10 +302,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t STqOffset* pOffset = &vgOffset.offset; if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) { - tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, + tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts); } else if (pOffset->val.type == TMQ_OFFSET__LOG) { - tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId, + tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId, pOffset->val.version); if (pOffset->val.version + 1 == sversion) { pOffset->val.version += 1; @@ -316,8 +316,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t } STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); - if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) { - tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64, + if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) { + tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); return 0; // no need to update the offset value } @@ -605,7 +605,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { return TSDB_CODE_INVALID_PARA; } - void* buf = taosMemoryCalloc(1, len); + void* buf = rpcMallocCont(len); if (buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; }