From fa43fc455a96f6231c7e38bb7b6b99884a8d1d94 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 17 Jan 2023 14:05:46 +0800 Subject: [PATCH 1/4] refactor stream interval build result data --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executil.c | 2 +- source/libs/executor/src/executorimpl.c | 20 +- source/libs/executor/src/timewindowoperator.c | 172 +++++++----------- 4 files changed, 79 insertions(+), 117 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 7d33f150ff..7afc9ef830 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -570,7 +570,7 @@ typedef struct SStreamIntervalOperatorInfo { SWinKey delKey; uint64_t numOfDatapack; SArray* pUpdated; - SHashObj* pUpdatedMap; + SSHashObj* pUpdatedMap; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 1d16a3418d..f8cce56dd4 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -155,7 +155,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) { if (pGroupResInfo->pRows != NULL) { - taosArrayDestroyP(pGroupResInfo->pRows, taosMemoryFree); + taosArrayDestroy(pGroupResInfo->pRows); } pGroupResInfo->pRows = pArrayList; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 21ef5dfab3..08d7f02f8c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2589,26 +2589,22 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { - SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + SWinKey* pKey = taosArrayGet(pGroupResInfo->pRows, i); int32_t size = 0; void* pVal = NULL; - SWinKey key = { - .ts = *(TSKEY*)pPos->key, - .groupId = pPos->groupId, - }; - int32_t code = streamStateGet(pState, &key, &pVal, &size); + int32_t code = streamStateGet(pState, pKey, &pVal, &size); ASSERT(code == 0); SResultRow* pRow = (SResultRow*)pVal; doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); // no results, continue to check the next one if (pRow->numOfRows == 0) { pGroupResInfo->index += 1; - releaseOutputBuf(pState, &key, pRow); + releaseOutputBuf(pState, pKey, pRow); continue; } if (pBlock->info.id.groupId == 0) { - pBlock->info.id.groupId = pPos->groupId; + pBlock->info.id.groupId = pKey->groupId; void* tbname = NULL; if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { pBlock->info.parTbName[0] = 0; @@ -2618,15 +2614,15 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat tdbFree(tbname); } else { // current value belongs to different group, it can't be packed into one datablock - if (pBlock->info.id.groupId != pPos->groupId) { - releaseOutputBuf(pState, &key, pRow); + if (pBlock->info.id.groupId != pKey->groupId) { + releaseOutputBuf(pState, pKey, pRow); break; } } if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { ASSERT(pBlock->info.rows > 0); - releaseOutputBuf(pState, &key, pRow); + releaseOutputBuf(pState, pKey, pRow); break; } @@ -2656,7 +2652,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat } pBlock->info.rows += pRow->numOfRows; - releaseOutputBuf(pState, &key, pRow); + releaseOutputBuf(pState, pKey, pRow); } pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, 0); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index eccdcb85bf..c0d89aa8c7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -639,7 +639,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num if (NULL == pr) { T_LONG_JMP(pTaskInfo->env, terrno); } - + ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId); if (pr->closed) { @@ -842,68 +842,61 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); } -static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) { - SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); - if (newPos == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - newPos->groupId = groupId; - newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; - *(int64_t*)newPos->key = ts; +static int32_t saveWinResult(int64_t ts, uint64_t groupId, SSHashObj* pUpdatedMap) { SWinKey key = {.ts = ts, .groupId = groupId}; - if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) { - taosMemoryFree(newPos); - } + tSimpleHashPut(pUpdatedMap, &key, sizeof(SWinKey), NULL, 0); return TSDB_CODE_SUCCESS; } -static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) { - return saveWinResult(ts, -1, -1, groupId, pUpdatedMap); +static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SSHashObj* pUpdatedMap) { + return saveWinResult(ts, groupId, pUpdatedMap); } -static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) { +static void removeResults(SArray* pWins, SSHashObj* pUpdatedMap) { int32_t size = taosArrayGetSize(pWins); for (int32_t i = 0; i < size; i++) { SWinKey* pW = taosArrayGet(pWins, i); - void* tmp = taosHashGet(pUpdatedMap, pW, sizeof(SWinKey)); + void* tmp = tSimpleHashGet(pUpdatedMap, pW, sizeof(SWinKey)); if (tmp) { void* value = *(void**)tmp; taosMemoryFree(value); - taosHashRemove(pUpdatedMap, pW, sizeof(SWinKey)); + tSimpleHashRemove(pUpdatedMap, pW, sizeof(SWinKey)); } } } -int32_t compareWinRes(void* pKey, void* data, int32_t index) { - SArray* res = (SArray*)data; - SWinKey* pDataPos = taosArrayGet(res, index); - SResKeyPos* pRKey = (SResKeyPos*)pKey; - if (pRKey->groupId > pDataPos->groupId) { +int32_t compareWinKey(void* pKey, void* data, int32_t index) { + SArray* res = (SArray*)data; + SWinKey* pDataPos = taosArrayGet(res, index); + SWinKey* pWKey = (SWinKey*)pKey; + + if (pWKey->groupId > pDataPos->groupId) { return 1; - } else if (pRKey->groupId < pDataPos->groupId) { + } else if (pWKey->groupId < pDataPos->groupId) { return -1; } - if (*(int64_t*)pRKey->key > pDataPos->ts) { + if (pWKey->ts > pDataPos->ts) { return 1; - } else if (*(int64_t*)pRKey->key < pDataPos->ts) { + } else if (pWKey->ts < pDataPos->ts) { return -1; } return 0; } -static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { +static void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) { taosArraySort(pDelWins, winKeyCmprImpl); taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL); int32_t delSize = taosArrayGetSize(pDelWins); - if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) { + if (tSimpleHashGetSize(pUpdatedMap) == 0 || delSize == 0) { return; } - void* pIte = NULL; - while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { - SResKeyPos* pResKey = *(SResKeyPos**)pIte; - int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes); - if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) { + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pUpdatedMap, pIte, &iter)) != NULL) { + SWinKey* pResKey = tSimpleHashGetKey(pIte, NULL); + int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinKey); + if (index >= 0 && 0 == compareWinKey(pResKey, pDelWins, index)) { taosArrayRemove(pDelWins, index); delSize = taosArrayGetSize(pDelWins); } @@ -1318,11 +1311,11 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type } static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) { - SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false); + SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false); if (NULL == pResult) { return; } - + SqlFunctionCtx* pCtx = pSup->pCtx; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset); @@ -1352,7 +1345,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) } static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins, - SHashObj* pUpdatedMap) { + SSHashObj* pUpdatedMap) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; @@ -1388,28 +1381,21 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa taosArrayPush(pUpWins, &winRes); } if (pUpdatedMap) { - void* tmp = taosHashGet(pUpdatedMap, &winRes, sizeof(SWinKey)); - if (tmp) { - void* value = *(void**)tmp; - taosMemoryFree(value); - taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey)); - } + tSimpleHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey)); } getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); } while (win.ekey <= endTsCols[i]); } } -static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { +static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SSHashObj* resWins) { void* pIte = NULL; - size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { - void* key = tSimpleHashGetKey(pIte, &keyLen); - uint64_t groupId = *(uint64_t*)key; - TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); - SResultRowPosition* pPos = (SResultRowPosition*)pIte; - int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins); + SWinKey* pKey = tSimpleHashGetKey(pIte, NULL); + uint64_t groupId = pKey->groupId; + TSKEY ts = pKey->ts; + int32_t code = saveWinResult(ts, groupId, resWins); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1417,36 +1403,16 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { return TSDB_CODE_SUCCESS; } -int32_t compareWinKey(void* pKey, void* data, int32_t index) { - SArray* res = (SArray*)data; - SWinKey* pDataPos = taosArrayGet(res, index); - SWinKey* pWKey = (SWinKey*)pKey; - - if (pWKey->groupId > pDataPos->groupId) { - return 1; - } else if (pWKey->groupId < pDataPos->groupId) { - return -1; - } - - if (pWKey->ts > pDataPos->ts) { - return 1; - } else if (pWKey->ts < pDataPos->ts) { - return -1; - } - return 0; -} - static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval, - SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pDelWins, + SHashObj* pPullDataMap, SSHashObj* closeWins, SArray* pDelWins, SOperatorInfo* pOperator) { qDebug("===stream===close interval window"); void* pIte = NULL; - size_t keyLen = 0; int32_t iter = 0; SStreamIntervalOperatorInfo* pInfo = pOperator->info; int32_t delSize = taosArrayGetSize(pDelWins); while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { - void* key = tSimpleHashGetKey(pIte, &keyLen); + void* key = tSimpleHashGetKey(pIte, NULL); SWinKey* pWinKey = (SWinKey*)key; if (delSize > 0) { int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey); @@ -2157,7 +2123,7 @@ bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) { return TSDB_CODE_SUCCESS == streamStateGet(pState, pKey, NULL, 0); } -static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SHashObj* pUpdatedMap) { +static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pUpdatedMap) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t size = taosArrayGetSize(pWinArray); @@ -2343,7 +2309,8 @@ static void clearFunctionContext(SExprSupp* pSup) { } } -void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { +void doBuildStreamIntervalResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, + SGroupResInfo* pGroupResInfo) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // set output datablock version pBlock->info.version = pTaskInfo->version; @@ -2370,7 +2337,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN } static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, - SHashObj* pUpdatedMap) { + SSHashObj* pUpdatedMap) { SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); @@ -2516,7 +2483,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } - doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); + doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows != 0) { printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->binfo.pRes; @@ -2543,7 +2510,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } - doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); + doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows != 0) { printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->binfo.pRes; @@ -2552,11 +2519,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } if (!pInfo->pUpdated) { - pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); + pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey)); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pInfo->pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); } while (1) { @@ -2649,13 +2616,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; - void* pIte = NULL; - while ((pIte = taosHashIterate(pInfo->pUpdatedMap, pIte)) != NULL) { + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { taosArrayPush(pInfo->pUpdated, pIte); } - taosHashCleanup(pInfo->pUpdatedMap); + tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; - taosArraySort(pInfo->pUpdated, resultrowComparAsc); + taosArraySort(pInfo->pUpdated, winKeyCmprImpl); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; @@ -2675,7 +2643,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } - doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); + doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows != 0) { printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->binfo.pRes; @@ -3239,10 +3207,9 @@ static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) { void* pIte = NULL; - size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) { - void* key = tSimpleHashGetKey(pIte, &keyLen); + void* key = tSimpleHashGetKey(pIte, NULL); taosArrayPush(pUpdated, key); } taosArraySort(pUpdated, sessionKeyCompareAsc); @@ -3256,13 +3223,12 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo return; } blockDataEnsureCapacity(pBlock, size); - size_t keyLen = 0; int32_t iter = 0; while (((*Ite) = tSimpleHashIterate(pStDeleted, *Ite, &iter)) != NULL) { if (pBlock->info.rows + 1 > pBlock->info.capacity) { break; } - SSessionKey* res = tSimpleHashGetKey(*Ite, &keyLen); + SSessionKey* res = tSimpleHashGetKey(*Ite, NULL); SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)&res->win.skey, false); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -3351,7 +3317,6 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed) { void* pIte = NULL; - size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { SResultWindowInfo* pWinInfo = pIte; @@ -3362,7 +3327,7 @@ int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHa return code; } } - SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen); + SSessionKey* pKey = tSimpleHashGetKey(pIte, NULL); tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter); } } @@ -3451,7 +3416,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey)); } if (!pInfo->pStUpdated) { - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); } while (1) { @@ -3675,7 +3640,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey)); } if (!pInfo->pStUpdated) { - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); } while (1) { @@ -4006,7 +3971,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey)); } if (!pInfo->pSeUpdated) { - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); } while (1) { @@ -4761,7 +4726,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } - doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); + doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows > 0) { printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes; @@ -4776,14 +4741,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { - pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES); + pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey)); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pInfo->pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); } - while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -4832,19 +4796,21 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } pOperator->status = OP_RES_TO_RETURN; removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); - closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pInfo->pUpdatedMap, - pInfo->pDelWins, pOperator); + closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, + pInfo->pUpdatedMap, pInfo->pDelWins, pOperator); - void* pIte = NULL; - while ((pIte = taosHashIterate(pInfo->pUpdatedMap, pIte)) != NULL) { - taosArrayPush(pInfo->pUpdated, pIte); + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { + SWinKey* pKey = tSimpleHashGetKey(pIte, NULL); + taosArrayPush(pInfo->pUpdated, pKey); } - taosArraySort(pInfo->pUpdated, resultrowComparAsc); + taosArraySort(pInfo->pUpdated, winKeyCmprImpl); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - taosHashCleanup(pInfo->pUpdatedMap); + tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); @@ -4853,7 +4819,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } - doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); + doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows > 0) { printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes; From 637195ce94d0efb78dd12e52512e2e0336b80494 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 31 Jan 2023 14:00:37 +0800 Subject: [PATCH 2/4] fix: the problem of creating a stream without partition by clause --- source/libs/parser/src/parTranslater.c | 10 ++++------ source/libs/planner/src/planLogicCreater.c | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 6e94883849..0e92d66d73 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3281,9 +3281,6 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { } static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelect) { - if (NULL == pSelect->pPartitionByList) { - return TSDB_CODE_SUCCESS; - } pCxt->currClause = SQL_CLAUSE_PARTITION_BY; int32_t code = translateExprList(pCxt, pSelect->pPartitionByList); if (TSDB_CODE_SUCCESS == code) { @@ -5733,12 +5730,13 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, SCreateStreamStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (NULL == pSelect->pPartitionByList) { - return addNullTagsForExistTable(pCxt, pMeta, pSelect); + code = addNullTagsForExistTable(pCxt, pMeta, pSelect); + } else { + code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect); } - - int32_t code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect); if (TSDB_CODE_SUCCESS == code) { code = addSubtableNameToCreateStreamQuery(pCxt, pStmt, pSelect); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index fef5bd654e..001ec66725 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -374,6 +374,20 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect code = addDefaultScanCol(pRealTable->pMeta, &pScan->pScanCols); } + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags && NULL == pSelect->pPartitionByList) { + pScan->pTags = nodesCloneList(pSelect->pTags); + if (NULL == pScan->pTags) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pSubtable && NULL == pSelect->pPartitionByList) { + pScan->pSubtable = nodesCloneNode(pSelect->pSubtable); + if (NULL == pScan->pSubtable) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + // set output if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets); From c4414ec646aad39f4c701592b9372ff73042be85 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 1 Feb 2023 14:49:42 +0800 Subject: [PATCH 3/4] feat(stream):exists stable bug&ci --- source/dnode/vnode/src/tq/tqSink.c | 4 +- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 2 +- .../script/tsim/stream/checkStreamSTable.sim | 64 ++- tests/script/tsim/stream/udTableAndTag0.sim | 71 ++++ tests/script/tsim/stream/udTableAndTag2.sim | 371 ++++++++++++++++++ 7 files changed, 498 insertions(+), 18 deletions(-) create mode 100644 tests/script/tsim/stream/udTableAndTag2.sim diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index f1103ad48a..3801a25d6d 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -488,9 +488,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* }; void* pData = colDataGetData(pTagData, rowId); if (colDataIsNull_s(pTagData, rowId)) { - tagVal.type = TSDB_DATA_TYPE_NULL; - tagVal.pData = NULL; - tagVal.nData = 0; + continue; } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) { tagVal.nData = varDataLen(pData); tagVal.pData = varDataVal(pData); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index fb122b077f..9c8137db34 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -971,7 +971,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId, SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) { void* pValue = NULL; - if (groupId != 0 && streamStateGetParName(pState, groupId, &pValue) != 0) { + if (streamStateGetParName(pState, groupId, &pValue) != 0) { SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); if (pTableSup->numOfExprs > 0) { projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 37c33c44e2..25de627127 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1786,7 +1786,7 @@ FETCH_NEXT_BLOCK: int32_t current = pInfo->validBlockIndex++; SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SSDataBlock* pBlock = pPacked->pDataBlock; - if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { + if (pBlock->info.parTbName[0]) { streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName); } // TODO move into scan diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c0d89aa8c7..648580b913 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1614,7 +1614,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { } nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); - cleanupGroupResInfo(&pInfo->groupResInfo); + pInfo->groupResInfo.pRows = taosArrayDestroy(pInfo->groupResInfo.pRows); cleanupExprSupp(&pInfo->scalarSupp); taosMemoryFreeClear(param); diff --git a/tests/script/tsim/stream/checkStreamSTable.sim b/tests/script/tsim/stream/checkStreamSTable.sim index 2ed6958196..b60ab0ac05 100644 --- a/tests/script/tsim/stream/checkStreamSTable.sim +++ b/tests/script/tsim/stream/checkStreamSTable.sim @@ -19,9 +19,9 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); -sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb int,tc int); +sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb varchar(100),tc int); -sql create stream streams0 trigger at_once into result.streamt0 as select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s); +sql create stream streams0 trigger at_once into result.streamt0 tags(tb) as select _wstart, count(*) c1, max(a) c2 from st partition by tbname tb interval(10s); sql insert into t1 values(1648791213000,1,2,3); sql insert into t2 values(1648791213000,2,2,3); @@ -61,6 +61,16 @@ if $data02 != 1 then goto loop0 endi +if $data03 != NULL then + print =====data03=$data03 + goto loop0 +endi + +if $data04 != t1 then + print =====data04=$data04 + goto loop0 +endi + if $data11 != 1 then print =====data11=$data11 goto loop0 @@ -71,6 +81,16 @@ if $data12 != 2 then goto loop0 endi +if $data13 != NULL then + print =====data13=$data13 + goto loop0 +endi + +if $data14 != t2 then + print =====data14=$data14 + goto loop0 +endi + print ===== step3 sql create database result1 vgroups 1; @@ -83,9 +103,9 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); -sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta bigint unsigned,tb int,tc int); +sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta varchar(100),tb int,tc int); -sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s); +sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) tags(ta) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname as ta interval(10s); sql insert into t1 values(1648791213000,10,20,30); sql insert into t2 values(1648791213000,40,50,60); @@ -161,7 +181,7 @@ sql create table t2 using st tags(2,2,2); sql create stable result2.streamt2(ts timestamp, a int , b int) tags(ta varchar(20)); # tag dest 1, source 2 -##sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s); +sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s); # column dest 3, source 4 sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s); @@ -173,7 +193,7 @@ sql_error create stream streams2 trigger at_once into result2.streamt2(ts, a, b sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1 from st partition by tbname interval(10s); # column dest 3, source 2 -sql create stream streams2 trigger at_once into result2.streamt2(ts, a) as select _wstart, count(*) c1 from st partition by tbname interval(10s); +sql create stream streams2 trigger at_once into result2.streamt2(ts, a) tags(ta) as select _wstart, count(*) c1 from st partition by tbname as ta interval(10s); print ===== step5 @@ -252,16 +272,16 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,2,3); sql create table t2 using st tags(4,5,6); -sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int); +sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(tg1 int,tg2 int,tg3 int); -sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2 int, tg3 varchar(100), tg1 bigint) subtable(concat("tbl-", tg1)) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s); +sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2, tg3, tg1) subtable( concat("tbl-", cast(tg1 as varchar(10)) ) ) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s); sql insert into t1 values(1648791213000,10,20,30); sql insert into t2 values(1648791213000,40,50,60); $loop_count = 0 -sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s); +sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s); print $data00, $data01, $data02, $data03 print $data10, $data11, $data12, $data13 print $data20, $data21, $data22, $data23 @@ -275,7 +295,7 @@ if $loop_count == 10 then return -1 endi -sql select * from result4.streamt4; +sql select * from result4.streamt4 order by tg1; if $rows != 2 then print =====rows=$rows @@ -285,7 +305,7 @@ if $rows != 2 then goto loop2 endi -if $data01 != 40 then +if $data01 != 10 then print =====data01=$data01 goto loop2 endi @@ -295,7 +315,7 @@ if $data02 != 20 then goto loop2 endi -if $data03 != 2 then +if $data03 != 1 then print =====data03=$data03 goto loop2 endi @@ -305,6 +325,26 @@ if $data04 != NULL then goto loop2 endi +if $data11 != 40 then + print =====data11=$data11 + goto loop2 +endi + +if $data12 != 50 then + print =====data12=$data12 + goto loop2 +endi + +if $data13 != 1 then + print =====data13=$data13 + goto loop2 +endi + +if $data14 != NULL then + print =====data14=$data14 + goto loop2 +endi + print ======over system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/udTableAndTag0.sim b/tests/script/tsim/stream/udTableAndTag0.sim index 5cb5c2dd8b..bfc299df0f 100644 --- a/tests/script/tsim/stream/udTableAndTag0.sim +++ b/tests/script/tsim/stream/udTableAndTag0.sim @@ -367,6 +367,77 @@ if $data22 != tag-t3 then goto loop8 endi +print ===== step6 +print ===== transform tag value + +sql drop stream if exists streams1; +sql drop stream if exists streams2; +sql drop stream if exists streams3; +sql drop stream if exists streams4; +sql drop stream if exists streams5; + +sql drop database if exists test1; +sql drop database if exists test2; +sql drop database if exists test3; +sql drop database if exists test4; +sql drop database if exists test5; + +sql drop database if exists result1; +sql drop database if exists result2; +sql drop database if exists result3; +sql drop database if exists result4; +sql drop database if exists result5; + + + +sql create database result6 vgroups 1; + +sql create database test6 vgroups 4; +sql use test6; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta varchar(20), tb int, tc int); +sql create table t1 using st tags("1",1,1); +sql create table t2 using st tags("2",2,2); +sql create table t3 using st tags("3",3,3); + +sql create stream streams6 trigger at_once into result6.streamt6 TAGS(dd int) as select _wstart, count(*) c1 from st partition by concat(ta, "0") as dd, tbname interval(10s); +sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3); + + +$loop_count = 0 +loop9: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result6.streamt6 order by 3; + +if $rows != 3 then + print =====rows=$rows + print $data00 $data10 + goto loop9 +endi + +if $data02 != 10 then + print =====data02=$data02 + goto loop9 +endi + +if $data12 != 20 then + print =====data12=$data12 + goto loop9 +endi + +if $data22 != 30 then + print =====data22=$data22 + goto loop8 +endi + print ======over system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/udTableAndTag2.sim b/tests/script/tsim/stream/udTableAndTag2.sim new file mode 100644 index 0000000000..c42ac21dcd --- /dev/null +++ b/tests/script/tsim/stream/udTableAndTag2.sim @@ -0,0 +1,371 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +print ===== step1 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print ===== step2 +print ===== table name + +sql create database result vgroups 1; + +sql create database test vgroups 4; +sql use test; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s); +print ===== insert into 1 +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,2,2,3); + +$loop_count = 0 +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result" order by 1; + +if $rows != 1 then + print =====rows=$rows + print $data00 + print $data10 + goto loop0 +endi + +if $data00 != aaa then + print =====data00=$data00 + goto loop0 +endi + + +$loop_count = 0 +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result.streamt; + +if $rows != 1 then + print =====rows=$rows + print $data00 $data10 + goto loop1 +endi + + +print ===== step3 +print ===== column name + +sql create database result2 vgroups 1; + +sql create database test2 vgroups 4; +sql use test2; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams2 trigger at_once into result2.streamt2 TAGS(cc varchar(100)) as select _wstart, count(*) c1 from st interval(10s); +print ===== insert into 2 +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,2,2,3); + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1; + +sql select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1; + +if $rows != 1 then + print =====rows=$rows + print $data00 + print $data10 + goto loop2 +endi + +if $data00 != cc then + print =====data00=$data00 + goto loop2 +endi + +print sql select cc from result2.streamt2 order by 1; + +$loop_count = 0 +loop21: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select cc from result2.streamt2 order by 1; + +if $rows != 1 then + print =====rows=$rows + print $data00 + print $data10 + goto loop21 +endi + +if $data00 != NULL then + print =====data00=$data00 + goto loop21 +endi + + +$loop_count = 0 +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result2.streamt2; + +if $rows != 1 then + print =====rows=$rows + print $data00 $data10 + goto loop3 +endi + + +print ===== step4 +print ===== column name + table name + +sql create database result3 vgroups 1; + +sql create database test3 vgroups 4; +sql use test3; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", 1 ) ) as select _wstart, count(*) c1 from st interval(10s); +print ===== insert into 3 +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,2,2,3); + + +$loop_count = 0 +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select tag_name from information_schema.ins_tags where db_name="result3" and stable_name = "streamt3" order by 1; + +if $rows != 1 then + print =====rows=$rows + print $data00 + print $data10 + goto loop4 +endi + +if $data00 != NULL then + print =====data00=$data00 + goto loop4 +endi + +sql select dd from result3.streamt3 order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop4 +endi + +if $data00 != col-1 then + print =====data00=$data00 + goto loop4 +endi + +if $data10 != col-2 then + print =====data10=$data10 + goto loop4 +endi + +$loop_count = 0 +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result3.streamt3; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop5 +endi + +$loop_count = 0 +loop6: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result3" order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop6 +endi + +if $data00 != tbn-1 then + print =====data00=$data00 + goto loop6 +endi + +if $data10 != tbn-2 then + print =====data10=$data10 + goto loop6 +endi + +print ===== step5 +print ===== tag name + table name + +sql create database result4 vgroups 1; + +sql create database test4 vgroups 4; +sql use test4; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); + +sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", dd)) as select _wstart, count(*) c1 from st interval(10s); +sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3); + + +$loop_count = 0 +loop7: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result4" order by 1; + +if $rows != 3 then + print =====rows=$rows + print $data00 $data10 + goto loop7 +endi + +if $data00 != tbn-t1 then + print =====data00=$data00 + goto loop7 +endi + +if $data10 != tbn-t2 then + print =====data10=$data10 + goto loop7 +endi + +if $data20 != tbn-t3 then + print =====data20=$data20 + goto loop7 +endi + +$loop_count = 0 +loop8: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result4.streamt4 order by 3; + +if $rows != 3 then + print =====rows=$rows + print $data00 $data10 + goto loop8 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != t1 then + print =====data02=$data02 + goto loop8 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop8 +endi + +if $data12 != t2 then + print =====data12=$data12 + goto loop8 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop8 +endi + +if $data22 != t3 then + print =====data22=$data22 + goto loop8 +endi + +print ======over + +system sh/stop_dnodes.sh From cea8a920f7a1c623760c6b8a0d2468bf1483959e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 1 Feb 2023 17:50:08 +0800 Subject: [PATCH 4/4] feat(stream): add ci --- source/libs/parser/src/parTranslater.c | 76 ++++++++++++-- tests/script/tsim/stream/udTableAndTag2.sim | 109 ++++++++++---------- 2 files changed, 120 insertions(+), 65 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 0e92d66d73..0a9adaf60c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5671,10 +5671,6 @@ static SNode* createNullValue() { } static int32_t addNullTagsForExistTable(STranslateContext* pCxt, STableMeta* pMeta, SSelectStmt* pSelect) { - if (NULL == pMeta) { - return TSDB_CODE_SUCCESS; - } - int32_t numOfTags = getNumOfTags(pMeta); int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) { @@ -5728,12 +5724,27 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea return pCxt->errCode; } +static int32_t addNullTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; + for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < LIST_LENGTH(pStmt->pTags); ++i) { + code = nodesListMakeStrictAppend(&((SSelectStmt*)pStmt->pQuery)->pTags, createNullValue()); + } + return code; +} + +static int32_t addNullTagsToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, SCreateStreamStmt* pStmt) { + if (NULL == pMeta) { + return addNullTagsForCreateTable(pCxt, pStmt); + } + return addNullTagsForExistTable(pCxt, pMeta, (SSelectStmt*)pStmt->pQuery); +} + static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, SCreateStreamStmt* pStmt) { int32_t code = TSDB_CODE_SUCCESS; SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (NULL == pSelect->pPartitionByList) { - code = addNullTagsForExistTable(pCxt, pMeta, pSelect); + code = addNullTagsToCreateStreamQuery(pCxt, pMeta, pStmt); } else { code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect); } @@ -6011,17 +6022,66 @@ static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStm return adjustOrderOfTags(pCxt, pStmt->pTags, pMeta, &pSelect->pTags, pReq); } +static int32_t adjustTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + if (NULL == pSelect->pPartitionByList || NULL == pSelect->pTags) { + return TSDB_CODE_SUCCESS; + } + + SNode* pTagDef = NULL; + SNode* pTagExpr = NULL; + FORBOTH(pTagDef, pStmt->pTags, pTagExpr, pSelect->pTags) { + SColumnDefNode* pDef = (SColumnDefNode*)pTagDef; + if (!dataTypeEqual(&pDef->dataType, &((SExprNode*)pTagExpr)->resType)) { + SNode* pFunc = NULL; + int32_t code = createCastFunc(pCxt, pTagExpr, pDef->dataType, &pFunc); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + REPLACE_LIST2_NODE(pFunc); + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustTags(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta, + SCMCreateStreamReq* pReq) { + if (NULL == pMeta) { + return adjustTagsForCreateTable(pCxt, pStmt, pReq); + } + return adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq); +} + +static bool isTagDef(SNodeList* pTags) { + if (NULL == pTags) { + return false; + } + return QUERY_NODE_COLUMN_DEF == nodeType(nodesListGetNode(pTags, 0)); +} + +static bool isTagBound(SNodeList* pTags) { + if (NULL == pTags) { + return false; + } + return QUERY_NODE_COLUMN == nodeType(nodesListGetNode(pTags, 0)); +} + static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq, STableMeta** pMeta) { int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, pMeta); if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { - if (NULL != pStmt->pCols) { + if (NULL != pStmt->pCols || isTagBound(pStmt->pTags)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName); } pReq->createStb = STREAM_CREATE_STABLE_TRUE; pReq->targetStbUid = 0; return TSDB_CODE_SUCCESS; } else { + if (isTagDef(pStmt->pTags)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Table already exist: %s", + pStmt->targetTabName); + } pReq->createStb = STREAM_CREATE_STABLE_FALSE; pReq->targetStbUid = (*pMeta)->suid; } @@ -6047,8 +6107,8 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt if (TSDB_CODE_SUCCESS == code && NULL != pMeta) { code = adjustProjectionsForExistTable(pCxt, pStmt, pMeta, pReq); } - if (TSDB_CODE_SUCCESS == code && NULL != pMeta) { - code = adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq); + if (TSDB_CODE_SUCCESS == code) { + code = adjustTags(pCxt, pStmt, pMeta, pReq); } if (TSDB_CODE_SUCCESS == code) { getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB); diff --git a/tests/script/tsim/stream/udTableAndTag2.sim b/tests/script/tsim/stream/udTableAndTag2.sim index c42ac21dcd..5dd2e3ae2b 100644 --- a/tests/script/tsim/stream/udTableAndTag2.sim +++ b/tests/script/tsim/stream/udTableAndTag2.sim @@ -68,6 +68,17 @@ if $rows != 1 then goto loop1 endi +if $data01 != 2 then + print =====data01=$data01 + goto loop1 +endi + +# group id +if $data02 == NULL then + print =====data02=$data02 + goto loop1 +endi + print ===== step3 print ===== column name @@ -155,7 +166,18 @@ sql select * from result2.streamt2; if $rows != 1 then print =====rows=$rows - print $data00 $data10 + print $data00 + print $data10 + goto loop3 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop3 +endi + +if $data02 != NULL then + print =====data02=$data02 goto loop3 endi @@ -173,7 +195,7 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); -sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", 1 ) ) as select _wstart, count(*) c1 from st interval(10s); +sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", "1") ) as select _wstart, count(*) c1 from st interval(10s); print ===== insert into 3 sql insert into t1 values(1648791213000,1,2,3); sql insert into t2 values(1648791213000,2,2,3); @@ -198,29 +220,24 @@ if $rows != 1 then goto loop4 endi -if $data00 != NULL then +if $data00 != dd then print =====data00=$data00 goto loop4 endi sql select dd from result3.streamt3 order by 1; -if $rows != 2 then +if $rows != 1 then print =====rows=$rows print $data00 $data10 goto loop4 endi -if $data00 != col-1 then +if $data00 != NULL then print =====data00=$data00 goto loop4 endi -if $data10 != col-2 then - print =====data10=$data10 - goto loop4 -endi - $loop_count = 0 loop5: @@ -233,9 +250,20 @@ endi sql select * from result3.streamt3; -if $rows != 2 then +if $rows != 1 then print =====rows=$rows - print $data00 $data10 + print $data00 + print $data10 + goto loop5 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop5 +endi + +if $data02 != NULL then + print =====data02=$data02 goto loop5 endi @@ -251,9 +279,10 @@ endi sql select table_name from information_schema.ins_tables where db_name="result3" order by 1; -if $rows != 2 then +if $rows != 1 then print =====rows=$rows - print $data00 $data10 + print $data00 + print $data10 goto loop6 endi @@ -262,17 +291,12 @@ if $data00 != tbn-1 then goto loop6 endi -if $data10 != tbn-2 then - print =====data10=$data10 - goto loop6 -endi - print ===== step5 print ===== tag name + table name sql create database result4 vgroups 1; -sql create database test4 vgroups 4; +sql create database test4 vgroups 1; sql use test4; @@ -281,7 +305,7 @@ sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create table t3 using st tags(3,3,3); -sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", dd)) as select _wstart, count(*) c1 from st interval(10s); +sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", "1")) as select _wstart, count(*) c1 from st interval(10s); sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3); @@ -297,27 +321,18 @@ endi sql select table_name from information_schema.ins_tables where db_name="result4" order by 1; -if $rows != 3 then +if $rows != 1 then print =====rows=$rows - print $data00 $data10 + print $data00 + print $data10 goto loop7 endi -if $data00 != tbn-t1 then +if $data00 != tbn-1 then print =====data00=$data00 goto loop7 endi -if $data10 != tbn-t2 then - print =====data10=$data10 - goto loop7 -endi - -if $data20 != tbn-t3 then - print =====data20=$data20 - goto loop7 -endi - $loop_count = 0 loop8: @@ -330,42 +345,22 @@ endi sql select * from result4.streamt4 order by 3; -if $rows != 3 then +if $rows != 1 then print =====rows=$rows print $data00 $data10 goto loop8 endi -if $data01 != 1 then +if $data01 != 3 then print =====data01=$data01 goto loop8 endi -if $data02 != t1 then +if $data02 != NULL then print =====data02=$data02 goto loop8 endi -if $data11 != 1 then - print =====data11=$data11 - goto loop8 -endi - -if $data12 != t2 then - print =====data12=$data12 - goto loop8 -endi - -if $data21 != 1 then - print =====data21=$data21 - goto loop8 -endi - -if $data22 != t3 then - print =====data22=$data22 - goto loop8 -endi - print ======over system sh/stop_dnodes.sh