commit
4ba7f20504
|
@ -37,7 +37,7 @@ SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot);
|
|||
#define curTimeWindow(_winres) ((_winres)->curIndex)
|
||||
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
|
||||
|
||||
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo, size_t interBufSize);
|
||||
int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo, size_t interBufSize);
|
||||
|
||||
char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult);
|
||||
|
||||
|
|
|
@ -2978,16 +2978,23 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
}
|
||||
}
|
||||
|
||||
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo, size_t interBufSize) {
|
||||
int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo, size_t interBufSize) {
|
||||
int32_t numOfCols = pQuery->numOfOutput;
|
||||
|
||||
pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo));
|
||||
if (pResultRow->resultInfo == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
pResultRow->pos = *posInfo;
|
||||
|
||||
char* buf = calloc(1, interBufSize);
|
||||
if (buf == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
// set the intermediate result output buffer
|
||||
setWindowResultInfo(pResultRow->resultInfo, pQuery, isSTableQuery, buf);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
|
@ -3368,7 +3375,10 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
|
|||
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
|
||||
int32_t initialSize = 16;
|
||||
int32_t initialThreshold = 100;
|
||||
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, initialSize, initialThreshold, TSDB_DATA_TYPE_INT);
|
||||
int32_t code = initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, initialSize, initialThreshold, TSDB_DATA_TYPE_INT);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
} else { // in other aggregate query, do not initialize the windowResInfo
|
||||
}
|
||||
|
||||
|
@ -4189,7 +4199,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
type = TSDB_DATA_TYPE_INT; // group id
|
||||
}
|
||||
|
||||
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 32, 4096, type);
|
||||
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 32, 4096, type);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
|
@ -4206,7 +4219,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
}
|
||||
|
||||
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 4096, type);
|
||||
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 4096, type);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
|
||||
|
@ -5738,6 +5754,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
|||
|
||||
size_t s = taosArrayGetSize(pa);
|
||||
SArray* p1 = taosArrayInit(s, POINTER_BYTES);
|
||||
if (p1 == NULL) {
|
||||
goto _cleanup;
|
||||
}
|
||||
taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1);
|
||||
|
||||
for(int32_t j = 0; j < s; ++j) {
|
||||
void* pTable = taosArrayGetP(pa, j);
|
||||
|
@ -5752,13 +5772,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
|||
|
||||
void* buf = pQInfo->pBuf + index * sizeof(STableQueryInfo);
|
||||
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window, buf);
|
||||
if (item == NULL) {
|
||||
goto _cleanup;
|
||||
}
|
||||
item->groupIndex = i;
|
||||
taosArrayPush(p1, &item);
|
||||
taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES);
|
||||
index += 1;
|
||||
}
|
||||
|
||||
taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1);
|
||||
}
|
||||
|
||||
pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo));
|
||||
|
|
|
@ -50,9 +50,15 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
|
|||
|
||||
// use the pointer arraylist
|
||||
pWindowResInfo->pResult = calloc(threshold, sizeof(SWindowResult));
|
||||
if (pWindowResInfo->pResult == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
|
||||
SPosInfo posInfo = {-1, -1};
|
||||
createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo, pRuntimeEnv->interBufSize);
|
||||
int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo, pRuntimeEnv->interBufSize);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -76,9 +82,11 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) {
|
|||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
|
||||
SWindowResult *pResult = &pWindowResInfo->pResult[i];
|
||||
destroyTimeWindowRes(pResult, numOfCols);
|
||||
if (pWindowResInfo->pResult != NULL) {
|
||||
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
|
||||
SWindowResult *pResult = &pWindowResInfo->pResult[i];
|
||||
destroyTimeWindowRes(pResult, numOfCols);
|
||||
}
|
||||
}
|
||||
|
||||
taosHashCleanup(pWindowResInfo->hashList);
|
||||
|
|
Loading…
Reference in New Issue