Merge branch 'fix/liao_cov' of github.com:taosdata/tdengine into fix/liao_cov
This commit is contained in:
commit
c11ed48937
|
@ -164,14 +164,6 @@ typedef enum EStreamType {
|
||||||
STREAM_FILL_OVER,
|
STREAM_FILL_OVER,
|
||||||
} EStreamType;
|
} EStreamType;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SArray* pGroupList;
|
|
||||||
SArray* pTableList;
|
|
||||||
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
|
||||||
bool needSortTableByGroupId;
|
|
||||||
uint64_t suid;
|
|
||||||
} STableListInfo;
|
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
typedef struct SColumnDataAgg {
|
typedef struct SColumnDataAgg {
|
||||||
int16_t colId;
|
int16_t colId;
|
||||||
|
|
|
@ -162,10 +162,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub
|
||||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||||
SSubmitBlkRsp* pRsp);
|
SSubmitBlkRsp* pRsp);
|
||||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||||
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
|
||||||
void* pMemRef);
|
|
||||||
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
||||||
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
|
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
int tqInit();
|
int tqInit();
|
||||||
|
|
|
@ -97,6 +97,35 @@ typedef struct SColMatchInfo {
|
||||||
|
|
||||||
struct SqlFunctionCtx;
|
struct SqlFunctionCtx;
|
||||||
|
|
||||||
|
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
|
||||||
|
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
|
||||||
|
//typedef struct STableListInfo {
|
||||||
|
// bool oneTableForEachGroup;
|
||||||
|
// int32_t numOfOuputGroups; // the data block will be generated one by one
|
||||||
|
// int32_t* groupOffset; // keep the offset value for each group in the tableList
|
||||||
|
// SArray* pTableList;
|
||||||
|
// SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||||
|
// uint64_t suid;
|
||||||
|
//} STableListInfo;
|
||||||
|
typedef struct {
|
||||||
|
bool oneTableForEachGroup;
|
||||||
|
int32_t numOfOuputGroups; // the data block will be generated one by one
|
||||||
|
int32_t* groupOffset; // keep the offset value for each group in the tableList
|
||||||
|
SArray* pGroupList;
|
||||||
|
SArray* pTableList;
|
||||||
|
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||||
|
bool needSortTableByGroupId;
|
||||||
|
uint64_t suid;
|
||||||
|
} STableListInfo;
|
||||||
|
|
||||||
|
void destroyTableList(STableListInfo* pTableList);
|
||||||
|
int32_t getNumOfOutputGroups(const STableListInfo* pTableList);
|
||||||
|
bool oneTableForEachGroup(const STableListInfo* pTableList);
|
||||||
|
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
|
||||||
|
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
|
||||||
|
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
|
||||||
|
uint64_t getTotalTables(const STableListInfo* pTableList);
|
||||||
|
|
||||||
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
|
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
|
||||||
void closeResultRow(SResultRow* pResultRow);
|
void closeResultRow(SResultRow* pResultRow);
|
||||||
|
|
|
@ -1077,7 +1077,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
||||||
|
|
||||||
bool groupbyTbname(SNodeList* pGroupList);
|
bool groupbyTbname(SNodeList* pGroupList);
|
||||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
|
int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey, bool groupSort);
|
||||||
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
||||||
SGroupResInfo* pGroupResInfo);
|
SGroupResInfo* pGroupResInfo);
|
||||||
|
|
|
@ -62,7 +62,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
|
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
|
||||||
// partition by tbname
|
// partition by tbname
|
||||||
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
|
if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) {
|
||||||
pInfo->retrieveType =
|
pInfo->retrieveType =
|
||||||
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
|
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
|
||||||
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
|
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
|
||||||
|
@ -167,17 +167,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTableList->map != NULL) {
|
pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid);
|
||||||
int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t));
|
|
||||||
if (groupId != NULL) {
|
|
||||||
pInfo->pRes->info.groupId = *groupId;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(taosArrayGetSize(pTableList->pTableList) == 1);
|
|
||||||
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0);
|
|
||||||
pInfo->pRes->info.groupId = pKeyInfo->groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->indexOfBufferedRes += 1;
|
pInfo->indexOfBufferedRes += 1;
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
} else {
|
} else {
|
||||||
|
@ -185,12 +175,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
size_t totalGroups = taosArrayGetSize(pTableList->pGroupList);
|
size_t num = getNumOfOutputGroups(pTableList);
|
||||||
|
while (pInfo->currentGroupIndex < num) {
|
||||||
|
|
||||||
while (pInfo->currentGroupIndex < totalGroups) {
|
STableKeyInfo* p = NULL;
|
||||||
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
|
int32_t s = 0;
|
||||||
|
getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &p, &s);
|
||||||
|
SArray* x = taosArrayInit(4, sizeof(STableKeyInfo));
|
||||||
|
for(int32_t i = 0; i < s; ++i) {
|
||||||
|
taosArrayPush(x, &p[i]);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
|
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, x,
|
||||||
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
|
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
|
||||||
taosArrayClear(pInfo->pUidList);
|
taosArrayClear(pInfo->pUidList);
|
||||||
|
|
||||||
|
@ -205,9 +201,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
||||||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||||
|
pInfo->pRes->info.groupId = p->groupId;
|
||||||
STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0);
|
|
||||||
pInfo->pRes->info.groupId = pKeyInfo->groupId;
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pInfo->pUidList) > 0) {
|
if (taosArrayGetSize(pInfo->pUidList) > 0) {
|
||||||
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
|
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
|
||||||
|
|
|
@ -733,7 +733,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
||||||
|
|
||||||
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
||||||
info->groupId = calcGroupId(keyBuf, len);
|
info->groupId = calcGroupId(keyBuf, len);
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// int64_t st2 = taosGetTimestampUs();
|
// int64_t st2 = taosGetTimestampUs();
|
||||||
|
@ -751,14 +750,70 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t nameComparFn(const void* p1, const void* p2) {
|
||||||
|
const char* pName1 = *(const char**)p1;
|
||||||
|
const char* pName2 = *(const char**)p2;
|
||||||
|
|
||||||
|
int32_t ret = strcmp(pName1, pName2);
|
||||||
|
if (ret == 0) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return (ret > 0) ? 1 : -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static SArray* getTableNameList(const SNodeListNode* pList) {
|
||||||
|
int32_t len = LIST_LENGTH(pList->pNodeList);
|
||||||
|
SListCell* cell = pList->pNodeList->pHead;
|
||||||
|
|
||||||
|
SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
|
||||||
|
for (int i = 0; i < pList->pNodeList->length; i++) {
|
||||||
|
SValueNode* valueNode = (SValueNode*)cell->pNode;
|
||||||
|
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
taosArrayDestroy(pTbList);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* name = varDataVal(valueNode->datum.p);
|
||||||
|
taosArrayPush(pTbList, &name);
|
||||||
|
cell = cell->pNext;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t numOfTables = taosArrayGetSize(pTbList);
|
||||||
|
|
||||||
|
// order the name
|
||||||
|
taosArraySort(pTbList, nameComparFn);
|
||||||
|
|
||||||
|
// remove the duplicates
|
||||||
|
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
|
||||||
|
taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
|
||||||
|
|
||||||
|
for (int32_t i = 1; i < numOfTables; ++i) {
|
||||||
|
char** name = taosArrayGetLast(pNewList);
|
||||||
|
char** nameInOldList = taosArrayGet(pTbList, i);
|
||||||
|
if (strcmp(*name, *nameInOldList) == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pNewList, nameInOldList);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pTbList);
|
||||||
|
return pNewList;
|
||||||
|
}
|
||||||
|
|
||||||
static int tableUidCompare(const void* a, const void* b) {
|
static int tableUidCompare(const void* a, const void* b) {
|
||||||
int64_t u1 = *(uint64_t*)a;
|
uint64_t u1 = *(uint64_t*)a;
|
||||||
int64_t u2 = *(uint64_t*)b;
|
uint64_t u2 = *(uint64_t*)b;
|
||||||
|
|
||||||
if (u1 == u2) {
|
if (u1 == u2) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return u1 < u2 ? -1 : 1;
|
return u1 < u2 ? -1 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond, SHashObj* tags) {
|
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond, SHashObj* tags) {
|
||||||
int32_t ret = -1;
|
int32_t ret = -1;
|
||||||
if (nodeType(cond) == QUERY_NODE_OPERATOR) {
|
if (nodeType(cond) == QUERY_NODE_OPERATOR) {
|
||||||
|
@ -778,7 +833,9 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
|
||||||
SNodeList* pList = (SNodeList*)pNode->pParameterList;
|
SNodeList* pList = (SNodeList*)pNode->pParameterList;
|
||||||
|
|
||||||
int32_t len = LIST_LENGTH(pList);
|
int32_t len = LIST_LENGTH(pList);
|
||||||
if (len <= 0) return ret;
|
if (len <= 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
SListCell* cell = pList->pHead;
|
SListCell* cell = pList->pHead;
|
||||||
for (int i = 0; i < len; i++) {
|
for (int i = 0; i < len; i++) {
|
||||||
|
@ -789,6 +846,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
|
||||||
}
|
}
|
||||||
cell = cell->pNext;
|
cell = cell->pNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArraySort(list, tableUidCompare);
|
taosArraySort(list, tableUidCompare);
|
||||||
taosArrayRemoveDuplicate(list, tableUidCompare, NULL);
|
taosArrayRemoveDuplicate(list, tableUidCompare, NULL);
|
||||||
|
|
||||||
|
@ -796,6 +854,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
|
||||||
ret = metaGetTableTagsByUids(metaHandle, suid, list, tags);
|
ret = metaGetTableTagsByUids(metaHandle, suid, list, tags);
|
||||||
removeInvalidTable(list, tags);
|
removeInvalidTable(list, tags);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -831,23 +890,13 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
|
||||||
SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
|
SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
|
||||||
|
|
||||||
int32_t len = LIST_LENGTH(pList->pNodeList);
|
int32_t len = LIST_LENGTH(pList->pNodeList);
|
||||||
if (len <= 0) return -1;
|
if (len <= 0) {
|
||||||
|
return -1;
|
||||||
SListCell* cell = pList->pNodeList->pHead;
|
|
||||||
|
|
||||||
SArray* pTbList = taosArrayInit(len, sizeof(void*));
|
|
||||||
for (int i = 0; i < pList->pNodeList->length; i++) {
|
|
||||||
SValueNode* valueNode = (SValueNode*)cell->pNode;
|
|
||||||
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
|
|
||||||
taosArrayDestroy(pTbList);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
char* name = varDataVal(valueNode->datum.p);
|
|
||||||
taosArrayPush(pTbList, &name);
|
|
||||||
cell = cell->pNext;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(pTbList); i++) {
|
SArray* pTbList = getTableNameList(pList);
|
||||||
|
size_t num = taosArrayGetSize(pTbList);
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
char* name = taosArrayGetP(pTbList, i);
|
char* name = taosArrayGetP(pTbList, i);
|
||||||
uint64_t uid = 0;
|
uint64_t uid = 0;
|
||||||
if (metaGetTableUidByName(metaHandle, name, &uid) == 0) {
|
if (metaGetTableUidByName(metaHandle, name, &uid) == 0) {
|
||||||
|
@ -866,8 +915,10 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
|
||||||
taosArrayDestroy(pTbList);
|
taosArrayDestroy(pTbList);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
||||||
STableListInfo* pListInfo) {
|
STableListInfo* pListInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -946,14 +997,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(res);
|
taosArrayDestroy(res);
|
||||||
|
|
||||||
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
|
|
||||||
if (pListInfo->pGroupList == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
// put into list as default group, remove it if grouping sorting is required later
|
|
||||||
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1604,3 +1647,79 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
|
||||||
pLimitInfo->remainOffset = limit.offset;
|
pLimitInfo->remainOffset = limit.offset;
|
||||||
pLimitInfo->remainGroupOffset = slimit.offset;
|
pLimitInfo->remainGroupOffset = slimit.offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t getTotalTables(const STableListInfo* pTableList) {
|
||||||
|
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
|
||||||
|
return taosArrayGetSize(pTableList->pTableList);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
||||||
|
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
|
||||||
|
ASSERT(pTableList->map != NULL && slot != NULL);
|
||||||
|
|
||||||
|
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
|
||||||
|
ASSERT(pKeyInfo->uid == tableUid);
|
||||||
|
|
||||||
|
return pKeyInfo->groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
|
||||||
|
if (pTableList->map == NULL) {
|
||||||
|
ASSERT(taosArrayGetSize(pTableList->pTableList) == 0);
|
||||||
|
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
|
}
|
||||||
|
|
||||||
|
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
|
||||||
|
taosArrayPush(pTableList->pTableList, &keyInfo);
|
||||||
|
|
||||||
|
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
|
||||||
|
taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
|
||||||
|
|
||||||
|
qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
|
||||||
|
int32_t* size) {
|
||||||
|
int32_t total = getNumOfOutputGroups(pTableList);
|
||||||
|
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
|
||||||
|
// here handle two special cases:
|
||||||
|
// 1. only one group exists, and 2. one table exists for each group.
|
||||||
|
if (total == 1) {
|
||||||
|
*size = getTotalTables(pTableList);
|
||||||
|
*pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else if (total == getTotalTables(pTableList)) {
|
||||||
|
*size = 1;
|
||||||
|
*pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t offset = pTableList->groupOffset[ordinalGroupIndex];
|
||||||
|
if (ordinalGroupIndex < total - 1) {
|
||||||
|
*size = pTableList->groupOffset[offset + 1] - pTableList->groupOffset[offset];
|
||||||
|
} else {
|
||||||
|
*size = total - pTableList->groupOffset[offset] - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pKeyInfo = taosArrayGet(pTableList->pTableList, offset);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; }
|
||||||
|
|
||||||
|
// todo remove it
|
||||||
|
bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; }
|
||||||
|
|
||||||
|
void destroyTableList(STableListInfo* pTableqinfoList) {
|
||||||
|
pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList);
|
||||||
|
taosMemoryFreeClear(pTableqinfoList->groupOffset);
|
||||||
|
|
||||||
|
taosHashCleanup(pTableqinfoList->map);
|
||||||
|
|
||||||
|
pTableqinfoList->pTableList = NULL;
|
||||||
|
pTableqinfoList->map = NULL;
|
||||||
|
}
|
||||||
|
|
|
@ -293,10 +293,6 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
|
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pListInfo->map == NULL) {
|
|
||||||
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
|
||||||
}
|
|
||||||
|
|
||||||
// traverse to the stream scanner node to add this table id
|
// traverse to the stream scanner node to add this table id
|
||||||
SOperatorInfo* pInfo = pTaskInfo->pRoot;
|
SOperatorInfo* pInfo = pTaskInfo->pRoot;
|
||||||
while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
@ -358,8 +354,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
|
|
||||||
if (!exists) {
|
if (!exists) {
|
||||||
#endif
|
#endif
|
||||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
|
||||||
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
addTableIntoTableList(&pTaskInfo->tableqinfoList, keyInfo.uid, keyInfo.groupId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (keyBuf != NULL) {
|
if (keyBuf != NULL) {
|
||||||
|
|
|
@ -3366,7 +3366,47 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
|
||||||
|
|
||||||
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
|
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
|
||||||
|
|
||||||
|
|
||||||
|
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
|
||||||
|
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1;
|
||||||
|
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2;
|
||||||
|
|
||||||
|
if (pInfo1->groupId == pInfo2->groupId) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return pInfo1->groupId < pInfo2->groupId? -1:1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
||||||
|
int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
|
|
||||||
|
SArray* pList = taosArrayInit(4, sizeof(int32_t));
|
||||||
|
|
||||||
|
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
|
||||||
|
uint64_t gid = pInfo->groupId;
|
||||||
|
|
||||||
|
int32_t start = 0;
|
||||||
|
taosArrayPush(pList, &start);
|
||||||
|
|
||||||
|
for(int32_t i = 1; i < size; ++i) {
|
||||||
|
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
|
if (pInfo->groupId != gid) {
|
||||||
|
taosArrayPush(pList, &i);
|
||||||
|
gid = pInfo->groupId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
|
||||||
|
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
|
||||||
|
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
|
||||||
|
taosArrayDestroy(pList);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
#if 0
|
||||||
taosArrayClear(pTableListInfo->pGroupList);
|
taosArrayClear(pTableListInfo->pGroupList);
|
||||||
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
|
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
|
||||||
if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -3422,6 +3462,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(sortSupport);
|
taosArrayDestroy(sortSupport);
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
bool groupbyTbname(SNodeList* pGroupList) {
|
bool groupbyTbname(SNodeList* pGroupList) {
|
||||||
|
@ -3437,35 +3478,41 @@ bool groupbyTbname(SNodeList* pGroupList) {
|
||||||
return bytbname;
|
return bytbname;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
|
int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
|
||||||
if (group == NULL) {
|
|
||||||
return TDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
if (pTableListInfo->map == NULL) {
|
if (pTableListInfo->map == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool assignUid = groupbyTbname(group);
|
bool groupByTbname = groupbyTbname(group);
|
||||||
|
|
||||||
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
|
|
||||||
if (assignUid) {
|
if (groupByTbname || group == NULL) {
|
||||||
for (int32_t i = 0; i < numOfTables; i++) {
|
for (int32_t i = 0; i < numOfTables; i++) {
|
||||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
info->groupId = info->uid;
|
info->groupId = groupByTbname? info->uid:0;
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
|
}
|
||||||
|
|
||||||
|
pTableListInfo->oneTableForEachGroup = groupByTbname;
|
||||||
|
|
||||||
|
if (groupSort && groupByTbname) {
|
||||||
|
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
||||||
|
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
|
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (groupSort) {
|
||||||
|
return sortTableGroup(pTableListInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTableListInfo->needSortTableByGroupId) {
|
for(int32_t i = 0; i < numOfTables; ++i) {
|
||||||
return sortTableGroup(pTableListInfo);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
|
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &i, sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
|
@ -3551,7 +3598,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
if (pHandle->vnode) {
|
if (pHandle->vnode) {
|
||||||
int32_t code =
|
int32_t code =
|
||||||
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
|
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/false,
|
||||||
pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
@ -3979,15 +4026,15 @@ void doDestroyTableList(STableListInfo* pTableqinfoList) {
|
||||||
taosArrayDestroy(pTableqinfoList->pTableList);
|
taosArrayDestroy(pTableqinfoList->pTableList);
|
||||||
taosHashCleanup(pTableqinfoList->map);
|
taosHashCleanup(pTableqinfoList->map);
|
||||||
if (pTableqinfoList->needSortTableByGroupId) {
|
if (pTableqinfoList->needSortTableByGroupId) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
|
// for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
|
||||||
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
|
// SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
|
||||||
if (tmp == pTableqinfoList->pTableList) {
|
// if (tmp == pTableqinfoList->pTableList) {
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
taosArrayDestroy(tmp);
|
// taosArrayDestroy(tmp);
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pTableqinfoList->pGroupList);
|
// taosArrayDestroy(pTableqinfoList->pGroupList);
|
||||||
|
|
||||||
pTableqinfoList->pTableList = NULL;
|
pTableqinfoList->pTableList = NULL;
|
||||||
pTableqinfoList->map = NULL;
|
pTableqinfoList->map = NULL;
|
||||||
|
|
|
@ -626,10 +626,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
pBlock->info = binfo;
|
pBlock->info = binfo;
|
||||||
ASSERT(binfo.uid != 0);
|
ASSERT(binfo.uid != 0);
|
||||||
|
|
||||||
uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid);
|
||||||
if (groupId) {
|
|
||||||
pBlock->info.groupId = *groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
||||||
|
@ -683,10 +680,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
qDebug(
|
qDebug( "%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
||||||
"%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks "
|
|
||||||
"due to query func required",
|
|
||||||
GET_TASKID(pTaskInfo));
|
|
||||||
|
|
||||||
// do prepare for the next round table scan operation
|
// do prepare for the next round table scan operation
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
@ -755,16 +749,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
if (pInfo->currentGroupId == -1) {
|
if (pInfo->currentGroupId == -1) {
|
||||||
pInfo->currentGroupId++;
|
pInfo->currentGroupId++;
|
||||||
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
// qDebug("number:------------------------%d, %d", (int)taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList),
|
||||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
// getNumOfOutputGroups(&pTaskInfo->tableqinfoList));
|
||||||
|
if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)/*taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)*/) {
|
||||||
|
// if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
SArray* p = taosArrayInit(4, sizeof(STableKeyInfo));
|
||||||
tsdbReaderClose(pInfo->dataReader);
|
tsdbReaderClose(pInfo->dataReader);
|
||||||
|
|
||||||
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader,
|
STableKeyInfo* x = NULL;
|
||||||
|
int32_t num = 0;
|
||||||
|
getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &x, &num);
|
||||||
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
taosArrayPush(p, &x[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, p, (STsdbReader**)&pInfo->dataReader,
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
|
taosArrayDestroy(p);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -777,11 +783,16 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->currentGroupId++;
|
pInfo->currentGroupId++;
|
||||||
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) {
|
||||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reset value for the next group data output
|
||||||
|
pOperator->status = OP_OPENED;
|
||||||
|
pInfo->limitInfo.numOfOutputRows = 0;
|
||||||
|
pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset;
|
||||||
|
|
||||||
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
|
|
||||||
|
@ -790,7 +801,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1077,39 +1088,59 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
|
||||||
pTableScanInfo->cond.twindows = *pWin;
|
pTableScanInfo->cond.twindows = *pWin;
|
||||||
pTableScanInfo->scanTimes = 0;
|
pTableScanInfo->scanTimes = 0;
|
||||||
pTableScanInfo->currentGroupId = -1;
|
pTableScanInfo->currentGroupId = -1;
|
||||||
}
|
tsdbReaderClose(pTableScanInfo->dataReader);
|
||||||
|
pTableScanInfo->dataReader = NULL;
|
||||||
static void freeArray(void* array) { taosArrayDestroy(array); }
|
|
||||||
|
|
||||||
static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
|
|
||||||
STableScanInfo* pTableScanInfo = pTableScanOp->info;
|
|
||||||
pTableScanInfo->cond.startVersion = -1;
|
|
||||||
pTableScanInfo->cond.endVersion = -1;
|
|
||||||
SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
|
|
||||||
SArray* allTbls = pTableScanOp->pTaskInfo->tableqinfoList.pTableList;
|
|
||||||
taosArrayClearP(gpTbls, freeArray);
|
|
||||||
taosArrayPush(gpTbls, &allTbls);
|
|
||||||
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
|
|
||||||
resetTableScanInfo(pTableScanOp->info, &win);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
|
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
|
||||||
int64_t maxVersion) {
|
int64_t maxVersion) {
|
||||||
SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
|
|
||||||
taosArrayClear(gpTbls);
|
|
||||||
STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
|
STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
|
||||||
SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo));
|
|
||||||
taosArrayPush(tbls, &tblInfo);
|
|
||||||
taosArrayPush(gpTbls, &tbls);
|
|
||||||
|
|
||||||
STimeWindow win = {.skey = startTs, .ekey = endTs};
|
STableScanInfo* pTableScanInfo = pTableScanOp->info;
|
||||||
STableScanInfo* pTableScanInfo = pTableScanOp->info;
|
SQueryTableDataCond cond = pTableScanInfo->cond;
|
||||||
pTableScanInfo->cond.startVersion = -1;
|
|
||||||
pTableScanInfo->cond.endVersion = maxVersion;
|
cond.startVersion = -1;
|
||||||
resetTableScanInfo(pTableScanOp->info, &win);
|
cond.endVersion = maxVersion;
|
||||||
SSDataBlock* pRes = doTableScan(pTableScanOp);
|
cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};
|
||||||
resetTableScanOperator(pTableScanOp);
|
|
||||||
return pRes;
|
SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
|
SArray* p = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
|
taosArrayPush(p, &tblInfo);
|
||||||
|
|
||||||
|
STsdbReader* pReader = NULL;
|
||||||
|
int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, p, (STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool hasBlock = tsdbNextDataBlock(pReader);
|
||||||
|
if (hasBlock) {
|
||||||
|
SDataBlockInfo binfo = {0};
|
||||||
|
tsdbRetrieveDataBlockInfo(pReader, &binfo);
|
||||||
|
|
||||||
|
SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL);
|
||||||
|
blockDataEnsureCapacity(pBlock, binfo.rows);
|
||||||
|
|
||||||
|
pBlock->info.window = binfo.window;
|
||||||
|
pBlock->info.uid = binfo.uid;
|
||||||
|
pBlock->info.rows = binfo.rows;
|
||||||
|
|
||||||
|
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
||||||
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
|
|
||||||
|
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, binfo.uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbReaderClose(pReader);
|
||||||
|
qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
|
||||||
|
", suid:%" PRIu64, pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
|
||||||
|
|
||||||
|
return pBlock->info.rows > 0 ? pBlock : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
||||||
|
@ -1122,12 +1153,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
|
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
|
||||||
SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map;
|
return getTableGroupId(&pInfo->pTableScanOp->pTaskInfo->tableqinfoList, uid);
|
||||||
uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
|
|
||||||
if (groupId) {
|
|
||||||
return *groupId;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
||||||
|
@ -1549,12 +1575,13 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
pInfo->pRes->info.type = STREAM_NORMAL;
|
pInfo->pRes->info.type = STREAM_NORMAL;
|
||||||
pInfo->pRes->info.version = pBlock->info.version;
|
pInfo->pRes->info.version = pBlock->info.version;
|
||||||
|
|
||||||
uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
pInfo->pRes->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid);
|
||||||
if (groupIdPre) {
|
// uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||||
pInfo->pRes->info.groupId = *groupIdPre;
|
// if (groupIdPre) {
|
||||||
} else {
|
// pInfo->pRes->info.groupId = *groupIdPre;
|
||||||
pInfo->pRes->info.groupId = 0;
|
// } else {
|
||||||
}
|
// pInfo->pRes->info.groupId = 0;
|
||||||
|
// }
|
||||||
|
|
||||||
// todo extract method
|
// todo extract method
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
||||||
|
@ -2335,11 +2362,20 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pTSInfo->cond.endVersion = pHandle->version;
|
pTSInfo->cond.endVersion = pHandle->version;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
|
// SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
|
||||||
|
STableKeyInfo* pList = NULL;
|
||||||
|
int32_t num = 0;
|
||||||
|
getTablesOfGroup(&pTaskInfo->tableqinfoList, 0, &pList, &num);
|
||||||
|
|
||||||
|
SArray* p = taosArrayInit(4, sizeof(STableKeyInfo));
|
||||||
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
taosArrayPush(p, &pList[i]);
|
||||||
|
}
|
||||||
|
|
||||||
if (pHandle->initTableReader) {
|
if (pHandle->initTableReader) {
|
||||||
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
||||||
pTSInfo->dataReader = NULL;
|
pTSInfo->dataReader = NULL;
|
||||||
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) {
|
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, p, &pTSInfo->dataReader, NULL) < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -4202,6 +4238,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTableListInfo->numOfOuputGroups = 1;
|
||||||
|
|
||||||
int64_t st1 = taosGetTimestampUs();
|
int64_t st1 = taosGetTimestampUs();
|
||||||
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
|
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
|
||||||
|
|
||||||
|
@ -4211,7 +4249,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableListInfo->needSortTableByGroupId = groupSort;
|
pTableListInfo->needSortTableByGroupId = groupSort;
|
||||||
code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags);
|
code = setGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue