Merge pull request #14932 from taosdata/feature/3_liaohj
fix(query): handle multi group last_row query.
This commit is contained in:
commit
505e14892e
|
@ -177,7 +177,6 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
|
||||||
saveOneRow(pRow, pResBlock, pr, slotIds);
|
saveOneRow(pRow, pResBlock, pr, slotIds);
|
||||||
taosArrayPush(pTableUidList, &pKeyInfo->uid);
|
taosArrayPush(pTableUidList, &pKeyInfo->uid);
|
||||||
|
|
||||||
// taosMemoryFree(pRow);
|
|
||||||
tsdbCacheRelease(lruCache, h);
|
tsdbCacheRelease(lruCache, h);
|
||||||
|
|
||||||
pr->tableIndex += 1;
|
pr->tableIndex += 1;
|
||||||
|
|
|
@ -830,9 +830,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
uint8_t *pb = NULL, *pb1 = NULL;
|
|
||||||
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols,
|
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols,
|
||||||
pBlockData, &pb, &pb1);
|
pBlockData, NULL, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -3007,11 +3006,14 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||||
|
|
||||||
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
|
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tBlockDataClear(&pStatus->fileBlockData);
|
||||||
|
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
|
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
|
||||||
|
tBlockDataClear(&pStatus->fileBlockData);
|
||||||
return pReader->pResBlock->pDataBlock;
|
return pReader->pResBlock->pDataBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -316,12 +316,16 @@ typedef struct STagScanInfo {
|
||||||
|
|
||||||
typedef struct SLastrowScanInfo {
|
typedef struct SLastrowScanInfo {
|
||||||
SSDataBlock *pRes;
|
SSDataBlock *pRes;
|
||||||
SArray *pTableList;
|
|
||||||
SReadHandle readHandle;
|
SReadHandle readHandle;
|
||||||
void *pLastrowReader;
|
void *pLastrowReader;
|
||||||
SArray *pColMatchInfo;
|
SArray *pColMatchInfo;
|
||||||
int32_t *pSlotIds;
|
int32_t *pSlotIds;
|
||||||
SExprSupp pseudoExprSup;
|
SExprSupp pseudoExprSup;
|
||||||
|
int32_t retrieveType;
|
||||||
|
int32_t currentGroupIndex;
|
||||||
|
SSDataBlock *pBufferredRes;
|
||||||
|
SArray *pUidList;
|
||||||
|
int32_t indexOfBufferedRes;
|
||||||
} SLastrowScanInfo;
|
} SLastrowScanInfo;
|
||||||
|
|
||||||
typedef enum EStreamScanMode {
|
typedef enum EStreamScanMode {
|
||||||
|
@ -825,8 +829,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
|
||||||
SArray* pTableList, SExecTaskInfo* pTaskInfo);
|
|
||||||
|
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
|
@ -944,8 +947,9 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
|
||||||
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
||||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
int32_t createScanTableListInfo(SScanPhysiNode *pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
|
||||||
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
|
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
|
||||||
|
|
||||||
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
|
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
|
||||||
SExecTaskInfo* pTaskInfo);
|
SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo,
|
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo,
|
||||||
|
|
|
@ -30,15 +30,13 @@ static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator);
|
||||||
static void destroyLastrowScanOperator(void* param, int32_t numOfOutput);
|
static void destroyLastrowScanOperator(void* param, int32_t numOfOutput);
|
||||||
static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
|
static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
|
||||||
|
|
||||||
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList,
|
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
||||||
SExecTaskInfo* pTaskInfo) {
|
|
||||||
SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
|
SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pTableList = pTableList;
|
|
||||||
pInfo->readHandle = *readHandle;
|
pInfo->readHandle = *readHandle;
|
||||||
pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
|
pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
|
@ -50,8 +48,22 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_SINGLE, pTableList, taosArrayGetSize(pInfo->pColMatchInfo),
|
STableListInfo* pTableList = &pTaskInfo->tableqinfoList;
|
||||||
&pInfo->pLastrowReader);
|
|
||||||
|
initResultSizeInfo(pOperator, 1024);
|
||||||
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
|
||||||
|
// partition by tbname
|
||||||
|
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
|
||||||
|
pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_ALL;
|
||||||
|
tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
|
||||||
|
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
|
||||||
|
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
|
||||||
|
blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity);
|
||||||
|
} else { // by tags
|
||||||
|
pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_SINGLE;
|
||||||
|
}
|
||||||
|
|
||||||
if (pScanNode->scan.pScanPseudoCols != NULL) {
|
if (pScanNode->scan.pScanPseudoCols != NULL) {
|
||||||
SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup;
|
SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup;
|
||||||
|
@ -60,19 +72,17 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
|
||||||
pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset);
|
pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->name = "LastrowScanOperator";
|
pOperator->name = "LastrowScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, 1024);
|
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
|
||||||
|
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
@ -90,43 +100,105 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SLastrowScanInfo* pInfo = pOperator->info;
|
SLastrowScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
STableListInfo* pTableList = &pTaskInfo->tableqinfoList;
|
||||||
int32_t size = taosArrayGetSize(pInfo->pTableList);
|
int32_t size = taosArrayGetSize(pTableList->pTableList);
|
||||||
if (size == 0) {
|
if (size == 0) {
|
||||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
||||||
// check if it is a group by tbname
|
// check if it is a group by tbname
|
||||||
if (size == taosArrayGetSize(pInfo->pTableList)) {
|
if (pInfo->retrieveType == LASTROW_RETRIEVE_TYPE_ALL) {
|
||||||
blockDataCleanup(pInfo->pRes);
|
if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) {
|
||||||
SArray* pUidList = taosArrayInit(1, sizeof(tb_uid_t));
|
blockDataCleanup(pInfo->pBufferredRes);
|
||||||
int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pUidList);
|
taosArrayClear(pInfo->pUidList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
longjmp(pTaskInfo->env, code);
|
int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check for tag values
|
||||||
|
int32_t resultRows = pInfo->pBufferredRes->info.rows;
|
||||||
|
ASSERT(resultRows == taosArrayGetSize(pInfo->pUidList));
|
||||||
|
pInfo->indexOfBufferedRes = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check for tag values
|
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
|
||||||
if (pInfo->pRes->info.rows > 0 && pInfo->pseudoExprSup.numOfExprs > 0) {
|
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
|
||||||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
SColMatchInfo* pMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
|
||||||
pInfo->pRes->info.uid = *(tb_uid_t*) taosArrayGet(pUidList, 0);
|
int32_t slotId = pMatchInfo->targetSlotId;
|
||||||
addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, GET_TASKID(pTaskInfo));
|
|
||||||
|
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId);
|
||||||
|
|
||||||
|
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
|
||||||
|
bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes);
|
||||||
|
colDataAppend(pDst, 0, p, isNull);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
||||||
|
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||||
|
addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
|
||||||
|
int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t));
|
||||||
|
pInfo->pRes->info.groupId = *groupId;
|
||||||
|
|
||||||
|
pInfo->indexOfBufferedRes += 1;
|
||||||
|
pInfo->pRes->info.rows = 1;
|
||||||
|
return pInfo->pRes;
|
||||||
|
} else {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
size_t totalGroups = taosArrayGetSize(pTableList->pGroupList);
|
||||||
|
|
||||||
|
while (pInfo->currentGroupIndex < totalGroups) {
|
||||||
|
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
|
||||||
|
|
||||||
|
tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
|
||||||
|
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
|
||||||
|
taosArrayClear(pInfo->pUidList);
|
||||||
|
|
||||||
|
int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->currentGroupIndex += 1;
|
||||||
|
|
||||||
|
// check for tag values
|
||||||
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
|
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
||||||
|
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||||
|
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
|
||||||
|
|
||||||
|
STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0);
|
||||||
|
pInfo->pRes->info.groupId = pKeyInfo->groupId;
|
||||||
|
|
||||||
|
addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbLastrowReaderClose(pInfo->pLastrowReader);
|
||||||
|
return pInfo->pRes;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
return NULL;
|
||||||
} else {
|
|
||||||
// todo fetch the result for each group
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
|
void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
|
||||||
SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
|
SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
|
||||||
blockDataDestroy(pInfo->pRes);
|
blockDataDestroy(pInfo->pRes);
|
||||||
tsdbLastrowReaderClose(pInfo->pLastrowReader);
|
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -514,8 +514,10 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
||||||
pInput->startRowIndex = 0;
|
pInput->startRowIndex = 0;
|
||||||
|
|
||||||
// NOTE: the last parameter is the primary timestamp column
|
// NOTE: the last parameter is the primary timestamp column
|
||||||
|
// todo: refactor this
|
||||||
if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
|
if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
|
||||||
pInput->pPTS = pInput->pData[j];
|
pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data.
|
||||||
|
// ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
}
|
}
|
||||||
ASSERT(pInput->pData[j] != NULL);
|
ASSERT(pInput->pData[j] != NULL);
|
||||||
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
||||||
|
@ -4291,6 +4293,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
||||||
uint64_t groupId = calcGroupId(keyBuf, len);
|
uint64_t groupId = calcGroupId(keyBuf, len);
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
|
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
|
||||||
|
@ -4309,6 +4312,30 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
|
||||||
|
memset(pCond, 0, sizeof(SQueryTableDataCond));
|
||||||
|
|
||||||
|
pCond->order = TSDB_ORDER_ASC;
|
||||||
|
pCond->numOfCols = 1;
|
||||||
|
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
|
||||||
|
if (pCond->colList == NULL) {
|
||||||
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCond->colList->colId = 1;
|
||||||
|
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
pCond->colList->bytes = sizeof(TSKEY);
|
||||||
|
|
||||||
|
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
|
pCond->suid = uid;
|
||||||
|
pCond->type = BLOCK_LOAD_OFFSET_ORDER;
|
||||||
|
pCond->startVersion = -1;
|
||||||
|
pCond->endVersion = -1;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||||
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo,
|
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo,
|
||||||
const char* pUser) {
|
const char* pUser) {
|
||||||
|
@ -4318,7 +4345,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
|
||||||
|
pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -4337,7 +4365,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
||||||
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
||||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
|
||||||
|
pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -4366,7 +4395,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
.maxTs = INT64_MIN,
|
.maxTs = INT64_MIN,
|
||||||
};
|
};
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
|
||||||
|
pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -4406,25 +4436,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryTableDataCond cond = {0};
|
SQueryTableDataCond cond = {0};
|
||||||
|
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
|
||||||
{
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
cond.order = TSDB_ORDER_ASC;
|
return NULL;
|
||||||
cond.numOfCols = 1;
|
|
||||||
cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
|
|
||||||
if (cond.colList == NULL) {
|
|
||||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
cond.colList->colId = 1;
|
|
||||||
cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
cond.colList->bytes = sizeof(TSKEY);
|
|
||||||
|
|
||||||
cond.twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
|
||||||
cond.suid = pBlockNode->suid;
|
|
||||||
cond.type = BLOCK_LOAD_OFFSET_ORDER;
|
|
||||||
cond.startVersion = -1;
|
|
||||||
cond.endVersion = -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
STsdbReader* pReader = NULL;
|
STsdbReader* pReader = NULL;
|
||||||
|
@ -4435,31 +4449,20 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
|
||||||
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
|
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
// int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
|
||||||
// if (code) {
|
queryId, taskId);
|
||||||
// pTaskInfo->code = code;
|
|
||||||
// return NULL;
|
|
||||||
// }
|
|
||||||
|
|
||||||
int32_t code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
|
code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo);
|
||||||
if (pScanNode->scan.tableType == TSDB_SUPER_TABLE) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
code = vnodeGetAllTableList(pHandle->vnode, pScanNode->scan.uid, pTableListInfo->pTableList);
|
pTaskInfo->code = code;
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
return NULL;
|
||||||
pTaskInfo->code = terrno;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
} else { // Create one table group.
|
|
||||||
STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->scan.uid, .groupId = 0};
|
|
||||||
taosArrayPush(pTableListInfo->pTableList, &info);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return createLastrowScanOperator(pScanNode, pHandle, pTableListInfo->pTableList, pTaskInfo);
|
return createLastrowScanOperator(pScanNode, pHandle, pTaskInfo);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -4928,6 +4931,9 @@ static void doDestroyTableList(STableListInfo* pTableqinfoList) {
|
||||||
if (pTableqinfoList->needSortTableByGroupId) {
|
if (pTableqinfoList->needSortTableByGroupId) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
|
||||||
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
|
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
|
||||||
|
if (tmp == pTableqinfoList->pTableList) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
taosArrayDestroy(tmp);
|
taosArrayDestroy(tmp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2402,9 +2402,9 @@ typedef struct STableMergeScanInfo {
|
||||||
SSampleExecInfo sample; // sample execution info
|
SSampleExecInfo sample; // sample execution info
|
||||||
} STableMergeScanInfo;
|
} STableMergeScanInfo;
|
||||||
|
|
||||||
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
|
||||||
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
|
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
|
||||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
|
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTableListInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2414,8 +2414,8 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableListInfo->needSortTableByGroupId = pTableScanNode->groupSort;
|
pTableListInfo->needSortTableByGroupId = groupSort;
|
||||||
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags);
|
code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2229,7 +2229,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "_cache_last_row",
|
.name = "_cache_last_row",
|
||||||
.type = FUNCTION_TYPE_CACHE_LAST_ROW,
|
.type = FUNCTION_TYPE_CACHE_LAST_ROW,
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||||
.translateFunc = translateFirstLast,
|
.translateFunc = translateFirstLast,
|
||||||
.getEnvFunc = getFirstLastFuncEnv,
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
|
Loading…
Reference in New Issue