Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact

This commit is contained in:
Hongze Cheng 2022-06-13 09:11:11 +00:00
commit c94ac9a1e4
15 changed files with 409 additions and 151 deletions

View File

@ -124,7 +124,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function
// distributed splitting functions // distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL, FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
FUNCTION_TYPE_APERCENTILE_MERGE, FUNCTION_TYPE_APERCENTILE_MERGE,
FUNCTION_TYPE_SPREAD_PARTIAL, FUNCTION_TYPE_SPREAD_PARTIAL,
FUNCTION_TYPE_SPREAD_MERGE, FUNCTION_TYPE_SPREAD_MERGE,
@ -134,6 +134,10 @@ typedef enum EFunctionType {
FUNCTION_TYPE_HYPERLOGLOG_MERGE, FUNCTION_TYPE_HYPERLOGLOG_MERGE,
FUNCTION_TYPE_ELAPSED_PARTIAL, FUNCTION_TYPE_ELAPSED_PARTIAL,
FUNCTION_TYPE_ELAPSED_MERGE, FUNCTION_TYPE_ELAPSED_MERGE,
FUNCTION_TYPE_TOP_PARTIAL,
FUNCTION_TYPE_TOP_MERGE,
FUNCTION_TYPE_BOTTOM_PARTIAL,
FUNCTION_TYPE_BOTTOM_MERGE,
// user defined funcion // user defined funcion
FUNCTION_TYPE_UDF = 10000 FUNCTION_TYPE_UDF = 10000

View File

@ -830,6 +830,11 @@ void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) {
SRequestObj *pRequest = res; SRequestObj *pRequest = res;
pRequest->body.resInfo.convertUcs4 = false; pRequest->body.resInfo.convertUcs4 = false;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
// set the current block is all consumed
pResultInfo->current = pResultInfo->numOfRows;
taos_fetch_rows_a(res, fp, param); taos_fetch_rows_a(res, fp, param);
} }

View File

@ -1320,8 +1320,24 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col
void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GE); void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GE);
if(p == NULL){ if(p == NULL){
taosArrayPush(oneTable->cols, &cols); taosArrayPush(oneTable->cols, &cols);
}else{ }else{ // to make the sort stable for update data
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols); SArray *sa = (SArray *)p;
SSmlKv *cur = (SSmlKv*)taosArrayGet(sa, 0);
SSmlKv *dCur = (SSmlKv*)taosArrayGet(cols, 0);
if(cur->i > dCur->i){
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols);
}else{
ASSERT(cur->i == dCur->i);
int32_t index = TARRAY_ELEM_IDX(oneTable->cols, p) + 1;
for(; index < taosArrayGetSize(oneTable->cols); index++){
SArray *tmp = (SArray *)taosArrayGet(oneTable->cols, index);
SSmlKv *curTs = (SSmlKv*)taosArrayGet(tmp, 0);
if(curTs->i > dCur->i){
break;
}
}
taosArrayInsert(oneTable->cols, index, &cols);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1339,8 +1355,24 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col
void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GE); void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GE);
if(p == NULL){ if(p == NULL){
taosArrayPush(oneTable->cols, &kvHash); taosArrayPush(oneTable->cols, &kvHash);
}else{ }else{ // to make the sort stable for update data
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &kvHash); SHashObj *sa = (SHashObj *)p;
SSmlKv *cur = (SSmlKv *)taosHashGet(sa, TS, TS_LEN);
SSmlKv *dCur = (SSmlKv*)taosArrayGet(cols, 0);
if(cur->i > dCur->i){
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols);
}else{
ASSERT(cur->i == dCur->i);
int32_t index = TARRAY_ELEM_IDX(oneTable->cols, p) + 1;
for(; index < taosArrayGetSize(oneTable->cols); index++){
SHashObj *tmp = (SHashObj *)taosArrayGet(oneTable->cols, index);
SSmlKv *curTs = (SSmlKv *)taosHashGet(tmp, TS, TS_LEN);
if(curTs->i > dCur->i){
break;
}
}
taosArrayInsert(oneTable->cols, index, &cols);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -968,7 +968,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
pAction->rawWritten = 0; pAction->rawWritten = 0;
pAction->errCode = 0; pAction->errCode = 0;
mDebug("trans:%d, %s:%d null action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id); mDebug("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
pTrans->lastAction = pAction->id; pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType; pTrans->lastMsgType = pAction->msgType;
@ -1025,18 +1025,18 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
if (numOfExecuted == numOfActions) { if (numOfExecuted == numOfActions) {
if (errCode == 0) { if (errCode == 0) {
pTrans->lastAction = 0; pTrans->lastAction = 0;
pTrans->lastErrorNo = 0;
pTrans->lastMsgType = 0; pTrans->lastMsgType = 0;
memset(&pTrans->lastEpset, 0, sizeof(pTrans->lastEpset)); memset(&pTrans->lastEpset, 0, sizeof(pTrans->lastEpset));
pTrans->lastErrorNo = 0;
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions); mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
return 0; return 0;
} else { } else {
mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode & 0XFFFF); mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode & 0XFFFF);
if (pErrAction != NULL) { if (pErrAction != NULL) {
pTrans->lastMsgType = pErrAction->msgType;
pTrans->lastAction = pErrAction->id; pTrans->lastAction = pErrAction->id;
pTrans->lastErrorNo = pErrAction->errCode; pTrans->lastMsgType = pErrAction->msgType;
pTrans->lastEpset = pErrAction->epSet; pTrans->lastEpset = pErrAction->epSet;
pTrans->lastErrorNo = pErrAction->errCode;
} }
mndTransResetActions(pMnode, pTrans, pArray); mndTransResetActions(pMnode, pTrans, pArray);
terrno = errCode; terrno = errCode;
@ -1103,13 +1103,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
} }
if (code == 0) { if (code == 0) {
pTrans->lastAction = 0; pTrans->lastAction = action;
pTrans->lastErrorNo = 0;
pTrans->lastMsgType = 0; pTrans->lastMsgType = 0;
pTrans->lastErrorNo = 0;
memset(&pTrans->lastEpset, 0, sizeof(pTrans->lastEpset)); memset(&pTrans->lastEpset, 0, sizeof(pTrans->lastEpset));
} else { } else {
pTrans->lastMsgType = pAction->msgType;
pTrans->lastAction = action; pTrans->lastAction = action;
pTrans->lastMsgType = pAction->msgType;
pTrans->lastErrorNo = code; pTrans->lastErrorNo = code;
pTrans->lastEpset = pAction->epSet; pTrans->lastEpset = pAction->epSet;
} }

View File

@ -3159,6 +3159,7 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
} }
// handle data in cache situation // handle data in cache situation
// bool tsdbNextDataBlock(tsdbReaderT pHandle, uint64_t uid)
bool tsdbNextDataBlock(tsdbReaderT pHandle) { bool tsdbNextDataBlock(tsdbReaderT pHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;

View File

@ -1034,6 +1034,39 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
// sort key
EXPLAIN_ROW_NEW(level + 1, "Merge Key: ");
if (pResNode->pExecInfo) {
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
SOrderByExprNode *ptn = nodesListGetNode(pMergeNode->pMergeKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
}
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
// sort method
EXPLAIN_ROW_NEW(level + 1, "Sort Method: ");
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, 0);
SSortExecInfo *pExecInfo = (SSortExecInfo *)execInfo->verboseInfo;
EXPLAIN_ROW_APPEND("%s", pExecInfo->sortMethod == SORT_QSORT_T ? "quicksort" : "merge sort");
if (pExecInfo->sortBuffer > 1024 * 1024) {
EXPLAIN_ROW_APPEND(" Buffers:%.2f Mb", pExecInfo->sortBuffer / (1024 * 1024.0));
} else if (pExecInfo->sortBuffer > 1024) {
EXPLAIN_ROW_APPEND(" Buffers:%.2f Kb", pExecInfo->sortBuffer / (1024.0));
} else {
EXPLAIN_ROW_APPEND(" Buffers:%d b", pExecInfo->sortBuffer);
}
EXPLAIN_ROW_APPEND(" loops:%d", pExecInfo->loops);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
}
if (verbose) { if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,

View File

@ -4553,7 +4553,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
} }
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId,
SNode* pTagCond); SNode* pTagCond);
static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo,

View File

@ -3195,7 +3195,6 @@ _error:
typedef struct SMergeIntervalAggOperatorInfo { typedef struct SMergeIntervalAggOperatorInfo {
SIntervalAggOperatorInfo intervalAggOperatorInfo; SIntervalAggOperatorInfo intervalAggOperatorInfo;
SHashObj* groupIntervalHash;
bool hasGroupId; bool hasGroupId;
uint64_t groupId; uint64_t groupId;
SSDataBlock* prefetchedBlock; SSDataBlock* prefetchedBlock;
@ -3204,39 +3203,24 @@ typedef struct SMergeIntervalAggOperatorInfo {
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
taosHashCleanup(miaInfo->groupIntervalHash);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
} }
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, TSKEY wstartTs) {
STimeWindow* newWin) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId)); SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
if (prevWin == NULL) { SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return 0; ASSERT(p1 != NULL);
}
if (newWin == NULL || (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) ) { finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId); pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock,
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, pTaskInfo);
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock,
pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
if (newWin == NULL) {
taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
} else {
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
}
}
return 0; return 0;
} }
@ -3252,13 +3236,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
int32_t numOfOutput = pOperatorInfo->numOfExprs; int32_t numOfOutput = pOperatorInfo->numOfExprs;
int64_t* tsCols = extractTsCol(pBlock, iaInfo); int64_t* tsCols = extractTsCol(pBlock, iaInfo);
uint64_t tableGroupId = pBlock->info.groupId; uint64_t tableGroupId = pBlock->info.groupId;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, STimeWindow win;
iaInfo->interval.precision, &iaInfo->win); win.skey = blockStartTs;
win.ekey = taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
//TODO: remove the hash table usage (groupid + winkey => result row position)
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx, setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo); numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo);
@ -3266,72 +3251,39 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
TSKEY ekey = ascScan ? win.ekey : win.skey; TSKEY currTs = blockStartTs;
int32_t forwardRows = TSKEY currPos = startPos;
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); STimeWindow currWin = win;
ASSERT(forwardRows > 0);
// prev time window not interpolation yet.
if (iaInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup,
pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// window start key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &win, startPos, forwardRows);
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, iaInfo->order);
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&win) is closed
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
STimeWindow nextWin = win;
while (1) { while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos; ++currPos;
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order); if (currPos >= pBlock->info.rows) {
if (startPos < 0) {
break; break;
} }
if (tsCols[currPos] == currTs) {
continue;
} else {
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos,
currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
// null data, failed to allocate more memory buffer outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, currTs = tsCols[currPos];
&iaInfo->aggSup, pTaskInfo); currWin.skey = currTs;
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); startPos = currPos;
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
} }
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
// window start(end) key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &nextWin, startPos,
forwardRows);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&nextWin) is closed
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
} }
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos,
currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
if (iaInfo->timeWindowInterpo) { outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
}
} }
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
@ -3385,13 +3337,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
} }
pRes->info.groupId = miaInfo->groupId; pRes->info.groupId = miaInfo->groupId;
} else {
void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL);
if (p != NULL) {
size_t len = 0;
uint64_t* pKey = taosHashGetKey(p, &len);
outputPrevIntervalResult(pOperator, *pKey, pRes, NULL);
}
} }
if (pRes->info.rows == 0) { if (pRes->info.rows == 0) {
@ -3421,7 +3366,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
iaInfo->execModel = pTaskInfo->execModel; iaInfo->execModel = pTaskInfo->execModel;
iaInfo->primaryTsIndex = primaryTsSlotId; iaInfo->primaryTsIndex = primaryTsSlotId;
miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);

View File

@ -99,11 +99,18 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t topFunction(SqlFunctionCtx *pCtx); int32_t topFunction(SqlFunctionCtx *pCtx);
int32_t topFunctionMerge(SqlFunctionCtx *pCtx);
int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx);
int32_t bottomFunctionMerge(SqlFunctionCtx *pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getTopBotInfoSize(int64_t numOfItems);
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);

View File

@ -326,7 +326,7 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (2 != numOfParams) { if (2 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
@ -361,8 +361,62 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
return translateTop(pFunc, pErrBuf, len); int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (isPartial) {
if (2 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
// param1
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SValueNode* pValue = (SValueNode*)pParamNode1;
if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (pValue->datum.i < 1 || pValue->datum.i > 100) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
pValue->notReserved = true;
// set result type
pFunc->node.resType = (SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
if (1 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (TSDB_DATA_TYPE_BINARY != para1Type) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
// Do nothing. We can only access output of partial functions as input,
// so original input type cannot be obtained, resType will be set same
// as original function input type after merge function created.
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTopBotImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTopBotImpl(pFunc, pErrBuf, len, false);
} }
static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
@ -1483,23 +1537,71 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "top", .name = "top",
.type = FUNCTION_TYPE_TOP, .type = FUNCTION_TYPE_TOP,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTop, .translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup, .initFunc = topBotFunctionSetup,
.processFunc = topFunction, .processFunc = topFunction,
.finalizeFunc = topBotFinalize, .finalizeFunc = topBotFinalize,
.combineFunc = topCombine, .combineFunc = topCombine,
.pPartialFunc = "_top_partial",
.pMergeFunc = "_top_merge"
},
{
.name = "_top_partial",
.type = FUNCTION_TYPE_TOP_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotPartial,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = topBotFunctionSetup,
.processFunc = topFunction,
.finalizeFunc = topBotPartialFinalize,
.combineFunc = topCombine,
},
{
.name = "_top_merge",
.type = FUNCTION_TYPE_TOP_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotMerge,
.getEnvFunc = getTopBotMergeFuncEnv,
.initFunc = functionSetup,
.processFunc = topFunctionMerge,
.finalizeFunc = topBotMergeFinalize,
.combineFunc = topCombine,
}, },
{ {
.name = "bottom", .name = "bottom",
.type = FUNCTION_TYPE_BOTTOM, .type = FUNCTION_TYPE_BOTTOM,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateBottom, .translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup, .initFunc = topBotFunctionSetup,
.processFunc = bottomFunction, .processFunc = bottomFunction,
.finalizeFunc = topBotFinalize, .finalizeFunc = topBotFinalize,
.combineFunc = bottomCombine, .combineFunc = bottomCombine,
.pPartialFunc = "_bottom_partial",
.pMergeFunc = "_bottom_merge"
},
{
.name = "_bottom_partial",
.type = FUNCTION_TYPE_BOTTOM_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotPartial,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = topBotFunctionSetup,
.processFunc = bottomFunction,
.finalizeFunc = topBotPartialFinalize,
.combineFunc = bottomCombine,
},
{
.name = "_bottom_merge",
.type = FUNCTION_TYPE_BOTTOM_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotMerge,
.getEnvFunc = getTopBotMergeFuncEnv,
.initFunc = functionSetup,
.processFunc = bottomFunctionMerge,
.finalizeFunc = topBotMergeFinalize,
.combineFunc = bottomCombine,
}, },
{ {
.name = "spread", .name = "spread",

View File

@ -66,6 +66,9 @@ typedef struct STopBotResItem {
} STopBotResItem; } STopBotResItem;
typedef struct STopBotRes { typedef struct STopBotRes {
int32_t maxSize;
int16_t type; //store the original input type, used in merge function
int32_t numOfItems;
STopBotResItem* pItems; STopBotResItem* pItems;
} STopBotRes; } STopBotRes;
@ -2647,12 +2650,35 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
return numOfElems; return numOfElems;
} }
int32_t getTopBotInfoSize(int64_t numOfItems) {
return sizeof(STopBotRes) + numOfItems * sizeof(STopBotResItem);
}
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SValueNode* pkNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); SValueNode* pkNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem); pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);
return true; return true;
} }
bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
//intermediate result is binary and length contains VAR header size
pEnv->calcMemSize = pFunc->node.resType.bytes;
return true;
}
bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) {
return false;
}
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
pRes->maxSize = pCtx->param[1].param.i;
return true;
}
static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
@ -2664,6 +2690,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
int32_t topFunction(SqlFunctionCtx* pCtx) { int32_t topFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
@ -2671,7 +2699,8 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0]; SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pInput->pData[0]->info.type; STopBotRes* pRes = getTopBotOutputInfo(pCtx);
pRes->type = pInput->pData[0]->info.type;
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < pInput->numOfRows + start; ++i) { for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
@ -2681,7 +2710,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
numOfElems++; numOfElems++;
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, true); doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2694,7 +2723,8 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0]; SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pInput->pData[0]->info.type; STopBotRes* pRes = getTopBotOutputInfo(pCtx);
pRes->type = pInput->pData[0]->info.type;
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < pInput->numOfRows + start; ++i) { for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
@ -2704,12 +2734,57 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
numOfElems++; numOfElems++;
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false); doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) {
for (int32_t i = 0; i < pInput->numOfItems; i++) {
addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery);
}
}
int32_t topFunctionMerge(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start);
STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data);
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pInfo->maxSize = pInputInfo->maxSize;
pInfo->type = pInputInfo->type;
topBotTransferInfo(pCtx, pInputInfo, true);
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start);
STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data);
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pInfo->maxSize = pInputInfo->maxSize;
pInfo->type = pInputInfo->type;
topBotTransferInfo(pCtx, pInputInfo, false);
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) { static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) {
uint16_t type = *(uint16_t*)param; uint16_t type = *(uint16_t*)param;
@ -2740,7 +2815,6 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
STopBotRes* pRes = getTopBotOutputInfo(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
SVariant val = {0}; SVariant val = {0};
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
@ -2749,7 +2823,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
assert(pItems != NULL); assert(pItems != NULL);
// not full yet // not full yet
if (pEntryInfo->numOfRes < maxSize) { if (pEntryInfo->numOfRes < pRes->maxSize) {
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes]; STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = val; pItem->v = val;
pItem->uid = uid; pItem->uid = uid;
@ -2759,6 +2833,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// allocate the buffer and keep the data of this row into the new allocated buffer // allocate the buffer and keep the data of this row into the new allocated buffer
pEntryInfo->numOfRes++; pEntryInfo->numOfRes++;
// accumulate number of items for each vgroup, this info is needed for merge
pRes->numOfItems++;
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
!isTopQuery); !isTopQuery);
} else { // replace the minimum value in the result } else { // replace the minimum value in the result
@ -2860,11 +2936,11 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
releaseBufPage(pCtx->pBuf, pPage); releaseBufPage(pCtx->pBuf, pPage);
} }
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMerge) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
int32_t type = pCtx->input.pData[0]->info.type; int16_t type = pCtx->input.pData[0]->info.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
@ -2880,29 +2956,58 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false); colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
} }
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); if (!isMerge) {
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
}
currentRow += 1; currentRow += 1;
} }
return pEntryInfo->numOfRes; return pEntryInfo->numOfRes;
} }
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return topBotFinalizeImpl(pCtx, pBlock, false);
}
int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return topBotFinalizeImpl(pCtx, pBlock, true);
}
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getTopBotInfoSize(pRes->maxSize);
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pRes, resultBytes);
varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataAppend(pCol, pBlock->info.rows, res, false);
taosMemoryFree(res);
return 1;
}
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
bool isTopQuery) { bool isTopQuery) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = getTopBotOutputInfo(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
STopBotResItem* pItems = pRes->pItems; STopBotResItem* pItems = pRes->pItems;
assert(pItems != NULL); assert(pItems != NULL);
// not full yet // not full yet
if (pEntryInfo->numOfRes < maxSize) { if (pEntryInfo->numOfRes < pRes->maxSize) {
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes]; STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = pSourceItem->v; pItem->v = pSourceItem->v;
pItem->uid = pSourceItem->uid; pItem->uid = pSourceItem->uid;
pItem->tuplePos.pageId = -1; pItem->tuplePos.pageId = -1;
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos); replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
pEntryInfo->numOfRes++; pEntryInfo->numOfRes++;
// accumulate number of items for each vgroup, this info is needed for merge
pRes->numOfItems++;
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
!isTopQuery); !isTopQuery);
} else { // replace the minimum value in the result } else { // replace the minimum value in the result
@ -2948,15 +3053,15 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getSpreadInfoSize() {
return (int32_t)sizeof(SSpreadInfo);
}
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SSpreadInfo); pEnv->calcMemSize = sizeof(SSpreadInfo);
return true; return true;
} }
int32_t getSpreadInfoSize() {
return (int32_t)sizeof(SSpreadInfo);
}
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) { if (!functionSetup(pCtx, pResultInfo)) {
return false; return false;
@ -3083,7 +3188,7 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = (int32_t)sizeof(SSpreadInfo); int32_t resultBytes = getSpreadInfoSize();
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pInfo, resultBytes); memcpy(varDataVal(res), pInfo, resultBytes);

View File

@ -202,6 +202,27 @@ bool fmIsInvertible(int32_t funcId) {
return res; return res;
} }
//function has same input/output type
bool fmIsSameInOutType(int32_t funcId) {
bool res = false;
switch (funcMgtBuiltins[funcId].type) {
case FUNCTION_TYPE_MAX:
case FUNCTION_TYPE_MIN:
case FUNCTION_TYPE_TOP:
case FUNCTION_TYPE_BOTTOM:
case FUNCTION_TYPE_FIRST:
case FUNCTION_TYPE_LAST:
case FUNCTION_TYPE_SAMPLE:
case FUNCTION_TYPE_TAIL:
case FUNCTION_TYPE_UNIQUE:
res = true;
break;
default:
break;
}
return res;
}
static int32_t getFuncInfo(SFunctionNode* pFunc) { static int32_t getFuncInfo(SFunctionNode* pFunc) {
char msg[64] = {0}; char msg[64] = {0};
if (NULL != gFunMgtService.pFuncNameHashTable) { if (NULL != gFunMgtService.pFuncNameHashTable) {
@ -276,6 +297,10 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
nodesDestroyList(pParameterList); nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
//overwrite function restype set by translate function
if (fmIsSameInOutType(pSrcFunc->funcId)) {
(*pMergeFunc)->node.resType = pSrcFunc->node.resType;
}
strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName); strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -133,8 +133,8 @@ class TDDnode:
"qDebugFlag": "143", "qDebugFlag": "143",
"rpcDebugFlag": "143", "rpcDebugFlag": "143",
"tmrDebugFlag": "131", "tmrDebugFlag": "131",
"uDebugFlag": "143", "uDebugFlag": "131",
"sDebugFlag": "135", "sDebugFlag": "143",
"wDebugFlag": "143", "wDebugFlag": "143",
"qdebugFlag": "143", "qdebugFlag": "143",
"numOfLogLines": "100000000", "numOfLogLines": "100000000",