fix tbname
This commit is contained in:
parent
c4dcc994fb
commit
e9d1733e8c
|
@ -2626,6 +2626,22 @@ typedef struct {
|
|||
};
|
||||
} STqOffsetVal;
|
||||
|
||||
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
|
||||
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
|
||||
pOffsetVal->uid = uid;
|
||||
pOffsetVal->ts = ts;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) {
|
||||
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META;
|
||||
pOffsetVal->uid = uid;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
|
||||
pOffsetVal->type = TMQ_OFFSET__LOG;
|
||||
pOffsetVal->version = ver;
|
||||
}
|
||||
|
||||
int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal);
|
||||
int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal);
|
||||
int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal);
|
||||
|
|
|
@ -811,8 +811,19 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
|
|||
}
|
||||
|
||||
int32_t tmq_unsubscribe(tmq_t* tmq) {
|
||||
int32_t rsp;
|
||||
int32_t retryCnt = 0;
|
||||
tmq_list_t* lst = tmq_list_new();
|
||||
int32_t rsp = tmq_subscribe(tmq, lst);
|
||||
while (1) {
|
||||
rsp = tmq_subscribe(tmq, lst);
|
||||
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
|
||||
break;
|
||||
} else {
|
||||
retryCnt++;
|
||||
taosMsleep(500);
|
||||
}
|
||||
}
|
||||
|
||||
tmq_list_destroy(lst);
|
||||
return rsp;
|
||||
}
|
||||
|
|
|
@ -175,22 +175,6 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
|||
char* tqOffsetBuildFName(const char* path, int32_t ver);
|
||||
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
||||
|
||||
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
|
||||
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
|
||||
pOffsetVal->uid = uid;
|
||||
pOffsetVal->ts = ts;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) {
|
||||
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META;
|
||||
pOffsetVal->uid = uid;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
|
||||
pOffsetVal->type = TMQ_OFFSET__LOG;
|
||||
pOffsetVal->version = ver;
|
||||
}
|
||||
|
||||
// tqStream
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask);
|
||||
|
||||
|
|
|
@ -137,6 +137,9 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
|||
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
char* tbName = strdup(qExtractTbnameFromTask(task));
|
||||
taosArrayPush(pRsp->blockTbName, &tbName);
|
||||
}
|
||||
}
|
||||
if (pRsp->withSchema) {
|
||||
|
|
|
@ -1276,6 +1276,74 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
return 0;
|
||||
}
|
||||
|
||||
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
|
||||
qDebug("stream scan called");
|
||||
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||
if (pResult && pResult->info.rows > 0) {
|
||||
qDebug("stream scan tsdb return %d rows", pResult->info.rows);
|
||||
return pResult;
|
||||
} else {
|
||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||
tsdbReaderClose(pTSInfo->dataReader);
|
||||
pTSInfo->dataReader = NULL;
|
||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
|
||||
qDebug("stream scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1);
|
||||
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
|
||||
}
|
||||
}
|
||||
|
||||
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
|
||||
while (1) {
|
||||
SFetchRet ret = {0};
|
||||
tqNextBlock(pInfo->tqReader, &ret);
|
||||
if (ret.fetchType == FETCH_TYPE__DATA) {
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
// TODO clean data block
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
qDebug("stream scan log return %d rows", pInfo->pRes->info.rows);
|
||||
return pInfo->pRes;
|
||||
}
|
||||
} else if (ret.fetchType == FETCH_TYPE__META) {
|
||||
ASSERT(0);
|
||||
// pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||
// pTaskInfo->streamInfo.metaBlk = ret.meta;
|
||||
// return NULL;
|
||||
} else if (ret.fetchType == FETCH_TYPE__NONE) {
|
||||
pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
|
||||
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
|
||||
char formatBuf[80];
|
||||
tFormatOffset(formatBuf, 80, &ret.offset);
|
||||
qDebug("stream scan log return null, offset %s", formatBuf);
|
||||
return NULL;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||
if (pResult && pResult->info.rows > 0) {
|
||||
qDebug("stream scan tsdb return %d rows", pResult->info.rows);
|
||||
return pResult;
|
||||
}
|
||||
qDebug("stream scan tsdb return null");
|
||||
return NULL;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||
// NOTE: this operator does never check if current status is done or not
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -1317,48 +1385,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
#endif
|
||||
|
||||
qDebug("stream scan called");
|
||||
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
|
||||
while (1) {
|
||||
SFetchRet ret = {0};
|
||||
tqNextBlock(pInfo->tqReader, &ret);
|
||||
if (ret.fetchType == FETCH_TYPE__DATA) {
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
// TODO clean data block
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
qDebug("stream scan log return %d rows", pInfo->pRes->info.rows);
|
||||
return pInfo->pRes;
|
||||
}
|
||||
} else if (ret.fetchType == FETCH_TYPE__META) {
|
||||
ASSERT(0);
|
||||
// pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||
// pTaskInfo->streamInfo.metaBlk = ret.meta;
|
||||
// return NULL;
|
||||
} else if (ret.fetchType == FETCH_TYPE__NONE) {
|
||||
pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
|
||||
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
|
||||
char formatBuf[80];
|
||||
tFormatOffset(formatBuf, 80, &ret.offset);
|
||||
qDebug("stream scan log return null, offset %s", formatBuf);
|
||||
return NULL;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||
if (pResult && pResult->info.rows > 0) {
|
||||
qDebug("stream scan tsdb return %d rows", pResult->info.rows);
|
||||
return pResult;
|
||||
}
|
||||
qDebug("stream scan tsdb return null");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
|
||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
|
||||
|
@ -1810,6 +1836,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
|
||||
pInfo->readHandle = *pHandle;
|
||||
pInfo->tableUid = pScanPhyNode->uid;
|
||||
pTaskInfo->streamInfo.snapshotVer = pHandle->version;
|
||||
|
||||
// set the extract column id to streamHandle
|
||||
tqReaderSetColIdList(pInfo->tqReader, pColIds);
|
||||
|
@ -1853,8 +1880,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, destroyStreamScanOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL, NULL, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
|
Loading…
Reference in New Issue