fix:can not do assignment if in tsdb mode
This commit is contained in:
parent
dabaefe3e2
commit
8e011c46c9
|
@ -151,7 +151,7 @@ typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t vgStatus;
|
int32_t vgStatus;
|
||||||
int32_t vgSkipCnt; // here used to mark the slow vgroups
|
int32_t vgSkipCnt; // here used to mark the slow vgroups
|
||||||
bool receivedInfoFromVnode; // has already received info from vnode
|
// bool receivedInfoFromVnode; // has already received info from vnode
|
||||||
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
|
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
|
||||||
bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp.
|
bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp.
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
|
@ -1521,7 +1521,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
clientVg.offsetInfo.walVerBegin = -1;
|
clientVg.offsetInfo.walVerBegin = -1;
|
||||||
clientVg.offsetInfo.walVerEnd = -1;
|
clientVg.offsetInfo.walVerEnd = -1;
|
||||||
clientVg.seekUpdated = false;
|
clientVg.seekUpdated = false;
|
||||||
clientVg.receivedInfoFromVnode = false;
|
// clientVg.receivedInfoFromVnode = false;
|
||||||
|
|
||||||
taosArrayPush(pTopic->vgs, &clientVg);
|
taosArrayPush(pTopic->vgs, &clientVg);
|
||||||
}
|
}
|
||||||
|
@ -1893,7 +1893,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* offset, int64_t sver, i
|
||||||
// update the valid wal version range
|
// update the valid wal version range
|
||||||
pVg->offsetInfo.walVerBegin = sver;
|
pVg->offsetInfo.walVerBegin = sver;
|
||||||
pVg->offsetInfo.walVerEnd = ever;
|
pVg->offsetInfo.walVerEnd = ever;
|
||||||
pVg->receivedInfoFromVnode = true;
|
// pVg->receivedInfoFromVnode = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
|
@ -2556,6 +2556,13 @@ static void destroyCommonInfo(SMqVgCommon* pCommon) {
|
||||||
taosMemoryFree(pCommon);
|
taosMemoryFree(pCommon);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool isInSnapshotMode(int8_t type, bool useSnapshot){
|
||||||
|
if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
|
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
|
||||||
int32_t* numOfAssignment) {
|
int32_t* numOfAssignment) {
|
||||||
*numOfAssignment = 0;
|
*numOfAssignment = 0;
|
||||||
|
@ -2578,9 +2585,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
*numOfAssignment = taosArrayGetSize(pTopic->vgs);
|
*numOfAssignment = taosArrayGetSize(pTopic->vgs);
|
||||||
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
||||||
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
|
||||||
if ((pClientVg->offsetInfo.currentOffset.type < TMQ_OFFSET__LOG && tmq->useSnapshot) ||
|
int32_t type = pClientVg->offsetInfo.currentOffset.type;
|
||||||
pClientVg->offsetInfo.currentOffset.type > TMQ_OFFSET__LOG) {
|
if (isInSnapshotMode(type, tmq->useSnapshot)) {
|
||||||
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, pClientVg->offsetInfo.currentOffset.type);
|
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
|
||||||
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
|
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -2598,18 +2605,13 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
|
|
||||||
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
||||||
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
|
||||||
if (!pClientVg->receivedInfoFromVnode) {
|
if (pClientVg->offsetInfo.currentOffset.type != TMQ_OFFSET__LOG) {
|
||||||
needFetch = true;
|
needFetch = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_topic_assignment* pAssignment = &(*assignment)[j];
|
tmq_topic_assignment* pAssignment = &(*assignment)[j];
|
||||||
if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) {
|
pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
|
||||||
pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
|
|
||||||
} else {
|
|
||||||
pAssignment->currentOffset = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
|
pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
|
||||||
pAssignment->end = pClientVg->offsetInfo.walVerEnd;
|
pAssignment->end = pClientVg->offsetInfo.walVerEnd;
|
||||||
pAssignment->vgId = pClientVg->vgId;
|
pAssignment->vgId = pClientVg->vgId;
|
||||||
|
@ -2717,18 +2719,10 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
|
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
|
||||||
|
|
||||||
// pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
|
|
||||||
|
|
||||||
// char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
|
||||||
// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
|
|
||||||
|
|
||||||
tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);
|
tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);
|
||||||
|
|
||||||
pOffsetInfo->walVerBegin = p->begin;
|
pOffsetInfo->walVerBegin = p->begin;
|
||||||
pOffsetInfo->walVerEnd = p->end;
|
pOffsetInfo->walVerEnd = p->end;
|
||||||
// pOffsetInfo->currentOffset.version = p->currentOffset;
|
|
||||||
// pOffsetInfo->committedOffset.version = p->currentOffset;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2789,7 +2783,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
||||||
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
|
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
|
||||||
|
|
||||||
int32_t type = pOffsetInfo->currentOffset.type;
|
int32_t type = pOffsetInfo->currentOffset.type;
|
||||||
if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) {
|
if (isInSnapshotMode(type, tmq->useSnapshot)) {
|
||||||
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
|
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
|
||||||
taosWUnLockLatch(&tmq->lock);
|
taosWUnLockLatch(&tmq->lock);
|
||||||
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
|
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
|
||||||
|
@ -2803,12 +2797,10 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the offset, and then commit to vnode
|
// update the offset, and then commit to vnode
|
||||||
// if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {
|
|
||||||
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
|
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
|
||||||
pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0;
|
pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0;
|
||||||
pOffsetInfo->committedOffset.version = INT64_MIN;
|
pOffsetInfo->committedOffset.version = INT64_MIN;
|
||||||
pVg->seekUpdated = true;
|
pVg->seekUpdated = true;
|
||||||
// }
|
|
||||||
|
|
||||||
SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
|
SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
|
||||||
tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
|
tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
|
||||||
|
@ -2834,8 +2826,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId,
|
tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, tstrerror(code));
|
||||||
tstrerror(code));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue