refactor: do some internal refactor.
This commit is contained in:
parent
81c3d16a2d
commit
08bd021a6f
|
@ -485,10 +485,11 @@ typedef struct SStreamScanInfo {
|
||||||
} SStreamScanInfo;
|
} SStreamScanInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SVnode* vnode;
|
SVnode* vnode;
|
||||||
SSDataBlock pRes; // result SSDataBlock
|
SSDataBlock pRes; // result SSDataBlock
|
||||||
STsdbReader* dataReader;
|
STsdbReader* dataReader;
|
||||||
SSnapContext* sContext;
|
SSnapContext* sContext;
|
||||||
|
STableListInfo* pTableListInfo;
|
||||||
} SStreamRawScanInfo;
|
} SStreamRawScanInfo;
|
||||||
|
|
||||||
typedef struct STableCountScanSupp {
|
typedef struct STableCountScanSupp {
|
||||||
|
|
|
@ -34,6 +34,12 @@
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
bool hasAgg;
|
||||||
|
int32_t numOfRows;
|
||||||
|
int32_t startOffset;
|
||||||
|
} SFunctionCtxStatus;
|
||||||
|
|
||||||
static void destroyAggOperatorInfo(void* param);
|
static void destroyAggOperatorInfo(void* param);
|
||||||
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
||||||
|
|
||||||
|
@ -44,10 +50,16 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator);
|
||||||
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
|
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
|
||||||
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator);
|
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator);
|
||||||
|
|
||||||
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
|
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
||||||
|
const char* pKey);
|
||||||
|
|
||||||
|
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
|
||||||
|
|
||||||
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
||||||
|
|
||||||
|
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
|
||||||
|
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
|
||||||
|
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
|
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
|
||||||
|
@ -366,7 +378,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
|
||||||
* all group belong to one result set, and each group result has different group id so set the id to be one
|
* all group belong to one result set, and each group result has different group id so set the id to be one
|
||||||
*/
|
*/
|
||||||
if (pResultRow->pageId == -1) {
|
if (pResultRow->pageId == -1) {
|
||||||
int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
|
int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||||
}
|
}
|
||||||
|
@ -376,7 +388,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
|
||||||
}
|
}
|
||||||
|
|
||||||
// a new buffer page for each table. Needs to opt this design
|
// a new buffer page for each table. Needs to opt this design
|
||||||
int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
|
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
|
||||||
if (pWindowRes->pageId != -1) {
|
if (pWindowRes->pageId != -1) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -425,3 +437,127 @@ int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf,
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
||||||
|
const char* pKey) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
|
pAggSup->currentPageId = -1;
|
||||||
|
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
||||||
|
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
|
||||||
|
pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);
|
||||||
|
|
||||||
|
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t defaultPgsz = 0;
|
||||||
|
uint32_t defaultBufsz = 0;
|
||||||
|
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
|
||||||
|
|
||||||
|
if (!osTempSpaceAvailable()) {
|
||||||
|
code = TSDB_CODE_NO_AVAIL_DISK;
|
||||||
|
qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void cleanupAggSup(SAggSupporter* pAggSup) {
|
||||||
|
taosMemoryFreeClear(pAggSup->keyBuf);
|
||||||
|
tSimpleHashCleanup(pAggSup->pResultRowHashTable);
|
||||||
|
destroyDiskbasedBuf(pAggSup->pResultBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
||||||
|
const char* pkey, void* pState) {
|
||||||
|
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
if (pState) {
|
||||||
|
pSup->pCtx[i].saveHandle.pBuf = NULL;
|
||||||
|
pSup->pCtx[i].saveHandle.pState = pState;
|
||||||
|
pSup->pCtx[i].exprIdx = i;
|
||||||
|
} else {
|
||||||
|
pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
|
||||||
|
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
|
||||||
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||||
|
// keep it temporarily
|
||||||
|
SFunctionCtxStatus status = {0};
|
||||||
|
functionCtxSave(&pCtx[k], &status);
|
||||||
|
|
||||||
|
pCtx[k].input.startRowIndex = offset;
|
||||||
|
pCtx[k].input.numOfRows = forwardStep;
|
||||||
|
|
||||||
|
// not a whole block involved in query processing, statistics data can not be used
|
||||||
|
// NOTE: the original value of isSet have been changed here
|
||||||
|
if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) {
|
||||||
|
pCtx[k].input.colDataSMAIsSet = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCtx[k].isPseudoFunc) {
|
||||||
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
|
||||||
|
|
||||||
|
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
|
|
||||||
|
SColumnInfoData idata = {0};
|
||||||
|
idata.info.type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
|
||||||
|
idata.pData = p;
|
||||||
|
|
||||||
|
SScalarParam out = {.columnData = &idata};
|
||||||
|
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
|
||||||
|
pCtx[k].sfp.process(&tw, 1, &out);
|
||||||
|
pEntryInfo->numOfRes = 1;
|
||||||
|
} else {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
|
||||||
|
code = pCtx[k].fpSet.process(&pCtx[k]);
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
|
||||||
|
taskInfo->code = code;
|
||||||
|
T_LONG_JMP(taskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// restore it
|
||||||
|
functionCtxRestore(&pCtx[k], &status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
|
||||||
|
pStatus->hasAgg = pCtx->input.colDataSMAIsSet;
|
||||||
|
pStatus->numOfRows = pCtx->input.numOfRows;
|
||||||
|
pStatus->startOffset = pCtx->input.startRowIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
|
||||||
|
pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
|
||||||
|
pCtx->input.numOfRows = pStatus->numOfRows;
|
||||||
|
pCtx->input.startRowIndex = pStatus->startOffset;
|
||||||
|
}
|
|
@ -1192,7 +1192,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
SSnapContext* sContext = pInfo->sContext;
|
SSnapContext* sContext = pInfo->sContext;
|
||||||
|
|
||||||
SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id);
|
SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id);
|
||||||
STableListInfo* pTableListInfo = ((STableScanInfo*)(p->info))->base.pTableListInfo;
|
STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;
|
||||||
|
|
||||||
if (setForSnapShot(sContext, pOffset->uid) != 0) {
|
if (setForSnapShot(sContext, pOffset->uid) != 0) {
|
||||||
qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id);
|
qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -34,9 +34,7 @@
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
|
|
||||||
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
|
|
||||||
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
|
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
|
||||||
|
|
||||||
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
|
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -81,14 +79,14 @@ static void releaseQueryBuf(size_t numOfTables);
|
||||||
|
|
||||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
||||||
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
|
||||||
const char* pKey);
|
|
||||||
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
|
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
|
||||||
int32_t status);
|
int32_t status);
|
||||||
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
||||||
bool createDummyCol);
|
bool createDummyCol);
|
||||||
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||||
SGroupResInfo* pGroupResInfo);
|
SGroupResInfo* pGroupResInfo);
|
||||||
|
static SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
|
||||||
|
|
||||||
void setOperatorCompleted(SOperatorInfo* pOperator) {
|
void setOperatorCompleted(SOperatorInfo* pOperator) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
@ -268,72 +266,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
|
||||||
colDataSetInt64(pColData, 4, &pQueryWindow->ekey);
|
colDataSetInt64(pColData, 4, &pQueryWindow->ekey);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
bool hasAgg;
|
|
||||||
int32_t numOfRows;
|
|
||||||
int32_t startOffset;
|
|
||||||
} SFunctionCtxStatus;
|
|
||||||
|
|
||||||
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
|
|
||||||
pStatus->hasAgg = pCtx->input.colDataSMAIsSet;
|
|
||||||
pStatus->numOfRows = pCtx->input.numOfRows;
|
|
||||||
pStatus->startOffset = pCtx->input.startRowIndex;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
|
|
||||||
pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
|
|
||||||
pCtx->input.numOfRows = pStatus->numOfRows;
|
|
||||||
pCtx->input.startRowIndex = pStatus->startOffset;
|
|
||||||
}
|
|
||||||
|
|
||||||
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
|
|
||||||
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
|
|
||||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
|
||||||
// keep it temporarily
|
|
||||||
SFunctionCtxStatus status = {0};
|
|
||||||
functionCtxSave(&pCtx[k], &status);
|
|
||||||
|
|
||||||
pCtx[k].input.startRowIndex = offset;
|
|
||||||
pCtx[k].input.numOfRows = forwardStep;
|
|
||||||
|
|
||||||
// not a whole block involved in query processing, statistics data can not be used
|
|
||||||
// NOTE: the original value of isSet have been changed here
|
|
||||||
if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) {
|
|
||||||
pCtx[k].input.colDataSMAIsSet = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pCtx[k].isPseudoFunc) {
|
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
|
|
||||||
|
|
||||||
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
|
|
||||||
|
|
||||||
SColumnInfoData idata = {0};
|
|
||||||
idata.info.type = TSDB_DATA_TYPE_BIGINT;
|
|
||||||
idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
|
|
||||||
idata.pData = p;
|
|
||||||
|
|
||||||
SScalarParam out = {.columnData = &idata};
|
|
||||||
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
|
|
||||||
pCtx[k].sfp.process(&tw, 1, &out);
|
|
||||||
pEntryInfo->numOfRes = 1;
|
|
||||||
} else {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
|
|
||||||
code = pCtx[k].fpSet.process(&pCtx[k]);
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
|
|
||||||
taskInfo->code = code;
|
|
||||||
T_LONG_JMP(taskInfo->env, code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// restore it
|
|
||||||
functionCtxRestore(&pCtx[k], &status);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
|
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
|
||||||
SqlFunctionCtx* pCtx = pExprSup->pCtx;
|
SqlFunctionCtx* pCtx = pExprSup->pCtx;
|
||||||
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
|
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
|
||||||
|
@ -1018,161 +950,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
|
|
||||||
// STableQueryInfo* pTableQueryInfo) {
|
|
||||||
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
|
||||||
// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
|
|
||||||
//
|
|
||||||
// STimeWindow tw = *win;
|
|
||||||
// getNextTimeWindow(pQueryAttr, &tw);
|
|
||||||
//
|
|
||||||
// if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
|
|
||||||
// (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
|
|
||||||
//
|
|
||||||
// // load the data block and check data remaining in current data block
|
|
||||||
// // TODO optimize performance
|
|
||||||
// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
|
|
||||||
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
|
|
||||||
//
|
|
||||||
// tw = *win;
|
|
||||||
// int32_t startPos =
|
|
||||||
// getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
|
|
||||||
//
|
|
||||||
// // set the abort info
|
|
||||||
// pQueryAttr->pos = startPos;
|
|
||||||
//
|
|
||||||
// // reset the query start timestamp
|
|
||||||
// pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
|
|
||||||
// pQueryAttr->window.skey = pTableQueryInfo->win.skey;
|
|
||||||
// TSKEY key = pTableQueryInfo->win.skey;
|
|
||||||
//
|
|
||||||
// pWindowResInfo->prevSKey = tw.skey;
|
|
||||||
// int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
|
|
||||||
//
|
|
||||||
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
|
|
||||||
// pRuntimeEnv->resultRowInfo.curIndex = index; // restore the window index
|
|
||||||
//
|
|
||||||
// //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
|
|
||||||
// lastKey:%" PRId64,
|
|
||||||
// GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
|
|
||||||
// pQueryAttr->current->lastKey);
|
|
||||||
//
|
|
||||||
// return key;
|
|
||||||
// } else { // do nothing
|
|
||||||
// pQueryAttr->window.skey = tw.skey;
|
|
||||||
// pWindowResInfo->prevSKey = tw.skey;
|
|
||||||
// pTableQueryInfo->lastKey = tw.skey;
|
|
||||||
//
|
|
||||||
// return tw.skey;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return true;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
|
||||||
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
|
||||||
//
|
|
||||||
// // if queried with value filter, do NOT forward query start position
|
|
||||||
// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
|
|
||||||
// pRuntimeEnv->pFillInfo != NULL) {
|
|
||||||
// return true;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /*
|
|
||||||
// * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
|
|
||||||
// * pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
|
|
||||||
// value is
|
|
||||||
// * not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
|
|
||||||
// */
|
|
||||||
// STimeWindow w = TSWINDOW_INITIALIZER;
|
|
||||||
// bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
|
||||||
//
|
|
||||||
// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
|
|
||||||
// STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
|
|
||||||
//
|
|
||||||
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
|
||||||
// while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
|
|
||||||
// tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
|
|
||||||
//
|
|
||||||
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
|
|
||||||
// if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
|
||||||
// getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
|
|
||||||
// &w); pWindowResInfo->prevSKey = w.skey;
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
|
|
||||||
// pWindowResInfo->prevSKey = w.skey;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // the first time window
|
|
||||||
// STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
|
|
||||||
//
|
|
||||||
// while (pQueryAttr->limit.offset > 0) {
|
|
||||||
// STimeWindow tw = win;
|
|
||||||
//
|
|
||||||
// if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
|
|
||||||
// pQueryAttr->limit.offset -= 1;
|
|
||||||
// pWindowResInfo->prevSKey = win.skey;
|
|
||||||
//
|
|
||||||
// // current time window is aligned with blockInfo.window.ekey
|
|
||||||
// // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
|
|
||||||
// if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
|
|
||||||
// pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (pQueryAttr->limit.offset == 0) {
|
|
||||||
// *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
|
|
||||||
// return true;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // current window does not ended in current data block, try next data block
|
|
||||||
// getNextTimeWindow(pQueryAttr, &tw);
|
|
||||||
//
|
|
||||||
// /*
|
|
||||||
// * If the next time window still starts from current data block,
|
|
||||||
// * load the primary timestamp column first, and then find the start position for the next queried time window.
|
|
||||||
// * Note that only the primary timestamp column is required.
|
|
||||||
// * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
|
|
||||||
// required
|
|
||||||
// * time window resides in current data block.
|
|
||||||
// */
|
|
||||||
// if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
|
|
||||||
//
|
|
||||||
// SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
|
|
||||||
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
|
|
||||||
//
|
|
||||||
// if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
|
|
||||||
// pQueryAttr->limit.offset -= 1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (pQueryAttr->limit.offset == 0) {
|
|
||||||
// *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
|
|
||||||
// return true;
|
|
||||||
// } else {
|
|
||||||
// tw = win;
|
|
||||||
// int32_t startPos =
|
|
||||||
// getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
|
|
||||||
// // set the abort info
|
|
||||||
// pQueryAttr->pos = startPos;
|
|
||||||
// pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
|
|
||||||
// pWindowResInfo->prevSKey = tw.skey;
|
|
||||||
// win = tw;
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// break; // offset is not 0, and next time window begins or ends in the next block.
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // check for error
|
|
||||||
// if (terrno != TSDB_CODE_SUCCESS) {
|
|
||||||
// T_LONG_JMP(pRuntimeEnv->env, terrno);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return true;
|
|
||||||
// }
|
|
||||||
|
|
||||||
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
|
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
|
||||||
p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
|
p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
|
||||||
if (p->pDownstream == NULL) {
|
if (p->pDownstream == NULL) {
|
||||||
|
@ -1301,70 +1078,6 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
|
||||||
const char* pKey) {
|
|
||||||
int32_t code = 0;
|
|
||||||
// _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
|
||||||
|
|
||||||
pAggSup->currentPageId = -1;
|
|
||||||
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
|
||||||
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
|
|
||||||
pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);
|
|
||||||
|
|
||||||
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t defaultPgsz = 0;
|
|
||||||
uint32_t defaultBufsz = 0;
|
|
||||||
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
|
|
||||||
|
|
||||||
if (!osTempSpaceAvailable()) {
|
|
||||||
code = TSDB_CODE_NO_AVAIL_DISK;
|
|
||||||
qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
void cleanupAggSup(SAggSupporter* pAggSup) {
|
|
||||||
taosMemoryFreeClear(pAggSup->keyBuf);
|
|
||||||
tSimpleHashCleanup(pAggSup->pResultRowHashTable);
|
|
||||||
destroyDiskbasedBuf(pAggSup->pResultBuf);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
|
||||||
const char* pkey, void* pState) {
|
|
||||||
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
if (pState) {
|
|
||||||
pSup->pCtx[i].saveHandle.pBuf = NULL;
|
|
||||||
pSup->pCtx[i].saveHandle.pState = pState;
|
|
||||||
pSup->pCtx[i].exprIdx = i;
|
|
||||||
} else {
|
|
||||||
pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
|
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
|
||||||
if (numOfRows == 0) {
|
if (numOfRows == 0) {
|
||||||
numOfRows = 4096;
|
numOfRows = 4096;
|
||||||
|
@ -1474,8 +1187,6 @@ SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t v
|
||||||
return pTaskInfo;
|
return pTaskInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
|
|
||||||
|
|
||||||
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
|
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pHandle->meta, 0);
|
metaReaderInit(&mr, pHandle->meta, 0);
|
||||||
|
@ -2286,7 +1997,7 @@ void qStreamCloseTsdbReader(void* task) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
|
static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
|
||||||
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
STableScanInfo* pScanInfo = pOperator->info;
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
|
taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
|
||||||
|
|
|
@ -2172,19 +2172,19 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
||||||
code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
|
code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbReleaseDataBlock(pInfo->dataReader);
|
tsdbReleaseDataBlock(pInfo->dataReader);
|
||||||
longjmp(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->dataReader && hasNext) {
|
if (pInfo->dataReader && hasNext) {
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
tsdbReleaseDataBlock(pInfo->dataReader);
|
tsdbReleaseDataBlock(pInfo->dataReader);
|
||||||
longjmp(pTaskInfo->env, pTaskInfo->code);
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
|
SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
longjmp(pTaskInfo->env, terrno);
|
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
|
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
|
||||||
|
@ -2283,6 +2283,7 @@ static void destroyRawScanOperatorInfo(void* param) {
|
||||||
SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
|
SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
|
||||||
tsdbReaderClose(pRawScan->dataReader);
|
tsdbReaderClose(pRawScan->dataReader);
|
||||||
destroySnapContext(pRawScan->sContext);
|
destroySnapContext(pRawScan->sContext);
|
||||||
|
tableListDestroy(pRawScan->pTableListInfo);
|
||||||
taosMemoryFree(pRawScan);
|
taosMemoryFree(pRawScan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2304,6 +2305,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->pTableListInfo = tableListCreate();
|
||||||
pInfo->vnode = pHandle->vnode;
|
pInfo->vnode = pHandle->vnode;
|
||||||
|
|
||||||
pInfo->sContext = pHandle->sContext;
|
pInfo->sContext = pHandle->sContext;
|
||||||
|
|
|
@ -938,6 +938,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY ekey = ascScan ? win.ekey : win.skey;
|
TSKEY ekey = ascScan ? win.ekey : win.skey;
|
||||||
int32_t forwardRows =
|
int32_t forwardRows =
|
||||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
|
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
|
||||||
|
|
Loading…
Reference in New Issue