opti:consume data excluded from some source

This commit is contained in:
wangmm0220 2024-02-05 17:07:50 +08:00
parent 27b63b721f
commit a656d75ca7
15 changed files with 65 additions and 17 deletions

View File

@ -3603,6 +3603,7 @@ typedef struct {
int64_t timeout;
STqOffsetVal reqOffset;
int8_t enableReplay;
int8_t sourceExcluded;
} SMqPollReq;
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
@ -3891,6 +3892,9 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
#define SOURCE_NULL 0
#define SOURCE_TAOSX 1
typedef struct {
int32_t flags;
SVCreateTbReq* pCreateTbReq;
@ -3901,7 +3905,8 @@ typedef struct {
SArray* aRowP;
SArray* aCol;
};
int64_t ctimeMs;
int64_t ctimeMs;
int8_t source;
} SSubmitTbData;
typedef struct {

View File

@ -197,6 +197,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
void qStreamSetOpen(qTaskInfo_t tinfo);
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);

View File

@ -63,6 +63,7 @@ struct tmq_conf_t {
int8_t withTbName;
int8_t snapEnable;
int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit
uint16_t port;
int32_t autoCommitInterval;
char* ip;
@ -82,6 +83,7 @@ struct tmq_t {
int32_t autoCommitInterval;
int8_t resetOffsetCfg;
int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit
uint64_t consumerId;
tmq_commit_cb* commitCb;
void* commitCbUserParam;
@ -384,6 +386,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_INVALID;
}
}
if (strcasecmp(key, "msg.consume.excluded") == 0) {
conf->sourceExcluded = taosStr2int64(value);
return TMQ_CONF_OK;
}
if (strcasecmp(key, "td.connect.db") == 0) {
return TMQ_CONF_OK;
@ -1081,6 +1087,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->commitCbUserParam = conf->commitCbUserParam;
pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->replayEnable = conf->replayEnable;
pTmq->sourceExcluded = conf->sourceExcluded;
if(conf->replayEnable){
pTmq->autoCommit = false;
}
@ -1549,6 +1556,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq->useSnapshot = tmq->useSnapshot;
pReq->reqId = generateRequestId();
pReq->enableReplay = tmq->replayEnable;
pReq->sourceExcluded = tmq->sourceExcluded;
}
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {

View File

@ -6516,6 +6516,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1;
if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1;
if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1;
if (tEncodeI8(&encoder, pReq->sourceExcluded) < 0) return -1;
tEndEncode(&encoder);
@ -6556,6 +6557,10 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1;
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, &pReq->sourceExcluded) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -8578,6 +8583,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
}
}
if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1;
if (tEncodeI8(pCoder, pSubmitTbData->source) < 0) return -1;
tEndEncode(pCoder);
return 0;
@ -8665,6 +8671,12 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
goto _exit;
}
}
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI8(pCoder, &pSubmitTbData->source) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
}
tEndDecode(pCoder);

View File

@ -221,7 +221,7 @@ bool tqReaderIsQueriedTable(STqReader *pReader, uint64_t uid);
bool tqCurrentBlockConsumed(const STqReader *pReader);
int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
bool tqNextBlockInWal(STqReader *pReader, const char *idstr);
bool tqNextBlockInWal(STqReader *pReader, const char *idstr, int sourceExcluded);
bool tqNextBlockImpl(STqReader *pReader, const char *idstr);
SWalReader *tqGetWalReader(STqReader *pReader);
SSDataBlock *tqGetResultBlock(STqReader *pReader);

View File

@ -119,7 +119,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId);

View File

@ -193,7 +193,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
continue;
}
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,};
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE, .source = SOURCE_NULL};
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray);

View File

@ -368,7 +368,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
}
// todo ignore the error in wal?
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
SWalReader* pWalReader = pReader->pWalReader;
SSDataBlock* pDataBlock = NULL;
@ -391,7 +391,10 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if ((pSubmitTbData->source & sourceExcluded) != 0){
pReader->nextBlk += 1;
continue;
}
if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
SSDataBlock* pRes = NULL;

View File

@ -94,6 +94,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
return -1;
}
qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
while (1) {
SSDataBlock* pDataBlock = NULL;
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
@ -250,7 +251,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
return 0;
}
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) {
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) {
STqExecHandle* pExec = &pHandle->execHandle;
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
SArray* pSchemas = taosArrayInit(0, sizeof(void*));
@ -265,6 +266,10 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
}
if ((pSubmitTbDataRet->source & sourceExcluded) != 0){
goto loop_table;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
@ -329,6 +334,10 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
}
if ((pSubmitTbDataRet->source & sourceExcluded) != 0){
goto loop_db;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {

View File

@ -791,7 +791,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
return;
}
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
if (code != TSDB_CODE_SUCCESS) {
continue;
@ -835,7 +835,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
pTask->execInfo.sink.numOfBlocks += 1;
uint64_t groupId = pDataBlock->info.id.groupId;
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
if (index == NULL) { // no data yet, append it

View File

@ -250,7 +250,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
.ver = pHead->version,
};
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows);
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded);
if (code < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
pRequest->subKey);

View File

@ -59,6 +59,7 @@ typedef struct STaskStopInfo {
typedef struct {
STqOffsetVal currentOffset; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int8_t sourceExcluded;
int64_t snapshotVer;
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor

View File

@ -1148,6 +1148,11 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
pOperator->status = OP_NOT_OPENED;
}
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
pTaskInfo->streamInfo.sourceExcluded = sourceExcluded;
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;

View File

@ -1978,7 +1978,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
while (1) {
bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);
bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id, pTaskInfo->streamInfo.sourceExcluded);
SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);

View File

@ -211,6 +211,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
bool colMode, bool ignoreColVals) {
STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
if (NULL == pTableCxt) {
*pOutput = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -268,12 +269,8 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
}
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = pTableCxt;
qDebug("tableDataCxt created, uid:%" PRId64 ", vgId:%d", pTableMeta->uid, pTableMeta->vgId);
} else {
taosMemoryFree(pTableCxt);
}
*pOutput = pTableCxt;
qDebug("tableDataCxt created, code:%d, uid:%" PRId64 ", vgId:%d", code, pTableMeta->uid, pTableMeta->vgId);
return code;
}
@ -288,6 +285,7 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
pTmp->suid = pSrc->suid;
pTmp->uid = pSrc->uid;
pTmp->sver = pSrc->sver;
pTmp->source = pSrc->source;
pTmp->pCreateTbReq = NULL;
if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
if (pSrc->pCreateTbReq) {
@ -344,6 +342,10 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta*
void* pData = *pTableCxt; // deal scan coverity
code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
}
if (TSDB_CODE_SUCCESS != code) {
insDestroyTableDataCxt(*pTableCxt);
}
return code;
}
@ -651,6 +653,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end;
}
pTableCxt->pData->source = SOURCE_TAOSX;
if(tmp == NULL){
ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) {