fix(query): fix bug in tsdbread.
This commit is contained in:
parent
9f55ca7f80
commit
8c62d5f9fa
|
@ -1621,7 +1621,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);
|
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);
|
||||||
|
|
||||||
blockDataUpdateTsWindow(pBlock, 0);
|
blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotIds[0]);
|
||||||
pBlock->info.id.uid = pBlockScanInfo->uid;
|
pBlock->info.id.uid = pBlockScanInfo->uid;
|
||||||
|
|
||||||
setComposedBlockFlag(pReader, true);
|
setComposedBlockFlag(pReader, true);
|
||||||
|
@ -2493,7 +2493,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
|
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
|
||||||
blockDataUpdateTsWindow(pResBlock, 0);
|
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotIds[0]);
|
||||||
|
|
||||||
setComposedBlockFlag(pReader, true);
|
setComposedBlockFlag(pReader, true);
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
@ -3534,7 +3534,6 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
|
||||||
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
|
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
|
||||||
STableBlockScanInfo* pScanInfo) {
|
STableBlockScanInfo* pScanInfo) {
|
||||||
int32_t outputRowIndex = pBlock->info.rows;
|
int32_t outputRowIndex = pBlock->info.rows;
|
||||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
|
||||||
int64_t uid = pScanInfo->uid;
|
int64_t uid = pScanInfo->uid;
|
||||||
|
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
|
@ -3549,7 +3548,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (i < numOfCols && j < pSchema->numOfCols) {
|
while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) {
|
||||||
col_id_t colId = pSupInfo->colIds[i];
|
col_id_t colId = pSupInfo->colIds[i];
|
||||||
|
|
||||||
if (colId == pSchema->columns[j].colId) {
|
if (colId == pSchema->columns[j].colId) {
|
||||||
|
@ -3570,7 +3569,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
|
||||||
}
|
}
|
||||||
|
|
||||||
// set null value since current column does not exist in the "pSchema"
|
// set null value since current column does not exist in the "pSchema"
|
||||||
while (i < numOfCols) {
|
while (i < pSupInfo->numOfCols) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]);
|
||||||
colDataAppendNULL(pColInfoData, outputRowIndex);
|
colDataAppendNULL(pColInfoData, outputRowIndex);
|
||||||
i += 1;
|
i += 1;
|
||||||
|
|
|
@ -2513,10 +2513,8 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
int32_t readIdx = source->readerIdx;
|
int32_t readIdx = source->readerIdx;
|
||||||
SSDataBlock* pBlock = source->inputBlock;
|
SSDataBlock* pBlock = source->inputBlock;
|
||||||
STableMergeScanInfo* pTableScanInfo = pOperator->info;
|
|
||||||
|
|
||||||
SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx);
|
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
|
||||||
blockDataCleanup(pBlock);
|
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
|
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
|
||||||
|
@ -2534,12 +2532,11 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// process this data block based on the probabilities
|
// process this data block based on the probabilities
|
||||||
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
|
bool processThisBlock = processBlockWithProbability(&pInfo->sample);
|
||||||
if (!processThisBlock) {
|
if (!processThisBlock) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
|
||||||
if (pQueryCond->order == TSDB_ORDER_ASC) {
|
if (pQueryCond->order == TSDB_ORDER_ASC) {
|
||||||
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
|
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2547,7 +2544,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
|
loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
|
||||||
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
|
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
@ -2561,7 +2558,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
|
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
|
||||||
|
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
tsdbReaderClose(pInfo->base.dataReader);
|
tsdbReaderClose(pInfo->base.dataReader);
|
||||||
pInfo->base.dataReader = NULL;
|
pInfo->base.dataReader = NULL;
|
||||||
|
@ -2641,6 +2638,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
param.readerIdx = i;
|
param.readerIdx = i;
|
||||||
param.pOperator = pOperator;
|
param.pOperator = pOperator;
|
||||||
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
taosArrayPush(pInfo->sortSourceParams, ¶m);
|
taosArrayPush(pInfo->sortSourceParams, ¶m);
|
||||||
|
|
||||||
SQueryTableDataCond cond;
|
SQueryTableDataCond cond;
|
||||||
|
|
|
@ -1659,22 +1659,11 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
|
||||||
// the primary timestamp column
|
// the primary timestamp column
|
||||||
bool needed = false;
|
bool needed = false;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SExprInfo* pExpr = pCtx[i].pExpr;
|
SExprInfo* pExpr = pCtx[i].pExpr;
|
||||||
|
|
||||||
if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
|
if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
|
||||||
SFunctParam* pParam = &pExpr->base.pParam[0];
|
|
||||||
|
|
||||||
SColumn c = *pParam->pCol;
|
|
||||||
taosArrayPush(pInfo->pInterpCols, &c);
|
|
||||||
needed = true;
|
needed = true;
|
||||||
|
break;
|
||||||
SGroupKeys key = {0};
|
|
||||||
key.bytes = c.bytes;
|
|
||||||
key.type = c.type;
|
|
||||||
key.isNull = false;
|
|
||||||
key.pData = taosMemoryCalloc(1, c.bytes);
|
|
||||||
taosArrayPush(pInfo->pPrevValues, &key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1699,6 +1688,24 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SExprInfo* pExpr = pCtx[i].pExpr;
|
||||||
|
|
||||||
|
if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
|
||||||
|
SFunctParam* pParam = &pExpr->base.pParam[0];
|
||||||
|
|
||||||
|
SColumn c = *pParam->pCol;
|
||||||
|
taosArrayPush(pInfo->pInterpCols, &c);
|
||||||
|
|
||||||
|
SGroupKeys key = {0};
|
||||||
|
key.bytes = c.bytes;
|
||||||
|
key.type = c.type;
|
||||||
|
key.isNull = false;
|
||||||
|
key.pData = taosMemoryCalloc(1, c.bytes);
|
||||||
|
taosArrayPush(pInfo->pPrevValues, &key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return needed;
|
return needed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue