From c3cd858a6396ccf90c9d3948c1463ca7c5ecbbd0 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 4 Aug 2022 15:01:59 +0800 Subject: [PATCH] feat:add snapshot for tmq in stable and db --- include/common/tmsg.h | 30 +- include/libs/executor/executor.h | 4 +- source/client/src/tmq.c | 8 +- source/common/src/tmsg.c | 21 +- source/dnode/vnode/inc/vnode.h | 23 ++ source/dnode/vnode/src/inc/tq.h | 13 +- source/dnode/vnode/src/meta/metaSnapshot.c | 315 +++++++++++++++++++++ source/dnode/vnode/src/tq/tq.c | 133 ++++++--- source/dnode/vnode/src/tq/tqExec.c | 19 +- source/dnode/vnode/src/tq/tqMeta.c | 6 +- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/libs/executor/inc/executorimpl.h | 17 ++ source/libs/executor/src/executor.c | 226 +++++++++------ source/libs/executor/src/scanoperator.c | 80 +++++- 14 files changed, 719 insertions(+), 178 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7eafc4c3d8..102dcc3fb0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2589,6 +2589,11 @@ enum { typedef struct { int8_t type; union { + // snapshot meta + struct { + int64_t muid; + int64_t mversion; + }; // snapshot data struct { int64_t uid; @@ -2913,33 +2918,14 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { typedef struct { SMqRspHead head; - int64_t reqOffset; - int64_t rspOffset; - STqOffsetVal reqOffsetNew; - STqOffsetVal rspOffsetNew; + STqOffsetVal rspOffset; int16_t resMsgType; int32_t metaRspLen; void* metaRsp; } SMqMetaRsp; -static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); - tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); - tlen += taosEncodeFixedI16(buf, pRsp->resMsgType); - tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen); - tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen); - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) { - buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); - buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); - buf = taosDecodeFixedI16(buf, &pRsp->resMsgType); - buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen); - buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen); - return (void*)buf; -} +int32_t tEncodeSMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp); +int32_t tDecodeSMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp); typedef struct { SMqRspHead head; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e15708e357..b31df3927d 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -41,6 +41,8 @@ typedef struct SReadHandle { bool initTableReader; bool initTqReader; int32_t numOfVgroups; + + void* sContext; // SSnapContext* } SReadHandle; // in queue mode, data streams are seperated by msg @@ -172,7 +174,7 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts); int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); -int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset); +int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index bdd8c75f26..9c06568d62 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1210,7 +1210,10 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); } else { ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); - tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp); + SDecoder decoder; + tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); + tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp); + tDecoderClear(&decoder); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); } @@ -1758,8 +1761,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, * rspMsg->msg.rspOffset);*/ - pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset; - pVg->currentOffsetNew.type = TMQ_OFFSET__LOG; + pVg->currentOffsetNew = pollRspWrapper->metaRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 05b27546eb..d0f9e1dc4b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5603,8 +5603,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { snprintf(buf, maxLen, "offset(log) ver:%" PRId64, pVal->version); } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { snprintf(buf, maxLen, "offset(ss data) uid:%" PRId64 ", ts:%" PRId64, pVal->uid, pVal->ts); - } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) { - snprintf(buf, maxLen, "offset(ss meta) uid:%" PRId64 ", ts:%" PRId64, pVal->uid, pVal->ts); } else { ASSERT(0); } @@ -5619,9 +5617,7 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_DATA) { return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) { - ASSERT(0); - // TODO - return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; + return pLeft->muid == pRight->muid && pLeft->mversion == pRight->mversion; } else { ASSERT(0); /*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/ @@ -5706,6 +5702,21 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) { if (tDecodeCStrTo(pCoder, pRes->tsColName) < 0) return -1; return 0; } + +int32_t tEncodeSMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp) { + if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; + if(tEncodeI16(pEncoder, pRsp->resMsgType)) return -1; + if(tEncodeBinary(pEncoder, pRsp->metaRsp, pRsp->metaRspLen)) return -1; + return 0; +} + +int32_t tDecodeSMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp) { + if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; + if (tDecodeI16(pDecoder, &pRsp->resMsgType) < 0) return -1; + if (tDecodeBinaryAlloc(pDecoder, &pRsp->metaRsp, (uint64_t*)&pRsp->metaRspLen) < 0) return -1; + return 0; +} + int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 66cfcd4f33..3f47f2947a 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -152,6 +152,23 @@ void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); size_t tsdbCacheGetCapacity(SVnode *pVnode); // tq +typedef struct SMetaTableInfo{ + int64_t suid; + int64_t uid; + SSchemaWrapper *schema; +}SMetaTableInfo; + +typedef struct SSnapContext { + + SMeta *pMeta; + int64_t snapVersion; + TBC *pCur; + int64_t suid; + int8_t subType; + SHashObj *idVersion; + SHashObj *suidInfo; + bool queryMetaOrData; // true-get meta, false-get data +}SSnapContext; typedef struct STqReader { int64_t ver; @@ -200,6 +217,12 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); +int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext* ctx); +int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type); +SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx); +int32_t setMetaForSnapShot(SSnapContext* ctx, int64_t uid, int64_t ver); +int32_t setDataForSnapShot(SSnapContext* ctx, int64_t uid); + // structs struct STsdbCfg { int8_t precision; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 44b9d1f69c..1a423f041c 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -68,21 +68,21 @@ typedef struct { typedef struct { char* qmsg; - qTaskInfo_t task; } STqExecCol; typedef struct { - int64_t suid; + int64_t suid; } STqExecTb; typedef struct { - SHashObj* pFilterOutTbUid; + SHashObj* pFilterOutTbUid; } STqExecDb; typedef struct { int8_t subType; STqReader* pExecReader; + qTaskInfo_t task; union { STqExecCol execCol; STqExecTb execTb; @@ -101,6 +101,7 @@ typedef struct { int64_t snapshotVer; + SSnapContext* sContext; // TODO remove SWalReader* pWalReader; @@ -175,6 +176,12 @@ static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t u pOffsetVal->ts = ts; } +static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid, int64_t version) { + pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META; + pOffsetVal->muid = uid; + pOffsetVal->mversion = version; +} + static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) { pOffsetVal->type = TMQ_OFFSET__LOG; pOffsetVal->version = ver; diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index e01f0e7c01..eb6de02e97 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -195,3 +195,318 @@ _err: metaError("vgId:%d, vnode snapshot meta write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); return code; } + +typedef struct STableInfoForChildTable{ + char *tableName; + SArray *tagName; + SSchemaWrapper *schemaRow; +}STableInfoForChildTable; + +static void destroySTableInfoForChildTable(void* data) { + STableInfoForChildTable* pData = (STableInfoForChildTable*)data; + taosMemoryFree(pData->tagName); + taosArrayDestroy(pData->tagName); + tDeleteSSchemaWrapper(pData->schemaRow); +} + +int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext* ctx){ + ctx->pMeta = pMeta; + ctx->snapVersion = snapVersion; + ctx->suid = suid; + ctx->subType = subType; + ctx->queryMetaOrData = withMeta; + int32_t ret = tdbTbcOpen(pMeta->pTbDb, &ctx->pCur, NULL); + if (ret < 0) { + return -1; + } + ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if(ctx->idVersion == NULL){ + return -1; + } + + ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if(ctx->suidInfo == NULL){ + return -1; + } + taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable); + + void *pKey = NULL; + void *pVal = NULL; + int vLen, kLen; + + tdbTbcMoveToFirst(ctx->pCur); + while(1){ + ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen); + if (ret < 0) break; + + STbDbKey *tmp = (STbDbKey*)pKey; + if(tmp->version > ctx->snapVersion) break; + taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &tmp->version, sizeof(int64_t)); + } + tdbTbcMoveToFirst(ctx->pCur); + return TDB_CODE_SUCCESS; +} + +int32_t destroySnapContext(SSnapContext* ctx){ + tdbTbcClose(ctx->pCur); + taosHashCleanup(ctx->idVersion); + taosHashCleanup(ctx->suidInfo); + + return 0; +} + +static int32_t buildNormalChildTableInfo(SVCreateTbReq *req, void **pBuf, int32_t *contLen){ + int32_t ret = 0; + SVCreateTbBatchReq reqs = {}; + + reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq)); + if (NULL == reqs.pArray){ + ret = -1; + goto end; + } + taosArrayPush(reqs.pArray, &req); + reqs.nReqs = 1; + + tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret); + if(ret < 0){ + ret = -1; + goto end; + } + *contLen += sizeof(SMsgHead); + *pBuf = taosMemoryMalloc(*contLen); + if (NULL == *pBuf) { + ret = -1; + goto end; + } + SEncoder coder = {0}; + tEncoderInit(&coder, *pBuf + sizeof(SMsgHead), *contLen); + if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) { + taosMemoryFreeClear(*pBuf); + tEncoderClear(&coder); + ret = -1; + goto end; + } + tEncoderClear(&coder); + +end: + taosArrayDestroy(reqs.pArray); + return ret; +} + +static int32_t buildSuperTableInfo(SVCreateStbReq *req, void **pBuf, int32_t *contLen){ + int32_t ret = 0; + tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret); + if (ret < 0) { + return -1; + } + + *contLen += sizeof(SMsgHead); + *pBuf = taosMemoryMalloc(*contLen); + if (NULL == *pBuf) { + return -1; + } + + SEncoder encoder = {0}; + tEncoderInit(&encoder, *pBuf + sizeof(SMsgHead), *contLen); + if (tEncodeSVCreateStbReq(&encoder, req) < 0) { + taosMemoryFreeClear(*pBuf); + tEncoderClear(&encoder); + return -1; + } + tEncoderClear(&encoder); + return 0; +} + +static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){ + STableInfoForChildTable dataTmp = {0}; + dataTmp.tableName = strdup(me->name); + dataTmp.tagName = taosArrayInit(me->stbEntry.schemaTag.nCols, TSDB_COL_NAME_LEN); + for(int i = 0; i < me->stbEntry.schemaTag.nCols; i++){ + SSchema *schema = &me->stbEntry.schemaTag.pSchema[i]; + taosArrayPush(dataTmp.tagName, schema->name); + } + dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow); + + STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t)); + if(data){ + destroySTableInfoForChildTable(data); + } + taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); +} + +int32_t setMetaForSnapShot(SSnapContext* ctx, int64_t uid, int64_t ver){ + int c = 0; + ctx->queryMetaOrData = true; // change to get data + if(uid == 0 && ver == 0){ + tdbTbcMoveToFirst(ctx->pCur); + return c; + } + STbDbKey key = {.version = ver, .uid = uid}; + tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c); + + return c; +} + +int32_t setDataForSnapShot(SSnapContext* ctx, int64_t uid){ + int c = 0; + ctx->queryMetaOrData = false; // change to get data + + if(uid == 0){ + tdbTbcMoveToFirst(ctx->pCur); + return c; + } + + int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t)); + if(!ver){ + return -1; + } + + STbDbKey key = {.version = *ver, .uid = uid}; + tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c); + + return c; +} + +int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type){ + int32_t ret = 0; + void *pKey = NULL; + void *pVal = NULL; + int vLen, kLen; + + while(1){ + ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen); + if (ret < 0) { + ctx->queryMetaOrData = false; // change to get data + tdbTbcMoveToFirst(ctx->pCur); + return 0; + } + + STbDbKey *tmp = (STbDbKey*)pKey; + if(tmp->version > ctx->snapVersion) { + tdbTbcMoveToFirst(ctx->pCur); + ctx->queryMetaOrData = false; // change to get data + return 0; + } + int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t)); + ASSERT(ver); + if(*ver > tmp->version){ + continue; + } + ASSERT(*ver == tmp->version); + + SDecoder dc = {0}; + SMetaEntry me = {0}; + tDecoderInit(&dc, pVal, vLen); + metaDecodeEntry(&dc, &me); + + if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) + || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) { + saveSuperTableInfoForChildTable(&me, ctx->suidInfo); + + SVCreateStbReq req = {0}; + req.name = me.name; + req.suid = me.uid; + req.schemaRow = me.stbEntry.schemaRow; + req.schemaTag = me.stbEntry.schemaTag; + + ret = buildSuperTableInfo(&req, pBuf, contLen); + tDecoderClear(&dc); + *type = TDMT_VND_CREATE_STB; + break; + } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) + || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) { + + STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); + ASSERT(data); + SVCreateTbReq req = {0}; + + req.type = TD_CHILD_TABLE; + req.name = me.name; + req.uid = me.uid; + req.commentLen = -1; + req.ctb.suid = me.ctbEntry.suid; + req.ctb.tagNum = taosArrayGetSize(data->tagName); + req.ctb.name = data->tableName; + req.ctb.pTag = me.ctbEntry.pTags; + req.ctb.tagName = data->tagName; + ret = buildNormalChildTableInfo(&req, pBuf, contLen); + tDecoderClear(&dc); + *type = TDMT_VND_CREATE_TABLE; + break; + } else if(ctx->subType == TOPIC_SUB_TYPE__DB){ + SVCreateTbReq req = {0}; + req.type = TD_NORMAL_TABLE; + req.name = me.name; + req.uid = me.uid; + req.commentLen = -1; + req.ntb.schemaRow = me.ntbEntry.schemaRow; + ret = buildNormalChildTableInfo(&req, pBuf, contLen); + tDecoderClear(&dc); + *type = TDMT_VND_CREATE_TABLE; + break; + } else{ + tDecoderClear(&dc); + continue; + } + } + + return ret; +} + +SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx){ + SMetaTableInfo result = {0}; + int32_t ret = 0; + void *pKey = NULL; + void *pVal = NULL; + int vLen, kLen; + + while(1){ + ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen); + if (ret < 0) { + return result; + } + + STbDbKey *tmp = (STbDbKey*)pKey; + if(tmp->version > ctx->snapVersion) { + return result; + } + int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t)); + ASSERT(ver); + if(*ver > tmp->version){ + continue; + } + ASSERT(*ver == tmp->version); + + SDecoder dc = {0}; + SMetaEntry me = {0}; + tDecoderInit(&dc, pVal, vLen); + metaDecodeEntry(&dc, &me); + + if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE){ + STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); + result.uid = me.uid; + result.suid = me.ctbEntry.suid; + result.schema = data->schemaRow; + tDecoderClear(&dc); + break; + } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) { + result.uid = me.uid; + result.suid = 0; + result.schema = &me.ntbEntry.schemaRow; + tDecoderClear(&dc); + break; + } else if(ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid) { + STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); + result.uid = me.uid; + result.suid = me.ctbEntry.suid; + result.schema = data->schemaRow; + tDecoderClear(&dc); + break; + } else{ + tDecoderClear(&dc); + continue; + } + } + + return result; +} diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 32bfd1274e..c999e58eb4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -100,7 +100,13 @@ void tqClose(STQ* pTq) { } int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) { - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqMetaRsp(NULL, pRsp); + int32_t len = 0; + int32_t code = 0; + tEncodeSize(tEncodeSMqMetaRsp, pRsp, len, code); + if (code < 0) { + return -1; + } + int32_t tlen = sizeof(SMqRspHead) + len; void* buf = rpcMallocCont(tlen); if (buf == NULL) { return -1; @@ -111,7 +117,11 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, ((SMqRspHead*)buf)->consumerId = pReq->consumerId; void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqMetaRsp(&abuf, pRsp); + + SEncoder encoder = {0}; + tEncoderInit(&encoder, abuf, len); + tEncodeSMqMetaRsp(&encoder, pRsp); + tEncoderClear(&encoder); SRpcMsg resp = { .info = pMsg->info, @@ -121,9 +131,8 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, }; tmsgSendRsp(&resp); - tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, reqOffset:%" PRId64 - ", rspOffset:%" PRId64, - TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->reqOffset, pRsp->rspOffset); + tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, offset type:%d", + TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type); return 0; } @@ -336,12 +345,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { TD_VID(pTq->pVnode), formatBuf); } else { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { - if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - if (!pHandle->fetchMeta) { - tqOffsetResetToData(&fetchOffsetNew, 0, 0); + if (pReq->useSnapshot){ + if (pHandle->fetchMeta){ + tqOffsetResetToMeta(&fetchOffsetNew, 0, 0); } else { - // reset to meta - ASSERT(0); + tqOffsetResetToData(&fetchOffsetNew, 0, 0); } } else { tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); @@ -385,7 +393,56 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { goto OVER; } - if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { + if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA)) { + if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) { + ASSERT(0); + } + if (dataRsp.blockNum == 0) { + // TODO add to async task pool + } + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; + } + goto OVER; + } + if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META)) { + SSnapContext* sContext = pHandle->sContext; + if(setMetaForSnapShot(sContext, fetchOffsetNew.muid, fetchOffsetNew.mversion) != 0) { + qError("setMetaForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, fetchOffsetNew.muid, fetchOffsetNew.mversion); + code = -1; + goto OVER; + } + void* data = NULL; + int32_t dataLen = 0; + int16_t type = 0; + if(getMetafromSnapShot(sContext, &data, &dataLen, &type) < 0){ + qError("getMetafromSnapShot error"); + taosMemoryFreeClear(data); + code = -1; + goto OVER; + } + + if(!sContext->queryMetaOrData){ // change to get data next poll request + fetchOffsetNew.type = TMQ_OFFSET__SNAPSHOT_DATA; + fetchOffsetNew.uid = 0; + fetchOffsetNew.ts = 0; + } + SMqMetaRsp metaRsp = {0}; + metaRsp.rspOffset = fetchOffsetNew; + metaRsp.resMsgType = type; + metaRsp.metaRspLen = dataLen; + metaRsp.metaRsp = data; + if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { + taosMemoryFreeClear(data); + code = -1; + goto OVER; + } + taosMemoryFreeClear(data); + code = 0; + goto OVER; + } + + if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__LOG)) { int64_t fetchVer = fetchOffsetNew.version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { @@ -445,11 +502,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ASSERT(IS_META_MSG(pHead->msgType)); tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); SMqMetaRsp metaRsp = {0}; - /*metaRsp.reqOffset = pReq->reqOffset.version;*/ - metaRsp.rspOffset = fetchVer; - /*metaRsp.rspOffsetNew.version = fetchVer;*/ - tqOffsetResetToLog(&metaRsp.reqOffsetNew, pReq->reqOffset.version); - tqOffsetResetToLog(&metaRsp.rspOffsetNew, fetchVer); + tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; @@ -461,22 +514,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { goto OVER; } } - - taosMemoryFree(pCkHead); -#if 0 - } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) { - tqInfo("retrieve using snapshot actual offset: uid %" PRId64 " ts %" PRId64, fetchOffsetNew.uid, fetchOffsetNew.ts); - if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { - ASSERT(0); - } - - // 4. send rsp - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { - code = -1; - } -#endif - } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) { - ASSERT(0); } OVER: @@ -553,23 +590,25 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { int64_t ver = pRef->refVer; pHandle->pRef = pRef; + SReadHandle handle = { + .meta = pTq->pVnode->pMeta, + .vnode = pTq->pVnode, + .initTableReader = true, + .initTqReader = true, + .version = ver, + }; + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->snapshotVer = ver; req.qmsg = NULL; - SReadHandle handle = { - .meta = pTq->pVnode->pMeta, - .vnode = pTq->pVnode, - .initTableReader = true, - .initTqReader = true, - .version = ver, - }; - pHandle->execHandle.execCol.task = + + pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, &pHandle->execHandle.pSchemaWrapper); - ASSERT(pHandle->execHandle.execCol.task); + ASSERT(pHandle->execHandle.task); void* scanner = NULL; - qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner); + qExtractStreamScanner(pHandle->execHandle.task, &scanner); ASSERT(scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(pHandle->execHandle.pExecReader); @@ -579,10 +618,20 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, handle.sContext); + pHandle->sContext = handle.sContext; + + pHandle->execHandle.task = + qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, handle.sContext); + pHandle->sContext = handle.sContext; + pHandle->execHandle.execTb.suid = req.suid; + pHandle->execHandle.task = + qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 40dbbda603..d3faa668b4 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -62,16 +62,16 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { const STqExecHandle* pExec = &pHandle->execHandle; - qTaskInfo_t task = pExec->execCol.task; + qTaskInfo_t task = pExec->task; - if (qStreamPrepareScan(task, pOffset) < 0) { + if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { tqDebug("prepare scan failed, return"); if (pOffset->type == TMQ_OFFSET__LOG) { pRsp->rspOffset = *pOffset; return 0; } else { tqOffsetResetToLog(pOffset, pHandle->snapshotVer); - if (qStreamPrepareScan(task, pOffset) < 0) { + if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { tqDebug("prepare scan failed, return"); pRsp->rspOffset = *pOffset; return 0; @@ -108,12 +108,23 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa rowCnt += pDataBlock->info.rows; if (rowCnt <= 4096) continue; } + } else { + if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ + SMetaTableInfo mtInfo = getUidfromSnapShot(pHandle->sContext); + if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal + + }else{ + pOffset->uid = mtInfo.uid; + qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); + continue; + } + } } if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %ld", TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); tqOffsetResetToLog(pOffset, pHandle->snapshotVer); - qStreamPrepareScan(task, pOffset); + qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); continue; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index b8e021f795..9079be6099 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -79,11 +79,11 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { .version = handle.snapshotVer, }; - handle.execHandle.execCol.task = qCreateQueueExecTaskInfo( + handle.execHandle.task = qCreateQueueExecTaskInfo( handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); - ASSERT(handle.execHandle.execCol.task); + ASSERT(handle.execHandle.task); void* scanner = NULL; - qExtractStreamScanner(handle.execHandle.execCol.task, &scanner); + qExtractStreamScanner(handle.execHandle.task, &scanner); ASSERT(scanner); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(handle.execHandle.pExecReader); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 6f0b5af4f6..45322a1fb7 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -398,7 +398,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { if (pIter == NULL) break; STqHandle* pExec = (STqHandle*)pIter; if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task, tbUidList, isAdd); + int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd); ASSERT(code == 0); } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) { if (!isAdd) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 577f9772be..e067caf950 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -490,6 +490,21 @@ typedef struct SStreamScanInfo { SNode* pTagIndexCond; } SStreamScanInfo; +typedef struct SStreamRawScanInfo{ +// int8_t subType; +// bool withMeta; +// int64_t suid; +// int64_t snapVersion; +// void *metaInfo; +// void *dataInfo; + + SReadHandle * readHandle; + SSDataBlock pRes; // result SSDataBlock + uint64_t groupId; + STsdbReader* dataReader; + SSnapContext* sContext; +}SStreamRawScanInfo; + typedef struct SSysTableScanInfo { SRetrieveMetaTableRsp* pRsp; SRetrieveTableReq req; @@ -929,6 +944,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 775017b8dd..a9a7dfbee0 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -117,7 +117,23 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) { if (msg == NULL) { // TODO create raw scan - return NULL; + + SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); + if (NULL == pTaskInfo) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); + + pTaskInfo->cost.created = taosGetTimestampMs(); + pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE; + pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo); + if(NULL == pTaskInfo->pRoot){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pTaskInfo); + return NULL; + } + return pTaskInfo; } struct SSubplan* pPlan = NULL; @@ -582,102 +598,154 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { return 0; } -int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { +int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo mtInfo) { + memset(pCond, 0, sizeof(SQueryTableDataCond)); + pCond->order = TSDB_ORDER_ASC; + pCond->numOfCols = mtInfo.schema->nCols; + pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo)); + if (pCond->colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return terrno; + } + + pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + pCond->suid = mtInfo.suid; + pCond->type = TIMEWINDOW_RANGE_CONTAINED; + pCond->startVersion = -1; + pCond->endVersion = sContext->snapVersion; + + for (int32_t i = 0; i < pCond->numOfCols; ++i) { + pCond->colList[i].type = mtInfo.schema->pSchema[i].type; + pCond->colList[i].bytes = mtInfo.schema->pSchema[i].bytes; + pCond->colList[i].colId = mtInfo.schema->pSchema[i].colId; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); pTaskInfo->streamInfo.prepareStatus = *pOffset; - if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { - while (1) { - uint8_t type = pOperator->operatorType; - pOperator->status = OP_OPENED; - // TODO add more check - if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - ASSERT(pOperator->numOfDownstream == 1); - pOperator = pOperator->pDownstream[0]; - } + if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { + return 0; + } + if (subType == TOPIC_SUB_TYPE__COLUMN) { + uint8_t type = pOperator->operatorType; + pOperator->status = OP_OPENED; + // TODO add more check + if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + ASSERT(pOperator->numOfDownstream == 1); + pOperator = pOperator->pDownstream[0]; + } - SStreamScanInfo* pInfo = pOperator->info; - if (pOffset->type == TMQ_OFFSET__LOG) { - STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTSInfo->dataReader); - pTSInfo->dataReader = NULL; + SStreamScanInfo* pInfo = pOperator->info; + if (pOffset->type == TMQ_OFFSET__LOG) { + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTSInfo->dataReader); + pTSInfo->dataReader = NULL; #if 0 - if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && - pInfo->tqReader->pWalReader->curVersion != pOffset->version) { - qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version, - pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version); - ASSERT(0); - } + if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && + pInfo->tqReader->pWalReader->curVersion != pOffset->version) { + qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version, + pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version); + ASSERT(0); + } #endif - if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { + if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { + return -1; + } + ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1); + } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ + int64_t uid = pOffset->uid; + int64_t ts = pOffset->ts; + + if (uid == 0) { + if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); + uid = pTableInfo->uid; + ts = INT64_MIN; + } else { return -1; } - ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1); - } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ - int64_t uid = pOffset->uid; - int64_t ts = pOffset->ts; + } - if (uid == 0) { - if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); - uid = pTableInfo->uid; - ts = INT64_MIN; - } else { - return -1; - } - } - - /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ - /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ + /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); #ifndef NDEBUG - qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pTableScanInfo->currentTable, - pInfo->pTableScanOp->resultInfo.totalRows); - pInfo->pTableScanOp->resultInfo.totalRows = 0; + qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pTableScanInfo->currentTable, + pInfo->pTableScanOp->resultInfo.totalRows); + pInfo->pTableScanOp->resultInfo.totalRows = 0; #endif - bool found = false; - for (int32_t i = 0; i < tableSz; i++) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); - if (pTableInfo->uid == uid) { - found = true; - pTableScanInfo->currentTable = i; - break; - } + bool found = false; + for (int32_t i = 0; i < tableSz; i++) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); + if (pTableInfo->uid == uid) { + found = true; + pTableScanInfo->currentTable = i; + break; } - - // TODO after dropping table, table may be not found - ASSERT(found); - - if (pTableScanInfo->dataReader == NULL) { - if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, - pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 || - pTableScanInfo->dataReader == NULL) { - ASSERT(0); - } - } - - tsdbSetTableId(pTableScanInfo->dataReader, uid); - int64_t oldSkey = pTableScanInfo->cond.twindows.skey; - pTableScanInfo->cond.twindows.skey = ts + 1; - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); - pTableScanInfo->cond.twindows.skey = oldSkey; - pTableScanInfo->scanTimes = 0; - - qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts, - pTableScanInfo->currentTable, tableSz); - /*}*/ - - } else { - ASSERT(0); } - return 0; + + // TODO after dropping table, table may be not found + ASSERT(found); + + if (pTableScanInfo->dataReader == NULL) { + if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, + pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 || + pTableScanInfo->dataReader == NULL) { + ASSERT(0); + } + } + + tsdbSetTableId(pTableScanInfo->dataReader, uid); + int64_t oldSkey = pTableScanInfo->cond.twindows.skey; + pTableScanInfo->cond.twindows.skey = ts + 1; + tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); + pTableScanInfo->cond.twindows.skey = oldSkey; + pTableScanInfo->scanTimes = 0; + + qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts, + pTableScanInfo->currentTable, tableSz); + /*}*/ + + } else { + ASSERT(0); } + }else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ + SStreamRawScanInfo* pInfo = pOperator->info; + SSnapContext* sContext = pInfo->sContext; + if(setDataForSnapShot(sContext, pOffset->uid) != 0) { + qError("setDataForSnapShot error. uid:%"PRIi64, pOffset->uid); + return -1; + } + + SMetaTableInfo mtInfo = getUidfromSnapShot(sContext); + if(pOffset->uid == 0) pOffset->uid = mtInfo.uid; + if(pOffset->ts == 0) pOffset->ts = INT64_MIN; + + if (pOffset->uid == 0) { + return -1; + } + + tsdbReaderClose(pInfo->dataReader); + pInfo->dataReader = NULL; + cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); + taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList); + initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, mtInfo); + pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; + pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); + tsdbReaderOpen(pInfo->readHandle->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); + + qDebug("tsdb reader snapshot change to uid %ld ts %ld", pOffset->uid, pOffset->ts); } return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8821dbd5a1..fd6c7876b9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1234,9 +1234,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } } else if (ret.fetchType == FETCH_TYPE__META) { ASSERT(0); - pTaskInfo->streamInfo.lastStatus = ret.offset; - pTaskInfo->streamInfo.metaBlk = ret.meta; - return NULL; +// pTaskInfo->streamInfo.lastStatus = ret.offset; +// pTaskInfo->streamInfo.metaBlk = ret.meta; +// return NULL; } else if (ret.fetchType == FETCH_TYPE__NONE) { pTaskInfo->streamInfo.lastStatus = ret.offset; ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); @@ -1257,10 +1257,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } qDebug("stream scan tsdb return null"); return NULL; - } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) { - // TODO scan meta - ASSERT(0); - return NULL; } if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) { @@ -1444,11 +1440,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } } -static SSDataBlock* doRawScan(SOperatorInfo* pInfo) { - // - return NULL; -} - static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); @@ -1461,17 +1452,76 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { return tableIdList; } +static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { +// NOTE: this operator does never check if current status is done or not + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamRawScanInfo* pInfo = pOperator->info; + + qDebug("stream scan called"); + ASSERT(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA); + + SSDataBlock* pBlock = &pInfo->pRes; + + while (tsdbNextDataBlock(pInfo->dataReader)) { + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + SDataBlockInfo binfo = pBlock->info; + tsdbRetrieveDataBlockInfo(pInfo->dataReader, &binfo); + + pBlock->info = binfo; + + SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); + if (pCols == NULL) { + return NULL; + } + +// size_t numOfSrcCols = taosArrayGetSize(pCols); +// for (int i = 0; i < taosArrayGetSize(pCols); i++) { +// SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->targetSlotId); +// colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info); +// } + + pBlock->pDataBlock = pCols; + + pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; + pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid; + pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; + + return pBlock; + } + qDebug("stream scan tsdb return null"); + return NULL; +} + // for subscribing db or stb (not including column), // if this scan is used, meta data can be return // and schemas are decided when scanning -SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, - SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup) { +SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) { // create operator // create tb reader // create meta reader // create tq reader - return NULL; + SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + pInfo->readHandle = pHandle; + pInfo->sContext = pHandle->sContext; + pOperator->name = "RawStreamScanOperator"; +// pOperator->blocking = false; +// pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + + pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, NULL, + NULL, NULL, NULL); + return pOperator; } static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {