refactor(query): do some internal refactor.
This commit is contained in:
parent
0444f0835e
commit
3db512e08f
|
@ -3738,6 +3738,32 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTE: sources columns are more than the destination SSDatablock columns.
|
||||||
|
static void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols) {
|
||||||
|
size_t numOfSrcCols = taosArrayGetSize(pCols);
|
||||||
|
ASSERT(numOfSrcCols >= pBlock->info.numOfCols);
|
||||||
|
|
||||||
|
int32_t i = 0, j = 0;
|
||||||
|
while(i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
|
||||||
|
SColumnInfoData* p = taosArrayGet(pCols, i);
|
||||||
|
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, j);
|
||||||
|
if (!pmInfo->output) {
|
||||||
|
j++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->info.colId == pmInfo->colId) {
|
||||||
|
taosArraySet(pBlock->pDataBlock, pmInfo->targetSlotId, p);
|
||||||
|
i++;
|
||||||
|
j++;
|
||||||
|
} else if (p->info.colId < pmInfo->colId) {
|
||||||
|
i++;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
|
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
|
||||||
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
||||||
SArray* pColList) {
|
SArray* pColList) {
|
||||||
|
@ -3755,7 +3781,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
char* pStart = pData + sizeof(int32_t) * numOfOutput;
|
char* pStart = pData + sizeof(int32_t) * numOfOutput;
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
colLen[i] = htonl(colLen[i]);
|
colLen[i] = htonl(colLen[i]);
|
||||||
ASSERT(colLen[i] > 0);
|
ASSERT(colLen[i] >= 0);
|
||||||
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
||||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
@ -3765,13 +3791,18 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
|
memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
|
||||||
pStart += sizeof(int32_t) * numOfRows;
|
pStart += sizeof(int32_t) * numOfRows;
|
||||||
|
|
||||||
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
|
if (colLen[i] > 0) {
|
||||||
|
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
|
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
|
||||||
pStart += BitmapLen(numOfRows);
|
pStart += BitmapLen(numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pColInfoData->pData, pStart, colLen[i]);
|
if (colLen[i] > 0) {
|
||||||
|
memcpy(pColInfoData->pData, pStart, colLen[i]);
|
||||||
|
}
|
||||||
|
|
||||||
//TODO setting this flag to true temporarily so aggregate function on stable will
|
//TODO setting this flag to true temporarily so aggregate function on stable will
|
||||||
//examine NULL value for non-primary key column
|
//examine NULL value for non-primary key column
|
||||||
pColInfoData->hasNull = true;
|
pColInfoData->hasNull = true;
|
||||||
|
@ -3784,6 +3815,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
int32_t numOfCols = htonl(*(int32_t*)pStart);
|
int32_t numOfCols = htonl(*(int32_t*)pStart);
|
||||||
pStart += sizeof(int32_t);
|
pStart += sizeof(int32_t);
|
||||||
|
|
||||||
|
// todo refactor:extract method
|
||||||
SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
|
SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SSysTableSchema* p = (SSysTableSchema*)pStart;
|
SSysTableSchema* p = (SSysTableSchema*)pStart;
|
||||||
|
@ -3838,19 +3870,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
}
|
}
|
||||||
|
|
||||||
// data from mnode
|
// data from mnode
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
relocateColumnData(pRes, pColList, block.pDataBlock);
|
||||||
SColumnInfoData* pSrc = taosArrayGet(block.pDataBlock, i);
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfOutput; ++j) {
|
|
||||||
int16_t colIndex = *(int16_t*)taosArrayGet(pColList, j);
|
|
||||||
|
|
||||||
if (colIndex - 1 == i) {
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, j);
|
|
||||||
colDataAssign(pColInfoData, pSrc, numOfRows);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pRes->info.rows = numOfRows;
|
pRes->info.rows = numOfRows;
|
||||||
|
@ -6422,7 +6442,6 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead
|
||||||
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo,
|
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo,
|
||||||
uint64_t queryId, uint64_t taskId);
|
uint64_t queryId, uint64_t taskId);
|
||||||
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
||||||
static SArray* extractScanColumnId(SNodeList* pNodeList);
|
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
||||||
|
|
||||||
|
@ -6493,8 +6512,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
|
SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
|
||||||
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
|
|
||||||
|
|
||||||
|
int32_t numOfOutputCols = 0;
|
||||||
|
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfOutputCols);
|
||||||
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
||||||
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet,
|
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet,
|
||||||
colList, pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
colList, pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
||||||
|
@ -6657,28 +6677,6 @@ static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableS
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* extractScanColumnId(SNodeList* pNodeList) {
|
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(int16_t));
|
|
||||||
if (pList == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
for (int32_t j = 0; j < numOfCols; ++j) {
|
|
||||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, j);
|
|
||||||
if (pNode->slotId == i) {
|
|
||||||
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
|
||||||
taosArrayPush(pList, &pColNode->colId);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return pList;
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* extractColumnInfo(SNodeList* pNodeList) {
|
SArray* extractColumnInfo(SNodeList* pNodeList) {
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
|
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
|
||||||
|
@ -6814,9 +6812,9 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
||||||
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
||||||
|
|
||||||
SColMatchInfo c = {0};
|
SColMatchInfo c = {0};
|
||||||
|
c.output = true;
|
||||||
c.colId = pColNode->colId;
|
c.colId = pColNode->colId;
|
||||||
c.targetSlotId = pNode->slotId;
|
c.targetSlotId = pNode->slotId;
|
||||||
c.output = true;
|
|
||||||
taosArrayPush(pList, &c);
|
taosArrayPush(pList, &c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6824,8 +6822,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
||||||
int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
|
int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
|
SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
|
||||||
|
|
||||||
// todo: add reserve flag check
|
// todo: add reserve flag check
|
||||||
if (pNode->slotId >= numOfCols) { // it is a column reserved for the arithmetic expression calculation
|
// it is a column reserved for the arithmetic expression calculation
|
||||||
|
if (pNode->slotId >= numOfCols) {
|
||||||
(*numOfOutputCols) += 1;
|
(*numOfOutputCols) += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue