fix set schema and tbname
This commit is contained in:
parent
e9d1733e8c
commit
1c99eb1d4d
|
@ -85,11 +85,11 @@ int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pDataBlock = NULL;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
tqDebug("tmqsnap task start to execute");
|
tqDebug("tmq task start to execute");
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
tqDebug("tmqsnap task execute end, get %p", pDataBlock);
|
tqDebug("tmq task execute end, get %p", pDataBlock);
|
||||||
|
|
||||||
if (pDataBlock) {
|
if (pDataBlock) {
|
||||||
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
|
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
|
||||||
|
|
|
@ -856,6 +856,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
|
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
|
||||||
&pInfo->dataReader, NULL);
|
&pInfo->dataReader, NULL);
|
||||||
|
|
||||||
|
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
|
||||||
|
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
|
||||||
|
pTaskInfo->streamInfo.schema = mtInfo.schema;
|
||||||
|
|
||||||
qDebug("tmqsnap qStreamPrepareScan snapshot data uid %ld ts %ld", mtInfo.uid, pOffset->ts);
|
qDebug("tmqsnap qStreamPrepareScan snapshot data uid %ld ts %ld", mtInfo.uid, pOffset->ts);
|
||||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
|
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||||
|
|
|
@ -1280,18 +1280,18 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
qDebug("stream scan called");
|
qDebug("queue scan called");
|
||||||
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||||
if (pResult && pResult->info.rows > 0) {
|
if (pResult && pResult->info.rows > 0) {
|
||||||
qDebug("stream scan tsdb return %d rows", pResult->info.rows);
|
qDebug("queue scan tsdb return %d rows", pResult->info.rows);
|
||||||
return pResult;
|
return pResult;
|
||||||
} else {
|
} else {
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
tsdbReaderClose(pTSInfo->dataReader);
|
tsdbReaderClose(pTSInfo->dataReader);
|
||||||
pTSInfo->dataReader = NULL;
|
pTSInfo->dataReader = NULL;
|
||||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
|
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
|
||||||
qDebug("stream scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1);
|
qDebug("queue scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1);
|
||||||
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
|
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1310,7 +1310,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
// TODO clean data block
|
// TODO clean data block
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
qDebug("stream scan log return %d rows", pInfo->pRes->info.rows);
|
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
}
|
}
|
||||||
} else if (ret.fetchType == FETCH_TYPE__META) {
|
} else if (ret.fetchType == FETCH_TYPE__META) {
|
||||||
|
@ -1324,7 +1324,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
|
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
|
||||||
char formatBuf[80];
|
char formatBuf[80];
|
||||||
tFormatOffset(formatBuf, 80, &ret.offset);
|
tFormatOffset(formatBuf, 80, &ret.offset);
|
||||||
qDebug("stream scan log return null, offset %s", formatBuf);
|
qDebug("queue scan log return null, offset %s", formatBuf);
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -1349,6 +1349,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
qDebug("stream scan called");
|
||||||
#if 0
|
#if 0
|
||||||
SStreamState* pState = pTaskInfo->streamInfo.pState;
|
SStreamState* pState = pTaskInfo->streamInfo.pState;
|
||||||
if (pState) {
|
if (pState) {
|
||||||
|
|
Loading…
Reference in New Issue