diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7728b0b5eb..41bd11d347 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2626,6 +2626,22 @@ typedef struct { }; } STqOffsetVal; +static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) { + pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA; + pOffsetVal->uid = uid; + pOffsetVal->ts = ts; +} + +static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) { + pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META; + pOffsetVal->uid = uid; +} + +static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) { + pOffsetVal->type = TMQ_OFFSET__LOG; + pOffsetVal->version = ver; +} + int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal); int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal); int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 29d509c27c..f08f54ef4b 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -811,8 +811,19 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { } int32_t tmq_unsubscribe(tmq_t* tmq) { + int32_t rsp; + int32_t retryCnt = 0; tmq_list_t* lst = tmq_list_new(); - int32_t rsp = tmq_subscribe(tmq, lst); + while (1) { + rsp = tmq_subscribe(tmq, lst); + if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) { + break; + } else { + retryCnt++; + taosMsleep(500); + } + } + tmq_list_destroy(lst); return rsp; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 753cdc603e..19dd321814 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -175,22 +175,6 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); char* tqOffsetBuildFName(const char* path, int32_t ver); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); -static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) { - pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA; - pOffsetVal->uid = uid; - pOffsetVal->ts = ts; -} - -static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) { - pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META; - pOffsetVal->uid = uid; -} - -static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) { - pOffsetVal->type = TMQ_OFFSET__LOG; - pOffsetVal->version = ver; -} - // tqStream int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index bfd23f1a1a..dd98fe3194 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -137,6 +137,9 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { continue; } + } else { + char* tbName = strdup(qExtractTbnameFromTask(task)); + taosArrayPush(pRsp->blockTbName, &tbName); } } if (pRsp->withSchema) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b3d865f591..a6b5d68a84 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1276,6 +1276,74 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock return 0; } +static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamScanInfo* pInfo = pOperator->info; + + qDebug("stream scan called"); + if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { + SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + if (pResult && pResult->info.rows > 0) { + qDebug("stream scan tsdb return %d rows", pResult->info.rows); + return pResult; + } else { + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTSInfo->dataReader); + pTSInfo->dataReader = NULL; + tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); + qDebug("stream scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1); + if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { + return NULL; + } + ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1); + } + } + + if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { + while (1) { + SFetchRet ret = {0}; + tqNextBlock(pInfo->tqReader, &ret); + if (ret.fetchType == FETCH_TYPE__DATA) { + blockDataCleanup(pInfo->pRes); + if (setBlockIntoRes(pInfo, &ret.data) < 0) { + ASSERT(0); + } + // TODO clean data block + if (pInfo->pRes->info.rows > 0) { + qDebug("stream scan log return %d rows", pInfo->pRes->info.rows); + return pInfo->pRes; + } + } else if (ret.fetchType == FETCH_TYPE__META) { + ASSERT(0); + // pTaskInfo->streamInfo.lastStatus = ret.offset; + // pTaskInfo->streamInfo.metaBlk = ret.meta; + // return NULL; + } else if (ret.fetchType == FETCH_TYPE__NONE) { + pTaskInfo->streamInfo.lastStatus = ret.offset; + ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); + ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion); + char formatBuf[80]; + tFormatOffset(formatBuf, 80, &ret.offset); + qDebug("stream scan log return null, offset %s", formatBuf); + return NULL; + } else { + ASSERT(0); + } + } + } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { + SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + if (pResult && pResult->info.rows > 0) { + qDebug("stream scan tsdb return %d rows", pResult->info.rows); + return pResult; + } + qDebug("stream scan tsdb return null"); + return NULL; + } else { + ASSERT(0); + return NULL; + } +} + static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -1317,48 +1385,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } #endif - qDebug("stream scan called"); - if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { - while (1) { - SFetchRet ret = {0}; - tqNextBlock(pInfo->tqReader, &ret); - if (ret.fetchType == FETCH_TYPE__DATA) { - blockDataCleanup(pInfo->pRes); - if (setBlockIntoRes(pInfo, &ret.data) < 0) { - ASSERT(0); - } - // TODO clean data block - if (pInfo->pRes->info.rows > 0) { - qDebug("stream scan log return %d rows", pInfo->pRes->info.rows); - return pInfo->pRes; - } - } else if (ret.fetchType == FETCH_TYPE__META) { - ASSERT(0); - // pTaskInfo->streamInfo.lastStatus = ret.offset; - // pTaskInfo->streamInfo.metaBlk = ret.meta; - // return NULL; - } else if (ret.fetchType == FETCH_TYPE__NONE) { - pTaskInfo->streamInfo.lastStatus = ret.offset; - ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); - ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion); - char formatBuf[80]; - tFormatOffset(formatBuf, 80, &ret.offset); - qDebug("stream scan log return null, offset %s", formatBuf); - return NULL; - } else { - ASSERT(0); - } - } - } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { - SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); - if (pResult && pResult->info.rows > 0) { - qDebug("stream scan tsdb return %d rows", pResult->info.rows); - return pResult; - } - qDebug("stream scan tsdb return null"); - return NULL; - } - if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); @@ -1810,6 +1836,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->readHandle = *pHandle; pInfo->tableUid = pScanPhyNode->uid; + pTaskInfo->streamInfo.snapshotVer = pHandle->version; // set the extract column id to streamHandle tqReaderSetColIdList(pInfo->tqReader, pColIds); @@ -1853,8 +1880,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, destroyStreamScanOperatorInfo, - NULL, NULL, NULL); + __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL, NULL, NULL); return pOperator;