fix:[TS-5776]add raw type from consumer

This commit is contained in:
wangmm0220 2025-01-24 13:44:56 +08:00
parent 3fb2ec432e
commit 7fad4bceb0
14 changed files with 145 additions and 89 deletions

View File

@ -125,7 +125,7 @@ enum {
};
static char* tmqMsgTypeStr[] = {
"data", "meta", "ask ep", "meta data", "wal info", "batch meta"
"data", "meta", "ask ep", "meta data", "wal info", "batch meta", "raw data"
};
enum {

View File

@ -4182,6 +4182,7 @@ typedef struct {
int8_t sourceExcluded;
int8_t rawData;
int8_t enableBatchMeta;
SHashObj *uidHash; // to find if uid is duplicated
} SMqPollReq;
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
@ -4255,13 +4256,19 @@ typedef struct {
SArray* createTableLen;
SArray* createTableReq;
};
struct{
int32_t len;
void* rawData;
};
};
} SMqDataRsp;
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pObj);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
int32_t tDecodeMqRawDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
void tDeleteMqDataRsp(SMqDataRsp* pRsp);
void tDeleteMqRawDataRsp(SMqDataRsp* pRsp);
int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);

View File

@ -1017,6 +1017,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016)
#define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017)
#define TSDB_CODE_TMQ_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x4018)
#define TSDB_CODE_TMQ_DUPLICATE_UID TAOS_DEF_ERROR_CODE(0, 0x4019)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)

View File

@ -253,10 +253,6 @@ typedef struct {
SMqDataRsp dataRsp;
SMqMetaRsp metaRsp;
SMqBatchMetaRsp batchMetaRsp;
struct{
int32_t len;
void* rawData;
};
};
} SMqRspObj;

View File

@ -555,8 +555,7 @@ void taos_free_result(TAOS_RES *res) {
} else if (TD_RES_TMQ_BATCH_META(res)) {
tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp);
} else if (TD_RES_TMQ_RAW(res)) {
taosMemoryFree(pRsp->rawData);
doFreeReqResultInfo(&pRsp->resInfo);
tDeleteMqRawDataRsp(&pRsp->dataRsp);
}
taosMemoryFree(pRsp);
}

View File

@ -2319,8 +2319,8 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen)
SRequestConnInfo conn = {0};
RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));
uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
SHashObj* pVgHash = NULL;
SHashObj* pNameHash = NULL;
@ -2329,9 +2329,9 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen)
int retry = 0;
while (1) {
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pVgroupHash);
pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
RAW_NULL_CHECK(pStmt->pVgDataBlocks);
@ -2384,7 +2384,7 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen)
}
end:
uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
tDeleteSTaosxRsp(&rspObj.dataRsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
@ -2567,6 +2567,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
*raw = (tmq_raw_data){0};
SMqRspObj* rspObj = ((SMqRspObj*)res);
if (TD_RES_TMQ_META(res)) {
raw->raw = rspObj->metaRsp.metaRsp;
@ -2595,8 +2596,9 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw_type = rspObj->resType;
uDebug("tmq get raw batch meta:%p", raw);
} else if (TD_RES_TMQ_RAW(res)) {
raw->raw = rspObj->rawData;
raw->raw_len = rspObj->len;
raw->raw = rspObj->dataRsp.rawData;
rspObj->dataRsp.rawData = NULL;
raw->raw_len = rspObj->dataRsp.len;
raw->raw_type = rspObj->resType;
uDebug("tmq get raw raw:%p", raw);
} else {
@ -2609,9 +2611,10 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
void tmq_free_raw(tmq_raw_data raw) {
uDebug("tmq free raw data type:%d", raw.raw_type);
if (raw.raw_type == RES_TYPE__TMQ ||
raw.raw_type == RES_TYPE__TMQ_RAWDATA ||
raw.raw_type == RES_TYPE__TMQ_METADATA) {
taosMemoryFree(raw.raw);
} else if(raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL){
taosMemoryFree(raw.raw - sizeof(SMqRspHead));
}
(void)memset(terrMsg, 0, ERR_MSG_LEN);
}

View File

@ -197,10 +197,6 @@ typedef struct {
SMqDataRsp dataRsp;
SMqMetaRsp metaRsp;
SMqBatchMetaRsp batchMetaRsp;
struct{
int32_t len;
void* rawData;
};
};
} SMqPollRspWrapper;
@ -290,7 +286,7 @@ typedef struct {
} SVgroupSaveInfo;
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once
volatile int32_t tmqInitRes = 0; // initialize rsp code
volatile int32_t tmqInitRes = -1; // initialize rsp code
static SMqMgmt tmqMgmt = {0};
tmq_conf_t* tmq_conf_new() {
@ -1135,9 +1131,7 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;
taosMemoryFreeClear(pRsp->pEpset);
taosMemoryFreeClear(pRsp->rawData);
DELETE_POLL_RSP(tDeleteMqRawDataRsp, &pRsp->dataRsp)
}
}
@ -2051,7 +2045,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
goto END;
}
rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d,QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, requestId);
tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s),QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId);
if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
@ -2061,8 +2055,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
} else if (rspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
pRspWrapper->pollRsp.len = pMsg->len;
pRspWrapper->pollRsp.rawData = pMsg->pData;
PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.len = pMsg->len - sizeof(SMqRspHead);
pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead));
pMsg->pData = NULL;
} else { // invalid rspType
tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
@ -2085,8 +2080,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosFreeQitem(pRspWrapper);
tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
} else {
tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64,
tmq ? tmq->consumerId : 0, rspType, vgId, taosQueueItemSize(tmq->mqueue), requestId);
tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d(%s), vgId:%d, total in queue:%d,QID:0x%" PRIx64,
tmq ? tmq->consumerId : 0, rspType, tmqMsgTypeStr[rspType], vgId, taosQueueItemSize(tmq->mqueue), requestId);
}
}
@ -2453,8 +2448,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset);
if (pollRspWrapper->dataRsp.blockNum == 0 &&
pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
if (pollRspWrapper->dataRsp.blockNum == 0) {
tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
", total:%" PRId64 ",QID:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
@ -2471,9 +2465,6 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
if (pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
tmq->totalRows += numOfRows;
} else {
pRspObj->rawData = pollRspWrapper->rawData;
pRspObj->len = pollRspWrapper->len;
}
pVg->emptyBlockReceiveTs = 0;
if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
@ -2773,7 +2764,7 @@ const char* tmq_get_table_name(TAOS_RES* res) {
if (res == NULL) {
return NULL;
}
if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ) {
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
SMqDataRsp* data = &pRspObj->dataRsp;
if (!data->withTbName || data->blockTbName == NULL || pRspObj->resIter < 0 ||

View File

@ -9270,7 +9270,13 @@ _exit:
return code;
}
void tDestroySMqPollReq(SMqPollReq *pReq) { tOffsetDestroy(&pReq->reqOffset); }
void tDestroySMqPollReq(SMqPollReq *pReq) {
tOffsetDestroy(&pReq->reqOffset);
if (pReq->uidHash != NULL) {
taosHashCleanup(pReq->uidHash);
pReq->uidHash = NULL;
}
}
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
int32_t code = 0;
int32_t lino;
@ -11410,6 +11416,9 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
if (taosArrayPush(pRsp->blockData, &data) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
// for (int m= 0; m < 56; m++){
// printf("decode data[%d] = %d\n", m, *((int8_t *)data+18+m));
// }
int32_t len = bLen;
if (taosArrayPush(pRsp->blockDataLen, &len) == NULL) {
TAOS_CHECK_EXIT(terrno);
@ -11455,6 +11464,17 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
return 0;
}
int32_t tDecodeMqRawDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset));
TAOS_CHECK_EXIT(tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->blockNum));
_exit:
return code;
}
static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
@ -11528,6 +11548,14 @@ void tDeleteSTaosxRsp(SMqDataRsp *pRsp) {
pRsp->createTableReq = NULL;
}
void tDeleteMqRawDataRsp(SMqDataRsp *pRsp) {
tOffsetDestroy(&pRsp->reqOffset);
tOffsetDestroy(&pRsp->rspOffset);
if (pRsp->rawData != NULL){
taosMemoryFree(pRsp->rawData - sizeof(SMqRspHead));
}
}
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pReq->tbname));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->startTs));
@ -11688,14 +11716,13 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
uint8_t version;
uint8_t* dataAfterCreate = NULL;
uint8_t* dataStart = pCoder->data;
uint32_t posStart = pCoder->pos;
uint8_t* dataStart = pCoder->data + pCoder->pos;
uint32_t posAfterCreate = 0;
TAOS_CHECK_EXIT(tStartDecode(pCoder));
uint32_t pos = pCoder->pos;
TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &flags));
uint32_t flagsLen = pCoder->pos - posStart;
uint32_t flagsLen = pCoder->pos - pos;
pSubmitTbData->flags = flags & 0xff;
version = (flags >> 8) & 0xff;
@ -11716,26 +11743,6 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->uid));
TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &pSubmitTbData->sver));
if (rawData != NULL){ // no need to decode data
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
uint64_t nColData = 0;
TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData));
for (int32_t i = 0; i < nColData; ++i) {
SColData pColData = {0};
TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, &pColData));
}
} else {
uint64_t nRow = 0;
TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow));
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
SRow *ppRow = NULL;
TAOS_CHECK_EXIT(tDecodeRow(pCoder, &ppRow));
}
}
} else {
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
uint64_t nColData = 0;
@ -11767,14 +11774,12 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
TAOS_CHECK_EXIT(tDecodeRow(pCoder, ppRow));
}
}
}
pSubmitTbData->ctimeMs = 0;
if (!tDecodeIsEnd(pCoder)) {
TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->ctimeMs));
}
tEndDecode(pCoder);
if (rawData != NULL){
if (dataAfterCreate != NULL){
TAOS_MEMCPY(dataAfterCreate - INT_BYTES - flagsLen, dataStart, INT_BYTES + flagsLen);
@ -11784,7 +11789,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
*(void**)rawData = dataStart;
}
}
tEndDecode(pCoder);
_exit:
return code;
@ -11839,12 +11844,20 @@ int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq, SArray* rawList) {
goto _exit;
}
bool hasCreateTable = false;
for (uint64_t i = 0; i < nSubmitTbData; i++) {
if (tDecodeSSubmitTbData(pCoder, taosArrayReserve(pReq->aSubmitTbData, 1),
SSubmitTbData* data = taosArrayReserve(pReq->aSubmitTbData, 1);
if (tDecodeSSubmitTbData(pCoder, data,
rawList != NULL ? taosArrayReserve(rawList, 1) : NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (data->flags & SUBMIT_REQ_AUTO_CREATE_TABLE){
hasCreateTable = true;
}
}
if (rawList != NULL && hasCreateTable){
taosArrayClear(rawList);
}
tEndDecode(pCoder);

View File

@ -397,7 +397,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
terrno = TSDB_CODE_INVALID_MSG;
goto END;
}
if (req.rawData == 1){
req.uidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (req.uidHash == NULL) {
tqError("tq poll rawData taosHashInit failed");
code = terrno;
goto END;
}
}
int64_t consumerId = req.consumerId;
int32_t reqEpoch = req.epoch;
STqOffsetVal reqOffset = req.reqOffset;

View File

@ -581,17 +581,17 @@ bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
uid = pSubmitTbData->uid;
void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
TSDB_CHECK_NULL(ret, code, lino, END, true);
tqDebug("iterator data block in hash continue, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
tqDebug("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
pReader->nextBlk++;
}
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid);
tqDebug("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid);
END:
tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
tqDebug("%s:%d get data:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
return code;
}
@ -1117,7 +1117,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block
if (code != 0) {
return code;
}
} else if (rawList != NULL){
} else if (rawList != NULL && taosArrayGetSize(rawList) > 0) {
if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){
return terrno;
}

View File

@ -31,6 +31,9 @@ static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t p
memcpy(pRetrieve->data, rawData, *(uint32_t *)rawData + INT_BYTES);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
// for (int m= 0; m < 56; m++){
// printf("add data[%d] = %d\n", m, *((int8_t *)rawData+m));
// }
tqDebug("add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
END:
@ -337,6 +340,26 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0;
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks);
if (rawList != NULL && taosArrayGetSize(pBlocks) == 0){
if (taosHashGet(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES) != NULL) {
tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 " is already exists", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid);
terrno = TSDB_CODE_TMQ_DUPLICATE_UID;
pReader->nextBlk = 0;
goto END;
} else {
code = taosHashPut(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES, &pExec->pTqReader->lastBlkUid, LONG_BYTES);
TSDB_CHECK_CODE(code, lino, END);
}
}
// this submit data is metadata and previous data is data
if (rawList != NULL && *totalRows > 0 && pSubmitTbData->pCreateTbReq != NULL && taosArrayGetSize(pBlocks) > 0){
tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 ", this submit data is metadata and previous data is data", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid);
terrno = TSDB_CODE_TMQ_DUPLICATE_UID;
pRsp->createTableNum = 0;
pReader->nextBlk = 0;
goto END;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
@ -398,6 +421,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData
SArray *rawList = NULL;
if (pRequest->rawData){
rawList = taosArrayInit(0, POINTER_BYTES);
TSDB_CHECK_NULL(rawList, code, lino, END, terrno);
}
code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList);
TSDB_CHECK_CODE(code, lino, END);
@ -405,10 +429,16 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
while (tqNextBlockImpl(pReader, NULL)) {
tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){
goto END;
}
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){
goto END;
}
}
}

View File

@ -378,10 +378,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if ((pRequest->rawData == 0 && totalRows >= tmqRowSize) ||
(taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) ||
(pRequest->rawData != 0 && totalRows >= TQ_POLL_MAX_BYTES)) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
(pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES || taosxRsp.createTableNum > 0 || terrno == TSDB_CODE_TMQ_DUPLICATE_UID))) {
tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_DUPLICATE_UID ? fetchVer : fetchVer + 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){terrno = 0;}
goto END;
} else {
fetchVer++;

View File

@ -723,7 +723,7 @@ int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFun
*/
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES);
if (NULL == pVgroupHash || NULL == pVgroupList) {
taosHashCleanup(pVgroupHash);
@ -859,6 +859,13 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
}
if (TSDB_CODE_SUCCESS == code) {
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
SDecoder dc = {0};
tDecoderInit(&dc, POINTER_SHIFT(dst->pData, sizeof(SSubmitReq2Msg)), dst->size - sizeof(SSubmitReq2Msg));
if (tDecodeSubmitReq(&dc, pSubmitReq, NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
}
tDecoderClear(&dc);
}
if (TSDB_CODE_SUCCESS == code) {
code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);

View File

@ -860,6 +860,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_DATA, "Invalid data use here")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_DUPLICATE_UID, "Duplicate uid")
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")