Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact
This commit is contained in:
commit
4f0e5c331c
|
@ -1590,7 +1590,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
||||||
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
|
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||||
STColumn* pCol = &pTSchema->columns[k];
|
STColumn* pCol = &pTSchema->columns[k];
|
||||||
ASSERT(pCol->type == pColInfoData->info.type);
|
|
||||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||||
switch (pColInfoData->info.type) {
|
switch (pColInfoData->info.type) {
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
|
@ -1614,20 +1613,39 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
||||||
case TSDB_DATA_TYPE_VARBINARY:
|
case TSDB_DATA_TYPE_VARBINARY:
|
||||||
case TSDB_DATA_TYPE_DECIMAL:
|
case TSDB_DATA_TYPE_DECIMAL:
|
||||||
case TSDB_DATA_TYPE_BLOB:
|
case TSDB_DATA_TYPE_BLOB:
|
||||||
|
case TSDB_DATA_TYPE_JSON:
|
||||||
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||||
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
|
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
|
||||||
TASSERT(0);
|
TASSERT(0);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
|
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
|
||||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, TD_VTYPE_NORM, var, true, offset, k);
|
char tv[8] = {0};
|
||||||
|
if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
|
float v = 0;
|
||||||
|
GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
|
||||||
|
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||||
|
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
|
double v = 0;
|
||||||
|
GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
|
||||||
|
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||||
|
} else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||||
|
int64_t v = 0;
|
||||||
|
GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
|
||||||
|
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||||
|
} else {
|
||||||
|
uint64_t v = 0;
|
||||||
|
GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
|
||||||
|
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||||
|
}
|
||||||
|
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset, k);
|
||||||
} else {
|
} else {
|
||||||
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
|
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
|
||||||
TASSERT(0);
|
TASSERT(0);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
offset += TYPE_BYTES[pColInfoData->info.type];
|
offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation
|
||||||
}
|
}
|
||||||
dataLen += TD_ROW_LEN(rb.pBuf);
|
dataLen += TD_ROW_LEN(rb.pBuf);
|
||||||
#ifdef TD_DEBUG_PRINT_ROW
|
#ifdef TD_DEBUG_PRINT_ROW
|
||||||
|
|
|
@ -773,7 +773,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
|
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
|
||||||
|
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams,
|
||||||
|
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
|
||||||
|
SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
|
|
@ -4517,6 +4517,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
||||||
|
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
|
||||||
|
SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
|
||||||
|
|
||||||
|
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
|
SArray* sortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
|
||||||
|
int32_t numOfOutputCols = 0;
|
||||||
|
SArray* pColList = NULL;
|
||||||
|
//extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
|
|
||||||
|
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
||||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
|
|
|
@ -13,17 +13,18 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tdatablock.h"
|
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
|
|
||||||
static SSDataBlock* doSort(SOperatorInfo* pOperator);
|
static SSDataBlock* doSort(SOperatorInfo* pOperator);
|
||||||
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
|
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
|
||||||
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
|
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
|
||||||
|
|
||||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
|
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
|
|
||||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo,
|
||||||
SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) {
|
SExprInfo* pExprInfo, int32_t numOfCols, SArray* pColMatchColInfo,
|
||||||
|
SExecTaskInfo* pTaskInfo) {
|
||||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
int32_t rowSize = pResBlock->info.rowSize;
|
int32_t rowSize = pResBlock->info.rowSize;
|
||||||
|
@ -32,33 +33,33 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->numOfExprs = numOfCols;
|
||||||
pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
|
pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, 1024);
|
initResultSizeInfo(pOperator, 1024);
|
||||||
|
|
||||||
pInfo->pSortInfo = pSortInfo;
|
pInfo->pSortInfo = pSortInfo;
|
||||||
pInfo->pColMatchInfo= pColMatchColInfo;
|
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||||
pOperator->name = "SortOperator";
|
pOperator->name = "SortOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
// lazy evaluation for the following parameter since the input datablock is not known till now.
|
// lazy evaluation for the following parameter since the input datablock is not known till now.
|
||||||
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header
|
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize +
|
||||||
// pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
|
// header pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
|
||||||
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL,
|
||||||
createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, getExplainExecInfo);
|
getExplainExecInfo);
|
||||||
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
taosMemoryFree(pOperator);
|
taosMemoryFree(pOperator);
|
||||||
|
@ -68,7 +69,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
||||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
||||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
bool isNull = tsortIsNullVal(pTupleHandle, i);
|
bool isNull = tsortIsNullVal(pTupleHandle, i);
|
||||||
if (isNull) {
|
if (isNull) {
|
||||||
colDataAppendNULL(pColInfo, pBlock->info.rows);
|
colDataAppendNULL(pColInfo, pBlock->info.rows);
|
||||||
} else {
|
} else {
|
||||||
|
@ -80,7 +81,8 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
||||||
pBlock->info.rows += 1;
|
pBlock->info.rows += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo) {
|
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
||||||
|
SArray* pColMatchInfo) {
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
ASSERT(taosArrayGetSize(pColMatchInfo) == pDataBlock->info.numOfCols);
|
ASSERT(taosArrayGetSize(pColMatchInfo) == pDataBlock->info.numOfCols);
|
||||||
|
|
||||||
|
@ -129,10 +131,11 @@ SSDataBlock* loadNextDataBlock(void* param) {
|
||||||
|
|
||||||
// todo refactor: merged with fetch fp
|
// todo refactor: merged with fetch fp
|
||||||
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
|
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
|
||||||
SOperatorInfo* pOperator = param;
|
SOperatorInfo* pOperator = param;
|
||||||
SSortOperatorInfo* pSort = pOperator->info;
|
SSortOperatorInfo* pSort = pOperator->info;
|
||||||
if (pOperator->pExpr != NULL) {
|
if (pOperator->pExpr != NULL) {
|
||||||
int32_t code = projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL);
|
int32_t code =
|
||||||
|
projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pOperator->pTaskInfo->env, code);
|
longjmp(pOperator->pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -141,7 +144,7 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
|
||||||
|
|
||||||
int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
||||||
SSortOperatorInfo* pInfo = pOperator->info;
|
SSortOperatorInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
if (OPTR_IS_OPENED(pOperator)) {
|
if (OPTR_IS_OPENED(pOperator)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -150,8 +153,8 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
||||||
pInfo->startTs = taosGetTimestampUs();
|
pInfo->startTs = taosGetTimestampUs();
|
||||||
|
|
||||||
// pInfo->binfo.pRes is not equalled to the input datablock.
|
// pInfo->binfo.pRes is not equalled to the input datablock.
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT, -1, -1,
|
||||||
-1, -1, NULL, pTaskInfo->id.str);
|
NULL, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
|
||||||
|
|
||||||
|
@ -166,7 +169,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
||||||
longjmp(pTaskInfo->env, terrno);
|
longjmp(pTaskInfo->env, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs)/1000.0;
|
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
|
||||||
OPTR_SET_OPENED(pOperator);
|
OPTR_SET_OPENED(pOperator);
|
||||||
|
@ -186,7 +189,8 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
SSDataBlock* pBlock =
|
||||||
|
getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
||||||
|
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
@ -208,10 +212,147 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t*
|
||||||
ASSERT(pOptr != NULL);
|
ASSERT(pOptr != NULL);
|
||||||
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
|
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
|
||||||
|
|
||||||
SSortOperatorInfo *pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
|
SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
|
||||||
|
|
||||||
*pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
|
*pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
|
||||||
*pOptrExplain = pInfo;
|
*pOptrExplain = pInfo;
|
||||||
*len = sizeof(SSortExecInfo);
|
*len = sizeof(SSortExecInfo);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct SMultiwaySortMergeOperatorInfo {
|
||||||
|
SOptrBasicInfo binfo;
|
||||||
|
|
||||||
|
int32_t bufPageSize;
|
||||||
|
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||||
|
|
||||||
|
SArray* pSortInfo;
|
||||||
|
SSortHandle* pSortHandle;
|
||||||
|
SArray* pColMatchInfo; // for index map from table scan output
|
||||||
|
|
||||||
|
int64_t startTs; // sort start time
|
||||||
|
} SMultiwaySortMergeOperatorInfo;
|
||||||
|
|
||||||
|
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||||
|
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
if (OPTR_IS_OPENED(pOperator)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->startTs = taosGetTimestampUs();
|
||||||
|
|
||||||
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
|
|
||||||
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE,
|
||||||
|
pInfo->bufPageSize, numOfBufPage, NULL, pTaskInfo->id.str);
|
||||||
|
|
||||||
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
|
||||||
|
SSortSource ps = {0};
|
||||||
|
ps.param = pOperator->pDownstream[i];
|
||||||
|
tsortAddSource(pInfo->pSortHandle, &ps);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = tsortOpen(pInfo->pSortHandle);
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
|
||||||
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
|
||||||
|
OPTR_SET_OPENED(pOperator);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pBlock =
|
||||||
|
getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
||||||
|
|
||||||
|
if (pBlock != NULL) {
|
||||||
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
} else {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
}
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param;
|
||||||
|
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
||||||
|
|
||||||
|
taosArrayDestroy(pInfo->pSortInfo);
|
||||||
|
taosArrayDestroy(pInfo->pColMatchInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||||
|
ASSERT(pOptr != NULL);
|
||||||
|
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
|
||||||
|
|
||||||
|
SMultiwaySortMergeOperatorInfo* pOperatorInfo = (SMultiwaySortMergeOperatorInfo*)pOptr->info;
|
||||||
|
|
||||||
|
*pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
|
||||||
|
*pOptrExplain = pInfo;
|
||||||
|
*len = sizeof(SSortExecInfo);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams,
|
||||||
|
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
|
||||||
|
SExecTaskInfo* pTaskInfo) {
|
||||||
|
SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
int32_t rowSize = pResBlock->info.rowSize;
|
||||||
|
|
||||||
|
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->binfo.pRes = pResBlock;
|
||||||
|
|
||||||
|
initResultSizeInfo(pOperator, 1024);
|
||||||
|
|
||||||
|
pInfo->pSortInfo = pSortInfo;
|
||||||
|
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||||
|
pOperator->name = "MultiwaySortMerge";
|
||||||
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
|
||||||
|
pOperator->blocking = true;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
|
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2;
|
||||||
|
pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
||||||
|
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
pOperator->fpSet =
|
||||||
|
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
||||||
|
destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo);
|
||||||
|
|
||||||
|
int32_t code = appendDownstream(pOperator, downStreams, numStreams);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
return pOperator;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
|
taosMemoryFree(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
|
@ -171,8 +171,10 @@ static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
||||||
param->colId = l->colId;
|
param->colId = l->colId;
|
||||||
param->colValType = l->node.resType.type;
|
param->colValType = l->node.resType.type;
|
||||||
memcpy(param->dbName, l->dbName, sizeof(l->dbName));
|
memcpy(param->dbName, l->dbName, sizeof(l->dbName));
|
||||||
|
#pragma GCC diagnostic push
|
||||||
|
#pragma GCC diagnostic ignored "-Wformat-overflow"
|
||||||
sprintf(param->colName, "%s_%s", l->colName, r->literal);
|
sprintf(param->colName, "%s_%s", l->colName, r->literal);
|
||||||
|
#pragma GCC diagnostic pop
|
||||||
param->colValType = r->typeData;
|
param->colValType = r->typeData;
|
||||||
return 0;
|
return 0;
|
||||||
// memcpy(param->colName, l->colName, sizeof(l->colName));
|
// memcpy(param->colName, l->colName, sizeof(l->colName));
|
||||||
|
|
|
@ -0,0 +1,113 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import random
|
||||||
|
import string
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql):
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
|
||||||
|
def get_long_name(self, length, mode="mixed"):
|
||||||
|
"""
|
||||||
|
generate long name
|
||||||
|
mode could be numbers/letters/letters_mixed/mixed
|
||||||
|
"""
|
||||||
|
if mode == "numbers":
|
||||||
|
population = string.digits
|
||||||
|
elif mode == "letters":
|
||||||
|
population = string.ascii_letters.lower()
|
||||||
|
elif mode == "letters_mixed":
|
||||||
|
population = string.ascii_letters.upper() + string.ascii_letters.lower()
|
||||||
|
else:
|
||||||
|
population = string.ascii_letters.lower() + string.digits
|
||||||
|
return "".join(random.choices(population, k=length))
|
||||||
|
|
||||||
|
def __create_tb(self,dbname,stbname,tbname,comment):
|
||||||
|
tdSql.execute(f'create database if not exists {dbname}')
|
||||||
|
tdSql.execute(f'use {dbname}')
|
||||||
|
tdSql.execute(
|
||||||
|
f'create table {stbname} (ts timestamp,c0 int) tags(t0 int) ')
|
||||||
|
tdSql.execute(
|
||||||
|
f'create table {tbname} using {stbname} tags(1) comment "{comment}"')
|
||||||
|
def __create_normaltb(self,dbname,tbname,comment):
|
||||||
|
tdSql.execute(f'create database if not exists {dbname}')
|
||||||
|
tdSql.execute(f'use {dbname}')
|
||||||
|
tdSql.execute(
|
||||||
|
f'create table {tbname} (ts timestamp,c0 int) comment "{comment}"')
|
||||||
|
|
||||||
|
def check_comment(self):
|
||||||
|
dbname = self.get_long_name(length=10, mode="letters")
|
||||||
|
ntbname = self.get_long_name(length=5, mode="letters")
|
||||||
|
|
||||||
|
# create normal table with comment
|
||||||
|
comment = self.get_long_name(length=10, mode="letters")
|
||||||
|
self.__create_normaltb(dbname,ntbname,comment)
|
||||||
|
ntb_kv_list = tdSql.getResult("show tables")
|
||||||
|
print(ntb_kv_list)
|
||||||
|
tdSql.checkEqual(ntb_kv_list[0][8], comment)
|
||||||
|
tdSql.error('alter table {ntbname} comment "test1"')
|
||||||
|
tdSql.execute(f'drop database {dbname}')
|
||||||
|
|
||||||
|
# max length(1024)
|
||||||
|
comment = self.get_long_name(length=1024, mode="letters")
|
||||||
|
self.__create_normaltb(dbname,ntbname,comment)
|
||||||
|
ntb_kv_list = tdSql.getResult("show tables")
|
||||||
|
tdSql.checkEqual(ntb_kv_list[0][8], comment)
|
||||||
|
tdSql.execute(f'drop database {dbname}')
|
||||||
|
|
||||||
|
# error overlength
|
||||||
|
comment = self.get_long_name(length=1025, mode="letters")
|
||||||
|
tdSql.execute(f'create database if not exists {dbname}')
|
||||||
|
tdSql.execute(f'use {dbname}')
|
||||||
|
tdSql.error(f"create table ntb (ts timestamp,c0 int) comment '{comment}'")
|
||||||
|
tdSql.execute(f'drop database {dbname}')
|
||||||
|
|
||||||
|
# create child table with comment
|
||||||
|
comment = self.get_long_name(length=10, mode="letters")
|
||||||
|
stbname = self.get_long_name(length=5, mode="letters")
|
||||||
|
tbname = self.get_long_name(length=3, mode="letters")
|
||||||
|
self.__create_tb(dbname,stbname,tbname,comment)
|
||||||
|
ntb_kv_list = tdSql.getResult("show tables")
|
||||||
|
tdSql.checkEqual(ntb_kv_list[0][8], comment)
|
||||||
|
tdSql.error(f'alter table {tbname} comment "test1"')
|
||||||
|
tdSql.execute(f'drop database {dbname}')
|
||||||
|
|
||||||
|
# max length 1024
|
||||||
|
comment = self.get_long_name(length=1024, mode="letters")
|
||||||
|
self.__create_tb(dbname,ntbname,comment)
|
||||||
|
ntb_kv_list = tdSql.getResult("show tables")
|
||||||
|
tdSql.checkEqual(ntb_kv_list[0][8], comment)
|
||||||
|
tdSql.execute(f'drop database {dbname}')
|
||||||
|
|
||||||
|
# error overlength
|
||||||
|
comment = self.get_long_name(length=1025, mode="letters")
|
||||||
|
tdSql.execute(f'create database if not exists {dbname}')
|
||||||
|
tdSql.execute(f'use {dbname}')
|
||||||
|
tdSql.execute(f"create table stb (ts timestamp,c0 int) tags(t0 int)")
|
||||||
|
tdSql.error(f'create table stb_1 us stb tags(1) comment "{comment}"')
|
||||||
|
tdSql.execute(f'drop database {dbname}')
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.check_comment()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -21,6 +21,7 @@ python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
|
||||||
#python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
|
#python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
|
||||||
python3 ./test.py -f 1-insert/alter_stable.py
|
python3 ./test.py -f 1-insert/alter_stable.py
|
||||||
python3 ./test.py -f 1-insert/alter_table.py
|
python3 ./test.py -f 1-insert/alter_table.py
|
||||||
|
# python3 ./test.py -f 1-inerst/create_table_comment.py
|
||||||
python3 ./test.py -f 2-query/between.py
|
python3 ./test.py -f 2-query/between.py
|
||||||
python3 ./test.py -f 2-query/distinct.py
|
python3 ./test.py -f 2-query/distinct.py
|
||||||
python3 ./test.py -f 2-query/varchar.py
|
python3 ./test.py -f 2-query/varchar.py
|
||||||
|
|
Loading…
Reference in New Issue