commit
10d291feab
|
@ -1827,6 +1827,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
|
|||
typedef struct {
|
||||
int64_t leftForVer;
|
||||
int32_t vgId;
|
||||
int32_t epoch;
|
||||
int64_t consumerId;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
|
@ -1840,6 +1841,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
|
|||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
|
||||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
||||
tlen += taosEncodeFixedI32(buf, pReq->epoch);
|
||||
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
||||
tlen += taosEncodeString(buf, pReq->topicName);
|
||||
tlen += taosEncodeString(buf, pReq->cgroup);
|
||||
|
@ -1853,6 +1855,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
|
|||
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
||||
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
|
||||
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
||||
buf = taosDecodeFixedI32(buf, &pReq->epoch);
|
||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
||||
buf = taosDecodeStringTo(buf, pReq->topicName);
|
||||
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
||||
|
@ -1868,6 +1871,7 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
int64_t oldConsumerId;
|
||||
int64_t newConsumerId;
|
||||
char* topic;
|
||||
} SMqMVRebReq;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
|
||||
|
@ -1876,6 +1880,7 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR
|
|||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
||||
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
|
||||
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
|
||||
tlen += taosEncodeString(buf, pReq->topic);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
|
@ -1884,6 +1889,7 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
|
|||
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
||||
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
|
||||
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
|
||||
buf = taosDecodeString(buf, &pReq->topic);
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
|
|
@ -108,7 +108,7 @@ typedef struct {
|
|||
// connection info
|
||||
int32_t vgId;
|
||||
int32_t vgStatus;
|
||||
int64_t skipCnt;
|
||||
int32_t vgSkipCnt;
|
||||
SEpSet epSet;
|
||||
} SMqClientVg;
|
||||
|
||||
|
@ -849,7 +849,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
if (msgEpoch < tmqEpoch) {
|
||||
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
|
||||
/*tsem_post(&tmq->rspSem);*/
|
||||
tscWarn("discard rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
|
||||
tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -881,6 +881,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
/*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
|
||||
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
|
||||
if (pRsp == NULL) {
|
||||
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
|
||||
goto CREATE_MSG_FAIL;
|
||||
}
|
||||
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
|
@ -969,14 +970,14 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
|||
offset = *pOffset;
|
||||
tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
|
||||
}
|
||||
tscDebug("consumer %ld epoch %d vg %d offset set to %ld\n", tmq->consumerId, epoch, pVgEp->vgId, offset);
|
||||
tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
|
||||
SMqClientVg clientVg = {
|
||||
.pollCnt = 0,
|
||||
.currentOffset = offset,
|
||||
.vgId = pVgEp->vgId,
|
||||
.epSet = pVgEp->epSet,
|
||||
.vgStatus = TMQ_VG_STATUS__IDLE,
|
||||
.skipCnt = 0,
|
||||
.vgSkipCnt = 0,
|
||||
};
|
||||
taosArrayPush(topic.vgs, &clientVg);
|
||||
set = true;
|
||||
|
@ -1232,9 +1233,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
|||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
||||
if (vgStatus != TMQ_VG_STATUS__IDLE) {
|
||||
int64_t skipCnt = atomic_add_fetch_64(&pVg->skipCnt, 1);
|
||||
tscDebug("consumer %ld epoch %d skip vg %d skip cnt %ld", tmq->consumerId, tmq->epoch, pVg->vgId, skipCnt);
|
||||
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
|
||||
tscDebug("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
|
||||
continue;
|
||||
/*if (vgSkipCnt < 10000) continue;*/
|
||||
#if 0
|
||||
if (skipCnt < 30000) {
|
||||
continue;
|
||||
|
@ -1243,7 +1245,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
|||
}
|
||||
#endif
|
||||
}
|
||||
atomic_store_64(&pVg->skipCnt, 0);
|
||||
atomic_store_32(&pVg->vgSkipCnt, 0);
|
||||
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
|
||||
if (pReq == NULL) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
|
@ -1409,6 +1411,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
if (blocking_time != 0) {
|
||||
int64_t endTime = taosGetTimestampMs();
|
||||
if (endTime - startTime > blocking_time) {
|
||||
tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -413,6 +413,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
int32_t vgId; // -1 for unassigned
|
||||
int32_t status;
|
||||
int32_t epoch;
|
||||
SEpSet epSet;
|
||||
int64_t oldConsumerId;
|
||||
int64_t consumerId; // -1 for unassigned
|
||||
|
@ -423,6 +424,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp
|
|||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
|
||||
tlen += taosEncodeFixedI32(buf, pConsumerEp->status);
|
||||
tlen += taosEncodeFixedI32(buf, pConsumerEp->epoch);
|
||||
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->oldConsumerId);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
||||
|
@ -433,6 +435,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp
|
|||
static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
||||
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
|
||||
buf = taosDecodeFixedI32(buf, &pConsumerEp->status);
|
||||
buf = taosDecodeFixedI32(buf, &pConsumerEp->epoch);
|
||||
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->oldConsumerId);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||
|
|
|
@ -493,6 +493,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
|
|||
|
||||
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
|
||||
pConsumerEp->consumerId = pSubConsumer->consumerId;
|
||||
//TODO
|
||||
pConsumerEp->epoch = 0;
|
||||
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
|
||||
|
||||
if (pConsumerEp->oldConsumerId == -1) {
|
||||
|
|
|
@ -206,7 +206,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
int64_t epoch;
|
||||
int32_t epoch;
|
||||
char cgroup[TSDB_TOPIC_FNAME_LEN];
|
||||
SArray* topics; // SArray<STqTopic>
|
||||
} STqConsumer;
|
||||
|
|
|
@ -167,7 +167,7 @@ static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pC
|
|||
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
|
||||
tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
|
||||
tlen += taosEncodeString(buf, pConsumer->cgroup);
|
||||
sz = taosArrayGetSize(pConsumer->topics);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
|
@ -182,7 +182,7 @@ static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer*
|
|||
int32_t sz;
|
||||
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
|
||||
buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
|
||||
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pConsumer->topics = taosArrayInit(sz, sizeof(STqTopic));
|
||||
|
@ -255,6 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
int64_t consumerId = pReq->consumerId;
|
||||
int64_t fetchOffset;
|
||||
int64_t blockingTime = pReq->blockingTime;
|
||||
int32_t reqEpoch = pReq->epoch;
|
||||
|
||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
||||
fetchOffset = 0;
|
||||
|
@ -264,7 +265,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
fetchOffset = pReq->currentOffset + 1;
|
||||
}
|
||||
|
||||
/*printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);*/
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);
|
||||
|
||||
SMqPollRsp rsp = {
|
||||
/*.consumerId = consumerId,*/
|
||||
|
@ -274,6 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId);
|
||||
pMsg->pCont = NULL;
|
||||
pMsg->contLen = 0;
|
||||
pMsg->code = -1;
|
||||
|
@ -281,30 +283,57 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
||||
while (consumerEpoch < reqEpoch) {
|
||||
consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch);
|
||||
}
|
||||
|
||||
STqTopic* pTopic = NULL;
|
||||
int sz = taosArrayGetSize(pConsumer->topics);
|
||||
ASSERT(sz == 1);
|
||||
STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0);
|
||||
ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
|
||||
//TODO race condition
|
||||
ASSERT(pConsumer->consumerId == consumerId);
|
||||
if (strcmp(topic->topicName, pReq->topic) == 0) {
|
||||
pTopic = topic;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (pTopic == NULL) {
|
||||
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, pTq->pVnode->vgId);
|
||||
pMsg->pCont = NULL;
|
||||
pMsg->contLen = 0;
|
||||
pMsg->code = -1;
|
||||
tmsgSendRsp(pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
vDebug("poll topic %s from consumer %ld (epoch %d)", pTopic->topicName, consumerId, pReq->epoch);
|
||||
|
||||
rsp.reqOffset = pReq->currentOffset;
|
||||
rsp.skipLogNum = 0;
|
||||
|
||||
while (1) {
|
||||
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
|
||||
//TODO
|
||||
consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
||||
if (consumerEpoch > pReq->epoch) {
|
||||
//TODO: return
|
||||
break;
|
||||
}
|
||||
SWalReadHead* pHead;
|
||||
if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
|
||||
// TODO: no more log, set timer to wait blocking time
|
||||
// if data inserted during waiting, launch query and
|
||||
// response to user
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
|
||||
break;
|
||||
}
|
||||
/*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType);
|
||||
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
|
||||
/*pHead = pTopic->pReadhandle->pHead;*/
|
||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
||||
/*printf("from topic %s from consumer\n", pTopic->topicName, consumerId);*/
|
||||
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
|
||||
ASSERT(task);
|
||||
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||
|
@ -324,6 +353,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
}
|
||||
|
||||
if (taosArrayGetSize(pRes) == 0) {
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
|
||||
fetchOffset++;
|
||||
rsp.skipLogNum++;
|
||||
taosArrayDestroy(pRes);
|
||||
|
@ -352,7 +382,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
/*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/
|
||||
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch);
|
||||
tmsgSendRsp(pMsg);
|
||||
taosMemoryFree(pHead);
|
||||
return 0;
|
||||
|
@ -383,7 +413,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
tmsgSendRsp(pMsg);
|
||||
/*printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);*/
|
||||
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch);
|
||||
/*}*/
|
||||
|
||||
return 0;
|
||||
|
@ -393,6 +423,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
|
|||
SMqMVRebReq req = {0};
|
||||
tDecodeSMqMVRebReq(msg, &req);
|
||||
|
||||
vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId);
|
||||
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
|
||||
ASSERT(pConsumer);
|
||||
pConsumer->consumerId = req.newConsumerId;
|
||||
|
@ -407,17 +438,19 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
|||
SMqSetCVgReq req = {0};
|
||||
tDecodeSMqSetCVgReq(msg, &req);
|
||||
|
||||
/*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/
|
||||
STqConsumer* pConsumer = taosMemoryCalloc(1, sizeof(STqConsumer));
|
||||
vDebug("vg %d set to consumer %ld", req.vgId, req.consumerId);
|
||||
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
pConsumer = taosMemoryCalloc(1, sizeof(STqConsumer));
|
||||
if (pConsumer == NULL) {
|
||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(pConsumer->cgroup, req.cgroup);
|
||||
pConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
|
||||
pConsumer->consumerId = req.consumerId;
|
||||
pConsumer->epoch = 0;
|
||||
}
|
||||
|
||||
STqTopic* pTopic = taosMemoryCalloc(1, sizeof(STqTopic));
|
||||
if (pTopic == NULL) {
|
||||
|
|
Loading…
Reference in New Issue