[td-225] refactor code.
This commit is contained in:
parent
4e4e7b2454
commit
bc39673ca6
|
@ -199,7 +199,7 @@ SExprInfo* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnI
|
|||
SExprInfo* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
|
||||
int16_t size);
|
||||
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
|
||||
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
|
||||
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
|
||||
|
||||
SExprInfo* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
|
||||
int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
|
||||
|
@ -207,9 +207,8 @@ void tscSqlExprAssign(SExprInfo* dst, const SExprInfo* src);
|
|||
void tscSqlExprInfoDestroy(SArray* pExprInfo);
|
||||
|
||||
SColumn* tscColumnClone(const SColumn* src);
|
||||
bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex);
|
||||
SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex, SSchema* pSchema);
|
||||
SArray* tscColumnListClone(const SArray* src, int16_t tableIndex);
|
||||
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
|
||||
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
|
||||
void tscColumnListDestroy(SArray* pColList);
|
||||
|
||||
void tscDequoteAndTrimToken(SStrToken* pToken);
|
||||
|
|
|
@ -115,7 +115,9 @@ typedef struct SFieldInfo {
|
|||
} SFieldInfo;
|
||||
|
||||
typedef struct SColumn {
|
||||
SColumnIndex colIndex;
|
||||
uint64_t tableUid;
|
||||
int32_t columnIndex;
|
||||
// SColumnIndex colIndex;
|
||||
SColumnInfo info;
|
||||
} SColumn;
|
||||
|
||||
|
|
|
@ -344,7 +344,7 @@ static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableM
|
|||
// validate the table columns information
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pQueryInfo->colList); ++i) {
|
||||
SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
|
||||
if (pCol->colIndex.columnIndex >= numOfCols) {
|
||||
if (pCol->columnIndex >= numOfCols) {
|
||||
return pSql->retryReason;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ typedef struct SCompareParam {
|
|||
int32_t groupOrderType;
|
||||
} SCompareParam;
|
||||
|
||||
bool needToMergeRv(SSDataBlock* pBlock, SLocalMerger *pLocalMerge, int32_t index, char **buf);
|
||||
bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndex, int32_t index, char **buf);
|
||||
|
||||
int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
|
||||
int32_t pLeftIdx = *(int32_t *)pLeft;
|
||||
|
@ -598,11 +598,19 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
|
|||
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
|
||||
int32_t numOfInternalOutput = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
|
||||
int32_t startCols = numOfInternalOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
|
||||
|
||||
// the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns
|
||||
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
|
||||
orderColIndexList[i] = startCols++;
|
||||
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, i);
|
||||
for(int32_t j = 0; j < numOfInternalOutput; ++j) {
|
||||
SExprInfo* pExprInfo = tscSqlExprGet(pQueryInfo, j);
|
||||
|
||||
int32_t functionId = pExprInfo->base.functionId;
|
||||
if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG)) {
|
||||
orderColIndexList[i] = j;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pQueryInfo->interval.interval != 0) {
|
||||
|
@ -1134,6 +1142,26 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc
|
|||
(*hasPrev) = true;
|
||||
}
|
||||
|
||||
static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) {
|
||||
if (numOfRows <= 1) {
|
||||
return ;
|
||||
}
|
||||
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
if (pCtx[k].functionId != TSDB_FUNC_TAG) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t inc = numOfRows - 1; // tsdb_func_tag function only produce one row of result
|
||||
char* src = pCtx[k].pOutput;
|
||||
|
||||
for (int32_t i = 0; i < inc; ++i) {
|
||||
pCtx[k].pOutput += pCtx[k].outputBytes;
|
||||
memcpy(pCtx[k].pOutput, src, (size_t)pCtx[k].outputBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
|
||||
SMultiwayMergeInfo* pInfo = pOperator->info;
|
||||
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
|
||||
|
@ -1141,79 +1169,68 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
|
|||
char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES);
|
||||
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
add[i] = pCtx[i].pInput;
|
||||
pCtx[i].size = 1;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
if (pInfo->hasPrev) {
|
||||
if (needToMergeRv(pBlock, pInfo->pMerge, i, pInfo->prevRow)) {
|
||||
if (needToMergeRv(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
int32_t functionId = pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pCtx[j].size = 1;
|
||||
aAggs[functionId].mergeFunc(&pCtx[j]);
|
||||
}
|
||||
} else {
|
||||
for(int32_t j = 0; j < numOfExpr; ++j) {
|
||||
for(int32_t j = 0; j < numOfExpr; ++j) { // TODO refactor
|
||||
int32_t functionId = pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pCtx[j].size = 1;
|
||||
aAggs[functionId].xFinalize(&pCtx[j]);
|
||||
}
|
||||
|
||||
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
|
||||
|
||||
pInfo->binfo.pRes->info.rows += numOfRows;
|
||||
|
||||
if (i == 0) {
|
||||
for(int32_t j = 0; j < numOfExpr; ++j) {
|
||||
pCtx[j].pOutput += pCtx[j].outputBytes;
|
||||
pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows);
|
||||
aAggs[pCtx[j].functionId].init(&pCtx[j]);
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
int32_t functionId = pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pCtx[j].size = 1;
|
||||
aAggs[functionId].mergeFunc(&pCtx[j]);
|
||||
}
|
||||
}
|
||||
|
||||
for(int32_t j = 0; j < numOfExpr; ++j) {
|
||||
pCtx[j].pOutput += pCtx[j].outputBytes;
|
||||
pCtx[j].pInput += pCtx[j].inputBytes;
|
||||
|
||||
aAggs[pCtx[j].functionId].init(&pCtx[j]);
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
int32_t functionId = pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pCtx[j].size = 1;
|
||||
aAggs[functionId].mergeFunc(&pCtx[j]);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
int32_t functionId = pCtx[j].functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pCtx[j].size = 1;
|
||||
aAggs[functionId].mergeFunc(&pCtx[j]);
|
||||
}
|
||||
}
|
||||
|
||||
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
|
||||
}
|
||||
|
||||
|
@ -1330,17 +1347,12 @@ bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *t
|
|||
return (ret == 0);
|
||||
}
|
||||
|
||||
bool needToMergeRv(SSDataBlock* pBlock, SLocalMerger *pLocalMerge, int32_t index, char **buf) {
|
||||
bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
|
||||
int32_t ret = 0;
|
||||
tOrderDescriptor *pDesc = pLocalMerge->pDesc;
|
||||
if (pDesc->orderInfo.numOfCols > 0) {
|
||||
// if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc
|
||||
ret = compare_aRv(pBlock, pDesc->orderInfo.colIndex, pDesc->orderInfo.numOfCols, index, buf, TSDB_ORDER_ASC);
|
||||
// } else { // desc
|
||||
// ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data);
|
||||
// }
|
||||
size_t size = taosArrayGetSize(columnIndexList);
|
||||
if (size > 0) {
|
||||
ret = compare_aRv(pBlock, columnIndexList, size, index, buf, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
||||
// if ret == 0, means the result belongs to the same group
|
||||
return (ret == 0);
|
||||
}
|
||||
|
@ -1429,24 +1441,6 @@ bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurren
|
|||
return true;
|
||||
}
|
||||
|
||||
bool genFinalResultsRv(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurrentGroupRes) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
|
||||
tFilePage * pResBuf = pLocalMerge->pResultBuf;
|
||||
SColumnModel *pModel = pLocalMerge->resColModel;
|
||||
|
||||
pRes->code = TSDB_CODE_SUCCESS;
|
||||
|
||||
tColModelCompact(pModel, pResBuf, pModel->capacity);
|
||||
|
||||
// no interval query, no fill operation
|
||||
genFinalResWithoutFill(pRes, pLocalMerge, pQueryInfo);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset output buffer to the beginning
|
||||
size_t t = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
for (int32_t i = 0; i < t; ++i) {
|
||||
|
@ -1844,7 +1838,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel
|
|||
pBlock->info.rows += 1;
|
||||
}
|
||||
|
||||
SSDataBlock* doMultiwaySort(void* param) {
|
||||
SSDataBlock* doMultiwaySort(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -1891,6 +1885,7 @@ SSDataBlock* doMultiwaySort(void* param) {
|
|||
continue;
|
||||
} else {
|
||||
sameGroup = false;
|
||||
*newgroup = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -1957,7 +1952,7 @@ static bool isSameGroupRv(SArray* orderColumnList, SSDataBlock* pBlock, char** d
|
|||
return true;
|
||||
}
|
||||
|
||||
SSDataBlock* doGlobalAggregate(void* param) {
|
||||
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -1966,6 +1961,7 @@ SSDataBlock* doGlobalAggregate(void* param) {
|
|||
SMultiwayMergeInfo *pAggInfo = pOperator->info;
|
||||
SOperatorInfo *upstream = pOperator->upstream;
|
||||
|
||||
*newgroup = false;
|
||||
bool handleData = false;
|
||||
pAggInfo->binfo.pRes->info.rows = 0;
|
||||
|
||||
|
@ -1991,19 +1987,23 @@ SSDataBlock* doGlobalAggregate(void* param) {
|
|||
pAggInfo->pExistBlock = NULL;
|
||||
pAggInfo->hasDataBlockForNewGroup = false;
|
||||
handleData = true;
|
||||
*newgroup = true;
|
||||
}
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
while(1) {
|
||||
pBlock = upstream->exec(upstream);
|
||||
bool prev = *newgroup;
|
||||
pBlock = upstream->exec(upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
*newgroup = prev;
|
||||
break;
|
||||
}
|
||||
|
||||
if (pAggInfo->hasGroupColData) {
|
||||
bool sameGroup = isSameGroupRv(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
|
||||
if (!sameGroup) {
|
||||
*newgroup = true;
|
||||
pAggInfo->hasDataBlockForNewGroup = true;
|
||||
pAggInfo->pExistBlock = pBlock;
|
||||
savePrevOrderColumns(pAggInfo->prevRow, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasPrev);
|
||||
|
@ -2014,7 +2014,7 @@ SSDataBlock* doGlobalAggregate(void* param) {
|
|||
// not belongs to the same group, return the result of current group
|
||||
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
|
||||
|
||||
// handle the output buffer problem
|
||||
// todo: it may be overflow handle the output buffer problem
|
||||
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows);
|
||||
|
||||
doExecuteFinalMergeRv(pOperator, pOperator->numOfOutput, pBlock);
|
||||
|
@ -2034,12 +2034,82 @@ SSDataBlock* doGlobalAggregate(void* param) {
|
|||
|
||||
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
pAggInfo->binfo.pRes->info.rows += numOfRows;
|
||||
|
||||
setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows);
|
||||
}
|
||||
|
||||
return (pAggInfo->binfo.pRes->info.rows != 0)? pAggInfo->binfo.pRes:NULL;
|
||||
SSDataBlock* pRes = pAggInfo->binfo.pRes;
|
||||
{
|
||||
SColumnInfoData* pInfoData = taosArrayGet(pRes->pDataBlock, 0);
|
||||
|
||||
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pRes->info.rows > 0) {
|
||||
STimeWindow* w = &pRes->info.window;
|
||||
w->skey = *(int64_t*)pInfoData->pData;
|
||||
w->ekey = *(int64_t*)(((char*)pInfoData->pData) + TSDB_KEYSIZE * (pRes->info.rows - 1));
|
||||
}
|
||||
}
|
||||
|
||||
return (pRes->info.rows != 0)? pRes:NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* doSLimit(void* param) {
|
||||
static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
|
||||
SSLimitOperatorInfo *pInfo = pOperator->info;
|
||||
assert(pInfo->currentGroupOffset >= 0);
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
if (pInfo->currentGroupOffset == 0) {
|
||||
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
|
||||
while ((*newgroup) == false) { // ignore the remain blocks
|
||||
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
while(1) {
|
||||
if (*newgroup) {
|
||||
pInfo->currentGroupOffset -= 1;
|
||||
*newgroup = false;
|
||||
}
|
||||
|
||||
while ((*newgroup) == false) {
|
||||
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
// now we have got the first data block of the next group.
|
||||
if (pInfo->currentGroupOffset == 0) {
|
||||
return pBlock;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* doSLimit(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -2049,97 +2119,138 @@ SSDataBlock* doSLimit(void* param) {
|
|||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
while (1) {
|
||||
pBlock = pOperator->upstream->exec(pOperator->upstream);
|
||||
pBlock = skipGroupBlock(pOperator, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (*newgroup) { // a new group arrives
|
||||
pInfo->groupTotal += 1;
|
||||
pInfo->rowsTotal = 0;
|
||||
pInfo->currentOffset = pInfo->limit.offset;
|
||||
}
|
||||
|
||||
assert(pInfo->currentGroupOffset == 0);
|
||||
|
||||
if (pInfo->currentOffset >= pBlock->info.rows) {
|
||||
pInfo->currentOffset -= pBlock->info.rows;
|
||||
} else {
|
||||
if (pInfo->currentOffset == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
|
||||
pBlock->info.rows = remain;
|
||||
|
||||
// move the remain rows of this data block to the front.
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
int16_t bytes = pColInfoData->info.bytes;
|
||||
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
|
||||
}
|
||||
|
||||
pInfo->currentOffset = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
if (!pInfo->hasPrev) {
|
||||
pInfo->groupTotal = 1;
|
||||
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
|
||||
} else {
|
||||
bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow);
|
||||
if (!sameGroup) { // reset info for new group data
|
||||
pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
|
||||
pInfo->rowsTotal = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (pInfo->currentGroupOffset == 0) {
|
||||
if (pInfo->currentOffset == 0) { // TODO refactor
|
||||
break;
|
||||
} else if (pInfo->currentOffset >= pBlock->info.rows) {
|
||||
pInfo->currentOffset -= pBlock->info.rows;
|
||||
} else {
|
||||
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
|
||||
pBlock->info.rows = remain;
|
||||
|
||||
// move the remain rows of this data block to the front.
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
int16_t bytes = pColInfoData->info.bytes;
|
||||
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
|
||||
}
|
||||
|
||||
pInfo->currentOffset = 0;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (pInfo->hasPrev) {
|
||||
// Check if current data block belongs to current result group or not
|
||||
bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow);
|
||||
if (sameGroup) {
|
||||
continue; // ignore the data block of the same group and try next
|
||||
} else {
|
||||
//update the group column data by using the current group.
|
||||
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
|
||||
|
||||
pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
|
||||
pInfo->rowsTotal = 0;
|
||||
|
||||
if ((--pInfo->currentGroupOffset) == 0) {
|
||||
if (pInfo->currentOffset == 0) { // TODO refactor
|
||||
break;
|
||||
} else if (pInfo->currentOffset >= pBlock->info.rows) {
|
||||
pInfo->currentOffset -= pBlock->info.rows;
|
||||
} else {
|
||||
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
|
||||
pBlock->info.rows = remain;
|
||||
|
||||
// move the remain rows of this data block to the front.
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
int16_t bytes = pColInfoData->info.bytes;
|
||||
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
|
||||
}
|
||||
|
||||
pInfo->currentOffset = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
|
||||
} else { // data in current group has reached the limit, ignore the remain data of this group
|
||||
if (pInfo->limit.limit > 0 && (pInfo->rowsTotal >= pInfo->limit.limit)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
// if (pInfo->currentGroupOffset == 0) {
|
||||
// if (pInfo->currentOffset == 0) { // TODO refactor
|
||||
// break;
|
||||
// } else if (pInfo->currentOffset >= pBlock->info.rows) {
|
||||
// pInfo->currentOffset -= pBlock->info.rows;
|
||||
// } else {
|
||||
// int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
|
||||
// pBlock->info.rows = remain;
|
||||
//
|
||||
// // move the remain rows of this data block to the front.
|
||||
// for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
// SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
//
|
||||
// int16_t bytes = pColInfoData->info.bytes;
|
||||
// memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
|
||||
// }
|
||||
//
|
||||
// pInfo->currentOffset = 0;
|
||||
// break;
|
||||
// }
|
||||
// } else {
|
||||
// if (pInfo->hasPrev) {
|
||||
// // Check if current data block belongs to current result group or not
|
||||
// bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow);
|
||||
// if (sameGroup) {
|
||||
// continue; // ignore the data block of the same group and try next
|
||||
// } else {
|
||||
// //update the group column data by using the current group.
|
||||
// savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
|
||||
//
|
||||
// pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
|
||||
// pInfo->rowsTotal = 0;
|
||||
//
|
||||
// if ((--pInfo->currentGroupOffset) == 0) {
|
||||
// if (pInfo->currentOffset == 0) { // TODO refactor
|
||||
// break;
|
||||
// } else if (pInfo->currentOffset >= pBlock->info.rows) {
|
||||
// pInfo->currentOffset -= pBlock->info.rows;
|
||||
// } else {
|
||||
// int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
|
||||
// pBlock->info.rows = remain;
|
||||
//
|
||||
// // move the remain rows of this data block to the front.
|
||||
// for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
// SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
//
|
||||
// int16_t bytes = pColInfoData->info.bytes;
|
||||
// memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
|
||||
// }
|
||||
//
|
||||
// pInfo->currentOffset = 0;
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// } else {
|
||||
// savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
if (!pInfo->hasPrev || !isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow)) {
|
||||
pInfo->groupTotal += 1;
|
||||
if (pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort
|
||||
// if (!pInfo->hasPrev || !isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow)) {
|
||||
// pInfo->groupTotal += 1;
|
||||
if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
// }
|
||||
|
||||
if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) {
|
||||
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal);
|
||||
pInfo->rowsTotal = pInfo->limit.limit;
|
||||
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
if (pInfo->slimit.limit > 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
} else {
|
||||
pInfo->rowsTotal += pBlock->info.rows;
|
||||
}
|
||||
|
|
|
@ -1399,10 +1399,9 @@ int32_t setObjFullName(char* fullName, const char* account, SStrToken* pDB, SStr
|
|||
return (totalLen < TSDB_TABLE_FNAME_LEN) ? TSDB_CODE_SUCCESS : TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
|
||||
SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t tableUid) {
|
||||
SSchema s = {.type = TSDB_DATA_TYPE_TIMESTAMP, .bytes = TSDB_KEYSIZE, .colId = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||
tscColumnListInsert(pQueryInfo->colList, &tsCol, &s);
|
||||
tscColumnListInsert(pQueryInfo->colList, PRIMARYKEY_TIMESTAMP_COL_INDEX, tableUid, &s);
|
||||
}
|
||||
|
||||
static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t exprIndex, tSqlExprItem* pItem) {
|
||||
|
@ -1482,7 +1481,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
|
|||
insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->base.aliasName, pExpr);
|
||||
|
||||
// add ts column
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, pExpr->base.uid);
|
||||
|
||||
tbufCloseWriter(&bw);
|
||||
taosArrayDestroy(colList);
|
||||
|
@ -1706,15 +1705,16 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi
|
|||
int8_t type, char* fieldName, SExprInfo* pSqlExpr) {
|
||||
for (int32_t i = 0; i < pColList->num; ++i) {
|
||||
int32_t tableIndex = pColList->ids[i].tableIndex;
|
||||
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[tableIndex];
|
||||
STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[tableIndex]->pTableMeta;
|
||||
|
||||
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
|
||||
if (pColList->ids[i].columnIndex >= numOfCols) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
||||
tscColumnListInsert(pQueryInfo->colList, &pColList->ids[i], &pSchema[pColList->ids[i].columnIndex]);
|
||||
uint64_t uid = pTableMeta->id.uid;
|
||||
SSchema* pSchema = tscGetTableSchema(pTableMeta);
|
||||
tscColumnListInsert(pQueryInfo->colList, pColList->ids[i].columnIndex, uid, &pSchema[pColList->ids[i].columnIndex]);
|
||||
}
|
||||
|
||||
TAOS_FIELD f = tscCreateField(type, fieldName, bytes);
|
||||
|
@ -1736,7 +1736,7 @@ SExprInfo* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tab
|
|||
|
||||
if (functionId == TSDB_FUNC_TAGPRJ) {
|
||||
index.columnIndex = colIndex - tscGetNumOfColumns(pTableMeta);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index, pSchema);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid, pSchema);
|
||||
} else {
|
||||
index.columnIndex = colIndex;
|
||||
}
|
||||
|
@ -1765,7 +1765,7 @@ SExprInfo* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColInd
|
|||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pIndex->tableIndex);
|
||||
|
||||
if (TSDB_COL_IS_TAG(flag)) {
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, pIndex, pColSchema);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, pIndex->columnIndex, pTableMetaInfo->pTableMeta->id.uid, pColSchema);
|
||||
}
|
||||
|
||||
return pExpr;
|
||||
|
@ -1828,8 +1828,9 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
|
|||
}
|
||||
|
||||
// add the primary timestamp column even though it is not required by user
|
||||
if (pQueryInfo->pTableMetaInfo[index.tableIndex]->pTableMeta->tableType != TSDB_TEMP_TABLE) {
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
|
||||
STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[index.tableIndex]->pTableMeta;
|
||||
if (pTableMeta->tableType != TSDB_TEMP_TABLE) {
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMeta->id.uid);
|
||||
}
|
||||
} else if (optr == TK_STRING || optr == TK_INTEGER || optr == TK_FLOAT) { // simple column projection query
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
|
@ -1871,7 +1872,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
|
|||
}
|
||||
|
||||
// add the primary timestamp column even though it is not required by user
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
|
||||
} else {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
@ -1916,16 +1918,15 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
|
|||
}
|
||||
|
||||
// for all queries, the timestamp column needs to be loaded
|
||||
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||
SSchema s = {.colId = PRIMARYKEY_TIMESTAMP_COL_INDEX, .bytes = TSDB_KEYSIZE, .type = TSDB_DATA_TYPE_TIMESTAMP,};
|
||||
tscColumnListInsert(pQueryInfo->colList, &index, &s);
|
||||
tscColumnListInsert(pQueryInfo->colList, PRIMARYKEY_TIMESTAMP_COL_INDEX, pExpr->base.uid, &s);
|
||||
|
||||
// if it is not in the final result, do not add it
|
||||
SColumnList ids = createColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
|
||||
if (finalResult) {
|
||||
insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, pExpr->base.aliasName, pExpr);
|
||||
} else {
|
||||
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0]), pSchema);
|
||||
tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2065,13 +2066,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
} else {
|
||||
for (int32_t i = 0; i < list.num; ++i) {
|
||||
SSchema* ps = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
||||
tscColumnListInsert(pQueryInfo->colList, &list.ids[i], &ps[list.ids[i].columnIndex]);
|
||||
tscColumnListInsert(pQueryInfo->colList, list.ids[i].columnIndex, pTableMetaInfo->pTableMeta->id.uid,
|
||||
&ps[list.ids[i].columnIndex]);
|
||||
}
|
||||
}
|
||||
|
||||
// the time stamp may be always needed
|
||||
if (index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2176,10 +2178,10 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
pExpr->base.aliasName, pExpr);
|
||||
} else {
|
||||
assert(ids.num == 1);
|
||||
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0]), pSchema);
|
||||
tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema);
|
||||
}
|
||||
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, pExpr->base.uid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
case TSDB_FUNC_FIRST:
|
||||
|
@ -2357,7 +2359,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
* for dp = 0, it is actually min,
|
||||
* for dp = 100, it is max,
|
||||
*/
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
|
||||
tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
|
||||
colIndex += 1; // the first column is ts
|
||||
|
||||
pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false);
|
||||
|
@ -2398,7 +2400,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
insertResultField(pQueryInfo, colIndex, &ids, resultSize, resultType, pExpr->base.aliasName, pExpr);
|
||||
} else {
|
||||
assert(ids.num == 1);
|
||||
tscColumnListInsert(pQueryInfo->colList, &ids.ids[0], pSchema);
|
||||
tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2448,7 +2450,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index, &pSchema[index.columnIndex]);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMetaInfo->pTableMeta->id.uid,
|
||||
&pSchema[index.columnIndex]);
|
||||
SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
||||
|
||||
SSchema s = {0};
|
||||
|
@ -3054,14 +3057,14 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd
|
|||
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
|
||||
|
||||
index.columnIndex = relIndex;
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index, pSchema);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid, pSchema);
|
||||
} else {
|
||||
// check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by
|
||||
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8);
|
||||
}
|
||||
|
||||
tscColumnListInsert(pQueryInfo->colList, &index, pSchema);
|
||||
tscColumnListInsert(pQueryInfo->colList, index.columnIndex, pTableMeta->id.uid, pSchema);
|
||||
|
||||
SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId };
|
||||
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
|
||||
|
@ -3259,7 +3262,7 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
|
|||
const char* msg2 = "binary column not support this operator";
|
||||
const char* msg3 = "bool column not support this operator";
|
||||
|
||||
SColumn* pColumn = tscColumnListInsert(pQueryInfo->colList, pIndex, pSchema);
|
||||
SColumn* pColumn = tscColumnListInsert(pQueryInfo->colList, pIndex->columnIndex, pTableMeta->id.uid, pSchema);
|
||||
SColumnFilterInfo* pColFilter = NULL;
|
||||
|
||||
/*
|
||||
|
@ -3312,7 +3315,8 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
|
|||
}
|
||||
}
|
||||
|
||||
pColumn->colIndex = *pIndex;
|
||||
pColumn->columnIndex = pIndex->columnIndex;
|
||||
pColumn->tableUid = pTableMeta->id.uid;
|
||||
return doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, pIndex, pExpr);
|
||||
}
|
||||
|
||||
|
@ -3412,7 +3416,7 @@ static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tS
|
|||
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
if (!tscColumnExists(pTableMetaInfo->tagColList, &index)) {
|
||||
if (!tscColumnExists(pTableMetaInfo->tagColList, index.columnIndex, pTableMetaInfo->pTableMeta->id.uid)) {
|
||||
// tscColumnListInsert(pTableMetaInfo->tagColList, &index, );
|
||||
if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
|
@ -3441,10 +3445,11 @@ static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tS
|
|||
(*rightNode)->tagColId = pTagSchema2->colId;
|
||||
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
if (!tscColumnExists(pTableMetaInfo->tagColList, &index)) {
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMeta);
|
||||
if (!tscColumnExists(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid)) {
|
||||
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index, pTagSchema2);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid, pTagSchema2);
|
||||
if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
}
|
||||
|
@ -4373,7 +4378,8 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE
|
|||
SColumnIndex index = {.tableIndex = i, .columnIndex = pIndex->colIndex - numOfCols};
|
||||
|
||||
SSchema* s = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index, &s[pIndex->colIndex]);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMetaInfo->pTableMeta->id.uid,
|
||||
&s[pIndex->colIndex]);
|
||||
}
|
||||
|
||||
tsSetSTableQueryCond(&pQueryInfo->tagCond, uid, &bw);
|
||||
|
@ -5755,7 +5761,8 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
|
|||
if (pExpr == NULL || pExpr->base.functionId != TSDB_FUNC_TAG) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pParentQueryInfo, tableIndex);
|
||||
|
||||
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
|
||||
uint64_t uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, uid);
|
||||
|
||||
SSchema* pTagSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, colId);
|
||||
int16_t colIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, colId);
|
||||
|
@ -5778,8 +5785,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
|
|||
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0);
|
||||
pColIndex->colIndex = relIndex;
|
||||
|
||||
index = (SColumnIndex) {.tableIndex = tableIndex, .columnIndex = relIndex};
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index, pTagSchema);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, relIndex, uid, pTagSchema);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6035,34 +6041,26 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
|
|||
}
|
||||
|
||||
static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
||||
const char* msg2 = "interval not allowed in group by normal column";
|
||||
const char* msg1 = "interval not allowed in group by normal column";
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
SSchema s = *tGetTbnameColumnSchema();
|
||||
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
||||
int16_t bytes = 0;
|
||||
int16_t type = 0;
|
||||
char* name = NULL;
|
||||
SSchema* tagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
||||
|
||||
SSchema* s = NULL;
|
||||
|
||||
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
|
||||
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, i);
|
||||
int16_t colIndex = pColIndex->colIndex;
|
||||
|
||||
if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
type = s.type;
|
||||
bytes = s.bytes;
|
||||
name = s.name;
|
||||
s = tGetTbnameColumnSchema();
|
||||
} else {
|
||||
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
|
||||
SSchema* tagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
||||
|
||||
type = tagSchema[colIndex].type;
|
||||
bytes = tagSchema[colIndex].bytes;
|
||||
name = tagSchema[colIndex].name;
|
||||
s = &tagSchema[colIndex];
|
||||
} else {
|
||||
type = pSchema[colIndex].type;
|
||||
bytes = pSchema[colIndex].bytes;
|
||||
name = pSchema[colIndex].name;
|
||||
s = &pSchema[colIndex];
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6070,34 +6068,33 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
|
|||
|
||||
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
|
||||
SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex};
|
||||
SExprInfo* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true);
|
||||
SExprInfo* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, s->type, s->bytes,
|
||||
getNewResColId(pQueryInfo), s->bytes, true);
|
||||
|
||||
memset(pExpr->base.aliasName, 0, sizeof(pExpr->base.aliasName));
|
||||
tstrncpy(pExpr->base.aliasName, name, sizeof(pExpr->base.aliasName));
|
||||
tstrncpy(pExpr->base.aliasName, s->name, sizeof(pExpr->base.aliasName));
|
||||
|
||||
pExpr->base.colInfo.flag = TSDB_COL_TAG;
|
||||
|
||||
// NOTE: tag column does not add to source column list
|
||||
SColumnList ids = createColumnList(1, 0, pColIndex->colIndex);
|
||||
insertResultField(pQueryInfo, (int32_t)size, &ids, bytes, (int8_t)type, name, pExpr);
|
||||
insertResultField(pQueryInfo, (int32_t)size, &ids, s->bytes, (int8_t)s->type, s->name, pExpr);
|
||||
} else {
|
||||
// if this query is "group by" normal column, time window query is not allowed
|
||||
if (isTimeWindowQuery(pQueryInfo)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
bool hasGroupColumn = false;
|
||||
for (int32_t j = 0; j < size; ++j) {
|
||||
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, j);
|
||||
if (pExpr->base.colInfo.colId == pColIndex->colId) {
|
||||
if ((pExpr->base.functionId == TSDB_FUNC_PRJ) && pExpr->base.colInfo.colId == pColIndex->colId) {
|
||||
hasGroupColumn = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* if the group by column does not required by user, add this column into the final result set
|
||||
* but invisible to user
|
||||
*/
|
||||
//if the group by column does not required by user, add an invisible column into the final result set.
|
||||
if (!hasGroupColumn) {
|
||||
doAddGroupColumnForSubquery(pQueryInfo, i);
|
||||
}
|
||||
|
@ -6154,6 +6151,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
|||
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
|
||||
// check if all the tags prj columns belongs to the group by columns
|
||||
if (onlyTagPrjFunction(pQueryInfo) && allTagPrjInGroupby(pQueryInfo)) {
|
||||
// It is a groupby aggregate query, the tag project function is not suitable for this case.
|
||||
updateTagPrjFunction(pQueryInfo);
|
||||
return doAddGroupbyColumnsOnDemand(pCmd, pQueryInfo);
|
||||
}
|
||||
|
|
|
@ -705,10 +705,11 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
|
|||
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
|
||||
}
|
||||
|
||||
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
|
||||
tscError("%p table schema is not matched with parsed sql", addr);
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
//TODO disable it temporarily
|
||||
// if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
|
||||
// tscError("%p table schema is not matched with parsed sql", addr);
|
||||
// return TSDB_CODE_TSC_INVALID_SQL;
|
||||
// }
|
||||
|
||||
assert(pExpr->resColId < 0);
|
||||
SSqlExpr* pSqlExpr = (SSqlExpr *)(*pMsg);
|
||||
|
@ -777,13 +778,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pQueryMsg->tableScanOperator = htonl(*tablescanOp);
|
||||
}
|
||||
|
||||
if (query.order.order == TSDB_ORDER_ASC) {
|
||||
pQueryMsg->window.skey = htobe64(query.window.skey);
|
||||
pQueryMsg->window.ekey = htobe64(query.window.ekey);
|
||||
} else {
|
||||
pQueryMsg->window.skey = htobe64(query.window.ekey);
|
||||
pQueryMsg->window.ekey = htobe64(query.window.skey);
|
||||
}
|
||||
|
||||
pQueryMsg->order = htons(query.order.order);
|
||||
pQueryMsg->orderColId = htons(query.order.orderColId);
|
||||
|
@ -1594,9 +1590,6 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
uint64_t localQueryId = 0;
|
||||
// SMultiwayMergeInfo* pInfo = (SMultiwayMergeInfo*) pQueryInfo->pQInfo->runtimeEnv.proot->info;
|
||||
// pInfo->pMerge = pRes->pLocalMerger;
|
||||
|
||||
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
|
||||
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
|
||||
pRes->numOfRows = (p != NULL)? p->info.rows: 0;
|
||||
|
@ -2109,9 +2102,10 @@ int tscProcessShowRsp(SSqlObj *pSql) {
|
|||
SColumnIndex index = {0};
|
||||
pSchema = pMetaMsg->schema;
|
||||
|
||||
uint64_t uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
|
||||
index.columnIndex = i;
|
||||
tscColumnListInsert(pQueryInfo->colList, &index, &pSchema[i]);
|
||||
tscColumnListInsert(pQueryInfo->colList, i, uid, &pSchema[i]);
|
||||
|
||||
TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
|
||||
SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
|
||||
|
|
|
@ -451,25 +451,6 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
|
|||
free(pSupporter);
|
||||
}
|
||||
|
||||
/*
|
||||
* need the secondary query process
|
||||
* In case of count(ts)/count(*)/spread(ts) query, that are only applied to
|
||||
* primary timestamp column , the secondary query is not necessary
|
||||
*
|
||||
*/
|
||||
static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
|
||||
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumn* base = taosArrayGet(pQueryInfo->colList, i);
|
||||
if (base->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
|
||||
int32_t num = 0;
|
||||
int32_t* list = NULL;
|
||||
|
@ -602,6 +583,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
|
|||
pQueryInfo->exprList = pSupporter->exprList;
|
||||
pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
|
||||
pQueryInfo->groupbyExpr = pSupporter->groupInfo;
|
||||
pQueryInfo->pUpstream = taosArrayInit(4, sizeof(POINTER_BYTES));
|
||||
pQueryInfo->pDownstream = taosArrayInit(4, sizeof(POINTER_BYTES));
|
||||
|
||||
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
|
||||
|
||||
|
@ -1868,11 +1851,10 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
|
|||
assert(pNewQueryInfo != NULL);
|
||||
|
||||
// update the table index
|
||||
size_t num = taosArrayGetSize(pNewQueryInfo->colList);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i);
|
||||
pCol->colIndex.tableIndex = 0;
|
||||
}
|
||||
// size_t num = taosArrayGetSize(pNewQueryInfo->colList);
|
||||
// for (int32_t i = 0; i < num; ++i) {
|
||||
// SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i);
|
||||
// }
|
||||
|
||||
pSupporter->colList = pNewQueryInfo->colList;
|
||||
pNewQueryInfo->colList = NULL;
|
||||
|
@ -2402,9 +2384,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
|||
}
|
||||
}
|
||||
|
||||
SColumnIndex columnIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||
tscInsertPrimaryTsSourceColumn(pNewQueryInfo, &columnIndex);
|
||||
|
||||
tscInsertPrimaryTsSourceColumn(pNewQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
|
||||
tscTansformFuncForSTableQuery(pNewQueryInfo);
|
||||
|
||||
tscDebug(
|
||||
|
@ -2791,7 +2771,10 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
tscFreeRetrieveSup(pSql);
|
||||
|
||||
// set the command flag must be after the semaphore been correctly set.
|
||||
if (pParentSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
|
||||
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
|
||||
}
|
||||
|
||||
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
|
||||
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
|
||||
} else {
|
||||
|
|
|
@ -598,7 +598,7 @@ static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray*
|
|||
SSchema *pSchema = pTableMeta->schema;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(pTableCols, i);
|
||||
int32_t index = pCol->colIndex.columnIndex;
|
||||
int32_t index = pCol->columnIndex;
|
||||
|
||||
pColInfo[i].type = pSchema[index].type;
|
||||
pColInfo[i].bytes = pSchema[index].bytes;
|
||||
|
@ -613,7 +613,7 @@ typedef struct SDummyInputInfo {
|
|||
SSqlRes *pRes; // refactor: remove it
|
||||
} SDummyInputInfo;
|
||||
|
||||
SSDataBlock* doGetDataBlock(void* param) {
|
||||
SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
|
||||
SOperatorInfo *pOperator = (SOperatorInfo*) param;
|
||||
|
||||
SDummyInputInfo *pInput = pOperator->info;
|
||||
|
@ -634,6 +634,7 @@ SSDataBlock* doGetDataBlock(void* param) {
|
|||
}
|
||||
|
||||
pInput->pRes->numOfRows = 0;
|
||||
*newgroup = false;
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
|
@ -1601,19 +1602,18 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepco
|
|||
return 0;
|
||||
}
|
||||
|
||||
bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex) {
|
||||
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid) {
|
||||
// ignore the tbname columnIndex to be inserted into source list
|
||||
if (pColIndex->columnIndex < 0) {
|
||||
if (columnIndex < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pColumnList);
|
||||
int16_t col = pColIndex->columnIndex;
|
||||
|
||||
int32_t i = 0;
|
||||
while (i < numOfCols) {
|
||||
SColumn* pCol = taosArrayGetP(pColumnList, i);
|
||||
if ((pCol->colIndex.columnIndex != col) || (pCol->colIndex.tableIndex != pColIndex->tableIndex)) {
|
||||
if ((pCol->columnIndex != columnIndex) || (pCol->tableUid != uid)) {
|
||||
++i;
|
||||
continue;
|
||||
} else {
|
||||
|
@ -1640,22 +1640,20 @@ void tscSqlExprAssign(SExprInfo* dst, const SExprInfo* src) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO refactor
|
||||
SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex, SSchema* pSchema) {
|
||||
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema) {
|
||||
// ignore the tbname columnIndex to be inserted into source list
|
||||
if (pColIndex->columnIndex < 0) {
|
||||
if (columnIndex < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pColumnList);
|
||||
int16_t col = pColIndex->columnIndex;
|
||||
|
||||
int32_t i = 0;
|
||||
while (i < numOfCols) {
|
||||
SColumn* pCol = taosArrayGetP(pColumnList, i);
|
||||
if (pCol->colIndex.columnIndex < col) {
|
||||
if (pCol->columnIndex < columnIndex) {
|
||||
i++;
|
||||
} else if (pCol->colIndex.tableIndex < pColIndex->tableIndex) {
|
||||
} else if (pCol->tableUid < uid) {
|
||||
i++;
|
||||
} else {
|
||||
break;
|
||||
|
@ -1668,7 +1666,8 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex, SSche
|
|||
return NULL;
|
||||
}
|
||||
|
||||
b->colIndex = *pColIndex;
|
||||
b->columnIndex = columnIndex;
|
||||
b->tableUid = uid;
|
||||
b->info.colId = pSchema->colId;
|
||||
b->info.bytes = pSchema->bytes;
|
||||
b->info.type = pSchema->type;
|
||||
|
@ -1677,13 +1676,14 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex, SSche
|
|||
} else {
|
||||
SColumn* pCol = taosArrayGetP(pColumnList, i);
|
||||
|
||||
if (i < numOfCols && (pCol->colIndex.columnIndex > col || pCol->colIndex.tableIndex != pColIndex->tableIndex)) {
|
||||
if (i < numOfCols && (pCol->columnIndex > columnIndex || pCol->tableUid != uid)) {
|
||||
SColumn* b = calloc(1, sizeof(SColumn));
|
||||
if (b == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
b->colIndex = *pColIndex;
|
||||
b->columnIndex = columnIndex;
|
||||
b->tableUid = uid;
|
||||
b->info.colId = pSchema->colId;
|
||||
b->info.bytes = pSchema->bytes;
|
||||
b->info.type = pSchema->type;
|
||||
|
@ -1713,7 +1713,8 @@ SColumn* tscColumnClone(const SColumn* src) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
dst->colIndex = src->colIndex;
|
||||
dst->columnIndex = src->columnIndex;
|
||||
dst->tableUid = src->tableUid;
|
||||
dst->info.numOfFilters = src->info.numOfFilters;
|
||||
dst->info.filterInfo = tFilterInfoDup(src->info.filterInfo, src->info.numOfFilters);
|
||||
dst->info.type = src->info.type;
|
||||
|
@ -1727,14 +1728,14 @@ static void tscColumnDestroy(SColumn* pCol) {
|
|||
free(pCol);
|
||||
}
|
||||
|
||||
void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex) {
|
||||
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid) {
|
||||
assert(src != NULL && dst != NULL);
|
||||
|
||||
size_t num = taosArrayGetSize(src);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(src, i);
|
||||
|
||||
if (pCol->colIndex.tableIndex == tableIndex || tableIndex < 0) {
|
||||
if (pCol->tableUid == tableUid) {
|
||||
SColumn* p = tscColumnClone(pCol);
|
||||
taosArrayPush(dst, &p);
|
||||
}
|
||||
|
@ -2223,6 +2224,9 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
|
|||
|
||||
taosArrayDestroy(pQueryInfo->pUpstream);
|
||||
taosArrayDestroy(pQueryInfo->pDownstream);
|
||||
|
||||
pQueryInfo->pUpstream = NULL;
|
||||
pQueryInfo->pDownstream = NULL;
|
||||
}
|
||||
|
||||
void tscClearSubqueryInfo(SSqlCmd* pCmd) {
|
||||
|
@ -3339,13 +3343,24 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
|
|||
SExprInfo* pExpr = &pQueryAttr->pExpr1[i];
|
||||
SSqlExpr* pse = &pQueryAttr->pExpr3[i].base;
|
||||
|
||||
*pse = pExpr->base;
|
||||
memcpy(pse->aliasName, pExpr->base.aliasName, tListLen(pse->aliasName));
|
||||
|
||||
pse->uid = pExpr->base.uid;
|
||||
pse->functionId = pExpr->base.functionId;
|
||||
pse->resType = pExpr->base.resType;
|
||||
pse->resBytes = pExpr->base.resBytes;
|
||||
pse->interBytes = pExpr->base.interBytes;
|
||||
pse->resColId = pExpr->base.resColId;
|
||||
pse->offset = pExpr->base.offset;
|
||||
pse->numOfParams = pExpr->base.numOfParams;
|
||||
|
||||
pse->colInfo = pExpr->base.colInfo;
|
||||
pse->colInfo.colId = pExpr->base.resColId;
|
||||
pse->colInfo.colIndex = i;
|
||||
|
||||
pse->colType = pExpr->base.resType;
|
||||
pse->colBytes = pExpr->base.resBytes;
|
||||
pse->colInfo.flag = TSDB_COL_NORMAL;
|
||||
pse->colInfo.flag = pExpr->base.colInfo.flag;
|
||||
|
||||
for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) {
|
||||
tVariantAssign(&pse->param[j], &pExpr->base.param[j]);
|
||||
|
@ -3404,9 +3419,9 @@ static int32_t createTagColumnInfo(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
|
|||
SSchema* pSchema = tscGetTableTagSchema(pTableMeta);
|
||||
for (int32_t i = 0; i < pQueryAttr->numOfTags; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
|
||||
SSchema* pColSchema = &pSchema[pCol->colIndex.columnIndex];
|
||||
SSchema* pColSchema = &pSchema[pCol->columnIndex];
|
||||
|
||||
if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < TSDB_TBNAME_COLUMN_INDEX) ||
|
||||
if ((pCol->columnIndex >= numOfTagColumns || pCol->columnIndex < TSDB_TBNAME_COLUMN_INDEX) ||
|
||||
(!isValidDataType(pColSchema->type))) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
@ -3447,7 +3462,13 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
|||
pQueryAttr->order = pQueryInfo->order;
|
||||
pQueryAttr->fillType = pQueryInfo->fillType;
|
||||
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
|
||||
|
||||
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
|
||||
pQueryAttr->window = pQueryInfo->window;
|
||||
} else {
|
||||
pQueryAttr->window.skey = pQueryInfo->window.ekey;
|
||||
pQueryAttr->window.ekey = pQueryInfo->window.skey;
|
||||
}
|
||||
|
||||
memcpy(&pQueryAttr->interval, &pQueryInfo->interval, sizeof(pQueryAttr->interval));
|
||||
|
||||
|
|
|
@ -233,7 +233,7 @@ typedef struct SQueryAttr {
|
|||
int32_t vgId;
|
||||
} SQueryAttr;
|
||||
|
||||
typedef SSDataBlock* (*__operator_fn_t)(void* param);
|
||||
typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup);
|
||||
typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
|
||||
|
||||
struct SOperatorInfo;
|
||||
|
@ -411,6 +411,8 @@ typedef struct SArithOperatorInfo {
|
|||
SOptrBasicInfo binfo;
|
||||
int32_t bufCapacity;
|
||||
uint32_t seed;
|
||||
|
||||
SSDataBlock *existDataBlock;
|
||||
} SArithOperatorInfo;
|
||||
|
||||
typedef struct SLimitOperatorInfo {
|
||||
|
@ -438,6 +440,8 @@ typedef struct SFillOperatorInfo {
|
|||
SFillInfo *pFillInfo;
|
||||
SSDataBlock *pRes;
|
||||
int64_t totalInputRows;
|
||||
|
||||
SSDataBlock *existNewGroupBlock;
|
||||
} SFillOperatorInfo;
|
||||
|
||||
typedef struct SGroupbyOperatorInfo {
|
||||
|
@ -494,9 +498,9 @@ SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SEx
|
|||
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param);
|
||||
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
|
||||
|
||||
SSDataBlock* doGlobalAggregate(void* param);
|
||||
SSDataBlock* doMultiwaySort(void* param);
|
||||
SSDataBlock* doSLimit(void* param);
|
||||
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
|
||||
SSDataBlock* doMultiwaySort(void* param, bool* newgroup);
|
||||
SSDataBlock* doSLimit(void* param, bool* newgroup);
|
||||
|
||||
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
|
||||
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
||||
|
|
|
@ -238,7 +238,7 @@ int32_t compare_d(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1
|
|||
char *data2);
|
||||
|
||||
struct SSDataBlock;
|
||||
int32_t compare_aRv(struct SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order);
|
||||
int32_t compare_aRv(struct SSDataBlock* pBlock, SArray* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order);
|
||||
|
||||
int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes);
|
||||
|
||||
|
|
|
@ -3037,6 +3037,10 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
|
|||
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
|
||||
SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pOutput->pHisto->numOfElems > 1000) {
|
||||
printf("%d\n", pOutput->pHisto->numOfElems);
|
||||
}
|
||||
|
||||
if (pCtx->currentStage == MERGE_STAGE) {
|
||||
if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null
|
||||
assert(pOutput->pHisto->numOfElems > 0);
|
||||
|
@ -3349,9 +3353,15 @@ static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
* @param pCtx
|
||||
* @return
|
||||
*/
|
||||
static void copy_function(SQLFunctionCtx *pCtx);
|
||||
|
||||
static void tag_function(SQLFunctionCtx *pCtx) {
|
||||
SET_VAL(pCtx, 1, 1);
|
||||
if (pCtx->currentStage == MERGE_STAGE) {
|
||||
copy_function(pCtx);
|
||||
} else {
|
||||
tVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->outputType, true);
|
||||
}
|
||||
}
|
||||
|
||||
static void tag_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||
|
@ -5102,7 +5112,7 @@ SAggFunctionInfo aAggs[] = {{
|
|||
},
|
||||
{
|
||||
// 17
|
||||
"ts",
|
||||
"ts_dummy",
|
||||
TSDB_FUNC_TS_DUMMY,
|
||||
TSDB_FUNC_TS_DUMMY,
|
||||
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
|
||||
|
@ -5115,7 +5125,7 @@ SAggFunctionInfo aAggs[] = {{
|
|||
},
|
||||
{
|
||||
// 18
|
||||
"tag",
|
||||
"tag_dummy",
|
||||
TSDB_FUNC_TAG_DUMMY,
|
||||
TSDB_FUNC_TAG_DUMMY,
|
||||
TSDB_BASE_FUNC_SO,
|
||||
|
|
|
@ -193,7 +193,8 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
|
|||
idata.info.bytes = pExpr[i].base.resBytes;
|
||||
idata.info.colId = pExpr[i].base.resColId;
|
||||
|
||||
idata.pData = calloc(1, MAX(idata.info.bytes * numOfRows, minSize)); // at least to hold a pointer on x64 platform
|
||||
int32_t size = MAX(idata.info.bytes * numOfRows, minSize);
|
||||
idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform
|
||||
taosArrayPush(res->pDataBlock, &idata);
|
||||
}
|
||||
|
||||
|
@ -907,7 +908,8 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
|
|||
setArithParams((SArithmeticSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock);
|
||||
} else {
|
||||
SColIndex* pCol = &pOperator->pExpr[i].base.colInfo;
|
||||
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || pCol->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
|
||||
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || (pCol->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) ||
|
||||
(TSDB_COL_IS_TAG(pCol->flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
|
||||
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
|
||||
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
|
||||
|
||||
|
@ -920,6 +922,16 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
|
|||
SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
pCtx[i].ptsList = (int64_t*) tsInfo->pData;
|
||||
}
|
||||
} else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
|
||||
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
|
||||
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
|
||||
|
||||
pCtx[i].pInput = p->pData;
|
||||
assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type);
|
||||
for(int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||
char* dst = p->pData + j * p->info.bytes;
|
||||
tVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1701,7 +1713,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
break;
|
||||
}
|
||||
|
||||
case OP_Arithmetic: {
|
||||
case OP_Arithmetic: { // TODO refactor to remove arith operator.
|
||||
SOperatorInfo* prev = pRuntimeEnv->pTableScanner;
|
||||
if (i == 0) {
|
||||
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
|
@ -1710,6 +1722,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
}
|
||||
} else {
|
||||
prev = pRuntimeEnv->proot;
|
||||
assert(pQueryAttr->pExpr2 != NULL);
|
||||
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
|
||||
}
|
||||
|
||||
|
@ -2939,10 +2952,11 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
|
|||
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows) {
|
||||
SSDataBlock* pDataBlock = pBInfo->pRes;
|
||||
|
||||
int32_t newSize = pDataBlock->info.rows + numOfInputRows;
|
||||
int32_t newSize = pDataBlock->info.rows + numOfInputRows + 5; // extra output buffer
|
||||
if ((*bufCapacity) < newSize) {
|
||||
for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
|
||||
char* p = realloc(pColInfo->pData, newSize * pColInfo->info.bytes);
|
||||
if (p != NULL) {
|
||||
pColInfo->pData = p;
|
||||
|
@ -3431,6 +3445,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
|
|||
int32_t orderType = (pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
|
||||
doCopyToSDataBlock(pRuntimeEnv, pGroupResInfo, orderType, pBlock);
|
||||
|
||||
// refactor : extract method
|
||||
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
|
||||
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
|
@ -3861,8 +3876,9 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in
|
|||
pFillCol[i].col.bytes = pExprInfo->base.resBytes;
|
||||
pFillCol[i].col.type = (int8_t)pExprInfo->base.resType;
|
||||
pFillCol[i].col.offset = offset;
|
||||
pFillCol[i].col.colId = pExprInfo->base.resColId;
|
||||
pFillCol[i].tagIndex = -2;
|
||||
pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query
|
||||
pFillCol[i].flag = pExprInfo->base.colInfo.flag; // always be the normal column for table query
|
||||
pFillCol[i].functionId = pExprInfo->base.functionId;
|
||||
pFillCol[i].fillVal.i = fillVal[i];
|
||||
|
||||
|
@ -4011,7 +4027,7 @@ static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) {
|
|||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doTableScanImpl(void* param) {
|
||||
static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
|
@ -4020,6 +4036,8 @@ static SSDataBlock* doTableScanImpl(void* param) {
|
|||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
STableGroupInfo *pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo;
|
||||
|
||||
*newgroup = false;
|
||||
|
||||
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
|
||||
if (isQueryKilled(pOperator->pRuntimeEnv->qinfo)) {
|
||||
longjmp(pOperator->pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
|
@ -4058,17 +4076,18 @@ static SSDataBlock* doTableScanImpl(void* param) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doTableScan(void* param) {
|
||||
static SSDataBlock* doTableScan(void* param, bool *newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
|
||||
STableScanInfo *pTableScanInfo = pOperator->info;
|
||||
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
|
||||
SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo;
|
||||
*newgroup = false;
|
||||
|
||||
while (pTableScanInfo->current < pTableScanInfo->times) {
|
||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
|
||||
if (p != NULL) {
|
||||
return p;
|
||||
}
|
||||
|
@ -4102,6 +4121,7 @@ static SSDataBlock* doTableScan(void* param) {
|
|||
GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey);
|
||||
}
|
||||
|
||||
SSDataBlock *p = NULL;
|
||||
if (pTableScanInfo->reverseTimes > 0) {
|
||||
setupEnvForReverseScan(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
|
||||
|
||||
|
@ -4123,22 +4143,20 @@ static SSDataBlock* doTableScan(void* param) {
|
|||
pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey;
|
||||
}
|
||||
|
||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||
if (p != NULL) {
|
||||
return p;
|
||||
}
|
||||
p = doTableScanImpl(pOperator, newgroup);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
return p;
|
||||
}
|
||||
|
||||
static SSDataBlock* doBlockInfoScan(void* param) {
|
||||
static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
|
||||
SOperatorInfo *pOperator = (SOperatorInfo*)param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STableScanInfo *pTableScanInfo = pOperator->info;
|
||||
*newgroup = false;
|
||||
|
||||
STableBlockDist tableBlockDist = {0};
|
||||
tableBlockDist.numOfTables = (int32_t)pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
|
||||
|
@ -4357,12 +4375,23 @@ SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) {
|
|||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColIndex* index = taosArrayGet(pOrderColumns, i);
|
||||
|
||||
bool found = false;
|
||||
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||
if (index->colId == pQuery->pExpr1[j].base.colInfo.colId) {
|
||||
SSqlExpr* pExpr = &pQuery->pExpr1[j].base;
|
||||
|
||||
// TSDB_FUNC_TAG_DUMMY function needs to be ignored
|
||||
if (index->colId == pExpr->colInfo.colId &&
|
||||
((TSDB_COL_IS_TAG(pExpr->colInfo.flag) && pExpr->functionId == TSDB_FUNC_TAG) ||
|
||||
(TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && pExpr->functionId == TSDB_FUNC_PRJ))) {
|
||||
index->colIndex = j;
|
||||
index->colId = pQuery->pExpr1[j].base.resColId;
|
||||
index->colId = pExpr->resColId;
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert(found && index->colIndex >= 0 && index->colIndex < pQuery->numOfOutput);
|
||||
}
|
||||
|
||||
return pOrderColumns;
|
||||
|
@ -4393,7 +4422,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
|
|||
|
||||
int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0;
|
||||
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
|
||||
int32_t offset = POINTER_BYTES * numOfOutput;
|
||||
int32_t offset = POINTER_BYTES * numOfCols;
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
|
||||
|
@ -4404,7 +4433,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
|
|||
|
||||
numOfCols = (pInfo->groupColumnList != NULL)? taosArrayGetSize(pInfo->groupColumnList):0;
|
||||
pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len));
|
||||
offset = POINTER_BYTES * numOfOutput;
|
||||
offset = POINTER_BYTES * numOfCols;
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
pInfo->currentGroupColData[i] = (char*)pInfo->currentGroupColData + offset;
|
||||
|
@ -4453,7 +4482,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
|
|||
|
||||
int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0;
|
||||
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
|
||||
int32_t offset = POINTER_BYTES * numOfOutput;
|
||||
int32_t offset = POINTER_BYTES * numOfCols;
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
|
||||
|
@ -4481,7 +4510,7 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
|
|||
}
|
||||
|
||||
// this is a blocking operator
|
||||
static SSDataBlock* doAggregate(void* param) {
|
||||
static SSDataBlock* doAggregate(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -4498,7 +4527,7 @@ static SSDataBlock* doAggregate(void* param) {
|
|||
SOperatorInfo* upstream = pOperator->upstream;
|
||||
|
||||
while(1) {
|
||||
SSDataBlock* pBlock = upstream->exec(upstream);
|
||||
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -4524,7 +4553,7 @@ static SSDataBlock* doAggregate(void* param) {
|
|||
return pInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doSTableAggregate(void* param) {
|
||||
static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -4551,7 +4580,7 @@ static SSDataBlock* doSTableAggregate(void* param) {
|
|||
SOperatorInfo* upstream = pOperator->upstream;
|
||||
|
||||
while(1) {
|
||||
SSDataBlock* pBlock = upstream->exec(upstream);
|
||||
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -4587,7 +4616,7 @@ static SSDataBlock* doSTableAggregate(void* param) {
|
|||
return pInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doArithmeticOperation(void* param) {
|
||||
static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
|
||||
SArithOperatorInfo* pArithInfo = pOperator->info;
|
||||
|
@ -4599,13 +4628,55 @@ static SSDataBlock* doArithmeticOperation(void* param) {
|
|||
|
||||
pRes->info.rows = 0;
|
||||
|
||||
if (pArithInfo->existDataBlock) { // TODO refactor
|
||||
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
||||
|
||||
SSDataBlock* pBlock = pArithInfo->existDataBlock;
|
||||
pArithInfo->existDataBlock = NULL;
|
||||
*newgroup = true;
|
||||
|
||||
// todo dynamic set tags
|
||||
if (pTableQueryInfo != NULL) {
|
||||
setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
||||
}
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
|
||||
updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows);
|
||||
|
||||
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
|
||||
|
||||
if (pTableQueryInfo != NULL) { // TODO refactor
|
||||
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
|
||||
}
|
||||
|
||||
pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
|
||||
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
|
||||
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
||||
return pRes;
|
||||
}
|
||||
}
|
||||
|
||||
while(1) {
|
||||
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
|
||||
bool prevVal = *newgroup;
|
||||
|
||||
// The upstream exec may change the value of the newgroup, so use a local variable instead.
|
||||
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
assert(*newgroup == false);
|
||||
|
||||
*newgroup = prevVal;
|
||||
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
|
||||
break;
|
||||
}
|
||||
|
||||
// Return result of the previous group in the firstly.
|
||||
if (newgroup && pRes->info.rows > 0) {
|
||||
pArithInfo->existDataBlock = pBlock;
|
||||
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
||||
return pInfo->pRes;
|
||||
}
|
||||
|
||||
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
||||
|
||||
// todo dynamic set tags
|
||||
|
@ -4633,7 +4704,7 @@ static SSDataBlock* doArithmeticOperation(void* param) {
|
|||
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doLimit(void* param) {
|
||||
static SSDataBlock* doLimit(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -4644,7 +4715,7 @@ static SSDataBlock* doLimit(void* param) {
|
|||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
while (1) {
|
||||
pBlock = pOperator->upstream->exec(pOperator->upstream);
|
||||
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
|
@ -4684,7 +4755,7 @@ static SSDataBlock* doLimit(void* param) {
|
|||
return pBlock;
|
||||
}
|
||||
|
||||
static SSDataBlock* doIntervalAgg(void* param) {
|
||||
static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -4710,7 +4781,7 @@ static SSDataBlock* doIntervalAgg(void* param) {
|
|||
SOperatorInfo* upstream = pOperator->upstream;
|
||||
|
||||
while(1) {
|
||||
SSDataBlock* pBlock = upstream->exec(upstream);
|
||||
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -4739,7 +4810,7 @@ static SSDataBlock* doIntervalAgg(void* param) {
|
|||
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doSTableIntervalAgg(void* param) {
|
||||
static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -4763,7 +4834,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
|
|||
SOperatorInfo* upstream = pOperator->upstream;
|
||||
|
||||
while(1) {
|
||||
SSDataBlock* pBlock = upstream->exec(upstream);
|
||||
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -4791,7 +4862,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
|
|||
return pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doSessionWindowAgg(void* param) {
|
||||
static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -4818,7 +4889,7 @@ static SSDataBlock* doSessionWindowAgg(void* param) {
|
|||
SOperatorInfo* upstream = pOperator->upstream;
|
||||
|
||||
while(1) {
|
||||
SSDataBlock* pBlock = upstream->exec(upstream);
|
||||
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -4847,7 +4918,7 @@ static SSDataBlock* doSessionWindowAgg(void* param) {
|
|||
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* hashGroupbyAggregate(void* param) {
|
||||
static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -4869,7 +4940,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param) {
|
|||
SOperatorInfo* upstream = pOperator->upstream;
|
||||
|
||||
while(1) {
|
||||
SSDataBlock* pBlock = upstream->exec(upstream);
|
||||
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -4904,22 +4975,50 @@ static SSDataBlock* hashGroupbyAggregate(void* param) {
|
|||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doFill(void* param) {
|
||||
static SSDataBlock* doFill(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SFillOperatorInfo *pInfo = pOperator->info;
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
|
||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
*newgroup = false;
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
|
||||
return pInfo->pRes;
|
||||
}
|
||||
|
||||
// handle the cached new group data block
|
||||
if (pInfo->existNewGroupBlock) {
|
||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
||||
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
||||
pInfo->existNewGroupBlock = NULL;
|
||||
*newgroup = true;
|
||||
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||
}
|
||||
|
||||
while(1) {
|
||||
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
|
||||
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup);
|
||||
if (*newgroup) {
|
||||
assert(pBlock != NULL);
|
||||
}
|
||||
|
||||
if (*newgroup && pInfo->totalInputRows > 0) { // there are already processed current group data block
|
||||
pInfo->existNewGroupBlock = pBlock;
|
||||
*newgroup = false;
|
||||
|
||||
// fill the previous group data block
|
||||
// before handle a new data block, close the fill operation for previous group data block
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
|
||||
} else {
|
||||
if (pBlock == NULL) {
|
||||
if (pInfo->totalInputRows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
|
@ -4930,14 +5029,35 @@ static SSDataBlock* doFill(void* param) {
|
|||
} else {
|
||||
pInfo->totalInputRows += pBlock->info.rows;
|
||||
|
||||
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pBlock->info.window.ekey;
|
||||
int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey
|
||||
: */pBlock->info.window.ekey;
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
|
||||
}
|
||||
}
|
||||
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
||||
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||
if (pInfo->pRes->info.rows > 0) { // current group has no more result to return
|
||||
return pInfo->pRes;
|
||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey
|
||||
:*/ pInfo->existNewGroupBlock->info.window.ekey;
|
||||
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
||||
pInfo->existNewGroupBlock = NULL;
|
||||
*newgroup = true;
|
||||
|
||||
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5219,9 +5339,10 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
|
|||
TSKEY ek = MAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
|
||||
getAlignQueryTimeWindow(pQueryAttr, pQueryAttr->window.skey, sk, ek, &w);
|
||||
|
||||
pInfo->pFillInfo = taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput,
|
||||
pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, (int8_t)pQueryAttr->precision,
|
||||
pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo);
|
||||
pInfo->pFillInfo =
|
||||
taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput,
|
||||
pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit,
|
||||
(int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo);
|
||||
}
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -5264,7 +5385,7 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
|
|||
|
||||
int32_t numOfCols = pInfo->orderColumnList != NULL? taosArrayGetSize(pInfo->orderColumnList):0;
|
||||
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
|
||||
int32_t offset = POINTER_BYTES * numOfOutput;
|
||||
int32_t offset = POINTER_BYTES * numOfCols;
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
|
||||
|
@ -5288,7 +5409,7 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
|
|||
}
|
||||
|
||||
|
||||
static SSDataBlock* doTagScan(void* param) {
|
||||
static SSDataBlock* doTagScan(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -5300,6 +5421,7 @@ static SSDataBlock* doTagScan(void* param) {
|
|||
|
||||
STagScanInfo *pInfo = pOperator->info;
|
||||
SSDataBlock *pRes = pInfo->pRes;
|
||||
*newgroup = false;
|
||||
|
||||
int32_t count = 0;
|
||||
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
||||
|
|
|
@ -464,10 +464,13 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t compare_aRv(SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order) {
|
||||
int32_t compare_aRv(SSDataBlock* pBlock, SArray* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order) {
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
int32_t index = colIndex[i];
|
||||
SColIndex* pColIndex = taosArrayGet(colIndex, i);
|
||||
int32_t index = pColIndex->colIndex;
|
||||
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, index);
|
||||
assert(pColIndex->colId == pColInfo->info.colId);
|
||||
|
||||
char* data = pColInfo->pData + rowIndex * pColInfo->info.bytes;
|
||||
if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
|
||||
for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[j];
|
||||
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
|
||||
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || TSDB_COL_IS_UD_COL(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -126,10 +126,10 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
|
|||
} else {
|
||||
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
|
||||
}
|
||||
} else { /* fill the default value */
|
||||
} else { // fill the default value */
|
||||
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -210,7 +210,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
|
|||
// assign rows to dst buffer
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -275,13 +275,16 @@ static int64_t appendFilledResult(SFillInfo* pFillInfo, void** output, int64_t r
|
|||
// there are no duplicated tags in the SFillTagColInfo list
|
||||
static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t capacity) {
|
||||
int32_t rowsize = 0;
|
||||
int32_t numOfTags = 0;
|
||||
|
||||
int32_t k = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
|
||||
pFillInfo->pData[i] = NULL;
|
||||
|
||||
if (TSDB_COL_IS_TAG(pColInfo->flag)) {
|
||||
if (TSDB_COL_IS_TAG(pColInfo->flag) || pColInfo->col.type == TSDB_DATA_TYPE_BINARY) {
|
||||
numOfTags += 1;
|
||||
|
||||
bool exists = false;
|
||||
int32_t index = -1;
|
||||
for (int32_t j = 0; j < k; ++j) {
|
||||
|
@ -310,6 +313,8 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
|
|||
rowsize += pColInfo->col.bytes;
|
||||
}
|
||||
|
||||
pFillInfo->numOfTags = numOfTags;
|
||||
|
||||
assert(k <= pFillInfo->numOfTags);
|
||||
return rowsize;
|
||||
}
|
||||
|
@ -347,12 +352,13 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
|
|||
pFillInfo->interval.slidingUnit = slidingUnit;
|
||||
|
||||
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
|
||||
if (numOfTags > 0) {
|
||||
pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo));
|
||||
for (int32_t i = 0; i < numOfTags; ++i) {
|
||||
|
||||
// if (numOfTags > 0) {
|
||||
pFillInfo->pTags = calloc(numOfCols, sizeof(SFillTagColInfo));
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
pFillInfo->pTags[i].col.colId = -2; // TODO
|
||||
}
|
||||
}
|
||||
// }
|
||||
|
||||
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
|
||||
assert(pFillInfo->rowSize > 0);
|
||||
|
@ -367,6 +373,7 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
|
|||
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
|
||||
pFillInfo->start = startTimestamp;
|
||||
pFillInfo->currentKey = startTimestamp;
|
||||
pFillInfo->end = startTimestamp;
|
||||
pFillInfo->index = -1;
|
||||
pFillInfo->numOfRows = 0;
|
||||
pFillInfo->numOfCurrent = 0;
|
||||
|
@ -425,6 +432,8 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
|
|||
|
||||
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) {
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
|
||||
// pFillInfo->pData[i] = pColData->pData;
|
||||
if (pInput->info.rows > pFillInfo->alloc) {
|
||||
|
@ -436,6 +445,12 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput)
|
|||
}
|
||||
|
||||
memcpy(pFillInfo->pData[i], pColData->pData, pColData->info.bytes * pInput->info.rows);
|
||||
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { // copy the tag value to tag value buffer
|
||||
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
||||
assert (pTag->col.colId == pCol->col.colId);
|
||||
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -456,7 +471,7 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage*
|
|||
|
||||
memcpy(pFillInfo->pData[i], data, (size_t)(pCol->col.bytes * pInput->num));
|
||||
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { // copy the tag value to tag value buffer
|
||||
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
||||
assert (pTag->col.colId == pCol->col.colId);
|
||||
memcpy(pTag->tagVal, data, pCol->col.bytes); // TODO not memcpy??
|
||||
|
@ -465,7 +480,17 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage*
|
|||
}
|
||||
|
||||
bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
|
||||
return taosNumOfRemainRows(pFillInfo) > 0;
|
||||
int32_t remain = taosNumOfRemainRows(pFillInfo);
|
||||
if (remain > 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (pFillInfo->numOfTotal > 0 && (((pFillInfo->end > pFillInfo->start) && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(pFillInfo->end < pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo)))) {
|
||||
return getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, 4096) > 0;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
|
||||
|
|
|
@ -132,15 +132,11 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
|
|||
return plan;
|
||||
}
|
||||
|
||||
// todo: exchange operator?
|
||||
// todo:
|
||||
int32_t op = OP_MultiwaySort;
|
||||
taosArrayPush(plan, &op);
|
||||
|
||||
// arithmetic operator
|
||||
if (!pQueryAttr->simpleAgg && pQueryAttr->interval.interval == 0) {
|
||||
op = OP_Arithmetic;
|
||||
taosArrayPush(plan, &op);
|
||||
} else {
|
||||
if (pQueryAttr->simpleAgg || (pQueryAttr->interval.interval > 0 || pQueryAttr->sw.gap > 0)) {
|
||||
op = OP_GlobalAggregate;
|
||||
taosArrayPush(plan, &op);
|
||||
|
||||
|
@ -157,7 +153,8 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
|
|||
}
|
||||
|
||||
// limit/offset operator
|
||||
if (pQueryAttr->slimit.limit > 0 || pQueryAttr->slimit.offset > 0) {
|
||||
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0 ||
|
||||
pQueryAttr->slimit.limit > 0 || pQueryAttr->slimit.offset > 0) {
|
||||
op = OP_SLimit;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
|
|
|
@ -236,7 +236,8 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
|
|||
|
||||
qDebug("QInfo:%"PRIu64" query task is launched", pQInfo->qId);
|
||||
|
||||
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
|
||||
bool newgroup = false;
|
||||
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
|
||||
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
qDebug("QInfo:%"PRIu64" query is killed", pQInfo->qId);
|
||||
|
|
|
@ -148,6 +148,9 @@ if $rows != 8200 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000) limit 100000;
|
||||
|
||||
|
||||
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 10 offset 8190;
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
|
|
|
@ -26,7 +26,7 @@ sql drop database if exists $db -x step1
|
|||
step1:
|
||||
sql create database if not exists $db keep 36500
|
||||
sql use $db
|
||||
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12))
|
||||
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12), t3 int)
|
||||
|
||||
$i = 0
|
||||
$j = 1
|
||||
|
@ -36,7 +36,7 @@ while $i < $tbNum
|
|||
$tg2 = ' . abc
|
||||
$tg2 = $tg2 . $i
|
||||
$tg2 = $tg2 . '
|
||||
sql create table $tb using $mt tags( $i , $tg2 )
|
||||
sql create table $tb using $mt tags( $i , $tg2 , 123 )
|
||||
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
|
@ -85,6 +85,7 @@ if $data00 != @70-01-01 08:01:40.000@ then
|
|||
endi
|
||||
|
||||
if $data01 != @select_tags_tb0@ then
|
||||
print expect: select_tags_tb0, actual: $data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -804,7 +805,46 @@ if $row != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print ======= selectivity + tags+ group by + tags + filter + interval + join===========
|
||||
print TODO ======= selectivity + tags+ group by + tags + filter + interval + join===========
|
||||
|
||||
print ==========================mix tag columns and group by columns======================
|
||||
sql select top(c1, 100), tbname from select_tags_mt0 where tbname in ('select_tags_tb0', 'select_tags_tb1') group by t3
|
||||
if $rows != 100 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != @70-01-01 08:01:40.094@ then
|
||||
print expect: 70-01-01 08:01:40.094, actual: $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 94 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != @select_tags_tb0@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 123 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != @70-01-01 08:01:40.095@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 95 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != @select_tags_tb0@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != 123 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
print ======error sql=============================================
|
||||
|
|
Loading…
Reference in New Issue