Merge pull request #11188 from taosdata/feature/3.0_liaohj
[td-13039] support limit/offset
This commit is contained in:
commit
632f3b1cbf
|
@ -179,12 +179,14 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
|
||||||
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
||||||
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
|
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
|
||||||
|
|
||||||
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
|
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||||
void blockDataCleanup(SSDataBlock* pDataBlock);
|
void blockDataCleanup(SSDataBlock* pDataBlock);
|
||||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
||||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n);
|
||||||
|
|
||||||
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
|
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
|
||||||
|
|
||||||
void blockDebugShowData(const SArray* dataBlocks);
|
void blockDebugShowData(const SArray* dataBlocks);
|
||||||
|
|
|
@ -1139,7 +1139,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
||||||
if (0 == numOfRows) {
|
if (0 == numOfRows) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1181,7 +1181,7 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||||
code = blockDataEnsureColumnCapacity(p, numOfRows);
|
code = colInfoDataEnsureCapacity(p, numOfRows);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1233,6 +1233,63 @@ void colDataDestroy(SColumnInfoData* pColData) {
|
||||||
taosMemoryFree(pColData->pData);
|
taosMemoryFree(pColData->pData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
|
||||||
|
int32_t len = BitmapLen(total);
|
||||||
|
|
||||||
|
int32_t newLen = BitmapLen(total - n);
|
||||||
|
if (n%8 == 0) {
|
||||||
|
memmove(nullBitmap, nullBitmap + n/8, newLen);
|
||||||
|
} else {
|
||||||
|
int32_t tail = n % 8;
|
||||||
|
int32_t i = 0;
|
||||||
|
|
||||||
|
uint8_t* p = (uint8_t*) nullBitmap;
|
||||||
|
while(i < len) {
|
||||||
|
uint8_t v = p[i];
|
||||||
|
|
||||||
|
p[i] = 0;
|
||||||
|
p[i] = (v << tail);
|
||||||
|
|
||||||
|
if (i < len - 1) {
|
||||||
|
uint8_t next = p[i + 1];
|
||||||
|
p[i] |= (next >> (8 - tail));
|
||||||
|
}
|
||||||
|
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
|
||||||
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n));
|
||||||
|
memset(&pColInfoData->varmeta.offset[total - n - 1], 0, n);
|
||||||
|
} else {
|
||||||
|
int32_t bytes = pColInfoData->info.bytes;
|
||||||
|
memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
|
||||||
|
doShiftBitmap(pColInfoData->nullbitmap, n, total);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) {
|
||||||
|
if (n == 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlock->info.rows <= n) {
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
} else {
|
||||||
|
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
colDataTrimFirstNRows(pColInfoData, n, pBlock->info.rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock->info.rows -= n;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
||||||
int64_t tbUid = pBlock->info.uid;
|
int64_t tbUid = pBlock->info.uid;
|
||||||
int16_t numOfCols = pBlock->info.numOfCols;
|
int16_t numOfCols = pBlock->info.numOfCols;
|
||||||
|
@ -1372,6 +1429,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
||||||
|
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
void blockDebugShowData(const SArray* dataBlocks) {
|
void blockDebugShowData(const SArray* dataBlocks) {
|
||||||
char pBuf[128];
|
char pBuf[128];
|
||||||
int32_t sz = taosArrayGetSize(dataBlocks);
|
int32_t sz = taosArrayGetSize(dataBlocks);
|
||||||
|
|
|
@ -142,7 +142,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||||
colInfo.info.colId = pColSchema->colId;
|
colInfo.info.colId = pColSchema->colId;
|
||||||
colInfo.info.type = pColSchema->type;
|
colInfo.info.type = pColSchema->type;
|
||||||
|
|
||||||
if (blockDataEnsureColumnCapacity(&colInfo, numOfRows) < 0) {
|
if (colInfoDataEnsureCapacity(&colInfo, numOfRows) < 0) {
|
||||||
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
|
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -403,7 +403,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
||||||
SColumnInfoData colInfo = {{0}, 0};
|
SColumnInfoData colInfo = {{0}, 0};
|
||||||
colInfo.info = pCond->colList[i];
|
colInfo.info = pCond->colList[i];
|
||||||
|
|
||||||
int32_t code = blockDataEnsureColumnCapacity(&colInfo, pReadHandle->outputCapacity);
|
int32_t code = colInfoDataEnsureCapacity(&colInfo, pReadHandle->outputCapacity);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
|
@ -496,6 +496,9 @@ typedef struct SProjectOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
SSDataBlock *existDataBlock;
|
SSDataBlock *existDataBlock;
|
||||||
int32_t threshold;
|
int32_t threshold;
|
||||||
|
SLimit limit;
|
||||||
|
int64_t curOffset;
|
||||||
|
int64_t curOutput;
|
||||||
} SProjectOperatorInfo;
|
} SProjectOperatorInfo;
|
||||||
|
|
||||||
typedef struct SLimitOperatorInfo {
|
typedef struct SLimitOperatorInfo {
|
||||||
|
@ -640,13 +643,11 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
||||||
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
|
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
|
||||||
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
|
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
|
||||||
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
|
|
||||||
|
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
|
||||||
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
|
||||||
|
@ -654,6 +655,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
|
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
|
||||||
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
||||||
|
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
|
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
|
||||||
int32_t numOfOutput);
|
int32_t numOfOutput);
|
||||||
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||||
|
@ -673,8 +675,7 @@ SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExp
|
||||||
SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
||||||
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo,
|
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo,
|
||||||
bool groupResultMixedUp);
|
bool groupResultMixedUp);
|
||||||
SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
|
|
||||||
int32_t numOfOutput);
|
|
||||||
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
|
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
|
||||||
int32_t numOfOutput, void* merger, bool multigroupResult);
|
int32_t numOfOutput, void* merger, bool multigroupResult);
|
||||||
|
|
||||||
|
|
|
@ -1023,7 +1023,7 @@ static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQuer
|
||||||
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
pColData->info.bytes = sizeof(int64_t);
|
pColData->info.bytes = sizeof(int64_t);
|
||||||
|
|
||||||
blockDataEnsureColumnCapacity(pColData, 5);
|
colInfoDataEnsureCapacity(pColData, 5);
|
||||||
colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
|
colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
|
||||||
colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);
|
colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);
|
||||||
|
|
||||||
|
@ -6802,7 +6802,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
|
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
blockDataCleanup(pRes);
|
blockDataCleanup(pRes);
|
||||||
|
#if 0
|
||||||
if (pProjectInfo->existDataBlock) { // TODO refactor
|
if (pProjectInfo->existDataBlock) { // TODO refactor
|
||||||
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
|
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
|
||||||
pProjectInfo->existDataBlock = NULL;
|
pProjectInfo->existDataBlock = NULL;
|
||||||
|
@ -6824,6 +6824,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
|
@ -6863,10 +6864,27 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
|
||||||
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput);
|
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput);
|
||||||
|
|
||||||
|
if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) {
|
||||||
|
blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset);
|
||||||
|
pProjectInfo->curOffset = 0;
|
||||||
|
break;
|
||||||
|
} else if (pProjectInfo->curOffset >= pInfo->pRes->info.rows) {
|
||||||
|
pProjectInfo->curOffset -= pInfo->pRes->info.rows;
|
||||||
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
|
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pInfo->pRes->info.rows >= pProjectInfo->limit.limit) {
|
||||||
|
pInfo->pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
pProjectInfo->curOutput += pInfo->pRes->info.rows;
|
||||||
|
|
||||||
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
|
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
|
||||||
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
|
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
|
||||||
|
@ -7294,22 +7312,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SStateWindowOperatorInfo* pWindowInfo = pOperator->info;
|
SStateWindowOperatorInfo* pWindowInfo = pOperator->info;
|
||||||
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
|
||||||
|
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
||||||
|
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
// if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
// pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
// }
|
||||||
|
|
||||||
return pBInfo->pRes;
|
return pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
int32_t order = pQueryAttr->order.order;
|
STimeWindow win = pTaskInfo->window;
|
||||||
STimeWindow win = pQueryAttr->window;
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
while (1) {
|
while (1) {
|
||||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
|
@ -7319,28 +7337,29 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQueryAttr->order.order);
|
|
||||||
if (pWindowInfo->colIndex == -1) {
|
// setInputDataBlock(pOperator, pBInfo->pCtx, pDataBlock, TSDB_ORDER_ASC);
|
||||||
pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
|
// if (pWindowInfo->colIndex == -1) {
|
||||||
}
|
// pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
|
||||||
|
// }
|
||||||
doStateWindowAggImpl(pOperator, pWindowInfo, pBlock);
|
doStateWindowAggImpl(pOperator, pWindowInfo, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
// restore the value
|
// restore the value
|
||||||
pQueryAttr->order.order = order;
|
// pQueryAttr->order.order = order;
|
||||||
pQueryAttr->window = win;
|
// pQueryAttr->window = win;
|
||||||
|
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
closeAllResultRows(&pBInfo->resultRowInfo);
|
closeAllResultRows(&pBInfo->resultRowInfo);
|
||||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||||
finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput);
|
finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput);
|
||||||
|
|
||||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
|
// initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
|
||||||
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
||||||
|
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
// if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
// pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
// }
|
||||||
|
|
||||||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
@ -7846,19 +7865,20 @@ _error:
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num,
|
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num,
|
||||||
SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
|
SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
|
||||||
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
|
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->limit = *pLimit;
|
||||||
|
pInfo->curOffset = pLimit->offset;
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset);
|
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset);
|
||||||
if (pInfo->binfo.pCtx == NULL) {
|
if (pInfo->binfo.pCtx == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
// initResultRowInfo(&pBInfo->resultRowInfo, 8);
|
// initResultRowInfo(&pBInfo->resultRowInfo, 8);
|
||||||
// setFunctionResultOutput(pBInfo, MAIN_SCAN);
|
// setFunctionResultOutput(pBInfo, MAIN_SCAN);
|
||||||
|
|
||||||
|
@ -7886,35 +7906,6 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
|
|
||||||
SLimitOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SLimitOperatorInfo));
|
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->limit = *pLimit;
|
|
||||||
pInfo->currentOffset = pLimit->offset;
|
|
||||||
|
|
||||||
pOperator->name = "LimitOperator";
|
|
||||||
// pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LIMIT;
|
|
||||||
pOperator->blockingOptr = false;
|
|
||||||
pOperator->status = OP_NOT_OPENED;
|
|
||||||
pOperator->_openFn = operatorDummyOpenFn;
|
|
||||||
pOperator->getNextFn = doLimit;
|
|
||||||
pOperator->info = pInfo;
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
|
||||||
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
|
||||||
return pOperator;
|
|
||||||
|
|
||||||
_error:
|
|
||||||
taosMemoryFreeClear(pInfo);
|
|
||||||
taosMemoryFreeClear(pOperator);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval,
|
SSDataBlock* pResBlock, SInterval* pInterval,
|
||||||
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
|
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
|
@ -7996,9 +7987,9 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
|
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
|
||||||
int32_t numOfOutput) {
|
|
||||||
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
|
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
|
||||||
|
|
||||||
pInfo->colIndex = -1;
|
pInfo->colIndex = -1;
|
||||||
pInfo->reptScan = false;
|
pInfo->reptScan = false;
|
||||||
// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
||||||
|
@ -8009,13 +8000,14 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
|
||||||
pOperator->name = "StateWindowOperator";
|
pOperator->name = "StateWindowOperator";
|
||||||
// pOperator->operatorType = OP_StateWindow;
|
// pOperator->operatorType = OP_StateWindow;
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blockingOptr = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->pExpr = pExpr;
|
pOperator->pExpr = pExpr;
|
||||||
pOperator->numOfOutput = numOfOutput;
|
pOperator->numOfOutput = numOfCols;
|
||||||
pOperator->info = pInfo;
|
|
||||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->getNextFn = doStateWindowAgg;
|
pOperator->info = pInfo;
|
||||||
pOperator->closeFn = destroyStateWindowOperatorInfo;
|
pOperator->getNextFn = doStateWindowAgg;
|
||||||
|
pOperator->closeFn = destroyStateWindowOperatorInfo;
|
||||||
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -8861,9 +8853,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num);
|
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
|
||||||
|
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||||
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo);
|
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
|
||||||
|
|
||||||
|
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) {
|
||||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||||
assert(size == 1);
|
assert(size == 1);
|
||||||
|
|
|
@ -29,7 +29,7 @@ SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) {
|
||||||
pColumnData->info.scale = pType->scale;
|
pColumnData->info.scale = pType->scale;
|
||||||
pColumnData->info.precision = pType->precision;
|
pColumnData->info.precision = pType->precision;
|
||||||
|
|
||||||
int32_t code = blockDataEnsureColumnCapacity(pColumnData, numOfRows);
|
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosMemoryFree(pColumnData);
|
taosMemoryFree(pColumnData);
|
||||||
|
@ -44,7 +44,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
|
||||||
in.columnData = createColumnInfoData(&pValueNode->node.resType, 1);
|
in.columnData = createColumnInfoData(&pValueNode->node.resType, 1);
|
||||||
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
|
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
|
||||||
|
|
||||||
blockDataEnsureColumnCapacity(out->columnData, 1);
|
colInfoDataEnsureCapacity(out->columnData, 1);
|
||||||
|
|
||||||
int32_t code = vectorConvertImpl(&in, out);
|
int32_t code = vectorConvertImpl(&in, out);
|
||||||
sclFreeParam(&in);
|
sclFreeParam(&in);
|
||||||
|
|
|
@ -155,7 +155,7 @@ void flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
|
||||||
res->info.numOfCols++;
|
res->info.numOfCols++;
|
||||||
SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock);
|
SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock);
|
||||||
|
|
||||||
blockDataEnsureColumnCapacity(pColumn, rowNum);
|
colInfoDataEnsureCapacity(pColumn, rowNum);
|
||||||
|
|
||||||
for (int32_t i = 0; i < rowNum; ++i) {
|
for (int32_t i = 0; i < rowNum; ++i) {
|
||||||
colDataAppend(pColumn, i, (const char *)value, false);
|
colDataAppend(pColumn, i, (const char *)value, false);
|
||||||
|
|
|
@ -99,7 +99,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s
|
||||||
SColumnInfoData idata = {0};
|
SColumnInfoData idata = {0};
|
||||||
idata.info = *colInfo;
|
idata.info = *colInfo;
|
||||||
|
|
||||||
blockDataEnsureColumnCapacity(&idata, rows);
|
colInfoDataEnsureCapacity(&idata, rows);
|
||||||
|
|
||||||
taosArrayPush(res->pDataBlock, &idata);
|
taosArrayPush(res->pDataBlock, &idata);
|
||||||
|
|
||||||
|
@ -186,7 +186,7 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
|
||||||
res->info.numOfCols++;
|
res->info.numOfCols++;
|
||||||
SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock);
|
SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock);
|
||||||
|
|
||||||
blockDataEnsureColumnCapacity(pColumn, rowNum);
|
colInfoDataEnsureCapacity(pColumn, rowNum);
|
||||||
|
|
||||||
for (int32_t i = 0; i < rowNum; ++i) {
|
for (int32_t i = 0; i < rowNum; ++i) {
|
||||||
colDataAppend(pColumn, i, (const char *)value, false);
|
colDataAppend(pColumn, i, (const char *)value, false);
|
||||||
|
@ -1467,7 +1467,7 @@ void scltMakeDataBlock(SScalarParam **pInput, int32_t type, void *pVal, int32_t
|
||||||
input->numOfRows = num;
|
input->numOfRows = num;
|
||||||
|
|
||||||
input->columnData->info = createColumnInfo(0, type, bytes);
|
input->columnData->info = createColumnInfo(0, type, bytes);
|
||||||
blockDataEnsureColumnCapacity(input->columnData, num);
|
colInfoDataEnsureCapacity(input->columnData, num);
|
||||||
|
|
||||||
if (setVal) {
|
if (setVal) {
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
|
|
Loading…
Reference in New Issue