Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_optimize
This commit is contained in:
commit
b833c3ef89
|
@ -2677,15 +2677,6 @@ typedef struct {
|
|||
int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
|
||||
int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
SEpSet epSet;
|
||||
} SVgEpSet;
|
||||
|
||||
typedef struct {
|
||||
int32_t padding;
|
||||
} SRSmaExecMsg;
|
||||
|
||||
typedef struct {
|
||||
int8_t version; // for compatibility(default 0)
|
||||
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
||||
|
|
|
@ -38,7 +38,6 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
|
|||
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
|
||||
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
|
||||
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
|
||||
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups);
|
||||
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
|
||||
|
@ -841,6 +840,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
|
||||
_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
return code;
|
||||
|
@ -961,6 +961,7 @@ _OVER:
|
|||
mError("sma:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||
}
|
||||
|
||||
mndReleaseSma(pMnode, pSma);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -532,6 +532,9 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
|||
SDecoder dc = {0};
|
||||
|
||||
rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData);
|
||||
if (rc < 0) {
|
||||
return -1;
|
||||
}
|
||||
int64_t version = ((SUidIdxVal *)pData)[0].version;
|
||||
|
||||
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData);
|
||||
|
|
|
@ -129,11 +129,17 @@ typedef struct SFileBlockDumpInfo {
|
|||
bool allDumped;
|
||||
} SFileBlockDumpInfo;
|
||||
|
||||
typedef struct SUidOrderCheckInfo {
|
||||
uint64_t* tableUidList; // access table uid list in uid ascending order list
|
||||
int32_t currentIndex; // index in table uid list
|
||||
} SUidOrderCheckInfo;
|
||||
|
||||
typedef struct SReaderStatus {
|
||||
bool loadFromFile; // check file stage
|
||||
bool composedDataBlock; // the returned data block is a composed block or not
|
||||
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
|
||||
STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks.
|
||||
SUidOrderCheckInfo uidCheckInfo; // check all table in uid order
|
||||
SFileBlockDumpInfo fBlockDumpInfo;
|
||||
SDFileSet* pCurrentFileset; // current opened file set
|
||||
SBlockData fileBlockData;
|
||||
|
@ -576,7 +582,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) {
|
|||
}
|
||||
|
||||
// reset the index in last block when handing a new file
|
||||
px->indexInBlockL = -1;
|
||||
px->indexInBlockL = DEFAULT_ROW_INDEX_VAL;
|
||||
tMapDataClear(&px->mapData);
|
||||
taosArrayClear(px->pBlockList);
|
||||
}
|
||||
|
@ -655,7 +661,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray*
|
|||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
tsdbDebug(
|
||||
"load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s",
|
||||
"load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, block-info-size:%.2f Kb, elapsed "
|
||||
"time:%.2f ms %s",
|
||||
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastBlocks, sizeInDisk / 1000.0, el,
|
||||
pReader->idStr);
|
||||
|
||||
|
@ -1814,7 +1821,8 @@ static void setAllRowsChecked(SLastBlockReader* pLastBlockReader) {
|
|||
}
|
||||
|
||||
static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||
int32_t step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1;
|
||||
bool asc = ASCENDING_TRAVERSE(pLastBlockReader->order);
|
||||
int32_t step = (asc) ? 1 : -1;
|
||||
if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1823,8 +1831,20 @@ static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
|
|||
|
||||
SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
|
||||
for (int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) {
|
||||
if (pBlockData->aUid != NULL && pBlockData->aUid[i] != pLastBlockReader->uid) {
|
||||
continue;
|
||||
if (pBlockData->aUid != NULL) {
|
||||
if (asc) {
|
||||
if (pBlockData->aUid[i] < pLastBlockReader->uid) {
|
||||
continue;
|
||||
} else if (pBlockData->aUid[i] > pLastBlockReader->uid) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (pBlockData->aUid[i] > pLastBlockReader->uid) {
|
||||
continue;
|
||||
} else if (pBlockData->aUid[i] < pLastBlockReader->uid) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int64_t ts = pBlockData->aTSKEY[i];
|
||||
|
@ -1992,10 +2012,12 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
setComposedBlockFlag(pReader, true);
|
||||
int64_t et = taosGetTimestampUs();
|
||||
|
||||
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
|
||||
" rows:%d, elapsed time:%.2f ms %s",
|
||||
pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||
pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);
|
||||
if (pResBlock->info.rows > 0) {
|
||||
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
|
||||
" rows:%d, elapsed time:%.2f ms %s",
|
||||
pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||
pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -2191,7 +2213,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
|
|||
return code;
|
||||
}
|
||||
|
||||
code = tsdbReadBlockL(pReader->pFileReader, 0, pLastBlocks);
|
||||
code = tsdbReadBlockL(pReader->pFileReader, pLastBlocks);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(pIndexList);
|
||||
return code;
|
||||
|
@ -2266,7 +2288,7 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable
|
|||
return code;
|
||||
}
|
||||
|
||||
code = tsdbReadLastBlock(pReader->pFileReader, 0, pBlock, &pLastBlockReader->lastBlockData);
|
||||
code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData);
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2287,22 +2309,99 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t uidComparFunc(const void* p1, const void* p2) {
|
||||
uint64_t pu1 = *(uint64_t*)p1;
|
||||
uint64_t pu2 = *(uint64_t*)p2;
|
||||
if (pu1 == pu2) {
|
||||
return 0;
|
||||
} else {
|
||||
return (pu1 < pu2) ? -1 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
|
||||
int32_t index = 0;
|
||||
int32_t total = taosHashGetSize(pStatus->pTableMap);
|
||||
|
||||
void* p = taosHashIterate(pStatus->pTableMap, NULL);
|
||||
while (p != NULL) {
|
||||
STableBlockScanInfo* pScanInfo = p;
|
||||
pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid;
|
||||
p = taosHashIterate(pStatus->pTableMap, p);
|
||||
}
|
||||
|
||||
taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc);
|
||||
}
|
||||
|
||||
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
|
||||
if (pOrderCheckInfo->tableUidList == NULL) {
|
||||
int32_t total = taosHashGetSize(pStatus->pTableMap);
|
||||
|
||||
pOrderCheckInfo->currentIndex = 0;
|
||||
pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t));
|
||||
if (pOrderCheckInfo->tableUidList == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
extractOrderedTableUidList(pOrderCheckInfo, pStatus);
|
||||
|
||||
uint64_t uid = pOrderCheckInfo->tableUidList[0];
|
||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||
} else {
|
||||
if (pStatus->pTableIter == NULL) { // it is the last block of a new file
|
||||
// ASSERT(pOrderCheckInfo->currentIndex == taosHashGetSize(pStatus->pTableMap));
|
||||
|
||||
pOrderCheckInfo->currentIndex = 0;
|
||||
uint64_t uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
|
||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||
|
||||
// the tableMap has already updated
|
||||
if (pStatus->pTableIter == NULL) {
|
||||
void* p =
|
||||
taosMemoryRealloc(pOrderCheckInfo->tableUidList, taosHashGetSize(pStatus->pTableMap) * sizeof(uint64_t));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pOrderCheckInfo->tableUidList = p;
|
||||
extractOrderedTableUidList(pOrderCheckInfo, pStatus);
|
||||
|
||||
uid = pOrderCheckInfo->tableUidList[0];
|
||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
|
||||
pOrderedCheckInfo->currentIndex += 1;
|
||||
if (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap)) {
|
||||
pStatus->pTableIter = NULL;
|
||||
return false;
|
||||
}
|
||||
|
||||
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
|
||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||
ASSERT(pStatus->pTableIter != NULL);
|
||||
return true;
|
||||
}
|
||||
|
||||
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
|
||||
|
||||
while (1) {
|
||||
if (pStatus->pTableIter == NULL) {
|
||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
|
||||
if (pStatus->pTableIter == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;
|
||||
int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
|
||||
if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
|
||||
return code;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
// load the last data block of current table
|
||||
// todo opt perf by avoiding load last block repeatly
|
||||
STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
|
||||
int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader);
|
||||
code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2310,19 +2409,20 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
|||
if (pLastBlockReader->currentBlockIndex != -1) {
|
||||
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
|
||||
int32_t index = pScanInfo->indexInBlockL;
|
||||
|
||||
if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) {
|
||||
bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
|
||||
if (!hasData) { // current table does not have rows in last block, try next table
|
||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
|
||||
if (pStatus->pTableIter == NULL) {
|
||||
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
|
||||
if (!hasNexTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else { // no data in last block, try next table
|
||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
|
||||
if (pStatus->pTableIter == NULL) {
|
||||
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
|
||||
if (!hasNexTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
continue;
|
||||
|
@ -2338,8 +2438,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
// current table is exhausted, let's try next table
|
||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
|
||||
if (pStatus->pTableIter == NULL) {
|
||||
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
|
||||
if (!hasNexTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
@ -3354,6 +3454,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||
}
|
||||
|
||||
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
|
||||
|
||||
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
|
||||
if (pFilesetIter->pLastBlockReader != NULL) {
|
||||
tBlockDataDestroy(&pFilesetIter->pLastBlockReader->lastBlockData, true);
|
||||
|
|
|
@ -207,6 +207,7 @@ typedef struct SExprSupp {
|
|||
|
||||
typedef struct SOperatorInfo {
|
||||
uint16_t operatorType;
|
||||
int16_t resultDataBlockId;
|
||||
bool blocking; // block operator or not
|
||||
uint8_t status; // denote if current operator is completed
|
||||
char* name; // name, for debug purpose
|
||||
|
@ -218,7 +219,6 @@ typedef struct SOperatorInfo {
|
|||
struct SOperatorInfo** pDownstream; // downstram pointer list
|
||||
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
||||
SOperatorFpSet fpSet;
|
||||
int16_t resultDataBlockId;
|
||||
} SOperatorInfo;
|
||||
|
||||
typedef enum {
|
||||
|
@ -909,21 +909,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
|||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream);
|
||||
|
||||
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
bool mergeResultBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
SNode* pCondition, bool mergeResultBlocks, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
|
||||
|
@ -931,22 +925,16 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
|||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
|
||||
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
|
||||
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId,
|
||||
SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
|
@ -957,10 +945,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
|||
|
||||
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
#if 0
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||
#endif
|
||||
|
||||
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, SArray* pPseudoList);
|
||||
|
||||
|
|
|
@ -600,7 +600,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||
if (pResult->info.rows > 0 && !createNewColModel) {
|
||||
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0],
|
||||
colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0],
|
||||
pInputData->numOfRows);
|
||||
} else {
|
||||
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
|
||||
|
@ -638,7 +638,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
|
||||
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
|
||||
ASSERT(pResult->info.capacity > 0);
|
||||
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
|
||||
colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
|
||||
colDataDestroy(&idata);
|
||||
|
||||
numOfRows = dest.numOfRows;
|
||||
|
@ -703,7 +703,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
|
||||
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
|
||||
ASSERT(pResult->info.capacity > 0);
|
||||
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
|
||||
colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
|
||||
colDataDestroy(&idata);
|
||||
|
||||
numOfRows = dest.numOfRows;
|
||||
|
@ -4024,7 +4024,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
|
||||
qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -4162,9 +4162,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
if (ops[i] == NULL) {
|
||||
taosMemoryFree(ops);
|
||||
return NULL;
|
||||
} else {
|
||||
ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
|
||||
}
|
||||
|
||||
ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
|
||||
}
|
||||
|
||||
SOperatorInfo* pOptr = NULL;
|
||||
|
@ -4216,37 +4216,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
|
||||
SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
SInterval interval = {.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
|
||||
|
||||
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
|
||||
pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock,
|
||||
pTaskInfo);
|
||||
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
|
||||
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
SInterval interval = {.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
|
||||
|
||||
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
|
||||
pIntervalPhyNode->window.mergeDataBlock, pTaskInfo);
|
||||
pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
|
||||
int32_t children = 0;
|
||||
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
|
||||
|
@ -4275,17 +4248,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
|
||||
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
|
||||
|
||||
STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
|
||||
|
||||
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
|
||||
SColumn col = extractColumnFromColumnNode(pColNode);
|
||||
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
|
||||
pTaskInfo);
|
||||
pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
|
||||
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
|
||||
|
@ -4299,8 +4262,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
taosMemoryFree(ops);
|
||||
if (pOptr) pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
|
||||
if (pOptr) {
|
||||
pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
|
||||
}
|
||||
|
||||
return pOptr;
|
||||
}
|
||||
|
||||
|
|
|
@ -702,8 +702,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
|
||||
void* pGroupIter = NULL;
|
||||
pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
|
||||
|
||||
void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
|
||||
while (pGroupIter != NULL) {
|
||||
SDataGroupInfo* pGroupInfo = pGroupIter;
|
||||
taosArrayPush(groupArray, pGroupInfo);
|
||||
|
|
|
@ -1900,8 +1900,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL,
|
||||
|
@ -2700,20 +2698,26 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId,
|
||||
SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->stateCol = *pStateKeyCol;
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pStateNode->window.node.pOutputDataBlockDesc);
|
||||
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
|
||||
|
||||
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
|
||||
|
||||
pInfo->stateCol = extractColumnFromColumnNode(pColNode);
|
||||
pInfo->stateKey.type = pInfo->stateCol.type;
|
||||
pInfo->stateKey.bytes = pInfo->stateCol.bytes;
|
||||
pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
|
||||
pInfo->pCondition = pCondition;
|
||||
pInfo->pCondition = pStateNode->window.node.pConditions;
|
||||
if (pInfo->stateKey.pData == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -2721,16 +2725,15 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
|||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
|
||||
pInfo->twAggSup = *pTwAggSup;
|
||||
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};;
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
pInfo->tsSlotId = tsSlotId;
|
||||
|
@ -2738,8 +2741,6 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
|||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->exprSupp.pExprInfo = pExpr;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
|
@ -2813,8 +2814,6 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
|||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
|
||||
|
@ -3449,8 +3448,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
pOperator->operatorType = pPhyNode->type;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
|
@ -3636,8 +3633,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
|
||||
|
@ -4898,8 +4893,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL,
|
||||
|
@ -5115,9 +5108,7 @@ static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
|||
return (rows == 0) ? NULL : pRes;
|
||||
}
|
||||
|
||||
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo,
|
||||
int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
|
||||
int32_t primaryTsSlotId, SNode* pCondition, bool mergeResultBlock,
|
||||
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -5130,24 +5121,33 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
goto _error;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pNode->window.node.pOutputDataBlockDesc);
|
||||
|
||||
SInterval interval = {.interval = pNode->interval,
|
||||
.sliding = pNode->sliding,
|
||||
.intervalUnit = pNode->intervalUnit,
|
||||
.slidingUnit = pNode->slidingUnit,
|
||||
.offset = pNode->offset,
|
||||
.precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision};
|
||||
|
||||
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
miaInfo->pCondition = pCondition;
|
||||
miaInfo->curTs = INT64_MIN;
|
||||
|
||||
iaInfo->win = pTaskInfo->window;
|
||||
iaInfo->inputOrder = TSDB_ORDER_ASC;
|
||||
iaInfo->interval = *pInterval;
|
||||
iaInfo->execModel = pTaskInfo->execModel;
|
||||
iaInfo->primaryTsIndex = primaryTsSlotId;
|
||||
iaInfo->binfo.mergeResultBlock = mergeResultBlock;
|
||||
miaInfo->pCondition = pNode->window.node.pConditions;
|
||||
miaInfo->curTs = INT64_MIN;
|
||||
iaInfo->win = pTaskInfo->window;
|
||||
iaInfo->inputOrder = TSDB_ORDER_ASC;
|
||||
iaInfo->interval = interval;
|
||||
iaInfo->execModel = pTaskInfo->execModel;
|
||||
iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
|
||||
iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t code =
|
||||
initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -5155,7 +5155,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
initBasicInfo(&iaInfo->binfo, pResBlock);
|
||||
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
|
||||
|
||||
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, iaInfo);
|
||||
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo);
|
||||
if (iaInfo->timeWindowInterpo) {
|
||||
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
|
||||
}
|
||||
|
@ -5163,14 +5163,12 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
initResultRowInfo(&iaInfo->binfo.resultRowInfo);
|
||||
blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pOperator->name = "TimeMergeAlignedIntervalAggOperator";
|
||||
pOperator->name = "TimeMergeAlignedIntervalAggOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->info = miaInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = miaInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL,
|
||||
destroyMergeAlignedIntervalOperatorInfo, NULL, NULL, NULL);
|
||||
|
@ -5427,57 +5425,64 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
|
|||
return (rows == 0) ? NULL : pRes;
|
||||
}
|
||||
|
||||
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
bool mergeBlock, SExecTaskInfo* pTaskInfo) {
|
||||
SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
|
||||
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (miaInfo == NULL || pOperator == NULL) {
|
||||
if (pMergeIntervalInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
miaInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
|
||||
|
||||
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
|
||||
iaInfo->win = pTaskInfo->window;
|
||||
iaInfo->inputOrder = TSDB_ORDER_ASC;
|
||||
iaInfo->interval = *pInterval;
|
||||
iaInfo->execModel = pTaskInfo->execModel;
|
||||
iaInfo->binfo.mergeResultBlock = mergeBlock;
|
||||
SInterval interval = {.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
|
||||
|
||||
iaInfo->primaryTsIndex = primaryTsSlotId;
|
||||
pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
|
||||
|
||||
SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
|
||||
pIntervalInfo->win = pTaskInfo->window;
|
||||
pIntervalInfo->inputOrder = TSDB_ORDER_ASC;
|
||||
pIntervalInfo->interval = interval;
|
||||
pIntervalInfo->execModel = pTaskInfo->execModel;
|
||||
pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
|
||||
pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
|
||||
SExprSupp* pExprSupp = &pOperator->exprSupp;
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
int32_t code = initAggInfo(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initBasicInfo(&iaInfo->binfo, pResBlock);
|
||||
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
|
||||
initBasicInfo(&pIntervalInfo->binfo, pResBlock);
|
||||
initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
|
||||
|
||||
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, numOfCols, iaInfo);
|
||||
if (iaInfo->timeWindowInterpo) {
|
||||
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
|
||||
if (iaInfo->binfo.resultRowInfo.openWindow == NULL) {
|
||||
pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo);
|
||||
if (pIntervalInfo->timeWindowInterpo) {
|
||||
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
|
||||
if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
||||
initResultRowInfo(&iaInfo->binfo.resultRowInfo);
|
||||
initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
|
||||
|
||||
pOperator->name = "TimeMergeIntervalAggOperator";
|
||||
pOperator->name = "TimeMergeIntervalAggOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->info = miaInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pMergeIntervalInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
|
||||
destroyMergeIntervalOperatorInfo, NULL, NULL, NULL);
|
||||
|
@ -5490,7 +5495,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
|
|||
return pOperator;
|
||||
|
||||
_error:
|
||||
destroyMergeIntervalOperatorInfo(miaInfo);
|
||||
destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
|
|
|
@ -186,15 +186,16 @@ int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) {
|
|||
int32_t len1 = varDataLen(pLeft);
|
||||
int32_t len2 = varDataLen(pRight);
|
||||
|
||||
if (len1 != len2) {
|
||||
return len1 > len2 ? 1 : -1;
|
||||
} else {
|
||||
int32_t ret = strncmp(varDataVal(pLeft), varDataVal(pRight), len1);
|
||||
if (ret == 0) {
|
||||
int32_t minLen = TMIN(len1, len2);
|
||||
int32_t ret = strncmp(varDataVal(pLeft), varDataVal(pRight), minLen);
|
||||
if (ret == 0) {
|
||||
if (len1 == len2) {
|
||||
return 0;
|
||||
} else {
|
||||
return ret > 0 ? 1 : -1;
|
||||
return len1 > len2 ? 1 : -1;
|
||||
}
|
||||
} else {
|
||||
return ret > 0 ? 1 : -1;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -557,7 +557,7 @@ if $data10 != @{slop:0.000000, intercept:1.000000}@ then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data90 != @{slop:0.000000, intercept:9.000000}@ then
|
||||
if $data90 != @{slop:0.000000, intercept:17.000000}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -338,7 +338,7 @@ class TDTestCase:
|
|||
tdSql.query(f"select * from {dbname}.jsons1 where jtag->'tag1' between 1 and 30")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.query(f"select * from {dbname}.jsons1 where jtag->'tag1' between 'femail' and 'beijing'")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(0)
|
||||
|
||||
# test with tbname/normal column
|
||||
tdSql.query(f"select * from {dbname}.jsons1 where tbname = 'jsons1_1'")
|
||||
|
|
Loading…
Reference in New Issue