Merge pull request #19732 from taosdata/feat/ly_stream_stable
Feat/ly stream stable
This commit is contained in:
commit
97ca4dfd0b
|
@ -488,9 +488,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
};
|
};
|
||||||
void* pData = colDataGetData(pTagData, rowId);
|
void* pData = colDataGetData(pTagData, rowId);
|
||||||
if (colDataIsNull_s(pTagData, rowId)) {
|
if (colDataIsNull_s(pTagData, rowId)) {
|
||||||
tagVal.type = TSDB_DATA_TYPE_NULL;
|
continue;
|
||||||
tagVal.pData = NULL;
|
|
||||||
tagVal.nData = 0;
|
|
||||||
} else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
|
} else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
|
||||||
tagVal.nData = varDataLen(pData);
|
tagVal.nData = varDataLen(pData);
|
||||||
tagVal.pData = varDataVal(pData);
|
tagVal.pData = varDataVal(pData);
|
||||||
|
|
|
@ -570,7 +570,7 @@ typedef struct SStreamIntervalOperatorInfo {
|
||||||
SWinKey delKey;
|
SWinKey delKey;
|
||||||
uint64_t numOfDatapack;
|
uint64_t numOfDatapack;
|
||||||
SArray* pUpdated;
|
SArray* pUpdated;
|
||||||
SHashObj* pUpdatedMap;
|
SSHashObj* pUpdatedMap;
|
||||||
} SStreamIntervalOperatorInfo;
|
} SStreamIntervalOperatorInfo;
|
||||||
|
|
||||||
typedef struct SDataGroupInfo {
|
typedef struct SDataGroupInfo {
|
||||||
|
|
|
@ -155,7 +155,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
|
||||||
|
|
||||||
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
|
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
|
||||||
if (pGroupResInfo->pRows != NULL) {
|
if (pGroupResInfo->pRows != NULL) {
|
||||||
taosArrayDestroyP(pGroupResInfo->pRows, taosMemoryFree);
|
taosArrayDestroy(pGroupResInfo->pRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
pGroupResInfo->pRows = pArrayList;
|
pGroupResInfo->pRows = pArrayList;
|
||||||
|
|
|
@ -2589,26 +2589,22 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
|
||||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
|
|
||||||
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
|
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
|
||||||
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
SWinKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
SWinKey key = {
|
int32_t code = streamStateGet(pState, pKey, &pVal, &size);
|
||||||
.ts = *(TSKEY*)pPos->key,
|
|
||||||
.groupId = pPos->groupId,
|
|
||||||
};
|
|
||||||
int32_t code = streamStateGet(pState, &key, &pVal, &size);
|
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
SResultRow* pRow = (SResultRow*)pVal;
|
SResultRow* pRow = (SResultRow*)pVal;
|
||||||
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
|
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
|
||||||
// no results, continue to check the next one
|
// no results, continue to check the next one
|
||||||
if (pRow->numOfRows == 0) {
|
if (pRow->numOfRows == 0) {
|
||||||
pGroupResInfo->index += 1;
|
pGroupResInfo->index += 1;
|
||||||
releaseOutputBuf(pState, &key, pRow);
|
releaseOutputBuf(pState, pKey, pRow);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlock->info.id.groupId == 0) {
|
if (pBlock->info.id.groupId == 0) {
|
||||||
pBlock->info.id.groupId = pPos->groupId;
|
pBlock->info.id.groupId = pKey->groupId;
|
||||||
void* tbname = NULL;
|
void* tbname = NULL;
|
||||||
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
|
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
|
||||||
pBlock->info.parTbName[0] = 0;
|
pBlock->info.parTbName[0] = 0;
|
||||||
|
@ -2618,15 +2614,15 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
|
||||||
tdbFree(tbname);
|
tdbFree(tbname);
|
||||||
} else {
|
} else {
|
||||||
// current value belongs to different group, it can't be packed into one datablock
|
// current value belongs to different group, it can't be packed into one datablock
|
||||||
if (pBlock->info.id.groupId != pPos->groupId) {
|
if (pBlock->info.id.groupId != pKey->groupId) {
|
||||||
releaseOutputBuf(pState, &key, pRow);
|
releaseOutputBuf(pState, pKey, pRow);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||||
ASSERT(pBlock->info.rows > 0);
|
ASSERT(pBlock->info.rows > 0);
|
||||||
releaseOutputBuf(pState, &key, pRow);
|
releaseOutputBuf(pState, pKey, pRow);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2656,7 +2652,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
|
||||||
}
|
}
|
||||||
|
|
||||||
pBlock->info.rows += pRow->numOfRows;
|
pBlock->info.rows += pRow->numOfRows;
|
||||||
releaseOutputBuf(pState, &key, pRow);
|
releaseOutputBuf(pState, pKey, pRow);
|
||||||
}
|
}
|
||||||
pBlock->info.dataLoad = 1;
|
pBlock->info.dataLoad = 1;
|
||||||
blockDataUpdateTsWindow(pBlock, 0);
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
|
|
|
@ -971,7 +971,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
||||||
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
|
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
|
||||||
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) {
|
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) {
|
||||||
void* pValue = NULL;
|
void* pValue = NULL;
|
||||||
if (groupId != 0 && streamStateGetParName(pState, groupId, &pValue) != 0) {
|
if (streamStateGetParName(pState, groupId, &pValue) != 0) {
|
||||||
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
|
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
|
||||||
if (pTableSup->numOfExprs > 0) {
|
if (pTableSup->numOfExprs > 0) {
|
||||||
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
|
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
|
||||||
|
|
|
@ -1786,7 +1786,7 @@ FETCH_NEXT_BLOCK:
|
||||||
int32_t current = pInfo->validBlockIndex++;
|
int32_t current = pInfo->validBlockIndex++;
|
||||||
SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
|
SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
|
||||||
SSDataBlock* pBlock = pPacked->pDataBlock;
|
SSDataBlock* pBlock = pPacked->pDataBlock;
|
||||||
if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) {
|
if (pBlock->info.parTbName[0]) {
|
||||||
streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
|
streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
|
||||||
}
|
}
|
||||||
// TODO move into scan
|
// TODO move into scan
|
||||||
|
|
|
@ -842,68 +842,61 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
|
||||||
return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
|
return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) {
|
static int32_t saveWinResult(int64_t ts, uint64_t groupId, SSHashObj* pUpdatedMap) {
|
||||||
SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
|
||||||
if (newPos == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
newPos->groupId = groupId;
|
|
||||||
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
|
|
||||||
*(int64_t*)newPos->key = ts;
|
|
||||||
SWinKey key = {.ts = ts, .groupId = groupId};
|
SWinKey key = {.ts = ts, .groupId = groupId};
|
||||||
if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) {
|
tSimpleHashPut(pUpdatedMap, &key, sizeof(SWinKey), NULL, 0);
|
||||||
taosMemoryFree(newPos);
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) {
|
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SSHashObj* pUpdatedMap) {
|
||||||
return saveWinResult(ts, -1, -1, groupId, pUpdatedMap);
|
return saveWinResult(ts, groupId, pUpdatedMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
|
static void removeResults(SArray* pWins, SSHashObj* pUpdatedMap) {
|
||||||
int32_t size = taosArrayGetSize(pWins);
|
int32_t size = taosArrayGetSize(pWins);
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
SWinKey* pW = taosArrayGet(pWins, i);
|
SWinKey* pW = taosArrayGet(pWins, i);
|
||||||
void* tmp = taosHashGet(pUpdatedMap, pW, sizeof(SWinKey));
|
void* tmp = tSimpleHashGet(pUpdatedMap, pW, sizeof(SWinKey));
|
||||||
if (tmp) {
|
if (tmp) {
|
||||||
void* value = *(void**)tmp;
|
void* value = *(void**)tmp;
|
||||||
taosMemoryFree(value);
|
taosMemoryFree(value);
|
||||||
taosHashRemove(pUpdatedMap, pW, sizeof(SWinKey));
|
tSimpleHashRemove(pUpdatedMap, pW, sizeof(SWinKey));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareWinRes(void* pKey, void* data, int32_t index) {
|
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
|
||||||
SArray* res = (SArray*)data;
|
SArray* res = (SArray*)data;
|
||||||
SWinKey* pDataPos = taosArrayGet(res, index);
|
SWinKey* pDataPos = taosArrayGet(res, index);
|
||||||
SResKeyPos* pRKey = (SResKeyPos*)pKey;
|
SWinKey* pWKey = (SWinKey*)pKey;
|
||||||
if (pRKey->groupId > pDataPos->groupId) {
|
|
||||||
|
if (pWKey->groupId > pDataPos->groupId) {
|
||||||
return 1;
|
return 1;
|
||||||
} else if (pRKey->groupId < pDataPos->groupId) {
|
} else if (pWKey->groupId < pDataPos->groupId) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (*(int64_t*)pRKey->key > pDataPos->ts) {
|
if (pWKey->ts > pDataPos->ts) {
|
||||||
return 1;
|
return 1;
|
||||||
} else if (*(int64_t*)pRKey->key < pDataPos->ts) {
|
} else if (pWKey->ts < pDataPos->ts) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
|
static void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) {
|
||||||
taosArraySort(pDelWins, winKeyCmprImpl);
|
taosArraySort(pDelWins, winKeyCmprImpl);
|
||||||
taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL);
|
taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL);
|
||||||
int32_t delSize = taosArrayGetSize(pDelWins);
|
int32_t delSize = taosArrayGetSize(pDelWins);
|
||||||
if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) {
|
if (tSimpleHashGetSize(pUpdatedMap) == 0 || delSize == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
|
int32_t iter = 0;
|
||||||
SResKeyPos* pResKey = *(SResKeyPos**)pIte;
|
while ((pIte = tSimpleHashIterate(pUpdatedMap, pIte, &iter)) != NULL) {
|
||||||
int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes);
|
SWinKey* pResKey = tSimpleHashGetKey(pIte, NULL);
|
||||||
if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) {
|
int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinKey);
|
||||||
|
if (index >= 0 && 0 == compareWinKey(pResKey, pDelWins, index)) {
|
||||||
taosArrayRemove(pDelWins, index);
|
taosArrayRemove(pDelWins, index);
|
||||||
delSize = taosArrayGetSize(pDelWins);
|
delSize = taosArrayGetSize(pDelWins);
|
||||||
}
|
}
|
||||||
|
@ -1352,7 +1345,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId)
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
|
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
|
||||||
SHashObj* pUpdatedMap) {
|
SSHashObj* pUpdatedMap) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
|
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
|
||||||
|
@ -1388,28 +1381,21 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
|
||||||
taosArrayPush(pUpWins, &winRes);
|
taosArrayPush(pUpWins, &winRes);
|
||||||
}
|
}
|
||||||
if (pUpdatedMap) {
|
if (pUpdatedMap) {
|
||||||
void* tmp = taosHashGet(pUpdatedMap, &winRes, sizeof(SWinKey));
|
tSimpleHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
|
||||||
if (tmp) {
|
|
||||||
void* value = *(void**)tmp;
|
|
||||||
taosMemoryFree(value);
|
|
||||||
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
|
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
|
||||||
} while (win.ekey <= endTsCols[i]);
|
} while (win.ekey <= endTsCols[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
|
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SSHashObj* resWins) {
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
size_t keyLen = 0;
|
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
||||||
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
SWinKey* pKey = tSimpleHashGetKey(pIte, NULL);
|
||||||
uint64_t groupId = *(uint64_t*)key;
|
uint64_t groupId = pKey->groupId;
|
||||||
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
|
TSKEY ts = pKey->ts;
|
||||||
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
|
int32_t code = saveWinResult(ts, groupId, resWins);
|
||||||
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1417,36 +1403,16 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
|
|
||||||
SArray* res = (SArray*)data;
|
|
||||||
SWinKey* pDataPos = taosArrayGet(res, index);
|
|
||||||
SWinKey* pWKey = (SWinKey*)pKey;
|
|
||||||
|
|
||||||
if (pWKey->groupId > pDataPos->groupId) {
|
|
||||||
return 1;
|
|
||||||
} else if (pWKey->groupId < pDataPos->groupId) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pWKey->ts > pDataPos->ts) {
|
|
||||||
return 1;
|
|
||||||
} else if (pWKey->ts < pDataPos->ts) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
|
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
|
||||||
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pDelWins,
|
SHashObj* pPullDataMap, SSHashObj* closeWins, SArray* pDelWins,
|
||||||
SOperatorInfo* pOperator) {
|
SOperatorInfo* pOperator) {
|
||||||
qDebug("===stream===close interval window");
|
qDebug("===stream===close interval window");
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
size_t keyLen = 0;
|
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t delSize = taosArrayGetSize(pDelWins);
|
int32_t delSize = taosArrayGetSize(pDelWins);
|
||||||
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
||||||
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
void* key = tSimpleHashGetKey(pIte, NULL);
|
||||||
SWinKey* pWinKey = (SWinKey*)key;
|
SWinKey* pWinKey = (SWinKey*)key;
|
||||||
if (delSize > 0) {
|
if (delSize > 0) {
|
||||||
int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey);
|
int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey);
|
||||||
|
@ -1648,7 +1614,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
||||||
}
|
}
|
||||||
nodesDestroyNode((SNode*)pInfo->pPhyNode);
|
nodesDestroyNode((SNode*)pInfo->pPhyNode);
|
||||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
pInfo->groupResInfo.pRows = taosArrayDestroy(pInfo->groupResInfo.pRows);
|
||||||
cleanupExprSupp(&pInfo->scalarSupp);
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
|
@ -2157,7 +2123,7 @@ bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) {
|
||||||
return TSDB_CODE_SUCCESS == streamStateGet(pState, pKey, NULL, 0);
|
return TSDB_CODE_SUCCESS == streamStateGet(pState, pKey, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SHashObj* pUpdatedMap) {
|
static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pUpdatedMap) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
int32_t size = taosArrayGetSize(pWinArray);
|
int32_t size = taosArrayGetSize(pWinArray);
|
||||||
|
@ -2343,7 +2309,8 @@ static void clearFunctionContext(SExprSupp* pSup) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
|
void doBuildStreamIntervalResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
|
||||||
|
SGroupResInfo* pGroupResInfo) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
// set output datablock version
|
// set output datablock version
|
||||||
pBlock->info.version = pTaskInfo->version;
|
pBlock->info.version = pTaskInfo->version;
|
||||||
|
@ -2370,7 +2337,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
|
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
|
||||||
SHashObj* pUpdatedMap) {
|
SSHashObj* pUpdatedMap) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
|
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
|
||||||
|
|
||||||
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
||||||
|
@ -2516,7 +2483,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||||
if (pInfo->binfo.pRes->info.rows != 0) {
|
if (pInfo->binfo.pRes->info.rows != 0) {
|
||||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
|
@ -2543,7 +2510,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||||
if (pInfo->binfo.pRes->info.rows != 0) {
|
if (pInfo->binfo.pRes->info.rows != 0) {
|
||||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
|
@ -2552,11 +2519,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pInfo->pUpdated) {
|
if (!pInfo->pUpdated) {
|
||||||
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES);
|
pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey));
|
||||||
}
|
}
|
||||||
if (!pInfo->pUpdatedMap) {
|
if (!pInfo->pUpdatedMap) {
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -2650,12 +2617,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
|
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
|
||||||
|
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
while ((pIte = taosHashIterate(pInfo->pUpdatedMap, pIte)) != NULL) {
|
int32_t iter = 0;
|
||||||
|
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
|
||||||
taosArrayPush(pInfo->pUpdated, pIte);
|
taosArrayPush(pInfo->pUpdated, pIte);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pInfo->pUpdatedMap);
|
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||||
pInfo->pUpdatedMap = NULL;
|
pInfo->pUpdatedMap = NULL;
|
||||||
taosArraySort(pInfo->pUpdated, resultrowComparAsc);
|
taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
|
||||||
|
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||||
pInfo->pUpdated = NULL;
|
pInfo->pUpdated = NULL;
|
||||||
|
@ -2675,7 +2643,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||||
if (pInfo->binfo.pRes->info.rows != 0) {
|
if (pInfo->binfo.pRes->info.rows != 0) {
|
||||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
|
@ -3239,10 +3207,9 @@ static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2)
|
||||||
|
|
||||||
static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
|
static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
size_t keyLen = 0;
|
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) {
|
||||||
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
void* key = tSimpleHashGetKey(pIte, NULL);
|
||||||
taosArrayPush(pUpdated, key);
|
taosArrayPush(pUpdated, key);
|
||||||
}
|
}
|
||||||
taosArraySort(pUpdated, sessionKeyCompareAsc);
|
taosArraySort(pUpdated, sessionKeyCompareAsc);
|
||||||
|
@ -3256,13 +3223,12 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
blockDataEnsureCapacity(pBlock, size);
|
blockDataEnsureCapacity(pBlock, size);
|
||||||
size_t keyLen = 0;
|
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
while (((*Ite) = tSimpleHashIterate(pStDeleted, *Ite, &iter)) != NULL) {
|
while (((*Ite) = tSimpleHashIterate(pStDeleted, *Ite, &iter)) != NULL) {
|
||||||
if (pBlock->info.rows + 1 > pBlock->info.capacity) {
|
if (pBlock->info.rows + 1 > pBlock->info.capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
SSessionKey* res = tSimpleHashGetKey(*Ite, &keyLen);
|
SSessionKey* res = tSimpleHashGetKey(*Ite, NULL);
|
||||||
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)&res->win.skey, false);
|
colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)&res->win.skey, false);
|
||||||
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
|
@ -3351,7 +3317,6 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
|
||||||
|
|
||||||
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed) {
|
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed) {
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
size_t keyLen = 0;
|
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
||||||
SResultWindowInfo* pWinInfo = pIte;
|
SResultWindowInfo* pWinInfo = pIte;
|
||||||
|
@ -3362,7 +3327,7 @@ int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHa
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
|
SSessionKey* pKey = tSimpleHashGetKey(pIte, NULL);
|
||||||
tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter);
|
tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4761,7 +4726,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||||
if (pInfo->binfo.pRes->info.rows > 0) {
|
if (pInfo->binfo.pRes->info.rows > 0) {
|
||||||
printDataBlock(pInfo->binfo.pRes, "single interval");
|
printDataBlock(pInfo->binfo.pRes, "single interval");
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
|
@ -4776,14 +4741,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
if (!pInfo->pUpdated) {
|
if (!pInfo->pUpdated) {
|
||||||
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES);
|
pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey));
|
||||||
}
|
}
|
||||||
if (!pInfo->pUpdatedMap) {
|
if (!pInfo->pUpdatedMap) {
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
|
@ -4832,19 +4796,21 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
|
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
|
||||||
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pInfo->pUpdatedMap,
|
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL,
|
||||||
pInfo->pDelWins, pOperator);
|
pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
|
||||||
|
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
while ((pIte = taosHashIterate(pInfo->pUpdatedMap, pIte)) != NULL) {
|
int32_t iter = 0;
|
||||||
taosArrayPush(pInfo->pUpdated, pIte);
|
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
|
||||||
|
SWinKey* pKey = tSimpleHashGetKey(pIte, NULL);
|
||||||
|
taosArrayPush(pInfo->pUpdated, pKey);
|
||||||
}
|
}
|
||||||
taosArraySort(pInfo->pUpdated, resultrowComparAsc);
|
taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
|
||||||
|
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||||
pInfo->pUpdated = NULL;
|
pInfo->pUpdated = NULL;
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
taosHashCleanup(pInfo->pUpdatedMap);
|
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||||
pInfo->pUpdatedMap = NULL;
|
pInfo->pUpdatedMap = NULL;
|
||||||
|
|
||||||
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||||
|
@ -4853,7 +4819,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||||
if (pInfo->binfo.pRes->info.rows > 0) {
|
if (pInfo->binfo.pRes->info.rows > 0) {
|
||||||
printDataBlock(pInfo->binfo.pRes, "single interval");
|
printDataBlock(pInfo->binfo.pRes, "single interval");
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
|
|
|
@ -3281,9 +3281,6 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
if (NULL == pSelect->pPartitionByList) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
|
pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
|
||||||
int32_t code = translateExprList(pCxt, pSelect->pPartitionByList);
|
int32_t code = translateExprList(pCxt, pSelect->pPartitionByList);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -5674,10 +5671,6 @@ static SNode* createNullValue() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addNullTagsForExistTable(STranslateContext* pCxt, STableMeta* pMeta, SSelectStmt* pSelect) {
|
static int32_t addNullTagsForExistTable(STranslateContext* pCxt, STableMeta* pMeta, SSelectStmt* pSelect) {
|
||||||
if (NULL == pMeta) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfTags = getNumOfTags(pMeta);
|
int32_t numOfTags = getNumOfTags(pMeta);
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) {
|
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) {
|
||||||
|
@ -5731,14 +5724,30 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea
|
||||||
return pCxt->errCode;
|
return pCxt->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta,
|
static int32_t addNullTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
||||||
SCreateStreamStmt* pStmt) {
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < LIST_LENGTH(pStmt->pTags); ++i) {
|
||||||
if (NULL == pSelect->pPartitionByList) {
|
code = nodesListMakeStrictAppend(&((SSelectStmt*)pStmt->pQuery)->pTags, createNullValue());
|
||||||
return addNullTagsForExistTable(pCxt, pMeta, pSelect);
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect);
|
static int32_t addNullTagsToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, SCreateStreamStmt* pStmt) {
|
||||||
|
if (NULL == pMeta) {
|
||||||
|
return addNullTagsForCreateTable(pCxt, pStmt);
|
||||||
|
}
|
||||||
|
return addNullTagsForExistTable(pCxt, pMeta, (SSelectStmt*)pStmt->pQuery);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta,
|
||||||
|
SCreateStreamStmt* pStmt) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
|
if (NULL == pSelect->pPartitionByList) {
|
||||||
|
code = addNullTagsToCreateStreamQuery(pCxt, pMeta, pStmt);
|
||||||
|
} else {
|
||||||
|
code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = addSubtableNameToCreateStreamQuery(pCxt, pStmt, pSelect);
|
code = addSubtableNameToCreateStreamQuery(pCxt, pStmt, pSelect);
|
||||||
}
|
}
|
||||||
|
@ -6013,17 +6022,66 @@ static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStm
|
||||||
return adjustOrderOfTags(pCxt, pStmt->pTags, pMeta, &pSelect->pTags, pReq);
|
return adjustOrderOfTags(pCxt, pStmt->pTags, pMeta, &pSelect->pTags, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t adjustTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||||
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
|
if (NULL == pSelect->pPartitionByList || NULL == pSelect->pTags) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pTagDef = NULL;
|
||||||
|
SNode* pTagExpr = NULL;
|
||||||
|
FORBOTH(pTagDef, pStmt->pTags, pTagExpr, pSelect->pTags) {
|
||||||
|
SColumnDefNode* pDef = (SColumnDefNode*)pTagDef;
|
||||||
|
if (!dataTypeEqual(&pDef->dataType, &((SExprNode*)pTagExpr)->resType)) {
|
||||||
|
SNode* pFunc = NULL;
|
||||||
|
int32_t code = createCastFunc(pCxt, pTagExpr, pDef->dataType, &pFunc);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
REPLACE_LIST2_NODE(pFunc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t adjustTags(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta,
|
||||||
|
SCMCreateStreamReq* pReq) {
|
||||||
|
if (NULL == pMeta) {
|
||||||
|
return adjustTagsForCreateTable(pCxt, pStmt, pReq);
|
||||||
|
}
|
||||||
|
return adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool isTagDef(SNodeList* pTags) {
|
||||||
|
if (NULL == pTags) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return QUERY_NODE_COLUMN_DEF == nodeType(nodesListGetNode(pTags, 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool isTagBound(SNodeList* pTags) {
|
||||||
|
if (NULL == pTags) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return QUERY_NODE_COLUMN == nodeType(nodesListGetNode(pTags, 0));
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq,
|
static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq,
|
||||||
STableMeta** pMeta) {
|
STableMeta** pMeta) {
|
||||||
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, pMeta);
|
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, pMeta);
|
||||||
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
||||||
if (NULL != pStmt->pCols) {
|
if (NULL != pStmt->pCols || isTagBound(pStmt->pTags)) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
||||||
}
|
}
|
||||||
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
|
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
|
||||||
pReq->targetStbUid = 0;
|
pReq->targetStbUid = 0;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
|
if (isTagDef(pStmt->pTags)) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Table already exist: %s",
|
||||||
|
pStmt->targetTabName);
|
||||||
|
}
|
||||||
pReq->createStb = STREAM_CREATE_STABLE_FALSE;
|
pReq->createStb = STREAM_CREATE_STABLE_FALSE;
|
||||||
pReq->targetStbUid = (*pMeta)->suid;
|
pReq->targetStbUid = (*pMeta)->suid;
|
||||||
}
|
}
|
||||||
|
@ -6049,8 +6107,8 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
|
||||||
code = adjustProjectionsForExistTable(pCxt, pStmt, pMeta, pReq);
|
code = adjustProjectionsForExistTable(pCxt, pStmt, pMeta, pReq);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq);
|
code = adjustTags(pCxt, pStmt, pMeta, pReq);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
||||||
|
|
|
@ -374,6 +374,20 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
||||||
code = addDefaultScanCol(pRealTable->pMeta, &pScan->pScanCols);
|
code = addDefaultScanCol(pRealTable->pMeta, &pScan->pScanCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags && NULL == pSelect->pPartitionByList) {
|
||||||
|
pScan->pTags = nodesCloneList(pSelect->pTags);
|
||||||
|
if (NULL == pScan->pTags) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pSubtable && NULL == pSelect->pPartitionByList) {
|
||||||
|
pScan->pSubtable = nodesCloneNode(pSelect->pSubtable);
|
||||||
|
if (NULL == pScan->pSubtable) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// set output
|
// set output
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets);
|
code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets);
|
||||||
|
|
|
@ -19,9 +19,9 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
sql create table t1 using st tags(1,1,1);
|
sql create table t1 using st tags(1,1,1);
|
||||||
sql create table t2 using st tags(2,2,2);
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb int,tc int);
|
sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb varchar(100),tc int);
|
||||||
|
|
||||||
sql create stream streams0 trigger at_once into result.streamt0 as select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s);
|
sql create stream streams0 trigger at_once into result.streamt0 tags(tb) as select _wstart, count(*) c1, max(a) c2 from st partition by tbname tb interval(10s);
|
||||||
sql insert into t1 values(1648791213000,1,2,3);
|
sql insert into t1 values(1648791213000,1,2,3);
|
||||||
sql insert into t2 values(1648791213000,2,2,3);
|
sql insert into t2 values(1648791213000,2,2,3);
|
||||||
|
|
||||||
|
@ -61,6 +61,16 @@ if $data02 != 1 then
|
||||||
goto loop0
|
goto loop0
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
if $data03 != NULL then
|
||||||
|
print =====data03=$data03
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data04 != t1 then
|
||||||
|
print =====data04=$data04
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
if $data11 != 1 then
|
if $data11 != 1 then
|
||||||
print =====data11=$data11
|
print =====data11=$data11
|
||||||
goto loop0
|
goto loop0
|
||||||
|
@ -71,6 +81,16 @@ if $data12 != 2 then
|
||||||
goto loop0
|
goto loop0
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
if $data13 != NULL then
|
||||||
|
print =====data13=$data13
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data14 != t2 then
|
||||||
|
print =====data14=$data14
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
print ===== step3
|
print ===== step3
|
||||||
|
|
||||||
sql create database result1 vgroups 1;
|
sql create database result1 vgroups 1;
|
||||||
|
@ -83,9 +103,9 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
sql create table t1 using st tags(1,1,1);
|
sql create table t1 using st tags(1,1,1);
|
||||||
sql create table t2 using st tags(2,2,2);
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta bigint unsigned,tb int,tc int);
|
sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta varchar(100),tb int,tc int);
|
||||||
|
|
||||||
sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s);
|
sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) tags(ta) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname as ta interval(10s);
|
||||||
sql insert into t1 values(1648791213000,10,20,30);
|
sql insert into t1 values(1648791213000,10,20,30);
|
||||||
sql insert into t2 values(1648791213000,40,50,60);
|
sql insert into t2 values(1648791213000,40,50,60);
|
||||||
|
|
||||||
|
@ -161,7 +181,7 @@ sql create table t2 using st tags(2,2,2);
|
||||||
sql create stable result2.streamt2(ts timestamp, a int , b int) tags(ta varchar(20));
|
sql create stable result2.streamt2(ts timestamp, a int , b int) tags(ta varchar(20));
|
||||||
|
|
||||||
# tag dest 1, source 2
|
# tag dest 1, source 2
|
||||||
##sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s);
|
sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s);
|
||||||
|
|
||||||
# column dest 3, source 4
|
# column dest 3, source 4
|
||||||
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s);
|
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s);
|
||||||
|
@ -173,7 +193,7 @@ sql_error create stream streams2 trigger at_once into result2.streamt2(ts, a, b
|
||||||
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1 from st partition by tbname interval(10s);
|
sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1 from st partition by tbname interval(10s);
|
||||||
|
|
||||||
# column dest 3, source 2
|
# column dest 3, source 2
|
||||||
sql create stream streams2 trigger at_once into result2.streamt2(ts, a) as select _wstart, count(*) c1 from st partition by tbname interval(10s);
|
sql create stream streams2 trigger at_once into result2.streamt2(ts, a) tags(ta) as select _wstart, count(*) c1 from st partition by tbname as ta interval(10s);
|
||||||
|
|
||||||
|
|
||||||
print ===== step5
|
print ===== step5
|
||||||
|
@ -252,16 +272,16 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
sql create table t1 using st tags(1,2,3);
|
sql create table t1 using st tags(1,2,3);
|
||||||
sql create table t2 using st tags(4,5,6);
|
sql create table t2 using st tags(4,5,6);
|
||||||
|
|
||||||
sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int);
|
sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(tg1 int,tg2 int,tg3 int);
|
||||||
|
|
||||||
sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2 int, tg3 varchar(100), tg1 bigint) subtable(concat("tbl-", tg1)) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s);
|
sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2, tg3, tg1) subtable( concat("tbl-", cast(tg1 as varchar(10)) ) ) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s);
|
||||||
|
|
||||||
sql insert into t1 values(1648791213000,10,20,30);
|
sql insert into t1 values(1648791213000,10,20,30);
|
||||||
sql insert into t2 values(1648791213000,40,50,60);
|
sql insert into t2 values(1648791213000,40,50,60);
|
||||||
|
|
||||||
$loop_count = 0
|
$loop_count = 0
|
||||||
|
|
||||||
sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
|
sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s);
|
||||||
print $data00, $data01, $data02, $data03
|
print $data00, $data01, $data02, $data03
|
||||||
print $data10, $data11, $data12, $data13
|
print $data10, $data11, $data12, $data13
|
||||||
print $data20, $data21, $data22, $data23
|
print $data20, $data21, $data22, $data23
|
||||||
|
@ -275,7 +295,7 @@ if $loop_count == 10 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql select * from result4.streamt4;
|
sql select * from result4.streamt4 order by tg1;
|
||||||
|
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
print =====rows=$rows
|
print =====rows=$rows
|
||||||
|
@ -285,7 +305,7 @@ if $rows != 2 then
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data01 != 40 then
|
if $data01 != 10 then
|
||||||
print =====data01=$data01
|
print =====data01=$data01
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
@ -295,7 +315,7 @@ if $data02 != 20 then
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data03 != 2 then
|
if $data03 != 1 then
|
||||||
print =====data03=$data03
|
print =====data03=$data03
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
@ -305,6 +325,26 @@ if $data04 != NULL then
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
if $data11 != 40 then
|
||||||
|
print =====data11=$data11
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 50 then
|
||||||
|
print =====data12=$data12
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data13 != 1 then
|
||||||
|
print =====data13=$data13
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data14 != NULL then
|
||||||
|
print =====data14=$data14
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
print ======over
|
print ======over
|
||||||
|
|
||||||
system sh/stop_dnodes.sh
|
system sh/stop_dnodes.sh
|
||||||
|
|
|
@ -367,6 +367,77 @@ if $data22 != tag-t3 then
|
||||||
goto loop8
|
goto loop8
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print ===== step6
|
||||||
|
print ===== transform tag value
|
||||||
|
|
||||||
|
sql drop stream if exists streams1;
|
||||||
|
sql drop stream if exists streams2;
|
||||||
|
sql drop stream if exists streams3;
|
||||||
|
sql drop stream if exists streams4;
|
||||||
|
sql drop stream if exists streams5;
|
||||||
|
|
||||||
|
sql drop database if exists test1;
|
||||||
|
sql drop database if exists test2;
|
||||||
|
sql drop database if exists test3;
|
||||||
|
sql drop database if exists test4;
|
||||||
|
sql drop database if exists test5;
|
||||||
|
|
||||||
|
sql drop database if exists result1;
|
||||||
|
sql drop database if exists result2;
|
||||||
|
sql drop database if exists result3;
|
||||||
|
sql drop database if exists result4;
|
||||||
|
sql drop database if exists result5;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
sql create database result6 vgroups 1;
|
||||||
|
|
||||||
|
sql create database test6 vgroups 4;
|
||||||
|
sql use test6;
|
||||||
|
|
||||||
|
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c int) tags(ta varchar(20), tb int, tc int);
|
||||||
|
sql create table t1 using st tags("1",1,1);
|
||||||
|
sql create table t2 using st tags("2",2,2);
|
||||||
|
sql create table t3 using st tags("3",3,3);
|
||||||
|
|
||||||
|
sql create stream streams6 trigger at_once into result6.streamt6 TAGS(dd int) as select _wstart, count(*) c1 from st partition by concat(ta, "0") as dd, tbname interval(10s);
|
||||||
|
sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop9:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from result6.streamt6 order by 3;
|
||||||
|
|
||||||
|
if $rows != 3 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00 $data10
|
||||||
|
goto loop9
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 10 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop9
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 20 then
|
||||||
|
print =====data12=$data12
|
||||||
|
goto loop9
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data22 != 30 then
|
||||||
|
print =====data22=$data22
|
||||||
|
goto loop8
|
||||||
|
endi
|
||||||
|
|
||||||
print ======over
|
print ======over
|
||||||
|
|
||||||
system sh/stop_dnodes.sh
|
system sh/stop_dnodes.sh
|
||||||
|
|
|
@ -0,0 +1,366 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
|
||||||
|
print ===== step1
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 50
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
print ===== step2
|
||||||
|
print ===== table name
|
||||||
|
|
||||||
|
sql create database result vgroups 1;
|
||||||
|
|
||||||
|
sql create database test vgroups 4;
|
||||||
|
sql use test;
|
||||||
|
|
||||||
|
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
|
sql create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s);
|
||||||
|
print ===== insert into 1
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3);
|
||||||
|
sql insert into t2 values(1648791213000,2,2,3);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop0:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select table_name from information_schema.ins_tables where db_name="result" order by 1;
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00
|
||||||
|
print $data10
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != aaa then
|
||||||
|
print =====data00=$data00
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop1:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from result.streamt;
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00 $data10
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# group id
|
||||||
|
if $data02 == NULL then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
print ===== step3
|
||||||
|
print ===== column name
|
||||||
|
|
||||||
|
sql create database result2 vgroups 1;
|
||||||
|
|
||||||
|
sql create database test2 vgroups 4;
|
||||||
|
sql use test2;
|
||||||
|
|
||||||
|
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
|
sql create stream streams2 trigger at_once into result2.streamt2 TAGS(cc varchar(100)) as select _wstart, count(*) c1 from st interval(10s);
|
||||||
|
print ===== insert into 2
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3);
|
||||||
|
sql insert into t2 values(1648791213000,2,2,3);
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop2:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1;
|
||||||
|
|
||||||
|
sql select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1;
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00
|
||||||
|
print $data10
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != cc then
|
||||||
|
print =====data00=$data00
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
print sql select cc from result2.streamt2 order by 1;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop21:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select cc from result2.streamt2 order by 1;
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00
|
||||||
|
print $data10
|
||||||
|
goto loop21
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != NULL then
|
||||||
|
print =====data00=$data00
|
||||||
|
goto loop21
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop3:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from result2.streamt2;
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00
|
||||||
|
print $data10
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != NULL then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
print ===== step4
|
||||||
|
print ===== column name + table name
|
||||||
|
|
||||||
|
sql create database result3 vgroups 1;
|
||||||
|
|
||||||
|
sql create database test3 vgroups 4;
|
||||||
|
sql use test3;
|
||||||
|
|
||||||
|
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
|
sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", "1") ) as select _wstart, count(*) c1 from st interval(10s);
|
||||||
|
print ===== insert into 3
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3);
|
||||||
|
sql insert into t2 values(1648791213000,2,2,3);
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop4:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select tag_name from information_schema.ins_tags where db_name="result3" and stable_name = "streamt3" order by 1;
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00
|
||||||
|
print $data10
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != dd then
|
||||||
|
print =====data00=$data00
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select dd from result3.streamt3 order by 1;
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00 $data10
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != NULL then
|
||||||
|
print =====data00=$data00
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop5:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from result3.streamt3;
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00
|
||||||
|
print $data10
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != NULL then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop6:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select table_name from information_schema.ins_tables where db_name="result3" order by 1;
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00
|
||||||
|
print $data10
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != tbn-1 then
|
||||||
|
print =====data00=$data00
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ===== step5
|
||||||
|
print ===== tag name + table name
|
||||||
|
|
||||||
|
sql create database result4 vgroups 1;
|
||||||
|
|
||||||
|
sql create database test4 vgroups 1;
|
||||||
|
sql use test4;
|
||||||
|
|
||||||
|
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
sql create table t3 using st tags(3,3,3);
|
||||||
|
|
||||||
|
sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", "1")) as select _wstart, count(*) c1 from st interval(10s);
|
||||||
|
sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop7:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select table_name from information_schema.ins_tables where db_name="result4" order by 1;
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00
|
||||||
|
print $data10
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != tbn-1 then
|
||||||
|
print =====data00=$data00
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop8:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from result4.streamt4 order by 3;
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
print $data00 $data10
|
||||||
|
goto loop8
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 3 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop8
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != NULL then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop8
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ======over
|
||||||
|
|
||||||
|
system sh/stop_dnodes.sh
|
Loading…
Reference in New Issue