Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact
This commit is contained in:
commit
c5d27e7cbc
|
@ -188,6 +188,13 @@ SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf);
|
|||
*/
|
||||
void dBufPrintStatis(const SDiskbasedBuf* pBuf);
|
||||
|
||||
/**
|
||||
* Set all of page buffer are not need
|
||||
* @param pBuf
|
||||
* @return
|
||||
*/
|
||||
void clearDiskbasedBuf(SDiskbasedBuf* pBuf);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1500,7 +1500,7 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
|
|||
for (int32_t k = 0; k < colNum; k++) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||
if (pColInfoData->hasNull) {
|
||||
if (colDataIsNull(pColInfoData, rows, j, NULL)) {
|
||||
printf(" %15s |", "NULL");
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -488,6 +488,8 @@ typedef struct SStreamFinalIntervalOperatorInfo {
|
|||
int32_t order; // current SSDataBlock scan order
|
||||
STimeWindowAggSupp twAggSup;
|
||||
SArray* pChildren;
|
||||
SSDataBlock* pUpdateRes;
|
||||
SPhysiNode* pPhyNode; // create new child
|
||||
} SStreamFinalIntervalOperatorInfo;
|
||||
|
||||
typedef struct SAggOperatorInfo {
|
||||
|
@ -793,9 +795,8 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
|||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||
|
|
|
@ -4634,6 +4634,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo);
|
||||
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
|
||||
int32_t children = 8;
|
||||
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
|
||||
int32_t children = 0;
|
||||
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
||||
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
||||
|
||||
|
|
|
@ -956,16 +956,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
if (rows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
} else if (pInfo->pUpdateInfo) {
|
||||
SSDataBlock* upRes = createOneDataBlock(pInfo->pRes, false);
|
||||
getUpdateDataBlock(pInfo, true, pInfo->pRes, upRes);
|
||||
if (upRes) {
|
||||
pInfo->pUpdateRes = upRes;
|
||||
if (upRes->info.type == STREAM_REPROCESS) {
|
||||
blockDataCleanup(pInfo->pUpdateRes);
|
||||
getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes);
|
||||
if (pInfo->pUpdateRes->info.rows > 0) {
|
||||
if (pInfo->pUpdateRes->info.type == STREAM_REPROCESS) {
|
||||
pInfo->updateResIndex = 0;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
|
||||
} else if (upRes->info.type == STREAM_INVERT) {
|
||||
} else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||
return upRes;
|
||||
return pInfo->pUpdateRes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1044,6 +1043,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
pInfo->tableUid = pScanPhyNode->uid;
|
||||
pInfo->streamBlockReader = pHandle->reader;
|
||||
pInfo->pRes = createResDataBlock(pDescNode);
|
||||
pInfo->pUpdateRes = createResDataBlock(pDescNode);
|
||||
pInfo->pCondition = pScanPhyNode->node.pConditions;
|
||||
pInfo->pDataReader = pDataReader;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
|
|
|
@ -23,7 +23,6 @@ typedef enum SResultTsInterpType {
|
|||
RESULT_ROW_END_INTERP = 2,
|
||||
} SResultTsInterpType;
|
||||
|
||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator);
|
||||
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
|
||||
|
||||
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
|
||||
|
@ -778,6 +777,22 @@ static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void removeResult(SArray* pUpdated, TSKEY key) {
|
||||
int32_t size = taosArrayGetSize(pUpdated);
|
||||
int32_t index = binarySearch(pUpdated, size, key, TSDB_ORDER_DESC, getReskey);
|
||||
if (index >= 0 && key == getReskey(pUpdated, index)) {
|
||||
taosArrayRemove(pUpdated, index);
|
||||
}
|
||||
}
|
||||
|
||||
static void removeResults(SArray* pWins, SArray* pUpdated) {
|
||||
int32_t size = taosArrayGetSize(pWins);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
STimeWindow* pW = taosArrayGet(pWins, i);
|
||||
removeResult(pUpdated, pW->skey);
|
||||
}
|
||||
}
|
||||
|
||||
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
|
||||
int32_t scanFlag, SArray* pUpdated) {
|
||||
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
|
||||
|
@ -1212,6 +1227,7 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, int
|
|||
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
ASSERT(p1);
|
||||
doClearWindowImpl(p1, pSup->pResultBuf, pBinfo, numOfOutput);
|
||||
}
|
||||
|
||||
|
@ -1363,6 +1379,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosMemoryFreeClear(pChildOp);
|
||||
}
|
||||
}
|
||||
nodesDestroyNode(pInfo->pPhyNode);
|
||||
}
|
||||
|
||||
static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
|
||||
|
@ -1485,69 +1502,6 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval,
|
||||
int32_t primaryTsSlotId, STimeWindowAggSupp* pTwAggSupp,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->interval = *pInterval;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
int32_t code =
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
|
||||
int32_t numOfChild = 8; // Todo(liuyao) get it from phy plan
|
||||
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo));
|
||||
for (int32_t i = 0; i < numOfChild; i++) {
|
||||
SSDataBlock* chRes = createOneDataBlock(pResBlock, false);
|
||||
SOperatorInfo* pChildOp = createIntervalOperatorInfo(NULL, pExprInfo, numOfCols, chRes, pInterval, primaryTsSlotId,
|
||||
pTwAggSupp, pTaskInfo);
|
||||
if (pChildOp && chRes) {
|
||||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||
continue;
|
||||
}
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pOperator->name = "StreamFinalIntervalOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols);
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
||||
|
@ -1913,12 +1867,12 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
|
||||
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock,
|
||||
int32_t tableGroupId, SArray* pUpdated) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
|
||||
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||
int32_t step = 1;
|
||||
bool ascScan = true;
|
||||
TSKEY* tsCols = NULL;
|
||||
|
@ -1929,7 +1883,7 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
|
|||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
} else {
|
||||
return pUpdated;
|
||||
return ;
|
||||
}
|
||||
|
||||
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
||||
|
@ -1946,13 +1900,13 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
|
|||
pos->groupId = tableGroupId;
|
||||
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
||||
*(int64_t*)pos->key = pResult->win.skey;
|
||||
taosArrayPush(pUpdated, &pos);
|
||||
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
|
||||
TSDB_ORDER_ASC);
|
||||
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos,
|
||||
nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
|
||||
saveResult(pResult, tableGroupId, pUpdated);
|
||||
}
|
||||
// window start(end) key interpolation
|
||||
// disable it temporarily
|
||||
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
|
||||
// forwardRows);
|
||||
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows);
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
|
@ -1962,7 +1916,6 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
|
|||
break;
|
||||
}
|
||||
}
|
||||
return pUpdated;
|
||||
}
|
||||
|
||||
bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { return pInfo->pChildren != NULL; }
|
||||
|
@ -2006,24 +1959,72 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArra
|
|||
}
|
||||
}
|
||||
|
||||
static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) {
|
||||
taosHashClear(pInfo->aggSup.pResultRowHashTable);
|
||||
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
|
||||
cleanupResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 1);
|
||||
}
|
||||
|
||||
static void clearUpdateDataBlock(SSDataBlock* pBlock) {
|
||||
if (pBlock->info.rows <= 0) {
|
||||
return;
|
||||
}
|
||||
blockDataCleanup(pBlock);
|
||||
}
|
||||
|
||||
static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) {
|
||||
ASSERT(pDest->info.capacity >= pSource->info.rows);
|
||||
clearUpdateDataBlock(pDest);
|
||||
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0);
|
||||
SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex);
|
||||
// copy timestamp column
|
||||
colDataAssign(pDestCol, pSourceCol, pSource->info.rows);
|
||||
for (int32_t i = 1; i < pDest->info.numOfCols; i++) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pDest->pDataBlock, i);
|
||||
colDataAppendNNULL(pCol, 0, pSource->info.rows);
|
||||
}
|
||||
pDest->info.rows = pSource->info.rows;
|
||||
blockDataUpdateTsWindow(pDest, 0);
|
||||
}
|
||||
|
||||
static int32_t getChildIndex(SSDataBlock* pBlock) {
|
||||
// if (pBlock->info.type != STREAM_INVALID && pBlock->info.rows < 4) { // for test
|
||||
// return pBlock->info.rows - 1;
|
||||
// }
|
||||
return 0;
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SArray* pUpdated = NULL;
|
||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||
SArray* pClosed = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
if (pInfo->binfo.pRes->info.rows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
if (isFinalInterval(pInfo) || pInfo->pUpdateRes->info.rows == 0) {
|
||||
if (!isFinalInterval(pInfo)) {
|
||||
// semi interval operator clear disk buffer
|
||||
clearStreamIntervalOperator(pInfo);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
// process the rest of the data
|
||||
pOperator->status = OP_OPENED;
|
||||
return pInfo->pUpdateRes;
|
||||
}
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
clearUpdateDataBlock(pInfo->pUpdateRes);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -2033,31 +2034,149 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, pInfo->primaryTsIndex, pOperator->numOfExprs,
|
||||
pBlock, pUpWins);
|
||||
if (isFinalInterval(pInfo)) {
|
||||
int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock
|
||||
int32_t childIndex = getChildIndex(pBlock);
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
||||
SIntervalAggOperatorInfo* pChildInfo = pChildOp->info;
|
||||
doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, pChildInfo->primaryTsIndex,
|
||||
pChildOp->numOfExprs, pBlock, NULL);
|
||||
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs,
|
||||
pOperator->pTaskInfo);
|
||||
doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval,
|
||||
pChildInfo->primaryTsIndex, pChildOp->numOfExprs, pBlock, NULL);
|
||||
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId,
|
||||
pOperator->numOfExprs, pOperator->pTaskInfo);
|
||||
taosArrayDestroy(pUpWins);
|
||||
continue;
|
||||
}
|
||||
removeResults(pUpWins, pUpdated);
|
||||
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
|
||||
taosArrayDestroy(pUpWins);
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
if (isFinalInterval(pInfo)) {
|
||||
int32_t chIndex = 1; // Todo(liuyao) get it from SSDataBlock
|
||||
int32_t chIndex = getChildIndex(pBlock);
|
||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||
// if chIndex + 1 - size > 0, add new child
|
||||
for (int32_t i = 0; i < chIndex + 1 - size; i++) {
|
||||
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
|
||||
if (!pChildOp) {
|
||||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||
}
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
|
||||
doStreamIntervalAgg(pChildOp);
|
||||
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
|
||||
setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
|
||||
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
|
||||
}
|
||||
pUpdated = doHashInterval(pOperator, pBlock, 0);
|
||||
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
}
|
||||
|
||||
if (isFinalInterval(pInfo)) {
|
||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup,
|
||||
&pInfo->interval, pClosed);
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed,
|
||||
pInfo->binfo.rowCellInfoOffset);
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
taosArrayAddAll(pUpdated, pClosed);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pClosed);
|
||||
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
if (pInfo->binfo.pRes->info.rows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
if (pInfo->pUpdateRes->info.rows == 0) {
|
||||
return NULL;
|
||||
}
|
||||
// process the rest of the data
|
||||
pOperator->status = OP_OPENED;
|
||||
return pInfo->pUpdateRes;
|
||||
}
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
|
||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->interval = (SInterval) {.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision =
|
||||
((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
|
||||
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
|
||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.winMap = NULL, };
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols,
|
||||
pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
|
||||
pInfo->pChildren = NULL;
|
||||
if (numOfChild > 0) {
|
||||
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo));
|
||||
for (int32_t i = 0; i < numOfChild; i++) {
|
||||
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
|
||||
if (pChildOp) {
|
||||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||
continue;
|
||||
}
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
// semi interval operator does not catch result
|
||||
if (!isFinalInterval(pInfo)) {
|
||||
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||
}
|
||||
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);\
|
||||
pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
|
||||
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
||||
pInfo->pPhyNode = nodesCloneNode(pPhyNode);
|
||||
|
||||
pOperator->name = "StreamFinalIntervalOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL,
|
||||
destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow,
|
||||
NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols);
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
|
||||
|
|
|
@ -67,6 +67,7 @@ bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf
|
|||
int32_t leastSQRFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
|
|
|
@ -1118,7 +1118,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = leastSQRFunctionSetup,
|
||||
.processFunc = leastSQRFunction,
|
||||
.finalizeFunc = leastSQRFinalize,
|
||||
.invertFunc = leastSQRInvertFunction,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = leastSQRCombine,
|
||||
},
|
||||
{
|
||||
.name = "avg",
|
||||
|
|
|
@ -1799,17 +1799,17 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
param[1][1] = (double)pInfo->num;
|
||||
param[1][0] = param[0][1];
|
||||
|
||||
param[0][0] -= param[1][0] * (param[0][1] / param[1][1]);
|
||||
param[0][2] -= param[1][2] * (param[0][1] / param[1][1]);
|
||||
param[0][1] = 0;
|
||||
param[1][2] -= param[0][2] * (param[1][0] / param[0][0]);
|
||||
param[1][0] = 0;
|
||||
param[0][2] /= param[0][0];
|
||||
double param00 = param[0][0] - param[1][0] * (param[0][1] / param[1][1]);
|
||||
double param02 = param[0][2] - param[1][2] * (param[0][1] / param[1][1]);
|
||||
// param[0][1] = 0;
|
||||
double param12 = param[1][2] - param02 * (param[1][0] / param00);
|
||||
// param[1][0] = 0;
|
||||
param02 /= param00;
|
||||
|
||||
param[1][2] /= param[1][1];
|
||||
param12 /= param[1][1];
|
||||
|
||||
char buf[64] = {0};
|
||||
size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", param[0][2], param[1][2]);
|
||||
size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", param02, param12);
|
||||
varDataSetLen(buf, len);
|
||||
|
||||
colDataAppend(pCol, currentRow, buf, pResInfo->isNullRes);
|
||||
|
@ -1822,6 +1822,27 @@ int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SLeastSQRInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||
double (*pDparam)[3] = pDBuf->matrix;
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SLeastSQRInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
double (*pSparam)[3] = pSBuf->matrix;
|
||||
for (int32_t i = 0; i < pSBuf->num; i++) {
|
||||
pDparam[0][0] += pDBuf->startVal * pDBuf->startVal;
|
||||
pDparam[0][1] += pDBuf->startVal;
|
||||
pDBuf->startVal += pDBuf->stepVal;
|
||||
}
|
||||
pDparam[0][2] += pSparam[0][2] + pDBuf->num * pDBuf->stepVal * pSparam[1][2];
|
||||
pDparam[1][2] += pSparam[1][2];
|
||||
pDBuf->num += pSBuf->num;
|
||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SPercentileInfo);
|
||||
return true;
|
||||
|
|
|
@ -2441,6 +2441,7 @@ static const char* jkValueLiteralSize = "LiteralSize";
|
|||
static const char* jkValueLiteral = "Literal";
|
||||
static const char* jkValueDuration = "Duration";
|
||||
static const char* jkValueTranslate = "Translate";
|
||||
static const char* jkValueNotReserved = "NotReserved";
|
||||
static const char* jkValueDatum = "Datum";
|
||||
|
||||
static int32_t datumToJson(const void* pObj, SJson* pJson) {
|
||||
|
@ -2513,6 +2514,9 @@ static int32_t valueNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkValueTranslate, pNode->translate);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkValueNotReserved, pNode->notReserved);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && pNode->translate) {
|
||||
code = datumToJson(pNode, pJson);
|
||||
}
|
||||
|
@ -2634,6 +2638,9 @@ static int32_t jsonToValueNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkValueTranslate, &pNode->translate);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkValueNotReserved, &pNode->notReserved);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && pNode->translate) {
|
||||
code = jsonToDatum(pJson, pNode);
|
||||
}
|
||||
|
|
|
@ -651,3 +651,31 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
|
|||
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages,
|
||||
ps->loadBytes / (1024.0 * ps->loadPages));
|
||||
}
|
||||
|
||||
void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
|
||||
SArray** p = taosHashIterate(pBuf->groupSet, NULL);
|
||||
while (p) {
|
||||
size_t n = taosArrayGetSize(*p);
|
||||
for (int32_t i = 0; i < n; ++i) {
|
||||
SPageInfo* pi = taosArrayGetP(*p, i);
|
||||
taosMemoryFreeClear(pi->pData);
|
||||
taosMemoryFreeClear(pi);
|
||||
}
|
||||
taosArrayDestroy(*p);
|
||||
p = taosHashIterate(pBuf->groupSet, p);
|
||||
}
|
||||
|
||||
tdListEmpty(pBuf->lruList);
|
||||
tdListEmpty(pBuf->freePgList);
|
||||
|
||||
taosArrayClear(pBuf->emptyDummyIdList);
|
||||
taosArrayClear(pBuf->pFree);
|
||||
|
||||
taosHashClear(pBuf->groupSet);
|
||||
taosHashClear(pBuf->all);
|
||||
|
||||
pBuf->numOfPages = 0; // all pages are in buffer in the first place
|
||||
pBuf->totalBufSize = 0;
|
||||
pBuf->allocateId = -1;
|
||||
pBuf->fileSize = 0;
|
||||
}
|
|
@ -72,6 +72,10 @@
|
|||
./test.sh -f tsim/stream/basic2.sim
|
||||
# ./test.sh -f tsim/stream/session0.sim
|
||||
# ./test.sh -f tsim/stream/session1.sim
|
||||
# ./test.sh -f tsim/stream/state0.sim
|
||||
# ./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
# ./test.sh -f tsim/stream/triggerSession0.sim
|
||||
|
||||
|
||||
# ---- transaction
|
||||
./test.sh -f tsim/trans/lossdata1.sim
|
||||
|
|
|
@ -23,89 +23,98 @@ sql insert into t1 values(1648791223001,10,2,3,1.1,2);
|
|||
sql insert into t1 values(1648791233002,3,2,3,2.1,3);
|
||||
sql insert into t1 values(1648791243003,NULL,NULL,NULL,NULL,4);
|
||||
sql insert into t1 values(1648791213002,NULL,NULL,NULL,NULL,5) (1648791233012,NULL,NULL,NULL,NULL,6);
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
sleep 300
|
||||
sql select * from streamt order by s desc;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 0
|
||||
if $data01 != 3 then
|
||||
print ======$data01
|
||||
return -1
|
||||
print ======data01=$data01
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data02 != 3 then
|
||||
print ======$data02
|
||||
return -1
|
||||
print ======data02=$data02
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data03 != 3 then
|
||||
print ======$data03
|
||||
return -1
|
||||
print ======data03=$data03
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data04 != 2.100000000 then
|
||||
print ======$data04
|
||||
print ======data04=$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data05 != 0.000000000 then
|
||||
print ======$data05
|
||||
print ======data05=$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data06 != 3 then
|
||||
print ======$data05
|
||||
print ======data06=$data06
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data07 != 2.100000000 then
|
||||
print ======$data05
|
||||
print ======data07=$data07
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data08 != 6 then
|
||||
print ======$data05
|
||||
print ======data08=$data08
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 1
|
||||
|
||||
if $data11 != 3 then
|
||||
print ======$data11
|
||||
return -1
|
||||
print ======data11=$data11
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data12 != 10 then
|
||||
print ======$data12
|
||||
return -1
|
||||
print ======data12=$data12
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data13 != 10 then
|
||||
print ======$data13
|
||||
return -1
|
||||
print ======data13=$data13
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data14 != 1.100000000 then
|
||||
print ======$data14
|
||||
print ======data14=$data14
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data15 != 0.000000000 then
|
||||
print ======$data15
|
||||
print ======data15=$data15
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data16 != 10 then
|
||||
print ======$data15
|
||||
print ======data16=$data16
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data17 != 1.100000000 then
|
||||
print ======$data17
|
||||
print ======data17=$data17
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data18 != 5 then
|
||||
print ======$data18
|
||||
print ======data18=$data18
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -115,23 +124,31 @@ sql insert into t1 values(1648791233002,3,2,3,2.1,9);
|
|||
sql insert into t1 values(1648791243003,4,2,3,3.1,10);
|
||||
sql insert into t1 values(1648791213002,4,2,3,4.1,11) ;
|
||||
sql insert into t1 values(1648791213002,4,2,3,4.1,12) (1648791223009,4,2,3,4.1,13);
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
sleep 300
|
||||
sql select * from streamt order by s desc ;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 0
|
||||
if $data01 != 7 then
|
||||
print ======$data01
|
||||
return -1
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 9 then
|
||||
print ======$data02
|
||||
return -1
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data03 != 4 then
|
||||
print ======$data03
|
||||
return -1
|
||||
print =====data03=$data03
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data04 != 1.100000000 then
|
||||
|
|
|
@ -20,21 +20,33 @@ sql create stream streams1 trigger at_once into streamt1 as select _wstartts,
|
|||
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0,2);
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
sql select * from streamt1 order by c desc;
|
||||
sleep 300
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 1 then
|
||||
print ======$rows
|
||||
return -1;
|
||||
print =====rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791214000,1,2,3,1.0,3);
|
||||
$loop_count = 0
|
||||
loop00:
|
||||
sql select * from streamt1 order by c desc;
|
||||
sleep 300
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 1 then
|
||||
print ======$rows
|
||||
return -1;
|
||||
print =====rows=$rows
|
||||
goto loop00
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213010,2,2,3,1.0,4);
|
||||
|
@ -44,27 +56,25 @@ $loop_count = 0
|
|||
loop1:
|
||||
sql select * from streamt1 where c >=4 order by `_wstartts`;
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 3 then
|
||||
print ======$rows
|
||||
print =====rows=$rows
|
||||
goto loop1
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 0
|
||||
if $data01 != 1 then
|
||||
print ======$data01
|
||||
return -1
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 1 then
|
||||
print ======$data02
|
||||
return -1
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data03 != 1 then
|
||||
|
@ -151,11 +161,10 @@ endi
|
|||
|
||||
sql insert into t1 values(1648791213011,1,2,3,1.0,7);
|
||||
|
||||
loop2:
|
||||
$loop_count = 0
|
||||
loop2:
|
||||
sql select * from streamt1 where c in (5,4,7) order by `_wstartts`;
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
|
@ -163,57 +172,51 @@ endi
|
|||
|
||||
# row 2
|
||||
if $data21 != 2 then
|
||||
print ======$data21
|
||||
print =====data21=$data21
|
||||
goto loop2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data22 != 2 then
|
||||
print ======$data22
|
||||
print =====data22=$data22
|
||||
goto loop2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data23 != 2 then
|
||||
print ======$data23
|
||||
goto loop2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data24 != 1 then
|
||||
print ======$data24
|
||||
goto loop2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data25 != 3 then
|
||||
print ======$data25
|
||||
goto loop2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data26 != 7 then
|
||||
print ======$data26
|
||||
goto loop2
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213011,1,2,3,1.0,8);
|
||||
|
||||
loop21:
|
||||
$loop_count = 0
|
||||
loop21:
|
||||
sql select * from streamt1 where c in (5,4,8) order by `_wstartts`;
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data26 != 8 then
|
||||
print ======$data26
|
||||
print =====data26=$data26
|
||||
goto loop21
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
|
@ -222,11 +225,10 @@ sql insert into t1 values(1648791213020,3,2,3,1.0,10);
|
|||
sql insert into t1 values(1648791214000,1,2,3,1.0,11);
|
||||
sql insert into t1 values(1648791213011,10,20,10,10.0,12);
|
||||
|
||||
loop3:
|
||||
$loop_count = 0
|
||||
loop3:
|
||||
sql select * from streamt1 where c in (5,4,10,11,12) order by `_wstartts`;
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
|
@ -234,112 +236,100 @@ endi
|
|||
|
||||
# row 2
|
||||
if $data21 != 1 then
|
||||
print ======$data21
|
||||
print =====data21=$data21
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data22 != 1 then
|
||||
print ======$data22
|
||||
print =====data22=$data22
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data23 != 10 then
|
||||
print ======$data23
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data24 != 10 then
|
||||
print ======$data24
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data25 != 10 then
|
||||
print ======$data25
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data26 != 12 then
|
||||
print ======$data26
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 3
|
||||
if $data31 != 1 then
|
||||
print ======$data31
|
||||
print =====data31=$data31
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data32 != 1 then
|
||||
print ======$data32
|
||||
print =====data32=$data32
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data33 != 3 then
|
||||
print ======$data33
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data34 != 3 then
|
||||
print ======$data34
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data35 != 3 then
|
||||
print ======$data35
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data36 != 10 then
|
||||
print ======$data36
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 4
|
||||
if $data41 != 1 then
|
||||
print ======$data41
|
||||
print =====data41=$data41
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data42 != 1 then
|
||||
print ======$data42
|
||||
print =====data42=$data42
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data43 != 1 then
|
||||
print ======$data43
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data44 != 1 then
|
||||
print ======$data44
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data45 != 3 then
|
||||
print ======$data45
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data46 != 11 then
|
||||
print ======$data46
|
||||
goto loop3
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -347,8 +337,8 @@ sql insert into t1 values(1648791213030,3,12,12,12.0,13);
|
|||
sql insert into t1 values(1648791214040,1,13,13,13.0,14);
|
||||
sql insert into t1 values(1648791213030,3,14,14,14.0,15) (1648791214020,15,15,15,15.0,16);
|
||||
|
||||
loop4:
|
||||
$loop_count = 0
|
||||
loop4:
|
||||
sql select * from streamt1 where c in (14,15,16) order by `_wstartts`;
|
||||
sleep 300
|
||||
|
||||
|
@ -358,119 +348,104 @@ if $loop_count == 10 then
|
|||
endi
|
||||
|
||||
if $rows != 3 then
|
||||
print ======$rows
|
||||
goto loop4
|
||||
return -1;
|
||||
print ====loop4=rows=$rows
|
||||
# goto loop4
|
||||
endi
|
||||
|
||||
# row 0
|
||||
if $data01 != 2 then
|
||||
print ======$data01
|
||||
print =====data01=$data01
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 2 then
|
||||
print ======$data02
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 6 then
|
||||
print ======$data03
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 3 then
|
||||
print ======$data04
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data05 != 3 then
|
||||
print ======$data05
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data06 != 15 then
|
||||
print ======$data06
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 1
|
||||
if $data11 != 1 then
|
||||
print ======$data11
|
||||
print =====data11=$data11
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != 1 then
|
||||
print ======$data12
|
||||
print =====data12=$data12
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != 15 then
|
||||
print ======$data13
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 15 then
|
||||
print ======$data14
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data15 != 15 then
|
||||
print ======$data15
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data16 != 16 then
|
||||
print ======$data16
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 2
|
||||
if $data21 != 1 then
|
||||
print ======$data21
|
||||
print =====data21=$data21
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data22 != 1 then
|
||||
print ======$data22
|
||||
print =====data22=$data22
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data23 != 1 then
|
||||
print ======$data23
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data24 != 1 then
|
||||
print ======$data24
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data25 != 13 then
|
||||
print ======$data25
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data26 != 14 then
|
||||
print ======$data26
|
||||
goto loop4
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue