fix:[TS-5776]add raw type from consumer

This commit is contained in:
wangmm0220 2025-02-14 15:09:32 +08:00
parent 3440006c84
commit d67de02a31
16 changed files with 137 additions and 73 deletions

View File

@ -1017,7 +1017,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016)
#define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017)
#define TSDB_CODE_TMQ_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x4018)
#define TSDB_CODE_TMQ_DUPLICATE_UID TAOS_DEF_ERROR_CODE(0, 0x4019)
#define TSDB_CODE_TMQ_RAW_DATA_SPLIT TAOS_DEF_ERROR_CODE(0, 0x4019)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)

View File

@ -1745,7 +1745,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->replayEnable = conf->replayEnable;
pTmq->sourceExcluded = conf->sourceExcluded;
pTmq->rawData = 1;
pTmq->rawData = conf->rawData;
pTmq->enableBatchMeta = conf->enableBatchMeta;
tstrncpy(pTmq->user, user, TSDB_USER_LEN);
if (taosGetFqdn(pTmq->fqdn) != 0) {

View File

@ -11877,32 +11877,17 @@ int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq, SArray* rawList) {
goto _exit;
}
bool hasCreateTable = false;
for (uint64_t i = 0; i < nSubmitTbData; i++) {
SSubmitTbData* data = taosArrayReserve(pReq->aSubmitTbData, 1);
if (tDecodeSSubmitTbData(pCoder, data,
rawList != NULL ? taosArrayReserve(rawList, 1) : NULL) < 0) {
if (tDecodeSSubmitTbData(pCoder, data, rawList != NULL ? taosArrayReserve(rawList, 1) : NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (data->flags & SUBMIT_REQ_AUTO_CREATE_TABLE){
hasCreateTable = true;
}
}
if (rawList != NULL && hasCreateTable){
taosArrayClear(rawList);
}
tEndDecode(pCoder);
_exit:
if (code) {
if (pReq->aSubmitTbData) {
// todo
taosArrayDestroy(pReq->aSubmitTbData);
pReq->aSubmitTbData = NULL;
}
}
return code;
}

View File

@ -90,6 +90,7 @@ typedef struct {
// for replay
SSDataBlock* block;
int64_t blockTime;
SHashObj* tableCreateTimeHash; // for process create table msg in submit if fetch raw data
} STqHandle;
struct STQ {

View File

@ -167,7 +167,8 @@ int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tb
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t* createTime);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock);
int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);

View File

@ -371,7 +371,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
return 0;
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime) {
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
void *pData = NULL;
int nData = 0;
int64_t version;
@ -407,9 +407,6 @@ _query:
}
} else if (me.type == TSDB_CHILD_TABLE) {
uid = me.ctbEntry.suid;
if (createTime != NULL){
*createTime = me.ctbEntry.btime;
}
tDecoderClear(&dc);
goto _query;
} else {
@ -448,6 +445,46 @@ _err:
return NULL;
}
int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock) {
void *pData = NULL;
int nData = 0;
int64_t version = 0;
SDecoder dc = {0};
int64_t createTime = 0;
if (lock) {
metaRLock(pMeta);
}
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
goto _exit;
}
version = ((SUidIdxVal *)pData)[0].version;
if (tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData) != 0) {
goto _exit;
}
SMetaEntry me = {0};
tDecoderInit(&dc, pData, nData);
int32_t code = metaDecodeEntry(&dc, &me);
if (code) {
tDecoderClear(&dc);
goto _exit;
}
if (me.type == TSDB_CHILD_TABLE) {
createTime = me.ctbEntry.btime;
}
tDecoderClear(&dc);
_exit:
if (lock) {
metaULock(pMeta);
}
tdbFree(pData);
return createTime;
}
SMCtbCursor *metaOpenCtbCursor(void *pVnode, tb_uid_t uid, int lock) {
SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
SMCtbCursor *pCtbCur = NULL;
@ -620,7 +657,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL;
pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL);
pSW = metaGetTableSchema(pMeta, uid, sver, lock);
if (!pSW) return NULL;
pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version);

View File

@ -548,7 +548,7 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
bool ret = false;
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL);
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1);
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
ret = true;
}

View File

@ -57,6 +57,7 @@ void tqDestroyTqHandle(void* data) {
if (pData->pRef) {
walCloseRef(pData->pRef->pWal, pData->pRef->refId);
}
taosHashCleanup(pData->tableCreateTimeHash);
}
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {

View File

@ -343,6 +343,7 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL);
taosArrayDestroy(tbUidList);
}
handle->tableCreateTimeHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
END:
return code;

View File

@ -283,7 +283,7 @@ void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
return;
}
bool ret = false;
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1);
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
ret = true;
}
@ -742,7 +742,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
(pReader->cachedSchemaVer != sversion)) {
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
"version %d, possibly dropped table",
@ -1102,9 +1102,9 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block
int64_t uid = pSubmitTbData->uid;
pReader->lastBlkUid = uid;
int64_t createTime = INT64_MAX;
int64_t createTime = 0;
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &createTime);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
@ -1112,14 +1112,12 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block
return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
}
if (fetchMeta != WITH_DATA &&
pSubmitTbData->pCreateTbReq != NULL &&
pSubmitTbData->ctimeMs - createTime <= 0) { // judge if table is already created to avoid sending crateTbReq
if (pSubmitTbData->pCreateTbReq != NULL) {
int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
if (code != 0) {
return code;
}
} else if (rawList != NULL && taosArrayGetSize(rawList) > 0) {
} else if (rawList != NULL) {
if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){
return terrno;
}

View File

@ -336,26 +336,9 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
TSDB_CHECK_CODE(code, lino, END);
bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0;
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks);
if (rawList != NULL && taosArrayGetSize(pBlocks) == 0){
if (taosHashGet(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES) != NULL) {
tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 " is already exists", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid);
terrno = TSDB_CODE_TMQ_DUPLICATE_UID;
goto END;
} else {
code = taosHashPut(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES, &pExec->pTqReader->lastBlkUid, LONG_BYTES);
TSDB_CHECK_CODE(code, lino, END);
}
}
// this submit data is metadata and previous data is data
if (rawList != NULL && *totalRows > 0 && pSubmitTbData->pCreateTbReq != NULL && taosArrayGetSize(pBlocks) > 0 && pRsp->createTableNum <= 1){
tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 ", this submit data is metadata and previous data is data", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid);
terrno = TSDB_CODE_TMQ_DUPLICATE_UID;
pRsp->createTableNum = 0;
goto END;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
code = tqAddTbNameToRsp(pTq, uid, pRsp, blockNum);
@ -405,6 +388,52 @@ END:
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
}
static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){
STqExecHandle* pExec = &pHandle->execHandle;
STqReader* pReader = pExec->pTqReader;
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
for (int32_t i = 0; i < blockSz; i++){
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, i);
if (pSubmitTbData== NULL){
tqReaderClearSubmitMsg(pReader);
taosArrayDestroy(*rawList);
*rawList = NULL;
return;
}
if (pSubmitTbData->pCreateTbReq == NULL){
continue;
}
int64_t createTime = 0;
int64_t uid = pSubmitTbData->uid;
if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) {
tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid);
terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
return;
} else {
if (taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES) != 0){
tqError("failed to add table create time to hash, uid:%"PRId64, uid);
}
}
int64_t *cTime = (int64_t*)taosHashGet(pHandle->tableCreateTimeHash, &uid, LONG_BYTES);
if (cTime != NULL){
createTime = *cTime;
} else{
createTime = metaGetTableCreateTime(pReader->pVnodeMeta, uid, 1);
if (taosHashPut(pHandle->tableCreateTimeHash, &uid, LONG_BYTES, &createTime, LONG_BYTES) != 0){
tqError("failed to add table create time to hash, uid:%"PRId64, uid);
}
}
if (pHandle->fetchMeta == WITH_DATA || pSubmitTbData->ctimeMs > createTime){
tDestroySVCreateTbReq(pSubmitTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
pSubmitTbData->pCreateTbReq = NULL;
} else{
taosArrayDestroy(*rawList);
*rawList = NULL;
}
}
}
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) {
int32_t code = 0;
int32_t lino = 0;
@ -417,26 +446,32 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData
}
code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList);
TSDB_CHECK_CODE(code, lino, END);
preProcessSubmitMsg(pHandle, pRequest, &rawList);
// data could not contains same uid data in rawdata mode
if (pRequest->rawData != 0 && terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
goto END;
}
// this submit data is metadata and previous data is rawdata
if (pRequest->rawData != 0 && *totalRows > 0 && rawList == NULL){
tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 ", this submit data is metadata and previous data is data", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid);
terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
goto END;
}
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
while (tqNextBlockImpl(pReader, NULL)) {
tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){
tqReaderClearSubmitMsg(pReader);
goto END;
}
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){
tqReaderClearSubmitMsg(pReader);
goto END;
}
}
}
END:
tqReaderClearSubmitMsg(pReader);
taosArrayDestroy(rawList);
if (code != 0){
tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code));

View File

@ -381,11 +381,13 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
(pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES ||
taosxRsp.createTableNum > 0 ||
taosArrayGetSize(taosxRsp.blockData) > tmqRowSize ||
terrno == TSDB_CODE_TMQ_DUPLICATE_UID))) {
tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_DUPLICATE_UID ? fetchVer : fetchVer + 1);
terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
tqDebug("start to send rsp, block num:%d, totalRows:%d, createTableNum:%d, terrno:%d",
(int)taosArrayGetSize(taosxRsp.blockData), totalRows, taosxRsp.createTableNum, terrno);
tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT ? fetchVer : fetchVer + 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){terrno = 0;}
if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){terrno = 0;}
goto END;
} else {
fetchVer++;

View File

@ -708,7 +708,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
}
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL);
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0);
if (pSW) {
*num = pSW->nCols;
tDeleteSchemaWrapper(pSW);

View File

@ -314,25 +314,25 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
uint64_t nColData;
if (tDecodeU64v(pCoder, &nColData) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
SColData colData = {0};
code = tDecodeColData(version, pCoder, &colData);
if (code) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (colData.flag != HAS_VALUE) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -340,14 +340,14 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
code = tDecodeColData(version, pCoder, &colData);
if (code) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else {
uint64_t nRow;
if (tDecodeU64v(pCoder, &nRow) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
@ -356,7 +356,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
if (pRow->ts < minKey || pRow->ts > maxKey) {
code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
@ -369,6 +369,9 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
tEndDecode(pCoder);
_exit:
if (code) {
vError("vgId:%d, %s:%d failed to vnodePreProcessSubmitTbData submit request since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {

View File

@ -1100,7 +1100,7 @@ int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta*
return terrno;
}
qDebug("add raw data to vgId:%d", pTableMeta->vgId);
qDebug("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data);
return 0;
}

View File

@ -860,7 +860,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_DATA, "Invalid data use here")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_DUPLICATE_UID, "Duplicate uid")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_RAW_DATA_SPLIT, "Split submit data for rawdata")
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")