[td-805] decrease the memory consumption during interval query.
This commit is contained in:
parent
9191700f10
commit
f14c76d2f6
|
@ -52,10 +52,10 @@ typedef struct SWindowStatus {
|
|||
|
||||
typedef struct SWindowResult {
|
||||
uint16_t numOfRows; // number of rows of current time window
|
||||
SWindowStatus status; // this result status: closed or opened
|
||||
SPosInfo pos; // Position of current result in disk-based output buffer
|
||||
SResultInfo* resultInfo; // For each result column, there is a resultInfo
|
||||
STimeWindow window; // The time window that current result covers.
|
||||
SWindowStatus status; // this result status: closed or opened
|
||||
} SWindowResult;
|
||||
|
||||
/**
|
||||
|
@ -122,6 +122,7 @@ typedef struct SQueryCostInfo {
|
|||
uint32_t discardBlocks;
|
||||
uint64_t elapsedTime;
|
||||
uint64_t computTime;
|
||||
uint64_t internalSupSize;
|
||||
} SQueryCostInfo;
|
||||
|
||||
typedef struct SQuery {
|
||||
|
|
|
@ -398,8 +398,18 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
|
|||
|
||||
// more than the capacity, reallocate the resources
|
||||
if (pWindowResInfo->size >= pWindowResInfo->capacity) {
|
||||
int64_t newCap = pWindowResInfo->capacity * 1.5;
|
||||
int64_t newCap = 0;
|
||||
if (pWindowResInfo->capacity > 10000) {
|
||||
newCap = pWindowResInfo->capacity * 1.25;
|
||||
} else {
|
||||
newCap = pWindowResInfo->capacity * 1.5;
|
||||
}
|
||||
|
||||
printf("%ld\n", newCap);
|
||||
|
||||
char *t = realloc(pWindowResInfo->pResult, newCap * sizeof(SWindowResult));
|
||||
pRuntimeEnv->summary.internalSupSize += (newCap - pWindowResInfo->capacity) * sizeof(SWindowResult);
|
||||
|
||||
if (t == NULL) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
@ -2659,7 +2669,7 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
|
|||
qDebug("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1);
|
||||
}
|
||||
|
||||
if (pQInfo->groupIndex == numOfGroups) {
|
||||
if (pQInfo->groupIndex == numOfGroups && pQInfo->offset == pQInfo->numOfGroupResultPages) {
|
||||
SET_STABLE_QUERY_OVER(pQInfo);
|
||||
}
|
||||
|
||||
|
@ -2705,7 +2715,6 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
|
|||
memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pData->num, bytes * pData->num);
|
||||
}
|
||||
|
||||
// rows += pData->num;
|
||||
offset += pData->num;
|
||||
}
|
||||
|
||||
|
@ -2796,6 +2805,11 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|||
int64_t startt = taosGetTimestampMs();
|
||||
|
||||
while (1) {
|
||||
if (IS_QUERY_KILLED(pQInfo)) {
|
||||
qDebug("QInfo:%p it is already killed, abort", pQInfo);
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
||||
int32_t pos = pTree->pNode[0].index;
|
||||
|
||||
SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo;
|
||||
|
@ -3958,6 +3972,8 @@ static void queryCostStatis(SQInfo *pQInfo) {
|
|||
" load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
|
||||
pQInfo, pSummary->elapsedTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
|
||||
pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
|
||||
|
||||
qDebug("QInfo:%p :cost summary: internal size:%"PRId64, pQInfo, pSummary->internalSupSize);
|
||||
}
|
||||
|
||||
static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
|
||||
|
@ -6039,8 +6055,6 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|||
}
|
||||
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
setQueryKilled(pQInfo);
|
||||
|
||||
qDebug("QInfo:%p start to free QInfo", pQInfo);
|
||||
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
|
||||
taosTFree(pQuery->sdata[col]);
|
||||
|
|
|
@ -251,6 +251,10 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
// todo add more error check here
|
||||
// register the qhandle to connect to quit query immediate if connection is broken
|
||||
vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId);
|
||||
|
||||
bool freeHandle = true;
|
||||
bool buildRes = false;
|
||||
|
||||
|
|
Loading…
Reference in New Issue