Merge pull request #17723 from taosdata/enh/optQueryMem
enh: opt query mem
This commit is contained in:
commit
8a8aa3efea
|
@ -152,17 +152,20 @@ typedef struct STsdbReader STsdbReader;
|
||||||
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
|
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
|
||||||
#define CACHESCAN_RETRIEVE_LAST 0x8
|
#define CACHESCAN_RETRIEVE_LAST 0x8
|
||||||
|
|
||||||
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
|
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
|
||||||
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
|
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
|
||||||
const char *idstr);
|
const char *idstr);
|
||||||
|
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
void tsdbReaderClose(STsdbReader *pReader);
|
||||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||||
|
bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid);
|
||||||
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
|
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
|
||||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
||||||
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||||
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
||||||
|
bool tsdbIsAscendingOrder(STsdbReader *pReader);
|
||||||
void *tsdbGetIdx(SMeta *pMeta);
|
void *tsdbGetIdx(SMeta *pMeta);
|
||||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||||
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
||||||
|
|
|
@ -3775,7 +3775,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
|
bool tsdbTableNextDataBlock(STsdbReader* pReader, int64_t uid) {
|
||||||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
|
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
|
||||||
if (pBlockScanInfo == NULL) { // no data block for the table of given uid
|
if (pBlockScanInfo == NULL) { // no data block for the table of given uid
|
||||||
return false;
|
return false;
|
||||||
|
@ -4174,3 +4174,4 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
|
||||||
}
|
}
|
||||||
tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
|
tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
|
||||||
}
|
}
|
||||||
|
bool tsdbIsAscendingOrder(STsdbReader* pReader) { return ASCENDING_TRAVERSE(pReader->order); }
|
||||||
|
|
|
@ -341,21 +341,23 @@ typedef struct STableScanInfo {
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
typedef struct STableMergeScanInfo {
|
typedef struct STableMergeScanInfo {
|
||||||
STableListInfo* tableListInfo;
|
STableListInfo* tableListInfo;
|
||||||
int32_t tableStartIndex;
|
int32_t tableStartIndex;
|
||||||
int32_t tableEndIndex;
|
int32_t tableEndIndex;
|
||||||
bool hasGroupId;
|
bool hasGroupId;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
SArray* dataReaders; // array of tsdbReaderT*
|
SArray* dataReaders; // array of tsdbReaderT*
|
||||||
SReadHandle readHandle;
|
SArray* queryConds; // array of queryTableDataCond
|
||||||
int32_t bufPageSize;
|
STsdbReader* pReader;
|
||||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
SReadHandle readHandle;
|
||||||
SArray* pSortInfo;
|
int32_t bufPageSize;
|
||||||
SSortHandle* pSortHandle;
|
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||||
SSDataBlock* pSortInputBlock;
|
SArray* pSortInfo;
|
||||||
int64_t startTs; // sort start time
|
SSortHandle* pSortHandle;
|
||||||
SArray* sortSourceParams;
|
SSDataBlock* pSortInputBlock;
|
||||||
SLimitInfo limitInfo;
|
int64_t startTs; // sort start time
|
||||||
|
SArray* sortSourceParams;
|
||||||
|
SLimitInfo limitInfo;
|
||||||
SFileBlockLoadRecorder readRecorder;
|
SFileBlockLoadRecorder readRecorder;
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
SScanInfo scanInfo;
|
SScanInfo scanInfo;
|
||||||
|
|
|
@ -42,7 +42,7 @@ static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const S
|
||||||
static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time", "columns",
|
static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time", "columns",
|
||||||
"ttl", "stable_name", "vgroup_id', 'uid", "type"};
|
"ttl", "stable_name", "vgroup_id', 'uid", "type"};
|
||||||
|
|
||||||
static char* SYSTABLE_IDX_EXCEPT[] = {"db_name", "vgroup_id"};
|
static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"};
|
||||||
|
|
||||||
typedef int32_t (*__sys_filte)(void* pMeta, SNode* cond, SArray* result);
|
typedef int32_t (*__sys_filte)(void* pMeta, SNode* cond, SArray* result);
|
||||||
typedef int32_t (*__sys_check)(SNode* cond);
|
typedef int32_t (*__sys_check)(SNode* cond);
|
||||||
|
@ -363,7 +363,8 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
|
||||||
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
|
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
|
||||||
pLimitInfo->remainOffset -= pBlock->info.rows;
|
pLimitInfo->remainOffset -= pBlock->info.rows;
|
||||||
pBlock->info.rows = 0;
|
pBlock->info.rows = 0;
|
||||||
qDebug("current block ignore due to offset, current:%"PRId64", %s", pLimitInfo->remainOffset, GET_TASKID(pTaskInfo));
|
qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset,
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
} else {
|
} else {
|
||||||
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
|
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
|
||||||
pLimitInfo->remainOffset = 0;
|
pLimitInfo->remainOffset = 0;
|
||||||
|
@ -376,9 +377,9 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
|
||||||
int32_t keep = pBlock->info.rows - overflowRows;
|
int32_t keep = pBlock->info.rows - overflowRows;
|
||||||
|
|
||||||
blockDataKeepFirstNRows(pBlock, keep);
|
blockDataKeepFirstNRows(pBlock, keep);
|
||||||
qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
|
qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3251,18 +3252,80 @@ static int tableUidCompare(const void* a, const void* b) {
|
||||||
}
|
}
|
||||||
return u1 < u2 ? -1 : 1;
|
return u1 < u2 ? -1 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct MergeIndex {
|
||||||
|
int idx;
|
||||||
|
int len;
|
||||||
|
} MergeIndex;
|
||||||
|
|
||||||
|
static FORCE_INLINE int optSysBinarySearch(SArray* arr, int s, int e, uint64_t k) {
|
||||||
|
uint64_t v;
|
||||||
|
int32_t m;
|
||||||
|
while (s <= e) {
|
||||||
|
m = s + (e - s) / 2;
|
||||||
|
v = *(uint64_t*)taosArrayGet(arr, m);
|
||||||
|
if (v >= k) {
|
||||||
|
e = m - 1;
|
||||||
|
} else {
|
||||||
|
s = m + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
void optSysIntersection(SArray* in, SArray* out) {
|
||||||
|
int32_t sz = (int32_t)taosArrayGetSize(in);
|
||||||
|
if (sz <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
MergeIndex* mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
|
||||||
|
for (int i = 0; i < sz; i++) {
|
||||||
|
SArray* t = taosArrayGetP(in, i);
|
||||||
|
mi[i].len = (int32_t)taosArrayGetSize(t);
|
||||||
|
mi[i].idx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* base = taosArrayGetP(in, 0);
|
||||||
|
for (int i = 0; i < taosArrayGetSize(base); i++) {
|
||||||
|
uint64_t tgt = *(uint64_t*)taosArrayGet(base, i);
|
||||||
|
bool has = true;
|
||||||
|
for (int j = 1; j < taosArrayGetSize(in); j++) {
|
||||||
|
SArray* oth = taosArrayGetP(in, j);
|
||||||
|
int mid = optSysBinarySearch(oth, mi[j].idx, mi[j].len - 1, tgt);
|
||||||
|
if (mid >= 0 && mid < mi[j].len) {
|
||||||
|
uint64_t val = *(uint64_t*)taosArrayGet(oth, mid);
|
||||||
|
has = (val == tgt ? true : false);
|
||||||
|
mi[j].idx = mid;
|
||||||
|
} else {
|
||||||
|
has = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (has == true) {
|
||||||
|
taosArrayPush(out, &tgt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosMemoryFreeClear(mi);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt) {
|
static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt) {
|
||||||
// TODO, find comm mem from mRslt
|
// TODO, find comm mem from mRslt
|
||||||
for (int i = 0; i < taosArrayGetSize(mRslt); i++) {
|
for (int i = 0; i < taosArrayGetSize(mRslt); i++) {
|
||||||
SArray* aRslt = taosArrayGetP(mRslt, i);
|
SArray* arslt = taosArrayGetP(mRslt, i);
|
||||||
taosArrayAddAll(rslt, aRslt);
|
taosArraySort(arslt, tableUidCompare);
|
||||||
|
}
|
||||||
|
optSysIntersection(mRslt, rslt);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int32_t optSysSpecialColumn(SNode* cond) {
|
||||||
|
SOperatorNode* pOper = (SOperatorNode*)cond;
|
||||||
|
SColumnNode* pCol = (SColumnNode*)pOper->pLeft;
|
||||||
|
for (int i = 0; i < sizeof(SYSTABLE_SPECIAL_COL) / sizeof(SYSTABLE_SPECIAL_COL[0]); i++) {
|
||||||
|
if (0 == strcmp(pCol->colName, SYSTABLE_SPECIAL_COL[i])) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosArraySort(rslt, tableUidCompare);
|
|
||||||
taosArrayRemoveDuplicate(rslt, tableUidCompare, NULL);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
|
static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
if (nodeType(cond) == QUERY_NODE_OPERATOR) {
|
if (nodeType(cond) == QUERY_NODE_OPERATOR) {
|
||||||
|
@ -3286,7 +3349,6 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
|
||||||
SNodeList* pList = (SNodeList*)pNode->pParameterList;
|
SNodeList* pList = (SNodeList*)pNode->pParameterList;
|
||||||
|
|
||||||
int32_t len = LIST_LENGTH(pList);
|
int32_t len = LIST_LENGTH(pList);
|
||||||
if (len <= 0) return ret;
|
|
||||||
|
|
||||||
bool hasIdx = false;
|
bool hasIdx = false;
|
||||||
bool hasRslt = true;
|
bool hasRslt = true;
|
||||||
|
@ -3297,12 +3359,16 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
|
||||||
if (cell == NULL) break;
|
if (cell == NULL) break;
|
||||||
|
|
||||||
SArray* aRslt = taosArrayInit(16, sizeof(int64_t));
|
SArray* aRslt = taosArrayInit(16, sizeof(int64_t));
|
||||||
|
|
||||||
ret = optSysTabFilteImpl(arg, cell->pNode, aRslt);
|
ret = optSysTabFilteImpl(arg, cell->pNode, aRslt);
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
// has index
|
// has index
|
||||||
hasIdx = true;
|
hasIdx = true;
|
||||||
taosArrayPush(mRslt, &aRslt);
|
if (optSysSpecialColumn(cell->pNode) == 0) {
|
||||||
|
taosArrayPush(mRslt, &aRslt);
|
||||||
|
} else {
|
||||||
|
// db_name/vgroup not result
|
||||||
|
taosArrayDestroy(aRslt);
|
||||||
|
}
|
||||||
} else if (ret == -2) {
|
} else if (ret == -2) {
|
||||||
// current vg
|
// current vg
|
||||||
hasIdx = true;
|
hasIdx = true;
|
||||||
|
@ -4172,6 +4238,131 @@ int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle,
|
||||||
|
STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx,
|
||||||
|
STsdbReader** ppReader, const char* idstr) {
|
||||||
|
STsdbReader* pReader = NULL;
|
||||||
|
SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
|
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
|
||||||
|
taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i));
|
||||||
|
}
|
||||||
|
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr);
|
||||||
|
if (code != 0) {
|
||||||
|
taosArrayDestroy(subTableList);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
*ppReader = pReader;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
||||||
|
SSDataBlock* pBlock, uint32_t* status) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
uint64_t uid = pBlock->info.uid;
|
||||||
|
|
||||||
|
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
|
||||||
|
|
||||||
|
pCost->totalBlocks += 1;
|
||||||
|
pCost->totalRows += pBlock->info.rows;
|
||||||
|
|
||||||
|
*status = pInfo->dataBlockLoadFlag;
|
||||||
|
if (pTableScanInfo->pFilterNode != NULL ||
|
||||||
|
overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
|
||||||
|
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDataBlockInfo* pBlockInfo = &pBlock->info;
|
||||||
|
taosMemoryFreeClear(pBlock->pBlockAgg);
|
||||||
|
|
||||||
|
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
|
||||||
|
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
|
pCost->filterOutBlocks += 1;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
|
||||||
|
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
|
pCost->skipBlocks += 1;
|
||||||
|
|
||||||
|
// clear all data in pBlock that are set when handing the previous block
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
||||||
|
SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
pcol->pData = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||||
|
pCost->loadBlockStatis += 1;
|
||||||
|
|
||||||
|
bool allColumnsHaveAgg = true;
|
||||||
|
SColumnDataAgg** pColAgg = NULL;
|
||||||
|
STsdbReader* reader = pTableScanInfo->pReader;
|
||||||
|
tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg);
|
||||||
|
|
||||||
|
if (allColumnsHaveAgg == true) {
|
||||||
|
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
|
||||||
|
// todo create this buffer during creating operator
|
||||||
|
if (pBlock->pBlockAgg == NULL) {
|
||||||
|
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i);
|
||||||
|
if (!pColMatchInfo->needOutput) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||||
|
*status = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
|
||||||
|
|
||||||
|
pCost->totalCheckedRows += pBlock->info.rows;
|
||||||
|
pCost->loadBlocks += 1;
|
||||||
|
|
||||||
|
STsdbReader* reader = pTableScanInfo->pReader;
|
||||||
|
SArray* pCols = tsdbRetrieveDataBlock(reader, NULL);
|
||||||
|
if (pCols == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
||||||
|
|
||||||
|
// currently only the tbname pseudo column
|
||||||
|
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||||
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
|
||||||
|
pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo));
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTableScanInfo->pFilterNode != NULL) {
|
||||||
|
int64_t st = taosGetTimestampMs();
|
||||||
|
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL);
|
||||||
|
|
||||||
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
pTableScanInfo->readRecorder.filterTime += el;
|
||||||
|
|
||||||
|
if (pBlock->info.rows == 0) {
|
||||||
|
pCost->filterOutBlocks += 1;
|
||||||
|
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
|
||||||
|
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
|
||||||
|
} else {
|
||||||
|
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
// todo refactor
|
// todo refactor
|
||||||
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
||||||
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
|
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
|
||||||
|
@ -4255,7 +4446,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
||||||
pCost->totalCheckedRows += pBlock->info.rows;
|
pCost->totalCheckedRows += pBlock->info.rows;
|
||||||
pCost->loadBlocks += 1;
|
pCost->loadBlocks += 1;
|
||||||
|
|
||||||
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
|
STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
|
||||||
SArray* pCols = tsdbRetrieveDataBlock(reader, NULL);
|
SArray* pCols = tsdbRetrieveDataBlock(reader, NULL);
|
||||||
if (pCols == NULL) {
|
if (pCols == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -4294,9 +4485,143 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
||||||
typedef struct STableMergeScanSortSourceParam {
|
typedef struct STableMergeScanSortSourceParam {
|
||||||
SOperatorInfo* pOperator;
|
SOperatorInfo* pOperator;
|
||||||
int32_t readerIdx;
|
int32_t readerIdx;
|
||||||
|
int64_t uid;
|
||||||
SSDataBlock* inputBlock;
|
SSDataBlock* inputBlock;
|
||||||
} STableMergeScanSortSourceParam;
|
} STableMergeScanSortSourceParam;
|
||||||
|
|
||||||
|
static SSDataBlock* getTableDataBlockTemp(void* param) {
|
||||||
|
STableMergeScanSortSourceParam* source = param;
|
||||||
|
SOperatorInfo* pOperator = source->pOperator;
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
int32_t readIdx = source->readerIdx;
|
||||||
|
SSDataBlock* pBlock = source->inputBlock;
|
||||||
|
STableMergeScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
|
||||||
|
SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx);
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
SArray* subTable = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
|
taosArrayPush(subTable, taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex));
|
||||||
|
SReadHandle* pHandle = &pInfo->readHandle;
|
||||||
|
tsdbReaderOpen(pHandle->vnode, pQueryCond, subTable, &pInfo->pReader, GET_TASKID(pTaskInfo));
|
||||||
|
taosArrayDestroy(subTable);
|
||||||
|
|
||||||
|
STsdbReader* reader = pInfo->pReader;
|
||||||
|
while (tsdbNextDataBlock(reader)) {
|
||||||
|
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
|
}
|
||||||
|
|
||||||
|
// process this data block based on the probabilities
|
||||||
|
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
|
||||||
|
if (!processThisBlock) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
SDataBlockInfo binfo = pBlock->info;
|
||||||
|
tsdbRetrieveDataBlockInfo(reader, &binfo);
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(pBlock, binfo.rows);
|
||||||
|
pBlock->info.type = binfo.type;
|
||||||
|
pBlock->info.uid = binfo.uid;
|
||||||
|
pBlock->info.window = binfo.window;
|
||||||
|
pBlock->info.rows = binfo.rows;
|
||||||
|
|
||||||
|
if (tsdbIsAscendingOrder(pInfo->pReader)) {
|
||||||
|
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
|
||||||
|
} else {
|
||||||
|
pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t status = 0;
|
||||||
|
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// current block is filter out according to filter condition, continue load the next block
|
||||||
|
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||||
|
if (groupId) {
|
||||||
|
pBlock->info.groupId = *groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows;
|
||||||
|
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
|
tsdbReaderClose(pInfo->pReader);
|
||||||
|
pInfo->pReader = NULL;
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
tsdbReaderClose(pInfo->pReader);
|
||||||
|
pInfo->pReader = NULL;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
static SSDataBlock* getTableDataBlock2(void* param) {
|
||||||
|
STableMergeScanSortSourceParam* source = param;
|
||||||
|
SOperatorInfo* pOperator = source->pOperator;
|
||||||
|
int64_t uid = source->uid;
|
||||||
|
SSDataBlock* pBlock = source->inputBlock;
|
||||||
|
STableMergeScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
|
STsdbReader* reader = pTableScanInfo->pReader;
|
||||||
|
while (tsdbTableNextDataBlock(reader, uid)) {
|
||||||
|
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
|
}
|
||||||
|
|
||||||
|
// process this data block based on the probabilities
|
||||||
|
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
|
||||||
|
if (!processThisBlock) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
SDataBlockInfo binfo = pBlock->info;
|
||||||
|
tsdbRetrieveDataBlockInfo(reader, &binfo);
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(pBlock, binfo.rows);
|
||||||
|
pBlock->info.type = binfo.type;
|
||||||
|
pBlock->info.uid = binfo.uid;
|
||||||
|
pBlock->info.window = binfo.window;
|
||||||
|
pBlock->info.rows = binfo.rows;
|
||||||
|
|
||||||
|
uint32_t status = 0;
|
||||||
|
int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// current block is filter out according to filter condition, continue load the next block
|
||||||
|
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||||
|
if (groupId) {
|
||||||
|
pBlock->info.groupId = *groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
||||||
|
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* getTableDataBlock(void* param) {
|
static SSDataBlock* getTableDataBlock(void* param) {
|
||||||
STableMergeScanSortSourceParam* source = param;
|
STableMergeScanSortSourceParam* source = param;
|
||||||
SOperatorInfo* pOperator = source->pOperator;
|
SOperatorInfo* pOperator = source->pOperator;
|
||||||
|
@ -4332,7 +4657,6 @@ static SSDataBlock* getTableDataBlock(void* param) {
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
|
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
|
||||||
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -4375,6 +4699,14 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
|
||||||
return pList;
|
return pList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t dumpSQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
|
||||||
|
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
|
||||||
|
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
|
||||||
|
for (int i = 0; i < src->numOfCols; i++) {
|
||||||
|
dst->colList[i] = src->colList[i];
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -4395,10 +4727,9 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
int32_t tableEndIdx = pInfo->tableEndIndex;
|
int32_t tableEndIdx = pInfo->tableEndIndex;
|
||||||
|
|
||||||
STableListInfo* tableListInfo = pInfo->tableListInfo;
|
STableListInfo* tableListInfo = pInfo->tableListInfo;
|
||||||
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
|
|
||||||
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
|
|
||||||
pInfo->dataReaders, GET_TASKID(pTaskInfo));
|
|
||||||
|
|
||||||
|
// pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
|
||||||
|
pInfo->pReader = NULL;
|
||||||
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
||||||
// the additional one is reserved for merge result
|
// the additional one is reserved for merge result
|
||||||
pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
||||||
|
@ -4406,18 +4737,27 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||||
pInfo->pSortInputBlock, pTaskInfo->id.str);
|
pInfo->pSortInputBlock, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockTemp, NULL, NULL);
|
||||||
|
|
||||||
|
// one table has one data block
|
||||||
|
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
||||||
|
pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||||
|
STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i + tableStartIdx);
|
||||||
|
|
||||||
size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
|
|
||||||
for (int32_t i = 0; i < numReaders; ++i) {
|
|
||||||
STableMergeScanSortSourceParam param = {0};
|
STableMergeScanSortSourceParam param = {0};
|
||||||
param.readerIdx = i;
|
param.readerIdx = i;
|
||||||
param.pOperator = pOperator;
|
param.pOperator = pOperator;
|
||||||
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
taosArrayPush(pInfo->sortSourceParams, ¶m);
|
taosArrayPush(pInfo->sortSourceParams, ¶m);
|
||||||
|
|
||||||
|
SQueryTableDataCond cond;
|
||||||
|
dumpSQueryTableCond(&pInfo->cond, &cond);
|
||||||
|
taosArrayPush(pInfo->queryConds, &cond);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numReaders; ++i) {
|
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||||
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
||||||
ps->param = param;
|
ps->param = param;
|
||||||
|
@ -4437,7 +4777,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
|
int32_t numOfTable = taosArrayGetSize(pInfo->queryConds);
|
||||||
|
|
||||||
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
||||||
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
|
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
|
||||||
|
@ -4446,7 +4786,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
|
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
|
||||||
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
|
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numReaders; ++i) {
|
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||||
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
||||||
blockDataDestroy(param->inputBlock);
|
blockDataDestroy(param->inputBlock);
|
||||||
}
|
}
|
||||||
|
@ -4454,12 +4794,13 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
tsortDestroySortHandle(pInfo->pSortHandle);
|
tsortDestroySortHandle(pInfo->pSortHandle);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numReaders; ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) {
|
||||||
STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i);
|
SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
|
||||||
tsdbReaderClose(reader);
|
taosMemoryFree(cond->colList);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pInfo->dataReaders);
|
taosArrayDestroy(pInfo->queryConds);
|
||||||
pInfo->dataReaders = NULL;
|
pInfo->queryConds = NULL;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4543,13 +4884,23 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
void destroyTableMergeScanOperatorInfo(void* param) {
|
void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
||||||
cleanupQueryTableDataCond(&pTableScanInfo->cond);
|
cleanupQueryTableDataCond(&pTableScanInfo->cond);
|
||||||
|
|
||||||
|
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTable; i++) {
|
||||||
|
STableMergeScanSortSourceParam* param = taosArrayGet(pTableScanInfo->sortSourceParams, i);
|
||||||
|
blockDataDestroy(param->inputBlock);
|
||||||
|
}
|
||||||
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
|
tsdbReaderClose(pTableScanInfo->pReader);
|
||||||
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
|
pTableScanInfo->pReader = NULL;
|
||||||
tsdbReaderClose(reader);
|
|
||||||
|
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
|
||||||
|
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
|
||||||
|
taosMemoryFree(pCond->colList);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pTableScanInfo->dataReaders);
|
taosArrayDestroy(pTableScanInfo->queryConds);
|
||||||
|
|
||||||
if (pTableScanInfo->matchInfo.pList != NULL) {
|
if (pTableScanInfo->matchInfo.pList != NULL) {
|
||||||
taosArrayDestroy(pTableScanInfo->matchInfo.pList);
|
taosArrayDestroy(pTableScanInfo->matchInfo.pList);
|
||||||
|
|
|
@ -38,7 +38,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
int32_t code = extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
|
int32_t code =
|
||||||
|
extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
|
||||||
|
|
||||||
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
|
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
|
||||||
|
|
||||||
|
@ -62,8 +63,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
// there are headers, so pageSize = rowSize + header pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
// there are headers, so pageSize = rowSize + header pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
||||||
// TODO dynamic set the available sort buffer
|
// TODO dynamic set the available sort buffer
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo,
|
pOperator->fpSet =
|
||||||
getExplainExecInfo);
|
createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, getExplainExecInfo);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -126,7 +127,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
||||||
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
|
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||||
// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||||
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
|
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
|
||||||
|
@ -316,7 +317,7 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
|
||||||
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
|
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||||
// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||||
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
|
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
|
||||||
|
@ -592,7 +593,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
||||||
|
|
||||||
blockDataEnsureCapacity(p, capacity);
|
blockDataEnsureCapacity(p, capacity);
|
||||||
|
|
||||||
_retry:
|
_retry:
|
||||||
while (1) {
|
while (1) {
|
||||||
STupleHandle* pTupleHandle = NULL;
|
STupleHandle* pTupleHandle = NULL;
|
||||||
if (pInfo->groupSort) {
|
if (pInfo->groupSort) {
|
||||||
|
@ -647,13 +648,13 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
||||||
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
|
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||||
// ASSERT(pColMatchInfo-> == COL_MATCH_FROM_SLOT_ID);
|
// ASSERT(pColMatchInfo-> == COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||||
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
|
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
|
||||||
colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
|
colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
|
||||||
}
|
}
|
||||||
|
pInfo->limitInfo.numOfOutputRows += p->info.rows;
|
||||||
pDataBlock->info.rows = p->info.rows;
|
pDataBlock->info.rows = p->info.rows;
|
||||||
pDataBlock->info.groupId = pInfo->groupId;
|
pDataBlock->info.groupId = pInfo->groupId;
|
||||||
}
|
}
|
||||||
|
@ -734,7 +735,8 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
||||||
SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
|
SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
|
|
||||||
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
|
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
|
||||||
|
&pInfo->matchInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue