Merge pull request #14401 from taosdata/feature/stream
fix(tmq): reset window
This commit is contained in:
commit
c40c22c386
|
@ -1547,7 +1547,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
/*printf("call poll\n");*/
|
/*tscDebug("call poll");*/
|
||||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||||
|
@ -1708,6 +1708,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
|
/*tscDebug("call poll1");*/
|
||||||
void* rspObj;
|
void* rspObj;
|
||||||
int64_t startTime = taosGetTimestampMs();
|
int64_t startTime = taosGetTimestampMs();
|
||||||
|
|
||||||
|
|
|
@ -5402,9 +5402,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
||||||
} else if (pVal->type == TMQ_OFFSET__LOG) {
|
} else if (pVal->type == TMQ_OFFSET__LOG) {
|
||||||
snprintf(buf, maxLen, "offset(log) ver:%ld", pVal->version);
|
snprintf(buf, maxLen, "offset(log) ver:%ld", pVal->version);
|
||||||
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
snprintf(buf, maxLen, "offset(snapshot data) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
|
snprintf(buf, maxLen, "offset(ss data) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
|
||||||
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
snprintf(buf, maxLen, "offset(snapshot meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
|
snprintf(buf, maxLen, "offset(ss meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,10 +154,10 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
};
|
};
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
char buf1[50];
|
char buf1[80];
|
||||||
char buf2[50];
|
char buf2[80];
|
||||||
tFormatOffset(buf1, 50, &pRsp->reqOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 50, &pRsp->rspOffset);
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %s, rspOffset: %s",
|
tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %s, rspOffset: %s",
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
|
@ -238,8 +238,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
STqOffsetVal fetchOffsetNew;
|
STqOffsetVal fetchOffsetNew;
|
||||||
|
|
||||||
// 1.find handle
|
// 1.find handle
|
||||||
char buf[50];
|
char buf[80];
|
||||||
tFormatOffset(buf, 50, &reqOffset);
|
tFormatOffset(buf, 80, &reqOffset);
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch,
|
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch,
|
||||||
TD_VID(pTq->pVnode), buf);
|
TD_VID(pTq->pVnode), buf);
|
||||||
|
|
||||||
|
@ -360,7 +360,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
|
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
|
||||||
SMqMetaRsp metaRsp = {0};
|
SMqMetaRsp metaRsp = {0};
|
||||||
metaRsp.reqOffset = pReq->reqOffset.version;
|
metaRsp.reqOffset = pReq->reqOffset.version;
|
||||||
/*tqOffsetResetToLog(&metaR)*/
|
|
||||||
metaRsp.rspOffset = fetchVer;
|
metaRsp.rspOffset = fetchVer;
|
||||||
metaRsp.resMsgType = pHead->msgType;
|
metaRsp.resMsgType = pHead->msgType;
|
||||||
metaRsp.metaRspLen = pHead->bodyLen;
|
metaRsp.metaRspLen = pHead->bodyLen;
|
||||||
|
@ -380,18 +379,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
// 2. get data (rebuild reader if needed)
|
// 2. get data (rebuild reader if needed)
|
||||||
// 3. get new uid and ts
|
// 3. get new uid and ts
|
||||||
|
|
||||||
char formatBuf[50];
|
tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts);
|
||||||
tFormatOffset(formatBuf, 50, &dataRsp.reqOffset);
|
|
||||||
tqInfo("retrieve using snapshot req offset %s", formatBuf);
|
|
||||||
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
|
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. send rsp
|
// 4. send rsp
|
||||||
if (dataRsp.blockNum != 0) {
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
code = -1;
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
|
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -2837,12 +2837,10 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
||||||
|
|
||||||
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
|
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
|
||||||
tsdbSetTableId(pInfo->dataReader, uid);
|
tsdbSetTableId(pInfo->dataReader, uid);
|
||||||
SQueryTableDataCond tmpCond = pInfo->cond;
|
int64_t oldSkey = pInfo->cond.twindows[0].skey;
|
||||||
tmpCond.twindows[0] = (STimeWindow){
|
pInfo->cond.twindows[0].skey = ts;
|
||||||
.skey = ts,
|
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
|
||||||
.ekey = INT64_MAX,
|
pInfo->cond.twindows[0].skey = oldSkey;
|
||||||
};
|
|
||||||
tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0);
|
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
pInfo->curTWinIdx = 0;
|
pInfo->curTWinIdx = 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -518,7 +518,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// if scan table by table
|
// if scan table by table
|
||||||
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
||||||
// check status
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* result = doTableScanGroup(pOperator);
|
SSDataBlock* result = doTableScanGroup(pOperator);
|
||||||
if (result) {
|
if (result) {
|
||||||
|
@ -530,7 +529,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
|
||||||
/*pTableInfo->uid */
|
|
||||||
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
|
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
|
||||||
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
|
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
|
|
|
@ -111,7 +111,7 @@ endi
|
||||||
|
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||||
$expectmsgcnt = 1
|
$expectmsgcnt = 1000000
|
||||||
$expectrowcnt = 100
|
$expectrowcnt = 100
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
|
@ -131,9 +131,6 @@ endi
|
||||||
if $data[0][1] != $consumerId then
|
if $data[0][1] != $consumerId then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data[0][2] != $expectmsgcnt then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
if $data[0][3] != $expectrowcnt then
|
if $data[0][3] != $expectrowcnt then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
@ -183,7 +180,7 @@ endi
|
||||||
|
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfCtb = $rowsPerCtb
|
$totalMsgOfCtb = $rowsPerCtb
|
||||||
$expectmsgcnt = 1
|
$expectmsgcnt = 1000000
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ctb
|
print == start consumer to pull msgs from ctb
|
||||||
|
@ -254,7 +251,7 @@ endi
|
||||||
|
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfNtb = $rowsPerCtb
|
$totalMsgOfNtb = $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfNtb
|
$expectmsgcnt = 1000000
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ntb
|
print == start consumer to pull msgs from ntb
|
||||||
|
|
|
@ -80,7 +80,7 @@ $topicList = $topicList . '
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
|
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
|
||||||
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
|
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfStb
|
$expectmsgcnt = 1000000
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
|
|
||||||
|
@ -168,7 +168,7 @@ $consumerId = 0
|
||||||
|
|
||||||
$totalMsgOfOneTopic = $rowsPerCtb
|
$totalMsgOfOneTopic = $rowsPerCtb
|
||||||
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
|
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfCtb
|
$expectmsgcnt = 1000000
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
$topicList = ' . topic_ctb_function
|
$topicList = ' . topic_ctb_function
|
||||||
|
@ -245,7 +245,7 @@ $topicList = $topicList . '
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfOneTopic = $rowsPerCtb
|
$totalMsgOfOneTopic = $rowsPerCtb
|
||||||
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
|
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfNtb
|
$expectmsgcnt = 1000000
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue