Merge pull request #22032 from taosdata/mark/tmq

fix:set firset version to reqOffset of response
This commit is contained in:
Haojun Liao 2023-07-13 16:35:21 +08:00 committed by GitHub
commit e2ae868377
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 324 additions and 201 deletions

View File

@ -3371,6 +3371,12 @@ typedef struct {
int8_t reserved;
} SMqHbRsp;
typedef struct {
SMsgHead head;
int64_t consumerId;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
} SMqSeekReq;
#define TD_AUTO_CREATE_TABLE 0x1
typedef struct {
int64_t suid;
@ -3500,6 +3506,8 @@ int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeatroySMqHbReq(SMqHbReq* pReq);
int32_t tSerializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq);
int32_t tDeserializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq);
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2

View File

@ -306,7 +306,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SEEK_TO_OFFSET, "vnode-tmq-seekto-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SEEK, "vnode-tmq-seek", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)

View File

@ -775,6 +775,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004)
#define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005)
#define TSDB_CODE_TMQ_SNAPSHOT_ERROR TAOS_DEF_ERROR_CODE(0, 0x4006)
#define TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4007)
#define TSDB_CODE_TMQ_INVALID_VGID TAOS_DEF_ERROR_CODE(0, 0x4008)
#define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)

View File

@ -140,6 +140,7 @@ enum {
typedef struct SVgOffsetInfo {
STqOffsetVal committedOffset;
STqOffsetVal currentOffset;
STqOffsetVal seekOffset; // the first version in block for seek operation
int64_t walVerBegin;
int64_t walVerEnd;
} SVgOffsetInfo;
@ -214,6 +215,11 @@ typedef struct SMqVgCommon {
int32_t code;
} SMqVgCommon;
typedef struct SMqSeekParam {
tsem_t sem;
int32_t code;
} SMqSeekParam;
typedef struct SMqVgWalInfoParam {
int32_t vgId;
int32_t epoch;
@ -821,7 +827,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
offRows->vgId = pVg->vgId;
offRows->rows = pVg->numOfRows;
offRows->offset = pVg->offsetInfo.currentOffset;
offRows->offset = pVg->offsetInfo.seekOffset;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows);
@ -1479,6 +1485,7 @@ CREATE_MSG_FAIL:
typedef struct SVgroupSaveInfo {
STqOffsetVal currentOffset;
STqOffsetVal commitOffset;
STqOffsetVal seekOffset;
int64_t numOfRows;
} SVgroupSaveInfo;
@ -1518,6 +1525,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
clientVg.offsetInfo.currentOffset = pInfo ? pInfo->currentOffset : offsetNew;
clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew;
clientVg.offsetInfo.seekOffset = pInfo ? pInfo->seekOffset : offsetNew;
clientVg.offsetInfo.walVerBegin = -1;
clientVg.offsetInfo.walVerEnd = -1;
clientVg.seekUpdated = false;
@ -1577,7 +1585,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
vgKey, buf);
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows};
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .seekOffset = pVgCur->offsetInfo.seekOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows};
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
}
}
@ -1879,10 +1887,11 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
return 0;
}
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* offset, int64_t sver, int64_t ever, int64_t consumerId){
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
pVg->offsetInfo.currentOffset = *offset;
pVg->offsetInfo.seekOffset = *reqOffset;
pVg->offsetInfo.currentOffset = *rspOffset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
}
@ -1944,7 +1953,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset;
}
updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
@ -1994,7 +2003,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL;
}
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId);
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId);
// build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper);
@ -2022,7 +2031,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL;
}
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId);
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId);
if (pollRspWrapper->taosxRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
@ -2259,9 +2268,11 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*) res;
STqOffsetVal* pOffset = &pRspObj->rsp.rspOffset;
STqOffsetVal* pOffset = &pRspObj->rsp.reqOffset;
if (pOffset->type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.rspOffset.version;
return pRspObj->rsp.reqOffset.version;
}else{
tscError("invalid offset type:%d", pOffset->type);
}
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res;
@ -2270,9 +2281,11 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
}
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*) res;
if (pRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.rspOffset.version;
if (pRspObj->rsp.reqOffset.type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.reqOffset.version;
}
} else{
tscError("invalid tmq type:%d", *(int8_t*)res);
}
// data from tsdb, no valid offset info
@ -2541,6 +2554,8 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
tsem_post(&pCommon->rsp);
}
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam);
return 0;
}
@ -2611,7 +2626,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
}
tmq_topic_assignment* pAssignment = &(*assignment)[j];
pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
pAssignment->currentOffset = pClientVg->offsetInfo.seekOffset.version;
pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
pAssignment->end = pClientVg->offsetInfo.walVerEnd;
pAssignment->vgId = pClientVg->vgId;
@ -2650,6 +2665,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SMqPollReq req = {0};
tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);
req.reqOffset = pClientVg->offsetInfo.seekOffset;
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) {
@ -2746,6 +2762,17 @@ void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
taosMemoryFree(pAssignment);
}
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
SMqSeekParam* pParam = param;
pParam->code = code;
tsem_post(&pParam->sem);
return 0;
}
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
if (tmq == NULL) {
tscError("invalid tmq handle, null");
@ -2761,7 +2788,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
if (pTopic == NULL) {
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_TMQ_INVALID_TOPIC;
}
SMqClientVg* pVg = NULL;
@ -2777,7 +2804,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
if (pVg == NULL) {
tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_TMQ_INVALID_VGID;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
@ -2793,41 +2820,77 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
}
// update the offset, and then commit to vnode
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0;
pOffsetInfo->seekOffset = pOffsetInfo->currentOffset;
// pOffsetInfo->committedOffset.version = INT64_MIN;
pVg->seekUpdated = true;
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);
SMqSeekReq req = {0};
snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, pTopic->topicName);
req.head.vgId = pVg->vgId;
req.consumerId = tmq->consumerId;
int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
if (msgSize < 0) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
char* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
taosMemoryFree(msg);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(msg);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
if (pParam == NULL) {
taosMemoryFree(msg);
taosMemoryFree(sendInfo);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
tsem_init(&pParam->sem, 0, 0);
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqSeekCb;
sendInfo->msgType = TDMT_VND_TMQ_SEEK;
int64_t transporterId = 0;
tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64,
tmq->consumerId, pTopic->topicName, vgId, tmq->epoch);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
taosWUnLockLatch(&tmq->lock);
// SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
// tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
//
// SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
// if (pInfo == NULL) {
// tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
// return TSDB_CODE_OUT_OF_MEMORY;
// }
//
// tsem_init(&pInfo->sem, 0, 0);
// pInfo->code = 0;
//
// asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo);
//
// tsem_wait(&pInfo->sem);
// int32_t code = pInfo->code;
//
// tsem_destroy(&pInfo->sem);
// taosMemoryFree(pInfo);
//
// if (code != TSDB_CODE_SUCCESS) {
// tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, tstrerror(code));
// }
tsem_wait(&pParam->sem);
int32_t code = pParam->code;
tsem_destroy(&pParam->sem);
taosMemoryFree(pParam);
return 0;
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, vgId, tstrerror(code));
}
return code;
}

View File

@ -34,6 +34,8 @@ namespace {
void printSubResults(void* pRes, int32_t* totalRows) {
char buf[1024];
int32_t vgId = tmq_get_vgroup_id(pRes);
int64_t offset = tmq_get_vgroup_offset(pRes);
while (1) {
TAOS_ROW row = taos_fetch_row(pRes);
if (row == NULL) {
@ -45,7 +47,7 @@ void printSubResults(void* pRes, int32_t* totalRows) {
int32_t precision = taos_result_precision(pRes);
taos_print_row(buf, row, fields, numOfFields);
*totalRows += 1;
printf("precision: %d, row content: %s\n", precision, buf);
printf("vgId: %d, offset: %"PRId64", precision: %d, row content: %s\n", vgId, offset, precision, buf);
}
// taos_free_result(pRes);
@ -1160,6 +1162,7 @@ TEST(clientCase, td_25129) {
}
while (1) {
printf("start to poll\n");
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
char buf[128];
@ -1173,9 +1176,24 @@ TEST(clientCase, td_25129) {
// printf("vgroup id: %d\n", vgroupId);
printSubResults(pRes, &totalRows);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
} else {
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
tmq_commit_sync(tmq, pRes);
continue;
}

View File

@ -5382,6 +5382,48 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
return 0;
}
int32_t tSerializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
buf = (char *)buf + headLen;
bufLen -= headLen;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subKey) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->head.vgId);
pHead->contLen = htonl(tlen + headLen);
}
return tlen + headLen;
}
int32_t tDeserializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
tDecodeCStrTo(&decoder, pReq->subKey);
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
@ -7351,27 +7393,8 @@ void tDeleteMqDataRsp(SMqDataRsp *pRsp) {
}
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1;
if (pRsp->blockNum != 0) {
if (tEncodeI8(pEncoder, pRsp->withTbName) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->withSchema) < 0) return -1;
if (tEncodeMqDataRsp(pEncoder, (const SMqDataRsp *)pRsp) < 0) return -1;
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i);
void *data = taosArrayGetP(pRsp->blockData, i);
if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1;
if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i);
if (tEncodeSSchemaWrapper(pEncoder, pSW) < 0) return -1;
}
if (pRsp->withTbName) {
char *tbName = (char *)taosArrayGetP(pRsp->blockTbName, i);
if (tEncodeCStr(pEncoder, tbName) < 0) return -1;
}
}
}
if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
@ -7384,46 +7407,8 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
}
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1;
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1;
if (pRsp->blockNum != 0) {
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void *));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
if (tDecodeI8(pDecoder, &pRsp->withTbName) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->withSchema) < 0) return -1;
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void *));
}
if (pRsp->withSchema) {
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void *));
}
if (tDecodeMqDataRsp(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1;
for (int32_t i = 0; i < pRsp->blockNum; i++) {
void *data;
uint64_t bLen;
if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1;
taosArrayPush(pRsp->blockData, &data);
int32_t len = bLen;
taosArrayPush(pRsp->blockDataLen, &len);
if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pSW == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) {
taosMemoryFree(pSW);
return -1;
}
taosArrayPush(pRsp->blockSchema, &pSW);
}
if (pRsp->withTbName) {
char *tbName;
if (tDecodeCStrAlloc(pDecoder, &tbName) < 0) return -1;
taosArrayPush(pRsp->blockTbName, &tbName);
}
}
}
if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {
pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t));

View File

@ -726,7 +726,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK_TO_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;

View File

@ -137,12 +137,11 @@ typedef enum {
} EDndReason;
typedef enum {
CONSUMER_UPDATE_REB_MODIFY_NOTOPIC = 1, // topic do not need modified after rebalance
CONSUMER_UPDATE_REB_MODIFY_TOPIC, // topic need modified after rebalance
CONSUMER_UPDATE_REB_MODIFY_REMOVE, // topic need removed after rebalance
// CONSUMER_UPDATE_TIMER_LOST,
CONSUMER_UPDATE_RECOVER,
CONSUMER_UPDATE_SUB_MODIFY, // modify after subscribe req
CONSUMER_UPDATE_REB = 1, // update after rebalance
CONSUMER_ADD_REB, // add after rebalance
CONSUMER_REMOVE_REB, // remove after rebalance
CONSUMER_UPDATE_REC, // update after recover
CONSUMER_UPDATE_SUB, // update after subscribe req
} ECsmUpdateType;
typedef struct {

View File

@ -184,7 +184,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
}
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE_RECOVER;
pConsumerNew->updateType = CONSUMER_UPDATE_REC;
mndReleaseConsumer(pMnode, pConsumer);
@ -701,7 +701,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval;
pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg;
// pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY; // use insert logic
// pConsumerNew->updateType = CONSUMER_UPDATE_SUB; // use insert logic
taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
@ -731,7 +731,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
}
// set the update type
pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY;
pConsumerNew->updateType = CONSUMER_UPDATE_SUB;
taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
@ -984,7 +984,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
taosWLockLatch(&pOldConsumer->lock);
if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB_MODIFY) {
if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);
@ -1004,7 +1004,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
// mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
// pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status),
// pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_RECOVER) {
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
@ -1013,12 +1013,12 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " timer update, timer recover",pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_NOTOPIC) {
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
pOldConsumer->rebalanceTime = taosGetTimestampMs();
mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_TOPIC) {
} else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
// check if exist in current topic
@ -1049,7 +1049,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
(int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_REMOVE) {
} else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
// remove from removed topic

View File

@ -597,7 +597,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_NOTOPIC;
pConsumerNew->updateType = CONSUMER_UPDATE_REB;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew, true);
@ -613,7 +613,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_TOPIC;
pConsumerNew->updateType = CONSUMER_ADD_REB;
char* topicTmp = taosStrdup(topic);
taosArrayPush(pConsumerNew->rebNewTopics, &topicTmp);
@ -633,7 +633,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_REMOVE;
pConsumerNew->updateType = CONSUMER_REMOVE_REB;
char* topicTmp = taosStrdup(topic);
taosArrayPush(pConsumerNew->rebRemovedTopics, &topicTmp);

View File

@ -228,7 +228,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg);

View File

@ -330,86 +330,124 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
return 0;
}
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqVgOffset vgOffset = {0};
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
SMqSeekReq req = {0};
int32_t vgId = TD_VID(pTq->pVnode);
SRpcMsg rsp = {.info = pMsg->info};
int code = 0;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
tqError("vgId:%d failed to decode seek msg", vgId);
return -1;
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
tDecoderClear(&decoder);
tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);
STqOffset* pOffset = &vgOffset.offset;
if (pOffset->val.type != TMQ_OFFSET__LOG) {
tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
return -1;
}
STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
code = 0;
goto end;
}
// 2. check consumer-vg assignment status
taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != vgOffset.consumerId) {
tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
if (pHandle->consumerId != req.consumerId) {
tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
req.consumerId, vgId, req.subKey, pHandle->consumerId);
taosRUnLockLatch(&pTq->lock);
return -1;
code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
goto end;
}
//if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to TMQ_VG_STATUS__IDLE,
//otherwise poll data failed after seek.
tqUnregisterPushHandle(pTq, pHandle);
taosRUnLockLatch(&pTq->lock);
// 3. check the offset info
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
if (pSavedOffset != NULL) {
if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
return 0; // no need to update the offset value
}
if (pSavedOffset->val.version == pOffset->val.version) {
tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
pOffset->val.version, pSavedOffset->val.version);
end:
rsp.code = code;
tmsgSendRsp(&rsp);
return 0;
}
}
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
if (pOffset->val.version < sver) {
pOffset->val.version = sver;
} else if (pOffset->val.version > ever) {
pOffset->val.version = ever;
}
// save the new offset value
if (pSavedOffset != NULL) {
tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
pSavedOffset->val.version);
} else {
tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
}
if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
return -1;
}
tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
vgOffset.consumerId, vgOffset.offset.val.version);
return 0;
// SMqVgOffset vgOffset = {0};
// int32_t vgId = TD_VID(pTq->pVnode);
//
// SDecoder decoder;
// tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
// if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
// tqError("vgId:%d failed to decode seek msg", vgId);
// return -1;
// }
//
// tDecoderClear(&decoder);
//
// tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
// vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);
//
// STqOffset* pOffset = &vgOffset.offset;
// if (pOffset->val.type != TMQ_OFFSET__LOG) {
// tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
// return -1;
// }
//
// STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
// if (pHandle == NULL) {
// tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
//
// // 2. check consumer-vg assignment status
// taosRLockLatch(&pTq->lock);
// if (pHandle->consumerId != vgOffset.consumerId) {
// tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
// vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
// terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
// taosRUnLockLatch(&pTq->lock);
// return -1;
// }
// taosRUnLockLatch(&pTq->lock);
//
// // 3. check the offset info
// STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
// if (pSavedOffset != NULL) {
// if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
// tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
// return 0; // no need to update the offset value
// }
//
// if (pSavedOffset->val.version == pOffset->val.version) {
// tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
// pOffset->val.version, pSavedOffset->val.version);
// return 0;
// }
// }
//
// int64_t sver = 0, ever = 0;
// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
// if (pOffset->val.version < sver) {
// pOffset->val.version = sver;
// } else if (pOffset->val.version > ever) {
// pOffset->val.version = ever;
// }
//
// // save the new offset value
// if (pSavedOffset != NULL) {
// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
// pSavedOffset->val.version);
// } else {
// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
// }
//
// if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
// tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
// return -1;
// }
//
// tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
// vgOffset.consumerId, vgOffset.offset.val.version);
//
// return 0;
}
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {

View File

@ -157,18 +157,24 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
}
static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
if(offset->type == TMQ_OFFSET__LOG){
offset->version = ver + 1;
}
}
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
int code = 0;
terrno = 0;
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest);
dataRsp.reqOffset.type = pOffset->type; // stroe origin type for getting offset in tmq_get_vgroup_offset
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
goto end;
}
@ -183,11 +189,10 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock);
goto end;
} else {
}
taosWUnLockLatch(&pTq->lock);
}
}
setRequestVersion(&dataRsp.reqOffset, pOffset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
end : {
@ -209,6 +214,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
@ -261,6 +267,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
}
@ -273,6 +280,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
}
@ -302,6 +310,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
} else {

View File

@ -466,11 +466,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
goto _err;
}
break;
case TDMT_VND_TMQ_SEEK_TO_OFFSET:
if (tqProcessSeekReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
break;
case TDMT_VND_TMQ_ADD_CHECKINFO:
if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
goto _err;
@ -643,6 +638,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
// return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO:
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_SEEK:
return tqProcessSeekReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:

View File

@ -632,6 +632,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCALAR_CONVERT_ERROR, "Cannot convert to s
//tmq
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE, "Offset out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_VGID, "VgId does not belong to this consumer")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_TOPIC, "Topic does not belong to this consumer")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")

View File

@ -26,7 +26,7 @@ class TDTestCase:
# self.commit_value_list = ["true"]
# self.offset_value_list = [""]
# self.tbname_value_list = ["true"]
# self.snapshot_value_list = ["true"]
# self.snapshot_value_list = ["false"]
def tmqParamsTest(self):
paraDict = {'dbName': 'db1',
@ -131,8 +131,8 @@ class TDTestCase:
if snapshot_value == "true":
if offset_value != "earliest" and offset_value != "":
if offset_value == "latest":
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) > 0, True)
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace(offset_value, "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) >= 0, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res)
elif offset_value == "none":
@ -154,8 +154,8 @@ class TDTestCase:
tdSql.checkEqual(rows_value_list, [None]*len(subscription_info))
else:
if offset_value != "none":
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) > 0, True)
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace(offset_value, "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) >= 0, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res)
else: