fix(query): enable output check

This commit is contained in:
Liu Jicong 2022-06-28 13:44:50 +08:00
parent 474b6d1ee3
commit 383e7a495b
7 changed files with 152 additions and 138 deletions

View File

@ -98,9 +98,10 @@ int32_t create_stream() {
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(pConn, pRes = taos_query(
"create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 partition " pConn,
"by tbname interval(10s) "); "create stream stream1 trigger max_delay 10s into outstb as select _wstartts, sum(k) from st1 partition "
"by tbname session(ts, 10s) ");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;

View File

@ -137,8 +137,8 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); /*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3,t1 from st1 where t1 = 2000");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;

View File

@ -327,9 +327,8 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
} }
if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0 && if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
pTask->triggerStatus == TASK_TRIGGER_STATUS__IN_ACTIVE) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE);
} }
// TODO: back pressure // TODO: back pressure

View File

@ -231,7 +231,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle) {
tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter); tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
pBlock->info.groupId = 0; pBlock->info.groupId = 0;
pBlock->info.uid = pHandle->msgIter.uid; // set the uid of table for submit block pBlock->info.uid = pHandle->msgIter.uid;
pBlock->info.rows = pHandle->msgIter.numOfRows; pBlock->info.rows = pHandle->msgIter.numOfRows;
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
@ -251,8 +251,8 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle) {
} }
return 0; return 0;
FAIL: // todo refactor here FAIL:
// if (*ppCols) taosArrayDestroy(*ppCols); tDeleteSSDataBlock(pBlock);
return -1; return -1;
} }

View File

@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "index.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "index.h"
#include "os.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "thash.h" #include "thash.h"
#include "tmsg.h" #include "tmsg.h"
@ -46,13 +46,9 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
// do nothing // do nothing
} }
bool isResultRowClosed(SResultRow* pRow) { bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); }
return (pRow->closed == true);
}
void closeResultRow(SResultRow* pResultRow) { void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
pResultRow->closed = true;
}
// TODO refactor: use macro // TODO refactor: use macro
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) { SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
@ -96,9 +92,7 @@ static int32_t resultrowComparAsc(const void* p1, const void* p2) {
} }
} }
static int32_t resultrowComparDesc(const void* p1, const void* p2) { static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
return resultrowComparAsc(p2, p1);
}
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order) { void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order) {
if (pGroupResInfo->pRows != NULL) { if (pGroupResInfo->pRows != NULL) {
@ -195,11 +189,12 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i); SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
// if (!pDescNode->output) { // todo disable it temporarily if (!pDescNode->output) { // todo disable it temporarily
// continue; continue;
// } }
SColumnInfoData idata = createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId); SColumnInfoData idata =
createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
idata.info.scale = pDescNode->dataType.scale; idata.info.scale = pDescNode->dataType.scale;
idata.info.precision = pDescNode->dataType.precision; idata.info.precision = pDescNode->dataType.precision;
@ -383,7 +378,6 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList) {
return pList; return pList;
} }
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type) { int32_t type) {
size_t numOfCols = LIST_LENGTH(pNodeList); size_t numOfCols = LIST_LENGTH(pNodeList);
@ -675,8 +669,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
} }
for (int32_t i = 1; i < numOfOutput; ++i) { for (int32_t i = 1; i < numOfOutput; ++i) {
(*rowEntryInfoOffset)[i] = (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
(int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize); pFuncCtx[i - 1].resDataInfo.interBufSize);
} }
setSelectValueColumnInfo(pFuncCtx, numOfOutput); setSelectValueColumnInfo(pFuncCtx, numOfOutput);

View File

@ -1297,8 +1297,8 @@ bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) {
return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark; return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark;
} }
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SInterval* pInterval, SHashObj* pPullDataMap, SArray* closeWins) { SHashObj* pPullDataMap, SArray* closeWins) {
void* pIte = NULL; void* pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
@ -1309,7 +1309,10 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL);
SWinRes winRe = {.ts = win.skey, .groupId = groupId,}; SWinRes winRe = {
.ts = win.skey,
.groupId = groupId,
};
void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinRes)); void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinRes));
if (isCloseWindow(&win, pSup)) { if (isCloseWindow(&win, pSup)) {
if (chIds && pPullDataMap) { if (chIds && pPullDataMap) {
@ -2236,10 +2239,10 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
return p1 == NULL; return p1 == NULL;
} }
int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t startPos, TSKEY eKey,
int32_t startPos, TSKEY eKey, STimeWindow* pNextWin) { STimeWindow* pNextWin) {
int32_t forwardRows = getNumOfRowsInTimeWindow(pBlockInfo, tsCols, startPos, int32_t forwardRows =
eKey, binarySearchForKey, NULL, TSDB_ORDER_ASC); getNumOfRowsInTimeWindow(pBlockInfo, tsCols, startPos, eKey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
int32_t prevEndPos = forwardRows - 1 + startPos; int32_t prevEndPos = forwardRows - 1 + startPos;
return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC); return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC);
} }
@ -2278,7 +2281,10 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
while (1) { while (1) {
if (IS_FINAL_OP(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && pInfo->pChildren) { if (IS_FINAL_OP(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && pInfo->pChildren) {
bool ignore = true; bool ignore = true;
SWinRes winRes = {.ts = nextWin.skey, .groupId = tableGroupId,}; SWinRes winRes = {
.ts = nextWin.skey,
.groupId = tableGroupId,
};
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes)); void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) { if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId}; SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId};
@ -2484,7 +2490,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv"); printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv");
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PUSH_DATA || pBlock->info.type == STREAM_INVALID) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PUSH_DATA ||
pBlock->info.type == STREAM_INVALID) {
pInfo->binfo.pRes->info.type = pBlock->info.type; pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) { } else if (pBlock->info.type == STREAM_CLEAR) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
@ -2513,8 +2520,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
continue; continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
pBlock, pUpWins);
removeResults(pUpWins, pUpdated); removeResults(pUpWins, pUpdated);
taosArrayDestroy(pUpWins); taosArrayDestroy(pUpWins);
if (taosArrayGetSize(pUpdated) > 0) { if (taosArrayGetSize(pUpdated) > 0) {
@ -2552,8 +2558,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap,
&pInfo->interval, pInfo->pPullDataMap, pUpdated); pUpdated);
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs); closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
} }
@ -2874,12 +2880,24 @@ static bool isInWindow(SResultWindowInfo* pWin, TSKEY ts, int64_t gap) {
} }
static SResultWindowInfo* insertNewSessionWindow(SArray* pWinInfos, TSKEY ts, int32_t index) { static SResultWindowInfo* insertNewSessionWindow(SArray* pWinInfos, TSKEY ts, int32_t index) {
SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = ts, .win.ekey = ts, .isOutput = false}; SResultWindowInfo win = {
.pos.offset = -1,
.pos.pageId = -1,
.win.skey = ts,
.win.ekey = ts,
.isOutput = false,
};
return taosArrayInsert(pWinInfos, index, &win); return taosArrayInsert(pWinInfos, index, &win);
} }
static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY ts) { static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY ts) {
SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = ts, .win.ekey = ts, .isOutput = false}; SResultWindowInfo win = {
.pos.offset = -1,
.pos.pageId = -1,
.win.skey = ts,
.win.ekey = ts,
.isOutput = false,
};
return taosArrayPush(pWinInfos, &win); return taosArrayPush(pWinInfos, &win);
} }
@ -4169,7 +4187,8 @@ void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
} }
static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win, SSDataBlock* pResultBlock) { static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win,
SSDataBlock* pResultBlock) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;

View File

@ -46,15 +46,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
} }
if (output == NULL) { if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
//SSDataBlock block = {0}; SSDataBlock block = {0};
//block.info.type = STREAM_PUSH_EMPTY; /*block.info.type = STREAM_PUSH_EMPTY;*/
// block.info.childId = pTask->selfChildId; // block.info.childId = pTask->selfChildId;
SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data; SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
SSDataBlock* pBlock = createOneDataBlock(taosArrayGet(pRetrieveBlock->blocks, 0), true); /*SSDataBlock* pBlock = createOneDataBlock(taosArrayGet(pRetrieveBlock->blocks, 0), true);*/
pBlock->info.type = STREAM_PUSH_EMPTY; assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
pBlock->info.childId = pTask->selfChildId; block.info.type = STREAM_PUSH_EMPTY;
taosArrayPush(pRes, pBlock); block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block);
} }
break; break;
} }