feat(stream):change single interval disc buff and interval delete
This commit is contained in:
parent
aa09f79bbb
commit
1f5d9a38e7
|
@ -554,6 +554,8 @@ typedef struct {
|
|||
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
|
||||
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
|
||||
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal);
|
||||
void streamFreeVal(void* val);
|
||||
|
||||
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key);
|
||||
|
|
|
@ -411,7 +411,7 @@ typedef enum EStreamScanMode {
|
|||
STREAM_SCAN_FROM_READERHANDLE = 1,
|
||||
STREAM_SCAN_FROM_RES,
|
||||
STREAM_SCAN_FROM_UPDATERES,
|
||||
STREAM_SCAN_FROM_DELETERES,
|
||||
STREAM_SCAN_FROM_DELETE_DATA,
|
||||
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
|
||||
STREAM_SCAN_FROM_DATAREADER_RANGE,
|
||||
} EStreamScanMode;
|
||||
|
@ -794,6 +794,7 @@ typedef struct SStreamPartitionOperatorInfo {
|
|||
void* parIte;
|
||||
SSDataBlock* pInputDataBlock;
|
||||
int32_t tsColIndex;
|
||||
SSDataBlock* pDelRes;
|
||||
} SStreamPartitionOperatorInfo;
|
||||
|
||||
typedef struct STimeSliceOperatorInfo {
|
||||
|
@ -1108,6 +1109,13 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
|
|||
bool groupbyTbname(SNodeList* pGroupList);
|
||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
|
||||
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||
int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup,
|
||||
SGroupResInfo* pGroupResInfo);
|
||||
int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult);
|
||||
int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -3938,7 +3938,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
|
||||
pTaskInfo, isStream);
|
||||
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
|
||||
pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
|
||||
SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
|
||||
|
@ -4410,3 +4410,108 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
|
|||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SWinKey key = {
|
||||
.ts = win->skey,
|
||||
.groupId = tableGroupId,
|
||||
};
|
||||
char* value = NULL;
|
||||
int32_t size = pAggSup->resultRowSize;
|
||||
if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
*pResult = (SResultRow*)value;
|
||||
ASSERT(*pResult);
|
||||
// set time window for current result
|
||||
(*pResult)->win = (*win);
|
||||
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult) {
|
||||
streamStateReleaseBuf(pTaskInfo->streamInfo.pState, pKey, pResult);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
|
||||
streamStatePut(pTaskInfo->streamInfo.pState, pKey, pResult, resSize);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup,
|
||||
SGroupResInfo* pGroupResInfo) {
|
||||
SExprInfo* pExprInfo = pSup->pExprInfo;
|
||||
int32_t numOfExprs = pSup->numOfExprs;
|
||||
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
|
||||
SqlFunctionCtx* pCtx = pSup->pCtx;
|
||||
|
||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||
|
||||
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
|
||||
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||
int32_t size = 0;
|
||||
void* pVal = NULL;
|
||||
SWinKey key = {
|
||||
.ts = *(TSKEY*)pPos->key,
|
||||
.groupId = pPos->groupId,
|
||||
};
|
||||
int32_t code = streamStateGet(pTaskInfo->streamInfo.pState, &key, &pVal, &size);
|
||||
ASSERT(code == 0);
|
||||
SResultRow* pRow = (SResultRow*)pVal;
|
||||
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
|
||||
// no results, continue to check the next one
|
||||
if (pRow->numOfRows == 0) {
|
||||
pGroupResInfo->index += 1;
|
||||
releaseOutputBuf(pTaskInfo, &key, pRow);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pBlock->info.groupId == 0) {
|
||||
pBlock->info.groupId = pPos->groupId;
|
||||
} else {
|
||||
// current value belongs to different group, it can't be packed into one datablock
|
||||
if (pBlock->info.groupId != pPos->groupId) {
|
||||
releaseOutputBuf(pTaskInfo, &key, pRow);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||
ASSERT(pBlock->info.rows > 0);
|
||||
releaseOutputBuf(pTaskInfo, &key, pRow);
|
||||
break;
|
||||
}
|
||||
|
||||
pGroupResInfo->index += 1;
|
||||
|
||||
for (int32_t j = 0; j < numOfExprs; ++j) {
|
||||
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
|
||||
|
||||
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
|
||||
if (pCtx[j].fpSet.finalize) {
|
||||
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||
if (TAOS_FAILED(code)) {
|
||||
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||
// do nothing, todo refactor
|
||||
} else {
|
||||
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
|
||||
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
|
||||
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
|
||||
}
|
||||
}
|
||||
}
|
||||
releaseOutputBuf(pTaskInfo, &key, pRow);
|
||||
pBlock->info.rows += pRow->numOfRows;
|
||||
}
|
||||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -13,26 +13,26 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "function.h"
|
||||
#include "os.h"
|
||||
#include "tname.h"
|
||||
|
||||
#include "tdatablock.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
#include "executorInt.h"
|
||||
#include "executorimpl.h"
|
||||
#include "tcompare.h"
|
||||
#include "thash.h"
|
||||
#include "ttypes.h"
|
||||
#include "executorInt.h"
|
||||
|
||||
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
|
||||
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
|
||||
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
|
||||
uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
|
||||
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
|
||||
int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
|
||||
|
||||
static void freeGroupKey(void* param) {
|
||||
SGroupKeys* pKey = (SGroupKeys*) param;
|
||||
SGroupKeys* pKey = (SGroupKeys*)param;
|
||||
taosMemoryFree(pKey->pData);
|
||||
}
|
||||
|
||||
|
@ -62,13 +62,13 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
|
|||
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
|
||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||
SColumn* pCol = taosArrayGet(pGroupColList, i);
|
||||
(*keyLen) += pCol->bytes; // actual data + null_flag
|
||||
(*keyLen) += pCol->bytes; // actual data + null_flag
|
||||
|
||||
SGroupKeys key = {0};
|
||||
key.bytes = pCol->bytes;
|
||||
key.type = pCol->type;
|
||||
key.bytes = pCol->bytes;
|
||||
key.type = pCol->type;
|
||||
key.isNull = false;
|
||||
key.pData = taosMemoryCalloc(1, pCol->bytes);
|
||||
key.pData = taosMemoryCalloc(1, pCol->bytes);
|
||||
if (key.pData == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -87,7 +87,8 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
|
||||
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex,
|
||||
int32_t numOfGroupCols) {
|
||||
SColumnDataAgg* pColAgg = NULL;
|
||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||
SColumn* pCol = taosArrayGet(pGroupCols, i);
|
||||
|
@ -112,7 +113,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
|
|||
if (pkey->type == TSDB_DATA_TYPE_JSON) {
|
||||
int32_t dataLen = getJsonValueLen(val);
|
||||
|
||||
if (memcmp(pkey->pData, val, dataLen) == 0){
|
||||
if (memcmp(pkey->pData, val, dataLen) == 0) {
|
||||
continue;
|
||||
} else {
|
||||
return false;
|
||||
|
@ -154,7 +155,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
|
|||
pkey->isNull = false;
|
||||
char* val = colDataGetData(pColInfoData, rowIndex);
|
||||
if (pkey->type == TSDB_DATA_TYPE_JSON) {
|
||||
if(tTagIsJson(val)){
|
||||
if (tTagIsJson(val)) {
|
||||
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
||||
return;
|
||||
}
|
||||
|
@ -198,13 +199,13 @@ static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
|||
}
|
||||
}
|
||||
|
||||
return (int32_t) (pStart - (char*)pKey);
|
||||
return (int32_t)(pStart - (char*)pKey);
|
||||
}
|
||||
|
||||
// assign the group keys or user input constant values if required
|
||||
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
if (pCtx[i].functionId == -1) { // select count(*),key from t group by key.
|
||||
if (pCtx[i].functionId == -1) { // select count(*),key from t group by key.
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
|
||||
|
||||
SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
|
||||
|
@ -221,7 +222,7 @@ static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t
|
|||
} else {
|
||||
memcpy(dest, data, pColInfoData->info.bytes);
|
||||
}
|
||||
} else { // it is a NULL value
|
||||
} else { // it is a NULL value
|
||||
pEntryInfo->isNullRes = 1;
|
||||
}
|
||||
|
||||
|
@ -275,7 +276,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
|
||||
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
|
||||
len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
@ -291,9 +293,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
|
||||
if (num > 0) {
|
||||
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||
int32_t ret =
|
||||
setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len,
|
||||
pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
|
||||
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
|
||||
len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
@ -308,7 +309,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
|
|||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
while(1) {
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pRes, NULL);
|
||||
|
||||
|
@ -323,7 +324,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
pOperator->resultInfo.totalRows += pRes->info.rows;
|
||||
return (pRes->info.rows == 0)? NULL:pRes;
|
||||
return (pRes->info.rows == 0) ? NULL : pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||
|
@ -334,7 +335,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
return buildGroupResultDataBlock(pOperator);
|
||||
|
@ -343,7 +344,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int64_t st = taosGetTimestampUs();
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
while (1) {
|
||||
|
@ -362,7 +363,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
|
||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||
if (pInfo->scalarSup.pExprInfo != NULL) {
|
||||
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
|
||||
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
|
||||
pInfo->scalarSup.numOfExprs, NULL);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
|
@ -403,8 +405,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pGroupCols = pGroupColList;
|
||||
pInfo->pCondition = pCondition;
|
||||
pInfo->pGroupCols = pGroupColList;
|
||||
pInfo->pCondition = pCondition;
|
||||
|
||||
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -425,14 +427,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
initBasicInfo(&pInfo->binfo, pResultBlock);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
|
||||
pOperator->name = "GroupbyAggOperator";
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->name = "GroupbyAggOperator";
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
// pOperator->operatorType = OP_Groupby;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL,
|
||||
destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -440,7 +443,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
destroyGroupOperatorInfo(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
|
@ -448,7 +451,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
}
|
||||
|
||||
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||
// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
|
@ -457,7 +460,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||
|
||||
SDataGroupInfo* pGroupInfo = NULL;
|
||||
void *pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
|
||||
void* pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
|
||||
|
||||
pGroupInfo->numOfRows += 1;
|
||||
|
||||
|
@ -467,32 +470,32 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
// number of rows
|
||||
int32_t* rows = (int32_t*) pPage;
|
||||
int32_t* rows = (int32_t*)pPage;
|
||||
|
||||
size_t numOfCols = pOperator->exprSupp.numOfExprs;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
|
||||
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
|
||||
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
int32_t bytes = pColInfoData->info.bytes;
|
||||
int32_t startOffset = pInfo->columnOffset[i];
|
||||
|
||||
int32_t* columnLen = NULL;
|
||||
int32_t* columnLen = NULL;
|
||||
int32_t contentLen = 0;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
int32_t* offset = (int32_t*)((char*)pPage + startOffset);
|
||||
columnLen = (int32_t*) ((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
|
||||
char* data = (char*)((char*) columnLen + sizeof(int32_t));
|
||||
columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
|
||||
char* data = (char*)((char*)columnLen + sizeof(int32_t));
|
||||
|
||||
if (colDataIsNull_s(pColInfoData, j)) {
|
||||
offset[(*rows)] = -1;
|
||||
contentLen = 0;
|
||||
} else if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
|
||||
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
offset[*rows] = (*columnLen);
|
||||
char* src = colDataGetData(pColInfoData, j);
|
||||
char* src = colDataGetData(pColInfoData, j);
|
||||
int32_t dataLen = getJsonValueLen(src);
|
||||
|
||||
memcpy(data + (*columnLen), src, dataLen);
|
||||
|
@ -511,8 +514,8 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
} else {
|
||||
char* bitmap = (char*)pPage + startOffset;
|
||||
columnLen = (int32_t*) ((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
|
||||
char* data = (char*) columnLen + sizeof(int32_t);
|
||||
columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
|
||||
char* data = (char*)columnLen + sizeof(int32_t);
|
||||
|
||||
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
|
||||
if (isNull) {
|
||||
|
@ -539,7 +542,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
|||
SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
|
||||
|
||||
void* pPage = NULL;
|
||||
if (p == NULL) { // it is a new group
|
||||
if (p == NULL) { // it is a new group
|
||||
SDataGroupInfo gi = {0};
|
||||
gi.pPageList = taosArrayInit(100, sizeof(int32_t));
|
||||
taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));
|
||||
|
@ -550,12 +553,12 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
|||
pPage = getNewBufPage(pInfo->pBuf, &pageId);
|
||||
taosArrayPush(p->pPageList, &pageId);
|
||||
|
||||
*(int32_t *) pPage = 0;
|
||||
*(int32_t*)pPage = 0;
|
||||
} else {
|
||||
int32_t* curId = taosArrayGetLast(p->pPageList);
|
||||
pPage = getBufPage(pInfo->pBuf, *curId);
|
||||
|
||||
int32_t *rows = (int32_t*) pPage;
|
||||
int32_t* rows = (int32_t*)pPage;
|
||||
if (*rows >= pInfo->rowCapacity) {
|
||||
// release buffer
|
||||
releaseBufPage(pInfo->pBuf, pPage);
|
||||
|
@ -585,17 +588,18 @@ uint64_t calcGroupId(char* pData, int32_t len) {
|
|||
}
|
||||
|
||||
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
|
||||
|
||||
offset[0] = sizeof(int32_t) + sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
|
||||
offset[0] = sizeof(int32_t) +
|
||||
sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
|
||||
|
||||
for(int32_t i = 0; i < numOfCols - 1; ++i) {
|
||||
for (int32_t i = 0; i < numOfCols - 1; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
int32_t bytes = pColInfoData->info.bytes;
|
||||
int32_t payloadLen = bytes * rowCapacity;
|
||||
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
// offset segment + content length + payload
|
||||
offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
|
||||
|
@ -609,9 +613,9 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
|
|||
}
|
||||
|
||||
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
|
||||
void *ite = NULL;
|
||||
while( (ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL ) {
|
||||
taosArrayDestroy( ((SDataGroupInfo *)ite)->pPageList);
|
||||
void* ite = NULL;
|
||||
while ((ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL) {
|
||||
taosArrayDestroy(((SDataGroupInfo*)ite)->pPageList);
|
||||
}
|
||||
taosArrayClear(pInfo->sortedGroupArray);
|
||||
clearDiskbasedBuf(pInfo->pBuf);
|
||||
|
@ -626,13 +630,14 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
return (pGroupInfo1->groupId < pGroupInfo2->groupId)? -1:1;
|
||||
return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
|
||||
}
|
||||
|
||||
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
||||
SPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
SDataGroupInfo* pGroupInfo = (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
|
||||
SDataGroupInfo* pGroupInfo =
|
||||
(pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
|
||||
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
|
||||
// try next group data
|
||||
++pInfo->groupIndex;
|
||||
|
@ -647,7 +652,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
|
||||
void* page = getBufPage(pInfo->pBuf, *pageId);
|
||||
void* page = getBufPage(pInfo->pBuf, *pageId);
|
||||
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
|
||||
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
|
||||
|
@ -670,14 +675,14 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
blockDataCleanup(pRes);
|
||||
return buildPartitionResult(pOperator);
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int64_t st = taosGetTimestampUs();
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
while (1) {
|
||||
|
@ -688,7 +693,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
|||
|
||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||
if (pInfo->scalarSup.pExprInfo != NULL) {
|
||||
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
|
||||
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
|
||||
pInfo->scalarSup.numOfExprs, NULL);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
|
@ -727,7 +733,7 @@ static void destroyPartitionOperatorInfo(void* param) {
|
|||
cleanupBasicInfo(&pInfo->binfo);
|
||||
taosArrayDestroy(pInfo->pGroupCols);
|
||||
|
||||
for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){
|
||||
for (int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++) {
|
||||
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
|
||||
taosMemoryFree(key.pData);
|
||||
}
|
||||
|
@ -743,24 +749,25 @@ static void destroyPartitionOperatorInfo(void* param) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
|
||||
|
||||
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||
|
||||
if (pPartNode->pExprs != NULL) {
|
||||
int32_t num = 0;
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
|
||||
int32_t code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
|
||||
int32_t code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -772,7 +779,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
|||
goto _error;
|
||||
}
|
||||
|
||||
uint32_t defaultPgsz = 0;
|
||||
uint32_t defaultPgsz = 0;
|
||||
uint32_t defaultBufsz = 0;
|
||||
getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
|
||||
|
||||
|
@ -794,15 +801,15 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pOperator->name = "PartitionOperator";
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->name = "PartitionOperator";
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
|
@ -810,16 +817,16 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
|||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
|
||||
uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
|
||||
int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
|
||||
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||
|
||||
|
@ -833,37 +840,36 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
|
|||
|
||||
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) {
|
||||
if (pExprSup->pExprInfo != NULL) {
|
||||
int32_t code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||
int32_t code =
|
||||
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("calaculate group id error, code:%d", code);
|
||||
}
|
||||
}
|
||||
recordNewGroupKeys(pParSup->pGroupCols, pParSup->pGroupColVals, pBlock, rowId);
|
||||
int32_t len = buildGroupKeys(pParSup->keyBuf, pParSup->pGroupColVals);
|
||||
int32_t len = buildGroupKeys(pParSup->keyBuf, pParSup->pGroupColVals);
|
||||
uint64_t groupId = calcGroupId(pParSup->keyBuf, len);
|
||||
return groupId;
|
||||
}
|
||||
|
||||
static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) {
|
||||
return pInfo->parIte != NULL;
|
||||
}
|
||||
static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo->parIte != NULL; }
|
||||
|
||||
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
||||
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pDest = pInfo->binfo.pRes;
|
||||
SSDataBlock* pDest = pInfo->binfo.pRes;
|
||||
ASSERT(hasRemainPartion(pInfo));
|
||||
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte;
|
||||
blockDataCleanup(pDest);
|
||||
int32_t rows = taosArrayGetSize(pParInfo->rowIds);
|
||||
int32_t rows = taosArrayGetSize(pParInfo->rowIds);
|
||||
SSDataBlock* pSrc = pInfo->pInputDataBlock;
|
||||
for (int32_t i = 0; i < rows; i++) {
|
||||
int32_t rowIndex = *(int32_t*)taosArrayGet(pParInfo->rowIds, i);
|
||||
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; j++) {
|
||||
int32_t slotId = pOperator->exprSupp.pExprInfo[j].base.pParam[0].pCol->slotId;
|
||||
int32_t slotId = pOperator->exprSupp.pExprInfo[j].base.pParam[0].pCol->slotId;
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId);
|
||||
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j);
|
||||
bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
|
||||
char* pSrcData = colDataGetData(pSrcCol, rowIndex);
|
||||
bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
|
||||
char* pSrcData = colDataGetData(pSrcCol, rowIndex);
|
||||
colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull);
|
||||
}
|
||||
pDest->info.rows++;
|
||||
|
@ -881,9 +887,9 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat
|
|||
pInfo->pInputDataBlock = pBlock;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
recordNewGroupKeys(pInfo->partitionSup.pGroupCols, pInfo->partitionSup.pGroupColVals, pBlock, i);
|
||||
int32_t keyLen = buildGroupKeys(pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupColVals);
|
||||
SPartitionDataInfo* pParData =
|
||||
(SPartitionDataInfo*) taosHashGet(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen);
|
||||
int32_t keyLen = buildGroupKeys(pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupColVals);
|
||||
SPartitionDataInfo* pParData =
|
||||
(SPartitionDataInfo*)taosHashGet(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen);
|
||||
if (pParData) {
|
||||
taosArrayPush(pParData->rowIds, &i);
|
||||
} else {
|
||||
|
@ -891,8 +897,7 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat
|
|||
newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen);
|
||||
newParData.rowIds = taosArrayInit(64, sizeof(int32_t));
|
||||
taosArrayPush(newParData.rowIds, &i);
|
||||
taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData,
|
||||
sizeof(SPartitionDataInfo));
|
||||
taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, sizeof(SPartitionDataInfo));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -902,13 +907,13 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
if (hasRemainPartion(pInfo)) {
|
||||
return buildStreamPartitionResult(pOperator);
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int64_t st = taosGetTimestampUs();
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
{
|
||||
pInfo->pInputDataBlock = NULL;
|
||||
|
@ -924,14 +929,18 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
|||
case STREAM_INVALID:
|
||||
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
||||
break;
|
||||
case STREAM_DELETE_DATA: {
|
||||
copyDataBlock(pInfo->pDelRes, pBlock);
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||
} break;
|
||||
default:
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||
if (pInfo->scalarSup.pExprInfo != NULL) {
|
||||
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock,
|
||||
pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
|
||||
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
|
||||
pInfo->scalarSup.numOfExprs, NULL);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
|
@ -940,7 +949,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
|||
doStreamHashPartitionImpl(pInfo, pBlock);
|
||||
}
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
|
||||
pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL);
|
||||
return buildStreamPartitionResult(pOperator);
|
||||
}
|
||||
|
@ -950,7 +959,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
|
|||
cleanupBasicInfo(&pInfo->binfo);
|
||||
taosArrayDestroy(pInfo->partitionSup.pGroupCols);
|
||||
|
||||
for(int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++){
|
||||
for (int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++) {
|
||||
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
|
||||
taosMemoryFree(key.pData);
|
||||
}
|
||||
|
@ -958,6 +967,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
|
|||
|
||||
taosMemoryFree(pInfo->partitionSup.keyBuf);
|
||||
cleanupExprSupp(&pInfo->scalarSup);
|
||||
blockDataDestroy(pInfo->pDelRes);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
|
@ -970,7 +980,8 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
|
|||
pScanInfo->pPartScalarSup = pExpr;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -980,7 +991,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
|||
pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||
|
||||
if (pPartNode->pExprs != NULL) {
|
||||
int32_t num = 0;
|
||||
int32_t num = 0;
|
||||
SExprInfo* pCalExprInfo = createExprInfo(pPartNode->pExprs, NULL, &num);
|
||||
code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -989,7 +1000,8 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
|||
}
|
||||
|
||||
int32_t keyLen = 0;
|
||||
code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupCols);
|
||||
code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
|
||||
pInfo->partitionSup.pGroupCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -1000,35 +1012,35 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
|||
goto _error;
|
||||
}
|
||||
blockDataEnsureCapacity(pResBlock, 4096);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->parIte = NULL;
|
||||
pInfo->pInputDataBlock = NULL;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->parIte = NULL;
|
||||
pInfo->pInputDataBlock = NULL;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||
pInfo->tsColIndex = 0;
|
||||
pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||
pInfo->tsColIndex = 0;
|
||||
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
|
||||
|
||||
pOperator->name = "StreamPartitionOperator";
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->name = "StreamPartitionOperator";
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL, destroyStreamPartitionOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL,
|
||||
destroyStreamPartitionOperatorInfo, NULL, NULL, NULL);
|
||||
|
||||
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFreeClear(pInfo);
|
||||
destroyStreamPartitionOperatorInfo(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -1057,24 +1057,24 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
|
|||
return true;
|
||||
}
|
||||
|
||||
static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo,
|
||||
int32_t* pRowIndex, bool hasGroup) {
|
||||
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, SInterval* pInterval,
|
||||
SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
|
||||
STimeWindow endWin = win;
|
||||
STimeWindow preWin = win;
|
||||
while (1) {
|
||||
if (hasGroup) {
|
||||
(*pRowIndex) += 1;
|
||||
} else {
|
||||
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, binarySearchForKey, NULL,
|
||||
TSDB_ORDER_ASC);
|
||||
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, startTsCol, *pRowIndex, endWin.ekey, binarySearchForKey,
|
||||
NULL, TSDB_ORDER_ASC);
|
||||
}
|
||||
do {
|
||||
preWin = endWin;
|
||||
getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
|
||||
} while (tsCol[(*pRowIndex) - 1] >= endWin.skey);
|
||||
} while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
|
||||
endWin = preWin;
|
||||
if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows) {
|
||||
win.ekey = endWin.ekey;
|
||||
|
@ -1102,6 +1102,11 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
|
|||
return NULL;
|
||||
}
|
||||
|
||||
doFilter(pInfo->pCondition, pResult, NULL);
|
||||
if (pResult->info.rows == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pInfo->partitionSup.needCalc) {
|
||||
SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
|
||||
blockDataCleanup(pResult);
|
||||
|
@ -1188,13 +1193,15 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
return code;
|
||||
}
|
||||
|
||||
SColumnInfoData* pSrcTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
|
||||
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
|
||||
ASSERT(pSrcTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TSKEY* tsCol = (TSKEY*)pSrcTsCol->pData;
|
||||
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
|
@ -1204,12 +1211,13 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
int64_t version = pSrcBlock->info.version - 1;
|
||||
for (int32_t i = 0; i < rows;) {
|
||||
uint64_t srcUid = srcUidData[i];
|
||||
uint64_t groupId = getGroupIdByData(pInfo, srcUid, tsCol[i], version);
|
||||
uint64_t groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
|
||||
uint64_t srcGpId = srcGp[i];
|
||||
TSKEY calStartTs = tsCol[i];
|
||||
TSKEY calStartTs = srcStartTsCol[i];
|
||||
colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
|
||||
STimeWindow win = getSlidingWindow(tsCol, &pInfo->interval, &pSrcBlock->info, &i, pInfo->partitionSup.needCalc);
|
||||
TSKEY calEndTs = tsCol[i - 1];
|
||||
STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, &pInfo->interval, &pSrcBlock->info, &i,
|
||||
pInfo->partitionSup.needCalc);
|
||||
TSKEY calEndTs = srcStartTsCol[i - 1];
|
||||
colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
|
||||
colDataAppend(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
|
||||
colDataAppend(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
|
||||
|
@ -1229,11 +1237,49 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
||||
if (pSrcBlock->info.rows == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
blockDataCleanup(pDestBlock);
|
||||
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
|
||||
SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
||||
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
|
||||
|
||||
SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||
int32_t dummy = 0;
|
||||
int64_t version = pSrcBlock->info.version - 1;
|
||||
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
||||
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
|
||||
colDataAppend(pDestStartCol, i, (const char*)(startData + i), false);
|
||||
colDataAppend(pDestEndCol, i, (const char*)(endData + i), false);
|
||||
colDataAppendNULL(pDestUidCol, i);
|
||||
colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
|
||||
colDataAppendNULL(pDestCalStartTsCol, i);
|
||||
colDataAppendNULL(pDestCalEndTsCol, i);
|
||||
pDestBlock->info.rows++;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (isIntervalWindow(pInfo)) {
|
||||
code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
|
||||
} else {
|
||||
} else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
|
||||
code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
|
||||
}
|
||||
pDestBlock->info.type = STREAM_CLEAR;
|
||||
|
@ -1510,14 +1556,23 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
|
||||
} break;
|
||||
case STREAM_DELETE_DATA: {
|
||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||
pInfo->updateResIndex = 0;
|
||||
generateScanRange(pInfo, pBlock, pInfo->pUpdateRes);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
return pInfo->pDeleteDataRes;
|
||||
printDataBlock(pBlock, "stream scan delete recv");
|
||||
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
|
||||
generateDeleteResultBlock(pInfo, pBlock, pInfo->pDeleteDataRes);
|
||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
|
||||
printDataBlock(pBlock, "stream scan delete result");
|
||||
return pInfo->pDeleteDataRes;
|
||||
} else {
|
||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||
pInfo->updateResIndex = 0;
|
||||
generateScanRange(pInfo, pBlock, pInfo->pUpdateRes);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
printDataBlock(pBlock, "stream scan delete data");
|
||||
return pInfo->pDeleteDataRes;
|
||||
}
|
||||
} break;
|
||||
default:
|
||||
break;
|
||||
|
@ -1532,7 +1587,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
return pInfo->pRes;
|
||||
} break;
|
||||
case STREAM_SCAN_FROM_DELETERES: {
|
||||
case STREAM_SCAN_FROM_DELETE_DATA: {
|
||||
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
|
@ -1646,7 +1701,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||
return pInfo->pUpdateDataRes;
|
||||
} else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DELETERES;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -955,8 +955,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
|
||||
STimeWindow win =
|
||||
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder);
|
||||
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
|
||||
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
|
||||
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
@ -983,7 +983,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
|
||||
numOfOutput);
|
||||
numOfOutput);
|
||||
|
||||
doCloseWindow(pResultRowInfo, pInfo, pResult);
|
||||
|
||||
|
@ -1406,20 +1406,25 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
|
|||
SHashObj* pUpdatedMap) {
|
||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* tsStarts = (TSKEY*)pStartCol->pData;
|
||||
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
TSKEY* tsEnds = (TSKEY*)pEndCol->pData;
|
||||
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
uint64_t* groupIds = (uint64_t*)pGroupCol->pData;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC);
|
||||
doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
|
||||
SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]};
|
||||
if (pDelWins) {
|
||||
taosArrayPush(pDelWins, &winRes);
|
||||
}
|
||||
if (pUpdatedMap) {
|
||||
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
|
||||
}
|
||||
do {
|
||||
doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
|
||||
SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]};
|
||||
if (pDelWins) {
|
||||
taosArrayPush(pDelWins, &winRes);
|
||||
}
|
||||
if (pUpdatedMap) {
|
||||
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
|
||||
}
|
||||
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
|
||||
} while (win.skey < tsEnds[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2775,7 +2780,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
|
|||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true);
|
||||
compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
||||
}
|
||||
if (num > 1 && pUpdatedMap) {
|
||||
if (num > 0 && pUpdatedMap) {
|
||||
saveWinResultRow(pCurResult, pWinRes->groupId, pUpdatedMap);
|
||||
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo.cur);
|
||||
}
|
||||
|
@ -2807,15 +2812,14 @@ void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
|
|||
|
||||
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
|
||||
|
||||
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
|
||||
SHashObj* pUpdatedMap) {
|
||||
static void doHashIntervalAgg(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
|
||||
SHashObj* pUpdatedMap) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
|
||||
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||
SExprSupp* pSup = &pOperatorInfo->exprSupp;
|
||||
int32_t numOfOutput = pSup->numOfExprs;
|
||||
int32_t step = 1;
|
||||
bool ascScan = true;
|
||||
TSKEY* tsCols = NULL;
|
||||
SResultRow* pResult = NULL;
|
||||
int32_t forwardRows = 0;
|
||||
|
@ -2824,7 +2828,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
|||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
|
||||
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
||||
int32_t startPos = 0;
|
||||
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
|
||||
STimeWindow nextWin = {0};
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
|
@ -3165,7 +3169,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||
}
|
||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
|
||||
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
|
||||
doHashIntervalAgg(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
int32_t chIndex = getChildIndex(pBlock);
|
||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||
|
@ -3183,7 +3187,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
|
||||
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
|
||||
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
|
||||
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
|
||||
doHashIntervalAgg(pChildOp, pBlock, pBlock->info.groupId, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5468,25 +5472,24 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
|
||||
SSDataBlock* pBlock, int32_t scanFlag, SHashObj* pUpdatedMap) {
|
||||
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
|
||||
int32_t scanFlag, SHashObj* pUpdatedMap) {
|
||||
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||
SExprSupp* pSup = &pOperatorInfo->exprSupp;
|
||||
|
||||
int32_t startPos = 0;
|
||||
int32_t numOfOutput = pSup->numOfExprs;
|
||||
int32_t startPos = 0;
|
||||
int32_t numOfOutput = pSup->numOfExprs;
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
uint64_t tableGroupId = pBlock->info.groupId;
|
||||
bool ascScan = true;
|
||||
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
|
||||
SResultRow* pResult = NULL;
|
||||
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
uint64_t tableGroupId = pBlock->info.groupId;
|
||||
bool ascScan = true;
|
||||
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
|
||||
SResultRow* pResult = NULL;
|
||||
|
||||
STimeWindow win =
|
||||
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
|
||||
int32_t ret = TSDB_CODE_SUCCESS;
|
||||
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
|
||||
int32_t ret = TSDB_CODE_SUCCESS;
|
||||
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
|
||||
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
|
||||
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
|
||||
|
@ -5547,11 +5550,88 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo
|
|||
}
|
||||
}
|
||||
|
||||
static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
|
||||
SHashObj* pUpdatedMap) {
|
||||
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
|
||||
|
||||
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||
SExprSupp* pSup = &pOperatorInfo->exprSupp;
|
||||
int32_t numOfOutput = pSup->numOfExprs;
|
||||
int32_t step = 1;
|
||||
TSKEY* tsCols = NULL;
|
||||
SResultRow* pResult = NULL;
|
||||
int32_t forwardRows = 0;
|
||||
int32_t aa = 4;
|
||||
|
||||
ASSERT(pSDataBlock->pDataBlock != NULL);
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
|
||||
int32_t startPos = 0;
|
||||
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
|
||||
STimeWindow nextWin =
|
||||
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
|
||||
while (1) {
|
||||
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
|
||||
if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
|
||||
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t code = setOutputBuf(&nextWin, &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset,
|
||||
&pInfo->aggSup, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
|
||||
TSDB_ORDER_ASC);
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
|
||||
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
|
||||
}
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||
pSDataBlock->info.rows, numOfOutput);
|
||||
SWinKey key = {
|
||||
.ts = nextWin.skey,
|
||||
.groupId = tableGroupId,
|
||||
};
|
||||
saveOutput(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize);
|
||||
releaseOutputBuf(pTaskInfo, &key, pResult);
|
||||
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
|
||||
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
|
||||
startPos =
|
||||
getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void doBuildResult(SOperatorInfo* pOperator, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
// set output datablock version
|
||||
pBlock->info.version = pTaskInfo->version;
|
||||
|
||||
blockDataCleanup(pBlock);
|
||||
if (!hasRemainResults(pGroupResInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// clear the existed group id
|
||||
pBlock->info.groupId = 0;
|
||||
buildDataBlockFromGroupRes(pTaskInfo, pBlock, &pOperator->exprSupp, pGroupResInfo);
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
int64_t maxTs = INT64_MIN;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
int64_t maxTs = INT64_MIN;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -5622,6 +5702,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||
doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
|
||||
// new disc buf
|
||||
// doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
|
||||
}
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
|
||||
|
@ -5664,6 +5746,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
taosArraySort(pUpdated, resultrowComparAsc);
|
||||
|
||||
// new disc buf
|
||||
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
@ -5676,6 +5759,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
// new disc buf
|
||||
// doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||
printDataBlock(pInfo->binfo.pRes, "single interval");
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
}
|
||||
|
@ -5697,25 +5782,29 @@ void destroyStreamIntervalOperatorInfo(void* param) {
|
|||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
|
||||
ASSERT(numOfCols > 0);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
SInterval interval = {.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, };
|
||||
STimeWindowAggSupp twAggSupp = {.waterMark = pIntervalPhyNode->window.watermark,
|
||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN, };
|
||||
SInterval interval = {
|
||||
.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
|
||||
};
|
||||
STimeWindowAggSupp twAggSupp = {
|
||||
.waterMark = pIntervalPhyNode->window.watermark,
|
||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
};
|
||||
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pInfo->interval = interval;
|
||||
|
@ -5732,11 +5821,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
}
|
||||
}
|
||||
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;;
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -5758,8 +5847,9 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL,
|
||||
destroyStreamIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamIntervalOperatorInfo,
|
||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
||||
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
|
|
@ -112,6 +112,29 @@ int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
|
|||
return tdbTbDelete(pState->pStateDb, key, sizeof(SWinKey), &pState->txn);
|
||||
}
|
||||
|
||||
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
|
||||
// todo refactor
|
||||
int32_t size = *pVLen;
|
||||
if (streamStateGet(pState, key, pVal, pVLen) == 0) {
|
||||
return 0;
|
||||
}
|
||||
void* tmp = taosMemoryCalloc(1, size);
|
||||
if (streamStatePut(pState, key, &tmp, size) == 0) {
|
||||
taosMemoryFree(tmp);
|
||||
int32_t code = streamStateGet(pState, key, pVal, pVLen);
|
||||
ASSERT(code == 0);
|
||||
return code;
|
||||
}
|
||||
taosMemoryFree(tmp);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
|
||||
// todo refactor
|
||||
streamFreeVal(pVal);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
|
||||
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||
if (pCur == NULL) return NULL;
|
||||
|
|
|
@ -5,7 +5,7 @@ sleep 50
|
|||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql create database test vgroups 1
|
||||
sql create database test vgroups 1;
|
||||
sql select * from information_schema.ins_databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
|
@ -13,7 +13,7 @@ endi
|
|||
|
||||
print $data00 $data01 $data02
|
||||
|
||||
sql use test
|
||||
sql use test;
|
||||
|
||||
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
|
|
|
@ -0,0 +1,419 @@
|
|||
$loop_all = 0
|
||||
looptest:
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 200
|
||||
sql connect
|
||||
|
||||
sql drop stream if exists streams0;
|
||||
sql drop stream if exists streams1;
|
||||
sql drop stream if exists streams2;
|
||||
sql drop stream if exists streams3;
|
||||
sql drop stream if exists streams4;
|
||||
sql drop database if exists test;
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3 from t1 interval(10s);
|
||||
|
||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
sleep 200
|
||||
sql delete from t1 where ts = 1648791213000;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop0:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 0 then
|
||||
print =====rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop1:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791213001,2,2,2,2.0);
|
||||
sql insert into t1 values(1648791213002,3,3,3,3.0);
|
||||
sql insert into t1 values(1648791213003,4,4,4,4.0);
|
||||
|
||||
sleep 200
|
||||
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791213002;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop3:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print =====data01=$data01
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data02 != 4 then
|
||||
print =====data02=$data02
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791223000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223001,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223002,3,2,3,1.0);
|
||||
sql insert into t1 values(1648791223003,3,2,3,1.0);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop4:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from t1 where ts >= 1648791223000 and ts <= 1648791223003;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop5:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print =====data01=$data01
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data02 != 4 then
|
||||
print =====data02=$data02
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791213005,2,2,2,2.0);
|
||||
sql insert into t1 values(1648791213006,3,3,3,3.0);
|
||||
sql insert into t1 values(1648791213007,4,4,4,4.0);
|
||||
|
||||
sql insert into t1 values(1648791223000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791223001,2,2,2,2.0);
|
||||
sql insert into t1 values(1648791223002,3,3,3,3.0);
|
||||
sql insert into t1 values(1648791223003,4,4,4,4.0);
|
||||
|
||||
sql insert into t1 values(1648791233000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791233001,2,2,2,2.0);
|
||||
sql insert into t1 values(1648791233008,3,3,3,3.0);
|
||||
sql insert into t1 values(1648791233009,4,4,4,4.0);
|
||||
|
||||
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791233005;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop6:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data02 != 1 then
|
||||
print =====data02=$data02
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print =====data11=$data11
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data12 != 4 then
|
||||
print =====data12=$data12
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
sql drop stream if exists streams2;
|
||||
sql drop database if exists test2;
|
||||
sql create database test2 vgroups 4;
|
||||
sql use test2;
|
||||
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams2 trigger at_once into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st interval(10s);
|
||||
|
||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop7:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from t1 where ts = 1648791213000;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop8:
|
||||
sleep 200
|
||||
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791223000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223001,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223002,3,2,3,1.0);
|
||||
sql insert into t1 values(1648791223003,3,2,3,1.0);
|
||||
sql insert into t2 values(1648791223000,1,2,3,1.0);
|
||||
sql insert into t2 values(1648791223001,1,2,3,1.0);
|
||||
sql insert into t2 values(1648791223002,3,2,3,1.0);
|
||||
sql insert into t2 values(1648791223003,3,2,3,1.0);
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from t2 where ts >= 1648791223000 and ts <= 1648791223001;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop11:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop11
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop11
|
||||
endi
|
||||
|
||||
if $data11 != 6 then
|
||||
print =====data11=$data11
|
||||
goto loop11
|
||||
endi
|
||||
|
||||
if $data12 != 3 then
|
||||
print =====data12=$data12
|
||||
goto loop11
|
||||
endi
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from st where ts >= 1648791223000 and ts <= 1648791223003;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop12:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213004,3,2,3,1.0);
|
||||
sql insert into t1 values(1648791213005,3,2,3,1.0);
|
||||
sql insert into t1 values(1648791213006,3,2,3,1.0);
|
||||
sql insert into t1 values(1648791223004,1,2,3,1.0);
|
||||
sql insert into t2 values(1648791213004,3,2,3,1.0);
|
||||
sql insert into t2 values(1648791213005,3,2,3,1.0);
|
||||
sql insert into t2 values(1648791213006,3,2,3,1.0);
|
||||
sql insert into t2 values(1648791223004,1,2,3,1.0);
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from t2 where ts >= 1648791213004 and ts <= 1648791213006;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop13:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data01 != 4 then
|
||||
print =====data01=$data01
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data02 != 3 then
|
||||
print =====data02=$data02
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print =====data11=$data11
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data12 != 1 then
|
||||
print =====data12=$data12
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791223005,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223006,1,2,3,1.0);
|
||||
sql insert into t2 values(1648791223005,1,2,3,1.0);
|
||||
sql insert into t2 values(1648791223006,1,2,3,1.0);
|
||||
|
||||
sql insert into t1 values(1648791233005,4,2,3,1.0);
|
||||
sql insert into t1 values(1648791233006,2,2,3,1.0);
|
||||
sql insert into t2 values(1648791233005,5,2,3,1.0);
|
||||
sql insert into t2 values(1648791233006,3,2,3,1.0);
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from st where ts >= 1648791213001 and ts <= 1648791233005;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop14:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print =====data11=$data11
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data12 != 3 then
|
||||
print =====data12=$data12
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
$loop_all = $loop_all + 1
|
||||
print ============loop_all=$loop_all
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
|
||||
#goto looptest
|
Loading…
Reference in New Issue