[td-5126]<enhance>:improve the nest query performance.
This commit is contained in:
parent
3d7b4e431e
commit
03455e6fe9
|
@ -276,6 +276,7 @@ typedef struct SQueryRuntimeEnv {
|
||||||
bool enableGroupData;
|
bool enableGroupData;
|
||||||
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||||
|
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
||||||
char* keyBuf; // window key buffer
|
char* keyBuf; // window key buffer
|
||||||
SResultRowPool* pool; // window result object pool
|
SResultRowPool* pool; // window result object pool
|
||||||
char** prevRow;
|
char** prevRow;
|
||||||
|
|
|
@ -411,25 +411,25 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
|
||||||
pResultRowInfo->capacity = (int32_t)newCapacity;
|
pResultRowInfo->capacity = (int32_t)newCapacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t ascResultRowCompareFn(const void* p1, const void* p2) {
|
//static int32_t ascResultRowCompareFn(const void* p1, const void* p2) {
|
||||||
SResultRow* pRow1 = *(SResultRow**)p1;
|
// SResultRow* pRow1 = *(SResultRow**)p1;
|
||||||
SResultRow* pRow2 = *(SResultRow**)p2;
|
// SResultRow* pRow2 = *(SResultRow**)p2;
|
||||||
|
//
|
||||||
|
// if (pRow1 == pRow2) {
|
||||||
|
// return 0;
|
||||||
|
// } else {
|
||||||
|
// return pRow1->win.skey < pRow2->win.skey? -1:1;
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
if (pRow1 == pRow2) {
|
//static int32_t descResultRowCompareFn(const void* p1, const void* p2) {
|
||||||
return 0;
|
// return -ascResultRowCompareFn(p1, p2);
|
||||||
} else {
|
//}
|
||||||
return pRow1->win.skey < pRow2->win.skey? -1:1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t descResultRowCompareFn(const void* p1, const void* p2) {
|
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, char *pData,
|
||||||
return -ascResultRowCompareFn(p1, p2);
|
int16_t bytes, bool masterscan, uint64_t tableGroupId) {
|
||||||
}
|
|
||||||
|
|
||||||
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
|
|
||||||
int16_t bytes, bool masterscan, uint64_t uid) {
|
|
||||||
bool existed = false;
|
bool existed = false;
|
||||||
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tableGroupId);
|
||||||
|
|
||||||
SResultRow **p1 =
|
SResultRow **p1 =
|
||||||
(SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
(SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||||
|
@ -447,16 +447,20 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
|
||||||
existed = false;
|
existed = false;
|
||||||
} else if (pResultRowInfo->size == 1) {
|
} else if (pResultRowInfo->size == 1) {
|
||||||
existed = (pResultRowInfo->pResult[0] == (*p1));
|
existed = (pResultRowInfo->pResult[0] == (*p1));
|
||||||
} else {
|
} else { // check if current pResultRowInfo contains the existed pResultRow
|
||||||
__compar_fn_t fn = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr)? ascResultRowCompareFn:descResultRowCompareFn;
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid);
|
||||||
void* ptr = taosbsearch(p1, pResultRowInfo->pResult, pResultRowInfo->size, POINTER_BYTES, fn, TD_EQ);
|
void* ptr = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||||
if (ptr != NULL) {
|
existed = (ptr != NULL);
|
||||||
existed = true;
|
// __compar_fn_t fn = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr)? ascResultRowCompareFn:descResultRowCompareFn;
|
||||||
}
|
// void* ptr = taosbsearch(p1, pResultRowInfo->pResult, pResultRowInfo->size, POINTER_BYTES, fn, TD_EQ);
|
||||||
|
// if (ptr != NULL) {
|
||||||
|
// existed = true;
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (p1 != NULL) { // group by column query
|
// In case of group by column query, the required SResultRow object must be existed in the pResultRowInfo object.
|
||||||
|
if (p1 != NULL) {
|
||||||
return *p1;
|
return *p1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -480,6 +484,10 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
|
||||||
|
|
||||||
pResultRowInfo->pResult[pResultRowInfo->size++] = pResult;
|
pResultRowInfo->pResult[pResultRowInfo->size++] = pResult;
|
||||||
pResultRowInfo->current = pResult;
|
pResultRowInfo->current = pResult;
|
||||||
|
|
||||||
|
int64_t dummyVal = 0;
|
||||||
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid);
|
||||||
|
taosHashPut(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &dummyVal, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
// too many time window in query
|
// too many time window in query
|
||||||
|
@ -615,13 +623,13 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win,
|
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win,
|
||||||
bool masterscan, SResultRow **pResult, int64_t groupId, SQLFunctionCtx* pCtx,
|
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx,
|
||||||
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
||||||
assert(win->skey <= win->ekey);
|
assert(win->skey <= win->ekey);
|
||||||
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
||||||
|
|
||||||
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
|
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId);
|
||||||
if (pResultRow == NULL) {
|
if (pResultRow == NULL) {
|
||||||
*pResult = NULL;
|
*pResult = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -629,7 +637,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow
|
||||||
|
|
||||||
// not assign result buffer yet, add new result buffer
|
// not assign result buffer yet, add new result buffer
|
||||||
if (pResultRow->pageId == -1) {
|
if (pResultRow->pageId == -1) {
|
||||||
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQueryAttr->intermediateResultRowSize);
|
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) tableGroupId, pRuntimeEnv->pQueryAttr->intermediateResultRowSize);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1248,7 +1256,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t groupId) {
|
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
|
||||||
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
|
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
|
||||||
|
|
||||||
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
|
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
|
||||||
|
@ -1276,7 +1284,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
||||||
|
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx,
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
|
||||||
numOfOutput, pInfo->rowCellInfoOffset);
|
numOfOutput, pInfo->rowCellInfoOffset);
|
||||||
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
@ -1305,7 +1313,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
}
|
}
|
||||||
|
|
||||||
STimeWindow w = pRes->win;
|
STimeWindow w = pRes->win;
|
||||||
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, groupId, pInfo->pCtx,
|
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult, tableGroupId, pInfo->pCtx,
|
||||||
numOfOutput, pInfo->rowCellInfoOffset);
|
numOfOutput, pInfo->rowCellInfoOffset);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
@ -1323,7 +1331,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
}
|
}
|
||||||
|
|
||||||
// restore current time window
|
// restore current time window
|
||||||
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx,
|
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
|
||||||
numOfOutput, pInfo->rowCellInfoOffset);
|
numOfOutput, pInfo->rowCellInfoOffset);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
@ -1343,7 +1351,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
}
|
}
|
||||||
|
|
||||||
// null data, failed to allocate more memory buffer
|
// null data, failed to allocate more memory buffer
|
||||||
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, groupId,
|
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &nextWin, masterScan, &pResult, tableGroupId,
|
||||||
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
|
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
|
||||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
@ -1483,7 +1491,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
||||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
||||||
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
||||||
pBInfo->rowCellInfoOffset);
|
pBInfo->rowCellInfoOffset);
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
|
@ -1504,7 +1512,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
||||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
||||||
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
||||||
pBInfo->rowCellInfoOffset);
|
pBInfo->rowCellInfoOffset);
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
|
@ -1547,7 +1555,8 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic
|
||||||
len = varDataLen(pData);
|
len = varDataLen(pData);
|
||||||
}
|
}
|
||||||
|
|
||||||
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex);
|
int64_t tid = 0;
|
||||||
|
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, d, len, true, groupIndex);
|
||||||
assert (pResultRow != NULL);
|
assert (pResultRow != NULL);
|
||||||
|
|
||||||
setResultRowKey(pResultRow, pData, type);
|
setResultRowKey(pResultRow, pData, type);
|
||||||
|
@ -1812,6 +1821,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
||||||
pRuntimeEnv->pQueryAttr = pQueryAttr;
|
pRuntimeEnv->pQueryAttr = pQueryAttr;
|
||||||
|
|
||||||
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t));
|
pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t));
|
||||||
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
|
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
|
||||||
|
|
||||||
|
@ -2061,6 +2071,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap);
|
taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap);
|
||||||
pRuntimeEnv->pTableRetrieveTsMap = NULL;
|
pRuntimeEnv->pTableRetrieveTsMap = NULL;
|
||||||
|
|
||||||
|
taosHashCleanup(pRuntimeEnv->pResultRowListSet);
|
||||||
|
pRuntimeEnv->pResultRowListSet = NULL;
|
||||||
|
|
||||||
destroyOperatorInfo(pRuntimeEnv->proot);
|
destroyOperatorInfo(pRuntimeEnv->proot);
|
||||||
|
|
||||||
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
|
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
|
||||||
|
@ -2791,7 +2804,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
||||||
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
|
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
|
||||||
|
|
||||||
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
|
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
|
||||||
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
|
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
|
||||||
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
||||||
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
@ -2837,7 +2850,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
||||||
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
|
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
|
||||||
|
|
||||||
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
|
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
|
||||||
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
|
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
|
||||||
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
||||||
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
@ -3251,8 +3264,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
|
||||||
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
|
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
|
||||||
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
|
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
|
||||||
|
|
||||||
int32_t tid = 0;
|
int64_t tid = 0;
|
||||||
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&tid, sizeof(tid), true, uid);
|
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||||
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
|
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||||
|
@ -3483,10 +3496,13 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
|
||||||
}
|
}
|
||||||
|
|
||||||
void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx,
|
void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx,
|
||||||
int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t groupIndex) {
|
int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId) {
|
||||||
|
// for simple group by query without interval, all the tables belong to one group result.
|
||||||
int64_t uid = 0;
|
int64_t uid = 0;
|
||||||
|
int64_t tid = 0;
|
||||||
|
|
||||||
SResultRow* pResultRow =
|
SResultRow* pResultRow =
|
||||||
doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&groupIndex, sizeof(groupIndex), true, uid);
|
doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid);
|
||||||
assert (pResultRow != NULL);
|
assert (pResultRow != NULL);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -3494,7 +3510,7 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe
|
||||||
* all group belong to one result set, and each group result has different group id so set the id to be one
|
* all group belong to one result set, and each group result has different group id so set the id to be one
|
||||||
*/
|
*/
|
||||||
if (pResultRow->pageId == -1) {
|
if (pResultRow->pageId == -1) {
|
||||||
int32_t ret = addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->pQueryAttr->resultRowSize);
|
int32_t ret = addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, tableGroupId, pRuntimeEnv->pQueryAttr->resultRowSize);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -3503,20 +3519,20 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe
|
||||||
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
|
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex,
|
void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t tableGroupId,
|
||||||
TSKEY nextKey) {
|
TSKEY nextKey) {
|
||||||
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
|
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
|
||||||
|
|
||||||
// lastKey needs to be updated
|
// lastKey needs to be updated
|
||||||
pTableQueryInfo->lastKey = nextKey;
|
pTableQueryInfo->lastKey = nextKey;
|
||||||
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
|
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == tableGroupId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
doSetTableGroupOutputBuf(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pInfo->rowCellInfoOffset, numOfOutput, groupIndex);
|
doSetTableGroupOutputBuf(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pInfo->rowCellInfoOffset, numOfOutput, tableGroupId);
|
||||||
|
|
||||||
// record the current active group id
|
// record the current active group id
|
||||||
pRuntimeEnv->prevGroupId = groupIndex;
|
pRuntimeEnv->prevGroupId = tableGroupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
|
void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
|
||||||
|
@ -5498,7 +5514,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
||||||
} else {
|
} else {
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
||||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
||||||
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
||||||
pBInfo->rowCellInfoOffset);
|
pBInfo->rowCellInfoOffset);
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
|
@ -5518,7 +5534,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
||||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
||||||
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
||||||
pBInfo->rowCellInfoOffset);
|
pBInfo->rowCellInfoOffset);
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
|
|
|
@ -1,147 +0,0 @@
|
||||||
system sh/stop_dnodes.sh
|
|
||||||
|
|
||||||
system sh/deploy.sh -n dnode1 -i 1
|
|
||||||
system sh/cfg.sh -n dnode1 -c walLevel -v 1
|
|
||||||
system sh/exec.sh -n dnode1 -s start
|
|
||||||
sleep 100
|
|
||||||
sql connect
|
|
||||||
sleep 100
|
|
||||||
|
|
||||||
print ========== sub_in_from.sim
|
|
||||||
$i = 0
|
|
||||||
|
|
||||||
$dbPrefix = subdb
|
|
||||||
$tbPrefix = sub_tb
|
|
||||||
$stbPrefix = sub_stb
|
|
||||||
$tbNum = 10
|
|
||||||
$rowNum = 1000
|
|
||||||
$totalNum = $tbNum * $rowNum
|
|
||||||
$loops = 200000
|
|
||||||
$log = 10000
|
|
||||||
$ts0 = 1537146000000
|
|
||||||
$delta = 600000
|
|
||||||
$i = 0
|
|
||||||
$db = $dbPrefix . $i
|
|
||||||
$stb = $stbPrefix . $i
|
|
||||||
|
|
||||||
sql drop database $db -x step1
|
|
||||||
step1:
|
|
||||||
sql create database $db cache 16 maxrows 4096 keep 36500
|
|
||||||
print ====== create tables
|
|
||||||
sql use $db
|
|
||||||
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
|
|
||||||
|
|
||||||
$i = 0
|
|
||||||
$ts = $ts0
|
|
||||||
$halfNum = $tbNum / 2
|
|
||||||
while $i < $halfNum
|
|
||||||
$tbId = $i + $halfNum
|
|
||||||
$tb = $tbPrefix . $i
|
|
||||||
$tb1 = $tbPrefix . $tbId
|
|
||||||
sql create table $tb using $stb tags( $i )
|
|
||||||
sql create table $tb1 using $stb tags( $tbId )
|
|
||||||
|
|
||||||
$x = 0
|
|
||||||
while $x < $rowNum
|
|
||||||
$xs = $x * $delta
|
|
||||||
$ts = $ts0 + $xs
|
|
||||||
$c = $x / 10
|
|
||||||
$c = $c * 10
|
|
||||||
$c = $x - $c
|
|
||||||
$binary = 'binary . $c
|
|
||||||
$binary = $binary . '
|
|
||||||
$nchar = 'nchar . $c
|
|
||||||
$nchar = $nchar . '
|
|
||||||
sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar )
|
|
||||||
sql insert into $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar )
|
|
||||||
$x = $x + 1
|
|
||||||
endw
|
|
||||||
|
|
||||||
$i = $i + 1
|
|
||||||
endw
|
|
||||||
print ====== tables created
|
|
||||||
|
|
||||||
sql_error select count(*) from (select count(*) from abc.sub_stb0)
|
|
||||||
sql_error select val + 20 from (select count(*) from sub_stb0 interval(10h))
|
|
||||||
sql_error select abc+20 from (select count(*) from sub_stb0 interval(1s))
|
|
||||||
|
|
||||||
sql select count(*) from (select count(*) from sub_stb0 interval(10h))
|
|
||||||
if $rows != 1 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data00 != 18 then
|
|
||||||
print expect 18, actual: $data00
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
sql select ts from (select count(*) from sub_stb0 interval(10h))
|
|
||||||
if $rows != 18 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data00 != @18-09-17 04:00:00.000@ then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data01 != @18-09-17 14:00:00.000@ then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
sql select val + 20, val from (select count(*) as val from sub_stb0 interval(10h))
|
|
||||||
if $rows != 18 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data00 != 320.000000 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data01 != 300 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data10 != 620 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data11 != 600 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data20 != 620 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data21 != 600 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
sql select max(val), min(val), max(val) - min(val) from (select count(*) val from sub_stb0 interval(10h))
|
|
||||||
if $rows != 1 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data00 != 600 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data01 != 100 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
if $data02 != 500.000000 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
sql select first(ts,val),last(ts,val) from (select count(*) val from sub_stb0 interval(10h))
|
|
||||||
sql select top(val, 5) from (select count(*) val from sub_stb0 interval(10h))
|
|
||||||
sql select diff(val) from (select count(*) val from sub_stb0 interval(10h))
|
|
||||||
sql select apercentile(val, 50) from (select count(*) val from sub_stb0 interval(10h))
|
|
||||||
|
|
||||||
# not support yet
|
|
||||||
sql select percentile(val, 50) from (select count(*) val from sub_stb0 interval(10h))
|
|
||||||
sql select stddev(val) from (select count(*) val from sub_stb0 interval(10h))
|
|
||||||
|
|
||||||
print ====================>complex query
|
|
||||||
|
|
Loading…
Reference in New Issue