Merge pull request #29427 from taosdata/fix/TD-33396

fix:[TD-33396]add log for tmq
This commit is contained in:
Shengliang Guan 2024-12-31 16:45:37 +08:00 committed by GitHub
commit fcc07c0bcd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 360 additions and 398 deletions

View File

@ -207,7 +207,7 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
(void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &(pRsp->reqOffset));
(void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &(pRsp->rspOffset));
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s,QID:0x%" PRIx64,
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) start to send rsp, block num:%d, req:%s, rsp:%s,QID:0x%" PRIx64,
vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
return tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);

View File

@ -197,7 +197,9 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
end:
tDecoderClear(&dcoder);
return tbSuid == realTbSuid;
bool tmp = tbSuid == realTbSuid;
tqDebug("%s suid:%"PRId64" realSuid:%"PRId64" return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
return tmp;
}
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
@ -262,6 +264,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t
END:
*fetchOffset = offset;
tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64 ", 0x%" PRIx64,
vgId, code, offset, lastVer, committedVer, appliedVer, id);
return code;
}
@ -273,6 +277,8 @@ bool tqGetTablePrimaryKey(STqReader* pReader) {
}
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
tqDebug("%s:%p uid:%"PRId64, __FUNCTION__ , pReader, uid);
if (pReader == NULL) {
return;
}
@ -286,6 +292,7 @@ void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
}
STqReader* tqReaderOpen(SVnode* pVnode) {
tqDebug("%s:%p", __FUNCTION__ , pVnode);
if (pVnode == NULL) {
return NULL;
}
@ -317,6 +324,7 @@ STqReader* tqReaderOpen(SVnode* pVnode) {
}
void tqReaderClose(STqReader* pReader) {
tqDebug("%s:%p", __FUNCTION__ , pReader);
if (pReader == NULL) return;
// close wal reader
@ -485,26 +493,25 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
}
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
if (pReader == NULL) {
if (pReader == NULL) {
return TSDB_CODE_INVALID_PARA;
}
pReader->msg.msgStr = msgStr;
pReader->msg.msgLen = msgLen;
pReader->msg.ver = ver;
tqDebug("tq reader set msg %p %d", msgStr, msgLen);
tqDebug("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
SDecoder decoder = {0};
tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit);
tDecoderClear(&decoder);
if (code != 0) {
tDecoderClear(&decoder);
tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
return code;
}
tDecoderClear(&decoder);
return 0;
return code;
}
SWalReader* tqGetWalReader(STqReader* pReader) {
@ -529,63 +536,63 @@ int64_t tqGetResultBlockTime(STqReader* pReader) {
}
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
if (pReader == NULL || pReader->msg.msgStr == NULL) {
return false;
}
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
(pReader->nextBlk + 1), numOfBlocks, idstr);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pSubmitTbData == NULL) {
return false;
}
if (pReader->tbIdHash == NULL) {
return true;
}
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
return true;
} else {
tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
taosHashGetSize(pReader->tbIdHash), idstr);
}
pReader->nextBlk++;
}
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
return false;
}
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
if (pReader == NULL || pReader->msg.msgStr == NULL) return false;
int32_t code = false;
int32_t lino = 0;
int64_t uid = 0;
TSDB_CHECK_NULL(pReader, code, lino, END, false);
TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pSubmitTbData == NULL) return false;
if (filterOutUids == NULL) return true;
TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
uid = pSubmitTbData->uid;
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true);
void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
if (ret == NULL) {
return true;
}
tqDebug("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%"PRId64, pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
pReader->nextBlk++;
}
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid);
return false;
END:
tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
return code;
}
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
int32_t code = false;
int32_t lino = 0;
int64_t uid = 0;
TSDB_CHECK_NULL(pReader, code, lino, END, false);
TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
uid = pSubmitTbData->uid;
void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
TSDB_CHECK_NULL(ret, code, lino, END, true);
tqDebug("iterator data block in hash continue, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
pReader->nextBlk++;
}
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid);
END:
tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
return code;
}
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
@ -709,7 +716,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
if (pReader == NULL || pRes == NULL) {
return TSDB_CODE_INVALID_PARA;
}
tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
int32_t code = 0;
int32_t line = 0;
STSchema* pTSchema = NULL;
@ -846,7 +853,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
END:
if (code != 0) {
tqError("tqRetrieveDataBlock failed, line:%d, code:%d", line, code);
tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code));
}
taosMemoryFreeClear(pTSchema);
return code;
@ -876,10 +883,6 @@ END:
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow,
int32_t* lastRow) {
if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL || pSchemaWrapper == NULL ||
assigned == NULL || lastRow == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
SSchemaWrapper* pSW = NULL;
SSDataBlock* block = NULL;
@ -909,15 +912,15 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData,
taosMemoryFreeClear(block);
END:
if (code != 0) {
tqError("processBuildNew failed, code:%d", code);
}
tDeleteSchemaWrapper(pSW);
blockDataFreeRes(block);
taosMemoryFree(block);
return code;
}
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
int32_t curRow = 0;
int32_t lastRow = 0;
@ -931,6 +934,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData
TQ_NULL_GO_TO_END(pCol);
int32_t numOfRows = pCol->nVal;
int32_t numOfCols = taosArrayGetSize(pCols);
tqDebug("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows);
for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false;
@ -970,16 +974,16 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData
}
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
tqDebug("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
}
taosMemoryFree(assigned);
return code;
}
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
STSchema* pTSchema = NULL;
@ -992,6 +996,8 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra
SArray* pRows = pSubmitTbData->aRowP;
int32_t numOfRows = taosArrayGetSize(pRows);
pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
TQ_NULL_GO_TO_END(pTSchema);
tqDebug("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false;
@ -1030,17 +1036,18 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
tqDebug("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows, (int)taosArrayGetSize(blocks));
END:
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
}
taosMemoryFreeClear(pTSchema);
taosMemoryFree(assigned);
return code;
}
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) {
if (pReader == NULL || blocks == NULL || schemas == NULL) {
return TSDB_CODE_INVALID_PARA;
}
tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
tqDebug("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pSubmitTbData == NULL) {
return terrno;

View File

@ -16,15 +16,16 @@
#include "tq.h"
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
if (pBlock == NULL || pRsp == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TDB_CODE_SUCCESS;
int32_t lino = 0;
void* buf = NULL;
TSDB_CHECK_NULL(pBlock, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) {
return terrno;
}
buf = taosMemoryCalloc(1, dataStrLen);
TSDB_CHECK_NULL(buf, code, lino, END, terrno);
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
pRetrieve->version = 1;
@ -33,49 +34,49 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
if(actualLen < 0){
taosMemoryFree(buf);
return terrno;
}
TSDB_CHECK_CONDITION(actualLen >= 0, code, lino, END, terrno);
actualLen += sizeof(SRetrieveTableRspForTmq);
if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL){
taosMemoryFree(buf);
return terrno;
}
if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
taosMemoryFree(buf);
return terrno;
}
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
return TSDB_CODE_SUCCESS;
tqDebug("add block data to block array, blockDataLen:%d, blockData:%p", actualLen, buf);
END:
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf);
tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code));
}
return code;
}
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
if (pRsp == NULL || pTq == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TDB_CODE_SUCCESS;
int32_t lino = 0;
SMetaReader mr = {0};
TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);
int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
return code;
}
code = metaReaderGetTableEntryByUidCache(&mr, uid);
TSDB_CHECK_CODE(code, lino, END);
for (int32_t i = 0; i < n; i++) {
char* tbName = taosStrdup(mr.me.name);
if (tbName == NULL) {
metaReaderClear(&mr);
return terrno;
}
TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
if(taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
tqError("failed to push tbName to blockTbName:%s", tbName);
tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid);
continue;
}
tqDebug("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
}
END:
if (code != TSDB_CODE_SUCCESS) {
tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
}
metaReaderClear(&mr);
return 0;
return code;
}
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
@ -96,109 +97,130 @@ int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, S
return 0;
}
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
if (pTq == NULL || pHandle == NULL || pRsp == NULL || pOffset == NULL || pRequest == NULL){
return TSDB_CODE_INVALID_PARA;
}
int32_t vgId = TD_VID(pTq->pVnode);
static int32_t tqProcessReplayRsp(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, const SMqPollReq* pRequest, SSDataBlock* pDataBlock, qTaskInfo_t task){
int32_t code = 0;
int32_t line = 0;
int32_t lino = 0;
if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
blockDataDestroy(pHandle->block);
pHandle->block = NULL;
}
if (pHandle->block == NULL) {
if (pDataBlock == NULL) {
goto END;
}
STqOffsetVal offset = {0};
code = qStreamExtractOffset(task, &offset);
TSDB_CHECK_CODE(code, lino, END);
pHandle->block = NULL;
code = createOneDataBlock(pDataBlock, true, &pHandle->block);
TSDB_CHECK_CODE(code, lino, END);
pHandle->blockTime = offset.ts;
tOffsetDestroy(&offset);
int32_t vgId = TD_VID(pTq->pVnode);
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
TSDB_CHECK_CODE(code, lino, END);
}
const STqExecHandle* pExec = &pHandle->execHandle;
code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
TSDB_CHECK_CODE(code, lino, END);
pRsp->blockNum++;
if (pDataBlock == NULL) {
blockDataDestroy(pHandle->block);
pHandle->block = NULL;
} else {
code = copyDataBlock(pHandle->block, pDataBlock);
TSDB_CHECK_CODE(code, lino, END);
STqOffsetVal offset = {0};
code = qStreamExtractOffset(task, &offset);
TSDB_CHECK_CODE(code, lino, END);
pRsp->sleepTime = offset.ts - pHandle->blockTime;
pHandle->blockTime = offset.ts;
tOffsetDestroy(&offset);
}
END:
if (code != TSDB_CODE_SUCCESS) {
tqError("%s failed at %d, failed to process replay response:%s", __FUNCTION__, lino, tstrerror(code));
}
return code;
}
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
int32_t code = 0;
int32_t lino = 0;
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pRequest, code, lino, END, TSDB_CODE_INVALID_PARA);
int32_t vgId = TD_VID(pTq->pVnode);
int32_t totalRows = 0;
const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;
code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
TSDB_CHECK_CODE(code, line, END);
TSDB_CHECK_CODE(code, lino, END);
qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
uint64_t st = taosGetTimestampMs();
while (1) {
SSDataBlock* pDataBlock = NULL;
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
TSDB_CHECK_CODE(code, line, END);
TSDB_CHECK_CODE(code, lino, END);
if (pRequest->enableReplay) {
if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
blockDataDestroy(pHandle->block);
pHandle->block = NULL;
}
if (pHandle->block == NULL) {
if (pDataBlock == NULL) {
break;
}
STqOffsetVal offset = {0};
code = qStreamExtractOffset(task, &offset);
TSDB_CHECK_CODE(code, line, END);
pHandle->block = NULL;
code = createOneDataBlock(pDataBlock, true, &pHandle->block);
TSDB_CHECK_CODE(code, line, END);
pHandle->blockTime = offset.ts;
tOffsetDestroy(&offset);
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
TSDB_CHECK_CODE(code, line, END);
}
code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
TSDB_CHECK_CODE(code, line, END);
pRsp->blockNum++;
if (pDataBlock == NULL) {
blockDataDestroy(pHandle->block);
pHandle->block = NULL;
} else {
code = copyDataBlock(pHandle->block, pDataBlock);
TSDB_CHECK_CODE(code, line, END);
STqOffsetVal offset = {0};
code = qStreamExtractOffset(task, &offset);
TSDB_CHECK_CODE(code, line, END);
pRsp->sleepTime = offset.ts - pHandle->blockTime;
pHandle->blockTime = offset.ts;
tOffsetDestroy(&offset);
}
code = tqProcessReplayRsp(pTq, pHandle, pRsp, pRequest, pDataBlock, task);
TSDB_CHECK_CODE(code, lino, END);
break;
} else {
if (pDataBlock == NULL) {
break;
}
code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
TSDB_CHECK_CODE(code, line, END);
}
if (pDataBlock == NULL) {
break;
}
code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
TSDB_CHECK_CODE(code, lino, END);
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
break;
}
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
break;
}
}
tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
code = qStreamExtractOffset(task, &pRsp->rspOffset);
END:
if (code != 0) {
tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line,
code);
tqError("%s failed at %d, tmq task executed error msg:%s", __FUNCTION__, lino, tstrerror(code));
}
return code;
}
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
if (pTq == NULL || pHandle == NULL || pRsp == NULL || pBatchMetaRsp == NULL || pOffset == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
int32_t lino = 0;
char* tbName = NULL;
SSchemaWrapper* pSW = NULL;
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pBatchMetaRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;
int code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
if (code != 0) {
return code;
}
code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
TSDB_CHECK_CODE(code, lino, END);
int32_t rowCnt = 0;
while (1) {
@ -206,52 +228,37 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
uint64_t ts = 0;
tqDebug("tmqsnap task start to execute");
code = qExecTask(task, &pDataBlock, &ts);
if (code != 0) {
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code));
return code;
}
TSDB_CHECK_CODE(code, lino, END);
tqDebug("tmqsnap task execute end, get %p", pDataBlock);
if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
if (pRsp->withTbName) {
char* tbName = taosStrdup(qExtractTbnameFromTask(task));
if (tbName == NULL) {
tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
return terrno;
}
if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
continue;
}
tbName = taosStrdup(qExtractTbnameFromTask(task));
TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, END, terrno);
tbName = NULL;
}
if (pRsp->withSchema) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
continue;
}
pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
TSDB_CHECK_NULL(pSW, code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno);
pSW = NULL;
}
if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision) != 0) {
tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
continue;
}
code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision);
TSDB_CHECK_CODE(code, lino, END);
pRsp->blockNum++;
rowCnt += pDataBlock->info.rows;
if (rowCnt <= tmqRowSize) continue;
if (rowCnt <= tmqRowSize) {
continue;
}
}
// get meta
SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
code = qStreamExtractOffset(task, &tmp->rspOffset);
if (code) {
return code;
}
TSDB_CHECK_CODE(code, lino, END);
*pBatchMetaRsp = *tmp;
tqDebug("tmqsnap task get meta");
break;
@ -259,16 +266,13 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
if (pDataBlock == NULL) {
code = qStreamExtractOffset(task, pOffset);
if (code) {
break;
}
TSDB_CHECK_CODE(code, lino, END);
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
continue;
}
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1);
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
code = qStreamExtractOffset(task, &pRsp->rspOffset);
break;
}
@ -280,119 +284,93 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
}
}
tqDebug("%s:%d success", __FUNCTION__, lino);
END:
if (code != 0){
tqDebug("%s:%d failed, code:%s", __FUNCTION__, lino, tstrerror(code) );
}
taosMemoryFree(pSW);
taosMemoryFree(tbName);
return code;
}
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
if (pRsp == NULL || pCreateTbReq == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
int32_t lino = 0;
void* createReq = NULL;
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
if (pRsp->createTableLen == NULL) {
code = terrno;
goto END;
}
TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
if (pRsp->createTableReq == NULL) {
code = terrno;
goto END;
}
TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
}
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
goto END;
}
TSDB_CHECK_CODE(code, lino, END);
createReq = taosMemoryCalloc(1, len);
if (createReq == NULL){
code = terrno;
goto END;
}
TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
tEncoderClear(&encoder);
if (code < 0) {
goto END;
}
if (taosArrayPush(pRsp->createTableLen, &len) == NULL){
code = terrno;
goto END;
}
if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){
code = terrno;
goto END;
}
TSDB_CHECK_CODE(code, lino, END);
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
pRsp->createTableNum++;
tqDebug("build create table info msg success");
return 0;
END:
taosMemoryFree(createReq);
if (code != 0){
tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
taosMemoryFree(createReq);
}
return code;
}
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
if (pTq == NULL || pHandle == NULL || pRsp == NULL || totalRows == NULL) {
return;
}
int32_t code = 0;
STqExecHandle* pExec = &pHandle->execHandle;
STqReader* pReader = pExec->pTqReader;
int32_t lino = 0;
SArray* pBlocks = NULL;
SArray* pSchemas = NULL;
STqExecHandle* pExec = &pHandle->execHandle;
STqReader* pReader = pExec->pTqReader;
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
if (pBlocks == NULL) {
code = terrno;
goto END;
}
TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno);
pSchemas = taosArrayInit(0, sizeof(void*));
if(pSchemas == NULL){
code = terrno;
goto END;
}
TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);
SSubmitTbData* pSubmitTbDataRet = NULL;
int64_t createTime = INT64_MAX;
code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
if (code != 0) {
tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId);
goto END;
}
if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
goto END;
}
TSDB_CHECK_CODE(code, lino, END);
bool tmp = (pSubmitTbDataRet->flags & sourceExcluded) != 0;
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks));
if (code != 0) {
tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
goto END;
}
TSDB_CHECK_CODE(code, lino, END);
}
if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) { // judge if table is already created to avoid sending crateTbReq
code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
if (code != 0){
tqError("vgId:%d, failed to build create table info", pTq->pVnode->config.vgId);
goto END;
}
TSDB_CHECK_CODE(code, lino, END);
}
}
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) {
goto END;
}
tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL);
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
if (pBlock == NULL) {
continue;
}
if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision) != 0){
if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision) != 0){
tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
continue;
}
@ -405,28 +383,29 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
}
pRsp->blockNum++;
}
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
return;
tqDebug("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
END:
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
if (code != 0){
tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
} else {
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
}
}
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows,
int8_t sourceExcluded) {
if (pTq == NULL || pHandle == NULL || pRsp == NULL || totalRows == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) {
int32_t code = 0;
int32_t lino = 0;
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(totalRows, code, lino, END, TSDB_CODE_INVALID_PARA);
STqExecHandle* pExec = &pHandle->execHandle;
int32_t code = 0;
STqReader* pReader = pExec->pTqReader;
code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
if (code != 0) {
return code;
}
TSDB_CHECK_CODE(code, lino, END);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
while (tqNextBlockImpl(pReader, NULL)) {
@ -438,5 +417,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData
}
}
END:
if (code != 0){
tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code));
}
return code;
}

View File

@ -21,21 +21,27 @@ static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, c
const SMqBatchMetaRsp* pRsp, int32_t vgId);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
if (pRsp == NULL) {
return TSDB_CODE_INVALID_PARA;
}
pRsp->blockData = taosArrayInit(0, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
int32_t code = TDB_CODE_SUCCESS;
int32_t lino = 0;
tqDebug("%s called", __FUNCTION__ );
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL) {
return terrno;
}
pRsp->blockData = taosArrayInit(0, sizeof(void*));
TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);
tOffsetCopy(&pRsp->reqOffset, &pOffset);
tOffsetCopy(&pRsp->rspOffset, &pOffset);
pRsp->withTbName = 0;
pRsp->withSchema = false;
return 0;
END:
if (code != 0){
tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
}
return code;
}
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
@ -44,45 +50,37 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
}
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
if (pRsp == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TDB_CODE_SUCCESS;
int32_t lino = 0;
tqDebug("%s called", __FUNCTION__ );
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
tOffsetCopy(&pRsp->reqOffset, &pOffset);
tOffsetCopy(&pRsp->rspOffset, &pOffset);
pRsp->withTbName = 1;
pRsp->withSchema = 1;
pRsp->blockData = taosArrayInit(0, sizeof(void*));
TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);\
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);
pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL ||
pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
if (pRsp->blockData != NULL) {
taosArrayDestroy(pRsp->blockData);
pRsp->blockData = NULL;
}
if (pRsp->blockDataLen != NULL) {
taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
}
if (pRsp->blockTbName != NULL) {
taosArrayDestroy(pRsp->blockTbName);
pRsp->blockTbName = NULL;
}
if (pRsp->blockSchema != NULL) {
taosArrayDestroy(pRsp->blockSchema);
pRsp->blockSchema = NULL;
}
return terrno;
END:
if (code != 0){
tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
taosArrayDestroy(pRsp->blockData);
taosArrayDestroy(pRsp->blockDataLen);
taosArrayDestroy(pRsp->blockTbName);
taosArrayDestroy(pRsp->blockSchema);
}
return 0;
return code;
}
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
@ -151,23 +149,19 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pOffset == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TDB_CODE_SUCCESS;
int32_t lino = 0;
tqDebug("%s called", __FUNCTION__ );
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
terrno = 0;
SMqDataRsp dataRsp = {0};
int code = tqInitDataRsp(&dataRsp, *pOffset);
if (code != 0) {
goto end;
}
code = tqInitDataRsp(&dataRsp, *pOffset);
TSDB_CHECK_CODE(code, lino, end);
code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
if (code != 0) {
goto end;
}
TSDB_CHECK_CODE(code, lino, end);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
@ -191,15 +185,21 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
tOffsetCopy(&dataRsp.reqOffset, pOffset);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : {
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64
" code:%d",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
tDeleteMqDataRsp(&dataRsp);
return code;
}
end:
{
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
if (code != 0){
tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64 " error msg:%s, line:%d",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
} else {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64 " success",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
}
tDeleteMqDataRsp(&dataRsp);
return code;
}
}
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC) \
@ -224,9 +224,6 @@ static void tDeleteCommon(void* parm) {}
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) {
if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || offset == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t vgId = TD_VID(pTq->pVnode);
SMqDataRsp taosxRsp = {0};
SMqBatchMetaRsp btMetaRsp = {0};
@ -238,17 +235,13 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,
btMetaRsp.rspOffset.ts);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
goto END;
}
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type,
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto END;
@ -277,30 +270,23 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalMetaRows > 0) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
if (totalRows != 0) {
tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, totalRows error, vgId:%d offset %" PRId64,
pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
code = code == 0 ? TSDB_CODE_TQ_INTERNAL_ERROR : code;
}
goto END;
}
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, &taosxRsp,
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto END;
}
SWalCont* pHead = &pHandle->pWalReader->pHead->head;
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
// process meta
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, &taosxRsp,
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto END;
}
@ -331,15 +317,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (!btMetaRsp.batchMetaReq) {
btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
if (btMetaRsp.batchMetaReq == NULL) {
code = TAOS_GET_TERRNO(terrno);
goto END;
}
TQ_NULL_GO_TO_END(btMetaRsp.batchMetaReq);
btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
if (btMetaRsp.batchMetaLen == NULL) {
code = TAOS_GET_TERRNO(terrno);
goto END;
}
TQ_NULL_GO_TO_END(btMetaRsp.batchMetaLen);
}
fetchVer++;
@ -355,10 +335,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
}
int32_t tLen = sizeof(SMqRspHead) + len;
void* tBuf = taosMemoryCalloc(1, tLen);
if (tBuf == NULL) {
code = TAOS_GET_TERRNO(terrno);
goto END;
}
TQ_NULL_GO_TO_END(tBuf);
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, metaBuff, len);
@ -369,14 +346,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
continue;
}
if (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf) == NULL) {
code = TAOS_GET_TERRNO(terrno);
goto END;
}
if (taosArrayPush(btMetaRsp.batchMetaLen, &tLen) == NULL) {
code = TAOS_GET_TERRNO(terrno);
goto END;
}
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf));
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaLen, &tLen));
totalMetaRows++;
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
@ -399,17 +370,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
.ver = pHead->version,
};
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded);
if (code < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
pRequest->subKey);
goto END;
}
TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded));
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, &taosxRsp,
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto END;
} else {
@ -419,6 +384,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
}
END:
if (code != 0){
tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
pRequest->subKey);
}
tDeleteMqBatchMetaRsp(&btMetaRsp);
tDeleteSTaosxRsp(&taosxRsp);
return code;
@ -457,6 +426,9 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
}
END:
if (code != 0){
uError("failed to extract data for mq, msg:%s", tstrerror(code));
}
tOffsetDestroy(&reqOffset);
return code;
}