Merge pull request #12975 from taosdata/feature/3_liaohj
enh(query): add detailed sort exec information in analysis of operator.
This commit is contained in:
commit
53527cfe59
|
@ -219,6 +219,16 @@ typedef struct {
|
||||||
|
|
||||||
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
|
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
|
||||||
|
|
||||||
|
#define SORT_QSORT_T 0x1
|
||||||
|
#define SORT_SPILLED_MERGE_SORT_T 0x2
|
||||||
|
typedef struct SSortExecInfo {
|
||||||
|
int32_t sortMethod;
|
||||||
|
int32_t sortBuffer;
|
||||||
|
int32_t loops; // loop count
|
||||||
|
int32_t writeBytes; // write io bytes
|
||||||
|
int32_t readBytes; // read io bytes
|
||||||
|
} SSortExecInfo;
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -198,7 +198,7 @@ void colDataTrim(SColumnInfoData* pColumnInfoData);
|
||||||
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
|
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
|
||||||
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
|
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
|
||||||
|
|
||||||
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap);
|
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
|
||||||
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
|
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
|
||||||
int32_t pageSize);
|
int32_t pageSize);
|
||||||
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
|
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
|
||||||
|
|
|
@ -660,8 +660,7 @@ typedef struct {
|
||||||
int32_t tz; // query client timezone
|
int32_t tz; // query client timezone
|
||||||
char intervalUnit;
|
char intervalUnit;
|
||||||
char slidingUnit;
|
char slidingUnit;
|
||||||
char
|
char offsetUnit;
|
||||||
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
|
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
|
|
|
@ -361,19 +361,13 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if pIndexMap = NULL, merger one column by on column
|
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
|
||||||
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap) {
|
|
||||||
assert(pSrc != NULL && pDest != NULL);
|
assert(pSrc != NULL && pDest != NULL);
|
||||||
int32_t capacity = pDest->info.capacity;
|
int32_t capacity = pDest->info.capacity;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pDest->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pDest->info.numOfCols; ++i) {
|
||||||
int32_t mapIndex = i;
|
|
||||||
// if (pIndexMap) {
|
|
||||||
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
|
|
||||||
// }
|
|
||||||
|
|
||||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
||||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
|
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
|
||||||
|
|
||||||
capacity = pDest->info.capacity;
|
capacity = pDest->info.capacity;
|
||||||
colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows);
|
colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows);
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "commandInt.h"
|
#include "commandInt.h"
|
||||||
#include "plannodes.h"
|
#include "plannodes.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
#include "tcommon.h"
|
||||||
|
|
||||||
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
|
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
|
||||||
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level);
|
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level);
|
||||||
|
@ -637,13 +638,48 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
}
|
}
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pSortNode->pSortKeys->length);
|
|
||||||
|
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pDescNode->pSlots));
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSortNode->node.pOutputDataBlockDesc->totalRowSize);
|
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDescNode->totalRowSize);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
|
||||||
|
if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
|
||||||
|
// sort key
|
||||||
|
EXPLAIN_ROW_NEW(level, "Sort Key: ");
|
||||||
|
if (pResNode->pExecInfo) {
|
||||||
|
for (int32_t i = 0; i < LIST_LENGTH(pSortNode->pSortKeys); ++i) {
|
||||||
|
SOrderByExprNode *ptn = nodesListGetNode(pSortNode->pSortKeys, i);
|
||||||
|
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPLAIN_ROW_END();
|
||||||
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
|
||||||
|
// sort method
|
||||||
|
EXPLAIN_ROW_NEW(level, "Sort Method: ");
|
||||||
|
|
||||||
|
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
|
||||||
|
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, 0);
|
||||||
|
SSortExecInfo * pExecInfo = (SSortExecInfo *)execInfo->verboseInfo;
|
||||||
|
EXPLAIN_ROW_APPEND("%s", pExecInfo->sortMethod == SORT_QSORT_T ? "quicksort" : "merge sort");
|
||||||
|
if (pExecInfo->sortBuffer > 1024 * 1024) {
|
||||||
|
EXPLAIN_ROW_APPEND(" Buffers:%.2f Mb", pExecInfo->sortBuffer / (1024 * 1024.0));
|
||||||
|
} else if (pExecInfo->sortBuffer > 1024) {
|
||||||
|
EXPLAIN_ROW_APPEND(" Buffers:%.2f Kb", pExecInfo->sortBuffer / (1024.0));
|
||||||
|
} else {
|
||||||
|
EXPLAIN_ROW_APPEND(" Buffers:%d b", pExecInfo->sortBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPLAIN_ROW_APPEND(" loops:%d", pExecInfo->loops);
|
||||||
|
EXPLAIN_ROW_END();
|
||||||
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
}
|
||||||
|
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
|
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
|
||||||
|
@ -792,13 +828,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
}
|
}
|
||||||
// EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pPartNode->length);
|
|
||||||
// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPartNode->node.pOutputDataBlockDesc->totalRowSize);
|
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPartNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||||
// if (pPartNode->pGroupKeys) {
|
|
||||||
// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
|
||||||
// EXPLAIN_ROW_APPEND(EXPLAIN_GROUPS_FORMAT, pPartNode->pGroupKeys->length);
|
|
||||||
// }
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
|
|
@ -622,18 +622,14 @@ typedef struct SSortedMergeOperatorInfo {
|
||||||
|
|
||||||
typedef struct SSortOperatorInfo {
|
typedef struct SSortOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||||
SArray* pSortInfo;
|
SArray* pSortInfo;
|
||||||
SSortHandle* pSortHandle;
|
SSortHandle* pSortHandle;
|
||||||
SArray* pColMatchInfo; // for index map from table scan output
|
SArray* pColMatchInfo; // for index map from table scan output
|
||||||
int32_t bufPageSize;
|
int32_t bufPageSize;
|
||||||
|
|
||||||
// TODO extact struct
|
int64_t startTs; // sort start time
|
||||||
int64_t startTs; // sort start time
|
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
|
||||||
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
|
|
||||||
uint64_t totalSize; // total load bytes from remote
|
|
||||||
uint64_t totalRows; // total number of rows
|
|
||||||
uint64_t totalElapsed; // total elapsed time
|
|
||||||
} SSortOperatorInfo;
|
} SSortOperatorInfo;
|
||||||
|
|
||||||
typedef struct STagFilterOperatorInfo {
|
typedef struct STagFilterOperatorInfo {
|
||||||
|
|
|
@ -137,6 +137,14 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
|
||||||
*/
|
*/
|
||||||
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
|
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* return the sort execution information.
|
||||||
|
*
|
||||||
|
* @param pHandle
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -806,7 +806,7 @@ static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) {
|
||||||
SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false);
|
SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false);
|
||||||
blockDataFromBuf(pDB, buf);
|
blockDataFromBuf(pDB, buf);
|
||||||
SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1);
|
SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1);
|
||||||
blockDataMerge(pInfo->pRes, pSub, NULL);
|
blockDataMerge(pInfo->pRes, pSub);
|
||||||
blockDataDestroy(pDB);
|
blockDataDestroy(pDB);
|
||||||
blockDataDestroy(pSub);
|
blockDataDestroy(pSub);
|
||||||
}
|
}
|
||||||
|
@ -1046,8 +1046,9 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
|
||||||
blockDataDestroy(pInfo->pRes);
|
blockDataDestroy(pInfo->pRes);
|
||||||
|
|
||||||
const char* name = tNameGetTableName(&pInfo->name);
|
const char* name = tNameGetTableName(&pInfo->name);
|
||||||
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
|
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
|
||||||
metaCloseTbCursor(pInfo->pCur);
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
|
pInfo->pCur = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->scanCols);
|
taosArrayDestroy(pInfo->scanCols);
|
||||||
|
|
|
@ -2,6 +2,9 @@
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
|
|
||||||
static SSDataBlock* doSort(SOperatorInfo* pOperator);
|
static SSDataBlock* doSort(SOperatorInfo* pOperator);
|
||||||
|
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
|
||||||
|
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
|
||||||
|
|
||||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
|
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
|
|
||||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
@ -35,7 +38,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
||||||
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, getExplainExecInfo);
|
||||||
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -121,20 +124,17 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
|
||||||
SSortOperatorInfo* pInfo = pOperator->info;
|
SSortOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (OPTR_IS_OPENED(pOperator)) {
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pInfo->binfo.pRes is not equalled to the input datablock.
|
pInfo->startTs = taosGetTimestampUs();
|
||||||
// int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
|
||||||
|
// pInfo->binfo.pRes is not equalled to the input datablock.
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT,
|
||||||
-1, -1, NULL, pTaskInfo->id.str);
|
-1, -1, NULL, pTaskInfo->id.str);
|
||||||
|
|
||||||
|
@ -146,12 +146,39 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
int32_t code = tsortOpen(pInfo->pSortHandle);
|
int32_t code = tsortOpen(pInfo->pSortHandle);
|
||||||
taosMemoryFreeClear(ps);
|
taosMemoryFreeClear(ps);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, terrno);
|
longjmp(pTaskInfo->env, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs)/1000.0;
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
|
||||||
|
OPTR_SET_OPENED(pOperator);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SSortOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
||||||
|
|
||||||
|
if (pBlock != NULL) {
|
||||||
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
} else {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
}
|
||||||
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
@ -161,3 +188,15 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
taosArrayDestroy(pInfo->pSortInfo);
|
taosArrayDestroy(pInfo->pSortInfo);
|
||||||
taosArrayDestroy(pInfo->pColMatchInfo);
|
taosArrayDestroy(pInfo->pColMatchInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||||
|
ASSERT(pOptr != NULL);
|
||||||
|
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
|
||||||
|
|
||||||
|
SSortOperatorInfo *pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
|
||||||
|
|
||||||
|
*pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
|
||||||
|
*pOptrExplain = pInfo;
|
||||||
|
*len = sizeof(SSortExecInfo);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
|
@ -31,20 +31,16 @@ struct STupleHandle {
|
||||||
|
|
||||||
struct SSortHandle {
|
struct SSortHandle {
|
||||||
int32_t type;
|
int32_t type;
|
||||||
|
|
||||||
int32_t pageSize;
|
int32_t pageSize;
|
||||||
int32_t numOfPages;
|
int32_t numOfPages;
|
||||||
SDiskbasedBuf *pBuf;
|
SDiskbasedBuf *pBuf;
|
||||||
|
|
||||||
SArray *pSortInfo;
|
SArray *pSortInfo;
|
||||||
SArray *pIndexMap;
|
|
||||||
SArray *pOrderedSource;
|
SArray *pOrderedSource;
|
||||||
|
|
||||||
_sort_fetch_block_fn_t fetchfp;
|
int32_t loops;
|
||||||
_sort_merge_compar_fn_t comparFn;
|
|
||||||
SMultiwayMergeTreeInfo *pMergeTree;
|
|
||||||
int64_t startTs;
|
|
||||||
uint64_t sortElapsed;
|
uint64_t sortElapsed;
|
||||||
|
int64_t startTs;
|
||||||
uint64_t totalElapsed;
|
uint64_t totalElapsed;
|
||||||
|
|
||||||
int32_t sourceId;
|
int32_t sourceId;
|
||||||
|
@ -53,13 +49,15 @@ struct SSortHandle {
|
||||||
int32_t numOfCompletedSources;
|
int32_t numOfCompletedSources;
|
||||||
bool opened;
|
bool opened;
|
||||||
const char *idStr;
|
const char *idStr;
|
||||||
|
|
||||||
bool inMemSort;
|
bool inMemSort;
|
||||||
bool needAdjust;
|
bool needAdjust;
|
||||||
STupleHandle tupleHandle;
|
STupleHandle tupleHandle;
|
||||||
|
|
||||||
void *param;
|
void *param;
|
||||||
void (*beforeFp)(SSDataBlock* pBlock, void* param);
|
void (*beforeFp)(SSDataBlock* pBlock, void* param);
|
||||||
|
|
||||||
|
_sort_fetch_block_fn_t fetchfp;
|
||||||
|
_sort_merge_compar_fn_t comparFn;
|
||||||
|
SMultiwayMergeTreeInfo *pMergeTree;
|
||||||
};
|
};
|
||||||
|
|
||||||
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
|
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
|
||||||
|
@ -80,7 +78,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t
|
||||||
pSortHandle->pageSize = pageSize;
|
pSortHandle->pageSize = pageSize;
|
||||||
pSortHandle->numOfPages = numOfPages;
|
pSortHandle->numOfPages = numOfPages;
|
||||||
pSortHandle->pSortInfo = pSortInfo;
|
pSortHandle->pSortInfo = pSortInfo;
|
||||||
pSortHandle->pIndexMap = pIndexMap;
|
pSortHandle->loops = 0;
|
||||||
|
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||||
|
@ -415,6 +413,9 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
|
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
|
||||||
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
|
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
|
||||||
|
|
||||||
|
// the initial pass + sortPass + final mergePass
|
||||||
|
pHandle->loops = sortPass + 2;
|
||||||
|
|
||||||
size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
|
size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
|
||||||
for(int32_t t = 0; t < sortPass; ++t) {
|
for(int32_t t = 0; t < sortPass; ++t) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -502,12 +503,13 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
|
static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
|
|
||||||
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
||||||
SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
|
SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
|
||||||
taosArrayClear(pHandle->pOrderedSource);
|
taosArrayClear(pHandle->pOrderedSource);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
|
@ -524,6 +526,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
|
||||||
} else {
|
} else {
|
||||||
pHandle->pageSize = 4096;
|
pHandle->pageSize = 4096;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo!!
|
// todo!!
|
||||||
pHandle->numOfPages = 1024;
|
pHandle->numOfPages = 1024;
|
||||||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
|
@ -535,7 +538,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo relocate the columns
|
// todo relocate the columns
|
||||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);
|
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -569,6 +572,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
|
||||||
pHandle->cmpParam.numOfSources = 1;
|
pHandle->cmpParam.numOfSources = 1;
|
||||||
pHandle->inMemSort = true;
|
pHandle->inMemSort = true;
|
||||||
|
|
||||||
|
pHandle->loops = 1;
|
||||||
pHandle->tupleHandle.rowIndex = -1;
|
pHandle->tupleHandle.rowIndex = -1;
|
||||||
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
|
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -592,7 +596,7 @@ int32_t tsortOpen(SSortHandle* pHandle) {
|
||||||
|
|
||||||
pHandle->opened = true;
|
pHandle->opened = true;
|
||||||
|
|
||||||
int32_t code = createInitialSortedMultiSources(pHandle);
|
int32_t code = createInitialSources(pHandle);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -692,3 +696,20 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
|
||||||
SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);
|
SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);
|
||||||
return colDataGetData(pColInfo, pVHandle->rowIndex);
|
return colDataGetData(pColInfo, pVHandle->rowIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
|
||||||
|
SSortExecInfo info = {0};
|
||||||
|
|
||||||
|
info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;
|
||||||
|
info.sortMethod = pHandle->inMemSort? SORT_QSORT_T:SORT_SPILLED_MERGE_SORT_T;
|
||||||
|
info.loops = pHandle->loops;
|
||||||
|
|
||||||
|
if (pHandle->pBuf != NULL) {
|
||||||
|
SDiskbasedBufStatis st = getDBufStatis(pHandle->pBuf);
|
||||||
|
info.writeBytes = st.flushBytes;
|
||||||
|
info.readBytes = st.loadBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
return info;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -189,6 +189,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
|
||||||
const char* msg1 = "name too long";
|
const char* msg1 = "name too long";
|
||||||
const char* msg2 = "invalid database name";
|
const char* msg2 = "invalid database name";
|
||||||
const char* msg3 = "db is not specified";
|
const char* msg3 = "db is not specified";
|
||||||
|
const char* msg4 = "invalid table name";
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
char* p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);
|
char* p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);
|
||||||
|
@ -207,6 +208,10 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tbLen = pTableName->n - dbLen - 1;
|
int32_t tbLen = pTableName->n - dbLen - 1;
|
||||||
|
if (tbLen <= 0) {
|
||||||
|
return buildInvalidOperationMsg(pMsgBuf, msg4);
|
||||||
|
}
|
||||||
|
|
||||||
char tbname[TSDB_TABLE_FNAME_LEN] = {0};
|
char tbname[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
strncpy(tbname, p + 1, tbLen);
|
strncpy(tbname, p + 1, tbLen);
|
||||||
/*tbLen = */ strdequote(tbname);
|
/*tbLen = */ strdequote(tbname);
|
||||||
|
|
|
@ -549,11 +549,16 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
|
||||||
// print the statistics information
|
// print the statistics information
|
||||||
{
|
{
|
||||||
SDiskbasedBufStatis* ps = &pBuf->statis;
|
SDiskbasedBufStatis* ps = &pBuf->statis;
|
||||||
uDebug(
|
if (ps->loadPages == 0) {
|
||||||
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
|
uDebug(
|
||||||
"Kb\n",
|
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages)",
|
||||||
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
|
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages);
|
||||||
ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
|
} else {
|
||||||
|
uDebug(
|
||||||
|
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb",
|
||||||
|
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
|
||||||
|
ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRemoveFile(pBuf->path);
|
taosRemoveFile(pBuf->path);
|
||||||
|
|
Loading…
Reference in New Issue