Merge pull request #22007 from taosdata/mark/tmq

fix:can not do assignment if in tsdb mode & do not commit if seek offset
This commit is contained in:
Haojun Liao 2023-07-11 09:17:22 +08:00 committed by GitHub
commit 0e9b9b3a44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 48 deletions

View File

@ -771,6 +771,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003) #define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003)
#define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004) #define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004)
#define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005) #define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005)
#define TSDB_CODE_TMQ_SNAPSHOT_ERROR TAOS_DEF_ERROR_CODE(0, 0x4006)
// stream // stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)

View File

@ -151,7 +151,7 @@ typedef struct {
int32_t vgId; int32_t vgId;
int32_t vgStatus; int32_t vgStatus;
int32_t vgSkipCnt; // here used to mark the slow vgroups int32_t vgSkipCnt; // here used to mark the slow vgroups
bool receivedInfoFromVnode; // has already received info from vnode // bool receivedInfoFromVnode; // has already received info from vnode
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp. bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp.
SEpSet epSet; SEpSet epSet;
@ -1521,7 +1521,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerBegin = -1;
clientVg.offsetInfo.walVerEnd = -1; clientVg.offsetInfo.walVerEnd = -1;
clientVg.seekUpdated = false; clientVg.seekUpdated = false;
clientVg.receivedInfoFromVnode = false; // clientVg.receivedInfoFromVnode = false;
taosArrayPush(pTopic->vgs, &clientVg); taosArrayPush(pTopic->vgs, &clientVg);
} }
@ -1893,7 +1893,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* offset, int64_t sver, i
// update the valid wal version range // update the valid wal version range
pVg->offsetInfo.walVerBegin = sver; pVg->offsetInfo.walVerBegin = sver;
pVg->offsetInfo.walVerEnd = ever; pVg->offsetInfo.walVerEnd = ever;
pVg->receivedInfoFromVnode = true; // pVg->receivedInfoFromVnode = true;
} }
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
@ -2556,6 +2556,13 @@ static void destroyCommonInfo(SMqVgCommon* pCommon) {
taosMemoryFree(pCommon); taosMemoryFree(pCommon);
} }
static bool isInSnapshotMode(int8_t type, bool useSnapshot){
if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
return true;
}
return false;
}
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment, int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
int32_t* numOfAssignment) { int32_t* numOfAssignment) {
*numOfAssignment = 0; *numOfAssignment = 0;
@ -2576,6 +2583,15 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
// in case of snapshot is opened, no valid offset will return // in case of snapshot is opened, no valid offset will return
*numOfAssignment = taosArrayGetSize(pTopic->vgs); *numOfAssignment = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
int32_t type = pClientVg->offsetInfo.currentOffset.type;
if (isInSnapshotMode(type, tmq->useSnapshot)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
goto end;
}
}
*assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment)); *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
if (*assignment == NULL) { if (*assignment == NULL) {
@ -2589,18 +2605,13 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
for (int32_t j = 0; j < (*numOfAssignment); ++j) { for (int32_t j = 0; j < (*numOfAssignment); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if (!pClientVg->receivedInfoFromVnode) { if (pClientVg->offsetInfo.currentOffset.type != TMQ_OFFSET__LOG) {
needFetch = true; needFetch = true;
break; break;
} }
tmq_topic_assignment* pAssignment = &(*assignment)[j]; tmq_topic_assignment* pAssignment = &(*assignment)[j];
if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) {
pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version; pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
} else {
pAssignment->currentOffset = 0;
}
pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->end = pClientVg->offsetInfo.walVerEnd;
pAssignment->vgId = pClientVg->vgId; pAssignment->vgId = pClientVg->vgId;
@ -2708,18 +2719,10 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
} }
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
// pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
// char offsetBuf[TSDB_OFFSET_LEN] = {0};
// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset); tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);
pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end; pOffsetInfo->walVerEnd = p->end;
// pOffsetInfo->currentOffset.version = p->currentOffset;
// pOffsetInfo->committedOffset.version = p->currentOffset;
} }
} }
} }
@ -2780,10 +2783,10 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
int32_t type = pOffsetInfo->currentOffset.type; int32_t type = pOffsetInfo->currentOffset.type;
if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { if (isInSnapshotMode(type, tmq->useSnapshot)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
} }
if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) {
@ -2794,40 +2797,37 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
} }
// update the offset, and then commit to vnode // update the offset, and then commit to vnode
// if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0; pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0;
pOffsetInfo->committedOffset.version = INT64_MIN; // pOffsetInfo->committedOffset.version = INT64_MIN;
pVg->seekUpdated = true; pVg->seekUpdated = true;
// }
SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); // SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
if (pInfo == NULL) { // tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId); //
return TSDB_CODE_OUT_OF_MEMORY; // SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
} // if (pInfo == NULL) {
// tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
tsem_init(&pInfo->sem, 0, 0); // return TSDB_CODE_OUT_OF_MEMORY;
pInfo->code = 0; // }
//
asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo); // tsem_init(&pInfo->sem, 0, 0);
// pInfo->code = 0;
tsem_wait(&pInfo->sem); //
int32_t code = pInfo->code; // asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo);
//
tsem_destroy(&pInfo->sem); // tsem_wait(&pInfo->sem);
taosMemoryFree(pInfo); // int32_t code = pInfo->code;
//
if (code != TSDB_CODE_SUCCESS) { // tsem_destroy(&pInfo->sem);
tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, // taosMemoryFree(pInfo);
tstrerror(code)); //
} // if (code != TSDB_CODE_SUCCESS) {
// tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, tstrerror(code));
return code; // }
return 0;
} }

View File

@ -628,6 +628,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval
//tmq //tmq
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")