diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 08fae0952c..ecfa17ffb3 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -125,7 +125,7 @@ enum { }; static char* tmqMsgTypeStr[] = { - "data", "meta", "ask ep", "meta data", "wal info", "batch meta" + "data", "meta", "ask ep", "meta data", "wal info", "batch meta", "raw data" }; enum { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d14044a8bf..c9b9bd417c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4182,6 +4182,7 @@ typedef struct { int8_t sourceExcluded; int8_t rawData; int8_t enableBatchMeta; + SHashObj *uidHash; // to find if uid is duplicated } SMqPollReq; int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); @@ -4255,13 +4256,19 @@ typedef struct { SArray* createTableLen; SArray* createTableReq; }; + struct{ + int32_t len; + void* rawData; + }; }; } SMqDataRsp; int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pObj); int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); +int32_t tDecodeMqRawDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); void tDeleteMqDataRsp(SMqDataRsp* pRsp); +void tDeleteMqRawDataRsp(SMqDataRsp* pRsp); int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8488e2800a..793ffa17bc 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -1017,6 +1017,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016) #define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017) #define TSDB_CODE_TMQ_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x4018) +#define TSDB_CODE_TMQ_DUPLICATE_UID TAOS_DEF_ERROR_CODE(0, 0x4019) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index ae88883739..c9e0410341 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -253,10 +253,6 @@ typedef struct { SMqDataRsp dataRsp; SMqMetaRsp metaRsp; SMqBatchMetaRsp batchMetaRsp; - struct{ - int32_t len; - void* rawData; - }; }; } SMqRspObj; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index eb1bffeee3..9f0706bdaf 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -555,8 +555,7 @@ void taos_free_result(TAOS_RES *res) { } else if (TD_RES_TMQ_BATCH_META(res)) { tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp); } else if (TD_RES_TMQ_RAW(res)) { - taosMemoryFree(pRsp->rawData); - doFreeReqResultInfo(&pRsp->resInfo); + tDeleteMqRawDataRsp(&pRsp->dataRsp); } taosMemoryFree(pRsp); } diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 689adf51e7..f418b7e94c 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -2319,8 +2319,8 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) SRequestConnInfo conn = {0}; RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn)); - uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); - RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj)); + uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); + RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj)); SHashObj* pVgHash = NULL; SHashObj* pNameHash = NULL; @@ -2329,9 +2329,9 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) int retry = 0; while (1) { RAW_RETURN_CHECK(smlInitHandle(&pQuery)); - uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum); + uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum); SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot; - pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); + pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); RAW_NULL_CHECK(pVgroupHash); pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES); RAW_NULL_CHECK(pStmt->pVgDataBlocks); @@ -2384,7 +2384,7 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) } end: - uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code)); + uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code)); tDeleteSTaosxRsp(&rspObj.dataRsp); tDecoderClear(&decoder); qDestroyQuery(pQuery); @@ -2567,6 +2567,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { uError("invalid parameter in %s", __func__); return TSDB_CODE_INVALID_PARA; } + *raw = (tmq_raw_data){0}; SMqRspObj* rspObj = ((SMqRspObj*)res); if (TD_RES_TMQ_META(res)) { raw->raw = rspObj->metaRsp.metaRsp; @@ -2595,8 +2596,9 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { raw->raw_type = rspObj->resType; uDebug("tmq get raw batch meta:%p", raw); } else if (TD_RES_TMQ_RAW(res)) { - raw->raw = rspObj->rawData; - raw->raw_len = rspObj->len; + raw->raw = rspObj->dataRsp.rawData; + rspObj->dataRsp.rawData = NULL; + raw->raw_len = rspObj->dataRsp.len; raw->raw_type = rspObj->resType; uDebug("tmq get raw raw:%p", raw); } else { @@ -2609,9 +2611,10 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { void tmq_free_raw(tmq_raw_data raw) { uDebug("tmq free raw data type:%d", raw.raw_type); if (raw.raw_type == RES_TYPE__TMQ || - raw.raw_type == RES_TYPE__TMQ_RAWDATA || raw.raw_type == RES_TYPE__TMQ_METADATA) { taosMemoryFree(raw.raw); + } else if(raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL){ + taosMemoryFree(raw.raw - sizeof(SMqRspHead)); } (void)memset(terrMsg, 0, ERR_MSG_LEN); } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 4d5ecdc320..e87ef4c00d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -197,10 +197,6 @@ typedef struct { SMqDataRsp dataRsp; SMqMetaRsp metaRsp; SMqBatchMetaRsp batchMetaRsp; - struct{ - int32_t len; - void* rawData; - }; }; } SMqPollRspWrapper; @@ -290,7 +286,7 @@ typedef struct { } SVgroupSaveInfo; static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once -volatile int32_t tmqInitRes = 0; // initialize rsp code +volatile int32_t tmqInitRes = -1; // initialize rsp code static SMqMgmt tmqMgmt = {0}; tmq_conf_t* tmq_conf_new() { @@ -1135,9 +1131,7 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp) } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { - SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp; - taosMemoryFreeClear(pRsp->pEpset); - taosMemoryFreeClear(pRsp->rawData); + DELETE_POLL_RSP(tDeleteMqRawDataRsp, &pRsp->dataRsp) } } @@ -2051,7 +2045,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { goto END; } rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; - tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d,QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, requestId); + tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s),QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId); if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp) } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { @@ -2061,8 +2055,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp) } else if (rspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { - pRspWrapper->pollRsp.len = pMsg->len; - pRspWrapper->pollRsp.rawData = pMsg->pData; + PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp) + pRspWrapper->pollRsp.dataRsp.len = pMsg->len - sizeof(SMqRspHead); + pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)); pMsg->pData = NULL; } else { // invalid rspType tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); @@ -2085,8 +2080,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosFreeQitem(pRspWrapper); tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); } else { - tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64, - tmq ? tmq->consumerId : 0, rspType, vgId, taosQueueItemSize(tmq->mqueue), requestId); + tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d(%s), vgId:%d, total in queue:%d,QID:0x%" PRIx64, + tmq ? tmq->consumerId : 0, rspType, tmqMsgTypeStr[rspType], vgId, taosQueueItemSize(tmq->mqueue), requestId); } } @@ -2453,8 +2448,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset); - if (pollRspWrapper->dataRsp.blockNum == 0 && - pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { + if (pollRspWrapper->dataRsp.blockNum == 0) { tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); @@ -2471,9 +2465,6 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ if (pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){ tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj); tmq->totalRows += numOfRows; - } else { - pRspObj->rawData = pollRspWrapper->rawData; - pRspObj->len = pollRspWrapper->len; } pVg->emptyBlockReceiveTs = 0; if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { @@ -2773,7 +2764,7 @@ const char* tmq_get_table_name(TAOS_RES* res) { if (res == NULL) { return NULL; } - if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ) { + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ) { SMqRspObj* pRspObj = (SMqRspObj*)res; SMqDataRsp* data = &pRspObj->dataRsp; if (!data->withTbName || data->blockTbName == NULL || pRspObj->resIter < 0 || diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 1b6c7add3f..cb8c98bde4 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -9270,7 +9270,13 @@ _exit: return code; } -void tDestroySMqPollReq(SMqPollReq *pReq) { tOffsetDestroy(&pReq->reqOffset); } +void tDestroySMqPollReq(SMqPollReq *pReq) { + tOffsetDestroy(&pReq->reqOffset); + if (pReq->uidHash != NULL) { + taosHashCleanup(pReq->uidHash); + pReq->uidHash = NULL; + } +} int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) { int32_t code = 0; int32_t lino; @@ -11410,6 +11416,9 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { if (taosArrayPush(pRsp->blockData, &data) == NULL) { TAOS_CHECK_EXIT(terrno); } +// for (int m= 0; m < 56; m++){ +// printf("decode data[%d] = %d\n", m, *((int8_t *)data+18+m)); +// } int32_t len = bLen; if (taosArrayPush(pRsp->blockDataLen, &len) == NULL) { TAOS_CHECK_EXIT(terrno); @@ -11455,6 +11464,17 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { return 0; } +int32_t tDecodeMqRawDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset)); + TAOS_CHECK_EXIT(tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->blockNum)); +_exit: + return code; +} + static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) { taosArrayDestroy(pRsp->blockDataLen); pRsp->blockDataLen = NULL; @@ -11528,6 +11548,14 @@ void tDeleteSTaosxRsp(SMqDataRsp *pRsp) { pRsp->createTableReq = NULL; } +void tDeleteMqRawDataRsp(SMqDataRsp *pRsp) { + tOffsetDestroy(&pRsp->reqOffset); + tOffsetDestroy(&pRsp->rspOffset); + if (pRsp->rawData != NULL){ + taosMemoryFree(pRsp->rawData - sizeof(SMqRspHead)); + } +} + int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pReq->tbname)); TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->startTs)); @@ -11688,14 +11716,13 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa uint8_t version; uint8_t* dataAfterCreate = NULL; - uint8_t* dataStart = pCoder->data; - uint32_t posStart = pCoder->pos; + uint8_t* dataStart = pCoder->data + pCoder->pos; uint32_t posAfterCreate = 0; TAOS_CHECK_EXIT(tStartDecode(pCoder)); uint32_t pos = pCoder->pos; TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &flags)); - uint32_t flagsLen = pCoder->pos - posStart; + uint32_t flagsLen = pCoder->pos - pos; pSubmitTbData->flags = flags & 0xff; version = (flags >> 8) & 0xff; @@ -11716,56 +11743,35 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->uid)); TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &pSubmitTbData->sver)); - if (rawData != NULL){ // no need to decode data - if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { - uint64_t nColData = 0; + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + uint64_t nColData = 0; - TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData)); + TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData)); - for (int32_t i = 0; i < nColData; ++i) { - SColData pColData = {0}; - TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, &pColData)); - } - } else { - uint64_t nRow = 0; - TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow)); + pSubmitTbData->aCol = taosArrayInit(nColData, sizeof(SColData)); + if (pSubmitTbData->aCol == NULL) { + TAOS_CHECK_EXIT(terrno); + } - for (int32_t iRow = 0; iRow < nRow; ++iRow) { - SRow *ppRow = NULL; - TAOS_CHECK_EXIT(tDecodeRow(pCoder, &ppRow)); - } + for (int32_t i = 0; i < nColData; ++i) { + TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, taosArrayReserve(pSubmitTbData->aCol, 1))); } } else { - if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { - uint64_t nColData = 0; + uint64_t nRow = 0; + TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow)); - TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData)); + pSubmitTbData->aRowP = taosArrayInit(nRow, sizeof(SRow *)); + if (pSubmitTbData->aRowP == NULL) { + TAOS_CHECK_EXIT(terrno); + } - pSubmitTbData->aCol = taosArrayInit(nColData, sizeof(SColData)); - if (pSubmitTbData->aCol == NULL) { + for (int32_t iRow = 0; iRow < nRow; ++iRow) { + SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1); + if (ppRow == NULL) { TAOS_CHECK_EXIT(terrno); } - for (int32_t i = 0; i < nColData; ++i) { - TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, taosArrayReserve(pSubmitTbData->aCol, 1))); - } - } else { - uint64_t nRow = 0; - TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow)); - - pSubmitTbData->aRowP = taosArrayInit(nRow, sizeof(SRow *)); - if (pSubmitTbData->aRowP == NULL) { - TAOS_CHECK_EXIT(terrno); - } - - for (int32_t iRow = 0; iRow < nRow; ++iRow) { - SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1); - if (ppRow == NULL) { - TAOS_CHECK_EXIT(terrno); - } - - TAOS_CHECK_EXIT(tDecodeRow(pCoder, ppRow)); - } + TAOS_CHECK_EXIT(tDecodeRow(pCoder, ppRow)); } } @@ -11774,7 +11780,6 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->ctimeMs)); } - tEndDecode(pCoder); if (rawData != NULL){ if (dataAfterCreate != NULL){ TAOS_MEMCPY(dataAfterCreate - INT_BYTES - flagsLen, dataStart, INT_BYTES + flagsLen); @@ -11784,7 +11789,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa *(void**)rawData = dataStart; } } - + tEndDecode(pCoder); _exit: return code; @@ -11839,12 +11844,20 @@ int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq, SArray* rawList) { goto _exit; } + bool hasCreateTable = false; for (uint64_t i = 0; i < nSubmitTbData; i++) { - if (tDecodeSSubmitTbData(pCoder, taosArrayReserve(pReq->aSubmitTbData, 1), + SSubmitTbData* data = taosArrayReserve(pReq->aSubmitTbData, 1); + if (tDecodeSSubmitTbData(pCoder, data, rawList != NULL ? taosArrayReserve(rawList, 1) : NULL) < 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; } + if (data->flags & SUBMIT_REQ_AUTO_CREATE_TABLE){ + hasCreateTable = true; + } + } + if (rawList != NULL && hasCreateTable){ + taosArrayClear(rawList); } tEndDecode(pCoder); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3bfc50fcb2..2aae3845f0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -397,7 +397,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { terrno = TSDB_CODE_INVALID_MSG; goto END; } - + if (req.rawData == 1){ + req.uidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (req.uidHash == NULL) { + tqError("tq poll rawData taosHashInit failed"); + code = terrno; + goto END; + } + } int64_t consumerId = req.consumerId; int32_t reqEpoch = req.epoch; STqOffsetVal reqOffset = req.reqOffset; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 6452f7bb5c..1ab76019a9 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -581,17 +581,17 @@ bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) { uid = pSubmitTbData->uid; void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t)); TSDB_CHECK_NULL(ret, code, lino, END, true); - tqDebug("iterator data block in hash continue, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid); + tqDebug("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid); pReader->nextBlk++; } tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); pReader->nextBlk = 0; pReader->msg.msgStr = NULL; - tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid); + tqDebug("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid); END: - tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid); + tqDebug("%s:%d get data:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid); return code; } @@ -1117,7 +1117,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block if (code != 0) { return code; } - } else if (rawList != NULL){ + } else if (rawList != NULL && taosArrayGetSize(rawList) > 0) { if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){ return terrno; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 9501ca4099..0e697ccd8b 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -31,6 +31,9 @@ static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t p memcpy(pRetrieve->data, rawData, *(uint32_t *)rawData + INT_BYTES); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno); +// for (int m= 0; m < 56; m++){ +// printf("add data[%d] = %d\n", m, *((int8_t *)rawData+m)); +// } tqDebug("add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf); END: @@ -337,6 +340,26 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0; TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS); int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks); + if (rawList != NULL && taosArrayGetSize(pBlocks) == 0){ + if (taosHashGet(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES) != NULL) { + tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 " is already exists", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid); + terrno = TSDB_CODE_TMQ_DUPLICATE_UID; + pReader->nextBlk = 0; + goto END; + } else { + code = taosHashPut(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES, &pExec->pTqReader->lastBlkUid, LONG_BYTES); + TSDB_CHECK_CODE(code, lino, END); + } + } + + // this submit data is metadata and previous data is data + if (rawList != NULL && *totalRows > 0 && pSubmitTbData->pCreateTbReq != NULL && taosArrayGetSize(pBlocks) > 0){ + tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 ", this submit data is metadata and previous data is data", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid); + terrno = TSDB_CODE_TMQ_DUPLICATE_UID; + pRsp->createTableNum = 0; + pReader->nextBlk = 0; + goto END; + } if (pRsp->withTbName) { int64_t uid = pExec->pTqReader->lastBlkUid; @@ -398,6 +421,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData SArray *rawList = NULL; if (pRequest->rawData){ rawList = taosArrayInit(0, POINTER_BYTES); + TSDB_CHECK_NULL(rawList, code, lino, END, terrno); } code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList); TSDB_CHECK_CODE(code, lino, END); @@ -405,10 +429,16 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { while (tqNextBlockImpl(pReader, NULL)) { tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList); + if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){ + goto END; + } } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList); + if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){ + goto END; + } } } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index d619e0534d..86a5bba712 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -378,10 +378,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if ((pRequest->rawData == 0 && totalRows >= tmqRowSize) || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) || - (pRequest->rawData != 0 && totalRows >= TQ_POLL_MAX_BYTES)) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); + (pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES || taosxRsp.createTableNum > 0 || terrno == TSDB_CODE_TMQ_DUPLICATE_UID))) { + tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_DUPLICATE_UID ? fetchVer : fetchVer + 1); code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId); + if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){terrno = 0;} goto END; } else { fetchVer++; diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 8ec9032a7c..c9d1454fbc 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -723,7 +723,7 @@ int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFun */ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) { - SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); + SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES); if (NULL == pVgroupHash || NULL == pVgroupList) { taosHashCleanup(pVgroupHash); @@ -859,6 +859,13 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, } if (TSDB_CODE_SUCCESS == code) { code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size); + SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0}; + SDecoder dc = {0}; + tDecoderInit(&dc, POINTER_SHIFT(dst->pData, sizeof(SSubmitReq2Msg)), dst->size - sizeof(SSubmitReq2Msg)); + if (tDecodeSubmitReq(&dc, pSubmitReq, NULL) < 0) { + code = TSDB_CODE_INVALID_MSG; + } + tDecoderClear(&dc); } if (TSDB_CODE_SUCCESS == code) { code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 842d42a002..c05423e51c 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -860,6 +860,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_DATA, "Invalid data use here") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_DUPLICATE_UID, "Duplicate uid") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")