feat:sort table group if needed
This commit is contained in:
parent
c8f223a6ec
commit
26cceaf172
|
@ -50,7 +50,7 @@ typedef enum EStreamType {
|
|||
} EStreamType;
|
||||
|
||||
typedef struct {
|
||||
SArray *pGroupList;
|
||||
SArray* pGroupList;
|
||||
SArray* pTableList;
|
||||
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||
bool needSortTableByGroupId;
|
||||
|
|
|
@ -100,7 +100,7 @@ static void *mndThreadFp(void *param) {
|
|||
taosMsleep(100);
|
||||
if (mndGetStop(pMnode)) break;
|
||||
|
||||
if (lastTime % (tsTransPullupInterval * 10) == 1) {
|
||||
if (lastTime % (tsTtlPushInterval * 10) == 1) {
|
||||
mndTtlTimer(pMnode);
|
||||
}
|
||||
|
||||
|
|
|
@ -116,7 +116,8 @@ typedef void *tsdbReaderT;
|
|||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
||||
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
||||
|
||||
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
|
||||
int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList);
|
||||
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *tableInfoGroup, uint64_t qId,
|
||||
uint64_t taskId);
|
||||
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
|
||||
void *pMemRef);
|
||||
|
@ -195,7 +196,6 @@ struct SVnodeCfg {
|
|||
typedef struct {
|
||||
TSKEY lastKey;
|
||||
uint64_t uid;
|
||||
uint64_t groupId;
|
||||
} STableKeyInfo;
|
||||
|
||||
struct SMetaEntry {
|
||||
|
|
|
@ -121,7 +121,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub
|
|||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||
SSubmitBlkRsp* pRsp);
|
||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId,
|
||||
uint64_t taskId);
|
||||
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
void* pMemRef);
|
||||
|
|
|
@ -224,8 +224,8 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
|
|||
return rows;
|
||||
}
|
||||
|
||||
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pTableList) {
|
||||
size_t tableSize = taosArrayGetSize(pTableList->pTableList);
|
||||
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, SArray* pTableList) {
|
||||
size_t tableSize = taosArrayGetSize(pTableList);
|
||||
assert(tableSize >= 1);
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
|
@ -236,7 +236,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
|||
|
||||
// todo apply the lastkey of table check to avoid to load header file
|
||||
for (int32_t j = 0; j < tableSize; ++j) {
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j);
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList, j);
|
||||
|
||||
STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
|
||||
info.suid = pTsdbReadHandle->suid;
|
||||
|
@ -255,8 +255,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
|||
pTsdbReadHandle->idStr);
|
||||
}
|
||||
|
||||
// TODO group table according to the tag value.
|
||||
taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
|
||||
return pTableCheckInfo;
|
||||
}
|
||||
|
||||
|
@ -500,7 +498,17 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList){
|
||||
STsdbReadHandle* pTsdbReadHandle = reader;
|
||||
if(pTsdbReadHandle->pTableCheckInfo) taosArrayDestroy(pTsdbReadHandle->pTableCheckInfo);
|
||||
pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, tableList);
|
||||
if (pTsdbReadHandle->pTableCheckInfo == NULL) {
|
||||
return TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
}
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId,
|
||||
uint64_t taskId) {
|
||||
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
|
||||
if (pTsdbReadHandle == NULL) {
|
||||
|
@ -546,7 +554,7 @@ tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLis
|
|||
}
|
||||
|
||||
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle,
|
||||
taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList->pTableList),
|
||||
taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList),
|
||||
pTsdbReadHandle->idStr);
|
||||
|
||||
return (tsdbReaderT)pTsdbReadHandle;
|
||||
|
@ -642,7 +650,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL
|
|||
return NULL;
|
||||
}
|
||||
|
||||
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId);
|
||||
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbReaderOpen(pVnode, pCond, pList->pTableList, qId, taskId);
|
||||
if (pTsdbReadHandle == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2845,7 +2853,7 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
|
|||
break;
|
||||
}
|
||||
|
||||
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id, .groupId = 0};
|
||||
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
|
||||
taosArrayPush(list, &info);
|
||||
}
|
||||
|
||||
|
@ -3647,17 +3655,6 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
|
|||
}
|
||||
}
|
||||
|
||||
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
|
||||
if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
|
||||
return -1;
|
||||
} else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
|
||||
return 1;
|
||||
} else {
|
||||
ASSERT(false);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
|
||||
if (pColumnInfoData == NULL) {
|
||||
return NULL;
|
||||
|
|
|
@ -273,6 +273,10 @@ typedef struct STableScanInfo {
|
|||
|
||||
SSampleExecInfo sample; // sample execution info
|
||||
int32_t curTWinIdx;
|
||||
|
||||
int32_t currentGroupId;
|
||||
uint64_t queryId;
|
||||
uint64_t taskId;
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STagScanInfo {
|
||||
|
@ -336,7 +340,6 @@ typedef struct SStreamBlockScanInfo {
|
|||
int32_t numOfPseudoExpr;
|
||||
|
||||
int32_t primaryTsIndex; // primary time stamp slot id
|
||||
void* pDataReader;
|
||||
SReadHandle readHandle;
|
||||
uint64_t tableUid; // queried super table uid
|
||||
EStreamScanMode scanMode;
|
||||
|
@ -707,7 +710,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
|||
|
||||
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId);
|
||||
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
|
||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
|
||||
|
@ -750,8 +753,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
|
||||
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
|
||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
|
||||
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId);
|
||||
|
||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
@ -846,7 +849,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
|
||||
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
||||
|
||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey);
|
||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -297,6 +297,7 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){
|
|||
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||
if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
uint64_t tableUid = pScanNode->uid;
|
||||
|
||||
|
@ -322,7 +323,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
|||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(res); i++) {
|
||||
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
|
||||
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)};
|
||||
taosArrayPush(pListInfo->pTableList, &info);
|
||||
}
|
||||
taosArrayDestroy(res);
|
||||
|
@ -343,9 +344,14 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
|||
}
|
||||
}
|
||||
}else { // Create one table group.
|
||||
STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0};
|
||||
STableKeyInfo info = {.lastKey = 0, .uid = tableUid};
|
||||
taosArrayPush(pListInfo->pTableList, &info);
|
||||
}
|
||||
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
|
||||
if(pListInfo->pGroupList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
//put into list as default group, remove it if grouping sorting is required later
|
||||
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -3871,9 +3871,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
|
|||
return pTaskInfo;
|
||||
}
|
||||
|
||||
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
|
||||
|
||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||
|
||||
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
|
||||
|
@ -3989,9 +3986,9 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
|||
taosMemoryFree(keyBuf);
|
||||
|
||||
if(pTableListInfo->needSortTableByGroupId){
|
||||
pTableListInfo->pGroupList = taosArrayInit(groupNum, POINTER_BYTES);
|
||||
taosArrayClear(pTableListInfo->pGroupList);
|
||||
SArray *sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
|
||||
if(pTableListInfo->pGroupList == NULL || sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
if(sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
|
||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
|
||||
|
@ -3999,11 +3996,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
|||
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
|
||||
if (index == -1){
|
||||
void *p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
|
||||
SArray *tGroup = taosArrayInit(8, sizeof(uint64_t));
|
||||
SArray *tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||
if(tGroup == NULL) {
|
||||
taosArrayDestroy(sortSupport);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if(taosArrayPush(tGroup, info) == NULL){
|
||||
qError("taos push info array error");
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
if(p == NULL){
|
||||
if(taosArrayPush(sortSupport, groupId) != NULL){
|
||||
qError("taos push support array error");
|
||||
|
@ -4030,7 +4031,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
|||
}
|
||||
}else{
|
||||
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
|
||||
if(taosArrayPush(tGroup, &info->uid) == NULL){
|
||||
if(taosArrayPush(tGroup, info) == NULL){
|
||||
qError("taos push uid array error");
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
@ -4051,35 +4052,31 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||
|
||||
tsdbReaderT pDataReader =
|
||||
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
|
||||
if (pDataReader == NULL && terrno != 0) {
|
||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||
if(code){
|
||||
return NULL;
|
||||
}
|
||||
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||
if (code) {
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||
if (code) {
|
||||
tsdbCleanupReadHandle(pDataReader);
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags);
|
||||
if (code) {
|
||||
tsdbCleanupReadHandle(pDataReader);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId);
|
||||
STableScanInfo* pScanInfo = pOperator->info;
|
||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||
return pOperator;
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
||||
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
||||
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||
if(code){
|
||||
return NULL;
|
||||
}
|
||||
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||
if (code) {
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
|
||||
STableScanInfo* pScanInfo = pOperator->info;
|
||||
|
@ -4095,33 +4092,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
.calTrigger = pTableScanNode->triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
};
|
||||
tsdbReaderT pDataReader = NULL;
|
||||
|
||||
if (pHandle) {
|
||||
if (pHandle->vnode) {
|
||||
// for stram
|
||||
pDataReader =
|
||||
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
|
||||
} else {
|
||||
// for tq
|
||||
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo);
|
||||
}
|
||||
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||
}
|
||||
|
||||
if (pDataReader == NULL && terrno != 0) {
|
||||
qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo));
|
||||
// return NULL;
|
||||
} else {
|
||||
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
|
||||
}
|
||||
|
||||
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags);
|
||||
if (code) {
|
||||
tsdbCleanupReadHandle(pDataReader);
|
||||
return NULL;
|
||||
}
|
||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
|
||||
|
||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
|
||||
return pOperator;
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||
|
@ -4147,7 +4122,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
return NULL;
|
||||
}
|
||||
} else { // Create one table group.
|
||||
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0};
|
||||
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid};
|
||||
taosArrayPush(pTableListInfo->pTableList, &info);
|
||||
}
|
||||
|
||||
|
@ -4172,7 +4147,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
cond.suid = pBlockNode->suid;
|
||||
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
||||
}
|
||||
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
|
||||
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, queryId, taskId);
|
||||
cleanupQueryTableDataCond(&cond);
|
||||
|
||||
return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
|
||||
|
@ -4390,35 +4365,6 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
|||
return pList;
|
||||
}
|
||||
|
||||
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
|
||||
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
|
||||
code = 0;
|
||||
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SQueryTableDataCond cond = {0};
|
||||
code = initQueryTableDataCond(&cond, pTableScanNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
|
||||
cleanupQueryTableDataCond(&cond);
|
||||
|
||||
return pReader;
|
||||
|
||||
_error:
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
|
||||
int32_t code = TDB_CODE_SUCCESS;
|
||||
char* pCurrent = NULL;
|
||||
|
@ -4575,6 +4521,13 @@ _complete:
|
|||
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
|
||||
taosArrayDestroy(pTableqinfoList->pTableList);
|
||||
taosHashCleanup(pTableqinfoList->map);
|
||||
if(pTableqinfoList->needSortTableByGroupId){
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++){
|
||||
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
|
||||
taosArrayDestroy(tmp);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pTableqinfoList->pGroupList);
|
||||
|
||||
pTableqinfoList->pTableList = NULL;
|
||||
pTableqinfoList->map = NULL;
|
||||
|
|
|
@ -419,7 +419,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||
static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
|
@ -501,7 +501,44 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||
STableScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
if(pInfo->currentGroupId == -1){
|
||||
pInfo->currentGroupId++;
|
||||
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
||||
tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId);
|
||||
pInfo->dataReader = pReader;
|
||||
}
|
||||
|
||||
SSDataBlock* result = doTableScanGroup(pOperator);
|
||||
if(result){
|
||||
return result;
|
||||
}
|
||||
|
||||
pInfo->currentGroupId++;
|
||||
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
||||
tsdbSetTableList(pInfo->dataReader, tableList);
|
||||
|
||||
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
|
||||
pInfo->curTWinIdx = 0;
|
||||
pInfo->scanTimes = 0;
|
||||
|
||||
result = doTableScanGroup(pOperator);
|
||||
if(result){
|
||||
return result;
|
||||
}
|
||||
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -526,8 +563,8 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader,
|
||||
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
||||
SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId) {
|
||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -562,10 +599,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||
pInfo->dataReader = pDataReader;
|
||||
pInfo->scanFlag = MAIN_SCAN;
|
||||
pInfo->pColMatchInfo = pColList;
|
||||
pInfo->curTWinIdx = 0;
|
||||
pInfo->queryId = queryId;
|
||||
pInfo->taskId = taskId;
|
||||
pInfo->currentGroupId = -1;
|
||||
|
||||
pOperator->name = "TableScanOperator"; // for debug purpose
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||
|
@ -1077,9 +1116,9 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
|
|||
return tableIdList;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
|
||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
|
||||
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
|
||||
STimeWindowAggSupp* pTwSup) {
|
||||
STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId) {
|
||||
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
|
@ -1119,7 +1158,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
}
|
||||
|
||||
if (pHandle) {
|
||||
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId);
|
||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
||||
if (pSTInfo->interval.interval > 0) {
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
||||
|
@ -1153,7 +1192,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
pInfo->pRes = createResDataBlock(pDescNode);
|
||||
pInfo->pUpdateRes = createResDataBlock(pDescNode);
|
||||
pInfo->pCondition = pScanPhyNode->node.pConditions;
|
||||
pInfo->pDataReader = pDataReader;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
|
||||
pInfo->groupId = 0;
|
||||
|
@ -1918,10 +1956,7 @@ _error:
|
|||
|
||||
typedef struct STableMergeScanInfo {
|
||||
STableListInfo* tableListInfo;
|
||||
int32_t tableStartIndex;
|
||||
int32_t tableEndIndex;
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
int32_t currentGroupId;
|
||||
|
||||
SArray* dataReaders; // array of tsdbReaderT*
|
||||
SReadHandle readHandle;
|
||||
|
@ -1967,12 +2002,6 @@ typedef struct STableMergeScanInfo {
|
|||
SSampleExecInfo sample; // sample execution info
|
||||
} STableMergeScanInfo;
|
||||
|
||||
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
|
||||
const STableKeyInfo* info1 = p1;
|
||||
const STableKeyInfo* info2 = p2;
|
||||
return info1->groupId - info2->groupId;
|
||||
}
|
||||
|
||||
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
|
||||
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
|
||||
|
@ -1984,55 +2013,9 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
|
|||
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
|
||||
generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
|
||||
if (groupKeys) {
|
||||
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
|
||||
}
|
||||
taosArrayDestroy(groupKeys);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
|
||||
uint64_t taskId) {
|
||||
SQueryTableDataCond cond = {0};
|
||||
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
||||
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) {
|
||||
STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo));
|
||||
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||
taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i));
|
||||
|
||||
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId);
|
||||
taosArrayPush(arrayReader, &pReader);
|
||||
|
||||
taosArrayDestroy(subListInfo->pTableList);
|
||||
taosMemoryFree(subListInfo);
|
||||
}
|
||||
cleanupQueryTableDataCond(&cond);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_error:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
|
||||
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, uint64_t queryId,
|
||||
uint64_t taskId) {
|
||||
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
|
||||
STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo));
|
||||
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||
taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i));
|
||||
|
||||
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, subListInfo, queryId, taskId);
|
||||
taosArrayPush(arrayReader, &pReader);
|
||||
|
||||
taosArrayDestroy(subListInfo->pTableList);
|
||||
taosMemoryFree(subListInfo);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2221,28 +2204,15 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
{
|
||||
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
|
||||
int32_t i = pInfo->tableStartIndex + 1;
|
||||
for (; i < tableListSize; ++i) {
|
||||
STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
|
||||
if (tableKeyInfo->groupId != pInfo->groupId) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
pInfo->tableEndIndex = i - 1;
|
||||
}
|
||||
SArray* tableList = taosArrayGetP(pInfo->tableListInfo->pGroupList, pInfo->currentGroupId);
|
||||
|
||||
int32_t tableStartIdx = pInfo->tableStartIndex;
|
||||
int32_t tableEndIdx = pInfo->tableEndIndex;
|
||||
|
||||
STableListInfo* tableListInfo = pInfo->tableListInfo;
|
||||
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
|
||||
pInfo->dataReaders, pInfo->queryId, pInfo->taskId);
|
||||
tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId);
|
||||
taosArrayPush(pInfo->dataReaders, &pReader);
|
||||
|
||||
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
||||
// the additional one is reserved for merge result
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
||||
int32_t tableLen = taosArrayGetSize(tableList);
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * (tableLen + 1);
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->pSortInputBlock, pTaskInfo->id.str);
|
||||
|
@ -2329,37 +2299,38 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->hasGroupId = true;
|
||||
|
||||
if (tableListSize == 0) {
|
||||
if (pInfo->currentGroupId == -1) {
|
||||
pInfo->currentGroupId++;
|
||||
startGroupTableMergeScan(pOperator);
|
||||
}
|
||||
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
||||
if (pBlock != NULL) {
|
||||
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
|
||||
if(groupId) pBlock->info.groupId = *groupId;
|
||||
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
pInfo->currentGroupId++;
|
||||
stopGroupTableMergeScan(pOperator);
|
||||
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
pInfo->tableStartIndex = 0;
|
||||
pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
|
||||
startGroupTableMergeScan(pOperator);
|
||||
}
|
||||
SSDataBlock* pBlock = NULL;
|
||||
while (pInfo->tableStartIndex < tableListSize) {
|
||||
|
||||
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
||||
if (pBlock != NULL) {
|
||||
pBlock->info.groupId = pInfo->groupId;
|
||||
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
|
||||
if(groupId) pBlock->info.groupId = *groupId;
|
||||
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
return pBlock;
|
||||
} else {
|
||||
stopGroupTableMergeScan(pOperator);
|
||||
if (pInfo->tableEndIndex >= tableListSize - 1) {
|
||||
}
|
||||
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
||||
pInfo->groupId =
|
||||
((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
|
||||
startGroupTableMergeScan(pOperator);
|
||||
}
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
@ -2437,6 +2408,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
|
||||
pInfo->queryId = queryId;
|
||||
pInfo->taskId = taskId;
|
||||
pInfo->currentGroupId = -1;
|
||||
|
||||
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
|
||||
|
||||
|
|
Loading…
Reference in New Issue