refactor: do some internal refactor.
This commit is contained in:
parent
d583f1fab6
commit
52c3777823
|
@ -164,14 +164,6 @@ typedef enum EStreamType {
|
||||||
STREAM_FILL_OVER,
|
STREAM_FILL_OVER,
|
||||||
} EStreamType;
|
} EStreamType;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SArray* pGroupList;
|
|
||||||
SArray* pTableList;
|
|
||||||
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
|
||||||
bool needSortTableByGroupId;
|
|
||||||
uint64_t suid;
|
|
||||||
} STableListInfo;
|
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
typedef struct SColumnDataAgg {
|
typedef struct SColumnDataAgg {
|
||||||
int16_t colId;
|
int16_t colId;
|
||||||
|
|
|
@ -162,10 +162,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub
|
||||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||||
SSubmitBlkRsp* pRsp);
|
SSubmitBlkRsp* pRsp);
|
||||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||||
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
|
||||||
void* pMemRef);
|
|
||||||
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
||||||
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
|
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
int tqInit();
|
int tqInit();
|
||||||
|
|
|
@ -97,6 +97,35 @@ typedef struct SColMatchInfo {
|
||||||
|
|
||||||
struct SqlFunctionCtx;
|
struct SqlFunctionCtx;
|
||||||
|
|
||||||
|
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
|
||||||
|
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
|
||||||
|
//typedef struct STableListInfo {
|
||||||
|
// bool oneTableForEachGroup;
|
||||||
|
// int32_t numOfOuputGroups; // the data block will be generated one by one
|
||||||
|
// int32_t* groupOffset; // keep the offset value for each group in the tableList
|
||||||
|
// SArray* pTableList;
|
||||||
|
// SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||||
|
// uint64_t suid;
|
||||||
|
//} STableListInfo;
|
||||||
|
typedef struct {
|
||||||
|
bool oneTableForEachGroup;
|
||||||
|
int32_t numOfOuputGroups; // the data block will be generated one by one
|
||||||
|
int32_t* groupOffset; // keep the offset value for each group in the tableList
|
||||||
|
SArray* pGroupList;
|
||||||
|
SArray* pTableList;
|
||||||
|
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||||
|
bool needSortTableByGroupId;
|
||||||
|
uint64_t suid;
|
||||||
|
} STableListInfo;
|
||||||
|
|
||||||
|
void destroyTableList(STableListInfo* pTableList);
|
||||||
|
int32_t getNumOfOutputGroups(const STableListInfo* pTableList);
|
||||||
|
bool oneTableForEachGroup(const STableListInfo* pTableList);
|
||||||
|
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
|
||||||
|
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
|
||||||
|
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
|
||||||
|
uint64_t getTotalTables(const STableListInfo* pTableList);
|
||||||
|
|
||||||
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
|
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
|
||||||
void closeResultRow(SResultRow* pResultRow);
|
void closeResultRow(SResultRow* pResultRow);
|
||||||
|
|
|
@ -1077,7 +1077,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
||||||
|
|
||||||
bool groupbyTbname(SNodeList* pGroupList);
|
bool groupbyTbname(SNodeList* pGroupList);
|
||||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
|
int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey, bool groupSort);
|
||||||
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
||||||
SGroupResInfo* pGroupResInfo);
|
SGroupResInfo* pGroupResInfo);
|
||||||
|
|
|
@ -167,17 +167,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTableList->map != NULL) {
|
pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid);
|
||||||
int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t));
|
|
||||||
if (groupId != NULL) {
|
|
||||||
pInfo->pRes->info.groupId = *groupId;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(taosArrayGetSize(pTableList->pTableList) == 1);
|
|
||||||
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0);
|
|
||||||
pInfo->pRes->info.groupId = pKeyInfo->groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->indexOfBufferedRes += 1;
|
pInfo->indexOfBufferedRes += 1;
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -733,7 +733,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
||||||
|
|
||||||
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
||||||
info->groupId = calcGroupId(keyBuf, len);
|
info->groupId = calcGroupId(keyBuf, len);
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// int64_t st2 = taosGetTimestampUs();
|
// int64_t st2 = taosGetTimestampUs();
|
||||||
|
@ -751,14 +750,70 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t nameComparFn(const void* p1, const void* p2) {
|
||||||
|
const char* pName1 = *(const char**)p1;
|
||||||
|
const char* pName2 = *(const char**)p2;
|
||||||
|
|
||||||
|
int32_t ret = strcmp(pName1, pName2);
|
||||||
|
if (ret == 0) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return (ret > 0) ? 1 : -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static SArray* getTableNameList(const SNodeListNode* pList) {
|
||||||
|
int32_t len = LIST_LENGTH(pList->pNodeList);
|
||||||
|
SListCell* cell = pList->pNodeList->pHead;
|
||||||
|
|
||||||
|
SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
|
||||||
|
for (int i = 0; i < pList->pNodeList->length; i++) {
|
||||||
|
SValueNode* valueNode = (SValueNode*)cell->pNode;
|
||||||
|
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
taosArrayDestroy(pTbList);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* name = varDataVal(valueNode->datum.p);
|
||||||
|
taosArrayPush(pTbList, &name);
|
||||||
|
cell = cell->pNext;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t numOfTables = taosArrayGetSize(pTbList);
|
||||||
|
|
||||||
|
// order the name
|
||||||
|
taosArraySort(pTbList, nameComparFn);
|
||||||
|
|
||||||
|
// remove the duplicates
|
||||||
|
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
|
||||||
|
taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
|
||||||
|
|
||||||
|
for (int32_t i = 1; i < numOfTables; ++i) {
|
||||||
|
char** name = taosArrayGetLast(pNewList);
|
||||||
|
char** nameInOldList = taosArrayGet(pTbList, i);
|
||||||
|
if (strcmp(*name, *nameInOldList) == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pNewList, nameInOldList);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pTbList);
|
||||||
|
return pNewList;
|
||||||
|
}
|
||||||
|
|
||||||
static int tableUidCompare(const void* a, const void* b) {
|
static int tableUidCompare(const void* a, const void* b) {
|
||||||
int64_t u1 = *(uint64_t*)a;
|
uint64_t u1 = *(uint64_t*)a;
|
||||||
int64_t u2 = *(uint64_t*)b;
|
uint64_t u2 = *(uint64_t*)b;
|
||||||
|
|
||||||
if (u1 == u2) {
|
if (u1 == u2) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return u1 < u2 ? -1 : 1;
|
return u1 < u2 ? -1 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond, SHashObj* tags) {
|
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond, SHashObj* tags) {
|
||||||
int32_t ret = -1;
|
int32_t ret = -1;
|
||||||
if (nodeType(cond) == QUERY_NODE_OPERATOR) {
|
if (nodeType(cond) == QUERY_NODE_OPERATOR) {
|
||||||
|
@ -778,7 +833,9 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
|
||||||
SNodeList* pList = (SNodeList*)pNode->pParameterList;
|
SNodeList* pList = (SNodeList*)pNode->pParameterList;
|
||||||
|
|
||||||
int32_t len = LIST_LENGTH(pList);
|
int32_t len = LIST_LENGTH(pList);
|
||||||
if (len <= 0) return ret;
|
if (len <= 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
SListCell* cell = pList->pHead;
|
SListCell* cell = pList->pHead;
|
||||||
for (int i = 0; i < len; i++) {
|
for (int i = 0; i < len; i++) {
|
||||||
|
@ -789,6 +846,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
|
||||||
}
|
}
|
||||||
cell = cell->pNext;
|
cell = cell->pNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArraySort(list, tableUidCompare);
|
taosArraySort(list, tableUidCompare);
|
||||||
taosArrayRemoveDuplicate(list, tableUidCompare, NULL);
|
taosArrayRemoveDuplicate(list, tableUidCompare, NULL);
|
||||||
|
|
||||||
|
@ -796,6 +854,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
|
||||||
ret = metaGetTableTagsByUids(metaHandle, suid, list, tags);
|
ret = metaGetTableTagsByUids(metaHandle, suid, list, tags);
|
||||||
removeInvalidTable(list, tags);
|
removeInvalidTable(list, tags);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -831,23 +890,13 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
|
||||||
SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
|
SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
|
||||||
|
|
||||||
int32_t len = LIST_LENGTH(pList->pNodeList);
|
int32_t len = LIST_LENGTH(pList->pNodeList);
|
||||||
if (len <= 0) return -1;
|
if (len <= 0) {
|
||||||
|
return -1;
|
||||||
SListCell* cell = pList->pNodeList->pHead;
|
|
||||||
|
|
||||||
SArray* pTbList = taosArrayInit(len, sizeof(void*));
|
|
||||||
for (int i = 0; i < pList->pNodeList->length; i++) {
|
|
||||||
SValueNode* valueNode = (SValueNode*)cell->pNode;
|
|
||||||
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
|
|
||||||
taosArrayDestroy(pTbList);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
char* name = varDataVal(valueNode->datum.p);
|
|
||||||
taosArrayPush(pTbList, &name);
|
|
||||||
cell = cell->pNext;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(pTbList); i++) {
|
SArray* pTbList = getTableNameList(pList);
|
||||||
|
size_t num = taosArrayGetSize(pTbList);
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
char* name = taosArrayGetP(pTbList, i);
|
char* name = taosArrayGetP(pTbList, i);
|
||||||
uint64_t uid = 0;
|
uint64_t uid = 0;
|
||||||
if (metaGetTableUidByName(metaHandle, name, &uid) == 0) {
|
if (metaGetTableUidByName(metaHandle, name, &uid) == 0) {
|
||||||
|
@ -866,8 +915,10 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
|
||||||
taosArrayDestroy(pTbList);
|
taosArrayDestroy(pTbList);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
||||||
STableListInfo* pListInfo) {
|
STableListInfo* pListInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -1604,3 +1655,79 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
|
||||||
pLimitInfo->remainOffset = limit.offset;
|
pLimitInfo->remainOffset = limit.offset;
|
||||||
pLimitInfo->remainGroupOffset = slimit.offset;
|
pLimitInfo->remainGroupOffset = slimit.offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t getTotalTables(const STableListInfo* pTableList) {
|
||||||
|
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
|
||||||
|
return taosArrayGetSize(pTableList->pTableList);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
||||||
|
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
|
||||||
|
ASSERT(pTableList->map != NULL && slot != NULL);
|
||||||
|
|
||||||
|
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
|
||||||
|
ASSERT(pKeyInfo->uid == tableUid);
|
||||||
|
|
||||||
|
return pKeyInfo->groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
|
||||||
|
if (pTableList->map == NULL) {
|
||||||
|
ASSERT(taosArrayGetSize(pTableList->pTableList) == 0);
|
||||||
|
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
|
}
|
||||||
|
|
||||||
|
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
|
||||||
|
taosArrayPush(pTableList->pTableList, &keyInfo);
|
||||||
|
|
||||||
|
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
|
||||||
|
taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
|
||||||
|
|
||||||
|
qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
|
||||||
|
int32_t* size) {
|
||||||
|
int32_t total = getNumOfOutputGroups(pTableList);
|
||||||
|
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
|
||||||
|
// here handle two special cases:
|
||||||
|
// 1. only one group exists, and 2. one table exists for each group.
|
||||||
|
if (total == 1) {
|
||||||
|
*size = getTotalTables(pTableList);
|
||||||
|
*pKeyInfo = taosArrayGet(pTableList->pTableList, 0);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else if (total == getTotalTables(pTableList)) {
|
||||||
|
*size = 1;
|
||||||
|
*pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t offset = pTableList->groupOffset[ordinalGroupIndex];
|
||||||
|
if (ordinalGroupIndex < total - 1) {
|
||||||
|
*size = pTableList->groupOffset[offset + 1] - pTableList->groupOffset[offset];
|
||||||
|
} else {
|
||||||
|
*size = total - pTableList->groupOffset[offset] - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pKeyInfo = taosArrayGet(pTableList->pTableList, offset);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; }
|
||||||
|
|
||||||
|
// todo remove it
|
||||||
|
bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; }
|
||||||
|
|
||||||
|
void destroyTableList(STableListInfo* pTableqinfoList) {
|
||||||
|
pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList);
|
||||||
|
taosMemoryFreeClear(pTableqinfoList->groupOffset);
|
||||||
|
|
||||||
|
taosHashCleanup(pTableqinfoList->map);
|
||||||
|
|
||||||
|
pTableqinfoList->pTableList = NULL;
|
||||||
|
pTableqinfoList->map = NULL;
|
||||||
|
}
|
||||||
|
|
|
@ -293,10 +293,6 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
|
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pListInfo->map == NULL) {
|
|
||||||
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
|
||||||
}
|
|
||||||
|
|
||||||
// traverse to the stream scanner node to add this table id
|
// traverse to the stream scanner node to add this table id
|
||||||
SOperatorInfo* pInfo = pTaskInfo->pRoot;
|
SOperatorInfo* pInfo = pTaskInfo->pRoot;
|
||||||
while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
@ -358,8 +354,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
|
|
||||||
if (!exists) {
|
if (!exists) {
|
||||||
#endif
|
#endif
|
||||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
|
||||||
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
addTableIntoTableList(&pTaskInfo->tableqinfoList, keyInfo.uid, keyInfo.groupId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (keyBuf != NULL) {
|
if (keyBuf != NULL) {
|
||||||
|
|
|
@ -3366,7 +3366,47 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
|
||||||
|
|
||||||
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
|
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
|
||||||
|
|
||||||
|
|
||||||
|
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
|
||||||
|
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1;
|
||||||
|
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2;
|
||||||
|
|
||||||
|
if (pInfo1->groupId == pInfo2->groupId) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return pInfo1->groupId < pInfo2->groupId? -1:1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
||||||
|
int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
|
|
||||||
|
SArray* pList = taosArrayInit(4, sizeof(int32_t));
|
||||||
|
|
||||||
|
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
|
||||||
|
uint64_t gid = pInfo->groupId;
|
||||||
|
|
||||||
|
int32_t start = 0;
|
||||||
|
taosArrayPush(pList, &start);
|
||||||
|
|
||||||
|
for(int32_t i = 1; i < size; ++i) {
|
||||||
|
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
|
if (pInfo->groupId != gid) {
|
||||||
|
taosArrayPush(pList, &i);
|
||||||
|
gid = pInfo->groupId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
|
||||||
|
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
|
||||||
|
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
|
||||||
|
taosArrayDestroy(pList);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
#if 0
|
||||||
taosArrayClear(pTableListInfo->pGroupList);
|
taosArrayClear(pTableListInfo->pGroupList);
|
||||||
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
|
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
|
||||||
if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -3422,6 +3462,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(sortSupport);
|
taosArrayDestroy(sortSupport);
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
bool groupbyTbname(SNodeList* pGroupList) {
|
bool groupbyTbname(SNodeList* pGroupList) {
|
||||||
|
@ -3437,35 +3478,41 @@ bool groupbyTbname(SNodeList* pGroupList) {
|
||||||
return bytbname;
|
return bytbname;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
|
int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
|
||||||
if (group == NULL) {
|
|
||||||
return TDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
if (pTableListInfo->map == NULL) {
|
if (pTableListInfo->map == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool assignUid = groupbyTbname(group);
|
bool groupByTbname = groupbyTbname(group);
|
||||||
|
|
||||||
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
|
|
||||||
if (assignUid) {
|
if (groupByTbname || group == NULL) {
|
||||||
for (int32_t i = 0; i < numOfTables; i++) {
|
for (int32_t i = 0; i < numOfTables; i++) {
|
||||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
info->groupId = info->uid;
|
info->groupId = groupByTbname? info->uid:0;
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
|
}
|
||||||
|
|
||||||
|
pTableListInfo->oneTableForEachGroup = groupByTbname;
|
||||||
|
|
||||||
|
if (groupSort && groupByTbname) {
|
||||||
|
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
||||||
|
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
|
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (groupSort) {
|
||||||
|
return sortTableGroup(pTableListInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTableListInfo->needSortTableByGroupId) {
|
for(int32_t i = 0; i < numOfTables; ++i) {
|
||||||
return sortTableGroup(pTableListInfo);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
|
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &i, sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
|
@ -3551,7 +3598,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
if (pHandle->vnode) {
|
if (pHandle->vnode) {
|
||||||
int32_t code =
|
int32_t code =
|
||||||
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
|
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/false,
|
||||||
pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
|
|
@ -626,10 +626,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
pBlock->info = binfo;
|
pBlock->info = binfo;
|
||||||
ASSERT(binfo.uid != 0);
|
ASSERT(binfo.uid != 0);
|
||||||
|
|
||||||
uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid);
|
||||||
if (groupId) {
|
|
||||||
pBlock->info.groupId = *groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
||||||
|
@ -683,10 +680,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
qDebug(
|
qDebug( "%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
||||||
"%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks "
|
|
||||||
"due to query func required",
|
|
||||||
GET_TASKID(pTaskInfo));
|
|
||||||
|
|
||||||
// do prepare for the next round table scan operation
|
// do prepare for the next round table scan operation
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
@ -755,8 +749,12 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
if (pInfo->currentGroupId == -1) {
|
if (pInfo->currentGroupId == -1) {
|
||||||
pInfo->currentGroupId++;
|
pInfo->currentGroupId++;
|
||||||
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
qDebug("number:------------------------%d, %d", (int)taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList),
|
||||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
getNumOfOutputGroups(&pTaskInfo->tableqinfoList));
|
||||||
|
if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)/*taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)*/) {
|
||||||
|
// if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
||||||
|
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -790,7 +788,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1122,12 +1120,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
|
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
|
||||||
SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map;
|
return getTableGroupId(&pInfo->pTableScanOp->pTaskInfo->tableqinfoList, uid);
|
||||||
uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
|
|
||||||
if (groupId) {
|
|
||||||
return *groupId;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
||||||
|
@ -1549,12 +1542,13 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
pInfo->pRes->info.type = STREAM_NORMAL;
|
pInfo->pRes->info.type = STREAM_NORMAL;
|
||||||
pInfo->pRes->info.version = pBlock->info.version;
|
pInfo->pRes->info.version = pBlock->info.version;
|
||||||
|
|
||||||
uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
pInfo->pRes->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid);
|
||||||
if (groupIdPre) {
|
// uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||||
pInfo->pRes->info.groupId = *groupIdPre;
|
// if (groupIdPre) {
|
||||||
} else {
|
// pInfo->pRes->info.groupId = *groupIdPre;
|
||||||
pInfo->pRes->info.groupId = 0;
|
// } else {
|
||||||
}
|
// pInfo->pRes->info.groupId = 0;
|
||||||
|
// }
|
||||||
|
|
||||||
// todo extract method
|
// todo extract method
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
||||||
|
@ -4202,6 +4196,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTableListInfo->numOfOuputGroups = 1;
|
||||||
|
|
||||||
int64_t st1 = taosGetTimestampUs();
|
int64_t st1 = taosGetTimestampUs();
|
||||||
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
|
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
|
||||||
|
|
||||||
|
@ -4211,7 +4207,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableListInfo->needSortTableByGroupId = groupSort;
|
pTableListInfo->needSortTableByGroupId = groupSort;
|
||||||
code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags);
|
code = setGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue