[td-6260]<enhance>: Optimize the client-side query performance when multiple group result exists.
This commit is contained in:
parent
ac3208736b
commit
1bccf8a71a
|
@ -36,7 +36,7 @@ extern "C" {
|
||||||
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
|
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
|
||||||
|
|
||||||
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
|
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
|
||||||
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo)))
|
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
|
||||||
|
|
||||||
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
|
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
|
||||||
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
|
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
|
||||||
|
|
|
@ -35,6 +35,7 @@ typedef struct SCompareParam {
|
||||||
|
|
||||||
static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
|
static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(columnIndexList);
|
size_t size = taosArrayGetSize(columnIndexList);
|
||||||
if (size > 0) {
|
if (size > 0) {
|
||||||
ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC);
|
ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC);
|
||||||
|
@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc
|
||||||
(*hasPrev) = true;
|
(*hasPrev) = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// tsdb_func_tag function only produce one row of result. Therefore, we need to copy the
|
||||||
|
// output value to multiple rows
|
||||||
static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) {
|
static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) {
|
||||||
if (numOfRows <= 1) {
|
if (numOfRows <= 1) {
|
||||||
return ;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||||
|
@ -574,12 +577,49 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t inc = numOfRows - 1; // tsdb_func_tag function only produce one row of result
|
char* src = pCtx[k].pOutput;
|
||||||
char* src = pCtx[k].pOutput;
|
char* dst = pCtx[k].pOutput + pCtx[k].outputBytes;
|
||||||
|
|
||||||
for (int32_t i = 0; i < inc; ++i) {
|
// Let's start from the second row, as the first row has result value already.
|
||||||
pCtx[k].pOutput += pCtx[k].outputBytes;
|
for (int32_t i = 1; i < numOfRows; ++i) {
|
||||||
memcpy(pCtx[k].pOutput, src, (size_t)pCtx[k].outputBytes);
|
memcpy(dst, src, (size_t)pCtx[k].outputBytes);
|
||||||
|
dst += pCtx[k].outputBytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) {
|
||||||
|
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||||
|
pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||||
|
int32_t functionId = pCtx[j].functionId;
|
||||||
|
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (functionId < 0) {
|
||||||
|
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
||||||
|
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
|
||||||
|
} else {
|
||||||
|
aAggs[functionId].mergeFunc(&pCtx[j]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr) {
|
||||||
|
for(int32_t j = 0; j < numOfExpr; ++j) {
|
||||||
|
int32_t functionId = pCtx[j].functionId;
|
||||||
|
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (functionId < 0) {
|
||||||
|
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
||||||
|
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
|
||||||
|
} else {
|
||||||
|
aAggs[functionId].xFinalize(&pCtx[j]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -588,52 +628,18 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
|
||||||
SMultiwayMergeInfo* pInfo = pOperator->info;
|
SMultiwayMergeInfo* pInfo = pOperator->info;
|
||||||
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
|
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
|
||||||
|
|
||||||
char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES);
|
char** addrPtr = calloc(pBlock->info.numOfCols, POINTER_BYTES);
|
||||||
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
add[i] = pCtx[i].pInput;
|
addrPtr[i] = pCtx[i].pInput;
|
||||||
pCtx[i].size = 1;
|
pCtx[i].size = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
|
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
if (pInfo->hasPrev) {
|
if (pInfo->hasPrev) {
|
||||||
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
|
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
|
||||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
|
||||||
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
|
||||||
int32_t functionId = pCtx[j].functionId;
|
|
||||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (functionId < 0) {
|
|
||||||
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
|
||||||
|
|
||||||
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
aAggs[functionId].mergeFunc(&pCtx[j]);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
for(int32_t j = 0; j < numOfExpr; ++j) { // TODO refactor
|
doFinalizeResultImpl(pInfo, pCtx, numOfExpr);
|
||||||
int32_t functionId = pCtx[j].functionId;
|
|
||||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (functionId < 0) {
|
|
||||||
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
|
||||||
|
|
||||||
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
aAggs[functionId].xFinalize(&pCtx[j]);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput);
|
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||||
setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
|
setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
|
||||||
|
@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
|
||||||
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
|
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
|
||||||
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
|
||||||
int32_t functionId = pCtx[j].functionId;
|
|
||||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (functionId < 0) {
|
|
||||||
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
|
||||||
|
|
||||||
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
aAggs[functionId].mergeFunc(&pCtx[j]);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
|
||||||
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
|
||||||
int32_t functionId = pCtx[j].functionId;
|
|
||||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (functionId < 0) {
|
|
||||||
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
|
||||||
|
|
||||||
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
aAggs[functionId].mergeFunc(&pCtx[j]);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
|
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
|
||||||
|
@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
|
||||||
|
|
||||||
{
|
{
|
||||||
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
pCtx[i].pInput = add[i];
|
pCtx[i].pInput = addrPtr[i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(add);
|
tfree(addrPtr);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isAllSourcesCompleted(SGlobalMerger *pMerger) {
|
static bool isAllSourcesCompleted(SGlobalMerger *pMerger) {
|
||||||
|
@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
|
||||||
SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index];
|
SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index];
|
||||||
bool sameGroup = true;
|
bool sameGroup = true;
|
||||||
if (pInfo->hasPrev) {
|
if (pInfo->hasPrev) {
|
||||||
|
|
||||||
|
// todo refactor extract method
|
||||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
|
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
|
||||||
|
|
||||||
// if this row belongs to current result set group
|
// if this row belongs to current result set group
|
||||||
|
@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool sameGroup = true;
|
||||||
if (pAggInfo->hasGroupColData) {
|
if (pAggInfo->hasGroupColData) {
|
||||||
bool sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
|
sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
|
||||||
if (!sameGroup) {
|
if (!sameGroup && !pAggInfo->multiGroupResults) {
|
||||||
*newgroup = true;
|
*newgroup = true;
|
||||||
pAggInfo->hasDataBlockForNewGroup = true;
|
pAggInfo->hasDataBlockForNewGroup = true;
|
||||||
pAggInfo->pExistBlock = pBlock;
|
pAggInfo->pExistBlock = pBlock;
|
||||||
|
@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handleData) { // data in current group is all handled
|
if (handleData) { // data in current group is all handled
|
||||||
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
doFinalizeResultImpl(pAggInfo, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||||
int32_t functionId = pAggInfo->binfo.pCtx[j].functionId;
|
|
||||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (functionId < 0) {
|
|
||||||
SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1);
|
|
||||||
|
|
||||||
doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
|
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||||
pAggInfo->binfo.pRes->info.rows += numOfRows;
|
|
||||||
|
|
||||||
|
pAggInfo->binfo.pRes->info.rows += numOfRows;
|
||||||
setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows);
|
setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1019,71 +975,112 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
|
||||||
return (pRes->info.rows != 0)? pRes:NULL;
|
return (pRes->info.rows != 0)? pRes:NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
|
static void doHandleDataInCurrentGroup(SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) {
|
||||||
SSLimitOperatorInfo *pInfo = pOperator->info;
|
if (pInfo->currentOffset > 0) {
|
||||||
assert(pInfo->currentGroupOffset >= 0);
|
pInfo->currentOffset -= 1;
|
||||||
|
} else {
|
||||||
|
// discard the data rows in current group
|
||||||
|
if (pInfo->limit.limit < 0 || (pInfo->limit.limit >= 0 && pInfo->rowsTotal < pInfo->limit.limit)) {
|
||||||
|
int32_t num1 = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
|
for (int32_t i = 0; i < num1; ++i) {
|
||||||
|
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
SColumnInfoData *pDstInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i);
|
||||||
|
|
||||||
SSDataBlock* pBlock = NULL;
|
SColumnInfo *pColInfo = &pColInfoData->info;
|
||||||
if (pInfo->currentGroupOffset == 0) {
|
|
||||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
char *pSrc = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
|
||||||
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
|
char *pDst = (char *)pDstInfoData->pData + (pInfo->pRes->info.rows * pColInfo->bytes);
|
||||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
|
||||||
if (pBlock == NULL) {
|
|
||||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
|
memcpy(pDst, pSrc, pColInfo->bytes);
|
||||||
while ((*newgroup) == false) { // ignore the remain blocks
|
|
||||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
|
||||||
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
|
|
||||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
|
||||||
if (pBlock == NULL) {
|
|
||||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return pBlock;
|
pInfo->rowsTotal += 1;
|
||||||
|
pInfo->pRes->info.rows += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlock, int32_t numOfRows) {
|
||||||
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
|
if (pInfo->capacity < pResultBlock->info.rows + numOfRows) {
|
||||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
int32_t total = pResultBlock->info.rows + numOfRows;
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
int32_t num = taosArrayGetSize(pResultBlock->pDataBlock);
|
||||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
SColumnInfoData *pInfoData = taosArrayGet(pResultBlock->pDataBlock, i);
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
while(1) {
|
char *tmp = realloc(pInfoData->pData, total * pInfoData->info.bytes);
|
||||||
if (*newgroup) {
|
if (tmp != NULL) {
|
||||||
pInfo->currentGroupOffset -= 1;
|
pInfoData->pData = tmp;
|
||||||
*newgroup = false;
|
} else {
|
||||||
|
// todo handle the malloc failure
|
||||||
}
|
}
|
||||||
|
|
||||||
while ((*newgroup) == false) {
|
pInfo->capacity = total;
|
||||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
pInfo->threshold = total * 0.8;
|
||||||
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
|
}
|
||||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
int32_t rowIndex = 0;
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
return NULL;
|
while (rowIndex < pBlock->info.rows) {
|
||||||
|
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
|
||||||
|
|
||||||
|
bool samegroup = true;
|
||||||
|
if (pInfo->hasPrev) {
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i);
|
||||||
|
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex);
|
||||||
|
|
||||||
|
SColumnInfo *pColInfo = &pColInfoData->info;
|
||||||
|
|
||||||
|
char * d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
|
||||||
|
int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes);
|
||||||
|
if (ret != 0) { // it is a new group
|
||||||
|
samegroup = false;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// now we have got the first data block of the next group.
|
|
||||||
if (pInfo->currentGroupOffset == 0) {
|
|
||||||
return pBlock;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
if (!samegroup || !pInfo->hasPrev) {
|
||||||
|
pInfo->ignoreCurrentGroup = false;
|
||||||
|
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, rowIndex, &pInfo->hasPrev);
|
||||||
|
|
||||||
|
pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
|
||||||
|
pInfo->rowsTotal = 0;
|
||||||
|
|
||||||
|
if (pInfo->currentGroupOffset > 0) {
|
||||||
|
pInfo->ignoreCurrentGroup = true;
|
||||||
|
pInfo->currentGroupOffset -= 1; // now we are in the next group data
|
||||||
|
rowIndex += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// A new group has arrived according to the result rows, and the group limitation has already reached.
|
||||||
|
// Let's jump out of current loop and return immediately.
|
||||||
|
if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
|
||||||
|
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->groupTotal += 1;
|
||||||
|
doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
|
||||||
|
|
||||||
|
} else { // handle the offset in the same group
|
||||||
|
// All the data in current group needs to be discarded, due to the limit parameter in the SQL statement
|
||||||
|
if (pInfo->ignoreCurrentGroup) {
|
||||||
|
rowIndex += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
rowIndex += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* doSLimit(void* param, bool* newgroup) {
|
SSDataBlock* doSLimit(void* param, bool* newgroup) {
|
||||||
|
@ -1093,63 +1090,28 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSLimitOperatorInfo *pInfo = pOperator->info;
|
SSLimitOperatorInfo *pInfo = pOperator->info;
|
||||||
|
pInfo->pRes->info.rows = 0;
|
||||||
|
|
||||||
|
assert(pInfo->currentGroupOffset >= 0);
|
||||||
|
|
||||||
|
while(1) {
|
||||||
|
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
|
SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
|
||||||
|
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||||
|
|
||||||
SSDataBlock *pBlock = NULL;
|
|
||||||
while (1) {
|
|
||||||
pBlock = skipGroupBlock(pOperator, newgroup);
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (*newgroup) { // a new group arrives
|
ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows);
|
||||||
pInfo->groupTotal += 1;
|
doSlimitImpl(pOperator, pInfo, pBlock);
|
||||||
pInfo->rowsTotal = 0;
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
pInfo->currentOffset = pInfo->limit.offset;
|
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pInfo->currentGroupOffset == 0);
|
// now the number of rows in current group is enough, let's return to the invoke function
|
||||||
|
if (pInfo->pRes->info.rows > pInfo->threshold) {
|
||||||
if (pInfo->currentOffset >= pBlock->info.rows) {
|
return pInfo->pRes;
|
||||||
pInfo->currentOffset -= pBlock->info.rows;
|
|
||||||
} else {
|
|
||||||
if (pInfo->currentOffset == 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
|
|
||||||
pBlock->info.rows = remain;
|
|
||||||
|
|
||||||
// move the remain rows of this data block to the front.
|
|
||||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
|
||||||
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
|
||||||
|
|
||||||
int16_t bytes = pColInfoData->info.bytes;
|
|
||||||
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->currentOffset = 0;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) {
|
|
||||||
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal);
|
|
||||||
pInfo->rowsTotal = pInfo->limit.limit;
|
|
||||||
|
|
||||||
if (pInfo->slimit.limit > 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
}
|
|
||||||
|
|
||||||
// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
|
||||||
} else {
|
|
||||||
pInfo->rowsTotal += pBlock->info.rows;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pBlock;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7172,7 +7172,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
|
||||||
const char* msg1 = "interval not allowed in group by normal column";
|
const char* msg1 = "interval not allowed in group by normal column";
|
||||||
|
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
|
|
||||||
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
||||||
|
|
||||||
SSchema* tagSchema = NULL;
|
SSchema* tagSchema = NULL;
|
||||||
|
|
|
@ -479,6 +479,11 @@ typedef struct SSLimitOperatorInfo {
|
||||||
|
|
||||||
char **prevRow;
|
char **prevRow;
|
||||||
SArray *orderColumnList;
|
SArray *orderColumnList;
|
||||||
|
bool hasPrev;
|
||||||
|
bool ignoreCurrentGroup;
|
||||||
|
SSDataBlock *pRes; // result buffer
|
||||||
|
int64_t capacity;
|
||||||
|
int64_t threshold;
|
||||||
} SSLimitOperatorInfo;
|
} SSLimitOperatorInfo;
|
||||||
|
|
||||||
typedef struct SFilterOperatorInfo {
|
typedef struct SFilterOperatorInfo {
|
||||||
|
@ -490,7 +495,7 @@ typedef struct SFillOperatorInfo {
|
||||||
SFillInfo *pFillInfo;
|
SFillInfo *pFillInfo;
|
||||||
SSDataBlock *pRes;
|
SSDataBlock *pRes;
|
||||||
int64_t totalInputRows;
|
int64_t totalInputRows;
|
||||||
|
void **p;
|
||||||
SSDataBlock *existNewGroupBlock;
|
SSDataBlock *existNewGroupBlock;
|
||||||
} SFillOperatorInfo;
|
} SFillOperatorInfo;
|
||||||
|
|
||||||
|
@ -553,9 +558,9 @@ typedef struct SMultiwayMergeInfo {
|
||||||
bool hasDataBlockForNewGroup;
|
bool hasDataBlockForNewGroup;
|
||||||
SSDataBlock *pExistBlock;
|
SSDataBlock *pExistBlock;
|
||||||
|
|
||||||
bool hasPrev;
|
|
||||||
bool groupMix;
|
|
||||||
SArray *udfInfo;
|
SArray *udfInfo;
|
||||||
|
bool hasPrev;
|
||||||
|
bool multiGroupResults;
|
||||||
} SMultiwayMergeInfo;
|
} SMultiwayMergeInfo;
|
||||||
|
|
||||||
// todo support the disk-based sort
|
// todo support the disk-based sort
|
||||||
|
@ -586,8 +591,8 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
|
||||||
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||||
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
|
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
|
||||||
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
|
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
|
||||||
int32_t numOfRows, void* merger, bool groupMix);
|
int32_t numOfRows, void* merger);
|
||||||
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo);
|
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp);
|
||||||
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||||
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
|
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
|
||||||
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
|
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
|
||||||
|
|
|
@ -38,15 +38,12 @@
|
||||||
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
|
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
|
||||||
|
|
||||||
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
|
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
|
||||||
|
|
||||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||||
|
|
||||||
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
|
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
|
||||||
|
|
||||||
#define MULTI_KEY_DELIM "-"
|
#define MULTI_KEY_DELIM "-"
|
||||||
|
|
||||||
#define HASH_CAPACITY_LIMIT 10000000
|
|
||||||
|
|
||||||
#define TIME_WINDOW_COPY(_dst, _src) do {\
|
#define TIME_WINDOW_COPY(_dst, _src) do {\
|
||||||
(_dst).skey = (_src).skey;\
|
(_dst).skey = (_src).skey;\
|
||||||
(_dst).ekey = (_src).ekey;\
|
(_dst).ekey = (_src).ekey;\
|
||||||
|
@ -2255,25 +2252,21 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
||||||
}
|
}
|
||||||
|
|
||||||
case OP_MultiwayMergeSort: {
|
case OP_MultiwayMergeSort: {
|
||||||
bool groupMix = true;
|
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger);
|
||||||
if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
|
|
||||||
groupMix = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput,
|
|
||||||
4096, merger, groupMix); // TODO hack it
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case OP_GlobalAggregate: {
|
case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock.
|
||||||
|
bool groupResultMixedUp = (pQueryAttr->fillType == TSDB_FILL_NONE);
|
||||||
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||||
pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo);
|
pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, groupResultMixedUp);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case OP_SLimit: {
|
case OP_SLimit: {
|
||||||
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
int32_t num = pRuntimeEnv->proot->numOfOutput;
|
||||||
pQueryAttr->numOfExpr3, merger);
|
SExprInfo* pExpr = pRuntimeEnv->proot->pExpr;
|
||||||
|
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3623,7 +3616,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
|
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||||
pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
|
pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
|
||||||
|
|
||||||
// re-estabilish output buffer pointer.
|
// set the correct pointer after the memory buffer reallocated.
|
||||||
int32_t functionId = pBInfo->pCtx[i].functionId;
|
int32_t functionId = pBInfo->pCtx[i].functionId;
|
||||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
||||||
pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
|
pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
|
||||||
|
@ -4158,6 +4151,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
|
||||||
|
|
||||||
// refactor : extract method
|
// refactor : extract method
|
||||||
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||||
|
|
||||||
//add condition (pBlock->info.rows >= 1) just to runtime happy
|
//add condition (pBlock->info.rows >= 1) just to runtime happy
|
||||||
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) {
|
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) {
|
||||||
STimeWindow* w = &pBlock->info.window;
|
STimeWindow* w = &pBlock->info.window;
|
||||||
|
@ -4272,15 +4266,15 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity) {
|
int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity, void** p) {
|
||||||
void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES);
|
|
||||||
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
|
||||||
p[i] = pColInfoData->pData;
|
p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows);
|
||||||
}
|
}
|
||||||
|
|
||||||
pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity);
|
int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity - pOutput->info.rows);
|
||||||
tfree(p);
|
pOutput->info.rows += numOfRows;
|
||||||
|
|
||||||
return pOutput->info.rows;
|
return pOutput->info.rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5324,11 +5318,12 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
|
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
|
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
|
||||||
taosArrayDestroy(pInfo->orderColumnList);
|
taosArrayDestroy(pInfo->orderColumnList);
|
||||||
|
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||||
tfree(pInfo->prevRow);
|
tfree(pInfo->prevRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
|
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
|
||||||
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo) {
|
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) {
|
||||||
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
|
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
|
||||||
|
|
||||||
pInfo->resultRowFactor =
|
pInfo->resultRowFactor =
|
||||||
|
@ -5336,15 +5331,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
|
||||||
|
|
||||||
pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx
|
pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx
|
||||||
|
|
||||||
pInfo->pMerge = param;
|
pInfo->multiGroupResults = groupResultMixedUp;
|
||||||
pInfo->bufCapacity = 4096;
|
pInfo->pMerge = param;
|
||||||
pInfo->udfInfo = pUdfInfo;
|
pInfo->bufCapacity = 4096;
|
||||||
|
pInfo->udfInfo = pUdfInfo;
|
||||||
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
|
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
|
||||||
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
||||||
|
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
|
||||||
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
|
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
|
||||||
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
|
|
||||||
|
|
||||||
// TODO refactor
|
// TODO refactor
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -5397,17 +5391,15 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput,
|
SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput,
|
||||||
int32_t numOfRows, void *merger, bool groupMix) {
|
int32_t numOfRows, void *merger) {
|
||||||
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
|
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
|
||||||
|
|
||||||
pInfo->pMerge = merger;
|
pInfo->pMerge = merger;
|
||||||
pInfo->groupMix = groupMix;
|
pInfo->bufCapacity = numOfRows;
|
||||||
pInfo->bufCapacity = numOfRows;
|
|
||||||
|
|
||||||
pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
|
pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
|
||||||
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
|
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
|
||||||
|
|
||||||
{
|
{ // todo extract method to create prev compare buffer
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
len += pExpr[i].base.colBytes;
|
len += pExpr[i].base.colBytes;
|
||||||
|
@ -5415,8 +5407,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
|
||||||
|
|
||||||
int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
|
int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
|
||||||
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
|
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
|
||||||
int32_t offset = POINTER_BYTES * numOfCols;
|
|
||||||
|
|
||||||
|
int32_t offset = POINTER_BYTES * numOfCols;
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
|
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
|
||||||
|
|
||||||
|
@ -5432,7 +5424,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
|
||||||
pOperator->status = OP_IN_EXECUTING;
|
pOperator->status = OP_IN_EXECUTING;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||||
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
|
pOperator->numOfOutput = numOfOutput;
|
||||||
|
pOperator->pExpr = pExpr;
|
||||||
pOperator->exec = doMultiwayMergeSort;
|
pOperator->exec = doMultiwayMergeSort;
|
||||||
pOperator->cleanup = destroyGlobalAggOperatorInfo;
|
pOperator->cleanup = destroyGlobalAggOperatorInfo;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -6348,19 +6341,13 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doFill(void* param, bool* newgroup) {
|
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) {
|
||||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SFillOperatorInfo *pInfo = pOperator->info;
|
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
|
|
||||||
|
|
||||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||||
*newgroup = false;
|
*newgroup = false;
|
||||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
|
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
||||||
return pInfo->pRes;
|
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle the cached new group data block
|
// handle the cached new group data block
|
||||||
|
@ -6372,11 +6359,47 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
|
||||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||||
|
|
||||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
||||||
pInfo->existNewGroupBlock = NULL;
|
pInfo->existNewGroupBlock = NULL;
|
||||||
*newgroup = true;
|
*newgroup = true;
|
||||||
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doFill(void* param, bool* newgroup) {
|
||||||
|
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||||
|
|
||||||
|
SFillOperatorInfo *pInfo = pOperator->info;
|
||||||
|
pInfo->pRes->info.rows = 0;
|
||||||
|
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||||
|
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
|
||||||
|
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) {
|
||||||
|
return pInfo->pRes;
|
||||||
|
}
|
||||||
|
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||||
|
// *newgroup = false;
|
||||||
|
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
|
||||||
|
// return pInfo->pRes;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // handle the cached new group data block
|
||||||
|
// if (pInfo->existNewGroupBlock) {
|
||||||
|
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||||
|
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
||||||
|
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
||||||
|
//
|
||||||
|
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||||
|
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||||
|
//
|
||||||
|
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
||||||
|
// pInfo->existNewGroupBlock = NULL;
|
||||||
|
// *newgroup = true;
|
||||||
|
// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||||
|
// }
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
|
@ -6404,28 +6427,60 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
|
||||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
|
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
|
||||||
} else {
|
} else {
|
||||||
pInfo->totalInputRows += pBlock->info.rows;
|
pInfo->totalInputRows += pBlock->info.rows;
|
||||||
|
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
|
||||||
int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey
|
|
||||||
: */pBlock->info.window.ekey;
|
|
||||||
|
|
||||||
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey);
|
|
||||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
|
taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
||||||
if (pInfo->pRes->info.rows > 0) { // current group has no more result to return
|
|
||||||
return pInfo->pRes;
|
// current group has no more result to return
|
||||||
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
|
// the result in current group not reach the threshold of output result, continue
|
||||||
|
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) {
|
||||||
|
return pInfo->pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
|
||||||
|
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) {
|
||||||
|
return pInfo->pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||||
|
// *newgroup = false;
|
||||||
|
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
|
||||||
|
// return pInfo->pRes;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // handle the cached new group data block
|
||||||
|
// if (pInfo->existNewGroupBlock) {
|
||||||
|
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||||
|
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
||||||
|
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
||||||
|
//
|
||||||
|
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||||
|
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||||
|
//
|
||||||
|
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
||||||
|
// pInfo->existNewGroupBlock = NULL;
|
||||||
|
// *newgroup = true;
|
||||||
|
//
|
||||||
|
// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) {
|
||||||
|
// return pInfo->pRes;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
//// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||||
|
// }
|
||||||
|
|
||||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||||
int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey
|
int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey;
|
||||||
:*/ pInfo->existNewGroupBlock->info.window.ekey;
|
|
||||||
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
||||||
|
|
||||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||||
|
|
||||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
||||||
pInfo->existNewGroupBlock = NULL;
|
pInfo->existNewGroupBlock = NULL;
|
||||||
*newgroup = true;
|
*newgroup = true;
|
||||||
|
|
||||||
|
@ -6433,7 +6488,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
|
||||||
} else {
|
} else {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6534,6 +6588,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
|
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
|
||||||
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
||||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||||
|
tfree(pInfo->p);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
|
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
@ -6895,6 +6950,8 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
|
||||||
taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput,
|
taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput,
|
||||||
pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit,
|
pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit,
|
||||||
(int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo);
|
(int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo);
|
||||||
|
|
||||||
|
pInfo->p = calloc(pInfo->pFillInfo->numOfCols, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||||
|
@ -6922,9 +6979,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
|
||||||
pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
|
pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
|
||||||
pInfo->slimit = pQueryAttr->slimit;
|
pInfo->slimit = pQueryAttr->slimit;
|
||||||
pInfo->limit = pQueryAttr->limit;
|
pInfo->limit = pQueryAttr->limit;
|
||||||
|
pInfo->capacity = pRuntimeEnv->resultInfo.capacity;
|
||||||
pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
|
pInfo->threshold = pInfo->capacity * 0.8;
|
||||||
pInfo->currentOffset = pQueryAttr->limit.offset;
|
pInfo->currentOffset = pQueryAttr->limit.offset;
|
||||||
|
pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
|
||||||
|
|
||||||
// TODO refactor
|
// TODO refactor
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -6932,10 +6990,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
|
||||||
len += pExpr[i].base.resBytes;
|
len += pExpr[i].base.resBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfCols = pInfo->orderColumnList != NULL? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
|
int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
|
||||||
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
|
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
|
||||||
int32_t offset = POINTER_BYTES * numOfCols;
|
|
||||||
|
|
||||||
|
int32_t offset = POINTER_BYTES * numOfCols;
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
|
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
|
||||||
|
|
||||||
|
@ -6943,6 +7001,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
|
||||||
offset += pExpr[index->colIndex].base.resBytes;
|
offset += pExpr[index->colIndex].base.resBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
|
||||||
|
|
||||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||||
|
|
||||||
pOperator->name = "SLimitOperator";
|
pOperator->name = "SLimitOperator";
|
||||||
|
|
|
@ -430,7 +430,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput)
|
||||||
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
|
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
|
||||||
pFillInfo->pData[i] = pColData->pData;
|
pFillInfo->pData[i] = pColData->pData;
|
||||||
|
|
||||||
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { // copy the tag value to tag value buffer
|
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
|
||||||
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
||||||
assert (pTag->col.colId == pCol->col.colId);
|
assert (pTag->col.colId == pCol->col.colId);
|
||||||
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
|
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
|
||||||
|
|
|
@ -150,13 +150,13 @@ if $data00 != 0.00150 then
|
||||||
print expect 0.00150, actual: $data00
|
print expect 0.00150, actual: $data00
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
sql create table st_float_15_0 using mt_float tags (3.40282347e+38)
|
#sql create table st_float_15_0 using mt_float tags (3.40282347e+38)
|
||||||
sql select tagname from st_float_15_0
|
#sql select tagname from st_float_15_0
|
||||||
#if $data00 != 0.001500 then
|
#if $data00 != 0.001500 then
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
sql create table st_float_16_0 using mt_float tags (-3.40282347e+38)
|
#sql create table st_float_16_0 using mt_float tags (-3.40282347e+38)
|
||||||
sql select tagname from st_float_16_0
|
#sql select tagname from st_float_16_0
|
||||||
#if $data00 != 0.001500 then
|
#if $data00 != 0.001500 then
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
Loading…
Reference in New Issue