enh(tmq): retry consumer close
This commit is contained in:
parent
031414001d
commit
6f8683de7e
|
@ -474,7 +474,6 @@ typedef struct {
|
||||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||||
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
||||||
|
|
||||||
// int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
|
|
||||||
int32_t streamSetupTrigger(SStreamTask* pTask);
|
int32_t streamSetupTrigger(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamProcessRunReq(SStreamTask* pTask);
|
int32_t streamProcessRunReq(SStreamTask* pTask);
|
||||||
|
|
|
@ -189,20 +189,6 @@ typedef struct {
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
} SMqPollCbParam;
|
} SMqPollCbParam;
|
||||||
|
|
||||||
#if 0
|
|
||||||
typedef struct {
|
|
||||||
tmq_t* tmq;
|
|
||||||
int8_t async;
|
|
||||||
int8_t automatic;
|
|
||||||
int8_t freeOffsets;
|
|
||||||
tmq_commit_cb* userCb;
|
|
||||||
tsem_t rspSem;
|
|
||||||
int32_t rspErr;
|
|
||||||
SArray* offsets;
|
|
||||||
void* userParam;
|
|
||||||
} SMqCommitCbParam;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
tmq_t* tmq;
|
tmq_t* tmq;
|
||||||
int8_t automatic;
|
int8_t automatic;
|
||||||
|
@ -385,29 +371,6 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
|
||||||
return sprintf(dst, "%s:%d", topicName, vg);
|
return sprintf(dst, "%s:%d", topicName, vg);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|
||||||
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
|
|
||||||
pParam->rspErr = code;
|
|
||||||
if (pParam->async) {
|
|
||||||
if (pParam->automatic && pParam->tmq->commitCb) {
|
|
||||||
pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
|
|
||||||
} else if (!pParam->automatic && pParam->userCb) {
|
|
||||||
pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pParam->freeOffsets) {
|
|
||||||
taosArrayDestroy(pParam->offsets);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(pParam);
|
|
||||||
} else {
|
|
||||||
tsem_post(&pParam->rspSem);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
|
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
|
||||||
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
|
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
|
||||||
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
|
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
|
||||||
|
@ -660,123 +623,6 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
|
|
||||||
tmq_commit_cb* userCb, void* userParam) {
|
|
||||||
SMqCMCommitOffsetReq req;
|
|
||||||
SArray* pOffsets = NULL;
|
|
||||||
void* buf = NULL;
|
|
||||||
SMqCommitCbParam* pParam = NULL;
|
|
||||||
SMsgSendInfo* sendInfo = NULL;
|
|
||||||
int8_t freeOffsets;
|
|
||||||
int32_t code = -1;
|
|
||||||
|
|
||||||
if (msg == NULL) {
|
|
||||||
freeOffsets = 1;
|
|
||||||
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
|
||||||
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
|
||||||
SMqOffset offset;
|
|
||||||
tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
|
|
||||||
tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
|
|
||||||
offset.vgId = pVg->vgId;
|
|
||||||
offset.offset = pVg->currentOffset;
|
|
||||||
taosArrayPush(pOffsets, &offset);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
freeOffsets = 0;
|
|
||||||
pOffsets = (SArray*)&msg->container;
|
|
||||||
}
|
|
||||||
|
|
||||||
req.num = (int32_t)pOffsets->size;
|
|
||||||
req.offsets = pOffsets->pData;
|
|
||||||
|
|
||||||
SEncoder encoder;
|
|
||||||
|
|
||||||
tEncoderInit(&encoder, NULL, 0);
|
|
||||||
code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
|
|
||||||
if (code < 0) {
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
int32_t tlen = encoder.pos;
|
|
||||||
buf = taosMemoryMalloc(tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
|
|
||||||
tEncoderInit(&encoder, buf, tlen);
|
|
||||||
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
|
|
||||||
pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
|
|
||||||
if (pParam == NULL) {
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
pParam->tmq = tmq;
|
|
||||||
pParam->automatic = automatic;
|
|
||||||
pParam->async = async;
|
|
||||||
pParam->offsets = pOffsets;
|
|
||||||
pParam->freeOffsets = freeOffsets;
|
|
||||||
pParam->userCb = userCb;
|
|
||||||
pParam->userParam = userParam;
|
|
||||||
if (!async) tsem_init(&pParam->rspSem, 0, 0);
|
|
||||||
|
|
||||||
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
|
||||||
if (sendInfo == NULL) goto END;
|
|
||||||
sendInfo->msgInfo = (SDataBuf){
|
|
||||||
.pData = buf,
|
|
||||||
.len = tlen,
|
|
||||||
.handle = NULL,
|
|
||||||
};
|
|
||||||
|
|
||||||
sendInfo->requestId = generateRequestId();
|
|
||||||
sendInfo->requestObjRefId = 0;
|
|
||||||
sendInfo->param = pParam;
|
|
||||||
sendInfo->fp = tmqCommitCb;
|
|
||||||
sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;
|
|
||||||
|
|
||||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
|
||||||
|
|
||||||
if (!async) {
|
|
||||||
tsem_wait(&pParam->rspSem);
|
|
||||||
code = pParam->rspErr;
|
|
||||||
tsem_destroy(&pParam->rspSem);
|
|
||||||
taosMemoryFree(pParam);
|
|
||||||
} else {
|
|
||||||
code = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// avoid double free if msg is sent
|
|
||||||
buf = NULL;
|
|
||||||
|
|
||||||
END:
|
|
||||||
if (buf) taosMemoryFree(buf);
|
|
||||||
/*if (pParam) taosMemoryFree(pParam);*/
|
|
||||||
/*if (sendInfo) taosMemoryFree(sendInfo);*/
|
|
||||||
|
|
||||||
if (code != 0 && async) {
|
|
||||||
if (automatic) {
|
|
||||||
tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
|
|
||||||
} else {
|
|
||||||
userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!async && freeOffsets) {
|
|
||||||
taosArrayDestroy(pOffsets);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void tmqAssignAskEpTask(void* param, void* tmrId) {
|
void tmqAssignAskEpTask(void* param, void* tmrId) {
|
||||||
tmq_t* tmq = (tmq_t*)param;
|
tmq_t* tmq = (tmq_t*)param;
|
||||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
|
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
|
||||||
|
@ -1839,13 +1685,21 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
return rsp;
|
return rsp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t retryCnt = 0;
|
||||||
tmq_list_t* lst = tmq_list_new();
|
tmq_list_t* lst = tmq_list_new();
|
||||||
rsp = tmq_subscribe(tmq, lst);
|
while (1) {
|
||||||
|
rsp = tmq_subscribe(tmq, lst);
|
||||||
|
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
retryCnt++;
|
||||||
|
taosMsleep(500);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tmq_list_destroy(lst);
|
tmq_list_destroy(lst);
|
||||||
|
|
||||||
if (rsp != 0) {
|
return rsp;
|
||||||
return rsp;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// TODO: free resources
|
// TODO: free resources
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -216,9 +216,11 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
|
|
||||||
if (offset.val.type == TMQ_OFFSET__LOG) {
|
if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
|
||||||
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
|
if (pHandle) {
|
||||||
ASSERT(0);
|
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
|
||||||
return -1;
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -515,7 +517,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
// todo lock
|
// todo lock
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey));
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
ASSERT(req.oldConsumerId == -1);
|
if (req.oldConsumerId != -1) {
|
||||||
|
tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
|
||||||
|
req.newConsumerId, req.oldConsumerId);
|
||||||
|
}
|
||||||
ASSERT(req.newConsumerId != -1);
|
ASSERT(req.newConsumerId != -1);
|
||||||
STqHandle tqHandle = {0};
|
STqHandle tqHandle = {0};
|
||||||
pHandle = &tqHandle;
|
pHandle = &tqHandle;
|
||||||
|
|
|
@ -47,7 +47,7 @@ void streamCleanUp() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTriggerByTimer(void* param, void* tmrId) {
|
void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
SStreamTask* pTask = (void*)param;
|
SStreamTask* pTask = (void*)param;
|
||||||
|
|
||||||
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
|
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
|
||||||
|
@ -71,39 +71,17 @@ void streamTriggerByTimer(void* param, void* tmrId) {
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
|
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamSetupTrigger(SStreamTask* pTask) {
|
int32_t streamSetupTrigger(SStreamTask* pTask) {
|
||||||
if (pTask->triggerParam != 0) {
|
if (pTask->triggerParam != 0) {
|
||||||
pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
|
pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
|
||||||
pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
|
pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
|
|
||||||
int8_t schedStatus = atomic_load_8(&pTask->schedStatus);
|
|
||||||
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
|
||||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
|
||||||
if (pRunReq == NULL) return -1;
|
|
||||||
|
|
||||||
// TODO: do we need htonl?
|
|
||||||
pRunReq->head.vgId = vgId;
|
|
||||||
pRunReq->streamId = pTask->streamId;
|
|
||||||
pRunReq->taskId = pTask->taskId;
|
|
||||||
SRpcMsg msg = {
|
|
||||||
.msgType = TDMT_STREAM_TASK_RUN,
|
|
||||||
.pCont = pRunReq,
|
|
||||||
.contLen = sizeof(SStreamTaskRunReq),
|
|
||||||
};
|
|
||||||
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t streamSchedExec(SStreamTask* pTask) {
|
int32_t streamSchedExec(SStreamTask* pTask) {
|
||||||
int8_t schedStatus =
|
int8_t schedStatus =
|
||||||
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
|
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
|
||||||
|
@ -296,6 +274,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
|
||||||
|
|
||||||
ASSERT(pTask->execType != TASK_EXEC__NONE);
|
ASSERT(pTask->execType != TASK_EXEC__NONE);
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
|
|
||||||
/*streamTryExec(pTask);*/
|
/*streamTryExec(pTask);*/
|
||||||
|
|
||||||
/*ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);*/
|
/*ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);*/
|
||||||
|
|
Loading…
Reference in New Issue