diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 4eea744be1..ae48876db2 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -157,6 +157,7 @@ typedef struct SQueryTableDataCond { STimeWindow twindows; int64_t startVersion; int64_t endVersion; + int64_t schemaVersion; } SQueryTableDataCond; int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 102dcc3fb0..f96095eda6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2589,12 +2589,7 @@ enum { typedef struct { int8_t type; union { - // snapshot meta - struct { - int64_t muid; - int64_t mversion; - }; - // snapshot data + // snapshot struct { int64_t uid; int64_t ts; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index b31df3927d..6d243bb8d8 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -43,6 +43,9 @@ typedef struct SReadHandle { int32_t numOfVgroups; void* sContext; // SSnapContext* + void* pWalReader; + SHashObj *pFilterOutTbUid; + } SReadHandle; // in queue mode, data streams are seperated by msg @@ -178,7 +181,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); -void* qStreamExtractMetaMsg(qTaskInfo_t tinfo); +SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); void* qExtractReaderFromStreamScanner(void* scanner); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d0f9e1dc4b..e3b758ba4e 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5617,7 +5617,7 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_DATA) { return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) { - return pLeft->muid == pRight->muid && pLeft->mversion == pRight->mversion; + return pLeft->uid == pRight->uid; } else { ASSERT(0); /*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/ diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 3f47f2947a..c84b3e6972 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -159,7 +159,6 @@ typedef struct SMetaTableInfo{ }SMetaTableInfo; typedef struct SSnapContext { - SMeta *pMeta; int64_t snapVersion; TBC *pCur; @@ -167,6 +166,7 @@ typedef struct SSnapContext { int8_t subType; SHashObj *idVersion; SHashObj *suidInfo; + bool withMeta; bool queryMetaOrData; // true-get meta, false-get data }SSnapContext; @@ -204,6 +204,8 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); +int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset, SWalCkHead** ppCkHead); +SSDataBlock* tqLogScanExec(int8_t subType, STqReader* pReader, SHashObj* pFilterOutTbUid, SSDataBlock* block); // sma int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); @@ -217,11 +219,11 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); -int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext* ctx); -int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type); +int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext** ctxRet); +int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx); -int32_t setMetaForSnapShot(SSnapContext* ctx, int64_t uid, int64_t ver); -int32_t setDataForSnapShot(SSnapContext* ctx, int64_t uid); +int32_t setForSnapShot(SSnapContext* ctx, int64_t uid); +int32_t destroySnapContext(SSnapContext* ctx); // structs struct STsdbCfg { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 1a423f041c..99ac85c3c3 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -88,7 +88,7 @@ typedef struct { STqExecTb execTb; STqExecDb execDb; }; - int32_t numOfCols; // number of out pout column, temporarily used +// int32_t numOfCols; // number of out pout column, temporarily used SSchemaWrapper* pSchemaWrapper; // columns that are involved in query } STqExecHandle; @@ -101,9 +101,6 @@ typedef struct { int64_t snapshotVer; - SSnapContext* sContext; - // TODO remove - SWalReader* pWalReader; SWalRef* pRef; @@ -138,11 +135,8 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); // tqRead -int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* offset); -int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); +int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); -// tqExec -int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); // tqMeta @@ -176,10 +170,9 @@ static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t u pOffsetVal->ts = ts; } -static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid, int64_t version) { +static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) { pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META; - pOffsetVal->muid = uid; - pOffsetVal->mversion = version; + pOffsetVal->uid = uid; } static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) { diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index eb6de02e97..cdadc0a55d 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -209,12 +209,22 @@ static void destroySTableInfoForChildTable(void* data) { tDeleteSSchemaWrapper(pData->schemaRow); } -int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext* ctx){ +static void clearAndMoveToFirst(SSnapContext* ctx){ + tdbTbcClose(ctx->pCur); + tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL); + tdbTbcMoveToFirst(ctx->pCur); +} + +int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext** ctxRet){ + SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext)); + if(ctx == NULL) return -1; + *ctxRet = ctx; ctx->pMeta = pMeta; ctx->snapVersion = snapVersion; ctx->suid = suid; ctx->subType = subType; ctx->queryMetaOrData = withMeta; + ctx->withMeta = withMeta; int32_t ret = tdbTbcOpen(pMeta->pTbDb, &ctx->pCur, NULL); if (ret < 0) { return -1; @@ -243,7 +253,7 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t if(tmp->version > ctx->snapVersion) break; taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &tmp->version, sizeof(int64_t)); } - tdbTbcMoveToFirst(ctx->pCur); + clearAndMoveToFirst(ctx); return TDB_CODE_SUCCESS; } @@ -251,7 +261,7 @@ int32_t destroySnapContext(SSnapContext* ctx){ tdbTbcClose(ctx->pCur); taosHashCleanup(ctx->idVersion); taosHashCleanup(ctx->suidInfo); - + taosMemoryFree(ctx); return 0; } @@ -334,25 +344,15 @@ static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){ taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); } -int32_t setMetaForSnapShot(SSnapContext* ctx, int64_t uid, int64_t ver){ +int32_t setForSnapShot(SSnapContext* ctx, int64_t uid){ int c = 0; - ctx->queryMetaOrData = true; // change to get data - if(uid == 0 && ver == 0){ - tdbTbcMoveToFirst(ctx->pCur); + + if(uid == -1){ return c; } - STbDbKey key = {.version = ver, .uid = uid}; - tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c); - - return c; -} - -int32_t setDataForSnapShot(SSnapContext* ctx, int64_t uid){ - int c = 0; - ctx->queryMetaOrData = false; // change to get data if(uid == 0){ - tdbTbcMoveToFirst(ctx->pCur); + clearAndMoveToFirst(ctx); return c; } @@ -367,7 +367,7 @@ int32_t setDataForSnapShot(SSnapContext* ctx, int64_t uid){ return c; } -int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type){ +int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid){ int32_t ret = 0; void *pKey = NULL; void *pVal = NULL; @@ -377,13 +377,13 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen); if (ret < 0) { ctx->queryMetaOrData = false; // change to get data - tdbTbcMoveToFirst(ctx->pCur); + clearAndMoveToFirst(ctx); return 0; } STbDbKey *tmp = (STbDbKey*)pKey; if(tmp->version > ctx->snapVersion) { - tdbTbcMoveToFirst(ctx->pCur); + clearAndMoveToFirst(ctx); ctx->queryMetaOrData = false; // change to get data return 0; } @@ -394,6 +394,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in } ASSERT(*ver == tmp->version); + *uid = tmp->uid; SDecoder dc = {0}; SMetaEntry me = {0}; tDecoderInit(&dc, pVal, vLen); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c999e58eb4..b755fcd4ff 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -294,12 +294,10 @@ static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; - int64_t timeout = pReq->timeout; int32_t reqEpoch = pReq->epoch; int32_t code = 0; STqOffsetVal reqOffset = pReq->reqOffset; STqOffsetVal fetchOffsetNew; - SWalCkHead* pCkHead = NULL; // 1.find handle STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); @@ -329,6 +327,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); + SMqMetaRsp metaRsp = {0}; SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); @@ -347,7 +346,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (pReq->useSnapshot){ if (pHandle->fetchMeta){ - tqOffsetResetToMeta(&fetchOffsetNew, 0, 0); + tqOffsetResetToMeta(&fetchOffsetNew, 0); } else { tqOffsetResetToData(&fetchOffsetNew, 0, 0); } @@ -373,151 +372,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } } - // 3.query - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - /*if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {*/ - /*fetchOffsetNew.version++;*/ - /*}*/ - if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) { - ASSERT(0); - code = -1; - goto OVER; - } - if (dataRsp.blockNum == 0) { - // TODO add to async task pool - /*dataRsp.rspOffset.version--;*/ - } + tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew); + + if(dataRsp.blockNum != 0){ if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } goto OVER; } - if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA)) { - if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) { - ASSERT(0); - } - if (dataRsp.blockNum == 0) { - // TODO add to async task pool - } - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { - code = -1; - } - goto OVER; - } - if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META)) { - SSnapContext* sContext = pHandle->sContext; - if(setMetaForSnapShot(sContext, fetchOffsetNew.muid, fetchOffsetNew.mversion) != 0) { - qError("setMetaForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, fetchOffsetNew.muid, fetchOffsetNew.mversion); - code = -1; - goto OVER; - } - void* data = NULL; - int32_t dataLen = 0; - int16_t type = 0; - if(getMetafromSnapShot(sContext, &data, &dataLen, &type) < 0){ - qError("getMetafromSnapShot error"); - taosMemoryFreeClear(data); - code = -1; - goto OVER; - } - - if(!sContext->queryMetaOrData){ // change to get data next poll request - fetchOffsetNew.type = TMQ_OFFSET__SNAPSHOT_DATA; - fetchOffsetNew.uid = 0; - fetchOffsetNew.ts = 0; - } - SMqMetaRsp metaRsp = {0}; - metaRsp.rspOffset = fetchOffsetNew; - metaRsp.resMsgType = type; - metaRsp.metaRspLen = dataLen; - metaRsp.metaRsp = data; + if(metaRsp.metaRspLen > 0){ if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { - taosMemoryFreeClear(data); code = -1; - goto OVER; } - taosMemoryFreeClear(data); - code = 0; goto OVER; } - if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__LOG)) { - int64_t fetchVer = fetchOffsetNew.version + 1; - pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); - if (pCkHead == NULL) { - code = -1; - goto OVER; - } - - walSetReaderCapacity(pHandle->pWalReader, 2048); - - while (1) { - consumerEpoch = atomic_load_32(&pHandle->epoch); - if (consumerEpoch > reqEpoch) { - tqWarn("tmq poll: consumer %ld (epoch %d), subkey %s, vg %d offset %" PRId64 - ", found new consumer epoch %d, discard req epoch %d", - consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); - break; - } - - if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { - // TODO add push mgr - - tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); - ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version); - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { - code = -1; - } - goto OVER; - } - - SWalCont* pHead = &pCkHead->head; - - tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, - pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); - - if (pHead->msgType == TDMT_VND_SUBMIT) { - SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - - if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) { - /*ASSERT(0);*/ - } - // TODO batch optimization: - // TODO continue scan until meeting batch requirement - if (dataRsp.blockNum > 0 /* threshold */) { - tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); - ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version); - - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { - code = -1; - } - goto OVER; - } else { - fetchVer++; - } - - } else { - ASSERT(pHandle->fetchMeta); - ASSERT(IS_META_MSG(pHead->msgType)); - tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); - SMqMetaRsp metaRsp = {0}; - tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); - metaRsp.resMsgType = pHead->msgType; - metaRsp.metaRspLen = pHead->bodyLen; - metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { - code = -1; - goto OVER; - } - code = 0; - goto OVER; - } - } - } - + tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, no data", consumerId, pHandle->subKey, + TD_VID(pTq->pVnode)); OVER: - if (pCkHead) taosMemoryFree(pCkHead); + // TODO wrap in destroy func taosArrayDestroy(dataRsp.blockDataLen); taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree); @@ -530,6 +404,8 @@ OVER: taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree); } + taosMemoryFreeClear(metaRsp.metaRsp); + return code; } @@ -604,7 +480,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { req.qmsg = NULL; pHandle->execHandle.task = - qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, + qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, NULL, &pHandle->execHandle.pSchemaWrapper); ASSERT(pHandle->execHandle.task); void* scanner = NULL; @@ -613,25 +489,17 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(pHandle->execHandle.pExecReader); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { - pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, handle.sContext); - pHandle->sContext = handle.sContext; + buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext)); + handle.tqReader = pHandle->execHandle.pExecReader; + handle.pWalReader = ((STqReader*)handle.tqReader)->pWalReader; + handle.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid; pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - - buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, handle.sContext); - pHandle->sContext = handle.sContext; - - pHandle->execHandle.execTb.suid = req.suid; - pHandle->execHandle.task = - qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid); @@ -642,6 +510,12 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); taosArrayDestroy(tbUidList); + + buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext)); + handle.tqReader = pHandle->execHandle.pExecReader; + handle.pWalReader = ((STqReader*)handle.tqReader)->pWalReader; + pHandle->execHandle.task = + qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); } taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer %ld", req.subKey, pHandle->consumerId); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index d3faa668b4..7fc9838f2b 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -60,7 +60,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { return 0; } -int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { +int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) { const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; @@ -91,16 +91,20 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa if (pDataBlock != NULL) { if (pRsp->withTbName) { + int64_t uid = 0; if (pOffset->type == TMQ_OFFSET__LOG) { - int64_t uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { - continue; - } + uid = pExec->pExecReader->msgIter.uid; } else { - pRsp->withTbName = 0; + uid = pDataBlock->info.uid; + } + if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + continue; } } - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + if(pRsp->withSchema){ + tqAddBlockSchemaToRsp(pExec, pRsp); + } + tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); pRsp->blockNum++; if (pOffset->type == TMQ_OFFSET__LOG) { continue; @@ -108,17 +112,6 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa rowCnt += pDataBlock->info.rows; if (rowCnt <= 4096) continue; } - } else { - if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ - SMetaTableInfo mtInfo = getUidfromSnapShot(pHandle->sContext); - if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal - - }else{ - pOffset->uid = mtInfo.uid; - qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); - continue; - } - } } if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { @@ -128,27 +121,19 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa continue; } - void* meta = qStreamExtractMetaMsg(task); - if (meta != NULL) { - // tq add meta to rsp + if (pRsp->blockNum > 0){ + qStreamExtractOffset(task, &pRsp->rspOffset); + break; } - if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) { - ASSERT(0); + SMqMetaRsp* tmp = qStreamExtractMetaMsg(task); + if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){ + qStreamPrepareScan(task, &tmp->rspOffset, pHandle->execHandle.subType); + tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META; + continue; } - ASSERT(pRsp->rspOffset.type != 0); - -#if 0 - if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { - if (pRsp->blockNum > 0) { - ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); - } else { - ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); - } - } -#endif - + *pMetaRsp = *tmp; tqDebug("task exec exited"); break; } @@ -206,56 +191,22 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S } #endif -int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) { - ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); - - if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { - pRsp->withSchema = 1; - STqReader* pReader = pExec->pExecReader; - tqReaderSetDataMsg(pReader, pReq, 0); +SSDataBlock* tqLogScanExec(int8_t subType, STqReader* pReader, SHashObj* pFilterOutTbUid, SSDataBlock* block) { + if (subType == TOPIC_SUB_TYPE__TABLE) { while (tqNextDataBlock(pReader)) { - SSDataBlock block = {0}; - if (tqRetrieveDataBlock(&block, pReader) < 0) { + if (tqRetrieveDataBlock(block, pReader) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } - if (pRsp->withTbName) { - int64_t uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { - blockDataFreeRes(&block); - continue; - } - } - tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); - blockDataFreeRes(&block); - tqAddBlockSchemaToRsp(pExec, pRsp); - pRsp->blockNum++; + return block; } - } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { - pRsp->withSchema = 1; - STqReader* pReader = pExec->pExecReader; - tqReaderSetDataMsg(pReader, pReq, 0); - while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { - SSDataBlock block = {0}; - if (tqRetrieveDataBlock(&block, pReader) < 0) { + } else if (subType == TOPIC_SUB_TYPE__DB) { + while (tqNextDataBlockFilterOut(pReader, pFilterOutTbUid)) { + if (tqRetrieveDataBlock(block, pReader) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } - if (pRsp->withTbName) { - int64_t uid = pExec->pExecReader->msgIter.uid; - if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { - blockDataFreeRes(&block); - continue; - } - } - tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); - blockDataFreeRes(&block); - tqAddBlockSchemaToRsp(pExec, pRsp); - pRsp->blockNum++; + return block; } } - if (pRsp->blockNum == 0) { - return -1; - } - - return 0; + return NULL; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 9079be6099..2ccea3449a 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -70,25 +70,35 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { } walRefVer(handle.pRef, handle.snapshotVer); + SReadHandle reader = { + .meta = pTq->pVnode->pMeta, + .vnode = pTq->pVnode, + .initTableReader = true, + .initTqReader = true, + .version = handle.snapshotVer, + }; + if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - SReadHandle reader = { - .meta = pTq->pVnode->pMeta, - .vnode = pTq->pVnode, - .initTableReader = true, - .initTqReader = true, - .version = handle.snapshotVer, - }; handle.execHandle.task = qCreateQueueExecTaskInfo( - handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); + handle.execHandle.execCol.qmsg, &reader, NULL, &handle.execHandle.pSchemaWrapper); ASSERT(handle.execHandle.task); void* scanner = NULL; qExtractStreamScanner(handle.execHandle.task, &scanner); ASSERT(scanner); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(handle.execHandle.pExecReader); - } else { - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + } else if(handle.execHandle.subType == TOPIC_SUB_TYPE__DB){ + + handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); + buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); + reader.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + reader.tqReader = handle.execHandle.pExecReader; + reader.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid; + + handle.execHandle.task = + qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); + handle.execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 45322a1fb7..04a40451ec 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -15,22 +15,21 @@ #include "tq.h" -int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { +int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset, SWalCkHead** ppCkHead) { int32_t code = 0; - taosThreadMutexLock(&pHandle->pWalReader->mutex); + taosThreadMutexLock(&pWalReader->mutex); int64_t offset = *fetchOffset; while (1) { - if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { - tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", - pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset); + if (walFetchHead(pWalReader, offset, *ppCkHead) < 0) { + tqDebug("tmq poll no more log to return"); *fetchOffset = offset - 1; code = -1; goto END; } if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { - code = walFetchBody(pHandle->pWalReader, ppCkHead); + code = walFetchBody(pWalReader, ppCkHead); if (code < 0) { ASSERT(0); @@ -42,10 +41,10 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea code = 0; goto END; } else { - if (pHandle->fetchMeta) { + if (fetchMeta) { SWalCont* pHead = &((*ppCkHead)->head); if (IS_META_MSG(pHead->msgType)) { - code = walFetchBody(pHandle->pWalReader, ppCkHead); + code = walFetchBody(pWalReader, ppCkHead); if (code < 0) { ASSERT(0); @@ -58,7 +57,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea goto END; } } - code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead); + code = walSkipFetchBody(pWalReader, *ppCkHead); if (code < 0) { ASSERT(0); *fetchOffset = offset; @@ -69,7 +68,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea } } END: - taosThreadMutexUnlock(&pHandle->pWalReader->mutex); + taosThreadMutexUnlock(&pWalReader->mutex); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c003f5a63f..a88da59d4a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2581,10 +2581,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } if (pCond->suid != 0) { - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1); + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, pCond->schemaVersion); } else if (taosArrayGetSize(pTableList) > 0) { STableKeyInfo* pKey = taosArrayGet(pTableList, 0); - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1); + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, pCond->schemaVersion); } int32_t numOfTables = taosArrayGetSize(pTableList); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e067caf950..b33c244a5c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -149,7 +149,7 @@ typedef struct { //TODO remove prepareStatus STqOffsetVal prepareStatus; // for tmq STqOffsetVal lastStatus; // for tmq - void* metaBlk; // for tmq fetching meta + SMqMetaRsp metaRsp; // for tmq fetching meta SSDataBlock* pullOverBlk; // for streaming SWalFilterCond cond; int64_t lastScanUid; @@ -498,7 +498,8 @@ typedef struct SStreamRawScanInfo{ // void *metaInfo; // void *dataInfo; - SReadHandle * readHandle; + SWalCkHead* pCkHead; + SReadHandle* readHandle; SSDataBlock pRes; // result SSDataBlock uint64_t groupId; STsdbReader* dataReader; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 96c20d6136..d8b0983811 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -847,6 +847,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->startVersion = -1; pCond->endVersion = -1; + pCond->schemaVersion = -1; // pCond->type = pTableScanNode->scanFlag; int32_t j = 0; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a9a7dfbee0..51caa33093 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -154,13 +154,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n // extract the number of output columns SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc; - *numOfCols = 0; + if(numOfCols) *numOfCols = 0; SNode* pNode; FOREACH(pNode, pDescNode->pSlots) { SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; if (pSlotDesc->output) { - ++(*numOfCols); + if(numOfCols) ++(*numOfCols); } } @@ -585,10 +585,10 @@ const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) { return pInfo->tqReader->pSchemaWrapper; } -void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { +SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); - return pTaskInfo->streamInfo.metaBlk; + return &pTaskInfo->streamInfo.metaRsp; } int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { @@ -613,6 +613,7 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->startVersion = -1; pCond->endVersion = sContext->snapVersion; + pCond->schemaVersion = sContext->snapVersion; for (int32_t i = 0; i < pCond->numOfCols; ++i) { pCond->colList[i].type = mtInfo.schema->pSchema[i].type; @@ -722,7 +723,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT }else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; - if(setDataForSnapShot(sContext, pOffset->uid) != 0) { + if(setForSnapShot(sContext, pOffset->uid) != 0) { qError("setDataForSnapShot error. uid:%"PRIi64, pOffset->uid); return -1; } @@ -732,6 +733,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if(pOffset->ts == 0) pOffset->ts = INT64_MIN; if (pOffset->uid == 0) { + qError("setDataForSnapShot error. uid = 0 "); return -1; } @@ -746,6 +748,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tsdbReaderOpen(pInfo->readHandle->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); qDebug("tsdb reader snapshot change to uid %ld ts %ld", pOffset->uid, pOffset->ts); + }else if(pOffset->type == TMQ_OFFSET__SNAPSHOT_META){ + SStreamRawScanInfo* pInfo = pOperator->info; + SSnapContext* sContext = pInfo->sContext; + if(setForSnapShot(sContext, pOffset->uid) != 0) { + qError("setForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, pOffset->uid); + return -1; + } + }else if (pOffset->type == TMQ_OFFSET__LOG) { + SStreamRawScanInfo* pInfo = pOperator->info; + tsdbReaderClose(pInfo->dataReader); + pInfo->dataReader = NULL; } return 0; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e52cbf40a9..2a65156bb3 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3924,6 +3924,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->startVersion = -1; pCond->endVersion = -1; + pCond->schemaVersion = -1; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fd6c7876b9..9744b14870 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1456,45 +1456,140 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamRawScanInfo* pInfo = pOperator->info; + pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta qDebug("stream scan called"); - ASSERT(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA); + if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){ + SSDataBlock* pBlock = &pInfo->pRes; - SSDataBlock* pBlock = &pInfo->pRes; + while (tsdbNextDataBlock(pInfo->dataReader)) { + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } - while (tsdbNextDataBlock(pInfo->dataReader)) { - if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + SDataBlockInfo binfo = pBlock->info; + tsdbRetrieveDataBlockInfo(pInfo->dataReader, &binfo); + + pBlock->info = binfo; + + SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); + pBlock->pDataBlock = pCols; + if (pCols == NULL) { + SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext); + if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal + return NULL; + }else{ + pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; + qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType); + continue; + } + } + + pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; + pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid; + pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; + + return pBlock; } - - SDataBlockInfo binfo = pBlock->info; - tsdbRetrieveDataBlockInfo(pInfo->dataReader, &binfo); - - pBlock->info = binfo; - - SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); - if (pCols == NULL) { + qDebug("stream scan tsdb return null"); + return NULL; + }else if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META){ + SSnapContext *sContext = pInfo->sContext; + void* data = NULL; + int32_t dataLen = 0; + int16_t type = 0; + int64_t uid = 0; + if(getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0){ + qError("getMetafromSnapShot error"); + taosMemoryFreeClear(data); return NULL; } -// size_t numOfSrcCols = taosArrayGetSize(pCols); -// for (int i = 0; i < taosArrayGetSize(pCols); i++) { -// SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->targetSlotId); -// colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info); -// } + if(!sContext->queryMetaOrData){ // change to get data next poll request + pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META; + pTaskInfo->streamInfo.lastStatus.uid = uid; + pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA; + pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0; + pTaskInfo->streamInfo.metaRsp.rspOffset.ts = 0; + }else{ + pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META; + pTaskInfo->streamInfo.lastStatus.uid = uid; + pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus; + pTaskInfo->streamInfo.metaRsp.resMsgType = type; + pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen; + pTaskInfo->streamInfo.metaRsp.metaRsp = data; + } - pBlock->pDataBlock = pCols; + return NULL; + }else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { + if(pInfo->pCkHead == NULL){ + pInfo->pCkHead = taosMemoryCalloc(1, sizeof(SWalCkHead) + 2048); + if (pInfo->pCkHead == NULL) { - pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; - pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid; - pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; + } + walSetReaderCapacity(pInfo->readHandle->pWalReader, 2048); + } - return pBlock; + int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version; + + SWalCont* pHead = &pInfo->pCkHead->head; + if(pHead->msgType != TDMT_VND_SUBMIT){ + fetchVer++; + if (tqFetchLog(pInfo->readHandle->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { + return NULL; + } + qDebug("tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); + pHead = &pInfo->pCkHead->head; + + if(pHead->msgType == TDMT_VND_SUBMIT){ + SSubmitReq* pCont = (SSubmitReq*)&pHead->body; + tqReaderSetDataMsg(pInfo->readHandle->tqReader, pCont, 0); + }else if(pInfo->sContext->withMeta){ + ASSERT(IS_META_MSG(pHead->msgType)); + qDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); + pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; + pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; + pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType; + pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen; + pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen); + memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen); + return NULL; + } + } + + if (pHead->msgType == TDMT_VND_SUBMIT) { + while(1){ + blockDataFreeRes(&pInfo->pRes); + SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->readHandle->tqReader, pInfo->readHandle->pFilterOutTbUid, &pInfo->pRes); + if(!block){ + fetchVer++; + if (tqFetchLog(pInfo->readHandle->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { + return NULL; + } + pHead = &pInfo->pCkHead->head; + SSubmitReq* pCont = (SSubmitReq*)&pHead->body; + tqReaderSetDataMsg(pInfo->readHandle->tqReader, pCont, 0); + } + return block; + } + } } - qDebug("stream scan tsdb return null"); return NULL; } +static void destroyRawScanOperatorInfo(void* param, int32_t numOfOutput) { + SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param; + taosMemoryFreeClear(pRawScan->pCkHead); + if (pRawScan->readHandle->tqReader) { + tqCloseReader(pRawScan->readHandle->tqReader); + } + blockDataFreeRes(&pRawScan->pRes); + tsdbReaderClose(pRawScan->dataReader); + destroySnapContext(pRawScan->sContext); + taosHashCleanup(pRawScan->readHandle->pFilterOutTbUid); + taosMemoryFree(pRawScan); +} + // for subscribing db or stb (not including column), // if this scan is used, meta data can be return // and schemas are decided when scanning @@ -1519,7 +1614,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, NULL, + pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL); return pOperator; }