opt: optimize group by tag
This commit is contained in:
parent
8691faa8ec
commit
fc32f71628
|
@ -27,10 +27,7 @@ typedef struct {
|
||||||
int32_t bytes;
|
int32_t bytes;
|
||||||
} SGroupKeys, SStateKeys;
|
} SGroupKeys, SStateKeys;
|
||||||
|
|
||||||
int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList);
|
|
||||||
uint64_t calcGroupId(char* pData, int32_t len);
|
uint64_t calcGroupId(char* pData, int32_t len);
|
||||||
void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex);
|
|
||||||
int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals);
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -336,12 +336,6 @@ typedef struct STableScanInfo {
|
||||||
int32_t dataBlockLoadFlag;
|
int32_t dataBlockLoadFlag;
|
||||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
||||||
|
|
||||||
SArray* pGroupCols;
|
|
||||||
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
|
|
||||||
char* keyBuf; // group by keys for hash
|
|
||||||
int32_t groupKeyLen; // total group by column width
|
|
||||||
SHashObj* pGroupSet; // quick locate the window object for each result
|
|
||||||
|
|
||||||
SSampleExecInfo sample; // sample execution info
|
SSampleExecInfo sample; // sample execution info
|
||||||
int32_t curTWinIdx;
|
int32_t curTWinIdx;
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
@ -789,7 +783,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
|
|
||||||
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
|
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
|
||||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
|
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
|
@ -4592,6 +4592,85 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Assign a group id to every table in pTableListInfo from its group-by key values.
 *
 * For each table the key is serialized into a scratch buffer laid out as
 *   [one null-flag byte per group column][non-null column values, back to back]
 * and passed to calcGroupId(); the resulting id is stored in pTableListInfo->map
 * under the table uid (key: uint64_t uid, value: uint64_t group id).
 *
 * @param pTableListInfo  table list to process; its `map` member is initialized here.
 * @param pHandle         read handle; pHandle->meta is used to load each table's meta entry.
 * @param groupKey        SArray of SColumn describing the group-by columns. When NULL the
 *                        function does nothing and reports success.
 * @return the success code when finished (or when groupKey is NULL),
 *         TSDB_CODE_OUT_OF_MEMORY if the map or the key buffer cannot be allocated,
 *         or the non-zero code returned by metaGetTableEntryByUid().
 */
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey) {
  if (groupKey == NULL) {
    // NOTE(review): TDB_CODE_SUCCESS (not TSDB_CODE_SUCCESS) is kept exactly as in the
    // original; confirm it is the intended constant.
    return TDB_CODE_SUCCESS;
  }

  pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (pTableListInfo->map == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfGroupCols = taosArrayGetSize(groupKey);

  // Buffer size: one null-flag byte per column followed by the declared width of every column.
  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  int32_t keyLen = nullFlagSize;
  for (int32_t j = 0; j < numOfGroupCols; ++j) {
    SColumn* pCol = taosArrayGet(groupKey, j);
    keyLen += pCol->bytes;
  }

  char* keyBuf = taosMemoryCalloc(1, keyLen);
  if (keyBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // The table list does not change inside the loop, so read its size once.
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTableListInfo->pTableList);

  for (int32_t i = 0; i < numOfTables; i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);

    SMetaReader mr = {0};
    metaReaderInit(&mr, pHandle->meta, 0);

    // Do not read mr.me after a failed lookup.
    // Assumes a non-zero return signals failure — TODO confirm against the meta reader API.
    int32_t code = metaGetTableEntryByUid(&mr, info->uid);
    if (code != 0) {
      metaReaderClear(&mr);
      taosMemoryFree(keyBuf);
      return code;
    }

    char* isNull = keyBuf;
    char* pStart = keyBuf + nullFlagSize;

    for (int32_t j = 0; j < numOfGroupCols; ++j) {
      SColumn* pCol = taosArrayGet(groupKey, j);

      if (strcmp(pCol->name, "tbname") == 0) {
        size_t nameLen = strlen(mr.me.name);

        // Null flags are indexed by column (j). Indexing by the table counter (i) wrote
        // past the end of keyBuf as soon as the table index reached keyLen.
        isNull[j] = 0;

        // Guard the copy: the name must fit in what is left of the key buffer.
        ASSERT(pStart + nameLen <= keyBuf + keyLen);
        memcpy(pStart, mr.me.name, nameLen);
        pStart += nameLen;
        continue;
      }

      STagVal tagVal = {0};
      tagVal.cid = pCol->colId;

      const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal);
      if (p == NULL) {
        // A missing tag contributes only its null flag, no value bytes.
        isNull[j] = 1;
        continue;
      }
      isNull[j] = 0;

      if (pCol->type == TSDB_DATA_TYPE_JSON) {
        // TODO: JSON tag values are not serialized into the key yet; only the null flag is recorded.
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
        // Check the size before copying, not after.
        ASSERT(tagVal.nData <= pCol->bytes);
        memcpy(pStart, tagVal.pData, tagVal.nData);
        pStart += tagVal.nData;
      } else {
        memcpy(pStart, &(tagVal.i64), pCol->bytes);
        pStart += pCol->bytes;
      }
    }

    int32_t len = (int32_t)(pStart - keyBuf);

    // The map only ever holds uid -> groupId entries, so looking the serialized key up in it
    // could only succeed by an accidental byte collision with some uid (and would then hand
    // this table an unrelated group id). Always derive the id from the key itself.
    uint64_t groupId = calcGroupId(keyBuf, len);
    taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));

    metaReaderClear(&mr);
  }

  taosMemoryFree(keyBuf);
  return TDB_CODE_SUCCESS;
}
|
||||||
|
|
||||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||||
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, SNode* pTagCond) {
|
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, SNode* pTagCond) {
|
||||||
int32_t type = nodeType(pPhyNode);
|
int32_t type = nodeType(pPhyNode);
|
||||||
|
@ -4605,15 +4684,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
if (pDataReader == NULL && terrno != 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
SArray* groupKyes = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
|
|
||||||
int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbCleanupReadHandle(pDataReader);
|
tsdbCleanupReadHandle(pDataReader);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SArray* groupKyes = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
|
||||||
|
code = generateGroupIdMap(pTableListInfo, pHandle, groupKyes); //todo for json
|
||||||
|
if (code){
|
||||||
|
tsdbCleanupReadHandle(pDataReader);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
SOperatorInfo* pOperator =
|
SOperatorInfo* pOperator =
|
||||||
createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, groupKyes, pTaskInfo);
|
createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||||
|
|
||||||
STableScanInfo* pScanInfo = pOperator->info;
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||||
|
|
|
@ -37,7 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
taosArrayDestroy(pInfo->pGroupColVals);
|
taosArrayDestroy(pInfo->pGroupColVals);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
|
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
|
||||||
*pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
|
*pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
|
||||||
if ((*pGroupColVals) == NULL) {
|
if ((*pGroupColVals) == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -118,7 +118,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
|
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
|
||||||
SColumnDataAgg* pColAgg = NULL;
|
SColumnDataAgg* pColAgg = NULL;
|
||||||
|
|
||||||
size_t numOfGroupCols = taosArrayGetSize(pGroupCols);
|
size_t numOfGroupCols = taosArrayGetSize(pGroupCols);
|
||||||
|
@ -150,7 +150,7 @@ void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
||||||
ASSERT(pKey != NULL);
|
ASSERT(pKey != NULL);
|
||||||
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
||||||
|
|
||||||
|
|
|
@ -391,22 +391,16 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
longjmp(pOperator->pTaskInfo->env, code);
|
longjmp(pOperator->pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0);
|
|
||||||
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
|
|
||||||
|
|
||||||
uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
|
|
||||||
if (groupId) {
|
|
||||||
pBlock->info.groupId = *groupId;
|
|
||||||
} else if (len != 0) {
|
|
||||||
pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len);
|
|
||||||
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
// current block is filter out according to filter condition, continue load the next block
|
// current block is filter out according to filter condition, continue load the next block
|
||||||
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||||
|
if (groupId) {
|
||||||
|
pBlock->info.groupId = *groupId;
|
||||||
|
}
|
||||||
|
|
||||||
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
||||||
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
|
@ -530,21 +524,13 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
|
||||||
tsdbCleanupReadHandle(pTableScanInfo->dataReader);
|
tsdbCleanupReadHandle(pTableScanInfo->dataReader);
|
||||||
|
|
||||||
taosArrayDestroy(pTableScanInfo->pGroupCols);
|
|
||||||
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++) {
|
|
||||||
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i);
|
|
||||||
taosMemoryFree(key.pData);
|
|
||||||
}
|
|
||||||
taosArrayDestroy(pTableScanInfo->pGroupColVals);
|
|
||||||
taosMemoryFree(pTableScanInfo->keyBuf);
|
|
||||||
taosHashCleanup(pTableScanInfo->pGroupSet);
|
|
||||||
if (pTableScanInfo->pColMatchInfo != NULL) {
|
if (pTableScanInfo->pColMatchInfo != NULL) {
|
||||||
taosArrayDestroy(pTableScanInfo->pColMatchInfo);
|
taosArrayDestroy(pTableScanInfo->pColMatchInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader,
|
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader,
|
||||||
SReadHandle* readHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo) {
|
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
||||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -591,18 +577,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->numOfExprs = numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
// for table group
|
|
||||||
pInfo->pGroupCols = groupKyes;
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
|
||||||
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
|
||||||
if (pInfo->pGroupSet == NULL) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, groupKyes);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
|
||||||
NULL, NULL, getTableScannerExecInfo);
|
NULL, NULL, getTableScannerExecInfo);
|
||||||
|
|
||||||
|
@ -992,7 +966,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
||||||
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
|
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
|
||||||
|
|
||||||
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||||
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, NULL, pTaskInfo);
|
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||||
|
|
||||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue