fix:[TD-24111]avoid exec pHandle task in multi query thread
This commit is contained in:
parent
35fcb34888
commit
d85f0f56d5
|
@ -102,6 +102,7 @@ typedef struct {
|
|||
STqExecHandle execHandle; // exec
|
||||
SRpcMsg* msg;
|
||||
int32_t noDataPollCnt;
|
||||
int8_t sendRsp;
|
||||
} STqHandle;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -55,6 +55,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
|
|||
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
||||
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
||||
} else {
|
||||
tqPushDataRsp(pTq, pHandle);
|
||||
void* tmp = pHandle->msg->pCont;
|
||||
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
||||
pHandle->msg->pCont = tmp;
|
||||
|
|
|
@ -162,6 +162,10 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
|||
return 0;
|
||||
}
|
||||
|
||||
static bool isHandleExecuting(STqHandle* pHandle){
|
||||
return 0 == atomic_load_8(&pHandle->sendRsp);
|
||||
}
|
||||
|
||||
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
||||
uint64_t consumerId = pRequest->consumerId;
|
||||
|
@ -177,6 +181,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
return code;
|
||||
}
|
||||
|
||||
while(isHandleExecuting(pHandle)){
|
||||
tqInfo("sub is executing, pHandle:%p", pHandle);
|
||||
taosMsleep(5);
|
||||
}
|
||||
atomic_store_8(&pHandle->sendRsp, 0);
|
||||
|
||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||
if(code != 0) {
|
||||
|
@ -193,6 +203,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
atomic_store_8(&pHandle->sendRsp, 1);
|
||||
return code;
|
||||
}
|
||||
else{
|
||||
|
@ -202,7 +213,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
|
||||
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||
|
||||
// NOTE: this pHandle->consumerId may have been changed already.
|
||||
|
||||
end:
|
||||
|
@ -214,12 +224,14 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
// taosWUnLockLatch(&pTq->lock);
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
}
|
||||
atomic_store_8(&pHandle->sendRsp, 1);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
|
||||
int code = 0;
|
||||
int code = 0;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
SWalCkHead* pCkHead = NULL;
|
||||
SMqMetaRsp metaRsp = {0};
|
||||
|
@ -232,10 +244,16 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
return code;
|
||||
}
|
||||
|
||||
while(isHandleExecuting(pHandle)){
|
||||
tqInfo("sub is executing, pHandle:%p", pHandle);
|
||||
taosMsleep(5);
|
||||
}
|
||||
atomic_store_8(&pHandle->sendRsp, 0);
|
||||
|
||||
if (offset->type != TMQ_OFFSET__LOG) {
|
||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return -1;
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (metaRsp.metaRspLen > 0) {
|
||||
|
@ -243,16 +261,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
|
||||
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
|
||||
taosMemoryFree(metaRsp.metaRsp);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
goto end;
|
||||
}
|
||||
|
||||
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
||||
",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts);
|
||||
if (taosxRsp.blockNum > 0) {
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
goto end;
|
||||
}else {
|
||||
*offset = taosxRsp.rspOffset;
|
||||
}
|
||||
|
@ -264,9 +280,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
int64_t fetchVer = offset->version + 1;
|
||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||
if (pCkHead == NULL) {
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||
int totalRows = 0;
|
||||
|
@ -281,9 +297,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
goto end;
|
||||
}
|
||||
|
||||
SWalCont* pHead = &pCkHead->head;
|
||||
|
@ -295,9 +309,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
if(totalRows > 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
goto end;
|
||||
}
|
||||
|
||||
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
|
||||
|
@ -305,17 +317,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
metaRsp.resMsgType = pHead->msgType;
|
||||
metaRsp.metaRspLen = pHead->bodyLen;
|
||||
metaRsp.metaRsp = pHead->body;
|
||||
if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) {
|
||||
code = -1;
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
|
||||
goto end;
|
||||
}
|
||||
|
||||
// process data
|
||||
|
@ -325,29 +328,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
.ver = pHead->version,
|
||||
};
|
||||
|
||||
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) {
|
||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
|
||||
pRequest->subKey);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return -1;
|
||||
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows);
|
||||
if (code < 0) {
|
||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
goto end;
|
||||
} else {
|
||||
fetchVer++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
end:
|
||||
atomic_store_8(&pHandle->sendRsp, 1);
|
||||
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
|
||||
|
|
Loading…
Reference in New Issue