refactor(tmq): offset storage
This commit is contained in:
parent
aef7c7ebe1
commit
18993d50e5
|
@ -376,6 +376,9 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
|
|||
}
|
||||
#endif
|
||||
|
||||
/*tscDebug("receive offset commit cb of %s on vg %d, offset is %ld", pParam->pOffset->subKey, pParam->->vgId,
|
||||
* pOffset->version);*/
|
||||
|
||||
// count down waiting rsp
|
||||
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
||||
ASSERT(waitingRspNum >= 0);
|
||||
|
@ -421,10 +424,20 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
|
|||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
|
||||
tscDebug("consumer %ld begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
|
||||
(int32_t)taosArrayGetSize(pTopic->vgs));
|
||||
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, i);
|
||||
if (pVg->currentOffset < 0) {
|
||||
/*if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) {*/
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
|
||||
tscDebug("consumer %ld begin commit for topic %s, vgId %d", tmq->consumerId, pTopic->topicName, pVg->vgId);
|
||||
|
||||
/*if (pVg->currentOffset < 0) {*/
|
||||
if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) {
|
||||
tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld",
|
||||
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset);
|
||||
|
||||
continue;
|
||||
}
|
||||
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
|
||||
|
@ -434,10 +447,12 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
|
|||
}
|
||||
pOffset->type = TMQ_OFFSET__LOG;
|
||||
pOffset->version = pVg->currentOffset;
|
||||
int32_t tlen = strlen(tmq->groupId);
|
||||
memcpy(pOffset->subKey, tmq->groupId, tlen);
|
||||
pOffset->subKey[tlen] = TMQ_SEPARATOR;
|
||||
strcpy(pOffset->subKey + tlen + 1, pTopic->topicName);
|
||||
|
||||
int32_t groupLen = strlen(tmq->groupId);
|
||||
memcpy(pOffset->subKey, tmq->groupId, groupLen);
|
||||
pOffset->subKey[groupLen] = TMQ_SEPARATOR;
|
||||
strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
|
||||
|
||||
int32_t len;
|
||||
int32_t code;
|
||||
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
|
||||
|
@ -470,6 +485,9 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
|
|||
.handle = NULL,
|
||||
};
|
||||
|
||||
tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey, pVg->vgId,
|
||||
pOffset->version);
|
||||
|
||||
// TODO: put into cb
|
||||
pVg->committedOffset = pVg->currentOffset;
|
||||
|
||||
|
@ -1039,14 +1057,13 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
|||
if (pTopicCur->vgs) {
|
||||
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
|
||||
tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
|
||||
if (vgNumCur == 0) break;
|
||||
for (int32_t j = 0; j < vgNumCur; j++) {
|
||||
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
|
||||
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
|
||||
tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
|
||||
tscDebug("consumer %ld epoch %d vg %d vgKey is %s, offset is %ld", tmq->consumerId, epoch, pVgCur->vgId, vgKey,
|
||||
pVgCur->currentOffset);
|
||||
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1054,7 +1071,6 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
|||
SMqClientTopic topic = {0};
|
||||
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
|
||||
topic.schema = pTopicEp->schema;
|
||||
taosHashClear(pHash);
|
||||
topic.topicName = strdup(pTopicEp->topic);
|
||||
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
|
||||
|
||||
|
@ -1071,7 +1087,8 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
|||
offset = *pOffset;
|
||||
}
|
||||
|
||||
tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
|
||||
tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld, vgKey is %s", tmq->consumerId, epoch,
|
||||
pVgEp->vgId, offset, vgKey);
|
||||
SMqClientVg clientVg = {
|
||||
.pollCnt = 0,
|
||||
.currentOffset = offset,
|
||||
|
@ -1350,10 +1367,10 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
|
|||
/*strcpy(pReq->topic, pTopic->topicName);*/
|
||||
/*strcpy(pReq->cgroup, tmq->groupId);*/
|
||||
|
||||
int32_t tlen = strlen(tmq->groupId);
|
||||
memcpy(pReq->subKey, tmq->groupId, tlen);
|
||||
pReq->subKey[tlen] = TMQ_SEPARATOR;
|
||||
strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
|
||||
int32_t groupLen = strlen(tmq->groupId);
|
||||
memcpy(pReq->subKey, tmq->groupId, groupLen);
|
||||
pReq->subKey[groupLen] = TMQ_SEPARATOR;
|
||||
strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);
|
||||
|
||||
pReq->withTbName = tmq->withTbName;
|
||||
pReq->timeout = timeout;
|
||||
|
|
|
@ -124,17 +124,20 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
tDecoderClear(&decoder);
|
||||
|
||||
if (offset.type == TMQ_OFFSET__SNAPSHOT) {
|
||||
tqDebug("receive offset commit msg to %s, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey, offset.uid,
|
||||
offset.ts);
|
||||
tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey,
|
||||
pTq->pVnode->config.vgId, offset.uid, offset.ts);
|
||||
} else if (offset.type == TMQ_OFFSET__LOG) {
|
||||
tqDebug("receive offset commit msg to %s, offset(type:log) version: %ld", offset.subKey, offset.version);
|
||||
tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey,
|
||||
pTq->pVnode->config.vgId, offset.version);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
|
||||
if (pOffset == NULL || pOffset->version < offset.version) {
|
||||
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -155,6 +158,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
|
||||
if (pOffset != NULL) {
|
||||
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
|
||||
tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey,
|
||||
pTq->pVnode->config.vgId, pOffset->version);
|
||||
fetchOffset = pOffset->version + 1;
|
||||
} else {
|
||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
||||
|
@ -166,6 +171,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
pTq->pVnode->config.vgId, pReq->subKey);
|
||||
return -1;
|
||||
}
|
||||
tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey,
|
||||
pTq->pVnode->config.vgId, pReq->currentOffset, fetchOffset);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -513,6 +513,7 @@ void* consumeThreadFunc(void* param) {
|
|||
/*tmq_commit(pInfo->tmq, NULL, 0);*/
|
||||
tmq_commit_sync(pInfo->tmq, NULL);
|
||||
taosFprintfFile(g_fp, "tmq_commit() manual commit over.\n");
|
||||
pPrint("tmq_commit() manual commit over.\n");
|
||||
}
|
||||
|
||||
err = tmq_unsubscribe(pInfo->tmq);
|
||||
|
|
Loading…
Reference in New Issue