commit
a985b98995
|
@ -416,86 +416,6 @@ end:
|
||||||
rsp.code = code;
|
rsp.code = code;
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
// SMqVgOffset vgOffset = {0};
|
|
||||||
// int32_t vgId = TD_VID(pTq->pVnode);
|
|
||||||
//
|
|
||||||
// SDecoder decoder;
|
|
||||||
// tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
|
||||||
// if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
|
|
||||||
// tqError("vgId:%d failed to decode seek msg", vgId);
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// tDecoderClear(&decoder);
|
|
||||||
//
|
|
||||||
// tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
|
|
||||||
// vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);
|
|
||||||
//
|
|
||||||
// STqOffset* pOffset = &vgOffset.offset;
|
|
||||||
// if (pOffset->val.type != TMQ_OFFSET__LOG) {
|
|
||||||
// tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
|
|
||||||
// if (pHandle == NULL) {
|
|
||||||
// tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
|
|
||||||
// terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // 2. check consumer-vg assignment status
|
|
||||||
// taosRLockLatch(&pTq->lock);
|
|
||||||
// if (pHandle->consumerId != vgOffset.consumerId) {
|
|
||||||
// tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
|
||||||
// vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
|
|
||||||
// terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
|
||||||
// taosRUnLockLatch(&pTq->lock);
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
// taosRUnLockLatch(&pTq->lock);
|
|
||||||
//
|
|
||||||
// // 3. check the offset info
|
|
||||||
// STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
|
|
||||||
// if (pSavedOffset != NULL) {
|
|
||||||
// if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
|
|
||||||
// tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
|
|
||||||
// return 0; // no need to update the offset value
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (pSavedOffset->val.version == pOffset->val.version) {
|
|
||||||
// tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
|
|
||||||
// pOffset->val.version, pSavedOffset->val.version);
|
|
||||||
// return 0;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// int64_t sver = 0, ever = 0;
|
|
||||||
// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
|
|
||||||
// if (pOffset->val.version < sver) {
|
|
||||||
// pOffset->val.version = sver;
|
|
||||||
// } else if (pOffset->val.version > ever) {
|
|
||||||
// pOffset->val.version = ever;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // save the new offset value
|
|
||||||
// if (pSavedOffset != NULL) {
|
|
||||||
// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
|
|
||||||
// pSavedOffset->val.version);
|
|
||||||
// } else {
|
|
||||||
// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
|
|
||||||
// tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
|
|
||||||
// vgOffset.consumerId, vgOffset.offset.val.version);
|
|
||||||
//
|
|
||||||
// return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
||||||
|
@ -707,10 +627,10 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
taosRUnLockLatch(&pTq->lock);
|
taosRUnLockLatch(&pTq->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pTq->lock);
|
|
||||||
|
|
||||||
int64_t sver = 0, ever = 0;
|
int64_t sver = 0, ever = 0;
|
||||||
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
|
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
|
||||||
|
taosRUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
SMqDataRsp dataRsp = {0};
|
SMqDataRsp dataRsp = {0};
|
||||||
tqInitDataRsp(&dataRsp, req.reqOffset);
|
tqInitDataRsp(&dataRsp, req.reqOffset);
|
||||||
|
@ -766,15 +686,19 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
|
tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
while (tqIsHandleExec(pHandle)) {
|
while (1) {
|
||||||
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
|
taosWLockLatch(&pTq->lock);
|
||||||
pHandle->subKey, pHandle);
|
bool exec = tqIsHandleExec(pHandle);
|
||||||
taosMsleep(10);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if(exec){
|
||||||
|
tqInfo("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
|
||||||
|
pHandle->subKey, pHandle);
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
taosMsleep(10);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (pHandle->pRef) {
|
if (pHandle->pRef) {
|
||||||
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
|
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
|
||||||
}
|
}
|
||||||
|
@ -785,8 +709,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
|
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
|
tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
|
||||||
|
|
|
@ -366,6 +366,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
||||||
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
||||||
SWalReader* pWalReader = pReader->pWalReader;
|
SWalReader* pWalReader = pReader->pWalReader;
|
||||||
|
|
||||||
|
// uint64_t st = taosGetTimestampMs();
|
||||||
while (1) {
|
while (1) {
|
||||||
SArray* pBlockList = pReader->submit.aSubmitTbData;
|
SArray* pBlockList = pReader->submit.aSubmitTbData;
|
||||||
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
|
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
|
||||||
|
@ -439,6 +440,10 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
||||||
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
|
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
|
||||||
|
|
||||||
pReader->msg.msgStr = NULL;
|
pReader->msg.msgStr = NULL;
|
||||||
|
|
||||||
|
// if(taosGetTimestampMs() - st > 5){
|
||||||
|
// return false;
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -489,7 +494,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
|
||||||
tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
|
tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
|
tqInfo("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
|
||||||
taosHashGetSize(pReader->tbIdHash), idstr);
|
taosHashGetSize(pReader->tbIdHash), idstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -850,7 +855,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
|
||||||
tDeleteSchemaWrapper(pSW);
|
tDeleteSchemaWrapper(pSW);
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
|
tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
|
||||||
(int32_t)taosArrayGetSize(block.pDataBlock));
|
(int32_t)taosArrayGetSize(block.pDataBlock));
|
||||||
|
|
||||||
block.info.id.uid = uid;
|
block.info.id.uid = uid;
|
||||||
|
@ -867,7 +872,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
|
||||||
|
|
||||||
SSDataBlock* pBlock = taosArrayGetLast(blocks);
|
SSDataBlock* pBlock = taosArrayGetLast(blocks);
|
||||||
|
|
||||||
tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
|
tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
|
||||||
(int32_t)taosArrayGetSize(blocks));
|
(int32_t)taosArrayGetSize(blocks));
|
||||||
|
|
||||||
int32_t targetIdx = 0;
|
int32_t targetIdx = 0;
|
||||||
|
@ -949,7 +954,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
|
||||||
tDeleteSchemaWrapper(pSW);
|
tDeleteSchemaWrapper(pSW);
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
|
tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
|
||||||
(int32_t)taosArrayGetSize(block.pDataBlock));
|
(int32_t)taosArrayGetSize(block.pDataBlock));
|
||||||
|
|
||||||
block.info.id.uid = uid;
|
block.info.id.uid = uid;
|
||||||
|
@ -966,7 +971,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
|
||||||
|
|
||||||
SSDataBlock* pBlock = taosArrayGetLast(blocks);
|
SSDataBlock* pBlock = taosArrayGetLast(blocks);
|
||||||
|
|
||||||
tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
|
tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
|
||||||
(int32_t)taosArrayGetSize(blocks));
|
(int32_t)taosArrayGetSize(blocks));
|
||||||
|
|
||||||
int32_t targetIdx = 0;
|
int32_t targetIdx = 0;
|
||||||
|
|
|
@ -126,12 +126,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
//static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
|
|
||||||
// if(offset->type == TMQ_OFFSET__LOG){
|
|
||||||
// offset->version = ver;
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||||
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
||||||
uint64_t consumerId = pRequest->consumerId;
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
|
@ -140,7 +134,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
|
|
||||||
SMqDataRsp dataRsp = {0};
|
SMqDataRsp dataRsp = {0};
|
||||||
tqInitDataRsp(&dataRsp, *pOffset);
|
tqInitDataRsp(&dataRsp, *pOffset);
|
||||||
// dataRsp.reqOffset.type = pOffset->type; // store origin type for getting offset in tmq_get_vgroup_offset
|
|
||||||
|
|
||||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||||
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||||
|
@ -161,7 +154,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
// setRequestVersion(&dataRsp.reqOffset, pOffset->version);
|
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
|
|
||||||
end : {
|
end : {
|
||||||
|
@ -182,7 +174,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
SMqMetaRsp metaRsp = {0};
|
SMqMetaRsp metaRsp = {0};
|
||||||
STaosxRsp taosxRsp = {0};
|
STaosxRsp taosxRsp = {0};
|
||||||
tqInitTaosxRsp(&taosxRsp, *offset);
|
tqInitTaosxRsp(&taosxRsp, *offset);
|
||||||
// taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
|
|
||||||
|
|
||||||
if (offset->type != TMQ_OFFSET__LOG) {
|
if (offset->type != TMQ_OFFSET__LOG) {
|
||||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
|
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
|
||||||
|
@ -216,19 +207,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
walReaderVerifyOffset(pHandle->pWalReader, offset);
|
walReaderVerifyOffset(pHandle->pWalReader, offset);
|
||||||
int64_t fetchVer = offset->version;
|
int64_t fetchVer = offset->version;
|
||||||
|
|
||||||
|
// uint64_t st = taosGetTimestampMs();
|
||||||
int totalRows = 0;
|
int totalRows = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
if (savedEpoch > pRequest->epoch) {
|
ASSERT (savedEpoch <= pRequest->epoch);
|
||||||
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
|
|
||||||
", found new consumer epoch %d, discard req epoch %d",
|
|
||||||
pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -241,7 +227,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
if (pHead->msgType != TDMT_VND_SUBMIT) {
|
if (pHead->msgType != TDMT_VND_SUBMIT) {
|
||||||
if (totalRows > 0) {
|
if (totalRows > 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -269,9 +254,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) {
|
||||||
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
|
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
|
||||||
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
goto end;
|
goto end;
|
||||||
} else {
|
} else {
|
||||||
|
@ -310,7 +295,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
|
||||||
// this is a normal subscribe requirement
|
// this is a normal subscribe requirement
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
|
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
|
||||||
} else { // todo handle the case where re-balance occurs.
|
} else {
|
||||||
// for taosx
|
// for taosx
|
||||||
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
|
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1727,9 +1727,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||||
if (pResult && pResult->info.rows > 0) {
|
if (pResult && pResult->info.rows > 0) {
|
||||||
// qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64,
|
|
||||||
// pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
|
|
||||||
// pInfo->tqReader->pWalReader->curVersion);
|
|
||||||
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
|
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
|
||||||
return pResult;
|
return pResult;
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ class TDTestCase:
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 10000,
|
||||||
'batchNum': 2000,
|
'batchNum': 2000,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 20,
|
'pollDelay': 50,
|
||||||
'showMsg': 1,
|
'showMsg': 1,
|
||||||
'showRow': 1}
|
'showRow': 1}
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ class TDTestCase:
|
||||||
autoCommitInterval = 'auto.commit.interval.ms:1000'
|
autoCommitInterval = 'auto.commit.interval.ms:1000'
|
||||||
autoOffset = 'auto.offset.reset:earliest'
|
autoOffset = 'auto.offset.reset:earliest'
|
||||||
|
|
||||||
pollDelay = 20
|
pollDelay = 50
|
||||||
showMsg = 1
|
showMsg = 1
|
||||||
showRow = 1
|
showRow = 1
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue