Merge pull request #22405 from taosdata/mark/tmq-3.0
fix:offset error in tmq & add test cases
This commit is contained in:
commit
1990a891c2
|
@ -153,7 +153,6 @@ struct SWalReader {
|
|||
int64_t capacity;
|
||||
TdThreadMutex mutex;
|
||||
SWalFilterCond cond;
|
||||
// TODO remove it
|
||||
SWalCkHead *pHead;
|
||||
};
|
||||
|
||||
|
@ -207,10 +206,9 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64
|
|||
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset);
|
||||
|
||||
// only for tq usage
|
||||
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
|
||||
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
|
||||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver);
|
||||
int32_t walFetchBody(SWalReader *pRead);
|
||||
int32_t walSkipFetchBody(SWalReader *pRead);
|
||||
|
||||
void walRefFirstVer(SWal *, SWalRef *);
|
||||
void walRefLastVer(SWal *, SWalRef *);
|
||||
|
|
|
@ -1863,10 +1863,10 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
|
||||
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId, bool hasData){
|
||||
if (!pVg->seekUpdated) {
|
||||
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
|
||||
pVg->offsetInfo.beginOffset = *reqOffset;
|
||||
if(hasData) pVg->offsetInfo.beginOffset = *reqOffset;
|
||||
pVg->offsetInfo.endOffset = *rspOffset;
|
||||
} else {
|
||||
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
|
||||
|
@ -1929,7 +1929,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
pVg->epSet = *pollRspWrapper->pEpset;
|
||||
}
|
||||
|
||||
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
|
||||
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId, pDataRsp->blockNum != 0);
|
||||
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
|
||||
|
@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId);
|
||||
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
|
||||
// build rsp
|
||||
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
||||
taosFreeQitem(pollRspWrapper);
|
||||
|
@ -2007,7 +2007,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId);
|
||||
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId, pollRspWrapper->taosxRsp.blockNum != 0);
|
||||
|
||||
if (pollRspWrapper->taosxRsp.blockNum == 0) {
|
||||
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||
|
|
|
@ -1442,178 +1442,4 @@ TEST(clientCase, sub_tb_mt_test) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST(clientCase, ts_3756) {
|
||||
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
|
||||
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(pConn, nullptr);
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
|
||||
tmq_conf_set(conf, "enable.auto.commit", "false");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
|
||||
tmq_conf_set(conf, "group.id", "group_id_2");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "latest");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "false");
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
// 创建订阅 topics 列表
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, "tp");
|
||||
|
||||
// 启动订阅
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
|
||||
TAOS_FIELD* fields = NULL;
|
||||
int32_t numOfFields = 0;
|
||||
int32_t precision = 0;
|
||||
int32_t totalRows = 0;
|
||||
int32_t msgCnt = 0;
|
||||
int32_t timeout = 200;
|
||||
|
||||
int32_t count = 0;
|
||||
|
||||
tmq_topic_assignment* pAssign = NULL;
|
||||
int32_t numOfAssign = 0;
|
||||
|
||||
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4);
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
printf("start to poll\n");
|
||||
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
||||
if (pRes) {
|
||||
char buf[128];
|
||||
|
||||
const char* topicName = tmq_get_topic_name(pRes);
|
||||
// const char* dbName = tmq_get_db_name(pRes);
|
||||
// int32_t vgroupId = tmq_get_vgroup_id(pRes);
|
||||
//
|
||||
// printf("topic: %s\n", topicName);
|
||||
// printf("db: %s\n", dbName);
|
||||
// printf("vgroup id: %d\n", vgroupId);
|
||||
|
||||
printSubResults(pRes, &totalRows);
|
||||
|
||||
tmq_topic_assignment* pAssignTmp = NULL;
|
||||
int32_t numOfAssignTmp = 0;
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssignTmp, &numOfAssignTmp);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssignTmp[i].vgId, pAssignTmp[i].currentOffset, pAssignTmp[i].begin, pAssignTmp[i].end);
|
||||
}
|
||||
if(numOfAssign != 0){
|
||||
int i = 0;
|
||||
for(; i < numOfAssign; i++){
|
||||
if(pAssign[i].currentOffset != pAssignTmp[i].currentOffset){
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(i == numOfAssign){
|
||||
printf("all position is same\n");
|
||||
break;
|
||||
}
|
||||
tmq_free_assignment(pAssign);
|
||||
}
|
||||
numOfAssign = numOfAssignTmp;
|
||||
pAssign = pAssignTmp;
|
||||
|
||||
} else {
|
||||
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
|
||||
// tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
|
||||
// tmq_commit_sync(tmq, pRes);
|
||||
continue;
|
||||
}
|
||||
|
||||
// tmq_commit_sync(tmq, pRes);
|
||||
if (pRes != NULL) {
|
||||
taos_free_result(pRes);
|
||||
// if ((++count) > 1) {
|
||||
// break;
|
||||
// }
|
||||
} else {
|
||||
// break;
|
||||
}
|
||||
|
||||
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin);
|
||||
}
|
||||
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
|
@ -244,7 +244,7 @@ static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, con
|
|||
|
||||
SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp};
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
|
||||
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pConsumerEp->vgs);
|
||||
|
|
|
@ -127,7 +127,7 @@ void tqDestroyTqHandle(void* data);
|
|||
// tqRead
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
|
||||
|
||||
// tqExec
|
||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
|
||||
|
@ -175,7 +175,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
|
|||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
||||
int32_t type, int64_t sver, int64_t ever);
|
||||
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq);
|
||||
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -289,9 +289,8 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
|
|||
}
|
||||
|
||||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, &req);
|
||||
tqInitDataRsp(&dataRsp, req.reqOffset);
|
||||
dataRsp.blockNum = 0;
|
||||
dataRsp.rspOffset = dataRsp.reqOffset;
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
|
||||
tqInfo("tqPushEmptyDataRsp to consumer:0x%"PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId);
|
||||
|
@ -391,7 +390,6 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
|
||||
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
if (pHandle == NULL) {
|
||||
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
|
||||
|
@ -719,7 +717,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
|
||||
|
||||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, &req);
|
||||
tqInitDataRsp(&dataRsp, req.reqOffset);
|
||||
|
||||
if (req.useSnapshot == true) {
|
||||
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
|
||||
|
|
|
@ -356,7 +356,7 @@ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
|
|||
if(buildHandle(pTq, handle) < 0){
|
||||
return -1;
|
||||
}
|
||||
tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||
tqInfo("restoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
||||
}
|
||||
|
||||
|
@ -384,7 +384,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
|
|||
if(buildHandle(pTq, handle) < 0){
|
||||
return -1;
|
||||
}
|
||||
tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||
tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
||||
}
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
|
|||
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
|
||||
taosRUnLockLatch(&pTq->pStreamMeta->lock);
|
||||
|
||||
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
|
||||
tqTrace("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
|
||||
|
||||
// push data for stream processing:
|
||||
// 1. the vnode has already been restored.
|
||||
|
|
|
@ -184,70 +184,63 @@ end:
|
|||
return tbSuid == realTbSuid;
|
||||
}
|
||||
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) {
|
||||
int32_t code = 0;
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
|
||||
int32_t code = -1;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
||||
int64_t offset = *fetchOffset;
|
||||
int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
|
||||
int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
|
||||
int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
|
||||
|
||||
while (1) {
|
||||
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
||||
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64,
|
||||
vgId, offset, lastVer, committedVer, appliedVer);
|
||||
|
||||
while (offset <= appliedVer) {
|
||||
if (walFetchHead(pHandle->pWalReader, offset) < 0) {
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
|
||||
", no more log to return, reqId:0x%" PRIx64,
|
||||
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
|
||||
*fetchOffset = offset;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
|
||||
pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId);
|
||||
pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId);
|
||||
|
||||
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
|
||||
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
||||
|
||||
if (code < 0) {
|
||||
*fetchOffset = offset;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
*fetchOffset = offset;
|
||||
code = 0;
|
||||
if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
||||
code = walFetchBody(pHandle->pWalReader);
|
||||
goto END;
|
||||
} else {
|
||||
if (pHandle->fetchMeta != WITH_DATA) {
|
||||
SWalCont* pHead = &((*ppCkHead)->head);
|
||||
SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
|
||||
if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
|
||||
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
||||
code = walFetchBody(pHandle->pWalReader);
|
||||
if (code < 0) {
|
||||
*fetchOffset = offset;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
pHead = &(pHandle->pWalReader->pHead->head);
|
||||
if (isValValidForTable(pHandle, pHead)) {
|
||||
*fetchOffset = offset;
|
||||
code = 0;
|
||||
goto END;
|
||||
} else {
|
||||
offset++;
|
||||
code = -1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
|
||||
code = walSkipFetchBody(pHandle->pWalReader);
|
||||
if (code < 0) {
|
||||
*fetchOffset = offset;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
offset++;
|
||||
}
|
||||
code = -1;
|
||||
}
|
||||
|
||||
END:
|
||||
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
|
||||
*fetchOffset = offset;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,8 +20,9 @@
|
|||
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
||||
const SMqMetaRsp* pRsp, int32_t vgId);
|
||||
|
||||
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
|
||||
pRsp->reqOffset = pReq->reqOffset;
|
||||
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
||||
pRsp->reqOffset = pOffset;
|
||||
pRsp->rspOffset = pOffset;
|
||||
|
||||
pRsp->blockData = taosArrayInit(0, sizeof(void*));
|
||||
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
||||
|
@ -35,8 +36,9 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
|
||||
pRsp->reqOffset = pReq->reqOffset;
|
||||
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
|
||||
pRsp->reqOffset = pOffset;
|
||||
pRsp->rspOffset = pOffset;
|
||||
|
||||
pRsp->withTbName = 1;
|
||||
pRsp->withSchema = 1;
|
||||
|
@ -69,7 +71,6 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
|
|||
|
||||
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) {
|
||||
uint64_t consumerId = pRequest->consumerId;
|
||||
STqOffsetVal reqOffset = pRequest->reqOffset;
|
||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
|
@ -86,7 +87,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
|||
return 0;
|
||||
} else {
|
||||
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
|
||||
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
|
||||
if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
|
||||
if (pRequest->useSnapshot) {
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
|
||||
consumerId, pHandle->subKey, vgId);
|
||||
|
@ -100,12 +101,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
|||
walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
|
||||
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
|
||||
}
|
||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||
} else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
|
||||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, pRequest);
|
||||
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
|
||||
|
||||
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1);
|
||||
tqInitDataRsp(&dataRsp, *pOffsetVal);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
|
||||
pHandle->subKey, vgId, dataRsp.rspOffset.version);
|
||||
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||
|
@ -113,7 +114,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
|||
|
||||
*pBlockReturned = true;
|
||||
return code;
|
||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||
} else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
|
||||
" in vg %d, subkey %s, reset none failed",
|
||||
pHandle->subKey, consumerId, vgId, pRequest->subKey);
|
||||
|
@ -125,11 +126,11 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
|
||||
if(offset->type == TMQ_OFFSET__LOG){
|
||||
offset->version = ver + 1;
|
||||
}
|
||||
}
|
||||
//static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
|
||||
// if(offset->type == TMQ_OFFSET__LOG){
|
||||
// offset->version = ver;
|
||||
// }
|
||||
//}
|
||||
|
||||
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
||||
|
@ -138,8 +139,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
terrno = 0;
|
||||
|
||||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, pRequest);
|
||||
dataRsp.reqOffset.type = pOffset->type; // stroe origin type for getting offset in tmq_get_vgroup_offset
|
||||
tqInitDataRsp(&dataRsp, *pOffset);
|
||||
// dataRsp.reqOffset.type = pOffset->type; // store origin type for getting offset in tmq_get_vgroup_offset
|
||||
|
||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||
|
@ -152,8 +153,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
// lock
|
||||
taosWLockLatch(&pTq->lock);
|
||||
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
|
||||
if (pOffset->version >= ver ||
|
||||
dataRsp.rspOffset.version >= ver) { // check if there are data again to avoid lost data
|
||||
if (dataRsp.rspOffset.version > ver) { // check if there are data again to avoid lost data
|
||||
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
goto end;
|
||||
|
@ -161,7 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
taosWUnLockLatch(&pTq->lock);
|
||||
}
|
||||
|
||||
setRequestVersion(&dataRsp.reqOffset, pOffset->version);
|
||||
// setRequestVersion(&dataRsp.reqOffset, pOffset->version);
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||
|
||||
end : {
|
||||
|
@ -179,11 +179,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
||||
int code = 0;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
SWalCkHead* pCkHead = NULL;
|
||||
SMqMetaRsp metaRsp = {0};
|
||||
STaosxRsp taosxRsp = {0};
|
||||
tqInitTaosxRsp(&taosxRsp, pRequest);
|
||||
taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
|
||||
tqInitTaosxRsp(&taosxRsp, *offset);
|
||||
// taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
|
||||
|
||||
if (offset->type != TMQ_OFFSET__LOG) {
|
||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
|
||||
|
@ -216,14 +215,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
if (offset->type == TMQ_OFFSET__LOG) {
|
||||
walReaderVerifyOffset(pHandle->pWalReader, offset);
|
||||
int64_t fetchVer = offset->version;
|
||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||
if (pCkHead == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
|
||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||
int totalRows = 0;
|
||||
while (1) {
|
||||
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||
|
@ -234,14 +226,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
break;
|
||||
}
|
||||
|
||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
|
||||
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
||||
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||
goto end;
|
||||
}
|
||||
|
||||
SWalCont* pHead = &pCkHead->head;
|
||||
SWalCont* pHead = &pHandle->pWalReader->pHead->head;
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
|
||||
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
|
||||
|
||||
|
@ -249,7 +241,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
if (pHead->msgType != TDMT_VND_SUBMIT) {
|
||||
if (totalRows > 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
||||
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||
goto end;
|
||||
}
|
||||
|
@ -279,7 +271,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
|
||||
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
|
||||
setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
||||
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||
goto end;
|
||||
} else {
|
||||
|
@ -291,20 +283,17 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
end:
|
||||
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
|
||||
int32_t code = -1;
|
||||
STqOffsetVal offset = {0};
|
||||
STqOffsetVal reqOffset = pRequest->reqOffset;
|
||||
|
||||
// 1. reset the offset if needed
|
||||
if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
|
||||
if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
|
||||
// handle the reset offset cases, according to the consumer's choice.
|
||||
bool blockReturned = false;
|
||||
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
|
||||
int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
@ -313,20 +302,17 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
|
|||
if (blockReturned) {
|
||||
return 0;
|
||||
}
|
||||
} else if(reqOffset.type != 0){ // use the consumer specified offset
|
||||
// the offset value can not be monotonious increase??
|
||||
offset = reqOffset;
|
||||
} else {
|
||||
} else if(reqOffset.type == 0){ // use the consumer specified offset
|
||||
uError("req offset type is 0");
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
|
||||
// this is a normal subscribe requirement
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
|
||||
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
|
||||
} else { // todo handle the case where re-balance occurs.
|
||||
// for taosx
|
||||
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
|
||||
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -628,6 +628,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_TMQ_SEEK:
|
||||
return tqProcessSeekReq(pVnode->pTq, pMsg);
|
||||
|
||||
default:
|
||||
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
|
|
@ -16,10 +16,6 @@
|
|||
#include "taoserror.h"
|
||||
#include "walInt.h"
|
||||
|
||||
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
|
||||
static int32_t walFetchBodyNew(SWalReader *pRead);
|
||||
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
|
||||
|
||||
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||
SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
|
||||
if (pReader == NULL) {
|
||||
|
@ -70,38 +66,29 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
|||
int64_t fetchVer = pReader->curVersion;
|
||||
int64_t lastVer = walGetLastVer(pReader->pWal);
|
||||
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
||||
// int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
||||
|
||||
// if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
|
||||
// wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
|
||||
// }
|
||||
|
||||
// int64_t endVer = TMIN(appliedVer, committedVer);
|
||||
int64_t endVer = committedVer;
|
||||
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
||||
|
||||
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
||||
", end index:%" PRId64,
|
||||
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, endVer);
|
||||
|
||||
if (fetchVer > endVer){
|
||||
", applied index:%" PRId64,
|
||||
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
|
||||
if (fetchVer > appliedVer){
|
||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
while (fetchVer <= endVer) {
|
||||
if (walFetchHeadNew(pReader, fetchVer) < 0) {
|
||||
while (fetchVer <= appliedVer) {
|
||||
if (walFetchHead(pReader, fetchVer) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t type = pReader->pHead->head.msgType;
|
||||
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
|
||||
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
|
||||
if (walFetchBodyNew(pReader) < 0) {
|
||||
if (walFetchBody(pReader) < 0) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
} else {
|
||||
if (walSkipFetchBodyNew(pReader) < 0) {
|
||||
if (walSkipFetchBody(pReader) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -263,104 +250,8 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
|
||||
|
||||
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||
int64_t contLen;
|
||||
bool seeked = false;
|
||||
|
||||
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
|
||||
|
||||
if (pRead->curVersion != fetchVer) {
|
||||
if (walReaderSeekVer(pRead, fetchVer) < 0) {
|
||||
return -1;
|
||||
}
|
||||
seeked = true;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||
if (contLen == sizeof(SWalCkHead)) {
|
||||
break;
|
||||
} else if (contLen == 0 && !seeked) {
|
||||
if(walReadSeekVerImpl(pRead, fetchVer) < 0){
|
||||
return -1;
|
||||
}
|
||||
seeked = true;
|
||||
continue;
|
||||
} else {
|
||||
if (contLen < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
} else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
// pRead->curInvalid = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t walFetchBodyNew(SWalReader *pReader) {
|
||||
SWalCont *pReadHead = &pReader->pHead->head;
|
||||
int64_t ver = pReadHead->version;
|
||||
|
||||
wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver,
|
||||
pReadHead->bodyLen);
|
||||
|
||||
if (pReader->capacity < pReadHead->bodyLen) {
|
||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||
if (ptr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pReader->pHead = ptr;
|
||||
pReadHead = &pReader->pHead->head;
|
||||
pReader->capacity = pReadHead->bodyLen;
|
||||
}
|
||||
|
||||
if (pReadHead->bodyLen != taosReadFile(pReader->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
|
||||
if (pReadHead->bodyLen < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
|
||||
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver, tstrerror(terrno));
|
||||
} else {
|
||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
|
||||
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver);
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (walValidBodyCksum(pReader->pHead) != 0) {
|
||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver);
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
wDebug("vgId:%d, index:%" PRId64 " is fetched, type:%d, cursor advance", pReader->pWal->cfg.vgId, ver, pReader->pHead->head.msgType);
|
||||
pReader->curVersion = ver + 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
|
||||
int64_t code;
|
||||
|
||||
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
||||
if (code < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRead->curVersion++;
|
||||
wDebug("vgId:%d, version advance to %" PRId64 ", skip fetch", pRead->pWal->cfg.vgId, pRead->curVersion);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
|
||||
int64_t code;
|
||||
int64_t contLen;
|
||||
bool seeked = false;
|
||||
|
@ -378,15 +269,13 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
|||
if (pRead->curVersion != ver) {
|
||||
code = walReaderSeekVer(pRead, ver);
|
||||
if (code < 0) {
|
||||
// pRead->curVersion = ver;
|
||||
// pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
seeked = true;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
|
||||
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||
if (contLen == sizeof(SWalCkHead)) {
|
||||
break;
|
||||
} else if (contLen == 0 && !seeked) {
|
||||
|
@ -401,12 +290,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
|||
} else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
// pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
code = walValidHeadCksum(pHead);
|
||||
code = walValidHeadCksum(pRead->pHead);
|
||||
|
||||
if (code != 0) {
|
||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||
|
@ -414,32 +302,27 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// pRead->curInvalid = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
|
||||
int64_t code;
|
||||
|
||||
int32_t walSkipFetchBody(SWalReader *pRead) {
|
||||
wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
||||
", applied ver:%" PRId64,
|
||||
pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
||||
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
||||
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
|
||||
|
||||
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
|
||||
int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
||||
if (code < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRead->curVersion++;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
||||
SWalCont *pReadHead = &((*ppHead)->head);
|
||||
int32_t walFetchBody(SWalReader *pRead) {
|
||||
SWalCont *pReadHead = &pRead->pHead->head;
|
||||
int64_t ver = pReadHead->version;
|
||||
|
||||
wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
||||
|
@ -448,13 +331,13 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
|||
pRead->pWal->vers.appliedVer);
|
||||
|
||||
if (pRead->capacity < pReadHead->bodyLen) {
|
||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||
if (ptr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
*ppHead = ptr;
|
||||
pReadHead = &((*ppHead)->head);
|
||||
pRead->pHead = ptr;
|
||||
pReadHead = &pRead->pHead->head;
|
||||
pRead->capacity = pReadHead->bodyLen;
|
||||
}
|
||||
|
||||
|
@ -468,27 +351,24 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
|||
pRead->pWal->cfg.vgId, pReadHead->version, ver);
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
// pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pReadHead->version != ver) {
|
||||
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||
pReadHead->version, ver);
|
||||
// pRead->curInvalid = 1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (walValidBodyCksum(*ppHead) != 0) {
|
||||
if (walValidBodyCksum(pRead->pHead) != 0) {
|
||||
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
|
||||
ver);
|
||||
// pRead->curInvalid = 1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRead->curVersion = ver + 1;
|
||||
pRead->curVersion++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -127,6 +127,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
|
||||
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
|
||||
|
@ -455,7 +456,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
|
||||
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
|
||||
,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
|
||||
,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
|
||||
#,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
|
||||
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
|
||||
|
|
|
@ -36,7 +36,7 @@ class TDTestCase:
|
|||
# tdDnodes[1].cfgDir
|
||||
|
||||
cfgFile = f"%s/taos.cfg"%(cfgDir)
|
||||
shellCmd = 'echo "tmqMaxTopicNum %d" >> %s'%(tmqMaxTopicNum, cfgFile)
|
||||
shellCmd = 'echo tmqMaxTopicNum %d >> %s'%(tmqMaxTopicNum, cfgFile)
|
||||
tdLog.info(" shell cmd: %s"%(shellCmd))
|
||||
os.system(shellCmd)
|
||||
tdDnodes.stoptaosd(1)
|
||||
|
|
|
@ -131,7 +131,7 @@ class TDTestCase:
|
|||
if snapshot_value == "true":
|
||||
if offset_value != "earliest" and offset_value != "":
|
||||
if offset_value == "latest":
|
||||
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace(offset_value, "0")), subscription_info))
|
||||
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
|
||||
tdSql.checkEqual(sum(offset_value_list) >= 0, True)
|
||||
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
|
||||
tdSql.checkEqual(sum(rows_value_list), expected_res)
|
||||
|
@ -154,7 +154,7 @@ class TDTestCase:
|
|||
tdSql.checkEqual(rows_value_list, [None]*len(subscription_info))
|
||||
else:
|
||||
if offset_value != "none":
|
||||
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace(offset_value, "0")), subscription_info))
|
||||
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
|
||||
tdSql.checkEqual(sum(offset_value_list) >= 0, True)
|
||||
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
|
||||
tdSql.checkEqual(sum(rows_value_list), expected_res)
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
sys.path.append("./7-tmq")
|
||||
from tmqCommon import *
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
buildPath = tdCom.getBuildPath()
|
||||
cmdStr1 = '%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'%(buildPath)
|
||||
tdLog.info(cmdStr1)
|
||||
os.system(cmdStr1)
|
||||
time.sleep(15)
|
||||
|
||||
cmdStr2 = '%s/build/bin/tmq_offset_test &'%(buildPath)
|
||||
tdLog.info(cmdStr2)
|
||||
os.system(cmdStr2)
|
||||
|
||||
time.sleep(20)
|
||||
|
||||
os.system("kill -9 `pgrep taosBenchmark`")
|
||||
result = os.system("kill -9 `pgrep tmq_offset_test`")
|
||||
if result != 0:
|
||||
tdLog.exit("tmq_offset_test error!")
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c)
|
|||
add_executable(sml_test sml_test.c)
|
||||
add_executable(get_db_name_test get_db_name_test.c)
|
||||
add_executable(tmq_offset tmqOffset.c)
|
||||
add_executable(tmq_offset_test tmq_offset_test.c)
|
||||
target_link_libraries(
|
||||
tmq_offset
|
||||
PUBLIC taos
|
||||
|
@ -42,6 +43,13 @@ target_link_libraries(
|
|||
PUBLIC common
|
||||
PUBLIC os
|
||||
)
|
||||
target_link_libraries(
|
||||
tmq_offset_test
|
||||
PUBLIC taos
|
||||
PUBLIC util
|
||||
PUBLIC common
|
||||
PUBLIC os
|
||||
)
|
||||
|
||||
target_link_libraries(
|
||||
write_raw_block_test
|
||||
|
|
|
@ -0,0 +1,311 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
#include "types.h"
|
||||
|
||||
int buildData(TAOS* pConn){
|
||||
TAOS_RES* pRes = taos_query(pConn, "drop topic if exists tp");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop tp, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop database if exists db_ts3756");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create database if not exists db_ts3756 vgroups 2 wal_retention_period 3600");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "use db_ts3756");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn,"CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create table meters, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into t1 values(now, 1)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into t1 values(now + 1s, 2)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create topic tp as select * from t1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic tp, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void test_offset(TAOS* pConn){
|
||||
if(buildData(pConn) != 0){
|
||||
ASSERT(0);
|
||||
}
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
|
||||
tmq_conf_set(conf, "enable.auto.commit", "false");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
|
||||
tmq_conf_set(conf, "group.id", "group_id_2");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "false");
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
// 创建订阅 topics 列表
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, "tp");
|
||||
|
||||
// 启动订阅
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
|
||||
int32_t timeout = 200;
|
||||
|
||||
tmq_topic_assignment* pAssign1 = NULL;
|
||||
int32_t numOfAssign1 = 0;
|
||||
|
||||
tmq_topic_assignment* pAssign2 = NULL;
|
||||
int32_t numOfAssign2 = 0;
|
||||
|
||||
tmq_topic_assignment* pAssign3 = NULL;
|
||||
int32_t numOfAssign3 = 0;
|
||||
|
||||
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign1, &numOfAssign1);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign1);
|
||||
tmq_consumer_close(tmq);
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign2, &numOfAssign2);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign2);
|
||||
tmq_consumer_close(tmq);
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign3, &numOfAssign3);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign3);
|
||||
tmq_consumer_close(tmq);
|
||||
ASSERT(0);
|
||||
return;
|
||||
}
|
||||
|
||||
ASSERT(numOfAssign1 == 2);
|
||||
ASSERT(numOfAssign1 == numOfAssign2);
|
||||
ASSERT(numOfAssign1 == numOfAssign3);
|
||||
|
||||
for(int i = 0; i < numOfAssign1; i++){
|
||||
int j = 0;
|
||||
int k = 0;
|
||||
for(; j < numOfAssign2; j++){
|
||||
if(pAssign1[i].vgId == pAssign2[j].vgId){
|
||||
break;
|
||||
}
|
||||
}
|
||||
for(; k < numOfAssign3; k++){
|
||||
if(pAssign1[i].vgId == pAssign3[k].vgId){
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(pAssign1[i].currentOffset == pAssign2[j].currentOffset);
|
||||
ASSERT(pAssign1[i].currentOffset == pAssign3[k].currentOffset);
|
||||
|
||||
ASSERT(pAssign1[i].begin == pAssign2[j].begin);
|
||||
ASSERT(pAssign1[i].begin == pAssign3[k].begin);
|
||||
|
||||
ASSERT(pAssign1[i].end == pAssign2[j].end);
|
||||
ASSERT(pAssign1[i].end == pAssign3[k].end);
|
||||
}
|
||||
tmq_free_assignment(pAssign1);
|
||||
tmq_free_assignment(pAssign2);
|
||||
tmq_free_assignment(pAssign3);
|
||||
|
||||
int cnt = 0;
|
||||
int offset1 = -1;
|
||||
int offset2 = -1;
|
||||
while (cnt++ < 10) {
|
||||
printf("start to poll:%d\n", cnt);
|
||||
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
||||
if (pRes) {
|
||||
tmq_topic_assignment* pAssign = NULL;
|
||||
int32_t numOfAssign = 0;
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
int64_t position = tmq_position(tmq, "tp", pAssign[i].vgId);
|
||||
if(position == 0) continue;
|
||||
|
||||
printf("position = %d\n", (int)position);
|
||||
tmq_commit_offset_sync(tmq, "tp", pAssign[i].vgId, position);
|
||||
int64_t committed = tmq_committed(tmq, "tp", pAssign[i].vgId);
|
||||
ASSERT(position == committed);
|
||||
}
|
||||
|
||||
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
|
||||
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
|
||||
|
||||
if(offset1 != -1){
|
||||
ASSERT(offset1 == pAssign[0].currentOffset);
|
||||
}
|
||||
if(offset2 != -1){
|
||||
ASSERT(offset2 == pAssign[1].currentOffset);
|
||||
}
|
||||
|
||||
offset1 = pAssign[0].currentOffset;
|
||||
offset2 = pAssign[1].currentOffset;
|
||||
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
taos_free_result(pRes);
|
||||
}
|
||||
}
|
||||
|
||||
tmq_consumer_close(tmq);
|
||||
}
|
||||
|
||||
// run taosBenchmark first
|
||||
void test_ts3756(TAOS* pConn){
|
||||
TAOS_RES*pRes = taos_query(pConn, "use test");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
pRes = taos_query(pConn, "drop topic if exists t1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
pRes = taos_query(pConn, "create topic t1 as select * from meters");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
|
||||
tmq_conf_set(conf, "enable.auto.commit", "false");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
|
||||
tmq_conf_set(conf, "group.id", "group_id_2");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "latest");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "false");
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
// 创建订阅 topics 列表
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, "t1");
|
||||
|
||||
// 启动订阅
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
|
||||
int32_t timeout = 200;
|
||||
|
||||
tmq_topic_assignment* pAssign = NULL;
|
||||
int32_t numOfAssign = 0;
|
||||
|
||||
while (1) {
|
||||
// printf("start to poll\n");
|
||||
|
||||
pRes = tmq_consumer_poll(tmq, timeout);
|
||||
if (pRes) {
|
||||
tmq_topic_assignment* pAssignTmp = NULL;
|
||||
int32_t numOfAssignTmp = 0;
|
||||
|
||||
int32_t code = tmq_get_topic_assignment(tmq, "t1", &pAssignTmp, &numOfAssignTmp);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if(numOfAssign != 0){
|
||||
int i = 0;
|
||||
for(; i < numOfAssign; i++){
|
||||
if(pAssign[i].currentOffset != pAssignTmp[i].currentOffset){
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(i == numOfAssign){
|
||||
ASSERT(0);
|
||||
}
|
||||
tmq_free_assignment(pAssign);
|
||||
}
|
||||
numOfAssign = numOfAssignTmp;
|
||||
pAssign = pAssignTmp;
|
||||
taos_free_result(pRes);
|
||||
}
|
||||
}
|
||||
|
||||
tmq_free_assignment(pAssign);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
test_offset(pConn);
|
||||
test_ts3756(pConn);
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue