opti:raw data from tmq

This commit is contained in:
wangmm0220 2024-04-19 13:48:17 +08:00
parent 2052c347dd
commit 42467c0e00
11 changed files with 279 additions and 262 deletions

View File

@ -3807,6 +3807,7 @@ int32_t tDecodeMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp);
void tDeleteMqMetaRsp(SMqMetaRsp *pRsp);
#define MQ_DATA_RSP_VERSION 100
typedef struct {
SMqRspHead head;
STqOffsetVal reqOffset;
@ -3818,33 +3819,27 @@ typedef struct {
SArray* blockData;
SArray* blockTbName;
SArray* blockSchema;
int64_t sleepTime;
} SMqDataRspCommon;
typedef struct {
SMqDataRspCommon common;
int64_t sleepTime;
} SMqDataRsp;
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp, int8_t dataVersion);
void tDeleteMqDataRsp(SMqDataRsp* pRsp);
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const void* pRsp);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, void* pRsp);
void tDeleteMqDataRsp(void* pRsp);
typedef struct {
SMqRspHead head;
STqOffsetVal reqOffset;
STqOffsetVal rspOffset;
int32_t blockNum;
int8_t withTbName;
int8_t withSchema;
SArray* blockDataLen;
SArray* blockData;
SArray* blockTbName;
SArray* blockSchema;
// the following attributes are extended from SMqDataRsp
int32_t createTableNum;
SArray* createTableLen;
SArray* createTableReq;
SMqDataRspCommon common;
int32_t createTableNum;
SArray* createTableLen;
SArray* createTableReq;
} STaosxRsp;
int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const STaosxRsp* pRsp);
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp, int8_t dateVersion);
void tDeleteSTaosxRsp(STaosxRsp* pRsp);
int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const void* pRsp);
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, void* pRsp);
void tDeleteSTaosxRsp(void* pRsp);
typedef struct {
SMqRspHead head;

View File

@ -221,7 +221,11 @@ typedef struct {
SSchemaWrapper schema;
int32_t resIter;
SReqResultInfo resInfo;
SMqDataRsp rsp;
} SMqRspObjCommon;
typedef struct {
SMqRspObjCommon common;
SMqDataRsp rsp;
} SMqRspObj;
typedef struct {
@ -233,14 +237,8 @@ typedef struct {
} SMqMetaRspObj;
typedef struct {
int8_t resType;
char topic[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int32_t vgId;
SSchemaWrapper schema;
int32_t resIter;
SReqResultInfo resInfo;
STaosxRsp rsp;
SMqRspObjCommon common;
STaosxRsp rsp;
} SMqTaosxRspObj;
typedef struct SReqRelInfo {
@ -320,7 +318,7 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
SMqRspObj* msg = (SMqRspObj*)res;
return (SReqResultInfo*)&msg->resInfo;
return (SReqResultInfo*)&msg->common.resInfo;
}
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4);

View File

@ -343,12 +343,12 @@ void taos_free_result(TAOS_RES *res) {
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
tDeleteSTaosxRsp(&pRsp->rsp);
doFreeReqResultInfo(&pRsp->resInfo);
doFreeReqResultInfo(&pRsp->common.resInfo);
taosMemoryFree(pRsp);
} else if (TD_RES_TMQ(res)) {
SMqRspObj *pRsp = (SMqRspObj *)res;
tDeleteMqDataRsp(&pRsp->rsp);
doFreeReqResultInfo(&pRsp->resInfo);
doFreeReqResultInfo(&pRsp->common.resInfo);
taosMemoryFree(pRsp);
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res;
@ -417,7 +417,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo;
if (msg->resIter == -1) {
if (msg->common.resIter == -1) {
pResultInfo = tmqGetNextResInfo(res, true);
} else {
pResultInfo = tmqGetCurResInfo(res);

View File

@ -1623,11 +1623,16 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
pRequest->syncQuery = true;
rspObj.resIter = -1;
rspObj.resType = RES_TYPE__TMQ;
rspObj.common.resIter = -1;
rspObj.common.resType = RES_TYPE__TMQ;
int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION){
data += sizeof(int8_t) + sizeof(int32_t);
dataLen -= sizeof(int8_t) + sizeof(int32_t);
}
tDecoderInit(&decoder, data, dataLen);
code = tDecodeMqDataRsp(&decoder, &rspObj.rsp, *(int8_t*)data);
code = tDecodeMqDataRsp(&decoder, &rspObj.rsp);
if (code != 0) {
code = TSDB_CODE_INVALID_MSG;
goto end;
@ -1656,13 +1661,13 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
goto end;
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
while (++rspObj.resIter < rspObj.rsp.blockNum) {
void* pRetrieve = taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj.rsp.withSchema) {
while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) {
void* pRetrieve = taosArrayGetP(rspObj.rsp.common.blockData, rspObj.common.resIter);
if (!rspObj.rsp.common.withSchema) {
goto end;
}
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.common.blockTbName, rspObj.common.resIter);
if (!tbName) {
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
@ -1693,7 +1698,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
}
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if (fields == NULL) {
goto end;
@ -1752,11 +1757,17 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
}
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
pRequest->syncQuery = true;
rspObj.resIter = -1;
rspObj.resType = RES_TYPE__TMQ_METADATA;
rspObj.common.resIter = -1;
rspObj.common.resType = RES_TYPE__TMQ_METADATA;
int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION){
data += sizeof(int8_t) + sizeof(int32_t);
dataLen -= sizeof(int8_t) + sizeof(int32_t);
}
tDecoderInit(&decoder, data, dataLen);
code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp, *(int8_t*)data);
code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp);
if (code != 0) {
code = TSDB_CODE_INVALID_MSG;
goto end;
@ -1786,14 +1797,14 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) {
void* pRetrieve = taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj.rsp.withSchema) {
uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.common.blockNum);
while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) {
void* pRetrieve = taosArrayGetP(rspObj.rsp.common.blockData, rspObj.common.resIter);
if (!rspObj.rsp.common.withSchema) {
goto end;
}
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.common.blockTbName, rspObj.common.resIter);
if (!tbName) {
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
@ -1865,7 +1876,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
}
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if (fields == NULL) {
goto end;
@ -1944,6 +1955,64 @@ char* tmq_get_json_meta(TAOS_RES* res) {
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
static int32_t getOffSetLen(const void *rsp){
const SMqDataRspCommon *pRsp = rsp;
SEncoder coder = {0};
tEncoderInit(&coder, NULL, 0);
if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
int32_t pos = coder.pos;
tEncoderClear(&coder);
return pos;
}
typedef int32_t __encode_func__(SEncoder *pEncoder, const void *pRsp);
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_raw_data* raw){
int32_t len = 0;
int32_t code = 0;
SEncoder encoder = {0};
void* buf = NULL;
tEncodeSize(encodeFunc, rspObj, len, code);
if (code < 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
len += sizeof(int8_t) + sizeof(int32_t);
buf = taosMemoryCalloc(1, len);
if(buf == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAILED;
}
tEncoderInit(&encoder, buf, len);
if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
int32_t offsetLen = getOffSetLen(rspObj);
if(offsetLen <= 0){
terrno = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
if (tEncodeI32(&encoder, offsetLen) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
if(encodeFunc(&encoder, rspObj) < 0){
terrno = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
tEncoderClear(&encoder);
raw->raw = buf;
raw->raw_len = len;
return 0;
FAILED:
tEncoderClear(&encoder);
taosMemoryFree(buf);
return terrno;
}
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
if (!raw || !res) {
terrno = TSDB_CODE_INVALID_PARA;
@ -1957,42 +2026,19 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
uDebug("tmq get raw type meta:%p", raw);
} else if (TD_RES_TMQ(res)) {
SMqRspObj* rspObj = ((SMqRspObj*)res);
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeMqDataRsp, &rspObj->rsp, len, code);
if (code < 0) {
return -1;
if(encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->rsp, raw) != 0){
uError("tmq get raw type error:%d", terrno);
return terrno;
}
void* buf = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, len);
tEncodeMqDataRsp(&encoder, &rspObj->rsp);
tEncoderClear(&encoder);
raw->raw = buf;
raw->raw_len = len;
raw->raw_type = RES_TYPE__TMQ;
uDebug("tmq get raw type data:%p", raw);
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res);
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeSTaosxRsp, &rspObj->rsp, len, code);
if (code < 0) {
return -1;
if(encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->rsp, raw) != 0){
uError("tmq get raw type error:%d", terrno);
return terrno;
}
void* buf = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, len);
tEncodeSTaosxRsp(&encoder, &rspObj->rsp);
tEncoderClear(&encoder);
raw->raw = buf;
raw->raw_len = len;
raw->raw_type = RES_TYPE__TMQ_METADATA;
uDebug("tmq get raw type metadata:%p", raw);
} else {

View File

@ -622,9 +622,9 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c
if (TD_RES_TMQ(pRes)) {
SMqRspObj* pRspObj = (SMqRspObj*)pRes;
pTopicName = pRspObj->topic;
vgId = pRspObj->vgId;
offsetVal = pRspObj->rsp.rspOffset;
pTopicName = pRspObj->common.topic;
vgId = pRspObj->common.vgId;
offsetVal = pRspObj->rsp.common.rspOffset;
} else if (TD_RES_TMQ_META(pRes)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes;
pTopicName = pMetaRspObj->topic;
@ -632,9 +632,9 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c
offsetVal = pMetaRspObj->metaRsp.rspOffset;
} else if (TD_RES_TMQ_METADATA(pRes)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes;
pTopicName = pRspObj->topic;
vgId = pRspObj->vgId;
offsetVal = pRspObj->rsp.rspOffset;
pTopicName = pRspObj->common.topic;
vgId = pRspObj->common.vgId;
offsetVal = pRspObj->rsp.common.rspOffset;
} else {
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
@ -1377,7 +1377,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
if(tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))) < 0){
if(tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp) < 0){
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
@ -1387,9 +1387,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.rspOffset);
tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.common.rspOffset);
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
tmq->consumerId, vgId, pRspWrapper->dataRsp.common.reqOffset.version, buf, rspType, requestId);
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
@ -1404,7 +1404,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
if(tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))) < 0){
if(tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp) < 0){
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
@ -1598,6 +1598,9 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
if(pRspObj == NULL) {
return NULL;
}
pRspObj->resType = RES_TYPE__TMQ_META;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
@ -1611,8 +1614,7 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
void changeByteEndian(char* pData){
char* p = pData;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
// version:
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | version:
int32_t blockVersion = *(int32_t*)p;
ASSERT(blockVersion == BLOCK_VERSION_1);
*(int32_t*)p = BLOCK_VERSION_2;
@ -1649,7 +1651,7 @@ static void tmqGetRawDataRowsPrecisionFromRes(void *pRetrieve, void** rawData, i
}
}
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) {
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObjCommon* pRspObj, SMqDataRspCommon* pDataRsp) {
(*numOfRows) = 0;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
@ -1660,14 +1662,14 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
bool needTransformSchema = !pRspObj->rsp.withSchema;
if (!pRspObj->rsp.withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable
pRspObj->rsp.withSchema = true;
pRspObj->rsp.blockSchema = taosArrayInit(pRspObj->rsp.blockNum, sizeof(void*));
bool needTransformSchema = !pDataRsp->withSchema;
if (!pDataRsp->withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable
pDataRsp->withSchema = true;
pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*));
}
// extract the rows in this data packet
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, i);
for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
void* pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
void* rawData = NULL;
int64_t rows = 0;
// deal with compatibility
@ -1679,7 +1681,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable
SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
if(schema){
taosArrayPush(pRspObj->rsp.blockSchema, &schema);
taosArrayPush(pDataRsp->blockSchema, &schema);
}
}
}
@ -1687,18 +1689,24 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
if(pRspObj == NULL){
return NULL;
}
pRspObj->common.resType = RES_TYPE__TMQ;
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, pRspObj);
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common);
return pRspObj;
}
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
pRspObj->resType = RES_TYPE__TMQ_METADATA;
if(pRspObj == NULL){
return NULL;
}
pRspObj->common.resType = RES_TYPE__TMQ_METADATA;
memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, (SMqRspObj*)pRspObj);
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common);
return pRspObj;
}
@ -1891,7 +1899,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
SMqDataRspCommon* pDataRsp = (SMqDataRspCommon*)&pollRspWrapper->dataRsp;
if (pDataRsp->head.epoch == consumerEpoch) {
taosWLockLatch(&tmq->lock);
@ -1994,8 +2002,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
SMqDataRspCommon* pDataRsp = (SMqDataRspCommon*)&pollRspWrapper->taosxRsp;
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
if (pDataRsp->head.epoch == consumerEpoch) {
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pollRspWrapper->vgHandle = pVg;
@ -2009,11 +2018,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
return NULL;
}
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset,
pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId,
pollRspWrapper->taosxRsp.blockNum != 0);
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset,
pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId,
pDataRsp->blockNum != 0);
if (pollRspWrapper->taosxRsp.blockNum == 0) {
if (pDataRsp->blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
@ -2034,7 +2043,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows,
tmq->totalRows, pollRspWrapper->reqId);
taosFreeQitem(pRspWrapper);
@ -2042,7 +2051,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
return pRsp;
} else {
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper);
@ -2205,15 +2214,11 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
if (res == NULL) {
return NULL;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->topic, '.') + 1;
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
return strchr(((SMqRspObjCommon*)res)->topic, '.') + 1;
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return strchr(pMetaRspObj->topic, '.') + 1;
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
return strchr(pRspObj->topic, '.') + 1;
} else {
return NULL;
}
@ -2224,15 +2229,11 @@ const char* tmq_get_db_name(TAOS_RES* res) {
return NULL;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->db, '.') + 1;
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
return strchr(((SMqRspObjCommon*)res)->db, '.') + 1;
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return strchr(pMetaRspObj->db, '.') + 1;
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
return strchr(pRspObj->db, '.') + 1;
} else {
return NULL;
}
@ -2242,15 +2243,11 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (res == NULL) {
return -1;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return pRspObj->vgId;
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
return ((SMqRspObjCommon*)res)->vgId;
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return pMetaRspObj->vgId;
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
return pRspObj->vgId;
} else {
return -1;
}
@ -2260,24 +2257,19 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
if (res == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
STqOffsetVal* pOffset = &pRspObj->rsp.reqOffset;
if (pOffset->type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.reqOffset.version;
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SMqDataRspCommon* common = (SMqDataRspCommon*)POINTER_SHIFT(res, sizeof(SMqRspObjCommon));
STqOffsetVal* pOffset = &common->reqOffset;
if (common->reqOffset.type == TMQ_OFFSET__LOG) {
return common->reqOffset.version;
} else {
tscError("invalid offset type:%d", pOffset->type);
tscError("invalid offset type:%d", common->reqOffset.type);
}
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res;
if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) {
return pRspObj->metaRsp.rspOffset.version;
}
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
if (pRspObj->rsp.reqOffset.type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.reqOffset.version;
}
} else {
tscError("invalid tmq type:%d", *(int8_t*)res);
}
@ -2290,20 +2282,15 @@ const char* tmq_get_table_name(TAOS_RES* res) {
if (res == NULL) {
return NULL;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SMqDataRspCommon* common = (SMqDataRspCommon*)POINTER_SHIFT(res, sizeof(SMqRspObjCommon));
SMqRspObjCommon* pRspObj = (SMqRspObjCommon*)res;
if (!common->withTbName || common->blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= common->blockNum) {
return NULL;
}
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL;
}
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
return (const char*)taosArrayGetP(common->blockTbName, pRspObj->resIter);
}
return NULL;
}
@ -2640,17 +2627,17 @@ void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, cons
}
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
SMqDataRspCommon* common = (SMqDataRspCommon*)POINTER_SHIFT(res, sizeof(SMqRspObjCommon));
SMqRspObjCommon* pRspObj = (SMqRspObjCommon*)res;
pRspObj->resIter++;
if (pRspObj->resIter < pRspObj->rsp.blockNum) {
if (pRspObj->rsp.withSchema) {
if (pRspObj->resIter < common->blockNum) {
if (common->withSchema) {
doFreeReqResultInfo(&pRspObj->resInfo);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(common->blockSchema, pRspObj->resIter);
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
}
void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
void* pRetrieve = taosArrayGetP(common->blockData, pRspObj->resIter);
void* rawData = NULL;
int64_t rows = 0;
int32_t precision = 0;
@ -2685,14 +2672,14 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqDataRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeMqDataRsp(&decoder, &rsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)));
tDecodeMqDataRsp(&decoder, &rsp);
tDecoderClear(&decoder);
SMqRspHead* pHead = pMsg->pData;
tmq_topic_assignment assignment = {.begin = pHead->walsver,
.end = pHead->walever + 1,
.currentOffset = rsp.rspOffset.version,
.currentOffset = rsp.common.rspOffset.version,
.vgId = pParam->vgId};
taosThreadMutexLock(&pCommon->mutex);

View File

@ -8897,7 +8897,7 @@ void tDeleteMqMetaRsp(SMqMetaRsp *pRsp) {
taosMemoryFree(pRsp->metaRsp);
}
int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRspCommon *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1;
@ -8922,14 +8922,13 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
return 0;
}
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
if (tEncodeI8(pEncoder, MQ_DATA_RSP_VERSION) < 0) return -1;
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const void *pRsp) {
if (tEncodeMqDataRspCommon(pEncoder, pRsp) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->sleepTime) < 0) return -1;
if (tEncodeI64(pEncoder, ((SMqDataRsp*)pRsp)->sleepTime) < 0) return -1;
return 0;
}
int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRspCommon *pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1;
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1;
@ -8975,19 +8974,17 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
return 0;
}
int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp, int8_t dataVersion) {
if (dataVersion >= MQ_DATA_RSP_VERSION){
if (tDecodeI8(pDecoder, &dataVersion) < 0) return -1;
}
int32_t tDecodeMqDataRsp(SDecoder *pDecoder, void *pRsp) {
if (tDecodeMqDataRspCommon(pDecoder, pRsp) < 0) return -1;
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &pRsp->sleepTime) < 0) return -1;
if (tDecodeI64(pDecoder, &((SMqDataRsp*)pRsp)->sleepTime) < 0) return -1;
}
return 0;
}
void tDeleteMqDataRsp(SMqDataRsp *pRsp) {
static void tDeleteMqDataRspCommon(void *rsp) {
SMqDataRspCommon *pRsp = rsp;
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL;
@ -8999,10 +8996,14 @@ void tDeleteMqDataRsp(SMqDataRsp *pRsp) {
tOffsetDestroy(&pRsp->rspOffset);
}
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
if (tEncodeI8(pEncoder, MQ_DATA_RSP_VERSION) < 0) return -1;
if (tEncodeMqDataRspCommon(pEncoder, (const SMqDataRsp *)pRsp) < 0) return -1;
void tDeleteMqDataRsp(void *rsp) {
tDeleteMqDataRspCommon(rsp);
}
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const void *rsp) {
if (tEncodeMqDataRspCommon(pEncoder, rsp) < 0) return -1;
const STaosxRsp *pRsp = (const STaosxRsp *)rsp;
if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
@ -9014,19 +9015,17 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
return 0;
}
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp, int8_t dataVersion) {
if (dataVersion >= MQ_DATA_RSP_VERSION){
if (tDecodeI8(pDecoder, &dataVersion) < 0) return -1;
}
if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1;
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, void *rsp) {
if (tDecodeMqDataRspCommon(pDecoder, rsp) < 0) return -1;
STaosxRsp *pRsp = (STaosxRsp *)rsp;
if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {
pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(pRsp->createTableNum, sizeof(void *));
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
void * pCreate = NULL;
uint64_t len;
uint64_t len = 0;
if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1;
int32_t l = (int32_t)len;
taosArrayPush(pRsp->createTableLen, &l);
@ -9037,20 +9036,13 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp, int8_t dataVersion
return 0;
}
void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
pRsp->blockSchema = NULL;
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
pRsp->blockTbName = NULL;
void tDeleteSTaosxRsp(void *rsp) {
tDeleteMqDataRspCommon(rsp);
STaosxRsp *pRsp = (STaosxRsp *)rsp;
pRsp->createTableLen = taosArrayDestroy(pRsp->createTableLen);
taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree);
pRsp->createTableReq = NULL;
tOffsetDestroy(&pRsp->reqOffset);
tOffsetDestroy(&pRsp->rspOffset);
}
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {

View File

@ -648,7 +648,6 @@ static SMqConsumerObj* buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscri
_over:
mndReleaseConsumer(pMnode, pExistedConsumer);
tDeleteSMqConsumerObj(pConsumerNew);
taosArrayDestroyP(subscribe->topicNames, (FDelete)taosMemoryFree);
return NULL;
}

View File

@ -119,8 +119,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, void* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp,
int32_t type, int32_t vgId);
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
@ -154,9 +154,9 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tq util
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const void* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset);
void tqUpdateNodeStage(STQ* pTq, bool isLeader);
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
SSubmitTbData* pTableData, const char* id);

View File

@ -153,10 +153,10 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
}
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, req.reqOffset);
dataRsp.blockNum = 0;
tqInitDataRsp(&dataRsp.common, req.reqOffset);
dataRsp.common.blockNum = 0;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset);
tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf,
req.reqId);
@ -165,7 +165,7 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
return 0;
}
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp,
int32_t type, int32_t vgId) {
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
@ -174,11 +174,11 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
char buf1[TSDB_OFFSET_LEN] = {0};
char buf2[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf1, TSDB_OFFSET_LEN, &pRsp->reqOffset);
tFormatOffset(buf2, TSDB_OFFSET_LEN, &pRsp->rspOffset);
tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset);
tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset);
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId);
return 0;
}
@ -499,7 +499,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
taosRUnLockLatch(&pTq->lock);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, req.reqOffset);
tqInitDataRsp(&dataRsp.common, req.reqOffset);
if (req.useSnapshot == true) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
@ -508,10 +508,10 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
dataRsp.common.rspOffset.type = TMQ_OFFSET__LOG;
if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.rspOffset.version = reqOffset.version;
dataRsp.common.rspOffset.version = reqOffset.version;
} else if (reqOffset.type < 0) {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
if (pOffset != NULL) {
@ -522,17 +522,17 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
dataRsp.rspOffset.version = pOffset->val.version;
dataRsp.common.rspOffset.version = pOffset->val.version;
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
req.subKey, dataRsp.rspOffset.version);
req.subKey, dataRsp.common.rspOffset.version);
} else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
dataRsp.common.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
dataRsp.rspOffset.version = ever;
dataRsp.common.rspOffset.version = ever;
}
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
dataRsp.rspOffset.version);
dataRsp.common.rspOffset.version);
}
} else {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,

View File

@ -15,7 +15,7 @@
#include "tq.h"
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, void* pRsp, int32_t numOfCols, int8_t precision) {
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) {
@ -30,22 +30,22 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
actualLen += sizeof(SRetrieveTableRspForTmq);
taosArrayPush(pRsp->blockDataLen, &actualLen);
taosArrayPush(pRsp->blockData, &buf);
taosArrayPush(((SMqDataRspCommon*)pRsp)->blockDataLen, &actualLen);
taosArrayPush(((SMqDataRspCommon*)pRsp)->blockData, &buf);
return TSDB_CODE_SUCCESS;
}
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) {
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, void* pRsp) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper);
if (pSW == NULL) {
return -1;
}
taosArrayPush(pRsp->blockSchema, &pSW);
taosArrayPush(((SMqDataRspCommon*)pRsp)->blockSchema, &pSW);
return 0;
}
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, void* pRsp, int32_t n) {
SMetaReader mr = {0};
metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);
@ -57,7 +57,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
for (int32_t i = 0; i < n; i++) {
char* tbName = taosStrdup(mr.me.name);
taosArrayPush(pRsp->blockTbName, &tbName);
taosArrayPush(((SMqDataRspCommon*)pRsp)->blockTbName, &tbName);
}
metaReaderClear(&mr);
return 0;
@ -125,7 +125,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
return code;
}
pRsp->blockNum++;
pRsp->common.blockNum++;
if (pDataBlock == NULL) {
blockDataDestroy(pHandle->block);
pHandle->block = NULL;
@ -149,7 +149,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
return code;
}
pRsp->blockNum++;
pRsp->common.blockNum++;
totalRows += pDataBlock->info.rows;
if (totalRows >= tmqRowSize) {
break;
@ -158,8 +158,8 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
}
tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
qStreamExtractOffset(task, &pRsp->rspOffset);
pHandle->consumerId, vgId, pRsp->common.blockNum, totalRows);
qStreamExtractOffset(task, &pRsp->common.rspOffset);
return 0;
}
@ -186,7 +186,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
tqDebug("tmqsnap task execute end, get %p", pDataBlock);
if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
if (pRsp->withTbName) {
if (pRsp->common.withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) {
@ -194,21 +194,21 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
}
} else {
char* tbName = taosStrdup(qExtractTbnameFromTask(task));
taosArrayPush(pRsp->blockTbName, &tbName);
taosArrayPush(pRsp->common.blockTbName, &tbName);
}
}
if (pRsp->withSchema) {
if (pRsp->common.withSchema) {
if (pOffset->type == TMQ_OFFSET__LOG) {
tqAddBlockSchemaToRsp(pExec, pRsp);
} else {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
taosArrayPush(pRsp->blockSchema, &pSW);
taosArrayPush(pRsp->common.blockSchema, &pSW);
}
}
tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++;
pRsp->common.blockNum++;
if (pOffset->type == TMQ_OFFSET__LOG) {
continue;
} else {
@ -234,13 +234,13 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
}
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1);
qStreamExtractOffset(task, &pRsp->rspOffset);
qStreamExtractOffset(task, &pRsp->common.rspOffset);
break;
}
if (pRsp->blockNum > 0) {
if (pRsp->common.blockNum > 0) {
tqDebug("tmqsnap task exec exited, get data");
qStreamExtractOffset(task, &pRsp->rspOffset);
qStreamExtractOffset(task, &pRsp->common.rspOffset);
break;
}
}
@ -268,7 +268,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
goto loop_table;
}
if (pRsp->withTbName) {
if (pRsp->common.withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
goto loop_table;
@ -312,8 +312,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
*totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
taosArrayPush(pRsp->common.blockSchema, &pSW);
pRsp->common.blockNum++;
}
continue;
loop_table:
@ -336,7 +336,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
goto loop_db;
}
if (pRsp->withTbName) {
if (pRsp->common.withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
goto loop_db;
@ -380,8 +380,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
*totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
taosArrayPush(pRsp->common.blockSchema, &pSW);
pRsp->common.blockNum++;
}
continue;
loop_db:

View File

@ -18,7 +18,7 @@
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) {
tOffsetCopy(&pRsp->reqOffset, &pOffset);
tOffsetCopy(&pRsp->rspOffset, &pOffset);
@ -39,7 +39,7 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader);
}
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
static int32_t tqInitTaosxRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) {
tOffsetCopy(&pRsp->reqOffset, &pOffset);
tOffsetCopy(&pRsp->rspOffset, &pOffset);
@ -110,9 +110,9 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
SMqDataRsp dataRsp = {0};
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
tqInitDataRsp(&dataRsp, *pOffsetVal);
tqInitDataRsp(&dataRsp.common, *pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version);
pHandle->subKey, vgId, dataRsp.common.rspOffset.version);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
tDeleteMqDataRsp(&dataRsp);
@ -137,7 +137,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
terrno = 0;
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, *pOffset);
tqInitDataRsp(&dataRsp.common, *pOffset);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
@ -146,11 +146,11 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
}
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.common.blockNum == 0) {
// lock
taosWLockLatch(&pTq->lock);
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (dataRsp.rspOffset.version > ver) { // check if there are data again to avoid lost data
if (dataRsp.common.rspOffset.version > ver) { // check if there are data again to avoid lost data
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock);
goto end;
@ -158,15 +158,15 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch(&pTq->lock);
}
tOffsetCopy(&dataRsp.reqOffset, pOffset); // reqOffset represents the current date offset, may be changed if wal not exists
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
tOffsetCopy(&dataRsp.common.reqOffset, pOffset); // reqOffset represents the current date offset, may be changed if wal not exists
code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : {
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.rspOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64
" code:%d",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
consumerId, pHandle->subKey, vgId, dataRsp.common.blockNum, buf, pRequest->reqId, code);
tDeleteMqDataRsp(&dataRsp);
return code;
}
@ -194,7 +194,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
int32_t vgId = TD_VID(pTq->pVnode);
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, *offset);
tqInitTaosxRsp(&taosxRsp.common, *offset);
if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
@ -214,13 +214,13 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type,
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.common.blockNum, taosxRsp.common.rspOffset.type,
taosxRsp.common.rspOffset.uid, taosxRsp.common.rspOffset.ts);
if (taosxRsp.common.blockNum > 0) {
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
} else {
tOffsetCopy(offset, &taosxRsp.rspOffset);
tOffsetCopy(offset, &taosxRsp.common.rspOffset);
}
}
@ -235,9 +235,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
ASSERT(savedEpoch <= pRequest->epoch);
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp,
pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
@ -249,9 +249,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
// process meta
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp,
pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
@ -292,9 +292,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
}
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer + 1);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp,
pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
} else {
@ -386,7 +386,7 @@ int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPoll
return 0;
}
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const void* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever) {
int32_t len = 0;
int32_t code = 0;
@ -394,7 +394,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
}
if (code < 0) {
@ -418,7 +418,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
tEncodeMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
tEncodeSTaosxRsp(&encoder, pRsp);
}
tEncoderClear(&encoder);