fix:core in race condition for pTq->pExecStore & return if poll too long & fix test cases if submit empty
This commit is contained in:
parent
8ebb6e202e
commit
c3498fbfe8
|
@ -269,19 +269,21 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
|
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
|
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
|
||||||
code = 0;
|
code = 0;
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. check consumer-vg assignment status
|
// 2. check consumer-vg assignment status
|
||||||
taosRLockLatch(&pTq->lock);
|
|
||||||
if (pHandle->consumerId != req.consumerId) {
|
if (pHandle->consumerId != req.consumerId) {
|
||||||
tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||||
req.consumerId, vgId, req.subKey, pHandle->consumerId);
|
req.consumerId, vgId, req.subKey, pHandle->consumerId);
|
||||||
taosRUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -289,7 +291,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to
|
// if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to
|
||||||
// TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek.
|
// TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek.
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
taosRUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
end:
|
end:
|
||||||
rsp.code = code;
|
rsp.code = code;
|
||||||
|
@ -496,15 +498,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
// 1. find handle
|
// 1. find handle
|
||||||
|
taosRLockLatch(&pTq->lock);
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
|
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
taosRUnLockLatch(&pTq->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. check re-balance status
|
// 2. check re-balance status
|
||||||
taosRLockLatch(&pTq->lock);
|
|
||||||
if (pHandle->consumerId != consumerId) {
|
if (pHandle->consumerId != consumerId) {
|
||||||
tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||||
consumerId, vgId, req.subKey, pHandle->consumerId);
|
consumerId, vgId, req.subKey, pHandle->consumerId);
|
||||||
|
@ -580,7 +583,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
bool exec = tqIsHandleExec(pHandle);
|
bool exec = tqIsHandleExec(pHandle);
|
||||||
|
|
||||||
if(exec){
|
if(exec){
|
||||||
tqInfo("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
|
tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
|
||||||
pHandle->subKey, pHandle);
|
pHandle->subKey, pHandle);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
|
@ -667,7 +670,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
STqHandle* pHandle = NULL;
|
STqHandle* pHandle = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0) {
|
if (pHandle) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
taosRLockLatch(&pTq->lock);
|
||||||
|
ret = tqMetaGetHandle(pTq, req.subKey);
|
||||||
|
taosRUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
|
if (ret < 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -687,21 +697,33 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqDestroyTqHandle(&handle);
|
tqDestroyTqHandle(&handle);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
|
||||||
} else {
|
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
|
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
||||||
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
|
||||||
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
|
|
||||||
} else {
|
|
||||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
|
||||||
req.newConsumerId);
|
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
|
||||||
atomic_store_32(&pHandle->epoch, 0);
|
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
|
||||||
}
|
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
} else {
|
||||||
|
while(1){
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
|
bool exec = tqIsHandleExec(pHandle);
|
||||||
|
if(exec){
|
||||||
|
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId,
|
||||||
|
pHandle->subKey, pHandle);
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
taosMsleep(10);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
||||||
|
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
|
||||||
|
} else {
|
||||||
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||||
|
req.newConsumerId);
|
||||||
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
|
atomic_store_32(&pHandle->epoch, 0);
|
||||||
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
|
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
|
|
@ -367,7 +367,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
||||||
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
||||||
SWalReader* pWalReader = pReader->pWalReader;
|
SWalReader* pWalReader = pReader->pWalReader;
|
||||||
|
|
||||||
// uint64_t st = taosGetTimestampMs();
|
uint64_t st = taosGetTimestampMs();
|
||||||
while (1) {
|
while (1) {
|
||||||
SArray* pBlockList = pReader->submit.aSubmitTbData;
|
SArray* pBlockList = pReader->submit.aSubmitTbData;
|
||||||
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
|
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
|
||||||
|
@ -442,9 +442,9 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
||||||
|
|
||||||
pReader->msg.msgStr = NULL;
|
pReader->msg.msgStr = NULL;
|
||||||
|
|
||||||
// if(taosGetTimestampMs() - st > 5){
|
if(taosGetTimestampMs() - st > 1000){
|
||||||
// return false;
|
return false;
|
||||||
// }
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1087,6 +1087,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
// update the table list for each consumer handle
|
// update the table list for each consumer handle
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pHandle, pIter);
|
pIter = taosHashIterate(pTq->pHandle, pIter);
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
|
@ -1116,6 +1117,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
|
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
|
||||||
taosArrayDestroy(list);
|
taosArrayDestroy(list);
|
||||||
taosHashCancelIterate(pTq->pHandle, pIter);
|
taosHashCancelIterate(pTq->pHandle, pIter);
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
|
tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
|
||||||
|
@ -1125,6 +1128,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
// update the table list handle for each stream scanner/wal reader
|
// update the table list handle for each stream scanner/wal reader
|
||||||
taosWLockLatch(&pTq->pStreamMeta->lock);
|
taosWLockLatch(&pTq->pStreamMeta->lock);
|
||||||
|
|
|
@ -200,7 +200,9 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
code = tDecodeSTqHandle(pDecoder, &handle);
|
code = tDecodeSTqHandle(pDecoder, &handle);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
|
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
if (code < 0) goto _err;
|
if (code < 0) goto _err;
|
||||||
tDecoderClear(pDecoder);
|
tDecoderClear(pDecoder);
|
||||||
|
|
||||||
|
|
|
@ -218,7 +218,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
walReaderVerifyOffset(pHandle->pWalReader, offset);
|
walReaderVerifyOffset(pHandle->pWalReader, offset);
|
||||||
int64_t fetchVer = offset->version;
|
int64_t fetchVer = offset->version;
|
||||||
|
|
||||||
// uint64_t st = taosGetTimestampMs();
|
uint64_t st = taosGetTimestampMs();
|
||||||
int totalRows = 0;
|
int totalRows = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
// int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
// int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
|
@ -265,8 +265,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) {
|
if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 1000)) {
|
||||||
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
|
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
goto end;
|
goto end;
|
||||||
|
|
|
@ -336,7 +336,7 @@ class TDTestCase:
|
||||||
for i in range(expectRows):
|
for i in range(expectRows):
|
||||||
totalConsumeRows += resultList[i]
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0:
|
if totalConsumeRows > expectrowcnt or totalConsumeRows < 0:
|
||||||
tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt))
|
tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt))
|
||||||
tdLog.exit("tmq consume rows error!")
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
|
|
|
@ -218,7 +218,7 @@ class TDTestCase:
|
||||||
|
|
||||||
actConsumeTotalRows = resultList[0]
|
actConsumeTotalRows = resultList[0]
|
||||||
|
|
||||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
|
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
|
||||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
|
@ -216,7 +216,7 @@ class TDTestCase:
|
||||||
|
|
||||||
actConsumeTotalRows = resultList[0]
|
actConsumeTotalRows = resultList[0]
|
||||||
tdLog.info("act consume rows: %d, expect rows range (0, %d)"%(actConsumeTotalRows, totalRowsInserted))
|
tdLog.info("act consume rows: %d, expect rows range (0, %d)"%(actConsumeTotalRows, totalRowsInserted))
|
||||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
|
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
|
||||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
|
@ -218,7 +218,7 @@ class TDTestCase:
|
||||||
|
|
||||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted):
|
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
|
|
|
@ -216,7 +216,7 @@ class TDTestCase:
|
||||||
|
|
||||||
actConsumeTotalRows = resultList[0]
|
actConsumeTotalRows = resultList[0]
|
||||||
|
|
||||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted):
|
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
|
||||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
|
@ -217,7 +217,7 @@ class TDTestCase:
|
||||||
|
|
||||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||||
if not ((actConsumeTotalRows > 0) and (actConsumeTotalRows <= totalRowsInserted)):
|
if not ((actConsumeTotalRows >= 0) and (actConsumeTotalRows <= totalRowsInserted)):
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
|
|
Loading…
Reference in New Issue