diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 793ffa17bc..397118411c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -1017,7 +1017,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016) #define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017) #define TSDB_CODE_TMQ_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x4018) -#define TSDB_CODE_TMQ_DUPLICATE_UID TAOS_DEF_ERROR_CODE(0, 0x4019) +#define TSDB_CODE_TMQ_RAW_DATA_SPLIT TAOS_DEF_ERROR_CODE(0, 0x4019) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 1e9533ef69..4b2360ea27 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1745,7 +1745,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->resetOffsetCfg = conf->resetOffset; pTmq->replayEnable = conf->replayEnable; pTmq->sourceExcluded = conf->sourceExcluded; - pTmq->rawData = 1; + pTmq->rawData = conf->rawData; pTmq->enableBatchMeta = conf->enableBatchMeta; tstrncpy(pTmq->user, user, TSDB_USER_LEN); if (taosGetFqdn(pTmq->fqdn) != 0) { diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index e5926dcb24..fbf721993b 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -11877,32 +11877,17 @@ int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq, SArray* rawList) { goto _exit; } - bool hasCreateTable = false; for (uint64_t i = 0; i < nSubmitTbData; i++) { SSubmitTbData* data = taosArrayReserve(pReq->aSubmitTbData, 1); - if (tDecodeSSubmitTbData(pCoder, data, - rawList != NULL ? taosArrayReserve(rawList, 1) : NULL) < 0) { + if (tDecodeSSubmitTbData(pCoder, data, rawList != NULL ? taosArrayReserve(rawList, 1) : NULL) < 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; } - if (data->flags & SUBMIT_REQ_AUTO_CREATE_TABLE){ - hasCreateTable = true; - } - } - if (rawList != NULL && hasCreateTable){ - taosArrayClear(rawList); } tEndDecode(pCoder); _exit: - if (code) { - if (pReq->aSubmitTbData) { - // todo - taosArrayDestroy(pReq->aSubmitTbData); - pReq->aSubmitTbData = NULL; - } - } return code; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 2c06ff1f07..50d6eb0090 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -90,6 +90,7 @@ typedef struct { // for replay SSDataBlock* block; int64_t blockTime; + SHashObj* tableCreateTimeHash; // for process create table msg in submit if fetch raw data } STqHandle; struct STQ { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 5a61c1c124..402ab8da7e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -167,7 +167,8 @@ int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tb int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); -SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t* createTime); +SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); +int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock); int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index c19a2e3ce2..3dcfa1e4fd 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -371,7 +371,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) { return 0; } -SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime) { +SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) { void *pData = NULL; int nData = 0; int64_t version; @@ -407,9 +407,6 @@ _query: } } else if (me.type == TSDB_CHILD_TABLE) { uid = me.ctbEntry.suid; - if (createTime != NULL){ - *createTime = me.ctbEntry.btime; - } tDecoderClear(&dc); goto _query; } else { @@ -448,6 +445,46 @@ _err: return NULL; } +int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock) { + void *pData = NULL; + int nData = 0; + int64_t version = 0; + SDecoder dc = {0}; + int64_t createTime = 0; + if (lock) { + metaRLock(pMeta); + } + + if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) { + goto _exit; + } + + version = ((SUidIdxVal *)pData)[0].version; + + if (tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData) != 0) { + goto _exit; + } + + SMetaEntry me = {0}; + tDecoderInit(&dc, pData, nData); + int32_t code = metaDecodeEntry(&dc, &me); + if (code) { + tDecoderClear(&dc); + goto _exit; + } + if (me.type == TSDB_CHILD_TABLE) { + createTime = me.ctbEntry.btime; + } + tDecoderClear(&dc); + + _exit: + if (lock) { + metaULock(pMeta); + } + tdbFree(pData); + return createTime; +} + SMCtbCursor *metaOpenCtbCursor(void *pVnode, tb_uid_t uid, int lock) { SMeta *pMeta = ((SVnode *)pVnode)->pMeta; SMCtbCursor *pCtbCur = NULL; @@ -620,7 +657,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) { STSchema *pTSchema = NULL; SSchemaWrapper *pSW = NULL; - pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL); + pSW = metaGetTableSchema(pMeta, uid, sver, lock); if (!pSW) return NULL; pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version); diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index 7374b9ceb5..b227653e5e 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -548,7 +548,7 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) { void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) { bool ret = false; - SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL); + SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1); if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) { ret = true; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index daf4fa65f3..a015f13091 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -57,6 +57,7 @@ void tqDestroyTqHandle(void* data) { if (pData->pRef) { walCloseRef(pData->pRef->pWal, pData->pRef->refId); } + taosHashCleanup(pData->tableCreateTimeHash); } static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) { diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 580828b089..b84dcc4703 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -343,6 +343,7 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) { tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL); taosArrayDestroy(tbUidList); } + handle->tableCreateTimeHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); END: return code; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 792763862b..f01a35d58f 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -283,7 +283,7 @@ void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) { return; } bool ret = false; - SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL); + SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1); if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) { ret = true; } @@ -742,7 +742,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* (pReader->cachedSchemaVer != sversion)) { tDeleteSchemaWrapper(pReader->pSchemaWrapper); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 "version %d, possibly dropped table", @@ -1102,9 +1102,9 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block int64_t uid = pSubmitTbData->uid; pReader->lastBlkUid = uid; - int64_t createTime = INT64_MAX; + int64_t createTime = 0; tDeleteSchemaWrapper(pReader->pSchemaWrapper); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &createTime); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); @@ -1112,14 +1112,12 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; } - if (fetchMeta != WITH_DATA && - pSubmitTbData->pCreateTbReq != NULL && - pSubmitTbData->ctimeMs - createTime <= 0) { // judge if table is already created to avoid sending crateTbReq + if (pSubmitTbData->pCreateTbReq != NULL) { int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq); if (code != 0) { return code; } - } else if (rawList != NULL && taosArrayGetSize(rawList) > 0) { + } else if (rawList != NULL) { if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){ return terrno; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 265e165038..faf1954667 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -336,26 +336,9 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int TSDB_CHECK_CODE(code, lino, END); bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0; TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS); + + int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks); - if (rawList != NULL && taosArrayGetSize(pBlocks) == 0){ - if (taosHashGet(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES) != NULL) { - tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 " is already exists", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid); - terrno = TSDB_CODE_TMQ_DUPLICATE_UID; - goto END; - } else { - code = taosHashPut(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES, &pExec->pTqReader->lastBlkUid, LONG_BYTES); - TSDB_CHECK_CODE(code, lino, END); - } - } - - // this submit data is metadata and previous data is data - if (rawList != NULL && *totalRows > 0 && pSubmitTbData->pCreateTbReq != NULL && taosArrayGetSize(pBlocks) > 0 && pRsp->createTableNum <= 1){ - tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 ", this submit data is metadata and previous data is data", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid); - terrno = TSDB_CODE_TMQ_DUPLICATE_UID; - pRsp->createTableNum = 0; - goto END; - } - if (pRsp->withTbName) { int64_t uid = pExec->pTqReader->lastBlkUid; code = tqAddTbNameToRsp(pTq, uid, pRsp, blockNum); @@ -405,6 +388,52 @@ END: taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper); } +static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){ + STqExecHandle* pExec = &pHandle->execHandle; + STqReader* pReader = pExec->pTqReader; + int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); + for (int32_t i = 0; i < blockSz; i++){ + SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, i); + if (pSubmitTbData== NULL){ + tqReaderClearSubmitMsg(pReader); + taosArrayDestroy(*rawList); + *rawList = NULL; + return; + } + if (pSubmitTbData->pCreateTbReq == NULL){ + continue; + } + int64_t createTime = 0; + int64_t uid = pSubmitTbData->uid; + if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) { + tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid); + terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT; + return; + } else { + if (taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES) != 0){ + tqError("failed to add table create time to hash, uid:%"PRId64, uid); + } + } + + int64_t *cTime = (int64_t*)taosHashGet(pHandle->tableCreateTimeHash, &uid, LONG_BYTES); + if (cTime != NULL){ + createTime = *cTime; + } else{ + createTime = metaGetTableCreateTime(pReader->pVnodeMeta, uid, 1); + if (taosHashPut(pHandle->tableCreateTimeHash, &uid, LONG_BYTES, &createTime, LONG_BYTES) != 0){ + tqError("failed to add table create time to hash, uid:%"PRId64, uid); + } + } + if (pHandle->fetchMeta == WITH_DATA || pSubmitTbData->ctimeMs > createTime){ + tDestroySVCreateTbReq(pSubmitTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE); + pSubmitTbData->pCreateTbReq = NULL; + } else{ + taosArrayDestroy(*rawList); + *rawList = NULL; + } + } +} + int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) { int32_t code = 0; int32_t lino = 0; @@ -417,26 +446,32 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData } code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList); TSDB_CHECK_CODE(code, lino, END); + preProcessSubmitMsg(pHandle, pRequest, &rawList); + + // data could not contains same uid data in rawdata mode + if (pRequest->rawData != 0 && terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){ + goto END; + } + + // this submit data is metadata and previous data is rawdata + if (pRequest->rawData != 0 && *totalRows > 0 && rawList == NULL){ + tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 ", this submit data is metadata and previous data is data", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid); + terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT; + goto END; + } if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { while (tqNextBlockImpl(pReader, NULL)) { tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList); - if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){ - tqReaderClearSubmitMsg(pReader); - goto END; - } } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList); - if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){ - tqReaderClearSubmitMsg(pReader); - goto END; - } } } END: + tqReaderClearSubmitMsg(pReader); taosArrayDestroy(rawList); if (code != 0){ tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code)); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5dd8de9429..b0f5515339 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -381,11 +381,13 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, (pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES || taosxRsp.createTableNum > 0 || taosArrayGetSize(taosxRsp.blockData) > tmqRowSize || - terrno == TSDB_CODE_TMQ_DUPLICATE_UID))) { - tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_DUPLICATE_UID ? fetchVer : fetchVer + 1); + terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) { + tqDebug("start to send rsp, block num:%d, totalRows:%d, createTableNum:%d, terrno:%d", + (int)taosArrayGetSize(taosxRsp.blockData), totalRows, taosxRsp.createTableNum, terrno); + tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT ? fetchVer : fetchVer + 1); code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId); - if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){terrno = 0;} + if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){terrno = 0;} goto END; } else { fetchVer++; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 2b07de916c..58a57c1949 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -708,7 +708,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { } int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) { - SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL); + SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0); if (pSW) { *num = pSW->nCols; tDeleteSchemaWrapper(pSW); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 3fb7b597ce..e1bef30d32 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -314,25 +314,25 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int uint64_t nColData; if (tDecodeU64v(pCoder, &nColData) < 0) { code = TSDB_CODE_INVALID_MSG; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } SColData colData = {0}; code = tDecodeColData(version, pCoder, &colData); if (code) { code = TSDB_CODE_INVALID_MSG; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } if (colData.flag != HAS_VALUE) { code = TSDB_CODE_INVALID_MSG; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } for (int32_t iRow = 0; iRow < colData.nVal; iRow++) { if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) { code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } } @@ -340,14 +340,14 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int code = tDecodeColData(version, pCoder, &colData); if (code) { code = TSDB_CODE_INVALID_MSG; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } } } else { uint64_t nRow; if (tDecodeU64v(pCoder, &nRow) < 0) { code = TSDB_CODE_INVALID_MSG; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } for (int32_t iRow = 0; iRow < nRow; ++iRow) { @@ -356,7 +356,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int if (pRow->ts < minKey || pRow->ts > maxKey) { code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } } } @@ -369,6 +369,9 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int tEndDecode(pCoder); _exit: + if (code) { + vError("vgId:%d, %s:%d failed to vnodePreProcessSubmitTbData submit request since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) { diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 0d70292501..a77964bb4f 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -1100,7 +1100,7 @@ int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* return terrno; } - qDebug("add raw data to vgId:%d", pTableMeta->vgId); + qDebug("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data); return 0; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c05423e51c..f34b00bec5 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -860,7 +860,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_DATA, "Invalid data use here") -TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_DUPLICATE_UID, "Duplicate uid") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_RAW_DATA_SPLIT, "Split submit data for rawdata") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")