remove queue scan
This commit is contained in:
parent
5a244d0eb5
commit
c4dcc994fb
|
@ -2957,6 +2957,7 @@ typedef struct {
|
|||
|
||||
int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
|
||||
int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
|
||||
void tDeleteSMqDataRsp(SMqDataRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
SMqRspHead head;
|
||||
|
|
|
@ -5889,6 +5889,13 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
|
||||
taosArrayDestroy(pRsp->blockDataLen);
|
||||
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
|
||||
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
|
||||
}
|
||||
|
||||
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
|
||||
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
|
||||
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
|
||||
|
|
|
@ -763,8 +763,9 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
int32_t cols = 0;
|
||||
|
||||
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameGetDbName(&n, varDataVal(topicName));
|
||||
strcpy(varDataVal(topicName), mndGetDbStr(pTopic->name));
|
||||
/*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/
|
||||
/*tNameGetDbName(&n, varDataVal(topicName));*/
|
||||
varDataSetLen(topicName, strlen(varDataVal(topicName)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
|
||||
|
|
|
@ -67,21 +67,21 @@ typedef struct {
|
|||
// tqExec
|
||||
|
||||
typedef struct {
|
||||
char* qmsg;
|
||||
char* qmsg;
|
||||
} STqExecCol;
|
||||
|
||||
typedef struct {
|
||||
int64_t suid;
|
||||
int64_t suid;
|
||||
} STqExecTb;
|
||||
|
||||
typedef struct {
|
||||
SHashObj* pFilterOutTbUid;
|
||||
SHashObj* pFilterOutTbUid;
|
||||
} STqExecDb;
|
||||
|
||||
typedef struct {
|
||||
int8_t subType;
|
||||
|
||||
STqReader* pExecReader;
|
||||
STqReader* pExecReader;
|
||||
qTaskInfo_t task;
|
||||
union {
|
||||
STqExecCol execCol;
|
||||
|
@ -144,7 +144,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
|||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
||||
|
||||
// tqExec
|
||||
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp);
|
||||
int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp);
|
||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
|
||||
|
||||
// tqMeta
|
||||
|
|
|
@ -357,8 +357,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
TD_VID(pTq->pVnode), formatBuf);
|
||||
} else {
|
||||
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
||||
if (pReq->useSnapshot){
|
||||
if (pHandle->fetchMeta){
|
||||
if (pReq->useSnapshot) {
|
||||
if (pHandle->fetchMeta) {
|
||||
tqOffsetResetToMeta(&fetchOffsetNew, 0);
|
||||
} else {
|
||||
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
|
||||
|
@ -373,43 +373,47 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
goto OVER;
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
return code;
|
||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||
tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
|
||||
" in vg %d, subkey %s, reset none failed",
|
||||
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
|
||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||
code = -1;
|
||||
goto OVER;
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG){
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG) {
|
||||
SMqMetaRsp metaRsp = {0};
|
||||
tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew);
|
||||
|
||||
if(metaRsp.metaRspLen > 0){
|
||||
if (metaRsp.metaRspLen > 0) {
|
||||
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey,
|
||||
TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version);
|
||||
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId,
|
||||
pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
||||
metaRsp.rspOffset.version);
|
||||
taosMemoryFree(metaRsp.metaRsp);
|
||||
goto OVER;
|
||||
}
|
||||
|
||||
if (dataRsp.blockNum > 0){
|
||||
if (dataRsp.blockNum > 0) {
|
||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
goto OVER;
|
||||
}else{
|
||||
} else {
|
||||
fetchOffsetNew = dataRsp.rspOffset;
|
||||
}
|
||||
|
||||
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey,
|
||||
TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.version);
|
||||
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld",
|
||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.version);
|
||||
}
|
||||
|
||||
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
|
||||
|
@ -426,7 +430,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
consumerEpoch = atomic_load_32(&pHandle->epoch);
|
||||
if (consumerEpoch > reqEpoch) {
|
||||
tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
|
||||
", found new consumer epoch %d, discard req epoch %d",
|
||||
", found new consumer epoch %d, discard req epoch %d",
|
||||
consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
|
||||
break;
|
||||
}
|
||||
|
@ -449,7 +453,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
||||
|
||||
if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) {
|
||||
if (tqLogScanExec(pTq, pHandle, pCont, &dataRsp) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
}
|
||||
// TODO batch optimization:
|
||||
|
@ -490,18 +494,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
OVER:
|
||||
if (pCkHead) taosMemoryFree(pCkHead);
|
||||
// TODO wrap in destroy func
|
||||
taosArrayDestroy(dataRsp.blockDataLen);
|
||||
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
|
||||
|
||||
if (dataRsp.withSchema) {
|
||||
taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||
}
|
||||
|
||||
if (dataRsp.withTbName) {
|
||||
taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree);
|
||||
}
|
||||
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -629,9 +622,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
|
|||
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
|
||||
taosArrayDestroy(tbUidList);
|
||||
|
||||
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext));
|
||||
pHandle->execHandle.task =
|
||||
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
||||
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||
(SSnapContext**)(&handle.sContext));
|
||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
||||
}
|
||||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
|
||||
|
|
|
@ -60,6 +60,46 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
|
||||
|
||||
qTaskInfo_t task = pExec->task;
|
||||
|
||||
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
|
||||
tqDebug("prepare scan failed, return");
|
||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
pRsp->rspOffset = *pOffset;
|
||||
return 0;
|
||||
} else {
|
||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
||||
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
|
||||
tqDebug("prepare scan failed, return");
|
||||
pRsp->rspOffset = *pOffset;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t rowCnt = 0;
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
tqDebug("tmqsnap task start to execute");
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
tqDebug("tmqsnap task execute end, get %p", pDataBlock);
|
||||
|
||||
if (pDataBlock) {
|
||||
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
|
||||
pRsp->blockNum++;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
|
||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||
qTaskInfo_t task = pExec->task;
|
||||
|
@ -97,23 +137,20 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
|||
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
char* tbName = strdup(qExtractTbnameFromTask(task));
|
||||
taosArrayPush(pRsp->blockTbName, &tbName);
|
||||
}
|
||||
}
|
||||
if(pRsp->withSchema){
|
||||
if (pRsp->withSchema) {
|
||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
tqAddBlockSchemaToRsp(pExec, pRsp);
|
||||
}else{
|
||||
} else {
|
||||
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
|
||||
taosArrayPush(pRsp->blockSchema, &pSW);
|
||||
}
|
||||
}
|
||||
|
||||
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
|
||||
}else{
|
||||
} else {
|
||||
tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
|
||||
}
|
||||
pRsp->blockNum++;
|
||||
|
@ -125,17 +162,9 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
|||
}
|
||||
}
|
||||
|
||||
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){
|
||||
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
|
||||
pHandle->snapshotVer + 1);
|
||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
||||
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
|
||||
continue;
|
||||
}
|
||||
}else{
|
||||
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
|
||||
if(qStreamExtractPrepareUid(task) != 0){
|
||||
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
if (qStreamExtractPrepareUid(task) != 0) {
|
||||
continue;
|
||||
}
|
||||
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
|
||||
|
@ -143,13 +172,13 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
|||
break;
|
||||
}
|
||||
|
||||
if (pRsp->blockNum > 0){
|
||||
if (pRsp->blockNum > 0) {
|
||||
tqDebug("tmqsnap task exec exited, get data");
|
||||
break;
|
||||
}
|
||||
|
||||
SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
|
||||
if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){
|
||||
if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts);
|
||||
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
|
||||
tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
|
||||
|
@ -173,57 +202,8 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
|
||||
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
|
||||
qTaskInfo_t task = pExec->execCol.task[workerId];
|
||||
|
||||
if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
int32_t rowCnt = 0;
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
if (pDataBlock == NULL) break;
|
||||
|
||||
ASSERT(pDataBlock->info.rows != 0);
|
||||
ASSERT(taosArrayGetSize(pDataBlock->pDataBlock) != 0);
|
||||
|
||||
tqAddBlockDataToRsp(pDataBlock, pRsp);
|
||||
|
||||
if (pRsp->withTbName) {
|
||||
pRsp->withTbName = 0;
|
||||
#if 0
|
||||
int64_t uid;
|
||||
int64_t ts;
|
||||
if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
tqAddTbNameToRsp(pTq, uid, pRsp);
|
||||
#endif
|
||||
}
|
||||
pRsp->blockNum++;
|
||||
|
||||
rowCnt += pDataBlock->info.rows;
|
||||
if (rowCnt >= 4096) break;
|
||||
}
|
||||
int64_t uid;
|
||||
int64_t ts;
|
||||
if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
tqOffsetResetToData(&pRsp->rspOffset, uid, ts);
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) {
|
||||
int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp) {
|
||||
STqExecHandle* pExec = &pHandle->execHandle;
|
||||
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
|
||||
|
||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
|
@ -268,6 +248,28 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
|
|||
tqAddBlockSchemaToRsp(pExec, pRsp);
|
||||
pRsp->blockNum++;
|
||||
}
|
||||
#if 0
|
||||
if (pHandle->fetchMeta && pRsp->blockNum) {
|
||||
SSubmitMsgIter iter = {0};
|
||||
tInitSubmitMsgIter(pReq, &iter);
|
||||
STaosxRsp* pXrsp = (STaosxRsp*)pRsp;
|
||||
while (1) {
|
||||
SSubmitBlk* pBlk = NULL;
|
||||
if (tGetSubmitMsgNext(&iter, &pBlk) < 0) return -1;
|
||||
if (pBlk->schemaLen > 0) {
|
||||
if (pXrsp->createTableNum == 0) {
|
||||
pXrsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
|
||||
pXrsp->createTableReq = taosArrayInit(0, sizeof(void*));
|
||||
}
|
||||
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen);
|
||||
memcpy(createReq, pBlk->data, pBlk->schemaLen);
|
||||
taosArrayPush(pXrsp->createTableLen, &pBlk->schemaLen);
|
||||
taosArrayPush(pXrsp->createTableReq, &createReq);
|
||||
pXrsp->createTableNum++;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
if (pRsp->blockNum == 0) {
|
||||
|
|
|
@ -143,6 +143,8 @@ typedef struct {
|
|||
STqOffsetVal prepareStatus; // for tmq
|
||||
STqOffsetVal lastStatus; // for tmq
|
||||
SMqMetaRsp metaRsp; // for tmq fetching meta
|
||||
int64_t snapshotVer;
|
||||
|
||||
SSchemaWrapper *schema;
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
SSDataBlock* pullOverBlk; // for streaming
|
||||
|
@ -486,24 +488,23 @@ typedef struct SStreamScanInfo {
|
|||
STimeWindowAggSupp twAggSup;
|
||||
SSDataBlock* pUpdateDataRes;
|
||||
// status for tmq
|
||||
// SSchemaWrapper schema;
|
||||
SNodeList* pGroupTags;
|
||||
SNode* pTagCond;
|
||||
SNode* pTagIndexCond;
|
||||
SNodeList* pGroupTags;
|
||||
SNode* pTagCond;
|
||||
SNode* pTagIndexCond;
|
||||
} SStreamScanInfo;
|
||||
|
||||
typedef struct SStreamRawScanInfo{
|
||||
// int8_t subType;
|
||||
// bool withMeta;
|
||||
// int64_t suid;
|
||||
// int64_t snapVersion;
|
||||
// void *metaInfo;
|
||||
// void *dataInfo;
|
||||
SVnode* vnode;
|
||||
SSDataBlock pRes; // result SSDataBlock
|
||||
STsdbReader* dataReader;
|
||||
SSnapContext* sContext;
|
||||
}SStreamRawScanInfo;
|
||||
typedef struct {
|
||||
// int8_t subType;
|
||||
// bool withMeta;
|
||||
// int64_t suid;
|
||||
// int64_t snapVersion;
|
||||
// void *metaInfo;
|
||||
// void *dataInfo;
|
||||
SVnode* vnode;
|
||||
SSDataBlock pRes; // result SSDataBlock
|
||||
STsdbReader* dataReader;
|
||||
SSnapContext* sContext;
|
||||
} SStreamRawScanInfo;
|
||||
|
||||
typedef struct SSysTableScanInfo {
|
||||
SRetrieveMetaTableRsp* pRsp;
|
||||
|
@ -528,14 +529,14 @@ typedef struct SBlockDistInfo {
|
|||
SSDataBlock* pResBlock;
|
||||
void* pHandle;
|
||||
SReadHandle readHandle;
|
||||
uint64_t uid; // table uid
|
||||
uint64_t uid; // table uid
|
||||
} SBlockDistInfo;
|
||||
|
||||
// todo remove this
|
||||
typedef struct SOptrBasicInfo {
|
||||
SResultRowInfo resultRowInfo;
|
||||
SSDataBlock* pRes;
|
||||
bool mergeResultBlock;
|
||||
SResultRowInfo resultRowInfo;
|
||||
SSDataBlock* pRes;
|
||||
bool mergeResultBlock;
|
||||
} SOptrBasicInfo;
|
||||
|
||||
typedef struct SIntervalAggOperatorInfo {
|
||||
|
|
|
@ -139,7 +139,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
|
|||
|
||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
|
||||
if (msg == NULL) {
|
||||
// TODO create raw scan
|
||||
// create raw scan
|
||||
|
||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||
if (NULL == pTaskInfo) {
|
||||
|
@ -151,7 +151,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
|
|||
pTaskInfo->cost.created = taosGetTimestampMs();
|
||||
pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
|
||||
pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo);
|
||||
if(NULL == pTaskInfo->pRoot){
|
||||
if (NULL == pTaskInfo->pRoot) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pTaskInfo);
|
||||
return NULL;
|
||||
|
@ -834,11 +834,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
SSnapContext* sContext = pInfo->sContext;
|
||||
if(setForSnapShot(sContext, pOffset->uid) != 0) {
|
||||
qError("setDataForSnapShot error. uid:%"PRIi64, pOffset->uid);
|
||||
SSnapContext* sContext = pInfo->sContext;
|
||||
if (setForSnapShot(sContext, pOffset->uid) != 0) {
|
||||
qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -847,27 +847,25 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
pInfo->dataReader = NULL;
|
||||
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
|
||||
taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList);
|
||||
if(mtInfo.uid == 0) return 0; // no data
|
||||
if (mtInfo.uid == 0) return 0; // no data
|
||||
|
||||
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, mtInfo);
|
||||
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
|
||||
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
|
||||
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL);
|
||||
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
|
||||
&pInfo->dataReader, NULL);
|
||||
|
||||
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
|
||||
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
|
||||
pTaskInfo->streamInfo.schema = mtInfo.schema;
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot data uid %ld ts %ld", mtInfo.uid, pOffset->ts);
|
||||
}else if(pOffset->type == TMQ_OFFSET__SNAPSHOT_META){
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
SSnapContext* sContext = pInfo->sContext;
|
||||
if(setForSnapShot(sContext, pOffset->uid) != 0) {
|
||||
qError("setForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, pOffset->uid);
|
||||
SSnapContext* sContext = pInfo->sContext;
|
||||
if (setForSnapShot(sContext, pOffset->uid) != 0) {
|
||||
qError("setForSnapShot error. uid:%" PRIi64 " ,version:%" PRIi64, pOffset->uid);
|
||||
return -1;
|
||||
}
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %ld ts %ld", pOffset->uid);
|
||||
}else if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
} else if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
tsdbReaderClose(pInfo->dataReader);
|
||||
pInfo->dataReader = NULL;
|
||||
|
|
|
@ -268,7 +268,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
|||
// add a new result set for a new group
|
||||
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||
tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
|
||||
sizeof(SResultRowPosition));
|
||||
sizeof(SResultRowPosition));
|
||||
}
|
||||
|
||||
// 2. set the new time window to be the new active time window
|
||||
|
@ -2815,92 +2815,6 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
|
|||
}
|
||||
}
|
||||
}
|
||||
#if 0
|
||||
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
||||
uint8_t type = pOperator->operatorType;
|
||||
|
||||
pOperator->status = OP_OPENED;
|
||||
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
SStreamScanInfo* pScanInfo = pOperator->info;
|
||||
pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN;
|
||||
|
||||
pScanInfo->pTableScanOp->status = OP_OPENED;
|
||||
|
||||
STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
|
||||
ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);
|
||||
|
||||
if (uid == 0) {
|
||||
pInfo->noTable = 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/*if (pSnapShotScanInfo->dataReader == NULL) {*/
|
||||
/*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/
|
||||
/*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/
|
||||
/*}*/
|
||||
|
||||
pInfo->noTable = 0;
|
||||
|
||||
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||
bool found = false;
|
||||
for (int32_t i = 0; i < tableSz; i++) {
|
||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
|
||||
if (pTableInfo->uid == uid) {
|
||||
found = true;
|
||||
pInfo->currentTable = i;
|
||||
}
|
||||
}
|
||||
// TODO after processing drop, found can be false
|
||||
ASSERT(found);
|
||||
|
||||
tsdbSetTableId(pInfo->dataReader, uid);
|
||||
int64_t oldSkey = pInfo->cond.twindows.skey;
|
||||
pInfo->cond.twindows.skey = ts + 1;
|
||||
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
||||
pInfo->cond.twindows.skey = oldSkey;
|
||||
pInfo->scanTimes = 0;
|
||||
|
||||
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts,
|
||||
pInfo->currentTable, tableSz);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
} else {
|
||||
if (pOperator->numOfDownstream == 1) {
|
||||
return doPrepareScan(pOperator->pDownstream[0], uid, ts);
|
||||
} else if (pOperator->numOfDownstream == 0) {
|
||||
qError("failed to find stream scan operator to set the input data block");
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
} else {
|
||||
qError("join not supported for stream block scan");
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
|
||||
int32_t type = pOperator->operatorType;
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
SStreamScanInfo* pScanInfo = pOperator->info;
|
||||
STableScanInfo* pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
|
||||
*uid = pSnapShotScanInfo->lastStatus.uid;
|
||||
*ts = pSnapShotScanInfo->lastStatus.ts;
|
||||
} else {
|
||||
if (pOperator->pDownstream[0] == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
} else {
|
||||
doGetScanStatus(pOperator->pDownstream[0], uid, ts);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
// this is a blocking operator
|
||||
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||
|
@ -3024,7 +2938,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
|
|||
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pSup->pResultBuf, pPage);
|
||||
|
||||
|
||||
int32_t iter = 0;
|
||||
void* pIter = NULL;
|
||||
while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) {
|
||||
|
@ -3434,7 +3348,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
|
|||
|
||||
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
||||
const char* pKey) {
|
||||
int32_t code = 0;
|
||||
int32_t code = 0;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
|
||||
pAggSup->currentPageId = -1;
|
||||
|
@ -4294,42 +4208,6 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
|||
return pList;
|
||||
}
|
||||
|
||||
#if 0
|
||||
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, const char* idstr) {
|
||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
|
||||
code = 0;
|
||||
qDebug("no table qualified for query, %s", idstr);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SQueryTableDataCond cond = {0};
|
||||
code = initQueryTableDataCond(&cond, pTableScanNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
STsdbReader* pReader;
|
||||
code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
cleanupQueryTableDataCond(&cond);
|
||||
|
||||
return pReader;
|
||||
|
||||
_error:
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if (pOperator->numOfDownstream == 0) {
|
||||
|
|
|
@ -1219,7 +1219,7 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
|
|||
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) {
|
||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
|
||||
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
|
||||
|
||||
|
@ -1228,7 +1228,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
pInfo->pRes->info.type = STREAM_NORMAL;
|
||||
pInfo->pRes->info.version = pBlock->info.version;
|
||||
|
||||
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||
uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||
if (groupIdPre) {
|
||||
pInfo->pRes->info.groupId = *groupIdPre;
|
||||
} else {
|
||||
|
@ -1334,9 +1334,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
} else if (ret.fetchType == FETCH_TYPE__META) {
|
||||
ASSERT(0);
|
||||
// pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||
// pTaskInfo->streamInfo.metaBlk = ret.meta;
|
||||
// return NULL;
|
||||
// pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||
// pTaskInfo->streamInfo.metaBlk = ret.meta;
|
||||
// return NULL;
|
||||
} else if (ret.fetchType == FETCH_TYPE__NONE) {
|
||||
pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
|
||||
|
@ -1554,14 +1554,14 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
|
|||
}
|
||||
|
||||
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
||||
// NOTE: this operator does never check if current status is done or not
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
// NOTE: this operator does never check if current status is done or not
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
|
||||
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
|
||||
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
|
||||
|
||||
qDebug("tmqsnap doRawScan called");
|
||||
if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){
|
||||
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
SSDataBlock* pBlock = &pInfo->pRes;
|
||||
|
||||
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
|
||||
|
@ -1585,42 +1585,38 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
|
||||
if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal
|
||||
if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal
|
||||
qDebug("tmqsnap read snapshot done, change to get data from wal");
|
||||
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
|
||||
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
|
||||
pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
|
||||
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
|
||||
}else{
|
||||
} else {
|
||||
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
|
||||
pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
|
||||
qDebug("tmqsnap change get data uid:%ld", mtInfo.uid);
|
||||
qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
|
||||
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
|
||||
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
|
||||
pTaskInfo->streamInfo.schema = mtInfo.schema;
|
||||
}
|
||||
qDebug("tmqsnap stream scan tsdb return null");
|
||||
return NULL;
|
||||
}else if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META){
|
||||
SSnapContext *sContext = pInfo->sContext;
|
||||
void* data = NULL;
|
||||
int32_t dataLen = 0;
|
||||
int16_t type = 0;
|
||||
int64_t uid = 0;
|
||||
if(getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0){
|
||||
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
SSnapContext* sContext = pInfo->sContext;
|
||||
void* data = NULL;
|
||||
int32_t dataLen = 0;
|
||||
int16_t type = 0;
|
||||
int64_t uid = 0;
|
||||
if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
|
||||
qError("tmqsnap getMetafromSnapShot error");
|
||||
taosMemoryFreeClear(data);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if(!sContext->queryMetaOrData){ // change to get data next poll request
|
||||
if (!sContext->queryMetaOrData) { // change to get data next poll request
|
||||
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
|
||||
pTaskInfo->streamInfo.lastStatus.uid = uid;
|
||||
pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
|
||||
pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
|
||||
pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
|
||||
}else{
|
||||
} else {
|
||||
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
|
||||
pTaskInfo->streamInfo.lastStatus.uid = uid;
|
||||
pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
|
||||
|
@ -1631,44 +1627,44 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
|||
|
||||
return NULL;
|
||||
}
|
||||
// else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
|
||||
// int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
|
||||
//
|
||||
// while(1){
|
||||
// if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
|
||||
// qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
|
||||
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
|
||||
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
|
||||
// return NULL;
|
||||
// }
|
||||
// SWalCont* pHead = &pInfo->pCkHead->head;
|
||||
// qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
|
||||
//
|
||||
// if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||
// SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
||||
// tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
|
||||
// SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes);
|
||||
// if(block){
|
||||
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
|
||||
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
|
||||
// qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
||||
// return block;
|
||||
// }else{
|
||||
// fetchVer++;
|
||||
// }
|
||||
// } else{
|
||||
// ASSERT(pInfo->sContext->withMeta);
|
||||
// ASSERT(IS_META_MSG(pHead->msgType));
|
||||
// qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
||||
// pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
|
||||
// pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
||||
// pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
|
||||
// pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
|
||||
// pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
|
||||
// memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
|
||||
// return NULL;
|
||||
// }
|
||||
// }
|
||||
// else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
|
||||
// int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
|
||||
//
|
||||
// while(1){
|
||||
// if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
|
||||
// qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
|
||||
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
|
||||
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
|
||||
// return NULL;
|
||||
// }
|
||||
// SWalCont* pHead = &pInfo->pCkHead->head;
|
||||
// qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
|
||||
//
|
||||
// if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||
// SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
||||
// tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
|
||||
// SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid,
|
||||
// &pInfo->pRes); if(block){
|
||||
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
|
||||
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
|
||||
// qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
||||
// return block;
|
||||
// }else{
|
||||
// fetchVer++;
|
||||
// }
|
||||
// } else{
|
||||
// ASSERT(pInfo->sContext->withMeta);
|
||||
// ASSERT(IS_META_MSG(pHead->msgType));
|
||||
// qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
||||
// pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
|
||||
// pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
||||
// pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
|
||||
// pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
|
||||
// pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
|
||||
// memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
|
||||
// return NULL;
|
||||
// }
|
||||
// }
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1689,7 +1685,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
|
|||
// create tq reader
|
||||
|
||||
SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
@ -1699,13 +1695,12 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
|
|||
|
||||
pInfo->sContext = pHandle->sContext;
|
||||
pOperator->name = "RawStreamScanOperator";
|
||||
// pOperator->blocking = false;
|
||||
// pOperator->status = OP_NOT_OPENED;
|
||||
// pOperator->blocking = false;
|
||||
// pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL);
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
|
@ -1724,7 +1719,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
|||
}
|
||||
if (pStreamScan->pPseudoExpr) {
|
||||
destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
|
||||
taosMemoryFreeClear(pStreamScan->pPseudoExpr);
|
||||
taosMemoryFree(pStreamScan->pPseudoExpr);
|
||||
}
|
||||
|
||||
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
||||
|
|
Loading…
Reference in New Issue