delete data for scalar function
This commit is contained in:
parent
0a206f5819
commit
65457d2cf4
|
@ -451,6 +451,7 @@ typedef struct SStreamScanInfo {
|
||||||
SExprInfo* pPseudoExpr;
|
SExprInfo* pPseudoExpr;
|
||||||
int32_t numOfPseudoExpr;
|
int32_t numOfPseudoExpr;
|
||||||
SExprSupp tbnameCalSup;
|
SExprSupp tbnameCalSup;
|
||||||
|
SExprSupp* pPartTbnameSup;
|
||||||
SExprSupp tagCalSup;
|
SExprSupp tagCalSup;
|
||||||
int32_t primaryTsIndex; // primary time stamp slot id
|
int32_t primaryTsIndex; // primary time stamp slot id
|
||||||
int32_t primaryKeyIndex;
|
int32_t primaryKeyIndex;
|
||||||
|
|
|
@ -1255,7 +1255,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr) {
|
void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr, SExprSupp* pTbnameExpr) {
|
||||||
SStorageAPI* pAPI = &downstream->pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &downstream->pTaskInfo->storageAPI;
|
||||||
|
|
||||||
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
@ -1265,6 +1265,7 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
|
||||||
SStreamScanInfo* pScanInfo = downstream->info;
|
SStreamScanInfo* pScanInfo = downstream->info;
|
||||||
pScanInfo->partitionSup = *pParSup;
|
pScanInfo->partitionSup = *pParSup;
|
||||||
pScanInfo->pPartScalarSup = pExpr;
|
pScanInfo->pPartScalarSup = pExpr;
|
||||||
|
pScanInfo->pPartTbnameSup = pTbnameExpr;
|
||||||
if (!pScanInfo->pUpdateInfo) {
|
if (!pScanInfo->pUpdateInfo) {
|
||||||
pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate);
|
pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate);
|
||||||
}
|
}
|
||||||
|
@ -1408,7 +1409,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
||||||
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
|
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
|
||||||
|
|
||||||
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
|
|
@ -1783,28 +1783,57 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) {
|
||||||
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
|
|
||||||
blockDataCleanup(pInfo->pCreateTbRes);
|
blockDataCleanup(pInfo->pCreateTbRes);
|
||||||
if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
|
if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
|
||||||
pBlock->info.parTbName[0] = 0;
|
pBlock->info.parTbName[0] = 0;
|
||||||
} else {
|
} else {
|
||||||
appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
|
appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
|
||||||
pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore);
|
pBlock->info.id.groupId, pBlock, rowId, pInfo->pCreateTbRes, &pInfo->stateStore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
static int32_t generatePartitionDelResBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
||||||
blockDataCleanup(pDestBlock);
|
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
int32_t rows = pSrcBlock->info.rows;
|
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
if (rows == 0) {
|
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||||
|
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
|
||||||
|
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
|
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
|
||||||
|
SColumnInfoData* pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
|
||||||
|
|
||||||
|
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
|
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||||
|
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||||
|
int64_t ver = pSrcBlock->info.version - 1;
|
||||||
|
for (int32_t delI = 0; delI < pSrcBlock->info.rows; delI++) {
|
||||||
|
uint64_t groupId = 0;
|
||||||
|
uint64_t srcUid = srcUidData[delI];
|
||||||
|
char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
|
||||||
|
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[delI], srcEndTsCol[delI], ver);
|
||||||
|
blockDataEnsureCapacity(pDestBlock, pDestBlock->info.rows + pPreRes->info.rows);
|
||||||
|
for (int32_t preJ = 0; preJ < pPreRes->info.rows; preJ++) {
|
||||||
|
groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, preJ);
|
||||||
|
if (pInfo->pPartTbnameSup) {
|
||||||
|
void* parTbname = NULL;
|
||||||
|
int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
calBlockTbName(pInfo, pPreRes, preJ);
|
||||||
|
memcpy(varDataVal(tbname), pPreRes->info.parTbName, strlen(pPreRes->info.parTbName));
|
||||||
|
} else {
|
||||||
|
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
|
||||||
|
}
|
||||||
|
varDataSetLen(tbname, strlen(varDataVal(tbname)));
|
||||||
|
pInfo->stateStore.streamStateFreeVal(parTbname);
|
||||||
|
}
|
||||||
|
appendDataToSpecialBlock(pDestBlock, srcStartTsCol + delI, srcEndTsCol + delI, srcUidData + delI, &groupId,
|
||||||
|
tbname[0] == 0 ? NULL : tbname);
|
||||||
|
}
|
||||||
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
static int32_t generateDeleteResultBlockImpl(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
||||||
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||||
|
@ -1830,12 +1859,11 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
|
||||||
}
|
}
|
||||||
if (pInfo->tbnameCalSup.pExprInfo) {
|
if (pInfo->tbnameCalSup.pExprInfo) {
|
||||||
void* parTbname = NULL;
|
void* parTbname = NULL;
|
||||||
code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
|
int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// todo(liuyao) 这里可能需要修改,需要考虑复合主键时,pPreRes包含多行数据。
|
|
||||||
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[i], srcStartTsCol[i], ver);
|
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[i], srcStartTsCol[i], ver);
|
||||||
printDataBlock(pPreRes, "pre res", GET_TASKID(pInfo->pStreamScanOp->pTaskInfo));
|
printDataBlock(pPreRes, "pre res", GET_TASKID(pInfo->pStreamScanOp->pTaskInfo));
|
||||||
calBlockTbName(pInfo, pPreRes);
|
calBlockTbName(pInfo, pPreRes, 0);
|
||||||
memcpy(varDataVal(tbname), pPreRes->info.parTbName, strlen(pPreRes->info.parTbName));
|
memcpy(varDataVal(tbname), pPreRes->info.parTbName, strlen(pPreRes->info.parTbName));
|
||||||
} else {
|
} else {
|
||||||
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
|
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
|
||||||
|
@ -1849,6 +1877,22 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
||||||
|
blockDataCleanup(pDestBlock);
|
||||||
|
int32_t rows = pSrcBlock->info.rows;
|
||||||
|
if (rows == 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
if (pInfo->partitionSup.needCalc) {
|
||||||
|
return generatePartitionDelResBlock(pInfo, pSrcBlock, pDestBlock);
|
||||||
|
}
|
||||||
|
return generateDeleteResultBlockImpl(pInfo, pSrcBlock, pDestBlock);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType type) {
|
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType type) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (isIntervalWindow(pInfo)) {
|
if (isIntervalWindow(pInfo)) {
|
||||||
|
@ -2120,7 +2164,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
calBlockTbName(pInfo, pInfo->pRes);
|
calBlockTbName(pInfo, pInfo->pRes, 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2380,7 +2424,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
|
pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
|
||||||
if (pInfo->pRecoverRes != NULL) {
|
if (pInfo->pRecoverRes != NULL) {
|
||||||
calBlockTbName(pInfo, pInfo->pRecoverRes);
|
calBlockTbName(pInfo, pInfo->pRecoverRes, 0);
|
||||||
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
|
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
|
||||||
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||||
|
@ -2469,11 +2513,11 @@ FETCH_NEXT_BLOCK:
|
||||||
|
|
||||||
if (!isStreamWindow(pInfo)) {
|
if (!isStreamWindow(pInfo)) {
|
||||||
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
|
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
|
||||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
|
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
||||||
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
|
|
||||||
blockDataDestroy(pDelBlock);
|
blockDataDestroy(pDelBlock);
|
||||||
|
|
||||||
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
||||||
|
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
|
||||||
return pInfo->pDeleteDataRes;
|
return pInfo->pDeleteDataRes;
|
||||||
} else {
|
} else {
|
||||||
goto FETCH_NEXT_BLOCK;
|
goto FETCH_NEXT_BLOCK;
|
||||||
|
@ -2485,12 +2529,12 @@ FETCH_NEXT_BLOCK:
|
||||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||||
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
||||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
||||||
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
|
|
||||||
if (pInfo->tqReader) {
|
if (pInfo->tqReader) {
|
||||||
blockDataDestroy(pDelBlock);
|
blockDataDestroy(pDelBlock);
|
||||||
}
|
}
|
||||||
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||||
|
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
|
||||||
return pInfo->pDeleteDataRes;
|
return pInfo->pDeleteDataRes;
|
||||||
} else {
|
} else {
|
||||||
goto FETCH_NEXT_BLOCK;
|
goto FETCH_NEXT_BLOCK;
|
||||||
|
@ -2542,7 +2586,7 @@ FETCH_NEXT_BLOCK:
|
||||||
checkUpdateData(pInfo, true, pSDB, false);
|
checkUpdateData(pInfo, true, pSDB, false);
|
||||||
}
|
}
|
||||||
printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
|
||||||
calBlockTbName(pInfo, pSDB);
|
calBlockTbName(pInfo, pSDB, 0);
|
||||||
return pSDB;
|
return pSDB;
|
||||||
}
|
}
|
||||||
blockDataCleanup(pInfo->pUpdateDataRes);
|
blockDataCleanup(pInfo->pUpdateDataRes);
|
||||||
|
@ -2940,6 +2984,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->pPartTbnameSup = NULL;
|
||||||
if (pTableScanNode->pSubtable != NULL) {
|
if (pTableScanNode->pSubtable != NULL) {
|
||||||
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
|
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
|
||||||
if (pSubTableExpr == NULL) {
|
if (pSubTableExpr == NULL) {
|
||||||
|
|
|
@ -0,0 +1,175 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 1000
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql drop database if exists test;
|
||||||
|
sql create database test vgroups 1;
|
||||||
|
sql use test;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select ts, a, b from t1 partition by a;
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,0,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213002,2,2,3,1.0);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213003,0,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213004,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213005,2,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop0:
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
print $data20 $data21 $data22
|
||||||
|
print $data30 $data31 $data32
|
||||||
|
print $data40 $data41 $data42
|
||||||
|
print $data50 $data51 $data52
|
||||||
|
print $data60 $data61 $data62
|
||||||
|
print $data70 $data71 $data72
|
||||||
|
|
||||||
|
if $rows != 6 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
print delete from t1 where ts <= 1648791213002;
|
||||||
|
sql delete from t1 where ts <= 1648791213002;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop1:
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt order by 1;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
print $data20 $data21 $data22
|
||||||
|
print $data30 $data31 $data32
|
||||||
|
|
||||||
|
if $rows != 3 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 0 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 1 then
|
||||||
|
print ======data11=$data11
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data21 != 2 then
|
||||||
|
print ======data21=$data21
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ======================step 2
|
||||||
|
|
||||||
|
sql drop database if exists test1;
|
||||||
|
sql create database test1 vgroups 1;
|
||||||
|
sql use test1;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 subtable(concat("aaa-", cast( a as varchar(10) ))) as select ts, a, b from t1 partition by a;
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,0,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213002,2,2,3,1.0);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213003,0,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213004,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213005,2,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop2:
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
print $data20 $data21 $data22
|
||||||
|
print $data30 $data31 $data32
|
||||||
|
print $data40 $data41 $data42
|
||||||
|
print $data50 $data51 $data52
|
||||||
|
print $data60 $data61 $data62
|
||||||
|
print $data70 $data71 $data72
|
||||||
|
|
||||||
|
if $rows != 6 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
print delete from t1 where ts <= 1648791213002;
|
||||||
|
sql delete from t1 where ts <= 1648791213002;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop3:
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt1 order by 1;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
print $data20 $data21 $data22
|
||||||
|
print $data30 $data31 $data32
|
||||||
|
|
||||||
|
if $rows != 3 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 0 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 1 then
|
||||||
|
print ======data11=$data11
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data21 != 2 then
|
||||||
|
print ======data21=$data21
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
|
||||||
|
#goto looptest
|
|
@ -70,5 +70,36 @@ if $data12 != 4 then
|
||||||
goto loop0
|
goto loop0
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791210001,3,1,1,2.0);
|
||||||
|
|
||||||
|
print sql delete from t1 where ts = 1648791210000;
|
||||||
|
sql delete from t1 where ts = 1648791210000;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop1:
|
||||||
|
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from streamt1 order by 1,2
|
||||||
|
sql select * from streamt1 order by 1,2;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
print $data20 $data21 $data22
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 3 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue