Merge branch '3.0' into feature/compressData
This commit is contained in:
parent
a77d98cab6
commit
d6f6c77590
|
@ -45,8 +45,8 @@ static FilterCondType checkTagCond(SNode* cond);
|
||||||
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
|
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
|
||||||
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI);
|
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI);
|
||||||
|
|
||||||
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
||||||
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
|
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
|
||||||
|
|
||||||
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
|
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
|
||||||
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
|
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
|
||||||
|
@ -643,7 +643,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
||||||
info->groupId = calcGroupId(keyBuf, len);
|
info->groupId = calcGroupId(keyBuf, len);
|
||||||
if (initRemainGroups) {
|
if (initRemainGroups) {
|
||||||
// groupId ~ table uid
|
// groupId ~ table uid
|
||||||
taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), sizeof(info->uid));
|
taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
|
||||||
|
sizeof(info->uid));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -859,7 +860,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
|
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
|
||||||
SStorageAPI* pStorageAPI) {
|
SStorageAPI* pStorageAPI) {
|
||||||
SSDataBlock* pResBlock = createDataBlock();
|
SSDataBlock* pResBlock = createDataBlock();
|
||||||
if (pResBlock == NULL) {
|
if (pResBlock == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -940,11 +941,12 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
|
||||||
return pResBlock;
|
return pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, bool* pResultList, bool addUid) {
|
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
|
||||||
|
bool* pResultList, bool addUid) {
|
||||||
taosArrayClear(pUidList);
|
taosArrayClear(pUidList);
|
||||||
|
|
||||||
STableKeyInfo info = {.uid = 0, .groupId = 0};
|
STableKeyInfo info = {.uid = 0, .groupId = 0};
|
||||||
int32_t numOfTables = taosArrayGetSize(pUidTagList);
|
int32_t numOfTables = taosArrayGetSize(pUidTagList);
|
||||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
if (pResultList[i]) {
|
if (pResultList[i]) {
|
||||||
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
|
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
|
||||||
|
@ -1144,7 +1146,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
|
||||||
if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake
|
if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake
|
||||||
qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid);
|
qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid);
|
||||||
} else {
|
} else {
|
||||||
qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
|
qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1166,7 +1168,8 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
|
||||||
memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t));
|
memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1);
|
pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
|
||||||
|
pPayload, size, 1);
|
||||||
digest[0] = 1;
|
digest[0] = 1;
|
||||||
memcpy(digest + 1, context.digest, tListLen(context.digest));
|
memcpy(digest + 1, context.digest, tListLen(context.digest));
|
||||||
}
|
}
|
||||||
|
@ -1728,7 +1731,8 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
|
||||||
return c;
|
return c;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle) {
|
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode,
|
||||||
|
const SReadHandle* readHandle) {
|
||||||
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||||
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
|
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
|
||||||
|
|
||||||
|
@ -1751,8 +1755,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
|
||||||
|
|
||||||
// allowed read stt file optimization mode
|
// allowed read stt file optimization mode
|
||||||
pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
|
pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
|
||||||
(pTableScanNode->scan.node.pConditions == NULL) &&
|
(pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
|
||||||
(pTableScanNode->interval == 0);
|
|
||||||
|
|
||||||
int32_t j = 0;
|
int32_t j = 0;
|
||||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||||
|
@ -1895,7 +1898,8 @@ void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t orde
|
||||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
|
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
|
||||||
slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||||
tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
|
tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
|
||||||
int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
int64_t slidingEnd =
|
||||||
|
taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||||
tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
|
tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2140,7 +2144,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
||||||
if (groupSort && groupByTbname) {
|
if (groupSort && groupByTbname) {
|
||||||
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
||||||
pTableListInfo->numOfOuputGroups = numOfTables;
|
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||||
} else if (groupByTbname && pScanNode->groupOrderScan){
|
} else if (groupByTbname && pScanNode->groupOrderScan) {
|
||||||
pTableListInfo->numOfOuputGroups = numOfTables;
|
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||||
} else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
|
} else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
|
||||||
pTableListInfo->numOfOuputGroups = numOfTables;
|
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||||
|
@ -2151,7 +2155,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
||||||
bool initRemainGroups = false;
|
bool initRemainGroups = false;
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
|
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
|
||||||
if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable && !(groupSort || pScanNode->groupOrderScan)) {
|
if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
|
||||||
|
!(groupSort || pScanNode->groupOrderScan)) {
|
||||||
initRemainGroups = true;
|
initRemainGroups = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2275,7 +2280,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr
|
||||||
}
|
}
|
||||||
if (qDebugFlag & DEBUG_DEBUG) {
|
if (qDebugFlag & DEBUG_DEBUG) {
|
||||||
char* pBuf = NULL;
|
char* pBuf = NULL;
|
||||||
char flagBuf[64];
|
char flagBuf[64];
|
||||||
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
|
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
|
||||||
qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr));
|
qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr));
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
|
@ -2284,7 +2289,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr
|
||||||
|
|
||||||
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
|
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
|
||||||
|
|
||||||
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
|
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
|
||||||
int64_t* ts = (int64_t*)pColData->pData;
|
int64_t* ts = (int64_t*)pColData->pData;
|
||||||
|
|
||||||
int64_t duration = pWin->ekey - pWin->skey + delta;
|
int64_t duration = pWin->ekey - pWin->skey + delta;
|
||||||
|
@ -2293,13 +2298,14 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t
|
||||||
ts[4] = pWin->ekey + delta; // window end key
|
ts[4] = pWin->ekey + delta; // window end key
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) {
|
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
|
||||||
|
int32_t rowIndex) {
|
||||||
SColumnDataAgg* pColAgg = NULL;
|
SColumnDataAgg* pColAgg = NULL;
|
||||||
const char* isNull = oldkeyBuf;
|
const char* isNull = oldkeyBuf;
|
||||||
const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
|
const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
|
for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
|
||||||
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
|
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
|
||||||
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
|
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
|
||||||
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];
|
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];
|
||||||
|
|
||||||
|
@ -2325,8 +2331,7 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock,
|
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
|
||||||
int32_t rowIndex) {
|
|
||||||
uint32_t colNum = pSortGroupCols->size;
|
uint32_t colNum = pSortGroupCols->size;
|
||||||
SColumnDataAgg* pColAgg = NULL;
|
SColumnDataAgg* pColAgg = NULL;
|
||||||
char* isNull = keyBuf;
|
char* isNull = keyBuf;
|
||||||
|
@ -2374,7 +2379,7 @@ uint64_t calcGroupId(char* pData, int32_t len) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
|
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
|
||||||
SNode* node;
|
SNode* node;
|
||||||
SNodeList* ret = NULL;
|
SNodeList* ret = NULL;
|
||||||
FOREACH(node, pSortKeys) {
|
FOREACH(node, pSortKeys) {
|
||||||
SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
|
SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
|
||||||
|
@ -2390,6 +2395,6 @@ int32_t extractKeysLen(const SArray* keys) {
|
||||||
SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
|
SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
|
||||||
len += pCol->bytes;
|
len += pCol->bytes;
|
||||||
}
|
}
|
||||||
len += sizeof(int8_t) * keyNum; //null flag
|
len += sizeof(int8_t) * keyNum; // null flag
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue