Merge branch '3.0' into feat/TD-17552

This commit is contained in:
Ganlin Zhao 2022-07-22 17:41:48 +08:00
commit 51ac32e8ea
45 changed files with 806 additions and 491 deletions

View File

@ -51,6 +51,13 @@ IF(${TD_WINDOWS})
"If build unit tests using googletest"
ON
)
option(
TDENGINE_3
"TDengine 3.x"
ON
)
ELSEIF (TD_DARWIN_64)
add_definitions(-DCOMPILER_SUPPORTS_CXX13)
option(

View File

@ -91,7 +91,7 @@ Add following dependency in the `pom.xml` file of your Maven project:
You can build Java connector from source code after cloning the TDengine project:
```
git clone https://github.com/taosdata/taos-connector-jdbc.git
git clone https://github.com/taosdata/taos-connector-jdbc.git --branch 2.0
cd taos-connector-jdbc
mvn clean install -Dmaven.test.skip=true
```

View File

@ -93,7 +93,7 @@ Maven 项目中,在 pom.xml 中添加以下依赖:
可以通过下载 TDengine 的源码,自己编译最新版本的 Java connector
```shell
git clone https://github.com/taosdata/taos-connector-jdbc.git
git clone https://github.com/taosdata/taos-connector-jdbc.git --branch 2.0
cd taos-connector-jdbc
mvn clean install -Dmaven.test.skip=true
```

View File

@ -44,7 +44,11 @@ extern "C" {
#define SIGBREAK 1234
#endif
#ifdef WINDOWS
typedef BOOL (*FSignalHandler)(DWORD fdwCtrlType);
#else
typedef void (*FSignalHandler)(int32_t signum, void *sigInfo, void *context);
#endif
void taosSetSignal(int32_t signum, FSignalHandler sigfp);
void taosIgnSignal(int32_t signum);
void taosDflSignal(int32_t signum);

View File

@ -29,9 +29,6 @@ extern "C" {
#define tcgetattr TCGETATTR_FUNC_TAOS_FORBID
#endif
#define TAOS_CONSOLE_PROMPT_HEADER "taos> "
#define TAOS_CONSOLE_PROMPT_CONTINUE " -> "
typedef struct TdCmd *TdCmdPtr;
TdCmdPtr taosOpenCmd(const char* cmd);

View File

@ -183,7 +183,7 @@ bool tsStartUdfd = true;
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 60;
int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60;
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
@ -466,7 +466,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
return 0;

View File

@ -438,12 +438,15 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
}
int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) {
metaWLock(pMeta);
int ret = metaTtlSmaller(pMeta, ttl, tbUids);
if (ret != 0) {
metaULock(pMeta);
return ret;
}
if (taosArrayGetSize(tbUids) == 0){
return 0;
}
metaWLock(pMeta);
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
tb_uid_t *uid = (tb_uid_t *)taosArrayGet(tbUids, i);
metaDropTableByUid(pMeta, *uid, NULL);

View File

@ -51,6 +51,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success
if (metaGetTableEntryByUid(&mr, uid) < 0) {
metaReaderClear(&mr);
return -1;
}
char* tbName = strdup(mr.me.name);

View File

@ -55,8 +55,9 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define START_TS_COLUMN_INDEX 0
#define END_TS_COLUMN_INDEX 1
#define UID_COLUMN_INDEX 2
#define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX
#define DELETE_GROUPID_COLUMN_INDEX 2
#define GROUPID_COLUMN_INDEX 3
#define CALCULATE_START_TS_COLUMN_INDEX 4
#define CALCULATE_END_TS_COLUMN_INDEX 5
enum {
// when this task starts to execute, this status will set
@ -347,7 +348,6 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DATAREADER, // todo(liuyao) delete it
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
STREAM_SCAN_FROM_DATAREADER_RANGE,
} EStreamScanMode;
@ -367,7 +367,7 @@ typedef struct SStreamAggSupporter {
char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SArray* pScanWindow;
SSDataBlock* pScanBlock;
} SStreamAggSupporter;
typedef struct SessionWindowSupporter {
@ -420,7 +420,7 @@ typedef struct SStreamScanInfo {
int32_t deleteDataIndex;
STimeWindow updateWin;
STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes;
// status for tmq
// SSchemaWrapper schema;
STqOffset offset;
@ -713,7 +713,6 @@ typedef struct SStreamStateAggOperatorInfo {
SSDataBlock* pDelRes;
SHashObj* pSeDeleted;
void* pDelIterator;
SArray* pScanWindow;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
} SStreamStateAggOperatorInfo;
@ -955,6 +954,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
@ -971,7 +971,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
SSDataBlock* createPullDataBlock();
SSDataBlock* createSpecialDataBlock(EStreamType type);
#ifdef __cplusplus
}

View File

@ -5139,8 +5139,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
}
pSup->valueSize = size;
pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow));
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
int32_t pageSize = 4096;
while (pageSize < pSup->resultRowSize * 4) {
pageSize <<= 1u;

View File

@ -25,7 +25,6 @@
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
@ -812,6 +811,10 @@ static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
}
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
return isIntervalWindow(pInfo) && pInfo->interval.interval != pInfo->interval.sliding;
}
static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
if (groupId) {
@ -834,17 +837,10 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
}
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
ASSERT(rowIndex < pBlock->info.rows);
switch (pBlock->info.type) {
case STREAM_DELETE_DATA:
case STREAM_RETRIEVE: {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
uint64_t* groupCol = (uint64_t*)pColInfo->pData;
ASSERT(rowIndex < pBlock->info.rows);
pInfo->groupId = groupCol[rowIndex];
} break;
default:
break;
}
}
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
@ -864,7 +860,17 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
STimeWindow win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
TSKEY* calStartData = (TSKEY*)pCalStartTsCol->pData;
SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
TSKEY* calEndData = (TSKEY*)pCalEndTsCol->pData;
setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
if (isSlidingWindow(pInfo)) {
pInfo->updateWin.skey = calStartData[*pRowIndex];
pInfo->updateWin.ekey = calEndData[*pRowIndex];
}
(*pRowIndex)++;
for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
@ -876,8 +882,8 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
win.skey = TMIN(win.skey, startData[*pRowIndex]);
continue;
}
ASSERT((win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
!(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
break;
}
@ -908,68 +914,6 @@ static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlo
win.ekey = endWin.ekey;
}
}
static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
STimeWindow win = {
.skey = INT64_MIN,
.ekey = INT64_MAX,
};
bool needRead = false;
if (!isStateWindow(pInfo) && (*pRowIndex) < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, tsColIndex);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
if (isSessionWindow(pInfo)) {
SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
int64_t gap = pInfo->sessionSup.gap;
int32_t winIndex = 0;
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[*pRowIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win;
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
} else {
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
pInfo->updateWin.skey = tsCols[*pRowIndex];
win = getSlidingWindow(tsCols, &pInfo->interval, &pSDB->info, pRowIndex);
pInfo->updateWin.ekey = tsCols[*pRowIndex - 1];
// win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC);
// (*pRowIndex) +=
// getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL,
// TSDB_ORDER_ASC);
}
needRead = true;
} else if (isStateWindow(pInfo)) {
SArray* pWins = pInfo->sessionSup.pStreamAggSup->pScanWindow;
int32_t size = taosArrayGetSize(pWins);
if (pInfo->scanWinIndex < size) {
win = *(STimeWindow*)taosArrayGet(pWins, pInfo->scanWinIndex);
pInfo->scanWinIndex++;
needRead = true;
} else {
pInfo->scanWinIndex = 0;
taosArrayClear(pWins);
}
}
if (!needRead) {
return false;
}
resetTableScanInfo(pInfo->pTableScanOp->info, &win);
return true;
}
static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRowId) {
for (int32_t j = 0; j < taosArrayGetSize(source->pDataBlock); j++) {
SColumnInfoData* pDestCol = (SColumnInfoData*)taosArrayGet(dest->pDataBlock, j);
SColumnInfoData* pSourceCol = (SColumnInfoData*)taosArrayGet(source->pDataBlock, j);
if (colDataIsNull_s(pSourceCol, sourceRowId)) {
colDataAppendNULL(pDestCol, dest->info.rows);
} else {
colDataAppend(pDestCol, dest->info.rows, colDataGetData(pSourceCol, sourceRowId), false);
}
}
dest->info.rows++;
}
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) {
@ -982,29 +926,6 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
if (!pResult) {
blockDataCleanup(pSDB);
*pRowIndex = 0;
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTableScanInfo->dataReader);
pTableScanInfo->dataReader = NULL;
return NULL;
}
if (pResult->info.groupId == pInfo->groupId) {
return pResult;
}
}
}
static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) {
SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pTableScanOp);
if (pResult == NULL) {
if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) {
// scan next window data
pResult = doTableScan(pInfo->pTableScanOp);
}
}
if (!pResult) {
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTableScanInfo->dataReader);
@ -1017,77 +938,31 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_
return pResult;
}
}
/* Todo(liuyao) for partition by column
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
blockDataCleanup(pResult);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock->info.uid);
if (id == pInfo->groupId) {
copyOneRow(pResult, pBlock, i);
}
}
return pResult;
*/
}
static void generateIntervalTs(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator,
SSDataBlock* pUpdateRes) {
if (pDelBlock->info.rows == 0) {
return;
}
blockDataCleanup(pUpdateRes);
blockDataEnsureCapacity(pUpdateRes, 64);
ASSERT(taosArrayGetSize(pDelBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t i = pInfo->deleteDataIndex;
i < pDelBlock->info.rows &&
i < pDelBlock->info.capacity - (endData[i] - startData[i]) / pInfo->interval.interval - 1;
i++) {
uint64_t groupId = getGroupId(pOperator, uidCol[i]);
for (TSKEY startTs = startData[i]; startTs <= endData[i];) {
colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false);
colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false);
pUpdateRes->info.rows++;
startTs = taosTimeAdd(startTs, pInfo->interval.interval, pInfo->interval.intervalUnit, pInfo->interval.precision);
}
pInfo->deleteDataIndex++;
}
if (pInfo->deleteDataIndex > 0 && pInfo->deleteDataIndex == pDelBlock->info.rows) {
blockDataCleanup(pDelBlock);
pInfo->deleteDataIndex = 0;
}
}
static void generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SOperatorInfo* pOperator,
SSDataBlock* pUpdateRes) {
if (pBlock->info.rows == 0) {
return;
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
blockDataCleanup(pUpdateRes);
blockDataEnsureCapacity(pUpdateRes, pBlock->info.rows);
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
blockDataCleanup(pDestBlock);
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
SColumnInfoData* pDestStartCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestEndCol = taosArrayGet(pUpdateRes->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
int32_t dummy = 0;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t groupId = getGroupId(pOperator, uidCol[i]);
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[i]);
// gap must be 0.
SResultWindowInfo* pStartWin =
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy);
@ -1101,46 +976,75 @@ static void generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SOper
colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false);
colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false);
colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
pUpdateRes->info.rows++;
pDestBlock->info.rows++;
}
return TSDB_CODE_SUCCESS;
}
static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
blockDataCleanup(pUpdateBlock);
int32_t size = taosArrayGetSize(pInfo->tsArray);
if (pInfo->tsArrayIndex < size) {
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
blockDataEnsureCapacity(pUpdateBlock, size);
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
pInfo->groupId = getGroupId(pInfo->pTableScanOp, pBlock->info.uid);
int32_t i = 0;
for (; i < size; i++) {
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
uint64_t id = getGroupId(pInfo->pTableScanOp, pBlock->info.uid);
if (pInfo->groupId != id) {
break;
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
blockDataCleanup(pDestBlock);
int32_t rows = pSrcBlock->info.rows;
if (rows == 0) {
return TSDB_CODE_SUCCESS;
}
copyOneRow(pUpdateBlock, pBlock, rowId);
int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pUpdateBlock->info.rows = i;
pInfo->tsArrayIndex += i;
pUpdateBlock->info.groupId = pInfo->groupId;
pUpdateBlock->info.type = STREAM_CLEAR;
blockDataUpdateTsWindow(pUpdateBlock, 0);
SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
ASSERT(pTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[0]);
for (int32_t i = 0; i < rows; ) {
colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(tsCol + i), false);
STimeWindow win = getSlidingWindow(tsCol, &pInfo->interval, &pSrcBlock->info, &i);
colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(tsCol + i - 1), false);
colDataAppend(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
colDataAppend(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
colDataAppend(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
pDestBlock->info.rows++;
}
// all rows have same group id
ASSERT(pInfo->tsArrayIndex >= size);
if (size > 0 && pInfo->tsArrayIndex == size) {
taosArrayClear(pInfo->tsArray);
}
pDestBlock->info.groupId = groupId;
return TSDB_CODE_SUCCESS;
}
if (size == 0) {
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pUpdateBlock);
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
int32_t code = TSDB_CODE_SUCCESS;
if (isIntervalWindow(pInfo)) {
code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
} else {
code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
}
pDestBlock->info.type = STREAM_CLEAR;
blockDataUpdateTsWindow(pDestBlock, 0);
return code;
}
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false);
pBlock->info.rows++;
}
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
if (out) {
blockDataCleanup(pInfo->pUpdateDataRes);
blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows);
}
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
@ -1151,9 +1055,13 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
// must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup))) && out) {
taosArrayPush(pInfo->tsArray, &rowId);
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
}
}
if (out) {
blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
pInfo->pUpdateDataRes->info.type = STREAM_CLEAR;
}
}
static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t uidColIndex) {
@ -1320,26 +1228,18 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
case STREAM_RETRIEVE: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
copyDataBlock(pInfo->pPullDataRes, pBlock);
pInfo->pullDataResIndex = 0;
prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex);
copyDataBlock(pInfo->pUpdateRes, pBlock);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
} break;
case STREAM_DELETE_DATA: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->updateResIndex = 0;
if (isIntervalWindow(pInfo)) {
copyDataBlock(pInfo->pDeleteDataRes, pBlock);
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pInfo->pUpdateRes);
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
} else {
generateScanRange(pInfo, pBlock, pInfo->pTableScanOp, pInfo->pUpdateRes);
generateScanRange(pInfo, pBlock, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
}
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
return pInfo->pUpdateRes;
return pInfo->pDeleteDataRes;
} break;
default:
break;
@ -1352,51 +1252,27 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
return pInfo->pRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
if (isStateWindow(pInfo)) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
}
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
return pInfo->pUpdateRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
if (pSDB != NULL) {
checkUpdateData(pInfo, true, pSDB, false);
pSDB->info.type = STREAM_PULL_DATA;
return pSDB;
}
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) {
pSDB->info.type = STREAM_NORMAL;
checkUpdateData(pInfo, true, pSDB, false);
return pSDB;
}
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
if (pInfo->pUpdateRes->info.rows > 0) {
prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
return pInfo->pUpdateRes;
}
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE) {
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE || pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) {
pSDB->info.type = STREAM_NORMAL;
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false);
return pSDB;
}
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else if (isStateWindow(pInfo)) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
if (prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) {
blockDataCleanup(pInfo->pUpdateRes);
// return empty data blcok
return pInfo->pUpdateRes;
}
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
if (isStateWindow(pInfo) && pInfo->sessionSup.pStreamAggSup->pScanBlock->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
pInfo->updateResIndex = 0;
copyDataBlock(pInfo->pUpdateRes, pInfo->sessionSup.pStreamAggSup->pScanBlock);
blockDataCleanup(pInfo->sessionSup.pStreamAggSup->pScanBlock);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
return pInfo->pUpdateRes;
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
@ -1455,15 +1331,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} else if (pInfo->pUpdateInfo) {
pInfo->tsArrayIndex = 0;
checkUpdateData(pInfo, true, pInfo->pRes, true);
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
if (pInfo->pUpdateRes->info.rows > 0) {
if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) {
if (pInfo->pUpdateDataRes->info.rows > 0) {
if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
pInfo->updateResIndex = 0;
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) {
} else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return pInfo->pUpdateRes;
return pInfo->pUpdateDataRes;
}
}
}
@ -1629,17 +1504,18 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
}
pInfo->pRes = createResDataBlock(pDescNode);
pInfo->pUpdateRes = createResDataBlock(pDescNode);
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->sessionSup =
(SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0;
pInfo->pPullDataRes = createPullDataBlock();
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
pInfo->pStreamScanOp = pOperator;
pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createPullDataBlock();
pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
pOperator->name = "StreamScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;

View File

@ -1373,8 +1373,10 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t numOfOutput,
SSDataBlock* pBlock, SArray* pUpWins) {
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* tsCols = (TSKEY*)pTsCol->pData;
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData;
uint64_t* pGpDatas = NULL;
if (pBlock->info.type == STREAM_RETRIEVE) {
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
@ -1382,22 +1384,18 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
}
int32_t step = 0;
int32_t startPos = 0;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[0], pInterval, TSDB_ORDER_ASC);
while (1) {
step =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
while (win.ekey <= endTsCols[i]) {
uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId;
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput);
if (pUpWins && res) {
SWinRes winRes = {.ts = win.skey, .groupId = winGpId};
taosArrayPush(pUpWins, &winRes);
}
int32_t prevEndPos = step - 1 + startPos;
startPos = getNextQualifiedWindow(pInterval, &win, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
if (startPos < 0) {
break;
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
}
}
}
@ -1501,7 +1499,7 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo
}
blockDataEnsureCapacity(pBlock, size - *index);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, DELETE_GROUPID_COLUMN_INDEX);
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t i = *index; i < size; i++) {
SWinRes* pWin = taosArrayGet(pWins, i);
colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false);
@ -1798,10 +1796,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t));
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes));
pInfo->delIndex = 0;
// pInfo->pDelRes = createPullDataBlock(); todo(liuyao) for delete
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "TimeIntervalAggOperator";
@ -2603,14 +2598,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
chId = getChildIndex(pSDataBlock);
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
}
// if (index != -1 && pSDataBlock->info.type == STREAM_PULL_DATA) {
// qDebug("===stream===delete child id %d", chId);
// taosArrayRemove(chArray, index);
// if (taosArrayGetSize(chArray) == 0) {
// // pull data is over
// taosHashRemove(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
// }
// }
if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) {
ignore = false;
}
@ -2702,16 +2689,18 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
}
blockDataEnsureCapacity(pBlock, size - (*pIndex));
ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock));
SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
for (; (*pIndex) < size; (*pIndex)++) {
SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex));
SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
pBlock->info.rows++;
}
if ((*pIndex) == size) {
@ -2830,7 +2819,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
continue;
}
removeResults(pUpWins, pUpdated);
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
copyDataBlock(pInfo->pUpdateRes, pBlock);
// copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
pInfo->returnUpdate = true;
taosArrayDestroy(pUpWins);
break;
@ -2938,12 +2928,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return NULL;
}
SSDataBlock* createPullDataBlock() {
SSDataBlock* createSpecialDataBlock(EStreamType type) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = false;
pBlock->info.groupId = 0;
pBlock->info.rows = 0;
pBlock->info.type = STREAM_RETRIEVE;
pBlock->info.type = type;
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t);
pBlock->pDataBlock = taosArrayInit(3, sizeof(SColumnInfoData));
@ -2957,6 +2947,14 @@ SSDataBlock* createPullDataBlock() {
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
infoData.info.bytes = sizeof(uint64_t);
// uid
taosArrayPush(pBlock->pDataBlock, &infoData);
// group id
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
@ -3024,8 +3022,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error;
}
}
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_CLEAR;
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->returnUpdate = false;
@ -3047,11 +3044,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pullIndex = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createPullDataBlock();
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
// pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes));
@ -3066,7 +3061,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->fpSet =
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type);
}
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -3091,6 +3088,7 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
}
taosHashCleanup(pSup->pResultRows);
destroyDiskbasedBuf(pSup->pResultBuf);
blockDataDestroy(pSup->pScanBlock);
}
void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
@ -3205,7 +3203,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL;
// pInfo->pDelRes = createPullDataBlock();
// pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete
pInfo->pChildren = NULL;
@ -3564,7 +3562,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endDatas = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* gpDatas = (uint64_t*)pGroupCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
int32_t winIndex = 0;
@ -4260,7 +4258,6 @@ static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, pKeyColInfo, pBlock->info.rows, i, &allEqual,
pSeDeleted);
ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData));
taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex);
}
@ -4287,6 +4284,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
blockDataEnsureCapacity(pAggSup->pScanBlock, pSDataBlock->info.rows);
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
for (int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) {
if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) {
@ -4301,7 +4299,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, pKeyColInfo, pSDataBlock->info.rows, i,
&allEqual, pInfo->pSeDeleted);
if (!allEqual) {
taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win);
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
&pSDataBlock->info.groupId);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex);
continue;
@ -4465,7 +4464,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL;
// pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete
// pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete
pInfo->pChildren = NULL;

View File

@ -476,16 +476,16 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type;
int32_t bytes = pDestCtx->input.pData[0]->info.bytes;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) > *(TSKEY*)(pSBuf + bytes))) {
memcpy(pDBuf, pSBuf, bytes);
*(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes);
if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts > pSBuf->ts)) {
memcpy(pDBuf->buf, pSBuf->buf, bytes);
pDBuf->ts = pSBuf->ts;
pDResInfo->numOfRes = 1;
}
return TSDB_CODE_SUCCESS;
@ -2994,16 +2994,16 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
// todo rewrite:
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type;
int32_t bytes = pDestCtx->input.pData[0]->info.bytes;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) < *(TSKEY*)(pSBuf + bytes))) {
memcpy(pDBuf, pSBuf, bytes);
*(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes);
if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts < pSBuf->ts)) {
memcpy(pDBuf->buf, pSBuf->buf, bytes);
pDBuf->ts = pSBuf->ts;
pDResInfo->numOfRes = 1;
}
return TSDB_CODE_SUCCESS;

View File

@ -18,10 +18,6 @@
#include "os.h"
#if defined(WINDOWS)
BOOL WINAPI CtrlHandler(DWORD fdwCtrlType) {
printf("\n" TAOS_CONSOLE_PROMPT_HEADER);
return TRUE;
}
#elif defined(_TD_DARWIN_64)
#else
#include <dlfcn.h>
@ -128,7 +124,6 @@ int taosSetConsoleEcho(bool on) {
void taosSetTerminalMode() {
#if defined(WINDOWS)
SetConsoleCtrlHandler(CtrlHandler, TRUE);
#else
struct termios newtio;
@ -179,7 +174,6 @@ int32_t taosGetOldTerminalMode() {
void taosResetTerminalMode() {
#if defined(WINDOWS)
SetConsoleCtrlHandler(CtrlHandler, FALSE);
#else
if (tcsetattr(0, TCSANOW, &oldtio) != 0) {
fprintf(stderr, "Fail to reset the terminal properties!\n");

View File

@ -58,7 +58,7 @@
static const int32_t TEST_NUMBER = 1;
#define is_bigendian() ((*(char *)&TEST_NUMBER) == 0)
#define SIMPLE8B_MAX_INT64 ((uint64_t)2305843009213693951LL)
#define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL)
#define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a)))
#define ZIGZAG_ENCODE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode

View File

@ -69,6 +69,7 @@
./test.sh -f tsim/insert/basic.sim
./test.sh -f tsim/insert/basic0.sim
./test.sh -f tsim/insert/basic1.sim
./test.sh -f tsim/insert/basic2.sim
./test.sh -f tsim/insert/commit-merge0.sim
./test.sh -f tsim/insert/insert_drop.sim
./test.sh -f tsim/insert/insert_select.sim
@ -109,7 +110,7 @@
./test.sh -f tsim/parser/fill.sim
./test.sh -f tsim/parser/first_last.sim
./test.sh -f tsim/parser/fourArithmetic-basic.sim
# TD-17659 TD-17658 ./test.sh -f tsim/parser/function.sim
# TD-17659 ./test.sh -f tsim/parser/function.sim
./test.sh -f tsim/parser/groupby-basic.sim
# ./test.sh -f tsim/parser/groupby.sim
# TD-17622 ./test.sh -f tsim/parser/having_child.sim
@ -117,7 +118,7 @@
./test.sh -f tsim/parser/import_commit1.sim
./test.sh -f tsim/parser/import_commit2.sim
./test.sh -f tsim/parser/import_commit3.sim
# TD-17663 ./test.sh -f tsim/parser/import_file.sim
./test.sh -f tsim/parser/import_file.sim
./test.sh -f tsim/parser/import.sim
./test.sh -f tsim/parser/insert_multiTbl.sim
./test.sh -f tsim/parser/insert_tb.sim

View File

@ -0,0 +1,29 @@
#======================b1-start===============
# ---- mnode
./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim
./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic5.sim
# --- vnode
# unsupport ./test.sh -f tsim/vnode/replica3_basic.sim
# unsupport ./test.sh -f tsim/vnode/replica3_repeat.sim
# unsupport ./test.sh -f tsim/vnode/replica3_vgroup.sim
# unsupport ./test.sh -f tsim/vnode/replica3_many.sim
# unsupport ./test.sh -f tsim/vnode/replica3_import.sim
# unsupport ./test.sh -f tsim/vnode/stable_balance_replica1.sim
# unsupport ./test.sh -f tsim/vnode/stable_dnode2_stop.sim
./test.sh -f tsim/vnode/stable_dnode2.sim
./test.sh -f tsim/vnode/stable_dnode3.sim
./test.sh -f tsim/vnode/stable_replica3_dnode6.sim
./test.sh -f tsim/vnode/stable_replica3_vnode3.sim
# --- sync
# jira ./test.sh -f tsim/sync/3Replica1VgElect.sim
./test.sh -f tsim/sync/3Replica5VgElect.sim
./test.sh -f tsim/sync/oneReplica1VgElect.sim
./test.sh -f tsim/sync/oneReplica5VgElect.sim

View File

@ -64,9 +64,10 @@ if $rows != 1 then
endi
print ========== step5
system sh/exec.sh -n dnode2 -s start
sql create dnode $hostname port 7200
system sh/exec.sh -n dnode2 -s start
return
$x = 0
step5:
$x = $x + 1

View File

@ -0,0 +1,322 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== create database
sql create database d0 keep 365000d,365000d,365000d
sql use d0
print =============== create super table
sql create table if not exists stb (ts timestamp, c1 int unsigned, c2 double, c3 binary(10), c4 nchar(10), c5 double) tags (city binary(20),district binary(20));
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang")
sql create table ct2 using stb tags("BeiJing", "HaiDian")
sql show tables
if $rows != 2 then
return -1
endi
print =============== step3-1 insert records into ct1
sql insert into ct1 values('2022-05-03 16:59:00.010', 10, 20, 'n','n',30);
sql insert into ct1 values('2022-05-03 16:59:00.011', 'N', 'n', 'N',"N",30);
sql insert into ct1 values('2022-05-03 16:59:00.012', 'Nu', 'nul', 'Nul','NUL',30);
sql insert into ct1 values('2022-05-03 16:59:00.013', NULL, 'null', 'Null',null,30);
sql insert into ct1 values('2022-05-03 16:59:00.014', NULL, 'NuLL', 'Null',NULL,30);
sql_error insert into ct1 values('2022-05-03 16:59:00.015', NULL, 20, 'Null',NUL,30);
sql_error insert into ct1 values('2022-05-03 16:59:00.015', NULL, 20, 'Null',NU,30);
sql_error insert into ct1 values('2022-05-03 16:59:00.015', NULL, 20, 'Null',Nu,30);
sql_error insert into ct1 values('2022-05-03 16:59:00.015', NULL, 20, 'Null',N,30);
sql_error insert into ct1 values('2022-05-03 16:59:00.015', N, 20, 'Null',NULL,30);
sql_error insert into ct1 values('2022-05-03 16:59:00.015', Nu, 20, 'Null',NULL,30);
sql_error insert into ct1 values('2022-05-03 16:59:00.015', Nul, 20, 'Null',NULL,30);
print =============== step3-1 query records of ct1 from memory
sql select * from ct1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
if $rows != 5 then
print rows $rows != 5
return -1
endi
if $data01 != 10 then
print data01 $data01 != 10
return -1
endi
if $data02 != 20.000000000 then
print data02 $data02 != 20.000000000
return -1
endi
if $data03 != n then
print data03 $data03 != n
return -1
endi
if $data04 != n then
print data04 $data04 != n
return -1
endi
if $data05 != 30.000000000 then
print data05 $data05 != 30.000000000
return -1
endi
if $data11 != NULL then
print data11 $data11 != NULL
return -1
endi
if $data12 != NULL then
print data12 $data12 != NULL
return -1
endi
if $data13 != N then
print data13 $data13 != N
return -1
endi
if $data14 != N then
print data14 $data14 != N
return -1
endi
if $data15 != 30.000000000 then
print data15 $data15 != 30.000000000
return -1
endi
if $data21 != NULL then
print data21 $data21 != NULL
return -1
endi
if $data22 != NULL then
print data22 $data22 != NULL
return -1
endi
if $data23 != Nul then
print data23 $data23 != Nul
return -1
endi
if $data24 != NUL then
print data24 $data24 != NUL
return -1
endi
if $data25 != 30.000000000 then
print data25 $data25 != 30.000000000
return -1
endi
if $data31 != NULL then
print data31 $data31 != NULL
return -1
endi
if $data32 != NULL then
print data32 $data32 != NULL
return -1
endi
if $data33 != Null then
print data33 $data33 != Null
return -1
endi
if $data34 != NULL then
print data34 $data34 != NULL
return -1
endi
if $data35 != 30.000000000 then
print data35 $data35 != 30.000000000
return -1
endi
if $data41 != NULL then
print data41 $data41 != NULL
return -1
endi
if $data42 != NULL then
print data42 $data42 != NULL
return -1
endi
if $data43 != Null then
print data43 $data43 != Null
return -1
endi
if $data44 != NULL then
print data44 $data44 != NULL
return -1
endi
if $data45 != 30.000000000 then
print data45 $data45 != 30.000000000
return -1
endi
#==================== reboot to trigger commit data to file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== step3-2 query records of ct1 from file
sql select * from ct1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
if $rows != 5 then
print rows $rows != 5
return -1
endi
if $data01 != 10 then
print data01 $data01 != 10
return -1
endi
if $data02 != 20.000000000 then
print data02 $data02 != 20.000000000
return -1
endi
if $data03 != n then
print data03 $data03 != n
return -1
endi
if $data04 != n then
print data04 $data04 != n
return -1
endi
if $data05 != 30.000000000 then
print data05 $data05 != 30.000000000
return -1
endi
if $data11 != NULL then
print data11 $data11 != NULL
return -1
endi
if $data12 != NULL then
print data12 $data12 != NULL
return -1
endi
if $data13 != N then
print data13 $data13 != N
return -1
endi
if $data14 != N then
print data14 $data14 != N
return -1
endi
if $data15 != 30.000000000 then
print data15 $data15 != 30.000000000
return -1
endi
if $data21 != NULL then
print data21 $data21 != NULL
return -1
endi
if $data22 != NULL then
print data22 $data22 != NULL
return -1
endi
if $data23 != Nul then
print data23 $data23 != Nul
return -1
endi
if $data24 != NUL then
print data24 $data24 != NUL
return -1
endi
if $data25 != 30.000000000 then
print data25 $data25 != 30.000000000
return -1
endi
if $data31 != NULL then
print data31 $data31 != NULL
return -1
endi
if $data32 != NULL then
print data32 $data32 != NULL
return -1
endi
if $data33 != Null then
print data33 $data33 != Null
return -1
endi
if $data34 != NULL then
print data34 $data34 != NULL
return -1
endi
if $data35 != 30.000000000 then
print data35 $data35 != 30.000000000
return -1
endi
if $data41 != NULL then
print data41 $data41 != NULL
return -1
endi
if $data42 != NULL then
print data42 $data42 != NULL
return -1
endi
if $data43 != Null then
print data43 $data43 != Null
return -1
endi
if $data44 != NULL then
print data44 $data44 != NULL
return -1
endi
if $data45 != 30.000000000 then
print data45 $data45 != 30.000000000
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -242,3 +242,5 @@ else
$reboot_cnt = $reboot_cnt + 1
goto reboot_and_check
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -227,3 +227,5 @@ if $data41 != NULL then
print data41 $data41 != NULL
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -817,3 +817,5 @@ if $data44 != n8 then
print data44 $data44 != n8
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -278,7 +278,6 @@ sql create stable td6086st(ts timestamp, d double) tags(t nchar(50));
sql create table td6086ct1 using td6086st tags("ct1");
sql create table td6086ct2 using td6086st tags("ct2");
return
sql SELECT LAST(d),t FROM td6086st WHERE tbname in ('td6086ct1', 'td6086ct2') and ts>="2019-07-30 00:00:00" and ts<="2021-08-31 00:00:00" partition BY tbname interval(1800s) fill(prev);
print ==================> td-2624

View File

@ -3,6 +3,6 @@
Cur_Dir=$(pwd)
echo $Cur_Dir
echo "'2020-1-1 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> ~/data.sql
echo "'2020-1-2 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> ~/data.sql
echo "'2020-1-3 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> ~/data.sql
echo "'2020-1-1 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> /tmp/data.sql
echo "'2020-1-2 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> /tmp/data.sql
echo "'2020-1-3 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> /tmp/data.sql

View File

@ -7,7 +7,7 @@ sql drop database if exists indb
sql create database if not exists indb
sql use indb
$inFileName = '~/data.csv'
$inFileName = '/tmp/data.csv'
$numOfRows = 10000
system tsim/parser/gendata.sh
@ -16,8 +16,8 @@ sql create table stbx (ts TIMESTAMP, collect_area NCHAR(12), device_id BINARY(16
sql create table tbx (ts TIMESTAMP, collect_area NCHAR(12), device_id BINARY(16), imsi BINARY(16), imei BINARY(16), mdn BINARY(10), net_type BINARY(4), mno NCHAR(4), province NCHAR(10), city NCHAR(16), alarm BINARY(2))
print ====== create tables success, starting insert data
sql insert into tbx file '~/data.sql'
sql import into tbx file '~/data.sql'
sql insert into tbx file '/tmp/data.sql'
sql import into tbx file '/tmp/data.sql'
sql select count(*) from tbx
if $rows != 1 then
@ -31,8 +31,8 @@ endi
sql drop table tbx;
sql insert into tbx using stbx tags(1,'abc') file '~/data.sql';
sql insert into tbx using stbx tags(1,'abc') file '~/data.sql';
sql insert into tbx using stbx tags(1,'abc') file '/tmp/data.sql';
sql insert into tbx using stbx tags(1,'abc') file '/tmp/data.sql';
sql select count(*) from tbx
if $rows != 1 then
@ -44,7 +44,7 @@ if $data00 != 3 then
endi
sql drop table tbx;
sql insert into tbx using stbx(b) tags('abcf') file '~/data.sql';
sql insert into tbx using stbx(b) tags('abcf') file '/tmp/data.sql';
sql select ts,a,b from tbx;
if $rows != 3 then
@ -64,6 +64,6 @@ if $data02 != @abcf@ then
return -1
endi
system rm -f ~/data.sql
system rm -f /tmp/data.sql
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -66,32 +66,32 @@ if $row != 21600 then
endi
#regression test case 3
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by t1 interval(1h) fill(NULL) limit 1
sql select _wstart, t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by t1 interval(1h) fill(NULL) limit 1
if $row != 2 then
return -1
endi
#if $data01 != 7 then
# return -1
#endi
#if $data02 != 7 then
# return -1
#endi
#if $data03 != 59 then
# print expect 59, actual: $data03
# return -1
#endi
#if $data04 != 7 then
# return -1
#endi
#if $data11 != 8 then
# return -1
#endi
#if $data12 != 8 then
# return -1
#endi
#if $data13 != NULL then
# return -1
#endi
if $data01 != NULL then
return -1
endi
if $data02 != NULL then
return -1
endi
if $data03 != NULL then
return -1
endi
if $data11 != 7 then
return -1
endi
if $data12 != 7 then
return -1
endi
if $data13 != 59 then
print expect 59, actual: $data03
return -1
endi
if $data14 != 7 then
return -1
endi
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by t1 interval(1h) fill(NULL) limit 9
if $rows != 18 then

View File

@ -179,4 +179,6 @@ if $data05 != 30.000000000 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -367,7 +367,7 @@ if $data32 != 8 then
endi
#$loop_all = 0
#looptest:
#=looptest:
sql drop database IF EXISTS test2;
sql drop stream IF EXISTS streams21;
@ -511,6 +511,6 @@ endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
#goto looptest
#=goto looptest
system sh/stop_dnodes.sh

View File

@ -136,9 +136,14 @@ class TDTestCase:
tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
# 9. breakdown-frequency
# NULL ---count(NULL)=0 expect count(NULL)= 100
tdSql.query("SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where model is null partition BY model,state_changed ")
parRows=tdSql.queryRows
assert parRows != 0 , "query result is wrong"
tdSql.query(" SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where state_changed =1 partition BY model,state_changed ;")

View File

@ -93,7 +93,7 @@ class TDTestCase:
def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dbNumbers': 20,
'dbNumbers': 8,
'dropFlag': 1,
'event': '',
'vgroups': 4,
@ -183,15 +183,22 @@ class TDTestCase:
for tr in threads:
tr.join()
tdLog.info("check dnode number:")
clusterComCheck.checkDnodes(dnodeNumbers)
clusterComCheck.checkDbRows(allDbNumbers)
for i in range(restartNumbers):
clusterComCheck.checkDb(paraDict['dbNumbers'],restartNumbers,dbNameIndex = '%s%d'%(paraDict["dbName"],i))
tdSql.query("show databases")
tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers-2))
# tdLog.info("check DB Rows:")
# clusterComCheck.checkDbRows(allDbNumbers)
# tdLog.info("check DB Status on by on")
# for i in range(restartNumbers):
# clusterComCheck.checkDb(paraDict['dbNumbers'],restartNumbers,dbNameIndex = '%s%d'%(paraDict["dbName"],i))
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=2,stopRole='dnode')
self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=10,stopRole='dnode')
def stop(self):
tdSql.close()

View File

@ -98,7 +98,7 @@ class TDTestCase:
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'stbNumbers': 100,
'stbNumbers': 80,
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
@ -188,7 +188,10 @@ class TDTestCase:
tdSql.execute("use %s" %(paraDict["dbName"]))
tdSql.query("show stables")
tdSql.checkRows(allStbNumbers)
tdLog.debug("we find %d stables but exepect to create %d stables "%(tdSql.queryRows,allStbNumbers))
# # tdLog.info("check Stable Rows:")
# tdSql.checkRows(allStbNumbers)
def run(self):

View File

@ -68,7 +68,7 @@ class TDTestCase:
def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dbNumbers': 10,
'dbNumbers': 8,
'dropFlag': 1,
'event': '',
'vgroups': 2,

View File

@ -98,7 +98,7 @@ class TDTestCase:
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'stbNumbers': 100,
'stbNumbers': 80,
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
@ -142,7 +142,8 @@ class TDTestCase:
threads=[]
for i in range(restartNumbers):
stableName= '%s%d'%(paraDict['stbName'],i)
threads.append(threading.Thread(target=clusterComCreate.create_stables, args=(tdSql, paraDict["dbName"],stableName,paraDict['stbNumbers'])))
newTdSql=tdCom.newTdSql()
threads.append(threading.Thread(target=clusterComCreate.create_stables, args=(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers'])))
for tr in threads:
tr.start()
@ -190,6 +191,7 @@ class TDTestCase:
tdSql.execute("use %s" %(paraDict["dbName"]))
tdSql.query("show stables")
tdLog.debug("we find %d stables but exepect to create %d stables "%(tdSql.queryRows,allStbNumbers))
# # tdLog.info("check Stable Rows:")
# tdSql.checkRows(allStbNumbers)

View File

@ -159,19 +159,19 @@ class TDTestCase:
for tr in threads:
tr.join()
clusterComCheck.checkDnodes(dnodeNumbers)
tdSql.query("show databases")
tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers))
# tdSql.query("show databases")
# tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers))
# # tdLog.info("check DB Rows:")
# clusterComCheck.checkDbRows(allDbNumbers)
# # tdLog.info("check DB Status on by on")
# tdLog.info("check DB Rows:")
clusterComCheck.checkDbRows(allDbNumbers)
# tdLog.info("check DB Status on by on")
# for i in range(restartNumbers):
# clusterComCheck.checkDb(paraDict['dbNumbers'],restartNumbers,dbNameIndex = '%s%d'%(paraDict["dbName"],i))
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=15,stopRole='vnode')
self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=10,stopRole='vnode')
def stop(self):
tdSql.close()

View File

@ -98,7 +98,7 @@ class TDTestCase:
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'stbNumbers': 100,
'stbNumbers': 80,
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
@ -192,6 +192,8 @@ class TDTestCase:
tdSql.execute("use %s" %(paraDict["dbName"]))
tdSql.query("show stables")
tdLog.debug("we find %d stables but exepect to create %d stables "%(tdSql.queryRows,allStbNumbers))
# # tdLog.info("check Stable Rows:")
tdSql.checkRows(allStbNumbers)

View File

@ -111,7 +111,7 @@ class TDTestCase:
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(5,3,1)
self.fiveDnodeThreeMnode(dnodenumbers=5,mnodeNums=3,restartNumber=1)
def stop(self):
tdSql.close()

View File

@ -63,6 +63,7 @@ class ClusterComCheck:
count=0
while count < 5:
tdSql.query("show databases;")
count+=1
if tdSql.checkRows(dbNumbers+2):
tdLog.success("we find %d databases and expect %d in clusters! " %(tdSql.queryRows,dbNumbers+2))
return True

View File

@ -536,6 +536,18 @@ class TMQCom:
insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});'
tsql.execute(insert_sql)
def waitSubscriptionExit(self, tsql, max_wait_count=20):
wait_cnt = 0
while (wait_cnt < max_wait_count):
tsql.query("show subscriptions")
if tsql.getRows() == 0:
break
else:
time.sleep(2)
wait_cnt += 1
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
def close(self):
self.cursor.close()

View File

@ -18,8 +18,8 @@ class TDTestCase:
def __init__(self):
self.snapshot = 0
self.vgroups = 2
self.ctbNum = 100
self.rowsPerTbl = 10000
self.ctbNum = 1000
self.rowsPerTbl = 1000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
@ -38,8 +38,8 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 10000,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
@ -83,8 +83,8 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 10000,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
@ -117,13 +117,13 @@ class TDTestCase:
tdSql.execute(sqlString)
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:500,\
auto.commit.interval.ms:3000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
@ -147,10 +147,46 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQury = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQury))
if totalConsumeRows != totalRowsFromQury:
tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
if not (totalConsumeRows == totalRowsFromQury):
tdLog.exit("tmq consume rows error!")
# tdLog.info("****************************************************************************")
# tmqCom.initConsumerTable()
# consumerId = 1
# expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
# topicList = topicFromStb1
# ifcheckdata = 0
# ifManualCommit = 0
# keyList = 'group.id:cgrp2,\
# enable.auto.commit:true,\
# auto.commit.interval.ms:3000,\
# auto.offset.reset:earliest'
# tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
# tdLog.info("start consume processor")
# tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# expectRows = 1
# resultList = tmqCom.selectConsumeResult(expectRows)
# totalConsumeRows = 0
# for i in range(expectRows):
# totalConsumeRows += resultList[i]
# tdSql.query(queryString)
# totalRowsFromQury = tdSql.getRows()
# tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
# if not (totalConsumeRows == totalRowsFromQury):
# tdLog.exit("tmq consume rows error!")
# tdLog.info("****************************************************************************")
tmqCom.waitSubscriptionExit(tdSql)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
@ -168,8 +204,8 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 10000,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 3000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
@ -201,7 +237,7 @@ class TDTestCase:
tdSql.execute(sqlString)
consumerId = 1
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + 100000
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
@ -220,8 +256,10 @@ class TDTestCase:
tdDnodes.start(1)
time.sleep(3)
tdLog.info("create some new child table and insert data ")
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
# tdLog.info("create some new child table and insert data ")
# paraDict["batchNum"] = 1000
# paraDict["ctbPrefix"] = 'newCtb'
# tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
@ -242,9 +280,9 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
# tdSql.prepare()
self.prepareTestEnv()
# self.tmqCase1()
self.tmqCase1()
self.tmqCase2()
def stop(self):

View File

@ -25,18 +25,6 @@ class TDTestCase:
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def waitSubscriptionExit(self, max_wait_count=20):
wait_cnt = 0
while (wait_cnt < max_wait_count):
tdSql.query("show subscriptions")
if tdSql.getRows() == 0:
break
else:
time.sleep(1)
wait_cnt += 1
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
# drop some ntbs
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
@ -115,7 +103,7 @@ class TDTestCase:
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tmqCom.waitSubscriptionExit(tdSql)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
@ -208,7 +196,7 @@ class TDTestCase:
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tmqCom.waitSubscriptionExit(tdSql)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)

View File

@ -25,18 +25,6 @@ class TDTestCase:
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def waitSubscriptionExit(self, max_wait_count=20):
wait_cnt = 0
while (wait_cnt < max_wait_count):
tdSql.query("show subscriptions")
if tdSql.getRows() == 0:
break
else:
time.sleep(2)
wait_cnt += 1
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
@ -169,7 +157,7 @@ class TDTestCase:
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tmqCom.waitSubscriptionExit(tdSql)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
@ -258,7 +246,7 @@ class TDTestCase:
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tmqCom.waitSubscriptionExit(tdSql)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)

View File

@ -0,0 +1,22 @@
#!/bin/bash
set -e
set -x
python3 ./test.py -f 6-cluster/5dnode1mnode.py
#python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
# python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3
# BUG Redict python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5
# python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5

View File

@ -152,15 +152,17 @@ python3 ./test.py -f 2-query/max_partition.py
python3 ./test.py -f 2-query/last_row.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py
#python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
# python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3

View File

@ -19,6 +19,9 @@
#include "shellInt.h"
#define TAOS_CONSOLE_PROMPT_HEADER "taos> "
#define TAOS_CONSOLE_PROMPT_CONTINUE " -> "
#define SHELL_HOST "The auth string to use when connecting to the server."
#define SHELL_PORT "The TCP/IP port number to use for the connection."
#define SHELL_USER "The user name to use when connecting to the server."

View File

@ -41,7 +41,7 @@ static void shellPrintError(TAOS_RES *tres, int64_t st);
static bool shellIsCommentLine(char *line);
static void shellSourceFile(const char *file);
static void shellGetGrantInfo();
static void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context);
static void shellCleanup(void *arg);
static void *shellCancelHandler(void *arg);
static void *shellThreadLoop(void *arg);
@ -919,11 +919,14 @@ void shellGetGrantInfo() {
fprintf(stdout, "\r\n");
}
void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { tsem_post(&shell.cancelSem); }
void shellSigintHandler(int32_t signum, void *sigInfo, void *context) {
// do nothing
#ifdef WINDOWS
BOOL shellQueryInterruptHandler(DWORD fdwCtrlType) {
tsem_post(&shell.cancelSem);
return TRUE;
}
#else
void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { tsem_post(&shell.cancelSem); }
#endif
void shellCleanup(void *arg) { taosResetTerminalMode(); }
@ -934,11 +937,10 @@ void *shellCancelHandler(void *arg) {
taosMsleep(10);
continue;
}
taosResetTerminalMode();
printf("\r\nReceive SIGTERM or other signal, quit shell.\r\n");
shellWriteHistory();
shellExit();
taos_kill_query(shell.conn);
#ifdef WINDOWS
printf("\n%s", shell.info.promptHeader);
#endif
}
return NULL;
@ -1022,7 +1024,7 @@ int32_t shellExecute() {
taosSetSignal(SIGHUP, shellQueryInterruptHandler);
taosSetSignal(SIGABRT, shellQueryInterruptHandler);
taosSetSignal(SIGINT, shellSigintHandler);
taosSetSignal(SIGINT, shellQueryInterruptHandler);
shellGetGrantInfo();