Merge branch '3.0' into feature/tq
This commit is contained in:
commit
c765b0b826
|
@ -71,7 +71,7 @@ typedef struct SDataBlockInfo {
|
|||
int64_t uid;
|
||||
int64_t blockId;
|
||||
};
|
||||
int64_t groupId; // no need to serialize
|
||||
uint64_t groupId; // no need to serialize
|
||||
} SDataBlockInfo;
|
||||
|
||||
typedef struct SSDataBlock {
|
||||
|
|
|
@ -182,6 +182,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
|
|||
int32_t pageSize);
|
||||
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
|
||||
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
|
||||
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity);
|
||||
|
||||
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);
|
||||
|
||||
|
|
|
@ -30,11 +30,19 @@ extern "C" {
|
|||
#define FOREACH(node, list) \
|
||||
for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext)
|
||||
|
||||
// only be use in FOREACH
|
||||
#define ERASE_NODE(list) cell = nodesListErase(list, cell);
|
||||
|
||||
#define REPLACE_NODE(newNode) cell->pNode = (SNode*)(newNode)
|
||||
|
||||
#define INSERT_LIST(target, src) nodesListInsertList((target), cell, src)
|
||||
|
||||
#define WHERE_EACH(node, list) \
|
||||
SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); \
|
||||
while (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false))
|
||||
|
||||
#define WHERE_NEXT cell = cell->pNext
|
||||
|
||||
// only be use in WHERE_EACH
|
||||
#define ERASE_NODE(list) cell = nodesListErase((list), cell)
|
||||
|
||||
#define FORBOTH(node1, list1, node2, list2) \
|
||||
for (SListCell* cell1 = (NULL != (list1) ? (list1)->pHead : NULL), *cell2 = (NULL != (list2) ? (list2)->pHead : NULL); \
|
||||
(NULL == cell1 ? (node1 = NULL, false) : (node1 = cell1->pNode, true)), (NULL == cell2 ? (node2 = NULL, false) : (node2 = cell2->pNode, true)), (node1 != NULL && node2 != NULL); \
|
||||
|
@ -202,7 +210,9 @@ int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode);
|
|||
int32_t nodesListMakeAppend(SNodeList** pList, SNodeptr pNode);
|
||||
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc);
|
||||
int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc);
|
||||
int32_t nodesListPushFront(SNodeList* pList, SNodeptr pNode);
|
||||
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
|
||||
void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc);
|
||||
SNodeptr nodesListGetNode(SNodeList* pList, int32_t index);
|
||||
void nodesDestroyList(SNodeList* pList);
|
||||
// Only clear the linked list structure, without releasing the elements inside
|
||||
|
|
|
@ -415,10 +415,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i);
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
bool isNull = colDataIsNull(pColInfoData, numOfRows, j, NULL);
|
||||
if (isNull) {
|
||||
// do nothing
|
||||
} else {
|
||||
if (pColInfoData->varmeta.offset[j] != -1) {
|
||||
char* p = colDataGetData(pColInfoData, j);
|
||||
size += varDataTLen(p);
|
||||
}
|
||||
|
@ -547,8 +544,8 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
size_t metaSize = pBlock->info.rows * sizeof(int32_t);
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
size_t metaSize = pBlock->info.rows * sizeof(int32_t);
|
||||
memcpy(pCol->varmeta.offset, pStart, metaSize);
|
||||
pStart += metaSize;
|
||||
} else {
|
||||
|
@ -581,6 +578,49 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
|
||||
pBlock->info.rows = *(int32_t*)buf;
|
||||
|
||||
int32_t numOfCols = pBlock->info.numOfCols;
|
||||
const char* pStart = buf + sizeof(uint32_t);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
size_t metaSize = capacity * sizeof(int32_t);
|
||||
memcpy(pCol->varmeta.offset, pStart, metaSize);
|
||||
pStart += metaSize;
|
||||
} else {
|
||||
memcpy(pCol->nullbitmap, pStart, BitmapLen(capacity));
|
||||
pStart += BitmapLen(capacity);
|
||||
}
|
||||
|
||||
int32_t colLength = *(int32_t*)pStart;
|
||||
pStart += sizeof(int32_t);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
if (pCol->varmeta.allocLen < colLength) {
|
||||
char* tmp = taosMemoryRealloc(pCol->pData, colLength);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pCol->pData = tmp;
|
||||
pCol->varmeta.allocLen = colLength;
|
||||
}
|
||||
|
||||
pCol->varmeta.length = colLength;
|
||||
ASSERT(pCol->varmeta.length <= pCol->varmeta.allocLen);
|
||||
}
|
||||
|
||||
memcpy(pCol->pData, pStart, colLength);
|
||||
pStart += pCol->info.bytes * capacity;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
size_t blockDataGetRowSize(SSDataBlock* pBlock) {
|
||||
ASSERT(pBlock != NULL);
|
||||
if (pBlock->info.rowSize == 0) {
|
||||
|
@ -627,6 +667,10 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
|||
return rowSize;
|
||||
}
|
||||
|
||||
int32_t getAllowedRowsForPage(const SSDataBlock* pBlock, size_t pgSize) {
|
||||
return (int32_t) ((pgSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock));
|
||||
}
|
||||
|
||||
typedef struct SSDataBlockSortHelper {
|
||||
SArray* orderInfo; // SArray<SBlockOrderInfo>
|
||||
SSDataBlock* pDataBlock;
|
||||
|
@ -1178,6 +1222,9 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows)
|
|||
|
||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
|
||||
int32_t code = 0;
|
||||
if (numOfRows == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
|
@ -1220,7 +1267,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
|
|||
}
|
||||
|
||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
||||
return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock));
|
||||
return (int32_t) ((pageSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock));
|
||||
}
|
||||
|
||||
void colDataDestroy(SColumnInfoData* pColData) {
|
||||
|
|
|
@ -394,7 +394,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeQueryThreads = tsNumOfCores / 2;
|
||||
tsNumOfVnodeQueryThreads = TMIN(tsNumOfVnodeQueryThreads, 1);
|
||||
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeFetchThreads = tsNumOfCores / 2;
|
||||
|
@ -402,11 +402,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeWriteThreads = tsNumOfCores;
|
||||
tsNumOfVnodeWriteThreads = TMIN(tsNumOfVnodeWriteThreads, 1);
|
||||
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeSyncThreads = tsNumOfCores / 2;
|
||||
tsNumOfVnodeSyncThreads = TMIN(tsNumOfVnodeSyncThreads, 1);
|
||||
tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeMergeThreads = tsNumOfCores / 8;
|
||||
|
@ -414,7 +414,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "numOfVnodeMergeThreads", tsNumOfVnodeMergeThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfQnodeQueryThreads = tsNumOfCores / 2;
|
||||
tsNumOfQnodeQueryThreads = TMIN(tsNumOfQnodeQueryThreads, 1);
|
||||
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
|
||||
|
|
|
@ -511,6 +511,12 @@ typedef struct SProjectOperatorInfo {
|
|||
SSDataBlock *existDataBlock;
|
||||
SArray *pPseudoColInfo;
|
||||
SLimit limit;
|
||||
SLimit slimit;
|
||||
|
||||
uint64_t groupId;
|
||||
int64_t curSOffset;
|
||||
int64_t curGroupOutput;
|
||||
|
||||
int64_t curOffset;
|
||||
int64_t curOutput;
|
||||
} SProjectOperatorInfo;
|
||||
|
@ -563,6 +569,29 @@ typedef struct SGroupbyOperatorInfo {
|
|||
SAggSupporter aggSup;
|
||||
} SGroupbyOperatorInfo;
|
||||
|
||||
typedef struct SDataGroupInfo {
|
||||
uint64_t groupId;
|
||||
int64_t numOfRows;
|
||||
SArray *pPageList;
|
||||
} SDataGroupInfo;
|
||||
|
||||
// The sort in partition may be needed later.
|
||||
typedef struct SPartitionOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SArray* pGroupCols;
|
||||
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
|
||||
char* keyBuf; // group by keys for hash
|
||||
int32_t groupKeyLen; // total group by column width
|
||||
SHashObj* pGroupSet; // quick locate the window object for each result
|
||||
|
||||
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
|
||||
int32_t rowCapacity; // maximum number of rows for each buffer page
|
||||
int32_t* columnOffset; // start position for each column data
|
||||
|
||||
void* pGroupIter; // group iterator
|
||||
int32_t pageIndex; // page index of current group
|
||||
} SPartitionOperatorInfo;
|
||||
|
||||
typedef struct SSessionAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SAggSupporter aggSup;
|
||||
|
@ -650,7 +679,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
|
|||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
||||
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
|
||||
|
@ -667,8 +696,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
|||
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
|
||||
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||
#if 0
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
||||
|
|
|
@ -302,6 +302,10 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
|
|||
}
|
||||
|
||||
taosArrayPush(pBlock->pDataBlock, &idata);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(idata.info.type)) {
|
||||
pBlock->info.hasVarCol = true;
|
||||
}
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
|
@ -1259,6 +1263,7 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S
|
|||
static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, SArray* pPseudoList) {
|
||||
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
|
||||
pResult->info.groupId = pSrcBlock->info.groupId;
|
||||
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
||||
|
@ -5422,7 +5427,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
|
|||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
assert(*newgroup == false);
|
||||
*newgroup = prevVal;
|
||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
break;
|
||||
|
@ -5450,6 +5454,38 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
|
|||
|
||||
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo);
|
||||
|
||||
if (pProjectInfo->curSOffset > 0) {
|
||||
if (pProjectInfo->groupId == 0) { // it is the first group
|
||||
pProjectInfo->groupId = pBlock->info.groupId;
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
continue;
|
||||
} else if (pProjectInfo->groupId != pBlock->info.groupId) {
|
||||
pProjectInfo->curSOffset -= 1;
|
||||
|
||||
// ignore data block in current group
|
||||
if (pProjectInfo->curSOffset > 0) {
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
pProjectInfo->groupId = pBlock->info.groupId;
|
||||
}
|
||||
|
||||
if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) {
|
||||
pProjectInfo->curGroupOutput += 1;
|
||||
if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// reset the value for a new group data
|
||||
pProjectInfo->curOffset = 0;
|
||||
pProjectInfo->curOutput = 0;
|
||||
}
|
||||
|
||||
pProjectInfo->groupId = pBlock->info.groupId;
|
||||
|
||||
// todo extract method
|
||||
if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) {
|
||||
blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset);
|
||||
|
@ -6317,7 +6353,7 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols)
|
|||
}
|
||||
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num,
|
||||
SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
|
||||
SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo) {
|
||||
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -6325,7 +6361,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
|||
}
|
||||
|
||||
pInfo->limit = *pLimit;
|
||||
pInfo->slimit = *pSlimit;
|
||||
pInfo->curOffset = pLimit->offset;
|
||||
pInfo->curSOffset = pSlimit->offset;
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
int32_t numOfCols = num;
|
||||
|
@ -7052,11 +7091,13 @@ static SArray* extractScanColumnId(SNodeList* pNodeList);
|
|||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||
|
||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
||||
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
|
||||
int32_t type = nodeType(pPhyNode);
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
|
@ -7064,11 +7105,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
|
||||
pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
|
||||
return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) {
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||
|
||||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||
|
@ -7081,7 +7122,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pColList, tableIdList, pTaskInfo);
|
||||
taosArrayDestroy(tableIdList);
|
||||
return pOperator;
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) {
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
|
||||
|
||||
|
@ -7097,93 +7138,76 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
}
|
||||
}
|
||||
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == nodeType(pPhyNode)) {
|
||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||
assert(size == 1);
|
||||
int32_t type = nodeType(pPhyNode);
|
||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||
ASSERT(size == 1);
|
||||
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
|
||||
int32_t num = 0;
|
||||
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
|
||||
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num);
|
||||
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
|
||||
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
|
||||
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
|
||||
int32_t num = 0;
|
||||
|
||||
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) {
|
||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||
assert(size == 1);
|
||||
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
|
||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
|
||||
int32_t num = 0;
|
||||
|
||||
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
|
||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
if (pAggNode->pGroupKeys != NULL) {
|
||||
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
|
||||
return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL);
|
||||
} else {
|
||||
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
|
||||
}
|
||||
if (pAggNode->pGroupKeys != NULL) {
|
||||
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
|
||||
return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL);
|
||||
} else {
|
||||
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
|
||||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == nodeType(pPhyNode)) {
|
||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||
assert(size == 1);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) {
|
||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
SInterval interval = {
|
||||
.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision = pIntervalPhyNode->precision
|
||||
};
|
||||
|
||||
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId;
|
||||
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
|
||||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) {
|
||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||
assert(size == 1);
|
||||
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
SInterval interval = {
|
||||
.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision = pIntervalPhyNode->precision
|
||||
};
|
||||
|
||||
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId;
|
||||
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
||||
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
||||
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
|
||||
|
||||
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
|
||||
return createSortOperatorInfo(op, pResBlock, info, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) {
|
||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||
assert(size == 1);
|
||||
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
||||
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode;
|
||||
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
|
||||
|
||||
return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
|
||||
|
@ -7266,11 +7290,38 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
|||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
|
||||
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
||||
|
||||
// todo extract method
|
||||
SColumn c = {0};
|
||||
c.slotId = pColNode->slotId;
|
||||
c.colId = pColNode->colId;
|
||||
c.type = pColNode->node.resType.type;
|
||||
c.bytes = pColNode->node.resType.bytes;
|
||||
c.colId = pColNode->colId;
|
||||
c.type = pColNode->node.resType.type;
|
||||
c.bytes = pColNode->node.resType.bytes;
|
||||
c.precision = pColNode->node.resType.precision;
|
||||
c.scale = pColNode->node.resType.scale;
|
||||
|
||||
taosArrayPush(pList, &c);
|
||||
}
|
||||
|
||||
return pList;
|
||||
}
|
||||
|
||||
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
|
||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
|
||||
if (pList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
|
||||
|
||||
// todo extract method
|
||||
SColumn c = {0};
|
||||
c.slotId = pColNode->slotId;
|
||||
c.colId = pColNode->colId;
|
||||
c.type = pColNode->node.resType.type;
|
||||
c.bytes = pColNode->node.resType.bytes;
|
||||
c.precision = pColNode->node.resType.precision;
|
||||
c.scale = pColNode->node.resType.scale;
|
||||
|
||||
|
@ -7296,15 +7347,6 @@ SArray* createSortInfo(SNodeList* pNodeList) {
|
|||
|
||||
SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
|
||||
bi.slotId = pColNode->slotId;
|
||||
// pColNode->order;
|
||||
// SColumn c = {0};
|
||||
// c.slotId = pColNode->slotId;
|
||||
// c.colId = pColNode->colId;
|
||||
// c.type = pColNode->node.resType.type;
|
||||
// c.bytes = pColNode->node.resType.bytes;
|
||||
// c.precision = pColNode->node.resType.precision;
|
||||
// c.scale = pColNode->node.resType.scale;
|
||||
|
||||
taosArrayPush(pList, &bi);
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,11 @@
|
|||
#include "thash.h"
|
||||
#include "ttypes.h"
|
||||
|
||||
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
|
||||
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
|
||||
static uint64_t calcGroupId(char* pData, int32_t len);
|
||||
|
||||
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
|
||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
taosMemoryFreeClear(pInfo->keyBuf);
|
||||
|
@ -33,44 +37,43 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosArrayDestroy(pInfo->pGroupColVals);
|
||||
}
|
||||
|
||||
static int32_t initGroupOptrInfo(SGroupbyOperatorInfo* pInfo, SArray* pGroupColList) {
|
||||
pInfo->pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
|
||||
if (pInfo->pGroupColVals == NULL) {
|
||||
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
|
||||
*pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
|
||||
if ((*pGroupColVals) == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
|
||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||
SColumn* pCol = taosArrayGet(pGroupColList, i);
|
||||
pInfo->groupKeyLen += pCol->bytes;
|
||||
(*keyLen) += pCol->bytes;
|
||||
|
||||
struct SGroupKeys key = {0};
|
||||
key.bytes = pCol->bytes;
|
||||
key.type = pCol->type;
|
||||
key.bytes = pCol->bytes;
|
||||
key.type = pCol->type;
|
||||
key.isNull = false;
|
||||
key.pData = taosMemoryCalloc(1, pCol->bytes);
|
||||
key.pData = taosMemoryCalloc(1, pCol->bytes);
|
||||
if (key.pData == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosArrayPush(pInfo->pGroupColVals, &key);
|
||||
taosArrayPush((*pGroupColVals), &key);
|
||||
}
|
||||
|
||||
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
|
||||
pInfo->keyBuf = taosMemoryCalloc(1, pInfo->groupKeyLen + nullFlagSize);
|
||||
|
||||
if (pInfo->keyBuf == NULL) {
|
||||
(*keyBuf) = taosMemoryCalloc(1, (*keyLen) + nullFlagSize);
|
||||
if ((*keyBuf) == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex,
|
||||
int32_t numOfGroupCols) {
|
||||
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
|
||||
SColumnDataAgg* pColAgg = NULL;
|
||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i);
|
||||
SColumn* pCol = taosArrayGet(pGroupCols, i);
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
|
||||
if (pBlock->pBlockAgg != NULL) {
|
||||
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
|
||||
|
@ -78,7 +81,7 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in
|
|||
|
||||
bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg);
|
||||
|
||||
SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i);
|
||||
SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
|
||||
if (pkey->isNull && isNull) {
|
||||
continue;
|
||||
}
|
||||
|
@ -106,18 +109,18 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in
|
|||
return true;
|
||||
}
|
||||
|
||||
static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
|
||||
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
|
||||
SColumnDataAgg* pColAgg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i);
|
||||
SColumn* pCol = taosArrayGet(pGroupCols, i);
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
|
||||
|
||||
if (pBlock->pBlockAgg != NULL) {
|
||||
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
|
||||
}
|
||||
|
||||
SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i);
|
||||
SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
|
||||
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
|
||||
pkey->isNull = true;
|
||||
} else {
|
||||
|
@ -197,13 +200,13 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
|
||||
if (!pInfo->isInit) {
|
||||
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
|
||||
pInfo->isInit = true;
|
||||
num++;
|
||||
continue;
|
||||
}
|
||||
|
||||
bool equal = groupKeyCompare(pInfo, pBlock, j, numOfGroupCols);
|
||||
bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
|
||||
if (equal) {
|
||||
num++;
|
||||
continue;
|
||||
|
@ -212,7 +215,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
// The first row of a new block does not belongs to the previous existed group
|
||||
if (!equal && j == 0) {
|
||||
num++;
|
||||
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -227,7 +230,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
|
||||
// assign the group keys or user input constant values if required
|
||||
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
||||
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
|
||||
num = 1;
|
||||
}
|
||||
|
||||
|
@ -259,7 +262,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
|||
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
return pRes;
|
||||
return (pRes->info.rows == 0)? NULL:pRes;
|
||||
}
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
|
@ -309,7 +312,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
|||
}
|
||||
}
|
||||
|
||||
return pInfo->binfo.pRes;
|
||||
return (pRes->info.rows == 0)? NULL:pRes;
|
||||
}
|
||||
|
||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo,
|
||||
|
@ -325,7 +328,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||
|
||||
int32_t code = initGroupOptrInfo(pInfo, pGroupColList);
|
||||
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -339,7 +342,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
pOperator->info = pInfo;
|
||||
pOperator->_openFn = operatorDummyOpenFn;
|
||||
pOperator->getNextFn = hashGroupbyAggregate;
|
||||
pOperator->closeFn = destroyGroupbyOperatorInfo;
|
||||
pOperator->closeFn = destroyGroupOperatorInfo;
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
@ -351,67 +354,263 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doPartitionData(SOperatorInfo* pOperator, bool* newgroup) {
|
||||
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||
// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
|
||||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
|
||||
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||
|
||||
SDataGroupInfo* pGInfo = NULL;
|
||||
void *pPage = getCurrentDataGroupInfo(pInfo, &pGInfo, len);
|
||||
|
||||
pGInfo->numOfRows += 1;
|
||||
if (pGInfo->groupId == 0) {
|
||||
pGInfo->groupId = calcGroupId(pInfo->keyBuf, len);
|
||||
}
|
||||
|
||||
int32_t* rows = (int32_t*) pPage;
|
||||
|
||||
size_t numOfCols = pOperator->numOfOutput;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SExprInfo* pExpr = &pOperator->pExpr[i];
|
||||
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
int32_t bytes = pColInfoData->info.bytes;
|
||||
int32_t startOffset = pInfo->columnOffset[i];
|
||||
|
||||
char* columnLen = NULL;
|
||||
int32_t contentLen = 0;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
int32_t* offset = pPage + startOffset;
|
||||
columnLen = pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity;
|
||||
char* data = (char*)(columnLen + sizeof(int32_t));
|
||||
|
||||
if (colDataIsNull_s(pColInfoData, j)) {
|
||||
offset[(*rows)] = -1;
|
||||
contentLen = 0;
|
||||
} else {
|
||||
offset[*rows] = (*columnLen);
|
||||
char* src = colDataGetData(pColInfoData, j);
|
||||
memcpy(data + (*columnLen), src, varDataTLen(src));
|
||||
contentLen = varDataTLen(src);
|
||||
}
|
||||
} else {
|
||||
char* bitmap = pPage + startOffset;
|
||||
columnLen = pPage + startOffset + BitmapLen(pInfo->rowCapacity);
|
||||
char* data = (char*) columnLen + sizeof(int32_t);
|
||||
|
||||
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
|
||||
if (isNull) {
|
||||
colDataSetNull_f(bitmap, (*rows));
|
||||
} else {
|
||||
memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
|
||||
}
|
||||
contentLen = bytes;
|
||||
}
|
||||
|
||||
(*columnLen) += contentLen;
|
||||
}
|
||||
|
||||
(*rows) += 1;
|
||||
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pInfo->pBuf, pPage);
|
||||
}
|
||||
}
|
||||
|
||||
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
|
||||
SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
|
||||
|
||||
void* pPage = NULL;
|
||||
if (p == NULL) { // it is a new group
|
||||
SDataGroupInfo gi = {0};
|
||||
gi.pPageList = taosArrayInit(100, sizeof(int32_t));
|
||||
taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));
|
||||
|
||||
p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
|
||||
|
||||
int32_t pageId = 0;
|
||||
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
|
||||
taosArrayPush(p->pPageList, &pageId);
|
||||
|
||||
*(int32_t *) pPage = 0;
|
||||
} else {
|
||||
int32_t* curId = taosArrayGetLast(p->pPageList);
|
||||
pPage = getBufPage(pInfo->pBuf, *curId);
|
||||
|
||||
int32_t *rows = (int32_t*) pPage;
|
||||
if (*rows >= pInfo->rowCapacity) {
|
||||
// add a new page for current group
|
||||
int32_t pageId = 0;
|
||||
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
|
||||
taosArrayPush(p->pPageList, &pageId);
|
||||
|
||||
*(int32_t*) pPage = 0;
|
||||
}
|
||||
}
|
||||
|
||||
*pGroupInfo = p;
|
||||
return pPage;
|
||||
}
|
||||
|
||||
uint64_t calcGroupId(char* pData, int32_t len) {
|
||||
T_MD5_CTX context;
|
||||
tMD5Init(&context);
|
||||
tMD5Update(&context, (uint8_t*)pData, len);
|
||||
tMD5Final(&context);
|
||||
|
||||
// NOTE: only extract the initial 8 bytes of the final MD5 digest
|
||||
uint64_t id = 0;
|
||||
memcpy(&id, context.digest, sizeof(uint64_t));
|
||||
return id;
|
||||
}
|
||||
|
||||
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
|
||||
size_t numOfCols = pBlock->info.numOfCols;
|
||||
int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t));
|
||||
|
||||
offset[0] = sizeof(int32_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
|
||||
|
||||
for(int32_t i = 0; i < numOfCols - 1; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
int32_t bytes = pColInfoData->info.bytes;
|
||||
int32_t payloadLen = bytes * rowCapacity;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
// offset segment + content length + payload
|
||||
offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
|
||||
} else {
|
||||
// bitmap + content length + payload
|
||||
offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
|
||||
}
|
||||
}
|
||||
|
||||
return offset;
|
||||
}
|
||||
|
||||
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
||||
SPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
SDataGroupInfo* pGroupInfo = pInfo->pGroupIter;
|
||||
if (pInfo->pGroupIter == NULL || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
|
||||
// try next group data
|
||||
pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter);
|
||||
if (pInfo->pGroupIter == NULL) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pGroupInfo = pInfo->pGroupIter;
|
||||
pInfo->pageIndex = 0;
|
||||
}
|
||||
|
||||
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
|
||||
void* page = getBufPage(pInfo->pBuf, *pageId);
|
||||
|
||||
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
|
||||
|
||||
pInfo->pageIndex += 1;
|
||||
|
||||
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSortOperatorInfo* pInfo = pOperator->info;
|
||||
bool hasVarCol = pInfo->pDataBlock->info.hasVarCol;
|
||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
||||
blockDataCleanup(pRes);
|
||||
return buildPartitionResult(pOperator);
|
||||
}
|
||||
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->pDataBlock, pTaskInfo->id.str);
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource));
|
||||
ps->param = pOperator->pDownstream[0];
|
||||
tsortAddSource(pInfo->pSortHandle, ps);
|
||||
|
||||
int32_t code = tsortOpen(pInfo->pSortHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, terrno);
|
||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
doHashPartition(pOperator, pBlock);
|
||||
}
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
||||
blockDataEnsureCapacity(pRes, 4096);
|
||||
return buildPartitionResult(pOperator);
|
||||
}
|
||||
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
|
||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
|
||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
taosArrayDestroy(pInfo->pGroupCols);
|
||||
taosArrayDestroy(pInfo->pGroupColVals);
|
||||
taosMemoryFree(pInfo->keyBuf);
|
||||
taosMemoryFree(pInfo->columnOffset);
|
||||
}
|
||||
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
|
||||
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
|
||||
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer
|
||||
pInfo->bufPageSize = 1024;
|
||||
pInfo->numOfRowsInRes = 1024;
|
||||
pInfo->pDataBlock = pResultBlock;
|
||||
pInfo->pSortInfo = pSortInfo;
|
||||
pInfo->pGroupCols = pGroupColList;
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
||||
if (pInfo->pGroupSet == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
int32_t code = createDiskbasedBuf(&pInfo->pBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/");
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf));
|
||||
pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity);
|
||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pOperator->name = "PartitionOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||
|
||||
pInfo->binfo.pRes = pResultBlock;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->_openFn = operatorDummyOpenFn;
|
||||
pOperator->getNextFn = hashPartition;
|
||||
pOperator->closeFn = destroyPartitionOperatorInfo;
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->getNextFn = doPartitionData;
|
||||
// pOperator->closeFn = destroyOrderOperatorInfo;
|
||||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
return NULL;
|
||||
}
|
|
@ -425,9 +425,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
|||
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64,
|
||||
pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed);
|
||||
|
||||
size_t pgSize = pHandle->pageSize;
|
||||
int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pHandle->pDataBlock))/ blockDataGetSerialRowSize(pHandle->pDataBlock);
|
||||
|
||||
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
|
||||
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
|
||||
|
||||
size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
|
||||
|
|
|
@ -644,7 +644,7 @@ SNodeList* nodesMakeList() {
|
|||
|
||||
int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode) {
|
||||
if (NULL == pList || NULL == pNode) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
SListCell* p = taosMemoryCalloc(1, sizeof(SListCell));
|
||||
if (NULL == p) {
|
||||
|
@ -688,7 +688,7 @@ int32_t nodesListMakeAppend(SNodeList** pList, SNodeptr pNode) {
|
|||
|
||||
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) {
|
||||
if (NULL == pTarget || NULL == pSrc) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (NULL == pTarget->pHead) {
|
||||
|
@ -717,11 +717,34 @@ int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t nodesListPushFront(SNodeList* pList, SNodeptr pNode) {
|
||||
if (NULL == pList || NULL == pNode) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
SListCell* p = taosMemoryCalloc(1, sizeof(SListCell));
|
||||
if (NULL == p) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
p->pNode = pNode;
|
||||
if (NULL != pList->pHead) {
|
||||
pList->pHead->pPrev = p;
|
||||
p->pNext = pList->pHead;
|
||||
}
|
||||
pList->pHead = p;
|
||||
++(pList->length);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
|
||||
if (NULL == pCell->pPrev) {
|
||||
pList->pHead = pCell->pNext;
|
||||
} else {
|
||||
pCell->pPrev->pNext = pCell->pNext;
|
||||
}
|
||||
if (NULL == pCell->pNext) {
|
||||
pList->pTail = pCell->pPrev;
|
||||
} else {
|
||||
pCell->pNext->pPrev = pCell->pPrev;
|
||||
}
|
||||
SListCell* pNext = pCell->pNext;
|
||||
|
@ -731,6 +754,24 @@ SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
|
|||
return pNext;
|
||||
}
|
||||
|
||||
void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) {
|
||||
if (NULL == pTarget || NULL == pPos || NULL == pSrc) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (NULL == pPos->pPrev) {
|
||||
pTarget->pHead = pSrc->pHead;
|
||||
} else {
|
||||
pPos->pPrev->pNext = pSrc->pHead;
|
||||
}
|
||||
pSrc->pHead->pPrev = pPos->pPrev;
|
||||
pSrc->pTail->pNext = pPos;
|
||||
pPos->pPrev = pSrc->pTail;
|
||||
|
||||
pTarget->length += pSrc->length;
|
||||
taosMemoryFreeClear(pSrc);
|
||||
}
|
||||
|
||||
SNodeptr nodesListGetNode(SNodeList* pList, int32_t index) {
|
||||
SNode* node;
|
||||
FOREACH(node, pList) {
|
||||
|
|
|
@ -743,24 +743,99 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool* pIsSelectStar) {
|
||||
static int32_t createAllColumns(STranslateContext* pCxt, SNodeList** pCols) {
|
||||
*pCols = nodesMakeList();
|
||||
if (NULL == *pCols) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
|
||||
size_t nums = taosArrayGetSize(pTables);
|
||||
for (size_t i = 0; i < nums; ++i) {
|
||||
STableNode* pTable = taosArrayGetP(pTables, i);
|
||||
int32_t code = createColumnNodeByTable(pCxt, pTable, *pCols);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool isFirstLastFunc(SFunctionNode* pFunc) {
|
||||
return (FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType);
|
||||
}
|
||||
|
||||
static bool isFirstLastStar(SNode* pNode) {
|
||||
if (QUERY_NODE_FUNCTION != nodeType(pNode) || !isFirstLastFunc((SFunctionNode*)pNode)) {
|
||||
return false;
|
||||
}
|
||||
SNodeList* pParameterList = ((SFunctionNode*)pNode)->pParameterList;
|
||||
if (LIST_LENGTH(pParameterList) != 1) {
|
||||
return false;
|
||||
}
|
||||
SNode* pParam = nodesListGetNode(pParameterList, 0);
|
||||
return (QUERY_NODE_COLUMN == nodeType(pParam) ? 0 == strcmp(((SColumnNode*)pParam)->colName, "*") : false);
|
||||
}
|
||||
|
||||
static SNode* createFirstLastFunc(SFunctionNode* pSrcFunc, SColumnNode* pCol) {
|
||||
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
return NULL;
|
||||
}
|
||||
pFunc->pParameterList = nodesMakeList();
|
||||
if (NULL == pFunc->pParameterList || TSDB_CODE_SUCCESS != nodesListAppend(pFunc->pParameterList, pCol)) {
|
||||
nodesDestroyNode(pFunc);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pFunc->node.resType = pCol->node.resType;
|
||||
pFunc->funcId = pSrcFunc->funcId;
|
||||
pFunc->funcType = pSrcFunc->funcType;
|
||||
strcpy(pFunc->functionName, pSrcFunc->functionName);
|
||||
snprintf(pFunc->node.aliasName, sizeof(pFunc->node.aliasName), (FUNCTION_TYPE_FIRST == pSrcFunc->funcType ? "first(%s)" : "last(%s)"), pCol->colName);
|
||||
|
||||
return (SNode*)pFunc;
|
||||
}
|
||||
|
||||
static int32_t createFirstLastAllCols(STranslateContext* pCxt, SFunctionNode* pSrcFunc, SNodeList** pOutput) {
|
||||
SNodeList* pCols = NULL;
|
||||
if (TSDB_CODE_SUCCESS != createAllColumns(pCxt, &pCols)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SNodeList* pFuncs = nodesMakeList();
|
||||
if (NULL == pFuncs) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
SNode* pCol = NULL;
|
||||
FOREACH(pCol, pCols) {
|
||||
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pFuncs, createFirstLastFunc(pSrcFunc, (SColumnNode*)pCol))) {
|
||||
nodesDestroyNode(pFuncs);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
*pOutput = pFuncs;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
if (NULL == pSelect->pProjectionList) { // select * ...
|
||||
SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
|
||||
size_t nums = taosArrayGetSize(pTables);
|
||||
pSelect->pProjectionList = nodesMakeList();
|
||||
if (NULL == pSelect->pProjectionList) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
for (size_t i = 0; i < nums; ++i) {
|
||||
STableNode* pTable = taosArrayGetP(pTables, i);
|
||||
int32_t code = createColumnNodeByTable(pCxt, pTable, pSelect->pProjectionList);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
*pIsSelectStar = true;
|
||||
return createAllColumns(pCxt, &pSelect->pProjectionList);
|
||||
} else {
|
||||
// todo : t.*
|
||||
SNode* pNode = NULL;
|
||||
WHERE_EACH(pNode, pSelect->pProjectionList) {
|
||||
if (isFirstLastStar(pNode)) {
|
||||
SNodeList* pFuncs = NULL;
|
||||
if (TSDB_CODE_SUCCESS != createFirstLastAllCols(pCxt, (SFunctionNode*)pNode, &pFuncs)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
INSERT_LIST(pSelect->pProjectionList, pFuncs);
|
||||
ERASE_NODE(pSelect->pProjectionList);
|
||||
continue;
|
||||
}
|
||||
WHERE_NEXT;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -797,8 +872,8 @@ static int32_t getPositionValue(const SValueNode* pVal) {
|
|||
|
||||
static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pProjectionList, SNodeList* pOrderByList, bool* pOther) {
|
||||
*pOther = false;
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pOrderByList) {
|
||||
SNode* pNode = NULL;
|
||||
WHERE_EACH(pNode, pOrderByList) {
|
||||
SNode* pExpr = ((SOrderByExprNode*)pNode)->pExpr;
|
||||
if (QUERY_NODE_VALUE == nodeType(pExpr)) {
|
||||
SValueNode* pVal = (SValueNode*)pExpr;
|
||||
|
@ -823,6 +898,7 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro
|
|||
} else {
|
||||
*pOther = true;
|
||||
}
|
||||
WHERE_NEXT;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -845,11 +921,10 @@ static int32_t translateOrderBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
}
|
||||
|
||||
static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
bool isSelectStar = false;
|
||||
int32_t code = translateStar(pCxt, pSelect, &isSelectStar);
|
||||
if (TSDB_CODE_SUCCESS == code && !isSelectStar) {
|
||||
pCxt->currClause = SQL_CLAUSE_SELECT;
|
||||
code = translateExprList(pCxt, pSelect->pProjectionList);
|
||||
pCxt->currClause = SQL_CLAUSE_SELECT;
|
||||
int32_t code = translateExprList(pCxt, pSelect->pProjectionList);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateStar(pCxt, pSelect);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkExprListForGroupBy(pCxt, pSelect->pProjectionList);
|
||||
|
@ -1831,10 +1906,13 @@ static int32_t getSmaIndexBuildAst(STranslateContext* pCxt, SCreateIndexStmt* pS
|
|||
pSelect->pFromTable = (SNode*)pTable;
|
||||
|
||||
pSelect->pProjectionList = nodesCloneList(pStmt->pOptions->pFuncs);
|
||||
if (NULL == pTable) {
|
||||
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pSelect->pProjectionList || NULL == pFunc) {
|
||||
nodesDestroyNode(pSelect);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(pFunc->functionName, "_wstartts");
|
||||
nodesListPushFront(pSelect->pProjectionList, pFunc);
|
||||
SNode* pProject = NULL;
|
||||
FOREACH(pProject, pSelect->pProjectionList) {
|
||||
sprintf(((SExprNode*)pProject)->aliasName, "#sma_%p", pProject);
|
||||
|
|
|
@ -295,11 +295,11 @@ static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
|
|||
SColumnInfoData *pInputData = pInput->columnData;
|
||||
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||
|
||||
char *in = pInputData->pData;
|
||||
char *in = pInputData->pData + pInputData->varmeta.offset[0];
|
||||
int16_t *out = (int16_t *)pOutputData->pData;
|
||||
|
||||
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||
if (colDataIsNull_f(pInputData->nullbitmap, i)) {
|
||||
if (colDataIsNull_s(pInputData, i)) {
|
||||
colDataSetNull_f(pOutputData->nullbitmap, i);
|
||||
continue;
|
||||
}
|
||||
|
@ -357,7 +357,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
|||
}
|
||||
for (int32_t i = 0; i < inputNum; ++i) {
|
||||
pInputData[i] = pInput[i].columnData;
|
||||
input[i] = pInputData[i]->pData;
|
||||
input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0];
|
||||
int32_t factor = 1;
|
||||
if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
|
||||
factor = TSDB_NCHAR_SIZE;
|
||||
|
@ -438,7 +438,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
|||
}
|
||||
for (int32_t i = 0; i < inputNum; ++i) {
|
||||
pInputData[i] = pInput[i].columnData;
|
||||
input[i] = pInputData[i]->pData;
|
||||
input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0];
|
||||
int32_t factor = 1;
|
||||
if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
|
||||
factor = TSDB_NCHAR_SIZE;
|
||||
|
@ -509,7 +509,7 @@ static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScala
|
|||
SColumnInfoData *pInputData = pInput->columnData;
|
||||
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||
|
||||
char *input = pInputData->pData;
|
||||
char *input = pInputData->pData + pInputData->varmeta.offset[0];
|
||||
char *output = NULL;
|
||||
|
||||
int32_t outputLen = pInputData->varmeta.length;
|
||||
|
@ -554,7 +554,7 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar
|
|||
SColumnInfoData *pInputData = pInput->columnData;
|
||||
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||
|
||||
char *input = pInputData->pData;
|
||||
char *input = pInputData->pData + pInputData->varmeta.offset[0];
|
||||
char *output = NULL;
|
||||
|
||||
int32_t outputLen = pInputData->varmeta.length;
|
||||
|
@ -606,7 +606,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
|||
SColumnInfoData *pInputData = pInput->columnData;
|
||||
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||
|
||||
char *input = pInputData->pData;
|
||||
char *input = pInputData->pData + pInputData->varmeta.offset[0];
|
||||
char *output = NULL;
|
||||
|
||||
int32_t outputLen = pInputData->varmeta.length * pInput->numOfRows;
|
||||
|
|
|
@ -205,10 +205,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
|||
transRefCliHandle(conn); \
|
||||
} \
|
||||
} while (0)
|
||||
#define CONN_NO_PERSIST_BY_APP(conn) \
|
||||
(((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
#define CONN_RELEASE_BY_SERVER(conn) \
|
||||
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
|
||||
#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
|
||||
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
||||
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
|
||||
|
@ -281,9 +281,8 @@ void cliHandleResp(SCliConn* conn) {
|
|||
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||
}
|
||||
|
||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType),
|
||||
taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
||||
|
||||
conn->secured = pHead->secured;
|
||||
|
||||
|
@ -349,12 +348,10 @@ void cliHandleExcept(SCliConn* pConn) {
|
|||
|
||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
||||
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle,
|
||||
TMSG_INFO(transMsg.msgType));
|
||||
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType));
|
||||
if (transMsg.ahandle == NULL) {
|
||||
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
|
||||
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
transMsg.ahandle);
|
||||
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle);
|
||||
}
|
||||
} else {
|
||||
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||
|
@ -628,9 +625,8 @@ void cliSend(SCliConn* pConn) {
|
|||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
||||
|
||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType),
|
||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
|
||||
if (pHead->persist == 1) {
|
||||
CONN_SET_PERSIST_BY_APP(pConn);
|
||||
|
@ -741,8 +737,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
if (ret) {
|
||||
tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
|
||||
}
|
||||
|
||||
struct sockaddr_in addr;
|
||||
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
||||
|
||||
addr.sin_family = AF_INET;
|
||||
addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
|
||||
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
||||
// uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
||||
// handle error in callback if fail to connect
|
||||
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
|
||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||
|
|
|
@ -48,10 +48,8 @@ struct SDiskbasedBuf {
|
|||
};
|
||||
|
||||
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
|
||||
// pBuf->file = fopen(pBuf->path, "wb+");
|
||||
pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
|
||||
pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
||||
if (pBuf->pFile == NULL) {
|
||||
// qError("failed to create tmp file: %s on disk. %s", pBuf->path, strerror(errno));
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue