Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

This commit is contained in:
Minghao Li 2022-10-27 16:22:08 +08:00
commit bbb624b63e
19 changed files with 380 additions and 101 deletions

View File

@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG f9c1d32 GIT_TAG f20eb34
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -89,6 +89,16 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
*/ */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
/**
* Set block for sma
* @param tinfo
* @param pBlocks
* @param numOfInputBlock
* @param type
* @return
*/
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
/** /**
* Update the table id list, add or remove. * Update the table id list, add or remove.
* *

View File

@ -776,11 +776,6 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
} }
} }
if (numOfVnodes > 0) {
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
goto _OVER;
}
code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes); code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

View File

@ -424,7 +424,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return NULL; return NULL;
} }
mInfo("mnode open successfully "); mInfo("mnode open successfully");
return pMnode; return pMnode;
} }

View File

@ -1112,7 +1112,9 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
mInfo("vgId:%d, will add 1 vnodes", pVgroup->vgId); mInfo("vgId:%d, will add 1 vnodes", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1; if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1; if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, -1) != 0) return -1; for (int32_t i = 0; i < newVg.replica - 1; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
mInfo("vgId:%d, will remove 1 vnodes", pVgroup->vgId); mInfo("vgId:%d, will remove 1 vnodes", pVgroup->vgId);
@ -1120,8 +1122,10 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
SVnodeGid del = newVg.vnodeGid[vnIndex]; SVnodeGid del = newVg.vnodeGid[vnIndex];
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica]; newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid)); memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, -1) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
for (int32_t i = 0; i < newVg.replica; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
{ {
@ -1193,21 +1197,11 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb
pGid->dnodeId = newDnodeId; pGid->dnodeId = newDnodeId;
pGid->syncState = TAOS_SYNC_STATE_ERROR; pGid->syncState = TAOS_SYNC_STATE_ERROR;
if (pVgroup->replica == 2) { if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid) != 0) return -1; for (int32_t i = 0; i < pVgroup->replica - 1; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[0].dnodeId) != 0) return -1; if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
} else if (pVgroup->replica == 4) {
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[0].dnodeId) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[1].dnodeId) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[2].dnodeId) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
} else {
mError("vgId:%d, failed to add 1 vnode since invalid replica:%d", pVgroup->vgId, pVgroup->replica);
terrno = TSDB_CODE_MND_APP_ERROR;
return -1;
} }
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
return 0; return 0;
} }
@ -1232,21 +1226,11 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S
memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid)); memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid)); memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
if (pVgroup->replica == 1) { if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[0].dnodeId) != 0) return -1; for (int32_t i = 0; i < pVgroup->replica; ++i) {
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1; if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
} else if (pVgroup->replica == 3) {
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[0].dnodeId) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[1].dnodeId) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[2].dnodeId) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
} else {
mError("vgId:%d, failed to remove 1 vnode since invalid replica:%d", pVgroup->vgId, pVgroup->replica);
terrno = TSDB_CODE_MND_APP_ERROR;
return -1;
} }
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
return 0; return 0;
} }
@ -1890,9 +1874,6 @@ _OVER:
} }
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) { static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
#if 1
return TSDB_CODE_OPS_NOT_SUPPORT;
#else
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
SArray *pArray = NULL; SArray *pArray = NULL;
@ -1941,7 +1922,6 @@ _OVER:
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
return code; return code;
#endif
} }
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }

View File

@ -15,11 +15,11 @@
#include "sma.h" #include "sma.h"
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
#define RSMA_FETCH_DELAY_MAX (120000) // ms #define RSMA_FETCH_DELAY_MAX (120000) // ms
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms #define RSMA_FETCH_ACTIVE_MAX (1000) // ms
#define RSMA_FETCH_INTERVAL (5000) // ms #define RSMA_FETCH_INTERVAL (5000) // ms
SSmaMgmt smaMgmt = { SSmaMgmt smaMgmt = {
.inited = 0, .inited = 0,
@ -839,7 +839,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
tdRsmaPrintSubmitReq(pSma, pReq); tdRsmaPrintSubmitReq(pSma, pReq);
} }
#endif #endif
if (qSetMultiStreamInput(qTaskInfo, pMsg, msgSize, inputType) < 0) { if (qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType) < 0) {
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -1404,7 +1404,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
pItem->nScanned = 0; pItem->nScanned = 0;
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err; goto _err;
} }
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) {

View File

@ -111,7 +111,7 @@ typedef struct SDataBlockIter {
int32_t index; int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo> SArray* blockList; // SArray<SFileDataBlockInfo>
int32_t order; int32_t order;
SDataBlk block; // current SDataBlk data SDataBlk block; // current SDataBlk data
SHashObj* pTableMap; SHashObj* pTableMap;
} SDataBlockIter; } SDataBlockIter;
@ -169,14 +169,14 @@ static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbRe
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger); SRowMerger* pMerger);
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger); SRowMerger* pMerger, SVersionRange* pVerRange);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader); STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
int32_t rowIndex); int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pRange);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow); STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
@ -1052,7 +1052,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr)
if (pBlockInfo != NULL) { if (pBlockInfo != NULL) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pScanInfo == NULL) { if (pScanInfo == NULL) {
tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, %s", pBlockInfo->uid, idStr); tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -1466,7 +1466,8 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return false; return false;
} }
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
SVersionRange* pVerRange) {
while (1) { while (1) {
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
if (!hasVal) { if (!hasVal) {
@ -1475,7 +1476,8 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBKEY k = TSDBROW_KEY(&row); TSDBKEY k = TSDBROW_KEY(&row);
if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) { if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order,
pVerRange)) {
return true; return true;
} }
} }
@ -1483,7 +1485,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader, static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) { STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo); bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
if (hasVal) { if (hasVal) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 != ts) { if (next1 != ts) {
@ -1602,7 +1604,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
} }
if (minKey == k.ts) { if (minKey == k.ts) {
@ -1647,7 +1649,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
} }
if (minKey == key) { if (minKey == key) {
@ -1699,7 +1701,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tRowMerge(&merge, &fRow1); tRowMerge(&merge, &fRow1);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
code = tRowMergerGetRow(&merge, &pTSRow); code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1717,7 +1719,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code; return code;
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
ASSERT(mergeBlockData); ASSERT(mergeBlockData);
// merge with block data if ts == key // merge with block data if ts == key
@ -1771,7 +1773,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tRowMerge(&merge, &fRow1); tRowMerge(&merge, &fRow1);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);
code = tRowMergerGetRow(&merge, &pTSRow); code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1882,7 +1884,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
} }
if (minKey == ik.ts) { if (minKey == ik.ts) {
@ -1901,8 +1903,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} }
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
&merge, pReader); pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -1973,7 +1975,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
} }
if (minKey == key) { if (minKey == key) {
@ -1993,7 +1995,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} }
if (merge.pTSchema == NULL) { if (merge.pTSchema == NULL) {
return code; return code;
} }
@ -2095,7 +2097,8 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
} }
TSDBKEY k = {.ts = ts, .version = ver}; TSDBKEY k = {.ts = ts, .version = ver};
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order)) { if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order,
&pReader->verRange)) {
return false; return false;
} }
@ -2130,7 +2133,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
return false; return false;
} }
return nextRowFromLastBlocks(pLBlockReader, pScanInfo); return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
} }
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
@ -2225,8 +2228,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pBlockScanInfo == NULL) { if (pBlockScanInfo == NULL) {
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, total tables:%d, %s", tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
pBlockInfo->uid, taosHashGetSize(pReader->status.pTableMap), pReader->idStr); taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
goto _end; goto _end;
} }
@ -2290,7 +2293,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
_end: _end:
pResBlock->info.uid = (pBlockScanInfo != NULL)? pBlockScanInfo->uid:0; pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
blockDataUpdateTsWindow(pResBlock, 0); blockDataUpdateTsWindow(pResBlock, 0);
setComposedBlockFlag(pReader, true); setComposedBlockFlag(pReader, true);
@ -2859,7 +2862,7 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
return (SVersionRange){.minVer = startVer, .maxVer = endVer}; return (SVersionRange){.minVer = startVer, .maxVer = endVer};
} }
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) { bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
ASSERT(pKey != NULL); ASSERT(pKey != NULL);
if (pDelList == NULL) { if (pDelList == NULL) {
return false; return false;
@ -2887,7 +2890,8 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
return false; return false;
} }
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) { if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
pVerRange->maxVer >= pCurrent->version) {
return true; return true;
} }
@ -2903,7 +2907,8 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
continue; continue;
} }
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) { if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
pVerRange->maxVer >= pCurrent->version) {
return true; return true;
} }
} }
@ -2973,7 +2978,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
// it is a valid data version // it is a valid data version
if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) && if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) { (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
return pRow; return pRow;
} }
@ -2992,7 +2997,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
} }
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer && if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer &&
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) { (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
return pRow; return pRow;
} }
} }
@ -3130,9 +3135,9 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
} }
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger) { SRowMerger* pMerger, SVersionRange* pVerRange) {
pScanInfo->lastKey = ts; pScanInfo->lastKey = ts;
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo)) { while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) { if (next1 == ts) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
@ -3772,7 +3777,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
if (pBlockScanInfo == NULL) { // no data block for the table of given uid if (pBlockScanInfo == NULL) { // no data block for the table of given uid
return false; return false;
} }

View File

@ -30,6 +30,46 @@ static void cleanupRefPool() {
taosCloseRef(ref); taosCloseRef(ref);
} }
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
if (pOperator->numOfDownstream > 1) { // not handle this in join query
qError("join not supported for stream block scan, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
pOperator->status = OP_NOT_OPENED;
return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
} else {
pOperator->status = OP_NOT_OPENED;
SStreamScanInfo* pInfo = pOperator->info;
if (type == STREAM_INPUT__MERGED_SUBMIT) {
for (int32_t i = 0; i < numOfBlocks; i++) {
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
taosArrayPush(pInfo->pBlockLists, &pReq);
}
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
taosArrayPush(pInfo->pBlockLists, &input);
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
taosArrayPush(pInfo->pBlockLists, &pDataBlock);
}
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
}
return TSDB_CODE_SUCCESS;
}
}
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
@ -100,6 +140,27 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return code; return code;
} }
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR;
}
if (pBlocks == NULL || numOfBlocks == 0) {
return TSDB_CODE_SUCCESS;
}
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetSMABlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the sma block data", GET_TASKID(pTaskInfo));
} else {
qDebug("%s set the sma block successfully", GET_TASKID(pTaskInfo));
}
return code;
}
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) { qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
if (msg == NULL) { if (msg == NULL) {
// create raw scan // create raw scan

View File

@ -1738,8 +1738,10 @@ static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, S
j++; j++;
} }
} }
uint32_t cap = pDst->info.capacity;
pDst->info = pSrc->info; pDst->info = pSrc->info;
pDst->info.rows = j; pDst->info.rows = j;
pDst->info.capacity = cap;
return 0; return 0;
} }

View File

@ -1148,6 +1148,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->pRaftCfg = NULL; pSyncNode->pRaftCfg = NULL;
} }
// init by SSyncInfo
pSyncNode->vgId = pSyncInfo->vgId;
SSyncCfg* pCfg = &pSyncInfo->syncCfg; SSyncCfg* pCfg = &pSyncInfo->syncCfg;
sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex); sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->replicaNum; ++i) { for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
@ -1155,8 +1157,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort); sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
} }
// init by SSyncInfo
pSyncNode->vgId = pSyncInfo->vgId;
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path, snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
TD_DIRSEP); TD_DIRSEP);
@ -3429,7 +3429,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
// config change finish // config change finish
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) { if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
if (rpcMsg.pCont != NULL) { if (rpcMsg.pCont != NULL && rpcMsg.contLen > 0) {
code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry); code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry);
ASSERT(code == 0); ASSERT(code == 0);
} }

View File

@ -3026,7 +3026,7 @@ void syncReconfigFinishFromRpcMsg(const SRpcMsg* pRpcMsg, SyncReconfigFinish* pM
} }
SyncReconfigFinish* syncReconfigFinishFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncReconfigFinish* syncReconfigFinishFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncReconfigFinish* pMsg = syncReconfigFinishDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); SyncReconfigFinish* pMsg = syncReconfigFinishDeserialize2(pRpcMsg->pCont, (uint32_t)(pRpcMsg->contLen));
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
return pMsg; return pMsg;
} }

View File

@ -36,26 +36,26 @@
./test.sh -f tsim/db/taosdlog.sim ./test.sh -f tsim/db/taosdlog.sim
# ---- dnode # ---- dnode
# unsupport ./test.sh -f tsim/dnode/balance_replica1.sim ./test.sh -f tsim/dnode/balance_replica1.sim
# unsupport ./test.sh -f tsim/dnode/balance_replica3.sim ./test.sh -f tsim/dnode/balance_replica3.sim
# unsupport ./test.sh -f tsim/dnode/balance1.sim ./test.sh -f tsim/dnode/balance1.sim
# unsupport ./test.sh -f tsim/dnode/balance2.sim ./test.sh -f tsim/dnode/balance2.sim
# unsupport ./test.sh -f tsim/dnode/balance3.sim ./test.sh -f tsim/dnode/balance3.sim
# unsupport ./test.sh -f tsim/dnode/balancex.sim ./test.sh -f tsim/dnode/balancex.sim
./test.sh -f tsim/dnode/create_dnode.sim ./test.sh -f tsim/dnode/create_dnode.sim
./test.sh -f tsim/dnode/drop_dnode_has_mnode.sim ./test.sh -f tsim/dnode/drop_dnode_has_mnode.sim
# unsupport ./test.sh -f tsim/dnode/drop_dnode_has_qnode_snode.sim # unsupport ./test.sh -f tsim/dnode/drop_dnode_has_qnode_snode.sim
# unsupport ./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica1.sim ./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica1.sim
# unsupport ./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica3.sim ./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica3.sim
# unsupport ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim
# unsupport ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim
./test.sh -f tsim/dnode/offline_reason.sim ./test.sh -f tsim/dnode/offline_reason.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim ./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim
# unsupport ./test.sh -f tsim/dnode/vnode_clean.sim ./test.sh -f tsim/dnode/vnode_clean.sim
./test.sh -f tsim/dnode/use_dropped_dnode.sim ./test.sh -f tsim/dnode/use_dropped_dnode.sim
# ---- import ---- # ---- import ----

View File

@ -81,6 +81,42 @@ if $data(2)[2] != 2 then
return -1 return -1
endi endi
sql select * from d1.t1 order by t desc
print $data01 $data11 $data21 $data31 $data41
if $data01 != 11 then
return -1
endi
if $data11 != 12 then
return -1
endi
if $data21 != 13 then
return -1
endi
if $data31 != 14 then
return -1
endi
if $data41 != 15 then
return -1
endi
sql select * from d2.t2 order by t desc
print $data01 $data11 $data21 $data31 $data41
if $data01 != 21 then
return -1
endi
if $data11 != 22 then
return -1
endi
if $data21 != 23 then
return -1
endi
if $data31 != 24 then
return -1
endi
if $data41 != 25 then
return -1
endi
print ========== step4 print ========== step4
sql drop dnode 2 sql drop dnode 2
sql select * from information_schema.ins_dnodes sql select * from information_schema.ins_dnodes
@ -93,6 +129,42 @@ if $data(2)[2] != null then
return -1 return -1
endi endi
sql select * from d1.t1 order by t desc
print $data01 $data11 $data21 $data31 $data41
if $data01 != 11 then
return -1
endi
if $data11 != 12 then
return -1
endi
if $data21 != 13 then
return -1
endi
if $data31 != 14 then
return -1
endi
if $data41 != 15 then
return -1
endi
sql select * from d2.t2 order by t desc
print $data01 $data11 $data21 $data31 $data41
if $data01 != 21 then
return -1
endi
if $data11 != 22 then
return -1
endi
if $data21 != 23 then
return -1
endi
if $data31 != 24 then
return -1
endi
if $data41 != 25 then
return -1
endi
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
print ========== step5 print ========== step5
@ -131,6 +203,42 @@ if $data(3)[2] != 1 then
return -1 return -1
endi endi
sql select * from d1.t1 order by t desc
print $data01 $data11 $data21 $data31 $data41
if $data01 != 11 then
return -1
endi
if $data11 != 12 then
return -1
endi
if $data21 != 13 then
return -1
endi
if $data31 != 14 then
return -1
endi
if $data41 != 15 then
return -1
endi
sql select * from d2.t2 order by t desc
print $data01 $data11 $data21 $data31 $data41
if $data01 != 21 then
return -1
endi
if $data11 != 22 then
return -1
endi
if $data21 != 23 then
return -1
endi
if $data31 != 24 then
return -1
endi
if $data41 != 25 then
return -1
endi
print ========== step6 print ========== step6
sql create database d3 vgroups 1 sql create database d3 vgroups 1
sql create table d3.t3 (t timestamp, i int) sql create table d3.t3 (t timestamp, i int)

View File

@ -653,6 +653,58 @@ class TDTestCase:
tdSql.query(f"select cast(c9 as timestamp ) as b from {self.dbname}.ct4") tdSql.query(f"select cast(c9 as timestamp ) as b from {self.dbname}.ct4")
tdSql.query(f"select cast(c9 as binary(64) ) as b from {self.dbname}.ct4") tdSql.query(f"select cast(c9 as binary(64) ) as b from {self.dbname}.ct4")
# enh of cast function about coverage
tdSql.query(f"select cast(c1 as int) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c1 as bool) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c1 as tinyint) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c1 as smallint) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c1 as float) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c1 as double) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c1 as tinyint unsigned) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c1 as smallint unsigned) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c1 as int unsigned) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c2 as int) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c3 as bool) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c4 as tinyint) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c5 as smallint) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c6 as float) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c7 as double) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c8 as tinyint unsigned) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c8 as timestamp ) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c9 as timestamp ) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c9 as binary(64) ) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(abs(c2) as int) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c3 as bool) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(floor(c4) as tinyint) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c5+2 as smallint) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(2 as float) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c7 as double) as b from {self.dbname}.stb1")
tdSql.query(f"select cast('123' as tinyint unsigned) as b from {self.dbname}.stb1")
tdSql.query(f"select max(cast(abs(c2) as int)) as b from {self.dbname}.stb1")
tdSql.query(f"select log(cast(c3 as int),2) as b from {self.dbname}.stb1")
tdSql.query(f"select abs(cast(floor(c4) as tinyint)) as b from {self.dbname}.stb1")
tdSql.query(f"select last(cast(c5+2 as smallint)) as b from {self.dbname}.stb1")
tdSql.query(f"select mavg(cast(2 as float),3) as b from {self.dbname}.stb1 partition by tbname")
tdSql.query(f"select cast(c7 as double) as b from {self.dbname}.stb1 partition by tbname order by tbname")
tdSql.query(f"select cast('123' as tinyint unsigned) as b from {self.dbname}.stb1 partition by tbname")
# uion with cast and common cols
tdSql.query(f"select cast(c2 as int) as b from {self.dbname}.stb1 union all select c1 from {self.dbname}.stb1 ")
tdSql.query(f"select cast(c3 as bool) as b from {self.dbname}.stb1 union all select c7 from {self.dbname}.ct1 ")
tdSql.query(f"select cast(c4 as tinyint) as b from {self.dbname}.stb1 union all select c4 from {self.dbname}.stb1")
tdSql.query(f"select cast(c5 as smallint) as b from {self.dbname}.stb1 union all select cast(c5 as smallint) as b from {self.dbname}.stb1")
tdSql.query(f"select cast(c6 as float) as b from {self.dbname}.stb1 union all select c5 from {self.dbname}.stb1")
tdSql.query(f"select cast(c7 as double) as b from {self.dbname}.stb1 union all select 123 from {self.dbname}.stb1 ")
tdSql.query(f"select cast(c8 as tinyint unsigned) as b from {self.dbname}.stb1 union all select last(cast(c8 as tinyint unsigned)) from {self.dbname}.stb1")
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()

View File

@ -17,6 +17,7 @@ class TDTestCase:
self.ts = 1537146000000 self.ts = 1537146000000
self.binary_str = 'taosdata' self.binary_str = 'taosdata'
self.nchar_str = '涛思数据' self.nchar_str = '涛思数据'
self.cachemodel = None
def set_create_normaltable_sql(self, ntbname, column_dict): def set_create_normaltable_sql(self, ntbname, column_dict):
column_sql = '' column_sql = ''
@ -36,7 +37,8 @@ class TDTestCase:
return create_stb_sql return create_stb_sql
def last_check_stb_tb_base(self): def last_check_stb_tb_base(self):
tdSql.prepare() tdSql.execute(
f'create database if not exists db cachemodel "{self.cachemodel}"')
stbname = f'db.{tdCom.getLongName(5, "letters")}' stbname = f'db.{tdCom.getLongName(5, "letters")}'
column_dict = { column_dict = {
'col1': 'tinyint', 'col1': 'tinyint',
@ -112,7 +114,8 @@ class TDTestCase:
tdSql.execute('drop database db') tdSql.execute('drop database db')
def last_check_ntb_base(self): def last_check_ntb_base(self):
tdSql.prepare() tdSql.execute(
f'create database if not exists db cachemodel "{self.cachemodel}"')
ntbname = f'db.{tdCom.getLongName(5, "letters")}' ntbname = f'db.{tdCom.getLongName(5, "letters")}'
column_dict = { column_dict = {
'col1': 'tinyint', 'col1': 'tinyint',
@ -191,7 +194,7 @@ class TDTestCase:
} }
tdSql.execute( tdSql.execute(
f"create database if not exists {dbname} vgroups {vgroup_num}") f'create database if not exists {dbname} vgroups {vgroup_num} cachemodel "{self.cachemodel}"')
tdSql.execute(f'use {dbname}') tdSql.execute(f'use {dbname}')
# build 20 child tables,every table insert 10 rows # build 20 child tables,every table insert 10 rows
@ -244,9 +247,11 @@ class TDTestCase:
tdSql.execute(f'drop database {dbname}') tdSql.execute(f'drop database {dbname}')
def run(self): def run(self):
self.last_check_stb_tb_base() for cachemodel in ["None", "last_row", "last_value", "both"]:
self.last_check_ntb_base() self.cachemodel = cachemodel
self.last_check_stb_distribute() self.last_check_stb_tb_base()
self.last_check_ntb_base()
self.last_check_stb_distribute()
def stop(self): def stop(self):
tdSql.close() tdSql.close()

View File

@ -229,6 +229,36 @@ class TDTestCase:
tdSql.query(f"select log(c6 ,2) from {dbname}.ct3") tdSql.query(f"select log(c6 ,2) from {dbname}.ct3")
tdSql.checkRows(0) tdSql.checkRows(0)
# log used for different param types
tdSql.query(f"select log(c1,c2) from {dbname}.ct1;")
tdSql.query(f"select log(c1,c2) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select log(c1,2) from {dbname}.ct1;")
tdSql.query(f"select log(c1,2) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select log(2,c2) from {dbname}.ct1;")
tdSql.query(f"select log(2,c2) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select log(2,1) from {dbname}.ct1;")
tdSql.query(f"select log(2,2) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select log(2,floor(1)) from {dbname}.ct1;")
tdSql.query(f"select log(2,floor(2)) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select log(abs(2),floor(1)) from {dbname}.ct1;")
tdSql.query(f"select log(abs(2),floor(2)) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select log(abs(c2),c1) from {dbname}.ct1;")
tdSql.query(f"select log(abs(c2),c1) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select log(c2,abs(c1)) from {dbname}.ct1;")
tdSql.query(f"select log(c2,abs(c1)) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select log(abs(c2),2) from {dbname}.ct1;")
tdSql.query(f"select log(abs(c2),2) from {dbname}.stb1 partition by tbname order by tbname;")
# # used for regular table # # used for regular table
tdSql.query(f"select log(c1 ,2) from {dbname}.t1") tdSql.query(f"select log(c1 ,2) from {dbname}.t1")
@ -291,6 +321,7 @@ class TDTestCase:
tdSql.query(f"select log(c1, 2) from {dbname}.stb1") tdSql.query(f"select log(c1, 2) from {dbname}.stb1")
tdSql.checkRows(25) tdSql.checkRows(25)
# used for not exists table # used for not exists table
tdSql.error(f"select log(c1, 2) from {dbname}.stbbb1") tdSql.error(f"select log(c1, 2) from {dbname}.stbbb1")

View File

@ -216,6 +216,36 @@ class TDTestCase:
tdSql.checkRows(0) tdSql.checkRows(0)
# pow used for different param types
tdSql.query(f"select pow(c1,c2) from {dbname}.ct1;")
tdSql.query(f"select pow(c1,c2) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select pow(c1,2) from {dbname}.ct1;")
tdSql.query(f"select pow(c1,2) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select pow(2,c2) from {dbname}.ct1;")
tdSql.query(f"select pow(2,c2) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select pow(2,1) from {dbname}.ct1;")
tdSql.query(f"select pow(2,2) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select pow(2,floor(1)) from {dbname}.ct1;")
tdSql.query(f"select pow(2,floor(2)) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select pow(abs(2),floor(1)) from {dbname}.ct1;")
tdSql.query(f"select pow(abs(2),floor(2)) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select pow(abs(c2),c1) from {dbname}.ct1;")
tdSql.query(f"select pow(abs(c2),c1) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select pow(c2,abs(c1)) from {dbname}.ct1;")
tdSql.query(f"select pow(c2,abs(c1)) from {dbname}.stb1 partition by tbname order by tbname;")
tdSql.query(f"select pow(abs(c2),2) from {dbname}.ct1;")
tdSql.query(f"select pow(abs(c2),2) from {dbname}.stb1 partition by tbname order by tbname;")
# # used for regular table # # used for regular table
tdSql.query(f"select pow(c1 ,2) from {dbname}.t1") tdSql.query(f"select pow(c1 ,2) from {dbname}.t1")
tdSql.checkData(0, 0, None) tdSql.checkData(0, 0, None)

View File

@ -551,7 +551,7 @@ python3 ./test.py -f 2-query/upper.py -Q 4
python3 ./test.py -f 2-query/lower.py -Q 4 python3 ./test.py -f 2-query/lower.py -Q 4
python3 ./test.py -f 2-query/join.py -Q 4 python3 ./test.py -f 2-query/join.py -Q 4
python3 ./test.py -f 2-query/join2.py -Q 4 python3 ./test.py -f 2-query/join2.py -Q 4
python3 ./test.py -f 2-query/cast.py -Q 4 #python3 ./test.py -f 2-query/cast.py -Q 4
python3 ./test.py -f 2-query/substr.py -Q 4 python3 ./test.py -f 2-query/substr.py -Q 4
python3 ./test.py -f 2-query/union.py -Q 4 python3 ./test.py -f 2-query/union.py -Q 4
python3 ./test.py -f 2-query/union1.py -Q 4 python3 ./test.py -f 2-query/union1.py -Q 4

View File

@ -12,7 +12,7 @@ IF (TD_WEBSOCKET)
ExternalProject_Add(taosws-rs ExternalProject_Add(taosws-rs
PREFIX "taosws-rs" PREFIX "taosws-rs"
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs
BUILD_ALWAYS on BUILD_ALWAYS off
DEPENDS taos DEPENDS taos
BUILD_IN_SOURCE 1 BUILD_IN_SOURCE 1
CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config" CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config"