enhance: use one tsdb reader to read table sequentially
This commit is contained in:
parent
021fcf2a59
commit
8b6d7db7ad
|
@ -232,7 +232,6 @@ typedef struct STableMergeScanInfo {
|
|||
int32_t tableEndIndex;
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
SArray* queryConds; // array of queryTableDataCond
|
||||
STableScanBase base;
|
||||
int32_t bufPageSize;
|
||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||
|
@ -245,6 +244,7 @@ typedef struct STableMergeScanInfo {
|
|||
int64_t numOfRows;
|
||||
SScanInfo scanInfo;
|
||||
int32_t scanTimes;
|
||||
int32_t readIdx;
|
||||
SSDataBlock* pResBlock;
|
||||
SSampleExecInfo sample; // sample execution info
|
||||
SSortExecInfo sortExecInfo;
|
||||
|
|
|
@ -26,6 +26,7 @@ extern "C" {
|
|||
enum {
|
||||
SORT_MULTISOURCE_MERGE = 0x1,
|
||||
SORT_SINGLESOURCE_SORT = 0x2,
|
||||
SORT_TABLE_MERGE_SCAN = 0x3
|
||||
};
|
||||
|
||||
typedef struct SMultiMergeSource {
|
||||
|
|
|
@ -13,8 +13,6 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// clang-format off
|
||||
|
||||
#include "executorInt.h"
|
||||
#include "filter.h"
|
||||
#include "function.h"
|
||||
|
@ -56,7 +54,6 @@ typedef struct STableMergeScanSortSourceParam {
|
|||
int32_t readerIdx;
|
||||
uint64_t uid;
|
||||
SSDataBlock* inputBlock;
|
||||
STsdbReader* dataReader;
|
||||
} STableMergeScanSortSourceParam;
|
||||
|
||||
typedef struct STableCountScanOperatorInfo {
|
||||
|
@ -2741,28 +2738,28 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
SSDataBlock* pBlock = source->inputBlock;
|
||||
int32_t code = 0;
|
||||
|
||||
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
|
||||
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||
if (NULL == source->dataReader) {
|
||||
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL);
|
||||
|
||||
bool hasNext = false;
|
||||
|
||||
if (NULL == pInfo->base.dataReader) {
|
||||
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, p, 1, pBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
|
||||
if (code != 0) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
pInfo->readIdx = readIdx + pInfo->tableStartIndex ;
|
||||
} else if (pInfo->readIdx != readIdx + pInfo->tableStartIndex) {
|
||||
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, p, 1);
|
||||
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
|
||||
}
|
||||
|
||||
pInfo->base.dataReader = source->dataReader;
|
||||
STsdbReader* reader = pInfo->base.dataReader;
|
||||
bool hasNext = false;
|
||||
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
|
||||
|
||||
while (true) {
|
||||
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
|
||||
if (code != 0) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||
pInfo->base.dataReader = NULL;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
|
@ -2772,7 +2769,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||
pInfo->base.dataReader = NULL;
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
|
||||
|
@ -2782,12 +2778,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pQueryCond->order == TSDB_ORDER_ASC) {
|
||||
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
|
||||
} else {
|
||||
pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
|
||||
}
|
||||
|
||||
uint32_t status = 0;
|
||||
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
|
||||
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
|
||||
|
@ -2809,14 +2799,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
qTrace("tsdb/read-table-data: %p, close reader", reader);
|
||||
pInfo->base.dataReader = NULL;
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
pAPI->tsdReader.tsdReaderClose(source->dataReader);
|
||||
source->dataReader = NULL;
|
||||
pInfo->base.dataReader = NULL;
|
||||
blockDataDestroy(source->inputBlock);
|
||||
source->inputBlock = NULL;
|
||||
return NULL;
|
||||
|
@ -2870,32 +2855,18 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
int32_t tableStartIdx = pInfo->tableStartIndex;
|
||||
int32_t tableEndIdx = pInfo->tableEndIndex;
|
||||
|
||||
pInfo->base.dataReader = NULL;
|
||||
|
||||
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
||||
// the additional one is reserved for merge result
|
||||
// pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
||||
int32_t kWay = (TSDB_MAX_BYTES_PER_ROW * 2) / (pInfo->pResBlock->info.rowSize);
|
||||
if (kWay >= 128) {
|
||||
kWay = 128;
|
||||
} else if (kWay <= 2) {
|
||||
kWay = 2;
|
||||
} else {
|
||||
int i = 2;
|
||||
while (i * 2 <= kWay) i = i * 2;
|
||||
kWay = i;
|
||||
}
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
||||
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1);
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_TABLE_MERGE_SCAN, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
|
||||
|
||||
// one table has one data block
|
||||
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
||||
pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));
|
||||
|
||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||
STableMergeScanSortSourceParam param = {0};
|
||||
|
@ -2904,10 +2875,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||
|
||||
taosArrayPush(pInfo->sortSourceParams, ¶m);
|
||||
|
||||
SQueryTableDataCond cond;
|
||||
dumpQueryTableCond(&pInfo->base.cond, &cond);
|
||||
taosArrayPush(pInfo->queryConds, &cond);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||
|
@ -2932,8 +2899,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
int32_t numOfTable = taosArrayGetSize(pInfo->queryConds);
|
||||
|
||||
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
||||
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
|
||||
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
|
||||
|
@ -2941,24 +2906,15 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
|
||||
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
|
||||
|
||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->sortSourceParams); ++i) {
|
||||
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
||||
blockDataDestroy(param->inputBlock);
|
||||
pAPI->tsdReader.tsdReaderClose(param->dataReader);
|
||||
param->dataReader = NULL;
|
||||
}
|
||||
taosArrayClear(pInfo->sortSourceParams);
|
||||
|
||||
tsortDestroySortHandle(pInfo->pSortHandle);
|
||||
pInfo->pSortHandle = NULL;
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) {
|
||||
SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
|
||||
taosMemoryFree(cond->colList);
|
||||
}
|
||||
taosArrayDestroy(pInfo->queryConds);
|
||||
pInfo->queryConds = NULL;
|
||||
|
||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -3056,13 +3012,11 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
|||
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
||||
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
|
||||
|
||||
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);
|
||||
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
|
||||
|
||||
for (int32_t i = 0; i < numOfTable; i++) {
|
||||
STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
|
||||
blockDataDestroy(p->inputBlock);
|
||||
pTableScanInfo->base.readerAPI.tsdReaderClose(p->dataReader);
|
||||
p->dataReader = NULL;
|
||||
}
|
||||
|
||||
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||
|
@ -3072,12 +3026,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
|||
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
||||
pTableScanInfo->pSortHandle = NULL;
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
|
||||
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
|
||||
taosMemoryFree(pCond->colList);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTableScanInfo->queryConds);
|
||||
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
|
||||
|
||||
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
|
||||
|
@ -3143,6 +3091,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
pInfo->base.scanFlag = MAIN_SCAN;
|
||||
pInfo->base.readHandle = *readHandle;
|
||||
|
||||
pInfo->readIdx = -1;
|
||||
|
||||
pInfo->base.limitInfo.limit.limit = -1;
|
||||
pInfo->base.limitInfo.slimit.limit = -1;
|
||||
pInfo->base.pTableListInfo = pTableListInfo;
|
||||
|
@ -3573,6 +3523,4 @@ static void destoryTableCountScanOperator(void* param) {
|
|||
|
||||
taosArrayDestroy(pTableCountScanInfo->stbUidList);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
// clang-format on
|
||||
}
|
|
@ -769,6 +769,61 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
|
|||
return pgSize;
|
||||
}
|
||||
|
||||
static int32_t createPageBuf(SSortHandle* pHandle) {
|
||||
if (pHandle->pBuf == NULL) {
|
||||
if (!osTempSpaceAvailable()) {
|
||||
terrno = TSDB_CODE_NO_DISKSPACE;
|
||||
qError("create page buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
|
||||
"tableBlocksBuf", tsTempDir);
|
||||
dBufSetPrintInfo(pHandle->pBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t addDataBlockToPageBuf(SSortHandle * pHandle, SSDataBlock* pDataBlock, SArray* aPgId) {
|
||||
int32_t start = 0;
|
||||
while (start < pDataBlock->info.rows) {
|
||||
int32_t stop = 0;
|
||||
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
|
||||
SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
|
||||
if (p == NULL) {
|
||||
taosArrayDestroy(aPgId);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t pageId = -1;
|
||||
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||
if (pPage == NULL) {
|
||||
taosArrayDestroy(aPgId);
|
||||
blockDataDestroy(p);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
taosArrayPush(aPgId, &pageId);
|
||||
|
||||
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
|
||||
ASSERT(size <= getBufPageSize(pHandle->pBuf));
|
||||
|
||||
blockDataToBuf(pPage, p);
|
||||
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
|
||||
blockDataDestroy(p);
|
||||
start = stop + 1;
|
||||
}
|
||||
|
||||
blockDataCleanup(pDataBlock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
int32_t code = 0;
|
||||
|
@ -875,6 +930,35 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
}
|
||||
}
|
||||
} else if (pHandle->type == SORT_TABLE_MERGE_SCAN) {
|
||||
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
|
||||
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
|
||||
|
||||
// pHandle->numOfPages = 1024; //todo check sortbufsize
|
||||
createPageBuf(pHandle);
|
||||
|
||||
for (int i = 0; i < nSrc; ++i) {
|
||||
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
|
||||
|
||||
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i);
|
||||
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
|
||||
while (pBlk != NULL) {
|
||||
addDataBlockToPageBuf(pHandle, pBlk, aPgId);
|
||||
pBlk = pHandle->fetchfp(pSrc->param);
|
||||
}
|
||||
SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
|
||||
code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pBlock, &pHandle->sourceId, aPgId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(aExtSrc);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
|
||||
taosArrayDestroy(aExtSrc);
|
||||
|
||||
pHandle->type = SORT_SINGLESOURCE_SORT;
|
||||
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
Loading…
Reference in New Issue