feat: get snapshot data for taosX
This commit is contained in:
parent
2afc23a120
commit
c7bf08d5af
|
@ -43,7 +43,6 @@ typedef struct {
|
||||||
int32_t numOfVgroups;
|
int32_t numOfVgroups;
|
||||||
|
|
||||||
void* sContext; // SSnapContext*
|
void* sContext; // SSnapContext*
|
||||||
SHashObj *pFilterOutTbUid;
|
|
||||||
|
|
||||||
void* pStateBackend;
|
void* pStateBackend;
|
||||||
} SReadHandle;
|
} SReadHandle;
|
||||||
|
|
|
@ -215,8 +215,6 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver);
|
||||||
bool tqNextDataBlock(STqReader *pReader);
|
bool tqNextDataBlock(STqReader *pReader);
|
||||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||||
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
|
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
|
||||||
int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset, SWalCkHead** ppCkHead);
|
|
||||||
SSDataBlock* tqLogScanExec(int8_t subType, STqReader* pReader, SHashObj* pFilterOutTbUid, SSDataBlock* block);
|
|
||||||
|
|
||||||
void vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
void vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
|
|
|
@ -101,6 +101,7 @@ typedef struct {
|
||||||
|
|
||||||
int64_t snapshotVer;
|
int64_t snapshotVer;
|
||||||
|
|
||||||
|
SWalReader* pWalReader;
|
||||||
|
|
||||||
SWalRef* pRef;
|
SWalRef* pRef;
|
||||||
|
|
||||||
|
@ -140,7 +141,10 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||||
|
|
||||||
// tqRead
|
// tqRead
|
||||||
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||||
|
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
||||||
|
|
||||||
|
// tqExec
|
||||||
|
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp);
|
||||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
|
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
|
||||||
|
|
||||||
// tqMeta
|
// tqMeta
|
||||||
|
|
|
@ -310,6 +310,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STqOffsetVal reqOffset = pReq->reqOffset;
|
STqOffsetVal reqOffset = pReq->reqOffset;
|
||||||
STqOffsetVal fetchOffsetNew;
|
STqOffsetVal fetchOffsetNew;
|
||||||
|
SWalCkHead* pCkHead = NULL;
|
||||||
|
|
||||||
// 1.find handle
|
// 1.find handle
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
|
@ -340,7 +341,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
||||||
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
||||||
|
|
||||||
SMqMetaRsp metaRsp = {0};
|
|
||||||
SMqDataRsp dataRsp = {0};
|
SMqDataRsp dataRsp = {0};
|
||||||
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
||||||
|
|
||||||
|
@ -385,32 +385,113 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG){
|
||||||
|
SMqMetaRsp metaRsp = {0};
|
||||||
tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew);
|
tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew);
|
||||||
|
|
||||||
if(dataRsp.blockNum != 0){
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
goto OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(metaRsp.metaRspLen > 0){
|
if(metaRsp.metaRspLen > 0){
|
||||||
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey,
|
||||||
|
TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version);
|
||||||
taosMemoryFree(metaRsp.metaRsp);
|
taosMemoryFree(metaRsp.metaRsp);
|
||||||
goto OVER;
|
goto OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, no data", consumerId, pHandle->subKey,
|
if (dataRsp.blockNum > 0){
|
||||||
TD_VID(pTq->pVnode));
|
|
||||||
|
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, fetchOffsetNew.version);
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
OVER:
|
}else{
|
||||||
|
fetchOffsetNew = dataRsp.rspOffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey,
|
||||||
|
TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.version);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
|
||||||
|
int64_t fetchVer = fetchOffsetNew.version + 1;
|
||||||
|
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||||
|
if (pCkHead == NULL) {
|
||||||
|
code = -1;
|
||||||
|
goto OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
consumerEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
|
if (consumerEpoch > reqEpoch) {
|
||||||
|
tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
|
||||||
|
", found new consumer epoch %d, discard req epoch %d",
|
||||||
|
consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
||||||
|
// TODO add push mgr
|
||||||
|
|
||||||
|
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
|
||||||
|
ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version);
|
||||||
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
goto OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
SWalCont* pHead = &pCkHead->head;
|
||||||
|
|
||||||
|
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
|
||||||
|
pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
|
||||||
|
|
||||||
|
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||||
|
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
||||||
|
|
||||||
|
if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
|
}
|
||||||
|
// TODO batch optimization:
|
||||||
|
// TODO continue scan until meeting batch requirement
|
||||||
|
if (dataRsp.blockNum > 0 /* threshold */) {
|
||||||
|
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
|
||||||
|
ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version);
|
||||||
|
|
||||||
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
goto OVER;
|
||||||
|
} else {
|
||||||
|
fetchVer++;
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
ASSERT(pHandle->fetchMeta);
|
||||||
|
ASSERT(IS_META_MSG(pHead->msgType));
|
||||||
|
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
||||||
|
SMqMetaRsp metaRsp = {0};
|
||||||
|
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
|
||||||
|
metaRsp.resMsgType = pHead->msgType;
|
||||||
|
metaRsp.metaRspLen = pHead->bodyLen;
|
||||||
|
metaRsp.metaRsp = pHead->body;
|
||||||
|
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
||||||
|
code = -1;
|
||||||
|
goto OVER;
|
||||||
|
}
|
||||||
|
code = 0;
|
||||||
|
goto OVER;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// send empty to client
|
||||||
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
OVER:
|
||||||
|
if (pCkHead) taosMemoryFree(pCkHead);
|
||||||
// TODO wrap in destroy func
|
// TODO wrap in destroy func
|
||||||
taosArrayDestroy(dataRsp.blockDataLen);
|
taosArrayDestroy(dataRsp.blockDataLen);
|
||||||
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
|
||||||
|
@ -526,16 +607,19 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
|
||||||
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
||||||
ASSERT(pHandle->execHandle.pExecReader);
|
ASSERT(pHandle->execHandle.pExecReader);
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
|
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext));
|
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext));
|
||||||
handle.tqReader = pHandle->execHandle.pExecReader;
|
|
||||||
handle.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid;
|
|
||||||
|
|
||||||
pHandle->execHandle.task =
|
pHandle->execHandle.task =
|
||||||
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
|
|
||||||
|
pHandle->execHandle.execTb.suid = req.suid;
|
||||||
|
|
||||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||||
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
|
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
|
||||||
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
|
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
|
||||||
|
@ -548,7 +632,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
|
|
||||||
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext));
|
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext));
|
||||||
handle.tqReader = pHandle->execHandle.pExecReader;
|
|
||||||
pHandle->execHandle.task =
|
pHandle->execHandle.task =
|
||||||
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,18 +120,18 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qStreamExtractOffset(task, &pRsp->rspOffset);
|
||||||
|
|
||||||
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
|
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
|
||||||
if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && qStreamExtractPrepareUid(task) != 0){
|
if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && qStreamExtractPrepareUid(task) != 0){
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
|
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
|
||||||
pHandle->snapshotVer + 1);
|
pHandle->snapshotVer + 1);
|
||||||
// tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
break;
|
||||||
// qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->blockNum > 0){
|
if (pRsp->blockNum > 0){
|
||||||
qStreamExtractOffset(task, &pRsp->rspOffset);
|
|
||||||
tqDebug("tmqsnap task exec exited, get data");
|
tqDebug("tmqsnap task exec exited, get data");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -203,22 +203,56 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
SSDataBlock* tqLogScanExec(int8_t subType, STqReader* pReader, SHashObj* pFilterOutTbUid, SSDataBlock* block) {
|
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) {
|
||||||
if (subType == TOPIC_SUB_TYPE__TABLE) {
|
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
|
||||||
|
|
||||||
|
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
pRsp->withSchema = 1;
|
||||||
|
STqReader* pReader = pExec->pExecReader;
|
||||||
|
tqReaderSetDataMsg(pReader, pReq, 0);
|
||||||
while (tqNextDataBlock(pReader)) {
|
while (tqNextDataBlock(pReader)) {
|
||||||
if (tqRetrieveDataBlock(block, pReader) < 0) {
|
SSDataBlock block = {0};
|
||||||
|
if (tqRetrieveDataBlock(&block, pReader) < 0) {
|
||||||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
||||||
}
|
}
|
||||||
return block;
|
if (pRsp->withTbName) {
|
||||||
|
int64_t uid = pExec->pExecReader->msgIter.uid;
|
||||||
|
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
||||||
|
blockDataFreeRes(&block);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
} else if (subType == TOPIC_SUB_TYPE__DB) {
|
}
|
||||||
while (tqNextDataBlockFilterOut(pReader, pFilterOutTbUid)) {
|
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
|
||||||
if (tqRetrieveDataBlock(block, pReader) < 0) {
|
blockDataFreeRes(&block);
|
||||||
|
tqAddBlockSchemaToRsp(pExec, pRsp);
|
||||||
|
pRsp->blockNum++;
|
||||||
|
}
|
||||||
|
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
||||||
|
pRsp->withSchema = 1;
|
||||||
|
STqReader* pReader = pExec->pExecReader;
|
||||||
|
tqReaderSetDataMsg(pReader, pReq, 0);
|
||||||
|
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
|
||||||
|
SSDataBlock block = {0};
|
||||||
|
if (tqRetrieveDataBlock(&block, pReader) < 0) {
|
||||||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
||||||
}
|
}
|
||||||
return block;
|
if (pRsp->withTbName) {
|
||||||
|
int64_t uid = pExec->pExecReader->msgIter.uid;
|
||||||
|
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
||||||
|
blockDataFreeRes(&block);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
|
||||||
|
blockDataFreeRes(&block);
|
||||||
|
tqAddBlockSchemaToRsp(pExec, pRsp);
|
||||||
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
if (pRsp->blockNum == 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -269,12 +269,11 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
ASSERT(handle.execHandle.pExecReader);
|
ASSERT(handle.execHandle.pExecReader);
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
|
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
handle.execHandle.execDb.pFilterOutTbUid =
|
handle.execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
// handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||||
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
|
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
|
||||||
reader.tqReader = handle.execHandle.pExecReader;
|
|
||||||
reader.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid;
|
|
||||||
|
|
||||||
handle.execHandle.task =
|
handle.execHandle.task =
|
||||||
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
||||||
|
|
|
@ -15,21 +15,22 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
taosThreadMutexLock(&pWalReader->mutex);
|
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
||||||
int64_t offset = *fetchOffset;
|
int64_t offset = *fetchOffset;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (walFetchHead(pWalReader, offset, *ppCkHead) < 0) {
|
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
||||||
tqDebug("tmq poll no more log to return");
|
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
||||||
|
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
||||||
*fetchOffset = offset - 1;
|
*fetchOffset = offset - 1;
|
||||||
code = -1;
|
code = -1;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
|
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
|
||||||
code = walFetchBody(pWalReader, ppCkHead);
|
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -41,10 +42,10 @@ int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset,
|
||||||
code = 0;
|
code = 0;
|
||||||
goto END;
|
goto END;
|
||||||
} else {
|
} else {
|
||||||
if (fetchMeta) {
|
if (pHandle->fetchMeta) {
|
||||||
SWalCont* pHead = &((*ppCkHead)->head);
|
SWalCont* pHead = &((*ppCkHead)->head);
|
||||||
if (IS_META_MSG(pHead->msgType)) {
|
if (IS_META_MSG(pHead->msgType)) {
|
||||||
code = walFetchBody(pWalReader, ppCkHead);
|
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -57,7 +58,7 @@ int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset,
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
code = walSkipFetchBody(pWalReader, *ppCkHead);
|
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
*fetchOffset = offset;
|
*fetchOffset = offset;
|
||||||
|
@ -68,7 +69,7 @@ int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
END:
|
END:
|
||||||
taosThreadMutexUnlock(&pWalReader->mutex);
|
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -499,14 +499,9 @@ typedef struct SStreamRawScanInfo{
|
||||||
// void *metaInfo;
|
// void *metaInfo;
|
||||||
// void *dataInfo;
|
// void *dataInfo;
|
||||||
SVnode* vnode;
|
SVnode* vnode;
|
||||||
SWalCkHead* pCkHead;
|
|
||||||
bool needFetchLog;
|
|
||||||
bool hasDataInOneFetchVer;
|
|
||||||
SSDataBlock pRes; // result SSDataBlock
|
SSDataBlock pRes; // result SSDataBlock
|
||||||
STsdbReader* dataReader;
|
STsdbReader* dataReader;
|
||||||
SSnapContext* sContext;
|
SSnapContext* sContext;
|
||||||
STqReader* tqReader;
|
|
||||||
SHashObj* pFilterOutTbUid;
|
|
||||||
}SStreamRawScanInfo;
|
}SStreamRawScanInfo;
|
||||||
|
|
||||||
typedef struct SSysTableScanInfo {
|
typedef struct SSysTableScanInfo {
|
||||||
|
|
|
@ -1533,10 +1533,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataBlockInfo binfo = pBlock->info;
|
tsdbRetrieveDataBlockInfo(pInfo->dataReader, &pBlock->info);
|
||||||
tsdbRetrieveDataBlockInfo(pInfo->dataReader, &binfo);
|
|
||||||
|
|
||||||
pBlock->info = binfo;
|
|
||||||
|
|
||||||
SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
|
SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
|
||||||
pBlock->pDataBlock = pCols;
|
pBlock->pDataBlock = pCols;
|
||||||
|
@ -1596,79 +1593,53 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
||||||
pTaskInfo->streamInfo.metaRsp.metaRsp = data;
|
pTaskInfo->streamInfo.metaRsp.metaRsp = data;
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
|
|
||||||
int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version;
|
|
||||||
|
|
||||||
while(1){
|
|
||||||
if(pInfo->needFetchLog){
|
|
||||||
fetchVer++;
|
|
||||||
if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
|
|
||||||
qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
|
|
||||||
pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
|
|
||||||
pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
SWalCont* pHead = &pInfo->pCkHead->head;
|
// else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
|
||||||
qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
|
// int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
|
||||||
|
//
|
||||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
// while(1){
|
||||||
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
// if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
|
||||||
tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
|
// qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
|
||||||
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
|
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
|
||||||
pTaskInfo->streamInfo.lastStatus.version = fetchVer;
|
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
|
||||||
pInfo->hasDataInOneFetchVer = false;
|
// return NULL;
|
||||||
pInfo->pRes.pDataBlock = NULL;
|
// }
|
||||||
}
|
// SWalCont* pHead = &pInfo->pCkHead->head;
|
||||||
}
|
// qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
|
||||||
|
//
|
||||||
SWalCont* pHead = &pInfo->pCkHead->head;
|
// if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||||
|
// SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
||||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
// tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
|
||||||
blockDataFreeRes(&pInfo->pRes);
|
// SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes);
|
||||||
SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes);
|
// if(block){
|
||||||
if(block){
|
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
|
||||||
qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
|
||||||
pInfo->needFetchLog = false;
|
// qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
||||||
pInfo->hasDataInOneFetchVer = true;
|
// return block;
|
||||||
return block;
|
// }else{
|
||||||
}else{
|
// fetchVer++;
|
||||||
pInfo->needFetchLog = true;
|
// }
|
||||||
|
// } else{
|
||||||
if(pInfo->hasDataInOneFetchVer){
|
// ASSERT(pInfo->sContext->withMeta);
|
||||||
return block;
|
// ASSERT(IS_META_MSG(pHead->msgType));
|
||||||
}else{
|
// qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
||||||
continue;
|
// pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
|
||||||
}
|
// pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
||||||
}
|
// pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
|
||||||
} else if(pInfo->sContext->withMeta){
|
// pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
|
||||||
ASSERT(IS_META_MSG(pHead->msgType));
|
// pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
|
||||||
qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
// memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
|
||||||
pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
|
// return NULL;
|
||||||
pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
// }
|
||||||
pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
|
// }
|
||||||
pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
|
|
||||||
pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
|
|
||||||
memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->needFetchLog = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyRawScanOperatorInfo(void* param, int32_t numOfOutput) {
|
static void destroyRawScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
|
SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
|
||||||
taosMemoryFreeClear(pRawScan->pCkHead);
|
|
||||||
if (pRawScan->tqReader) {
|
|
||||||
tqCloseReader(pRawScan->tqReader);
|
|
||||||
}
|
|
||||||
blockDataFreeRes(&pRawScan->pRes);
|
|
||||||
tsdbReaderClose(pRawScan->dataReader);
|
tsdbReaderClose(pRawScan->dataReader);
|
||||||
destroySnapContext(pRawScan->sContext);
|
destroySnapContext(pRawScan->sContext);
|
||||||
taosHashCleanup(pRawScan->pFilterOutTbUid);
|
|
||||||
taosMemoryFree(pRawScan);
|
taosMemoryFree(pRawScan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1688,18 +1659,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pCkHead = taosMemoryCalloc(1, sizeof(SWalCkHead) + 2048);
|
|
||||||
if (pInfo->pCkHead == NULL) {
|
|
||||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pInfo->needFetchLog = true;
|
|
||||||
pInfo->hasDataInOneFetchVer = false;
|
|
||||||
|
|
||||||
pInfo->vnode = pHandle->vnode;
|
pInfo->vnode = pHandle->vnode;
|
||||||
pInfo->pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
|
||||||
pInfo->tqReader = pHandle->tqReader;
|
|
||||||
walSetReaderCapacity(pInfo->tqReader->pWalReader, 2048);
|
|
||||||
|
|
||||||
pInfo->sContext = pHandle->sContext;
|
pInfo->sContext = pHandle->sContext;
|
||||||
pOperator->name = "RawStreamScanOperator";
|
pOperator->name = "RawStreamScanOperator";
|
||||||
|
|
Loading…
Reference in New Issue