fix:logic error
This commit is contained in:
parent
f0fbe08e1d
commit
02ac3eac5a
|
@ -1742,6 +1742,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
|
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
|
||||||
|
bool allStopped = false;
|
||||||
|
|
||||||
SStreamTaskNodeUpdateMsg req = {0};
|
SStreamTaskNodeUpdateMsg req = {0};
|
||||||
|
|
||||||
|
@ -1787,7 +1788,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pMeta->closedTask += 1;
|
pMeta->closedTask += 1;
|
||||||
|
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
bool allStopped = (pMeta->closedTask == numOfTasks);
|
allStopped = (pMeta->closedTask == numOfTasks);
|
||||||
if (allStopped) {
|
if (allStopped) {
|
||||||
pMeta->closedTask = 0;
|
pMeta->closedTask = 0;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -225,12 +225,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
int totalRows = 0;
|
int totalRows = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
if (savedEpoch > pRequest->epoch) {
|
ASSERT(savedEpoch <= pRequest->epoch);
|
||||||
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
|
|
||||||
", found new consumer epoch %d, discard req epoch %d",
|
|
||||||
pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
|
|
Loading…
Reference in New Issue