fix(tmq): check handle status before close sub.
This commit is contained in:
parent
0c744e864b
commit
40ffe0cd83
|
@ -185,6 +185,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq);
|
||||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
|
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
|
||||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||||
|
bool tqIsHandleExecuting(STqHandle* pHandle);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -379,15 +379,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
|
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
// taosWLockLatch(&pTq->lock);
|
|
||||||
// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
|
|
||||||
// if (code != 0) {
|
|
||||||
// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
|
|
||||||
// }
|
|
||||||
// taosWUnLockLatch(&pTq->lock);
|
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
|
@ -395,6 +390,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
if (pHandle->pRef) {
|
if (pHandle->pRef) {
|
||||||
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
|
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while (tqIsHandleExecuting(pHandle)) {
|
||||||
|
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
|
||||||
|
taosMsleep(5);
|
||||||
|
}
|
||||||
|
|
||||||
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
||||||
|
|
|
@ -162,9 +162,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isHandleExecuting(STqHandle* pHandle){
|
bool tqIsHandleExecuting(STqHandle* pHandle) { return 1 == atomic_load_8(&pHandle->exec); }
|
||||||
return 1 == atomic_load_8(&pHandle->exec);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||||
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
||||||
|
@ -174,15 +172,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
|
|
||||||
SMqDataRsp dataRsp = {0};
|
SMqDataRsp dataRsp = {0};
|
||||||
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||||
qTaskInfo_t task = pHandle->execHandle.task;
|
|
||||||
if(qTaskIsExecuting(task)){
|
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
while(isHandleExecuting(pHandle)){
|
while(tqIsHandleExecuting(pHandle)){
|
||||||
tqInfo("sub is executing, pHandle:%p", pHandle);
|
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
|
||||||
taosMsleep(5);
|
taosMsleep(5);
|
||||||
}
|
}
|
||||||
atomic_store_8(&pHandle->exec, 1);
|
atomic_store_8(&pHandle->exec, 1);
|
||||||
|
@ -211,9 +203,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
|
||||||
// NOTE: this pHandle->consumerId may have been changed already.
|
// NOTE: this pHandle->consumerId may have been changed already.
|
||||||
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
|
|
||||||
end:
|
end:
|
||||||
{
|
{
|
||||||
|
@ -221,7 +212,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
tFormatOffset(buf, 80, &dataRsp.rspOffset);
|
tFormatOffset(buf, 80, &dataRsp.rspOffset);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d",
|
||||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
||||||
// taosWUnLockLatch(&pTq->lock);
|
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
}
|
}
|
||||||
atomic_store_8(&pHandle->exec, 0);
|
atomic_store_8(&pHandle->exec, 0);
|
||||||
|
@ -237,17 +227,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
SMqMetaRsp metaRsp = {0};
|
SMqMetaRsp metaRsp = {0};
|
||||||
STaosxRsp taosxRsp = {0};
|
STaosxRsp taosxRsp = {0};
|
||||||
tqInitTaosxRsp(&taosxRsp, pRequest);
|
tqInitTaosxRsp(&taosxRsp, pRequest);
|
||||||
qTaskInfo_t task = pHandle->execHandle.task;
|
|
||||||
if(qTaskIsExecuting(task)){
|
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
while(isHandleExecuting(pHandle)){
|
while(tqIsHandleExecuting(pHandle)){
|
||||||
tqInfo("sub is executing, pHandle:%p", pHandle);
|
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
|
||||||
taosMsleep(5);
|
taosMsleep(5);
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_8(&pHandle->exec, 1);
|
atomic_store_8(&pHandle->exec, 1);
|
||||||
|
|
||||||
if (offset->type != TMQ_OFFSET__LOG) {
|
if (offset->type != TMQ_OFFSET__LOG) {
|
||||||
|
@ -274,7 +259,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (offset->type == TMQ_OFFSET__LOG) {
|
if (offset->type == TMQ_OFFSET__LOG) {
|
||||||
verifyOffset(pHandle->pWalReader, offset);
|
verifyOffset(pHandle->pWalReader, offset);
|
||||||
int64_t fetchVer = offset->version + 1;
|
int64_t fetchVer = offset->version + 1;
|
||||||
|
|
Loading…
Reference in New Issue