fix(tq): set the offset to be the consume offset, instead of the committed offset. And update the table sim.

This commit is contained in:
Haojun Liao 2023-05-12 16:14:56 +08:00
parent 4b961753a5
commit 99422c87c3
2 changed files with 15 additions and 13 deletions

View File

@ -439,7 +439,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
taosRLockLatch(&pTq->lock); taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != consumerId) { if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); consumerId, vgId, req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosRUnLockLatch(&pTq->lock); taosRUnLockLatch(&pTq->lock);
return -1; return -1;
@ -498,6 +498,8 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
int64_t sver = 0, ever = 0; int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req); tqInitDataRsp(&dataRsp, &req);
@ -522,8 +524,10 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
dataRsp.rspOffset.type = TMQ_OFFSET__LOG; dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.rspOffset.version = sver; dataRsp.rspOffset.version = currentVer; // return current consume offset value
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
dataRsp.rspOffset.version = ever; dataRsp.rspOffset.version = ever;
} else { } else {
@ -909,11 +913,9 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
} }
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int32_t code; int32_t code = 0;
#if 0 int32_t vgId = TD_VID(pTq->pVnode);
code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
if (code < 0) return code;
#endif
if (tsDisableStream) { if (tsDisableStream) {
return 0; return 0;
} }
@ -939,7 +941,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
taosWLockLatch(&pTq->pStreamMeta->lock); taosWLockLatch(&pTq->pStreamMeta->lock);
code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
if (code < 0) { if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr,
streamMetaGetNumOfTasks(pTq->pStreamMeta)); streamMetaGetNumOfTasks(pTq->pStreamMeta));
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);
return -1; return -1;
@ -952,8 +954,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
streamTaskCheckDownstream(pTask, sversion); streamTaskCheckDownstream(pTask, sversion);
} }
tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", TD_VID(pTq->pVnode), tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
pTask->id.idStr, pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta));
return 0; return 0;
} }

View File

@ -665,8 +665,8 @@ sql_error alter table tb2023 add column v varchar(65530);
sql alter table tb2023 add column v varchar(16374); sql alter table tb2023 add column v varchar(16374);
sql desc tb2023 sql desc tb2023
sql alter table tb2023 drop column v sql alter table tb2023 drop column v
sql_error alter table tb2023 add column v nchar(4094); sql_error alter table tb2023 add column v nchar(16384);
sql alter table tb2023 add column v nchar(4093); sql alter table tb2023 add column v nchar(16374);
sql desc tb2023 sql desc tb2023
print ======= over print ======= over
sql drop database d1 sql drop database d1