Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/alter_pages_buffer
This commit is contained in:
commit
7941f3a36e
|
@ -49,7 +49,7 @@ typedef struct {
|
||||||
TSKEY ts;
|
TSKEY ts;
|
||||||
} SWinKey;
|
} SWinKey;
|
||||||
|
|
||||||
static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
static inline int sWinKeyCmprImpl(const void* pKey1, const void* pKey2) {
|
||||||
SWinKey* pWin1 = (SWinKey*)pKey1;
|
SWinKey* pWin1 = (SWinKey*)pKey1;
|
||||||
SWinKey* pWin2 = (SWinKey*)pKey2;
|
SWinKey* pWin2 = (SWinKey*)pKey2;
|
||||||
|
|
||||||
|
@ -68,6 +68,10 @@ static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, i
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int winKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
||||||
|
return sWinKeyCmprImpl(pKey1, pKey2);
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
TSKEY ts;
|
TSKEY ts;
|
||||||
|
|
|
@ -606,8 +606,6 @@ typedef struct SStreamIntervalOperatorInfo {
|
||||||
SArray* pDelWins; // SWinRes
|
SArray* pDelWins; // SWinRes
|
||||||
int32_t delIndex;
|
int32_t delIndex;
|
||||||
SSDataBlock* pDelRes;
|
SSDataBlock* pDelRes;
|
||||||
SSDataBlock* pUpdateRes;
|
|
||||||
bool returnUpdate;
|
|
||||||
SPhysiNode* pPhyNode; // create new child
|
SPhysiNode* pPhyNode; // create new child
|
||||||
SHashObj* pPullDataMap;
|
SHashObj* pPullDataMap;
|
||||||
SArray* pPullWins; // SPullWindowInfo
|
SArray* pPullWins; // SPullWindowInfo
|
||||||
|
|
|
@ -1331,8 +1331,8 @@ void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t*
|
||||||
colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
|
colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
|
||||||
colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false);
|
colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false);
|
||||||
colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false);
|
colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false);
|
||||||
colDataAppendNULL(pCalStartCol, pBlock->info.rows);
|
colDataAppend(pCalStartCol, pBlock->info.rows, (const char*)pStartTs, false);
|
||||||
colDataAppendNULL(pCalEndCol, pBlock->info.rows);
|
colDataAppend(pCalEndCol, pBlock->info.rows, (const char*)pEndTs, false);
|
||||||
pBlock->info.rows++;
|
pBlock->info.rows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1376,7 +1376,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) {
|
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
|
||||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||||
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
|
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -1430,7 +1430,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (filter) {
|
||||||
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
|
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
|
||||||
|
}
|
||||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1466,7 +1468,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
setBlockIntoRes(pInfo, &block);
|
setBlockIntoRes(pInfo, &block, true);
|
||||||
|
|
||||||
if (pBlockInfo->rows > 0) {
|
if (pBlockInfo->rows > 0) {
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
|
@ -1507,7 +1509,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
tqNextBlock(pInfo->tqReader, &ret);
|
tqNextBlock(pInfo->tqReader, &ret);
|
||||||
if (ret.fetchType == FETCH_TYPE__DATA) {
|
if (ret.fetchType == FETCH_TYPE__DATA) {
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
|
if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
|
@ -1771,6 +1773,7 @@ FETCH_NEXT_BLOCK:
|
||||||
// printDataBlock(pSDB, "stream scan update");
|
// printDataBlock(pSDB, "stream scan update");
|
||||||
return pSDB;
|
return pSDB;
|
||||||
}
|
}
|
||||||
|
blockDataCleanup(pInfo->pUpdateDataRes);
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||||
} break;
|
} break;
|
||||||
default:
|
default:
|
||||||
|
@ -1821,7 +1824,7 @@ FETCH_NEXT_BLOCK:
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
setBlockIntoRes(pInfo, &block);
|
setBlockIntoRes(pInfo, &block, false);
|
||||||
|
|
||||||
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId,
|
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId,
|
||||||
pInfo->pRes->info.version)) {
|
pInfo->pRes->info.version)) {
|
||||||
|
@ -1830,28 +1833,7 @@ FETCH_NEXT_BLOCK:
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlockInfo->rows > 0) {
|
if (pInfo->pUpdateInfo) {
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (pBlockInfo->rows > 0) {
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
pInfo->tqReader->pMsg = NULL;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
/*blockDataCleanup(pInfo->pRes);*/
|
|
||||||
}
|
|
||||||
|
|
||||||
// record the scan action.
|
|
||||||
pInfo->numOfExec++;
|
|
||||||
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
|
||||||
// printDataBlock(pInfo->pRes, "stream scan");
|
|
||||||
|
|
||||||
if (pBlockInfo->rows == 0) {
|
|
||||||
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
|
||||||
/*pOperator->status = OP_EXEC_DONE;*/
|
|
||||||
} else if (pInfo->pUpdateInfo) {
|
|
||||||
checkUpdateData(pInfo, true, pInfo->pRes, true);
|
checkUpdateData(pInfo, true, pInfo->pRes, true);
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
|
||||||
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
||||||
|
@ -1867,13 +1849,37 @@ FETCH_NEXT_BLOCK:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
|
||||||
|
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||||
|
|
||||||
|
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
pInfo->tqReader->pMsg = NULL;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
/*blockDataCleanup(pInfo->pRes);*/
|
||||||
|
}
|
||||||
|
|
||||||
|
// record the scan action.
|
||||||
|
pInfo->numOfExec++;
|
||||||
|
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
||||||
|
// printDataBlock(pInfo->pRes, "stream scan");
|
||||||
|
|
||||||
qDebug("scan rows: %d", pBlockInfo->rows);
|
qDebug("scan rows: %d", pBlockInfo->rows);
|
||||||
if (pBlockInfo->rows > 0) {
|
if (pBlockInfo->rows > 0) {
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
} else {
|
|
||||||
goto NEXT_SUBMIT_BLK;
|
|
||||||
}
|
}
|
||||||
/*return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;*/
|
|
||||||
|
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
||||||
|
goto FETCH_NEXT_BLOCK;
|
||||||
|
}
|
||||||
|
|
||||||
|
goto NEXT_SUBMIT_BLK;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -414,14 +414,17 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
|
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) {
|
||||||
if (pInterval->interval != pInterval->sliding &&
|
if (pInterval->interval != pInterval->sliding && (pWin->ekey < calStart || pWin->skey > calEnd)) {
|
||||||
(pWin->ekey < pBlockInfo->calWin.skey || pWin->skey > pBlockInfo->calWin.ekey)) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
|
||||||
|
return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
|
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
|
||||||
TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
|
TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
|
||||||
bool ascQuery = (order == TSDB_ORDER_ASC);
|
bool ascQuery = (order == TSDB_ORDER_ASC);
|
||||||
|
@ -912,6 +915,8 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
|
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
|
||||||
|
taosArraySort(pDelWins, sWinKeyCmprImpl);
|
||||||
|
taosArrayRemoveDuplicate(pDelWins, sWinKeyCmprImpl, NULL);
|
||||||
int32_t delSize = taosArrayGetSize(pDelWins);
|
int32_t delSize = taosArrayGetSize(pDelWins);
|
||||||
if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) {
|
if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -1387,7 +1392,7 @@ static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, int32_t numOfOutput) {
|
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
SWinKey key = {.ts = ts, .groupId = groupId};
|
SWinKey key = {.ts = ts, .groupId = groupId};
|
||||||
tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
|
tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
|
||||||
|
@ -1395,21 +1400,37 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int32_t numOfOutput, SSDataBlock* pBlock,
|
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
|
||||||
SArray* pUpWins, SHashObj* pUpdatedMap) {
|
SHashObj* pUpdatedMap) {
|
||||||
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
|
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
|
||||||
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData;
|
TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData;
|
||||||
|
SColumnInfoData* pCalStTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||||
|
TSKEY* calStTsCols = (TSKEY*)pCalStTsCol->pData;
|
||||||
|
SColumnInfoData* pCalEnTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||||
|
TSKEY* calEnTsCols = (TSKEY*)pCalEnTsCol->pData;
|
||||||
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
uint64_t* pGpDatas = (uint64_t*)pGpCol->pData;
|
uint64_t* pGpDatas = (uint64_t*)pGpCol->pData;
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||||
SResultRowInfo dumyInfo;
|
SResultRowInfo dumyInfo;
|
||||||
dumyInfo.cur.pageId = -1;
|
dumyInfo.cur.pageId = -1;
|
||||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
|
STimeWindow win = {0};
|
||||||
|
if (IS_FINAL_OP(pInfo)) {
|
||||||
|
win.skey = startTsCols[i];
|
||||||
|
win.ekey = endTsCols[i];
|
||||||
|
} else {
|
||||||
|
win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
|
||||||
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i])) {
|
||||||
|
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
uint64_t winGpId = pGpDatas[i];
|
uint64_t winGpId = pGpDatas[i];
|
||||||
bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput);
|
bool res = doDeleteWindow(pOperator, win.skey, winGpId);
|
||||||
SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
|
SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
|
||||||
if (pUpWins && res) {
|
if (pUpWins && res) {
|
||||||
taosArrayPush(pUpWins, &winRes);
|
taosArrayPush(pUpWins, &winRes);
|
||||||
|
@ -1511,16 +1532,43 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
|
||||||
|
SArray* res = (SArray*)data;
|
||||||
|
SWinKey* pos = taosArrayGet(res, index);
|
||||||
|
SWinKey* pData = (SWinKey*)pKey;
|
||||||
|
if (pData->ts == pos->ts) {
|
||||||
|
if (pData->groupId > pos->groupId) {
|
||||||
|
return 1;
|
||||||
|
} else if (pData->groupId < pos->groupId) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
} else if (pData->ts > pos->ts) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
|
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
|
||||||
SHashObj* pPullDataMap, SHashObj* closeWins, SOperatorInfo* pOperator) {
|
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pDelWins,
|
||||||
|
SOperatorInfo* pOperator) {
|
||||||
qDebug("===stream===close interval window");
|
qDebug("===stream===close interval window");
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
size_t keyLen = 0;
|
size_t keyLen = 0;
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
|
int32_t delSize = taosArrayGetSize(pDelWins);
|
||||||
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
||||||
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
||||||
SWinKey* pWinKey = (SWinKey*)key;
|
SWinKey* pWinKey = (SWinKey*)key;
|
||||||
|
if (delSize > 0) {
|
||||||
|
int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey);
|
||||||
|
if (index >= 0 && 0 == compareWinKey(pWinKey, pDelWins, index)) {
|
||||||
|
taosArrayRemove(pDelWins, index);
|
||||||
|
delSize = taosArrayGetSize(pDelWins);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
|
void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
|
||||||
STimeWindow win = {
|
STimeWindow win = {
|
||||||
.skey = pWinKey->ts,
|
.skey = pWinKey->ts,
|
||||||
|
@ -1624,7 +1672,7 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren
|
||||||
ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
|
ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
|
||||||
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
|
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
|
||||||
closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
|
closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
|
||||||
pOperator);
|
NULL, pOperator);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1694,7 +1742,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
||||||
taosHashCleanup(pInfo->pPullDataMap);
|
taosHashCleanup(pInfo->pPullDataMap);
|
||||||
taosArrayDestroy(pInfo->pPullWins);
|
taosArrayDestroy(pInfo->pPullWins);
|
||||||
blockDataDestroy(pInfo->pPullDataRes);
|
blockDataDestroy(pInfo->pPullDataRes);
|
||||||
blockDataDestroy(pInfo->pUpdateRes);
|
|
||||||
taosArrayDestroy(pInfo->pDelWins);
|
taosArrayDestroy(pInfo->pDelWins);
|
||||||
blockDataDestroy(pInfo->pDelRes);
|
blockDataDestroy(pInfo->pDelRes);
|
||||||
taosMemoryFreeClear(pInfo->pState);
|
taosMemoryFreeClear(pInfo->pState);
|
||||||
|
@ -2862,11 +2909,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SAr
|
||||||
isCloseWindow(&parentWin, &pInfo->twAggSup)) {
|
isCloseWindow(&parentWin, &pInfo->twAggSup)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
|
|
||||||
pSup->rowEntryInfoOffset, &pInfo->aggSup);
|
|
||||||
if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
|
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
||||||
}
|
|
||||||
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
|
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
for (int32_t j = 0; j < numOfChildren; j++) {
|
for (int32_t j = 0; j < numOfChildren; j++) {
|
||||||
|
@ -2876,6 +2919,13 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SAr
|
||||||
if (!hasIntervalWindow(pChInfo->pState, pWinRes)) {
|
if (!hasIntervalWindow(pChInfo->pState, pWinRes)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (num == 0) {
|
||||||
|
int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
|
||||||
|
pSup->rowEntryInfoOffset, &pInfo->aggSup);
|
||||||
|
if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
}
|
||||||
num++;
|
num++;
|
||||||
SResultRow* pChResult = NULL;
|
SResultRow* pChResult = NULL;
|
||||||
setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
|
setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
|
||||||
|
@ -3214,25 +3264,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
if (!IS_FINAL_OP(pInfo)) {
|
if (!IS_FINAL_OP(pInfo)) {
|
||||||
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
|
||||||
if (pInfo->binfo.pRes->info.rows != 0) {
|
|
||||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
|
||||||
return pInfo->binfo.pRes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
|
|
||||||
pInfo->returnUpdate = false;
|
|
||||||
ASSERT(!IS_FINAL_OP(pInfo));
|
|
||||||
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
|
||||||
// process the rest of the data
|
|
||||||
return pInfo->pUpdateRes;
|
|
||||||
}
|
|
||||||
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||||
if (pInfo->pDelRes->info.rows != 0) {
|
if (pInfo->pDelRes->info.rows != 0) {
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||||
|
if (pInfo->binfo.pRes->info.rows != 0) {
|
||||||
|
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||||
|
return pInfo->binfo.pRes;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
@ -3241,8 +3285,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
clearSpecialDataBlock(pInfo->pUpdateRes);
|
|
||||||
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
|
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||||
break;
|
break;
|
||||||
|
@ -3252,34 +3294,16 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
ASSERT(pBlock->info.type != STREAM_INVERT);
|
ASSERT(pBlock->info.type != STREAM_INVERT);
|
||||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
||||||
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
||||||
} else if (pBlock->info.type == STREAM_CLEAR) {
|
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey));
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins, NULL);
|
|
||||||
if (IS_FINAL_OP(pInfo)) {
|
|
||||||
int32_t childIndex = getChildIndex(pBlock);
|
|
||||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
|
||||||
SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
|
|
||||||
SExprSupp* pChildSup = &pChildOp->exprSupp;
|
|
||||||
|
|
||||||
doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL);
|
|
||||||
rebuildIntervalWindow(pOperator, pSup, pUpWins, pUpdatedMap);
|
|
||||||
taosArrayDestroy(pUpWins);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
removeResults(pUpWins, pUpdatedMap);
|
|
||||||
copyDataBlock(pInfo->pUpdateRes, pBlock);
|
|
||||||
pInfo->returnUpdate = true;
|
|
||||||
taosArrayDestroy(pUpWins);
|
|
||||||
break;
|
|
||||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
|
||||||
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
|
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
|
||||||
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, delWins, pUpdatedMap);
|
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pUpdatedMap);
|
||||||
if (IS_FINAL_OP(pInfo)) {
|
if (IS_FINAL_OP(pInfo)) {
|
||||||
int32_t childIndex = getChildIndex(pBlock);
|
int32_t childIndex = getChildIndex(pBlock);
|
||||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
||||||
SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
|
SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
|
||||||
SExprSupp* pChildSup = &pChildOp->exprSupp;
|
SExprSupp* pChildSup = &pChildOp->exprSupp;
|
||||||
doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL);
|
doDeleteWindows(pChildOp, &pChildInfo->interval, pBlock, NULL, NULL);
|
||||||
rebuildIntervalWindow(pOperator, pSup, delWins, pUpdatedMap);
|
rebuildIntervalWindow(pOperator, pSup, delWins, pUpdatedMap);
|
||||||
addRetriveWindow(delWins, pInfo);
|
addRetriveWindow(delWins, pInfo);
|
||||||
taosArrayAddAll(pInfo->pDelWins, delWins);
|
taosArrayAddAll(pInfo->pDelWins, delWins);
|
||||||
|
@ -3294,7 +3318,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
|
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
|
||||||
continue;
|
continue;
|
||||||
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
|
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
|
||||||
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, pUpdatedMap);
|
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap);
|
||||||
if (taosArrayGetSize(pUpdated) > 0) {
|
if (taosArrayGetSize(pUpdated) > 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -3334,11 +3358,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
minTs = TMIN(minTs, pBlock->info.window.skey);
|
minTs = TMIN(minTs, pBlock->info.window.skey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
|
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
|
||||||
if (IS_FINAL_OP(pInfo)) {
|
if (IS_FINAL_OP(pInfo)) {
|
||||||
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
|
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
|
||||||
pInfo->pPullDataMap, pUpdatedMap, pOperator);
|
pInfo->pPullDataMap, pUpdatedMap, pInfo->pDelWins, pOperator);
|
||||||
closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs);
|
closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs);
|
||||||
}
|
}
|
||||||
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
|
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
|
||||||
|
@ -3374,13 +3399,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
|
|
||||||
pInfo->returnUpdate = false;
|
|
||||||
ASSERT(!IS_FINAL_OP(pInfo));
|
|
||||||
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
|
||||||
// process the rest of the data
|
|
||||||
return pInfo->pUpdateRes;
|
|
||||||
}
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3455,9 +3473,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
|
|
||||||
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
|
||||||
pInfo->returnUpdate = false;
|
|
||||||
|
|
||||||
pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
|
pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
|
||||||
|
|
||||||
|
@ -4276,23 +4291,6 @@ static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
|
|
||||||
SArray* res = (SArray*)data;
|
|
||||||
SResKeyPos* pos = taosArrayGetP(res, index);
|
|
||||||
SWinKey* pData = (SWinKey*)pKey;
|
|
||||||
if (pData->ts == *(int64_t*)pos->key) {
|
|
||||||
if (pData->groupId > pos->groupId) {
|
|
||||||
return 1;
|
|
||||||
} else if (pData->groupId < pos->groupId) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
} else if (pData->ts > *(int64_t*)pos->key) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void removeSessionDeleteResults(SArray* update, SHashObj* pStDeleted) {
|
static void removeSessionDeleteResults(SArray* update, SHashObj* pStDeleted) {
|
||||||
int32_t size = taosHashGetSize(pStDeleted);
|
int32_t size = taosHashGetSize(pStDeleted);
|
||||||
if (size == 0) {
|
if (size == 0) {
|
||||||
|
@ -5668,13 +5666,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
printDataBlock(pBlock, "single interval recv");
|
printDataBlock(pBlock, "single interval recv");
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_CLEAR) {
|
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, NULL);
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
|
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap);
|
||||||
continue;
|
|
||||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
|
||||||
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pInfo->pDelWins,
|
|
||||||
pUpdatedMap);
|
|
||||||
continue;
|
continue;
|
||||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
|
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
|
||||||
|
@ -5706,8 +5700,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
|
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
|
||||||
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
|
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
|
||||||
pOperator);
|
pInfo->pDelWins, pOperator);
|
||||||
|
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
|
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
|
||||||
|
@ -5717,7 +5712,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
|
|
||||||
taosHashCleanup(pUpdatedMap);
|
taosHashCleanup(pUpdatedMap);
|
||||||
|
|
||||||
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||||
|
@ -5803,8 +5797,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
||||||
streamStateSetNumber(pInfo->pState, -1);
|
streamStateSetNumber(pInfo->pState, -1);
|
||||||
|
|
||||||
pInfo->pUpdateRes = NULL;
|
|
||||||
pInfo->returnUpdate = false;
|
|
||||||
pInfo->pPhyNode = NULL; // create new child
|
pInfo->pPhyNode = NULL; // create new child
|
||||||
pInfo->pPullDataMap = NULL;
|
pInfo->pPullDataMap = NULL;
|
||||||
pInfo->pPullWins = NULL; // SPullWindowInfo
|
pInfo->pPullWins = NULL; // SPullWindowInfo
|
||||||
|
|
|
@ -959,8 +959,8 @@ int32_t udfdInitResidentFuncs() {
|
||||||
char* pSave = tsUdfdResFuncs;
|
char* pSave = tsUdfdResFuncs;
|
||||||
char* token;
|
char* token;
|
||||||
while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
|
while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
|
||||||
char func[TSDB_FUNC_NAME_LEN] = {0};
|
char func[TSDB_FUNC_NAME_LEN+1] = {0};
|
||||||
strncpy(func, token, sizeof(func));
|
strncpy(func, token, TSDB_FUNC_NAME_LEN);
|
||||||
taosArrayPush(global.residentFuncs, func);
|
taosArrayPush(global.residentFuncs, func);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ typedef struct SStateKey {
|
||||||
int64_t opNum;
|
int64_t opNum;
|
||||||
} SStateKey;
|
} SStateKey;
|
||||||
|
|
||||||
static inline int SStateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
||||||
SStateKey* pWin1 = (SStateKey*)pKey1;
|
SStateKey* pWin1 = (SStateKey*)pKey1;
|
||||||
SStateKey* pWin2 = (SStateKey*)pKey2;
|
SStateKey* pWin2 = (SStateKey*)pKey2;
|
||||||
|
|
||||||
|
@ -67,12 +67,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// open state storage backend
|
// open state storage backend
|
||||||
if (tdbTbOpen("state.db", sizeof(SStateKey), -1, SStateKeyCmpr, pState->db, &pState->pStateDb) < 0) {
|
if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo refactor
|
// todo refactor
|
||||||
if (tdbTbOpen("func.state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pFillStateDb) < 0) {
|
if (tdbTbOpen("func.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -154,8 +154,8 @@ static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *line = NULL;
|
char line[1024];
|
||||||
ssize_t _bytes = taosGetLineFile(pFile, &line);
|
ssize_t _bytes = taosGetsFile(pFile, sizeof(line), line);
|
||||||
if ((_bytes < 0) || (line == NULL)) {
|
if ((_bytes < 0) || (line == NULL)) {
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -165,7 +165,6 @@ static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) {
|
||||||
sscanf(line, "%s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, cpu, &cpuInfo->user, &cpuInfo->nice, &cpuInfo->system,
|
sscanf(line, "%s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, cpu, &cpuInfo->user, &cpuInfo->nice, &cpuInfo->system,
|
||||||
&cpuInfo->idle);
|
&cpuInfo->idle);
|
||||||
|
|
||||||
if (line != NULL) taosMemoryFreeClear(line);
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
#endif
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -194,8 +193,8 @@ static int32_t taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *line = NULL;
|
char line[1024];
|
||||||
ssize_t _bytes = taosGetLineFile(pFile, &line);
|
ssize_t _bytes = taosGetsFile(pFile, sizeof(line), line);
|
||||||
if ((_bytes < 0) || (line == NULL)) {
|
if ((_bytes < 0) || (line == NULL)) {
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -210,7 +209,6 @@ static int32_t taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (line != NULL) taosMemoryFreeClear(line);
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
#endif
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -286,14 +284,14 @@ int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen) {
|
||||||
snprintf(releaseName, maxLen, "Windows");
|
snprintf(releaseName, maxLen, "Windows");
|
||||||
return 0;
|
return 0;
|
||||||
#elif defined(_TD_DARWIN_64)
|
#elif defined(_TD_DARWIN_64)
|
||||||
char *line = NULL;
|
char line[1024];
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile("/etc/os-release", TD_FILE_READ | TD_FILE_STREAM);
|
TdFilePtr pFile = taosOpenFile("/etc/os-release", TD_FILE_READ | TD_FILE_STREAM);
|
||||||
if (pFile == NULL) return false;
|
if (pFile == NULL) return false;
|
||||||
|
|
||||||
while ((size = taosGetLineFile(pFile, &line)) != -1) {
|
while ((size = taosGetsFile(pFile, sizeof(line), line)) != -1) {
|
||||||
line[size - 1] = '\0';
|
line[size - 1] = '\0';
|
||||||
if (strncmp(line, "PRETTY_NAME", 11) == 0) {
|
if (strncmp(line, "PRETTY_NAME", 11) == 0) {
|
||||||
const char *p = strchr(line, '=') + 1;
|
const char *p = strchr(line, '=') + 1;
|
||||||
|
@ -307,18 +305,17 @@ int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (line != NULL) taosMemoryFree(line);
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
return code;
|
return code;
|
||||||
#else
|
#else
|
||||||
char *line = NULL;
|
char line[1024];
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile("/etc/os-release", TD_FILE_READ | TD_FILE_STREAM);
|
TdFilePtr pFile = taosOpenFile("/etc/os-release", TD_FILE_READ | TD_FILE_STREAM);
|
||||||
if (pFile == NULL) return false;
|
if (pFile == NULL) return false;
|
||||||
|
|
||||||
while ((size = taosGetLineFile(pFile, &line)) != -1) {
|
while ((size = taosGetsFile(pFile, sizeof(line), line)) != -1) {
|
||||||
line[size - 1] = '\0';
|
line[size - 1] = '\0';
|
||||||
if (strncmp(line, "PRETTY_NAME", 11) == 0) {
|
if (strncmp(line, "PRETTY_NAME", 11) == 0) {
|
||||||
const char *p = strchr(line, '=') + 1;
|
const char *p = strchr(line, '=') + 1;
|
||||||
|
@ -332,7 +329,6 @@ int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (line != NULL) taosMemoryFree(line);
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
return code;
|
return code;
|
||||||
#endif
|
#endif
|
||||||
|
@ -374,7 +370,7 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) {
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
#else
|
#else
|
||||||
char *line = NULL;
|
char line[1024];
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
int32_t done = 0;
|
int32_t done = 0;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
@ -383,7 +379,7 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) {
|
||||||
TdFilePtr pFile = taosOpenFile("/proc/cpuinfo", TD_FILE_READ | TD_FILE_STREAM);
|
TdFilePtr pFile = taosOpenFile("/proc/cpuinfo", TD_FILE_READ | TD_FILE_STREAM);
|
||||||
if (pFile == NULL) return code;
|
if (pFile == NULL) return code;
|
||||||
|
|
||||||
while (done != 3 && (size = taosGetLineFile(pFile, &line)) != -1) {
|
while (done != 3 && (size = taosGetsFile(pFile, sizeof(line), line)) != -1) {
|
||||||
line[size - 1] = '\0';
|
line[size - 1] = '\0';
|
||||||
if (((done & 1) == 0) && strncmp(line, "model name", 10) == 0) {
|
if (((done & 1) == 0) && strncmp(line, "model name", 10) == 0) {
|
||||||
const char *v = strchr(line, ':') + 2;
|
const char *v = strchr(line, ':') + 2;
|
||||||
|
@ -398,7 +394,6 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) {
|
||||||
if (strncmp(line, "processor", 9) == 0) coreCount += 1;
|
if (strncmp(line, "processor", 9) == 0) coreCount += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (line != NULL) taosMemoryFree(line);
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
|
|
||||||
if (code != 0 && (done & 1) == 0) {
|
if (code != 0 && (done & 1) == 0) {
|
||||||
|
@ -517,9 +512,9 @@ int32_t taosGetProcMemory(int64_t *usedKB) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ssize_t _bytes = 0;
|
ssize_t _bytes = 0;
|
||||||
char *line = NULL;
|
char line[1024];
|
||||||
while (!taosEOFFile(pFile)) {
|
while (!taosEOFFile(pFile)) {
|
||||||
_bytes = taosGetLineFile(pFile, &line);
|
_bytes = taosGetsFile(pFile, sizeof(line), line);
|
||||||
if ((_bytes < 0) || (line == NULL)) {
|
if ((_bytes < 0) || (line == NULL)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -537,7 +532,6 @@ int32_t taosGetProcMemory(int64_t *usedKB) {
|
||||||
char tmp[10];
|
char tmp[10];
|
||||||
sscanf(line, "%s %" PRId64, tmp, usedKB);
|
sscanf(line, "%s %" PRId64, tmp, usedKB);
|
||||||
|
|
||||||
if (line != NULL) taosMemoryFreeClear(line);
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
return 0;
|
return 0;
|
||||||
#endif
|
#endif
|
||||||
|
@ -631,12 +625,12 @@ int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int
|
||||||
if (pFile == NULL) return -1;
|
if (pFile == NULL) return -1;
|
||||||
|
|
||||||
ssize_t _bytes = 0;
|
ssize_t _bytes = 0;
|
||||||
char *line = NULL;
|
char line[1024];
|
||||||
char tmp[24];
|
char tmp[24];
|
||||||
int readIndex = 0;
|
int readIndex = 0;
|
||||||
|
|
||||||
while (!taosEOFFile(pFile)) {
|
while (!taosEOFFile(pFile)) {
|
||||||
_bytes = taosGetLineFile(pFile, &line);
|
_bytes = taosGetsFile(pFile, sizeof(line), line);
|
||||||
if (_bytes < 10 || line == NULL) {
|
if (_bytes < 10 || line == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -658,7 +652,6 @@ int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int
|
||||||
if (readIndex >= 4) break;
|
if (readIndex >= 4) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (line != NULL) taosMemoryFreeClear(line);
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
|
|
||||||
if (readIndex < 4) {
|
if (readIndex < 4) {
|
||||||
|
@ -709,7 +702,7 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) {
|
||||||
if (pFile == NULL) return -1;
|
if (pFile == NULL) return -1;
|
||||||
|
|
||||||
ssize_t _bytes = 0;
|
ssize_t _bytes = 0;
|
||||||
char *line = NULL;
|
char line[1024];
|
||||||
|
|
||||||
while (!taosEOFFile(pFile)) {
|
while (!taosEOFFile(pFile)) {
|
||||||
int64_t o_rbytes = 0;
|
int64_t o_rbytes = 0;
|
||||||
|
@ -724,7 +717,7 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) {
|
||||||
int64_t nouse6 = 0;
|
int64_t nouse6 = 0;
|
||||||
char nouse0[200] = {0};
|
char nouse0[200] = {0};
|
||||||
|
|
||||||
_bytes = taosGetLineFile(pFile, &line);
|
_bytes = taosGetsFile(pFile, sizeof(line), line);
|
||||||
if (_bytes < 0) {
|
if (_bytes < 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -743,7 +736,6 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) {
|
||||||
*transmit_bytes = o_tbytes;
|
*transmit_bytes = o_tbytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (line != NULL) taosMemoryFreeClear(line);
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -714,7 +714,7 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) {
|
int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) {
|
||||||
char *line = NULL, *name, *value, *value2, *value3;
|
char line[1024], *name, *value, *value2, *value3;
|
||||||
int32_t olen, vlen, vlen2, vlen3;
|
int32_t olen, vlen, vlen2, vlen3;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
ssize_t _bytes = 0;
|
ssize_t _bytes = 0;
|
||||||
|
@ -743,7 +743,7 @@ int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) {
|
||||||
name = value = value2 = value3 = NULL;
|
name = value = value2 = value3 = NULL;
|
||||||
olen = vlen = vlen2 = vlen3 = 0;
|
olen = vlen = vlen2 = vlen3 = 0;
|
||||||
|
|
||||||
_bytes = taosGetLineFile(pFile, &line);
|
_bytes = taosGetsFile(pFile, sizeof(line), line);
|
||||||
if (_bytes <= 0) {
|
if (_bytes <= 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -775,14 +775,13 @@ int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
if (line != NULL) taosMemoryFreeClear(line);
|
|
||||||
|
|
||||||
uInfo("load from env cfg file %s success", filepath);
|
uInfo("load from env cfg file %s success", filepath);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
|
int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
|
||||||
char *line = NULL, *name, *value, *value2, *value3;
|
char line[1024], *name, *value, *value2, *value3;
|
||||||
int32_t olen, vlen, vlen2, vlen3;
|
int32_t olen, vlen, vlen2, vlen3;
|
||||||
ssize_t _bytes = 0;
|
ssize_t _bytes = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -804,7 +803,7 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
|
||||||
name = value = value2 = value3 = NULL;
|
name = value = value2 = value3 = NULL;
|
||||||
olen = vlen = vlen2 = vlen3 = 0;
|
olen = vlen = vlen2 = vlen3 = 0;
|
||||||
|
|
||||||
_bytes = taosGetLineFile(pFile, &line);
|
_bytes = taosGetsFile(pFile, sizeof(line), line);
|
||||||
if (_bytes <= 0) {
|
if (_bytes <= 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -836,7 +835,6 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
if (line != NULL) taosMemoryFreeClear(line);
|
|
||||||
|
|
||||||
if (code == 0 || (code != 0 && terrno == TSDB_CODE_CFG_NOT_FOUND)) {
|
if (code == 0 || (code != 0 && terrno == TSDB_CODE_CFG_NOT_FOUND)) {
|
||||||
uInfo("load from cfg file %s success", filepath);
|
uInfo("load from cfg file %s success", filepath);
|
||||||
|
|
|
@ -622,4 +622,56 @@ if $data12 != 2 then
|
||||||
goto loop3
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
sql create database test4 vgroups 1;
|
||||||
|
sql use test4;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from t1 where a > 5 interval(10s);
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
|
||||||
|
sleep 200
|
||||||
|
sql select * from streamt4;
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $rows != 0 then
|
||||||
|
print =====rows=$rows
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,6,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop4:
|
||||||
|
sleep 200
|
||||||
|
sql select * from streamt4;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 1 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,2,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop5:
|
||||||
|
sleep 200
|
||||||
|
sql select * from streamt4;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 0 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
|
@ -587,8 +587,6 @@ sleep 300
|
||||||
|
|
||||||
sql delete from st where ts = 1648791223000;
|
sql delete from st where ts = 1648791223000;
|
||||||
|
|
||||||
sql select * from test.streamt5;
|
|
||||||
|
|
||||||
$loop_count = 0
|
$loop_count = 0
|
||||||
|
|
||||||
loop15:
|
loop15:
|
||||||
|
@ -608,7 +606,6 @@ if $rows != 4 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
$loop_all = $loop_all + 1
|
$loop_all = $loop_all + 1
|
||||||
print ============loop_all=$loop_all
|
print ============loop_all=$loop_all
|
||||||
|
|
||||||
|
|
|
@ -5,15 +5,15 @@ sleep 50
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
print =============== create database
|
print =============== create database
|
||||||
sql create database test vgroups 1
|
sql create database test vgroups 1;
|
||||||
sql select * from information_schema.ins_databases
|
sql select * from information_schema.ins_databases;
|
||||||
if $rows != 3 then
|
if $rows != 3 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
sql use test
|
sql use test;
|
||||||
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||||
sql create table t1 using st tags(1,1,1);
|
sql create table t1 using st tags(1,1,1);
|
||||||
sql create table t2 using st tags(2,2,2);
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
@ -48,8 +48,9 @@ if $loop_count == 10 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step 0
|
||||||
|
|
||||||
sql select * from streamt
|
sql select * from streamt;
|
||||||
|
|
||||||
# row 0
|
# row 0
|
||||||
if $data01 != 1 then
|
if $data01 != 1 then
|
||||||
|
@ -97,7 +98,7 @@ endi
|
||||||
|
|
||||||
print step 1
|
print step 1
|
||||||
|
|
||||||
sql select * from streamt2
|
sql select * from streamt2;
|
||||||
|
|
||||||
# row 0
|
# row 0
|
||||||
if $data01 != 1 then
|
if $data01 != 1 then
|
||||||
|
@ -239,6 +240,67 @@ if $data32 != 6 then
|
||||||
goto loop0
|
goto loop0
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step 3.1
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791216001,2,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop00:
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt2;
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 1 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop00
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 1 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop00
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 1
|
||||||
|
if $data11 != 3 then
|
||||||
|
print =====data11=$data11
|
||||||
|
goto loop00
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 5 then
|
||||||
|
print =====data12=$data12
|
||||||
|
goto loop00
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 2
|
||||||
|
if $data21 != 3 then
|
||||||
|
print =====data21=$data21
|
||||||
|
goto loop00
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data22 != 7 then
|
||||||
|
print =====data22=$data22
|
||||||
|
goto loop00
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 3
|
||||||
|
if $data31 != 1 then
|
||||||
|
print =====data31=$data31
|
||||||
|
goto loop00
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data32 != 3 then
|
||||||
|
print =====data32=$data32
|
||||||
|
goto loop00
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
print step 4
|
print step 4
|
||||||
|
|
||||||
sql create database test1 vgroups 1
|
sql create database test1 vgroups 1
|
||||||
|
@ -513,6 +575,8 @@ endi
|
||||||
|
|
||||||
$loop_count = 0
|
$loop_count = 0
|
||||||
|
|
||||||
|
print step 7
|
||||||
|
|
||||||
loop4:
|
loop4:
|
||||||
sleep 100
|
sleep 100
|
||||||
|
|
||||||
|
|
|
@ -88,9 +88,15 @@ int32_t shellRunSingleCommand(char *command) {
|
||||||
if (shellRegexMatch(command, "^[ \t]*source[\t ]+[^ ]+[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
|
if (shellRegexMatch(command, "^[ \t]*source[\t ]+[^ ]+[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
|
||||||
/* If source file. */
|
/* If source file. */
|
||||||
char *c_ptr = strtok(command, " ;");
|
char *c_ptr = strtok(command, " ;");
|
||||||
assert(c_ptr != NULL);
|
if (c_ptr == NULL) {
|
||||||
|
shellRunSingleCommandImp(command);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
c_ptr = strtok(NULL, " ;");
|
c_ptr = strtok(NULL, " ;");
|
||||||
assert(c_ptr != NULL);
|
if (c_ptr == NULL) {
|
||||||
|
shellRunSingleCommandImp(command);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
shellSourceFile(c_ptr);
|
shellSourceFile(c_ptr);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -781,9 +787,9 @@ void shellReadHistory() {
|
||||||
TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_READ | TD_FILE_STREAM);
|
TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_READ | TD_FILE_STREAM);
|
||||||
if (pFile == NULL) return;
|
if (pFile == NULL) return;
|
||||||
|
|
||||||
char *line = NULL;
|
char *line = taosMemoryMalloc(TSDB_MAX_ALLOWED_SQL_LEN + 1);
|
||||||
int32_t read_size = 0;
|
int32_t read_size = 0;
|
||||||
while ((read_size = taosGetLineFile(pFile, &line)) != -1) {
|
while ((read_size = taosGetsFile(pFile, TSDB_MAX_ALLOWED_SQL_LEN, line)) != -1) {
|
||||||
line[read_size - 1] = '\0';
|
line[read_size - 1] = '\0';
|
||||||
taosMemoryFree(pHistory->hist[pHistory->hend]);
|
taosMemoryFree(pHistory->hist[pHistory->hend]);
|
||||||
pHistory->hist[pHistory->hend] = strdup(line);
|
pHistory->hist[pHistory->hend] = strdup(line);
|
||||||
|
@ -795,7 +801,7 @@ void shellReadHistory() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (line != NULL) taosMemoryFree(line);
|
taosMemoryFreeClear(line);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
int64_t file_size;
|
int64_t file_size;
|
||||||
if (taosStatFile(pHistory->file, &file_size, NULL) == 0 && file_size > SHELL_MAX_COMMAND_SIZE) {
|
if (taosStatFile(pHistory->file, &file_size, NULL) == 0 && file_size > SHELL_MAX_COMMAND_SIZE) {
|
||||||
|
@ -859,7 +865,6 @@ void shellSourceFile(const char *file) {
|
||||||
int32_t read_len = 0;
|
int32_t read_len = 0;
|
||||||
char *cmd = taosMemoryCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN + 1);
|
char *cmd = taosMemoryCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN + 1);
|
||||||
size_t cmd_len = 0;
|
size_t cmd_len = 0;
|
||||||
char *line = NULL;
|
|
||||||
char fullname[PATH_MAX] = {0};
|
char fullname[PATH_MAX] = {0};
|
||||||
char sourceFileCommand[PATH_MAX + 8] = {0};
|
char sourceFileCommand[PATH_MAX + 8] = {0};
|
||||||
|
|
||||||
|
@ -877,7 +882,8 @@ void shellSourceFile(const char *file) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
while ((read_len = taosGetLineFile(pFile, &line)) != -1) {
|
char *line = taosMemoryMalloc(TSDB_MAX_ALLOWED_SQL_LEN + 1);
|
||||||
|
while ((read_len = taosGetsFile(pFile, TSDB_MAX_ALLOWED_SQL_LEN, line)) != -1) {
|
||||||
if (read_len >= TSDB_MAX_ALLOWED_SQL_LEN) continue;
|
if (read_len >= TSDB_MAX_ALLOWED_SQL_LEN) continue;
|
||||||
line[--read_len] = '\0';
|
line[--read_len] = '\0';
|
||||||
|
|
||||||
|
@ -904,7 +910,7 @@ void shellSourceFile(const char *file) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(cmd);
|
taosMemoryFree(cmd);
|
||||||
if (line != NULL) taosMemoryFree(line);
|
taosMemoryFreeClear(line);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue