[TD-225] refactor codes.
This commit is contained in:
parent
3afd7a65a7
commit
d7e8b4c184
|
@ -171,11 +171,10 @@ typedef struct SQuery {
|
||||||
|
|
||||||
typedef struct SQueryRuntimeEnv {
|
typedef struct SQueryRuntimeEnv {
|
||||||
jmp_buf env;
|
jmp_buf env;
|
||||||
SResultRow* pResultRow; // todo refactor to merge with SWindowResInfo
|
|
||||||
SQuery* pQuery;
|
SQuery* pQuery;
|
||||||
SQLFunctionCtx* pCtx;
|
SQLFunctionCtx* pCtx;
|
||||||
int32_t numOfRowsPerPage;
|
int32_t numOfRowsPerPage;
|
||||||
uint16_t offset[TSDB_MAX_COLUMNS];
|
uint16_t* offset;
|
||||||
uint16_t scanFlag; // denotes reversed scan of data or not
|
uint16_t scanFlag; // denotes reversed scan of data or not
|
||||||
SFillInfo* pFillInfo;
|
SFillInfo* pFillInfo;
|
||||||
SWindowResInfo windowResInfo;
|
SWindowResInfo windowResInfo;
|
||||||
|
|
|
@ -1627,10 +1627,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx));
|
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx));
|
||||||
|
pRuntimeEnv->offset = calloc(pQuery->numOfOutput, sizeof(int16_t));
|
||||||
pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t));
|
pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t));
|
||||||
pRuntimeEnv->pResultRow = getNewResultRow(pRuntimeEnv->pool);
|
|
||||||
|
|
||||||
if (pRuntimeEnv->pResultRow == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) {
|
if (pRuntimeEnv->offset == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) {
|
||||||
goto _clean;
|
goto _clean;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1670,15 +1670,15 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
||||||
assert(isValidDataType(pCtx->inputType));
|
assert(isValidDataType(pCtx->inputType));
|
||||||
pCtx->ptsOutputBuf = NULL;
|
pCtx->ptsOutputBuf = NULL;
|
||||||
|
|
||||||
pCtx->outputBytes = pQuery->pExpr1[i].bytes;
|
pCtx->outputBytes = pQuery->pExpr1[i].bytes;
|
||||||
pCtx->outputType = pQuery->pExpr1[i].type;
|
pCtx->outputType = pQuery->pExpr1[i].type;
|
||||||
|
|
||||||
pCtx->order = pQuery->order.order;
|
pCtx->order = pQuery->order.order;
|
||||||
pCtx->functionId = pSqlFuncMsg->functionId;
|
pCtx->functionId = pSqlFuncMsg->functionId;
|
||||||
pCtx->stableQuery = pRuntimeEnv->stableQuery;
|
pCtx->stableQuery = pRuntimeEnv->stableQuery;
|
||||||
pCtx->interBufBytes = pQuery->pExpr1[i].interBytes;
|
pCtx->interBufBytes = pQuery->pExpr1[i].interBytes;
|
||||||
|
|
||||||
pCtx->numOfParams = pSqlFuncMsg->numOfParams;
|
pCtx->numOfParams = pSqlFuncMsg->numOfParams;
|
||||||
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
|
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
|
||||||
int16_t type = pSqlFuncMsg->arg[j].argType;
|
int16_t type = pSqlFuncMsg->arg[j].argType;
|
||||||
int16_t bytes = pSqlFuncMsg->arg[j].argBytes;
|
int16_t bytes = pSqlFuncMsg->arg[j].argBytes;
|
||||||
|
@ -1726,6 +1726,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
||||||
|
|
||||||
_clean:
|
_clean:
|
||||||
tfree(pRuntimeEnv->pCtx);
|
tfree(pRuntimeEnv->pCtx);
|
||||||
|
tfree(pRuntimeEnv->offset);
|
||||||
|
tfree(pRuntimeEnv->rowCellInfoOffset);
|
||||||
|
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -1775,6 +1777,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
doFreeQueryHandle(pQInfo);
|
doFreeQueryHandle(pQInfo);
|
||||||
|
|
||||||
pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf);
|
pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf);
|
||||||
|
|
||||||
|
tfree(pRuntimeEnv->offset);
|
||||||
tfree(pRuntimeEnv->keyBuf);
|
tfree(pRuntimeEnv->keyBuf);
|
||||||
tfree(pRuntimeEnv->rowCellInfoOffset);
|
tfree(pRuntimeEnv->rowCellInfoOffset);
|
||||||
|
|
||||||
|
@ -3337,7 +3341,13 @@ int32_t initResultRow(SResultRow *pResultRow) {
|
||||||
|
|
||||||
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
|
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
SResultRow* pRow = pRuntimeEnv->pResultRow;
|
|
||||||
|
SResultRow* pRow = NULL;
|
||||||
|
if (pRuntimeEnv->windowResInfo.size == 0) {
|
||||||
|
int32_t groupIndex = 0;
|
||||||
|
int32_t uid = 0;
|
||||||
|
pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&groupIndex, sizeof(groupIndex), true, uid);
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||||
|
@ -4619,12 +4629,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
||||||
tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order);
|
tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order);
|
||||||
}
|
}
|
||||||
|
|
||||||
// create runtime environment
|
|
||||||
code = setupQueryRuntimeEnv(pRuntimeEnv, pQuery->order.order);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t ps = DEFAULT_PAGE_SIZE;
|
int32_t ps = DEFAULT_PAGE_SIZE;
|
||||||
int32_t rowsize = 0;
|
int32_t rowsize = 0;
|
||||||
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
|
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
|
||||||
|
@ -4656,7 +4660,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) {
|
||||||
int32_t numOfResultRows = getInitialPageNum(pQInfo);
|
int32_t numOfResultRows = getInitialPageNum(pQInfo);
|
||||||
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
|
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
|
||||||
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo);
|
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo);
|
||||||
|
@ -4677,6 +4681,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create runtime environment
|
||||||
|
code = setupQueryRuntimeEnv(pRuntimeEnv, pQuery->order.order);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
|
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
|
||||||
SFillColInfo* pColInfo = createFillColInfo(pQuery);
|
SFillColInfo* pColInfo = createFillColInfo(pQuery);
|
||||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||||
|
|
Loading…
Reference in New Issue