fix(query): check status before add table uid in hash map. and do some other refactor.
This commit is contained in:
parent
4774baa635
commit
2c896012df
|
@ -3463,6 +3463,7 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
|
|||
while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
|
||||
clearBlockScanInfo(p);
|
||||
}
|
||||
|
||||
taosHashClear(pReader->status.pTableMap);
|
||||
|
||||
STableKeyInfo* pList = (STableKeyInfo*) pTableList;
|
||||
|
|
|
@ -105,10 +105,11 @@ typedef struct STableListInfo {
|
|||
} STableListInfo;
|
||||
|
||||
void destroyTableList(STableListInfo* pTableList);
|
||||
int32_t getNumOfGroups(STableListInfo* pTableList);
|
||||
uint64_t getTableGroupId(STableListInfo* pTableList, uint64_t tableUid);
|
||||
int32_t getTablesOfGroup(STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
|
||||
uint64_t getTotalTables(STableListInfo* pTableList);
|
||||
int32_t getNumOfGroups(const STableListInfo* pTableList);
|
||||
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
|
||||
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
|
||||
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
|
||||
uint64_t getTotalTables(const STableListInfo* pTableList);
|
||||
|
||||
struct SqlFunctionCtx;
|
||||
|
||||
|
|
|
@ -825,38 +825,86 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) {
|
|||
taosArrayDestroy(validUid);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t nameComparFn(const void* p1, const void* p2) {
|
||||
const char* pName1 = *(const char**) p1;
|
||||
const char* pName2 = *(const char**) p2;
|
||||
|
||||
int32_t ret = strcmp(pName1, pName2);
|
||||
if (ret == 0) {
|
||||
return 0;
|
||||
} else {
|
||||
return (ret > 0)? 1:-1;
|
||||
}
|
||||
}
|
||||
|
||||
static SArray* getTableNameList(const SNodeListNode* pList) {
|
||||
int32_t len = LIST_LENGTH(pList->pNodeList);
|
||||
SListCell* cell = pList->pNodeList->pHead;
|
||||
|
||||
SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
|
||||
for (int i = 0; i < pList->pNodeList->length; i++) {
|
||||
SValueNode* valueNode = (SValueNode*) cell->pNode;
|
||||
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
taosArrayDestroy(pTbList);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
char* name = varDataVal(valueNode->datum.p);
|
||||
taosArrayPush(pTbList, &name);
|
||||
cell = cell->pNext;
|
||||
}
|
||||
|
||||
size_t numOfTables = taosArrayGetSize(pTbList);
|
||||
|
||||
// order the name
|
||||
taosArraySort(pTbList, nameComparFn);
|
||||
|
||||
// remove the duplicates
|
||||
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
|
||||
taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
|
||||
|
||||
for (int32_t i = 1; i < numOfTables; ++i) {
|
||||
char** name = taosArrayGetLast(pNewList);
|
||||
char** nameInOldList = taosArrayGet(pTbList, i);
|
||||
if (strcmp(*name, *nameInOldList) == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
taosArrayPush(pNewList, nameInOldList);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTbList);
|
||||
return pNewList;
|
||||
}
|
||||
|
||||
static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond) {
|
||||
if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SOperatorNode* pNode = (SOperatorNode*)pTagCond;
|
||||
if (pNode->opType != OP_TYPE_IN) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((pNode->pLeft != NULL && nodeType(pNode->pLeft) == QUERY_NODE_COLUMN &&
|
||||
((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME) &&
|
||||
(pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
|
||||
SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
|
||||
|
||||
int32_t len = LIST_LENGTH(pList->pNodeList);
|
||||
if (len <= 0) return -1;
|
||||
|
||||
SListCell* cell = pList->pNodeList->pHead;
|
||||
|
||||
SArray* pTbList = taosArrayInit(len, sizeof(void*));
|
||||
for (int i = 0; i < pList->pNodeList->length; i++) {
|
||||
SValueNode* valueNode = (SValueNode*)cell->pNode;
|
||||
if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
|
||||
taosArrayDestroy(pTbList);
|
||||
return -1;
|
||||
}
|
||||
char* name = varDataVal(valueNode->datum.p);
|
||||
taosArrayPush(pTbList, &name);
|
||||
cell = cell->pNext;
|
||||
if (len <= 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(pTbList); i++) {
|
||||
char* name = taosArrayGetP(pTbList, i);
|
||||
SArray* pTbList = getTableNameList(pList);
|
||||
int32_t numOfTables = taosArrayGetSize(pTbList);
|
||||
|
||||
for (int i = 0; i < numOfTables; i++) {
|
||||
char* name = taosArrayGetP(pTbList, i);
|
||||
|
||||
uint64_t uid = 0;
|
||||
if (metaGetTableUidByName(metaHandle, name, &uid) == 0) {
|
||||
ETableType tbType = TSDB_TABLE_MAX;
|
||||
|
@ -871,11 +919,14 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
|
|||
terrno = 0;
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTbList);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
||||
STableListInfo* pListInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1605,7 +1656,7 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
|
|||
pLimitInfo->remainGroupOffset = slimit.offset;
|
||||
}
|
||||
|
||||
uint64_t getTotalTables(STableListInfo* pTableList) {
|
||||
uint64_t getTotalTables(const STableListInfo* pTableList) {
|
||||
if (pTableList->map != NULL) {
|
||||
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
|
||||
}
|
||||
|
@ -1613,7 +1664,7 @@ uint64_t getTotalTables(STableListInfo* pTableList) {
|
|||
return taosArrayGetSize(pTableList->pTableList);
|
||||
}
|
||||
|
||||
uint64_t getTableGroupId(STableListInfo* pTableList, uint64_t tableUid) {
|
||||
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
||||
uint64_t* groupId = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
|
||||
if (groupId != NULL) {
|
||||
return *groupId;
|
||||
|
@ -1622,7 +1673,17 @@ uint64_t getTableGroupId(STableListInfo* pTableList, uint64_t tableUid) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t getTablesOfGroup(STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t* size) {
|
||||
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
|
||||
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
|
||||
|
||||
taosArrayPush(pTableList->pTableList, &keyInfo);
|
||||
if (pTableList->oneTableForEachGroup || pTableList->numOfGroups > 1) {
|
||||
taosHashPut(pTableList->map, &uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t* size) {
|
||||
int32_t total = getNumOfGroups(pTableList);
|
||||
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
@ -1651,7 +1712,7 @@ int32_t getTablesOfGroup(STableListInfo* pTableList, int32_t ordinalGroupIndex,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getNumOfGroups(STableListInfo* pTableList) {
|
||||
int32_t getNumOfGroups(const STableListInfo* pTableList) {
|
||||
return pTableList->numOfGroups;
|
||||
}
|
||||
|
||||
|
|
|
@ -293,9 +293,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
|
||||
}
|
||||
|
||||
if (pListInfo->map == NULL) {
|
||||
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
}
|
||||
|
||||
|
||||
// traverse to the stream scanner node to add this table id
|
||||
SOperatorInfo* pInfo = pTaskInfo->pRoot;
|
||||
|
@ -307,8 +305,10 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
SStreamScanInfo* pScanInfo = pInfo->info;
|
||||
if (isAdd) { // add new table id
|
||||
SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
|
||||
int32_t numOfQualifiedTables = taosArrayGetSize(qa);
|
||||
|
||||
qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables);
|
||||
|
||||
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
|
||||
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(qa);
|
||||
|
@ -328,7 +328,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
|
||||
STableListInfo* pTableListInfo = &pTaskInfo->tableqinfoList;
|
||||
|
||||
for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
|
||||
uint64_t* uid = taosArrayGet(qa, i);
|
||||
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
|
||||
|
||||
|
@ -358,8 +360,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
|
||||
if (!exists) {
|
||||
#endif
|
||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
||||
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
||||
addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId);
|
||||
}
|
||||
|
||||
if (keyBuf != NULL) {
|
||||
|
@ -935,7 +936,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
|
||||
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||
int32_t numOfTables = getTotalTables(&pTaskInfo->tableqinfoList);
|
||||
|
||||
#ifndef NDEBUG
|
||||
qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
|
||||
|
@ -944,7 +945,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
#endif
|
||||
|
||||
bool found = false;
|
||||
for (int32_t i = 0; i < tableSz; i++) {
|
||||
for (int32_t i = 0; i < numOfTables; i++) {
|
||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
|
||||
if (pTableInfo->uid == uid) {
|
||||
found = true;
|
||||
|
@ -957,12 +958,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
ASSERT(found);
|
||||
|
||||
if (pTableScanInfo->dataReader == NULL) {
|
||||
|
||||
STableKeyInfo* pList = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
|
||||
int32_t num = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||
int32_t num = getTotalTables(&pTaskInfo->tableqinfoList);
|
||||
|
||||
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num,
|
||||
&pTableScanInfo->dataReader, NULL) < 0 ||
|
||||
pTableScanInfo->dataReader == NULL) {
|
||||
&pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
@ -976,7 +976,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
pTableScanInfo->scanTimes = 0;
|
||||
|
||||
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
|
||||
ts, pTableScanInfo->currentTable, tableSz);
|
||||
ts, pTableScanInfo->currentTable, numOfTables);
|
||||
/*}*/
|
||||
} else {
|
||||
ASSERT(0);
|
||||
|
|
|
@ -3521,6 +3521,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
|||
}
|
||||
}
|
||||
|
||||
qDebug("-------------------, %d", (int) taosHashGetSize(pTableListInfo->map));
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3622,8 +3623,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid);
|
||||
qDebug("creating stream task: add table uid:%" PRIu64, pKeyInfo->uid);
|
||||
}
|
||||
|
||||
qDebug("table in hashmap, %d", (int32_t) getTotalTables(pTableListInfo));
|
||||
#endif
|
||||
}
|
||||
|
||||
|
|
|
@ -1078,22 +1078,10 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
|
|||
pTableScanInfo->cond.twindows = *pWin;
|
||||
pTableScanInfo->scanTimes = 0;
|
||||
pTableScanInfo->currentGroupId = -1;
|
||||
tsdbReaderClose(pTableScanInfo->dataReader);
|
||||
pTableScanInfo->dataReader = NULL;
|
||||
}
|
||||
|
||||
//static void freeArray(void* array) { taosArrayDestroy(array); }
|
||||
//
|
||||
//static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
|
||||
// STableScanInfo* pTableScanInfo = pTableScanOp->info;
|
||||
// pTableScanInfo->cond.startVersion = -1;
|
||||
// pTableScanInfo->cond.endVersion = -1;
|
||||
//// SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
|
||||
//// SArray* allTbls = pTableScanOp->pTaskInfo->tableqinfoList.pTableList;
|
||||
//// taosArrayClearP(gpTbls, freeArray);
|
||||
//// taosArrayPush(gpTbls, &allTbls);
|
||||
// STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||
// resetTableScanInfo(pTableScanOp->info, &win);
|
||||
//}
|
||||
|
||||
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
|
||||
int64_t maxVersion) {
|
||||
STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
|
||||
|
@ -4104,6 +4092,9 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
||||
int32_t code =
|
||||
extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
Loading…
Reference in New Issue