Merge pull request #7529 from taosdata/feature/query
[td-6260]<enhance>: Optimize the client-side query performance
This commit is contained in:
commit
c13e41832f
|
@ -1 +1 @@
|
|||
Subproject commit ceda5bf9fcd7836509ac97dcc0056b3f1dd48cc5
|
||||
Subproject commit 0ca5b15a8eac40327dd737be52c926fa5675712c
|
|
@ -36,7 +36,7 @@ extern "C" {
|
|||
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
|
||||
|
||||
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
|
||||
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo)))
|
||||
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
|
||||
|
||||
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
|
||||
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
|
||||
|
|
|
@ -35,6 +35,7 @@ typedef struct SCompareParam {
|
|||
|
||||
static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
|
||||
int32_t ret = 0;
|
||||
|
||||
size_t size = taosArrayGetSize(columnIndexList);
|
||||
if (size > 0) {
|
||||
ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC);
|
||||
|
@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc
|
|||
(*hasPrev) = true;
|
||||
}
|
||||
|
||||
// tsdb_func_tag function only produce one row of result. Therefore, we need to copy the
|
||||
// output value to multiple rows
|
||||
static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) {
|
||||
if (numOfRows <= 1) {
|
||||
return ;
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
|
@ -574,12 +577,49 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput
|
|||
continue;
|
||||
}
|
||||
|
||||
int32_t inc = numOfRows - 1; // tsdb_func_tag function only produce one row of result
|
||||
char* src = pCtx[k].pOutput;
|
||||
char* src = pCtx[k].pOutput;
|
||||
char* dst = pCtx[k].pOutput + pCtx[k].outputBytes;
|
||||
|
||||
for (int32_t i = 0; i < inc; ++i) {
|
||||
pCtx[k].pOutput += pCtx[k].outputBytes;
|
||||
memcpy(pCtx[k].pOutput, src, (size_t)pCtx[k].outputBytes);
|
||||
// Let's start from the second row, as the first row has result value already.
|
||||
for (int32_t i = 1; i < numOfRows; ++i) {
|
||||
memcpy(dst, src, (size_t)pCtx[k].outputBytes);
|
||||
dst += pCtx[k].outputBytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) {
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
int32_t functionId = pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
||||
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
|
||||
} else {
|
||||
aAggs[functionId].mergeFunc(&pCtx[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr) {
|
||||
for(int32_t j = 0; j < numOfExpr; ++j) {
|
||||
int32_t functionId = pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
||||
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
|
||||
} else {
|
||||
aAggs[functionId].xFinalize(&pCtx[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -588,52 +628,18 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
|
|||
SMultiwayMergeInfo* pInfo = pOperator->info;
|
||||
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
|
||||
|
||||
char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES);
|
||||
char** addrPtr = calloc(pBlock->info.numOfCols, POINTER_BYTES);
|
||||
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
add[i] = pCtx[i].pInput;
|
||||
addrPtr[i] = pCtx[i].pInput;
|
||||
pCtx[i].size = 1;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
if (pInfo->hasPrev) {
|
||||
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
int32_t functionId = pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
||||
|
||||
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
aAggs[functionId].mergeFunc(&pCtx[j]);
|
||||
}
|
||||
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
|
||||
} else {
|
||||
for(int32_t j = 0; j < numOfExpr; ++j) { // TODO refactor
|
||||
int32_t functionId = pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
||||
|
||||
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
aAggs[functionId].xFinalize(&pCtx[j]);
|
||||
}
|
||||
doFinalizeResultImpl(pInfo, pCtx, numOfExpr);
|
||||
|
||||
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
|
||||
|
@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
|
|||
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
int32_t functionId = pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
||||
|
||||
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
aAggs[functionId].mergeFunc(&pCtx[j]);
|
||||
}
|
||||
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
|
||||
}
|
||||
} else {
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
int32_t functionId = pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
|
||||
|
||||
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
aAggs[functionId].mergeFunc(&pCtx[j]);
|
||||
}
|
||||
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
|
||||
}
|
||||
|
||||
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
|
||||
|
@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
|
|||
|
||||
{
|
||||
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
pCtx[i].pInput = add[i];
|
||||
pCtx[i].pInput = addrPtr[i];
|
||||
}
|
||||
}
|
||||
|
||||
tfree(add);
|
||||
tfree(addrPtr);
|
||||
}
|
||||
|
||||
static bool isAllSourcesCompleted(SGlobalMerger *pMerger) {
|
||||
|
@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
|
|||
SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index];
|
||||
bool sameGroup = true;
|
||||
if (pInfo->hasPrev) {
|
||||
|
||||
// todo refactor extract method
|
||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
|
||||
|
||||
// if this row belongs to current result set group
|
||||
|
@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
|
|||
break;
|
||||
}
|
||||
|
||||
bool sameGroup = true;
|
||||
if (pAggInfo->hasGroupColData) {
|
||||
bool sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
|
||||
if (!sameGroup) {
|
||||
sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
|
||||
if (!sameGroup && !pAggInfo->multiGroupResults) {
|
||||
*newgroup = true;
|
||||
pAggInfo->hasDataBlockForNewGroup = true;
|
||||
pAggInfo->pExistBlock = pBlock;
|
||||
|
@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
|
|||
}
|
||||
|
||||
if (handleData) { // data in current group is all handled
|
||||
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
||||
int32_t functionId = pAggInfo->binfo.pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1);
|
||||
|
||||
doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]);
|
||||
}
|
||||
doFinalizeResultImpl(pAggInfo, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
|
||||
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
pAggInfo->binfo.pRes->info.rows += numOfRows;
|
||||
|
||||
pAggInfo->binfo.pRes->info.rows += numOfRows;
|
||||
setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows);
|
||||
}
|
||||
|
||||
|
@ -1019,71 +975,127 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
|
|||
return (pRes->info.rows != 0)? pRes:NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
|
||||
SSLimitOperatorInfo *pInfo = pOperator->info;
|
||||
assert(pInfo->currentGroupOffset >= 0);
|
||||
static void doHandleDataInCurrentGroup(SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) {
|
||||
if (pInfo->currentOffset > 0) {
|
||||
pInfo->currentOffset -= 1;
|
||||
} else {
|
||||
// discard the data rows in current group
|
||||
if (pInfo->limit.limit < 0 || (pInfo->limit.limit >= 0 && pInfo->rowsTotal < pInfo->limit.limit)) {
|
||||
size_t num1 = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||
for (int32_t i = 0; i < num1; ++i) {
|
||||
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
SColumnInfoData *pDstInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i);
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
if (pInfo->currentGroupOffset == 0) {
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
SColumnInfo *pColInfo = &pColInfoData->info;
|
||||
|
||||
char *pSrc = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
|
||||
char *pDst = (char *)pDstInfoData->pData + (pInfo->pRes->info.rows * pColInfo->bytes);
|
||||
|
||||
memcpy(pDst, pSrc, pColInfo->bytes);
|
||||
}
|
||||
|
||||
pInfo->rowsTotal += 1;
|
||||
pInfo->pRes->info.rows += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
|
||||
while ((*newgroup) == false) { // ignore the remain blocks
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlock, int32_t numOfRows) {
|
||||
if (pInfo->capacity < pResultBlock->info.rows + numOfRows) {
|
||||
int32_t total = pResultBlock->info.rows + numOfRows;
|
||||
|
||||
size_t num = taosArrayGetSize(pResultBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SColumnInfoData *pInfoData = taosArrayGet(pResultBlock->pDataBlock, i);
|
||||
|
||||
char *tmp = realloc(pInfoData->pData, total * pInfoData->info.bytes);
|
||||
if (tmp != NULL) {
|
||||
pInfoData->pData = tmp;
|
||||
} else {
|
||||
// todo handle the malloc failure
|
||||
}
|
||||
|
||||
pInfo->capacity = total;
|
||||
pInfo->threshold = (int64_t)(total * 0.8);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum {
|
||||
BLOCK_NEW_GROUP = 1,
|
||||
BLOCK_NO_GROUP = 2,
|
||||
BLOCK_SAME_GROUP = 3,
|
||||
};
|
||||
|
||||
static int32_t doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||
int32_t rowIndex = 0;
|
||||
|
||||
while (rowIndex < pBlock->info.rows) {
|
||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
|
||||
|
||||
bool samegroup = true;
|
||||
if (pInfo->hasPrev) {
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColIndex *pIndex = taosArrayGet(pInfo->orderColumnList, i);
|
||||
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex);
|
||||
|
||||
SColumnInfo *pColInfo = &pColInfoData->info;
|
||||
|
||||
char *d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
|
||||
int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes);
|
||||
if (ret != 0) { // it is a new group
|
||||
samegroup = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
if (!samegroup || !pInfo->hasPrev) {
|
||||
pInfo->ignoreCurrentGroup = false;
|
||||
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, rowIndex, &pInfo->hasPrev);
|
||||
|
||||
pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
|
||||
pInfo->rowsTotal = 0;
|
||||
|
||||
if (pInfo->currentGroupOffset > 0) {
|
||||
pInfo->ignoreCurrentGroup = true;
|
||||
pInfo->currentGroupOffset -= 1; // now we are in the next group data
|
||||
rowIndex += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
// A new group has arrived according to the result rows, and the group limitation has already reached.
|
||||
// Let's jump out of current loop and return immediately.
|
||||
if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return BLOCK_NO_GROUP;
|
||||
}
|
||||
|
||||
pInfo->groupTotal += 1;
|
||||
|
||||
// data in current group not allowed, return if current result does not belong to the previous group.And there
|
||||
// are results exists in current SSDataBlock
|
||||
if (!pInfo->multigroupResult && !samegroup && pInfo->pRes->info.rows > 0) {
|
||||
return BLOCK_NEW_GROUP;
|
||||
}
|
||||
|
||||
doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
|
||||
|
||||
} else { // handle the offset in the same group
|
||||
// All the data in current group needs to be discarded, due to the limit parameter in the SQL statement
|
||||
if (pInfo->ignoreCurrentGroup) {
|
||||
rowIndex += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
|
||||
}
|
||||
|
||||
rowIndex += 1;
|
||||
}
|
||||
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
while(1) {
|
||||
if (*newgroup) {
|
||||
pInfo->currentGroupOffset -= 1;
|
||||
*newgroup = false;
|
||||
}
|
||||
|
||||
while ((*newgroup) == false) {
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
// now we have got the first data block of the next group.
|
||||
if (pInfo->currentGroupOffset == 0) {
|
||||
return pBlock;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
return BLOCK_SAME_GROUP;
|
||||
}
|
||||
|
||||
SSDataBlock* doSLimit(void* param, bool* newgroup) {
|
||||
|
@ -1093,63 +1105,41 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) {
|
|||
}
|
||||
|
||||
SSLimitOperatorInfo *pInfo = pOperator->info;
|
||||
pInfo->pRes->info.rows = 0;
|
||||
|
||||
if (pInfo->pPrevBlock != NULL) {
|
||||
ensureOutputBuf(pInfo, pInfo->pRes, pInfo->pPrevBlock->info.rows);
|
||||
int32_t ret = doSlimitImpl(pOperator, pInfo, pInfo->pPrevBlock);
|
||||
assert(ret != BLOCK_NEW_GROUP);
|
||||
|
||||
pInfo->pPrevBlock = NULL;
|
||||
}
|
||||
|
||||
assert(pInfo->currentGroupOffset >= 0);
|
||||
|
||||
while(1) {
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
SSDataBlock *pBlock = NULL;
|
||||
while (1) {
|
||||
pBlock = skipGroupBlock(pOperator, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
if (*newgroup) { // a new group arrives
|
||||
pInfo->groupTotal += 1;
|
||||
pInfo->rowsTotal = 0;
|
||||
pInfo->currentOffset = pInfo->limit.offset;
|
||||
ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows);
|
||||
int32_t ret = doSlimitImpl(pOperator, pInfo, pBlock);
|
||||
if (ret == BLOCK_NEW_GROUP) {
|
||||
pInfo->pPrevBlock = pBlock;
|
||||
return pInfo->pRes;
|
||||
}
|
||||
|
||||
assert(pInfo->currentGroupOffset == 0);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
if (pInfo->currentOffset >= pBlock->info.rows) {
|
||||
pInfo->currentOffset -= pBlock->info.rows;
|
||||
} else {
|
||||
if (pInfo->currentOffset == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
|
||||
pBlock->info.rows = remain;
|
||||
|
||||
// move the remain rows of this data block to the front.
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
int16_t bytes = pColInfoData->info.bytes;
|
||||
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
|
||||
}
|
||||
|
||||
pInfo->currentOffset = 0;
|
||||
break;
|
||||
// now the number of rows in current group is enough, let's return to the invoke function
|
||||
if (pInfo->pRes->info.rows > pInfo->threshold) {
|
||||
return pInfo->pRes;
|
||||
}
|
||||
}
|
||||
|
||||
if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) {
|
||||
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal);
|
||||
pInfo->rowsTotal = pInfo->limit.limit;
|
||||
|
||||
if (pInfo->slimit.limit > 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
} else {
|
||||
pInfo->rowsTotal += pBlock->info.rows;
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
|
|
@ -931,7 +931,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
pQueryInfo = pCmd->active;
|
||||
pQueryInfo->pUdfInfo = pUdfInfo;
|
||||
pQueryInfo->udfCopy = true;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7205,7 +7204,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
|
|||
const char* msg1 = "interval not allowed in group by normal column";
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
||||
|
||||
SSchema* tagSchema = NULL;
|
||||
|
@ -8735,6 +8733,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
|
|||
if (taosArrayGetSize(subInfo->pSubquery) >= 2) {
|
||||
return invalidOperationMsg(msgBuf, "not support union in subquery");
|
||||
}
|
||||
|
||||
SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo));
|
||||
tscInitQueryInfo(pSub);
|
||||
|
||||
|
@ -8757,6 +8756,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
|
|||
if (pTableMetaInfo1 == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pTableMetaInfo1->pTableMeta = extractTempTableMetaFromSubquery(pSub);
|
||||
pTableMetaInfo1->tableMetaCapacity = tscGetTableMetaSize(pTableMetaInfo1->pTableMeta);
|
||||
|
||||
|
@ -8840,7 +8840,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
// check if there is 3 level select
|
||||
SRelElementPair* subInfo = taosArrayGet(pSqlNode->from->list, i);
|
||||
SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0);
|
||||
if (p->from->type == SQL_NODE_FROM_SUBQUERY){
|
||||
if (p->from->type == SQL_NODE_FROM_SUBQUERY) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
|
||||
}
|
||||
|
||||
|
@ -8933,6 +8933,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
}
|
||||
}
|
||||
|
||||
// disable group result mixed up if interval/session window query exists.
|
||||
if (isTimeWindowQuery(pQueryInfo)) {
|
||||
size_t num = taosArrayGetSize(pQueryInfo->pUpstream);
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, i);
|
||||
pUp->multigroupResult = false;
|
||||
}
|
||||
}
|
||||
|
||||
// parse the having clause in the first place
|
||||
int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1);
|
||||
if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) !=
|
||||
|
|
|
@ -331,189 +331,36 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
|||
.handle = NULL,
|
||||
.code = 0
|
||||
};
|
||||
|
||||
|
||||
rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
//static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
|
||||
// SRpcMsg* rpcMsg = pSchedMsg->ahandle;
|
||||
// SRpcEpSet* pEpSet = pSchedMsg->thandle;
|
||||
//
|
||||
// TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
|
||||
// SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
|
||||
// if (pSql == NULL) {
|
||||
// rpcFreeCont(rpcMsg->pCont);
|
||||
// free(rpcMsg);
|
||||
// free(pEpSet);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// assert(pSql->self == handle);
|
||||
//
|
||||
// STscObj *pObj = pSql->pTscObj;
|
||||
// SSqlRes *pRes = &pSql->res;
|
||||
// SSqlCmd *pCmd = &pSql->cmd;
|
||||
//
|
||||
// pSql->rpcRid = -1;
|
||||
//
|
||||
// if (pObj->signature != pObj) {
|
||||
// tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
|
||||
//
|
||||
// taosRemoveRef(tscObjRef, handle);
|
||||
// taosReleaseRef(tscObjRef, handle);
|
||||
// rpcFreeCont(rpcMsg->pCont);
|
||||
// free(rpcMsg);
|
||||
// free(pEpSet);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
|
||||
// if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
// tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
|
||||
// pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
|
||||
//
|
||||
// taosRemoveRef(tscObjRef, handle);
|
||||
// taosReleaseRef(tscObjRef, handle);
|
||||
// rpcFreeCont(rpcMsg->pCont);
|
||||
// free(rpcMsg);
|
||||
// free(pEpSet);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// if (pEpSet) {
|
||||
// if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
|
||||
// if (pCmd->command < TSDB_SQL_MGMT) {
|
||||
// tscUpdateVgroupInfo(pSql, pEpSet);
|
||||
// } else {
|
||||
// tscUpdateMgmtEpSet(pSql, pEpSet);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// int32_t cmd = pCmd->command;
|
||||
//
|
||||
// // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
|
||||
// if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
||||
// pSql->cmd.insertParam.schemaAttached = 1;
|
||||
// }
|
||||
//
|
||||
// // single table query error need to be handled here.
|
||||
// if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
|
||||
// (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
|
||||
// rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
|
||||
//
|
||||
// // 1. super table subquery
|
||||
// // 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer
|
||||
// if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
|
||||
// TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
|
||||
// !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
|
||||
// (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
|
||||
// // do nothing in case of super table subquery
|
||||
// } else {
|
||||
// pSql->retry += 1;
|
||||
// tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry);
|
||||
//
|
||||
// pSql->res.code = rpcMsg->code; // keep the previous error code
|
||||
// if (pSql->retry > pSql->maxRetry) {
|
||||
// tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry);
|
||||
// } else {
|
||||
// // wait for a little bit moment and then retry
|
||||
// // todo do not sleep in rpc callback thread, add this process into queue to process
|
||||
// if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
||||
// int32_t duration = getWaitingTimeInterval(pSql->retry);
|
||||
// taosMsleep(duration);
|
||||
// }
|
||||
//
|
||||
// pSql->retryReason = rpcMsg->code;
|
||||
// rpcMsg->code = tscRenewTableMeta(pSql, 0);
|
||||
// // if there is an error occurring, proceed to the following error handling procedure.
|
||||
// if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
// taosReleaseRef(tscObjRef, handle);
|
||||
// rpcFreeCont(rpcMsg->pCont);
|
||||
// free(rpcMsg);
|
||||
// free(pEpSet);
|
||||
// return;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// pRes->rspLen = 0;
|
||||
//
|
||||
// if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
|
||||
// tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code));
|
||||
// } else {
|
||||
// pRes->code = rpcMsg->code;
|
||||
// }
|
||||
//
|
||||
// if (pRes->code == TSDB_CODE_SUCCESS) {
|
||||
// tscDebug("0x%"PRIx64" reset retry counter to be 0 due to success rsp, old:%d", pSql->self, pSql->retry);
|
||||
// pSql->retry = 0;
|
||||
// }
|
||||
//
|
||||
// if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
|
||||
// assert(rpcMsg->msgType == pCmd->msgType + 1);
|
||||
// pRes->code = rpcMsg->code;
|
||||
// pRes->rspType = rpcMsg->msgType;
|
||||
// pRes->rspLen = rpcMsg->contLen;
|
||||
//
|
||||
// if (pRes->rspLen > 0 && rpcMsg->pCont) {
|
||||
// char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
|
||||
// if (tmp == NULL) {
|
||||
// pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
// } else {
|
||||
// pRes->pRsp = tmp;
|
||||
// memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
|
||||
// }
|
||||
// } else {
|
||||
// tfree(pRes->pRsp);
|
||||
// }
|
||||
//
|
||||
// /*
|
||||
// * There is not response callback function for submit response.
|
||||
// * The actual inserted number of points is the first number.
|
||||
// */
|
||||
// if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
|
||||
// SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
|
||||
// pMsg->code = htonl(pMsg->code);
|
||||
// pMsg->numOfRows = htonl(pMsg->numOfRows);
|
||||
// pMsg->affectedRows = htonl(pMsg->affectedRows);
|
||||
// pMsg->failedRows = htonl(pMsg->failedRows);
|
||||
// pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);
|
||||
//
|
||||
// pRes->numOfRows += pMsg->affectedRows;
|
||||
// tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command],
|
||||
// tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
|
||||
// } else {
|
||||
// tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
|
||||
// rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
|
||||
// }
|
||||
//
|
||||
// bool shouldFree = tscShouldBeFreed(pSql);
|
||||
// if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
// if (rpcMsg->code != TSDB_CODE_SUCCESS) {
|
||||
// pRes->code = rpcMsg->code;
|
||||
// }
|
||||
// rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
|
||||
// (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
|
||||
// }
|
||||
//
|
||||
// if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
|
||||
// tscDebug("0x%"PRIx64" sqlObj is automatically freed", pSql->self);
|
||||
// taosRemoveRef(tscObjRef, handle);
|
||||
// }
|
||||
//
|
||||
// taosReleaseRef(tscObjRef, handle);
|
||||
// rpcFreeCont(rpcMsg->pCont);
|
||||
// free(rpcMsg);
|
||||
// free(pEpSet);
|
||||
//}
|
||||
// handle three situation
|
||||
// 1. epset retry, only return last failure ep
|
||||
// 2. no epset retry, like 'taos -h invalidFqdn', return invalidFqdn
|
||||
// 3. other situation, no expected
|
||||
void tscSetFqdnErrorMsg(SSqlObj* pSql, SRpcEpSet* pEpSet) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
SSqlRes* pRes = &pSql->res;
|
||||
|
||||
char* msgBuf = tscGetErrorMsgPayload(pCmd);
|
||||
|
||||
if (pEpSet) {
|
||||
sprintf(msgBuf, "%s\"%s\"", tstrerror(pRes->code),pEpSet->fqdn[(pEpSet->inUse)%(pEpSet->numOfEps)]);
|
||||
} else if (pCmd->command >= TSDB_SQL_MGMT) {
|
||||
SRpcEpSet tEpset;
|
||||
|
||||
SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
|
||||
taosCorBeginRead(&pCorEpSet->version);
|
||||
tEpset = pCorEpSet->epSet;
|
||||
taosCorEndRead(&pCorEpSet->version);
|
||||
|
||||
sprintf(msgBuf, "%s\"%s\"", tstrerror(pRes->code),tEpset.fqdn[(tEpset.inUse)%(tEpset.numOfEps)]);
|
||||
} else {
|
||||
sprintf(msgBuf, "%s", tstrerror(pRes->code));
|
||||
}
|
||||
}
|
||||
|
||||
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
|
||||
|
@ -542,7 +389,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
|||
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
|
||||
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
|
||||
pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
|
||||
pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
|
||||
|
||||
taosRemoveRef(tscObjRef, handle);
|
||||
taosReleaseRef(tscObjRef, handle);
|
||||
|
@ -575,9 +422,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
|||
// 1. super table subquery
|
||||
// 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer
|
||||
if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
|
||||
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
|
||||
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
|
||||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
|
||||
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
|
||||
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
|
||||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
|
||||
// do nothing in case of super table subquery
|
||||
} else {
|
||||
pSql->retry += 1;
|
||||
|
@ -651,7 +498,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
|||
|
||||
pRes->numOfRows += pMsg->affectedRows;
|
||||
tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command],
|
||||
tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
|
||||
tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
|
||||
} else {
|
||||
tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
|
||||
}
|
||||
|
@ -666,28 +513,13 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
|||
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
|
||||
pRes->code = rpcMsg->code;
|
||||
}
|
||||
|
||||
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
|
||||
if (pRes->code == TSDB_CODE_RPC_FQDN_ERROR) {
|
||||
tscAllocPayload(pCmd, TSDB_FQDN_LEN + 64);
|
||||
// handle three situation
|
||||
// 1. epset retry, only return last failure ep
|
||||
// 2. no epset retry, like 'taos -h invalidFqdn', return invalidFqdn
|
||||
// 3. other situation, no expected
|
||||
if (pEpSet) {
|
||||
sprintf(tscGetErrorMsgPayload(pCmd), "%s\"%s\"", tstrerror(pRes->code),pEpSet->fqdn[(pEpSet->inUse)%(pEpSet->numOfEps)]);
|
||||
} else if (pCmd->command >= TSDB_SQL_MGMT) {
|
||||
SRpcEpSet tEpset;
|
||||
|
||||
SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
|
||||
taosCorBeginRead(&pCorEpSet->version);
|
||||
tEpset = pCorEpSet->epSet;
|
||||
taosCorEndRead(&pCorEpSet->version);
|
||||
|
||||
sprintf(tscGetErrorMsgPayload(pCmd), "%s\"%s\"", tstrerror(pRes->code),tEpset.fqdn[(tEpset.inUse)%(tEpset.numOfEps)]);
|
||||
} else {
|
||||
sprintf(tscGetErrorMsgPayload(pCmd), "%s", tstrerror(pRes->code));
|
||||
}
|
||||
if (rpcMsg->code == TSDB_CODE_RPC_FQDN_ERROR) {
|
||||
tscAllocPayload(pCmd, TSDB_FQDN_LEN + 64);
|
||||
tscSetFqdnErrorMsg(pSql, pEpSet);
|
||||
}
|
||||
|
||||
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
|
||||
}
|
||||
|
||||
|
@ -3094,8 +2926,8 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
|
|||
memset(pTableMetaInfo->pTableMeta, 0, pTableMetaInfo->tableMetaCapacity);
|
||||
}
|
||||
if (NULL == taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity)) {
|
||||
tfree(pTableMetaInfo->pTableMeta);
|
||||
}
|
||||
tfree(pTableMetaInfo->pTableMeta);
|
||||
}
|
||||
|
||||
STableMeta* pMeta = pTableMetaInfo->pTableMeta;
|
||||
STableMeta* pSTMeta = (STableMeta *)(pSql->pBuf);
|
||||
|
|
|
@ -2120,7 +2120,7 @@ TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) {
|
|||
return f;
|
||||
}
|
||||
|
||||
int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo) {
|
||||
int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo) {
|
||||
if (pQueryInfo->fieldsInfo.numOfOutput <= 0 || pQueryInfo->fieldsInfo.internalField == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -2129,10 +2129,10 @@ int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo) {
|
|||
SInternalField* pField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
|
||||
if (!pField->visible) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return pQueryInfo->fieldsInfo.numOfOutput;
|
||||
|
||||
return pQueryInfo->fieldsInfo.numOfOutput;
|
||||
}
|
||||
|
||||
|
||||
|
@ -3181,6 +3181,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
|
|||
pQueryInfo->slimit.offset = 0;
|
||||
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
|
||||
pQueryInfo->window = TSWINDOW_INITIALIZER;
|
||||
pQueryInfo->multigroupResult = true;
|
||||
}
|
||||
|
||||
int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
|
||||
|
@ -3192,7 +3193,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
|
|||
}
|
||||
|
||||
tscInitQueryInfo(pQueryInfo);
|
||||
|
||||
pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer
|
||||
|
||||
if (pCmd->pQueryInfo == NULL) {
|
||||
|
@ -3241,6 +3241,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
|
|||
|
||||
taosArrayDestroy(pQueryInfo->pUpstream);
|
||||
pQueryInfo->pUpstream = NULL;
|
||||
pQueryInfo->bufLen = 0;
|
||||
}
|
||||
|
||||
void tscClearSubqueryInfo(SSqlCmd* pCmd) {
|
||||
|
@ -3275,6 +3276,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
|
|||
pQueryInfo->window = pSrc->window;
|
||||
pQueryInfo->sessionWindow = pSrc->sessionWindow;
|
||||
pQueryInfo->pTableMetaInfo = NULL;
|
||||
pQueryInfo->multigroupResult = pSrc->multigroupResult;
|
||||
|
||||
pQueryInfo->bufLen = pSrc->bufLen;
|
||||
pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery;
|
||||
|
@ -3660,24 +3662,25 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
|
|||
pnCmd->active = pNewQueryInfo;
|
||||
|
||||
memcpy(&pNewQueryInfo->interval, &pQueryInfo->interval, sizeof(pNewQueryInfo->interval));
|
||||
pNewQueryInfo->type = pQueryInfo->type;
|
||||
pNewQueryInfo->window = pQueryInfo->window;
|
||||
pNewQueryInfo->limit = pQueryInfo->limit;
|
||||
pNewQueryInfo->slimit = pQueryInfo->slimit;
|
||||
pNewQueryInfo->order = pQueryInfo->order;
|
||||
pNewQueryInfo->vgroupLimit = pQueryInfo->vgroupLimit;
|
||||
pNewQueryInfo->tsBuf = NULL;
|
||||
pNewQueryInfo->fillType = pQueryInfo->fillType;
|
||||
pNewQueryInfo->fillVal = NULL;
|
||||
pNewQueryInfo->type = pQueryInfo->type;
|
||||
pNewQueryInfo->window = pQueryInfo->window;
|
||||
pNewQueryInfo->limit = pQueryInfo->limit;
|
||||
pNewQueryInfo->slimit = pQueryInfo->slimit;
|
||||
pNewQueryInfo->order = pQueryInfo->order;
|
||||
pNewQueryInfo->tsBuf = NULL;
|
||||
pNewQueryInfo->fillType = pQueryInfo->fillType;
|
||||
pNewQueryInfo->fillVal = NULL;
|
||||
pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit;
|
||||
pNewQueryInfo->prjOffset = pQueryInfo->prjOffset;
|
||||
pNewQueryInfo->numOfFillVal = 0;
|
||||
pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit;
|
||||
pNewQueryInfo->prjOffset = pQueryInfo->prjOffset;
|
||||
pNewQueryInfo->numOfTables = 0;
|
||||
pNewQueryInfo->numOfTables = 0;
|
||||
pNewQueryInfo->pTableMetaInfo = NULL;
|
||||
pNewQueryInfo->bufLen = pQueryInfo->bufLen;
|
||||
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
|
||||
pNewQueryInfo->bufLen = pQueryInfo->bufLen;
|
||||
pNewQueryInfo->vgroupLimit = pQueryInfo->vgroupLimit;
|
||||
pNewQueryInfo->distinct = pQueryInfo->distinct;
|
||||
pNewQueryInfo->multigroupResult = pQueryInfo->multigroupResult;
|
||||
|
||||
pNewQueryInfo->distinct = pQueryInfo->distinct;
|
||||
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
|
||||
if (pNewQueryInfo->buf == NULL) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
|
@ -3904,12 +3907,12 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
|
|||
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
|
||||
if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) {
|
||||
pParentSql->res.code = code;
|
||||
|
||||
|
||||
tscAsyncResultOnError(pParentSql);
|
||||
return;
|
||||
}
|
||||
|
||||
tscFreeSubobj(pParentSql);
|
||||
tscFreeSubobj(pParentSql);
|
||||
tfree(pParentSql->pSubs);
|
||||
|
||||
pParentSql->res.code = TSDB_CODE_SUCCESS;
|
||||
|
@ -3918,9 +3921,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
|
|||
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
|
||||
tstrerror(code), pParentSql->retry);
|
||||
|
||||
|
||||
|
||||
tscResetSqlCmd(&pParentSql->cmd, true, pParentSql->self);
|
||||
|
||||
|
||||
code = tsParseSql(pParentSql, true);
|
||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
return;
|
||||
|
@ -3988,7 +3991,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
|
|||
pNew->maxRetry = pSql->maxRetry;
|
||||
|
||||
pNew->cmd.resColumnId = TSDB_RES_COL_ID;
|
||||
|
||||
|
||||
tsem_init(&pNew->rspSem, 0, 0);
|
||||
|
||||
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
|
||||
|
@ -4522,15 +4525,15 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
|
|||
STableMeta* p = *ppSTable;
|
||||
STableMeta* pChild = *ppChild;
|
||||
|
||||
size_t sz = (p != NULL) ? tscGetTableMetaSize(p) : 0; //ppSTableBuf actually capacity may larger than sz, dont care
|
||||
if (sz != 0) {
|
||||
size_t sz = (p != NULL) ? tscGetTableMetaSize(p) : 0; //ppSTableBuf actually capacity may larger than sz, dont care
|
||||
if (p != NULL && sz != 0) {
|
||||
memset((char *)p, 0, sz);
|
||||
}
|
||||
|
||||
if (NULL == taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz)) {
|
||||
tfree(p);
|
||||
} else {
|
||||
*ppSTable = p;
|
||||
*ppSTable = p;
|
||||
}
|
||||
|
||||
// tableMeta exists, build child table meta according to the super table meta
|
||||
|
@ -4811,6 +4814,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
|||
pQueryAttr->distinct = pQueryInfo->distinct;
|
||||
pQueryAttr->sw = pQueryInfo->sessionWindow;
|
||||
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
|
||||
pQueryAttr->multigroupResult = pQueryInfo->multigroupResult;
|
||||
|
||||
pQueryAttr->numOfCols = numOfCols;
|
||||
pQueryAttr->numOfOutput = numOfOutput;
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f
|
||||
Subproject commit b8f76da4a708d158ec3cc4b844571dc4414e36b4
|
|
@ -1 +1 @@
|
|||
Subproject commit b62a26ecc164a310104df57691691b237e091c89
|
||||
Subproject commit ce5201014136503d34fecbd56494b67b4961056c
|
|
@ -221,6 +221,7 @@ typedef struct SQueryAttr {
|
|||
bool distinct; // distinct query or not
|
||||
bool stateWindow; // window State on sub/normal table
|
||||
bool createFilterOperator; // if filter operator is needed
|
||||
bool multigroupResult; // multigroup result can exist in one SSDataBlock
|
||||
int32_t interBufSize; // intermediate buffer sizse
|
||||
|
||||
int32_t havingNum; // having expr number
|
||||
|
@ -467,16 +468,23 @@ typedef struct SLimitOperatorInfo {
|
|||
} SLimitOperatorInfo;
|
||||
|
||||
typedef struct SSLimitOperatorInfo {
|
||||
int64_t groupTotal;
|
||||
int64_t currentGroupOffset;
|
||||
int64_t groupTotal;
|
||||
int64_t currentGroupOffset;
|
||||
|
||||
int64_t rowsTotal;
|
||||
int64_t currentOffset;
|
||||
SLimitVal limit;
|
||||
SLimitVal slimit;
|
||||
int64_t rowsTotal;
|
||||
int64_t currentOffset;
|
||||
SLimitVal limit;
|
||||
SLimitVal slimit;
|
||||
|
||||
char **prevRow;
|
||||
SArray *orderColumnList;
|
||||
char **prevRow;
|
||||
SArray *orderColumnList;
|
||||
bool hasPrev;
|
||||
bool ignoreCurrentGroup;
|
||||
bool multigroupResult;
|
||||
SSDataBlock *pRes; // result buffer
|
||||
SSDataBlock *pPrevBlock;
|
||||
int64_t capacity;
|
||||
int64_t threshold;
|
||||
} SSLimitOperatorInfo;
|
||||
|
||||
typedef struct SFilterOperatorInfo {
|
||||
|
@ -488,8 +496,9 @@ typedef struct SFillOperatorInfo {
|
|||
SFillInfo *pFillInfo;
|
||||
SSDataBlock *pRes;
|
||||
int64_t totalInputRows;
|
||||
|
||||
void **p;
|
||||
SSDataBlock *existNewGroupBlock;
|
||||
bool multigroupResult;
|
||||
} SFillOperatorInfo;
|
||||
|
||||
typedef struct SGroupbyOperatorInfo {
|
||||
|
@ -551,9 +560,9 @@ typedef struct SMultiwayMergeInfo {
|
|||
bool hasDataBlockForNewGroup;
|
||||
SSDataBlock *pExistBlock;
|
||||
|
||||
bool hasPrev;
|
||||
bool groupMix;
|
||||
SArray *udfInfo;
|
||||
bool hasPrev;
|
||||
bool multiGroupResults;
|
||||
} SMultiwayMergeInfo;
|
||||
|
||||
// todo support the disk-based sort
|
||||
|
@ -575,7 +584,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
|
|||
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
|
||||
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
|
@ -584,10 +593,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
|
|||
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
|
||||
int32_t numOfRows, void* merger, bool groupMix);
|
||||
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo);
|
||||
int32_t numOfRows, void* merger);
|
||||
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp);
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
|
||||
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult);
|
||||
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
|
||||
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
|
||||
|
||||
|
|
|
@ -165,6 +165,7 @@ typedef struct SQueryInfo {
|
|||
bool orderProjectQuery;
|
||||
bool stateWindow;
|
||||
bool globalMerge;
|
||||
bool multigroupResult;
|
||||
} SQueryInfo;
|
||||
|
||||
/**
|
||||
|
|
|
@ -38,15 +38,12 @@
|
|||
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
|
||||
|
||||
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
|
||||
|
||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||
|
||||
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
|
||||
|
||||
#define MULTI_KEY_DELIM "-"
|
||||
|
||||
#define HASH_CAPACITY_LIMIT 10000000
|
||||
|
||||
#define TIME_WINDOW_COPY(_dst, _src) do {\
|
||||
(_dst).skey = (_src).skey;\
|
||||
(_dst).ekey = (_src).ekey;\
|
||||
|
@ -1334,7 +1331,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
|
|||
} else {
|
||||
pCtx[k].start.ptr = (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
|
||||
}
|
||||
|
||||
|
||||
pCtx[k].end.ptr = (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
|
||||
}
|
||||
}
|
||||
|
@ -1623,7 +1620,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
|
||||
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
|
||||
preWin = win;
|
||||
|
||||
|
||||
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
|
||||
startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos);
|
||||
if (startPos < 0) {
|
||||
|
@ -2262,30 +2259,30 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
|
||||
case OP_Fill: {
|
||||
SOperatorInfo* pInfo = pRuntimeEnv->proot;
|
||||
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput);
|
||||
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult);
|
||||
break;
|
||||
}
|
||||
|
||||
case OP_MultiwayMergeSort: {
|
||||
bool groupMix = true;
|
||||
if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
|
||||
groupMix = false;
|
||||
}
|
||||
|
||||
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput,
|
||||
4096, merger, groupMix); // TODO hack it
|
||||
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger);
|
||||
break;
|
||||
}
|
||||
|
||||
case OP_GlobalAggregate: {
|
||||
case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock.
|
||||
bool multigroupResult = pQueryAttr->multigroupResult;
|
||||
if (pQueryAttr->multigroupResult) {
|
||||
multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE);
|
||||
}
|
||||
|
||||
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||
pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo);
|
||||
pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult);
|
||||
break;
|
||||
}
|
||||
|
||||
case OP_SLimit: {
|
||||
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||
pQueryAttr->numOfExpr3, merger);
|
||||
int32_t num = pRuntimeEnv->proot->numOfOutput;
|
||||
SExprInfo* pExpr = pRuntimeEnv->proot->pExpr;
|
||||
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -3636,7 +3633,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
|
|||
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
|
||||
|
||||
// re-estabilish output buffer pointer.
|
||||
// set the correct pointer after the memory buffer reallocated.
|
||||
int32_t functionId = pBInfo->pCtx[i].functionId;
|
||||
|
||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
||||
|
@ -4200,6 +4197,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
|
|||
|
||||
// refactor : extract method
|
||||
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
|
||||
//add condition (pBlock->info.rows >= 1) just to runtime happy
|
||||
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) {
|
||||
STimeWindow* w = &pBlock->info.window;
|
||||
|
@ -4314,15 +4312,15 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
|
|||
}
|
||||
}
|
||||
|
||||
int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity) {
|
||||
void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES);
|
||||
int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity, void** p) {
|
||||
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
|
||||
p[i] = pColInfoData->pData;
|
||||
p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows);
|
||||
}
|
||||
|
||||
pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity);
|
||||
tfree(p);
|
||||
int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity - pOutput->info.rows);
|
||||
pOutput->info.rows += numOfRows;
|
||||
|
||||
return pOutput->info.rows;
|
||||
}
|
||||
|
||||
|
@ -5366,11 +5364,12 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
|
||||
taosArrayDestroy(pInfo->orderColumnList);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
tfree(pInfo->prevRow);
|
||||
}
|
||||
|
||||
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
|
||||
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo) {
|
||||
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) {
|
||||
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
|
||||
|
||||
pInfo->resultRowFactor =
|
||||
|
@ -5378,15 +5377,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
|
|||
|
||||
pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx
|
||||
|
||||
pInfo->pMerge = param;
|
||||
pInfo->bufCapacity = 4096;
|
||||
pInfo->udfInfo = pUdfInfo;
|
||||
|
||||
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
|
||||
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
|
||||
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
|
||||
pInfo->multiGroupResults = groupResultMixedUp;
|
||||
pInfo->pMerge = param;
|
||||
pInfo->bufCapacity = 4096;
|
||||
pInfo->udfInfo = pUdfInfo;
|
||||
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
|
||||
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
||||
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
|
||||
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
|
||||
|
||||
// TODO refactor
|
||||
int32_t len = 0;
|
||||
|
@ -5439,17 +5437,15 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
|
|||
}
|
||||
|
||||
SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput,
|
||||
int32_t numOfRows, void *merger, bool groupMix) {
|
||||
int32_t numOfRows, void *merger) {
|
||||
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
|
||||
|
||||
pInfo->pMerge = merger;
|
||||
pInfo->groupMix = groupMix;
|
||||
pInfo->bufCapacity = numOfRows;
|
||||
|
||||
pInfo->pMerge = merger;
|
||||
pInfo->bufCapacity = numOfRows;
|
||||
pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
|
||||
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
|
||||
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
|
||||
|
||||
{
|
||||
{ // todo extract method to create prev compare buffer
|
||||
int32_t len = 0;
|
||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
len += pExpr[i].base.colBytes;
|
||||
|
@ -5457,8 +5453,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
|
|||
|
||||
int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
|
||||
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
|
||||
int32_t offset = POINTER_BYTES * numOfCols;
|
||||
|
||||
int32_t offset = POINTER_BYTES * numOfCols;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
|
||||
|
||||
|
@ -5474,7 +5470,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
|
|||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->exec = doMultiwayMergeSort;
|
||||
pOperator->cleanup = destroyGlobalAggOperatorInfo;
|
||||
return pOperator;
|
||||
|
@ -6390,19 +6387,13 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
|
|||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doFill(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SFillOperatorInfo *pInfo = pOperator->info;
|
||||
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
|
||||
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) {
|
||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
*newgroup = false;
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
|
||||
return pInfo->pRes;
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
||||
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// handle the cached new group data block
|
||||
|
@ -6414,11 +6405,47 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
|
|||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
||||
pInfo->existNewGroupBlock = NULL;
|
||||
*newgroup = true;
|
||||
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doFill(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
|
||||
SFillOperatorInfo *pInfo = pOperator->info;
|
||||
pInfo->pRes->info.rows = 0;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
|
||||
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
|
||||
return pInfo->pRes;
|
||||
}
|
||||
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
// *newgroup = false;
|
||||
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
|
||||
// return pInfo->pRes;
|
||||
// }
|
||||
//
|
||||
// // handle the cached new group data block
|
||||
// if (pInfo->existNewGroupBlock) {
|
||||
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
||||
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
||||
//
|
||||
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||
//
|
||||
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
||||
// pInfo->existNewGroupBlock = NULL;
|
||||
// *newgroup = true;
|
||||
// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||
// }
|
||||
|
||||
while(1) {
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
|
@ -6433,8 +6460,8 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
|
|||
pInfo->existNewGroupBlock = pBlock;
|
||||
*newgroup = false;
|
||||
|
||||
// fill the previous group data block
|
||||
// before handle a new data block, close the fill operation for previous group data block
|
||||
// Fill the previous group data block, before handle the data block of new group.
|
||||
// Close the fill operation for previous group data block
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
|
||||
} else {
|
||||
if (pBlock == NULL) {
|
||||
|
@ -6446,28 +6473,61 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
|
|||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
|
||||
} else {
|
||||
pInfo->totalInputRows += pBlock->info.rows;
|
||||
|
||||
int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey
|
||||
: */pBlock->info.window.ekey;
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey);
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
|
||||
}
|
||||
}
|
||||
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
||||
if (pInfo->pRes->info.rows > 0) { // current group has no more result to return
|
||||
return pInfo->pRes;
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
||||
|
||||
// current group has no more result to return
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
// 1. The result in current group not reach the threshold of output result, continue
|
||||
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
|
||||
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
|
||||
return pInfo->pRes;
|
||||
}
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
|
||||
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) {
|
||||
return pInfo->pRes;
|
||||
}
|
||||
|
||||
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
// *newgroup = false;
|
||||
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
|
||||
// return pInfo->pRes;
|
||||
// }
|
||||
//
|
||||
// // handle the cached new group data block
|
||||
// if (pInfo->existNewGroupBlock) {
|
||||
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
||||
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
||||
//
|
||||
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||
//
|
||||
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
||||
// pInfo->existNewGroupBlock = NULL;
|
||||
// *newgroup = true;
|
||||
//
|
||||
// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) {
|
||||
// return pInfo->pRes;
|
||||
// }
|
||||
//
|
||||
//// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||
// }
|
||||
|
||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey
|
||||
:*/ pInfo->existNewGroupBlock->info.window.ekey;
|
||||
int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey;
|
||||
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
||||
pInfo->existNewGroupBlock = NULL;
|
||||
*newgroup = true;
|
||||
|
||||
|
@ -6475,7 +6535,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
|
|||
} else {
|
||||
return NULL;
|
||||
}
|
||||
// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6576,6 +6635,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
|
||||
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
tfree(pInfo->p);
|
||||
}
|
||||
|
||||
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
|
@ -6919,10 +6979,10 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
|
||||
int32_t numOfOutput) {
|
||||
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) {
|
||||
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
|
||||
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
|
||||
pInfo->multigroupResult = multigroupResult;
|
||||
|
||||
{
|
||||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
|
@ -6937,6 +6997,8 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
|
|||
taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput,
|
||||
pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit,
|
||||
(int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo);
|
||||
|
||||
pInfo->p = calloc(pInfo->pFillInfo->numOfCols, POINTER_BYTES);
|
||||
}
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -6956,7 +7018,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger) {
|
||||
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
|
||||
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
|
||||
|
||||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
|
@ -6964,9 +7026,11 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
|
|||
pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
|
||||
pInfo->slimit = pQueryAttr->slimit;
|
||||
pInfo->limit = pQueryAttr->limit;
|
||||
|
||||
pInfo->capacity = pRuntimeEnv->resultInfo.capacity;
|
||||
pInfo->threshold = (int64_t)(pInfo->capacity * 0.8);
|
||||
pInfo->currentOffset = pQueryAttr->limit.offset;
|
||||
pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
|
||||
pInfo->currentOffset = pQueryAttr->limit.offset;
|
||||
pInfo->multigroupResult= multigroupResult;
|
||||
|
||||
// TODO refactor
|
||||
int32_t len = 0;
|
||||
|
@ -6974,10 +7038,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
|
|||
len += pExpr[i].base.resBytes;
|
||||
}
|
||||
|
||||
int32_t numOfCols = pInfo->orderColumnList != NULL? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
|
||||
int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
|
||||
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
|
||||
int32_t offset = POINTER_BYTES * numOfCols;
|
||||
|
||||
int32_t offset = POINTER_BYTES * numOfCols;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
|
||||
|
||||
|
@ -6985,6 +7049,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
|
|||
offset += pExpr[index->colIndex].base.resBytes;
|
||||
}
|
||||
|
||||
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
pOperator->name = "SLimitOperator";
|
||||
|
|
|
@ -430,7 +430,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput)
|
|||
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
|
||||
pFillInfo->pData[i] = pColData->pData;
|
||||
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { // copy the tag value to tag value buffer
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
|
||||
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
||||
assert (pTag->col.colId == pCol->col.colId);
|
||||
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
|
||||
|
|
|
@ -165,6 +165,8 @@ char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
|
||||
char* strtolower(char *dst, const char *src) {
|
||||
int esc = 0;
|
||||
char quote = 0, *p = dst, c;
|
||||
|
|
|
@ -150,13 +150,13 @@ if $data00 != 0.00150 then
|
|||
print expect 0.00150, actual: $data00
|
||||
return -1
|
||||
endi
|
||||
sql create table st_float_15_0 using mt_float tags (3.40282347e+38)
|
||||
sql select tagname from st_float_15_0
|
||||
#sql create table st_float_15_0 using mt_float tags (3.40282347e+38)
|
||||
#sql select tagname from st_float_15_0
|
||||
#if $data00 != 0.001500 then
|
||||
# return -1
|
||||
#endi
|
||||
sql create table st_float_16_0 using mt_float tags (-3.40282347e+38)
|
||||
sql select tagname from st_float_16_0
|
||||
#sql create table st_float_16_0 using mt_float tags (-3.40282347e+38)
|
||||
#sql select tagname from st_float_16_0
|
||||
#if $data00 != 0.001500 then
|
||||
# return -1
|
||||
#endi
|
||||
|
|
Loading…
Reference in New Issue