Merge remote-tracking branch 'origin/3.0' into fix/TD-19245

This commit is contained in:
Shengliang Guan 2022-10-09 18:35:06 +08:00
commit 9fc1e86f4d
32 changed files with 3285 additions and 3155 deletions

View File

@ -375,7 +375,7 @@ public class ParameterBindingDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 20;
private static final int BINARY_COLUMN_SIZE = 30;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",

View File

@ -49,7 +49,7 @@ typedef struct {
TSKEY ts;
} SWinKey;
static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
static inline int sWinKeyCmprImpl(const void* pKey1, const void* pKey2) {
SWinKey* pWin1 = (SWinKey*)pKey1;
SWinKey* pWin2 = (SWinKey*)pKey2;
@ -68,6 +68,10 @@ static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, i
return 0;
}
static inline int winKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
return sWinKeyCmprImpl(pKey1, pKey2);
}
typedef struct {
uint64_t groupId;
TSKEY ts;

View File

@ -297,7 +297,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_BUFFER_PER_VNODE 16384 // unit MB
#define TSDB_DEFAULT_BUFFER_PER_VNODE 96
#define TSDB_MIN_PAGES_PER_VNODE 64
#define TSDB_MAX_PAGES_PER_VNODE 16384
#define TSDB_MAX_PAGES_PER_VNODE INT32_MAX
#define TSDB_DEFAULT_PAGES_PER_VNODE 256
#define TSDB_MIN_PAGESIZE_PER_VNODE 1 // unit KB
#define TSDB_MAX_PAGESIZE_PER_VNODE 16384

View File

@ -631,7 +631,7 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
if (pAlter->buffer > 0 && pAlter->buffer != pDb->cfg.buffer) {
#if 1
#if 0
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
return terrno;
#else
@ -641,7 +641,7 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
}
if (pAlter->pages > 0 && pAlter->pages != pDb->cfg.pages) {
#if 1
#if 0
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
return terrno;
#else

View File

@ -30,6 +30,7 @@ TEST_F(MndTestDb, 01_ShowDb) {
EXPECT_EQ(test.GetShowRows(), 2);
}
#if 0
TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
{
SCreateDbReq createReq = {0};
@ -125,6 +126,7 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
test.SendShowReq(TSDB_MGMT_TABLE_DB, "ins_databases", "");
EXPECT_EQ(test.GetShowRows(), 2);
}
#endif
TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
{

View File

@ -72,7 +72,7 @@ struct SVBufPool {
SVBufPoolNode node;
};
int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size);
int32_t vnodeOpenBufPool(SVnode* pVnode);
int32_t vnodeCloseBufPool(SVnode* pVnode);
void vnodeBufPoolReset(SVBufPool* pPool);

View File

@ -111,6 +111,7 @@ SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, boo
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int metaAlterCache(SMeta* pMeta, int32_t nPage);
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
int64_t metaGetTbNum(SMeta* pMeta);

View File

@ -197,6 +197,18 @@ int metaClose(SMeta *pMeta) {
return 0;
}
int metaAlterCache(SMeta *pMeta, int32_t nPage) {
metaWLock(pMeta);
if (tdbAlter(pMeta->pEnv, nPage) < 0) {
metaULock(pMeta);
return -1;
}
metaULock(pMeta);
return 0;
}
int32_t metaRLock(SMeta *pMeta) {
int32_t ret = 0;

View File

@ -235,8 +235,13 @@ static int32_t binarySearchForStartRowIndex(uint64_t* uidList, int32_t num, uint
}
}
static bool queryChildTable(uint64_t suid) {
return suid != 0;
}
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) {
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo,
const char* idStr) {
int32_t code = 0;
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
if (*pIter == NULL) {
@ -252,12 +257,19 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
(*pIter)->timeWindow = *pTimeWindow;
(*pIter)->pBlockLoadInfo = pBlockLoadInfo;
if (taosArrayGetSize(pBlockLoadInfo->aSttBlk) == 0) {
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
if (size == 0) {
int64_t st = taosGetTimestampUs();
code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
if (code) {
goto _exit;
} else {
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
}
// only apply to the child tables, ordinary tables will not incur this filter procedure.
if (queryChildTable(suid)) {
size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
SArray *pTmp = taosArrayInit(size, sizeof(SSttBlk));
for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
@ -269,9 +281,12 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
taosArrayDestroy(pBlockLoadInfo->aSttBlk);
pBlockLoadInfo->aSttBlk = pTmp;
}
double el = (taosGetTimestampUs() - st)/1000.0;
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
}
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
// find the start block
(*pIter)->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
@ -493,7 +508,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
struct SLDataIter* pIter = NULL;
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pMTree->pLoadInfo[i]);
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pMTree->pLoadInfo[i], pMTree->idStr);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}

View File

@ -16,20 +16,53 @@
#include "vnd.h"
/* ------------------------ STRUCTURES ------------------------ */
#define VNODE_BUFPOOL_SEGMENTS 3
static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool);
static int vnodeBufPoolDestroy(SVBufPool *pPool);
static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) {
SVBufPool *pPool;
int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
if (pPool == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (taosThreadSpinInit(&pPool->lock, 0) != 0) {
taosMemoryFree(pPool);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pPool->next = NULL;
pPool->pVnode = pVnode;
pPool->nRef = 0;
pPool->size = 0;
pPool->ptr = pPool->node.data;
pPool->pTail = &pPool->node;
pPool->node.prev = NULL;
pPool->node.pnext = &pPool->pTail;
pPool->node.size = size;
*ppPool = pPool;
return 0;
}
static int vnodeBufPoolDestroy(SVBufPool *pPool) {
vnodeBufPoolReset(pPool);
taosThreadSpinDestroy(&pPool->lock);
taosMemoryFree(pPool);
return 0;
}
int vnodeOpenBufPool(SVnode *pVnode) {
SVBufPool *pPool = NULL;
int ret;
int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
ASSERT(pVnode->pPool == NULL);
for (int i = 0; i < 3; i++) {
// create pool
ret = vnodeBufPoolCreate(pVnode, size, &pPool);
if (ret < 0) {
if (vnodeBufPoolCreate(pVnode, size, &pPool)) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
vnodeCloseBufPool(pVnode);
return -1;
@ -41,7 +74,6 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
}
vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size);
return 0;
}
@ -63,9 +95,7 @@ int vnodeCloseBufPool(SVnode *pVnode) {
}
void vnodeBufPoolReset(SVBufPool *pPool) {
SVBufPoolNode *pNode;
for (pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) {
for (SVBufPoolNode *pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) {
ASSERT(pNode->pnext == &pPool->pTail);
pNode->prev->pnext = &pPool->pTail;
pPool->pTail = pNode->prev;
@ -81,7 +111,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) {
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
SVBufPoolNode *pNode;
void *p;
void *p = NULL;
taosThreadSpinLock(&pPool->lock);
if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
// allocate from the anchor node
@ -124,43 +154,6 @@ void vnodeBufPoolFree(SVBufPool *pPool, void *p) {
}
}
// STATIC METHODS -------------------
static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) {
SVBufPool *pPool;
pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
if (pPool == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (taosThreadSpinInit(&pPool->lock, 0) != 0) {
taosMemoryFree(pPool);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pPool->next = NULL;
pPool->pVnode = pVnode;
pPool->nRef = 0;
pPool->size = 0;
pPool->ptr = pPool->node.data;
pPool->pTail = &pPool->node;
pPool->node.prev = NULL;
pPool->node.pnext = &pPool->pTail;
pPool->node.size = size;
*ppPool = pPool;
return 0;
}
static int vnodeBufPoolDestroy(SVBufPool *pPool) {
vnodeBufPoolReset(pPool);
taosThreadSpinDestroy(&pPool->lock);
taosMemoryFree(pPool);
return 0;
}
void vnodeBufPoolRef(SVBufPool *pPool) {
int32_t nRef = atomic_fetch_add_32(&pPool->nRef, 1);
ASSERT(nRef > 0);
@ -175,6 +168,19 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
taosThreadMutexLock(&pVnode->mutex);
int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
if (pPool->node.size != size) {
SVBufPool *pPoolT = NULL;
if (vnodeBufPoolCreate(pVnode, size, &pPoolT) < 0) {
vWarn("vgId:%d try to change buf pools size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode),
pPool->node.size, size, tstrerror(errno));
} else {
vnodeBufPoolDestroy(pPool);
pPool = pPoolT;
vDebug("vgId:%d change buf pools size from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->node.size, size);
}
}
pPool->next = pVnode->pPool;
pVnode->pPool = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty);

View File

@ -73,7 +73,7 @@ int vnodeBegin(SVnode *pVnode) {
int vnodeShouldCommit(SVnode *pVnode) {
if (pVnode->inUse) {
return pVnode->inUse->size > pVnode->config.szBuf / 3;
return pVnode->inUse->size > pVnode->inUse->node.size;
}
return false;
}

View File

@ -96,7 +96,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
// open buffer pool
if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) {
if (vnodeOpenBufPool(pVnode) < 0) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}

View File

@ -1040,6 +1040,23 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
}
if (pVnode->config.szBuf != alterReq.buffer * 1024LL * 1024LL) {
vInfo("vgId:%d vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pVnode->config.szBuf,
alterReq.buffer * 1024LL * 1024LL);
pVnode->config.szBuf = alterReq.buffer * 1024LL * 1024LL;
}
if (pVnode->config.szCache != alterReq.pages) {
if (metaAlterCache(pVnode->pMeta, alterReq.pages) < 0) {
vError("vgId:%d failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
pVnode->config.szCache, alterReq.pages, tstrerror(errno));
return errno;
} else {
vInfo("vgId:%d vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, alterReq.pages);
pVnode->config.szCache = alterReq.pages;
}
}
if (pVnode->config.cacheLast != alterReq.cacheLast) {
pVnode->config.cacheLast = alterReq.cacheLast;
}

View File

@ -606,8 +606,6 @@ typedef struct SStreamIntervalOperatorInfo {
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
SSDataBlock* pUpdateRes;
bool returnUpdate;
SPhysiNode* pPhyNode; // create new child
SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo

View File

@ -1331,8 +1331,8 @@ void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t*
colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false);
colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false);
colDataAppendNULL(pCalStartCol, pBlock->info.rows);
colDataAppendNULL(pCalEndCol, pBlock->info.rows);
colDataAppend(pCalStartCol, pBlock->info.rows, (const char*)pStartTs, false);
colDataAppend(pCalEndCol, pBlock->info.rows, (const char*)pEndTs, false);
pBlock->info.rows++;
}
@ -1376,7 +1376,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
}
}
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) {
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -1430,7 +1430,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
}
}
if (filter) {
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
}
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*)pBlock);
return 0;
@ -1466,7 +1468,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
continue;
}
setBlockIntoRes(pInfo, &block);
setBlockIntoRes(pInfo, &block, true);
if (pBlockInfo->rows > 0) {
return pInfo->pRes;
@ -1507,7 +1509,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tqNextBlock(pInfo->tqReader, &ret);
if (ret.fetchType == FETCH_TYPE__DATA) {
blockDataCleanup(pInfo->pRes);
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
ASSERT(0);
}
if (pInfo->pRes->info.rows > 0) {
@ -1771,6 +1773,7 @@ FETCH_NEXT_BLOCK:
// printDataBlock(pSDB, "stream scan update");
return pSDB;
}
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break;
default:
@ -1821,7 +1824,7 @@ FETCH_NEXT_BLOCK:
continue;
}
setBlockIntoRes(pInfo, &block);
setBlockIntoRes(pInfo, &block, false);
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId,
pInfo->pRes->info.version)) {
@ -1830,28 +1833,7 @@ FETCH_NEXT_BLOCK:
continue;
}
if (pBlockInfo->rows > 0) {
break;
}
}
if (pBlockInfo->rows > 0) {
break;
} else {
pInfo->tqReader->pMsg = NULL;
continue;
}
/*blockDataCleanup(pInfo->pRes);*/
}
// record the scan action.
pInfo->numOfExec++;
pOperator->resultInfo.totalRows += pBlockInfo->rows;
// printDataBlock(pInfo->pRes, "stream scan");
if (pBlockInfo->rows == 0) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
/*pOperator->status = OP_EXEC_DONE;*/
} else if (pInfo->pUpdateInfo) {
if (pInfo->pUpdateInfo) {
checkUpdateData(pInfo, true, pInfo->pRes, true);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
if (pInfo->pUpdateDataRes->info.rows > 0) {
@ -1867,13 +1849,37 @@ FETCH_NEXT_BLOCK:
}
}
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
break;
}
}
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
break;
} else {
pInfo->tqReader->pMsg = NULL;
continue;
}
/*blockDataCleanup(pInfo->pRes);*/
}
// record the scan action.
pInfo->numOfExec++;
pOperator->resultInfo.totalRows += pBlockInfo->rows;
// printDataBlock(pInfo->pRes, "stream scan");
qDebug("scan rows: %d", pBlockInfo->rows);
if (pBlockInfo->rows > 0) {
return pInfo->pRes;
} else {
goto NEXT_SUBMIT_BLK;
}
/*return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;*/
if (pInfo->pUpdateDataRes->info.rows > 0) {
goto FETCH_NEXT_BLOCK;
}
goto NEXT_SUBMIT_BLK;
} else {
ASSERT(0);
return NULL;

View File

@ -414,14 +414,17 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
return true;
}
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
if (pInterval->interval != pInterval->sliding &&
(pWin->ekey < pBlockInfo->calWin.skey || pWin->skey > pBlockInfo->calWin.ekey)) {
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) {
if (pInterval->interval != pInterval->sliding && (pWin->ekey < calStart || pWin->skey > calEnd)) {
return false;
}
return true;
}
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
}
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
bool ascQuery = (order == TSDB_ORDER_ASC);
@ -912,6 +915,8 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) {
}
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
taosArraySort(pDelWins, sWinKeyCmprImpl);
taosArrayRemoveDuplicate(pDelWins, sWinKeyCmprImpl, NULL);
int32_t delSize = taosArrayGetSize(pDelWins);
if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) {
return;
@ -1387,7 +1392,7 @@ static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData,
return true;
}
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, int32_t numOfOutput) {
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SWinKey key = {.ts = ts, .groupId = groupId};
tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
@ -1395,21 +1400,37 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
return true;
}
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int32_t numOfOutput, SSDataBlock* pBlock,
SArray* pUpWins, SHashObj* pUpdatedMap) {
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
SHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pCalStTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
TSKEY* calStTsCols = (TSKEY*)pCalStTsCol->pData;
SColumnInfoData* pCalEnTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
TSKEY* calEnTsCols = (TSKEY*)pCalEnTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* pGpDatas = (uint64_t*)pGpCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
STimeWindow win = {0};
if (IS_FINAL_OP(pInfo)) {
win.skey = startTsCols[i];
win.ekey = endTsCols[i];
} else {
win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
}
do {
if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i])) {
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
continue;
}
uint64_t winGpId = pGpDatas[i];
bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput);
bool res = doDeleteWindow(pOperator, win.skey, winGpId);
SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
if (pUpWins && res) {
taosArrayPush(pUpWins, &winRes);
@ -1511,16 +1532,43 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
return TSDB_CODE_SUCCESS;
}
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SWinKey* pos = taosArrayGet(res, index);
SWinKey* pData = (SWinKey*)pKey;
if (pData->ts == pos->ts) {
if (pData->groupId > pos->groupId) {
return 1;
} else if (pData->groupId < pos->groupId) {
return -1;
}
return 0;
} else if (pData->ts > pos->ts) {
return 1;
}
return -1;
}
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
SHashObj* pPullDataMap, SHashObj* closeWins, SOperatorInfo* pOperator) {
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pDelWins,
SOperatorInfo* pOperator) {
qDebug("===stream===close interval window");
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t delSize = taosArrayGetSize(pDelWins);
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
SWinKey* pWinKey = (SWinKey*)key;
if (delSize > 0) {
int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey);
if (index >= 0 && 0 == compareWinKey(pWinKey, pDelWins, index)) {
taosArrayRemove(pDelWins, index);
delSize = taosArrayGetSize(pDelWins);
}
}
void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
STimeWindow win = {
.skey = pWinKey->ts,
@ -1624,7 +1672,7 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren
ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
pOperator);
NULL, pOperator);
}
}
@ -1694,7 +1742,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
taosHashCleanup(pInfo->pPullDataMap);
taosArrayDestroy(pInfo->pPullWins);
blockDataDestroy(pInfo->pPullDataRes);
blockDataDestroy(pInfo->pUpdateRes);
taosArrayDestroy(pInfo->pDelWins);
blockDataDestroy(pInfo->pDelRes);
taosMemoryFreeClear(pInfo->pState);
@ -2862,11 +2909,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SAr
isCloseWindow(&parentWin, &pInfo->twAggSup)) {
continue;
}
int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup);
if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
int32_t num = 0;
for (int32_t j = 0; j < numOfChildren; j++) {
@ -2876,6 +2919,13 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SAr
if (!hasIntervalWindow(pChInfo->pState, pWinRes)) {
continue;
}
if (num == 0) {
int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup);
if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
num++;
SResultRow* pChResult = NULL;
setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
@ -3214,25 +3264,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return NULL;
} else {
if (!IS_FINAL_OP(pInfo)) {
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes;
}
}
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
// process the rest of the data
return pInfo->pUpdateRes;
}
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->pDelRes;
}
doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->binfo.pRes;
}
}
}
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
@ -3241,8 +3285,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
clearSpecialDataBlock(pInfo->pUpdateRes);
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
pOperator->status = OP_RES_TO_RETURN;
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
break;
@ -3252,34 +3294,16 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
ASSERT(pBlock->info.type != STREAM_INVERT);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins, NULL);
if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp;
doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL);
rebuildIntervalWindow(pOperator, pSup, pUpWins, pUpdatedMap);
taosArrayDestroy(pUpWins);
continue;
}
removeResults(pUpWins, pUpdatedMap);
copyDataBlock(pInfo->pUpdateRes, pBlock);
pInfo->returnUpdate = true;
taosArrayDestroy(pUpWins);
break;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, delWins, pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pUpdatedMap);
if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp;
doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL);
doDeleteWindows(pChildOp, &pChildInfo->interval, pBlock, NULL, NULL);
rebuildIntervalWindow(pOperator, pSup, delWins, pUpdatedMap);
addRetriveWindow(delWins, pInfo);
taosArrayAddAll(pInfo->pDelWins, delWins);
@ -3294,7 +3318,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap);
if (taosArrayGetSize(pUpdated) > 0) {
break;
}
@ -3334,11 +3358,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
minTs = TMIN(minTs, pBlock->info.window.skey);
}
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
if (IS_FINAL_OP(pInfo)) {
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
pInfo->pPullDataMap, pUpdatedMap, pOperator);
pInfo->pPullDataMap, pUpdatedMap, pInfo->pDelWins, pOperator);
closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs);
}
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
@ -3374,13 +3399,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes;
}
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
pInfo->returnUpdate = false;
ASSERT(!IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
// process the rest of the data
return pInfo->pUpdateRes;
}
return NULL;
}
@ -3455,9 +3473,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error;
}
}
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->returnUpdate = false;
pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
@ -4276,23 +4291,6 @@ static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) {
}
}
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SResKeyPos* pos = taosArrayGetP(res, index);
SWinKey* pData = (SWinKey*)pKey;
if (pData->ts == *(int64_t*)pos->key) {
if (pData->groupId > pos->groupId) {
return 1;
} else if (pData->groupId < pos->groupId) {
return -1;
}
return 0;
} else if (pData->ts > *(int64_t*)pos->key) {
return 1;
}
return -1;
}
static void removeSessionDeleteResults(SArray* update, SHashObj* pStDeleted) {
int32_t size = taosHashGetSize(pStDeleted);
if (size == 0) {
@ -5668,13 +5666,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
printDataBlock(pBlock, "single interval recv");
if (pBlock->info.type == STREAM_CLEAR) {
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pInfo->pDelWins,
pUpdatedMap);
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
@ -5706,8 +5700,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
pOperator->status = OP_RES_TO_RETURN;
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pOperator);
pInfo->pDelWins, pOperator);
void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
@ -5717,7 +5712,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
taosHashCleanup(pUpdatedMap);
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
@ -5803,8 +5797,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
pInfo->pUpdateRes = NULL;
pInfo->returnUpdate = false;
pInfo->pPhyNode = NULL; // create new child
pInfo->pPullDataMap = NULL;
pInfo->pPullWins = NULL; // SPullWindowInfo

View File

@ -959,8 +959,8 @@ int32_t udfdInitResidentFuncs() {
char* pSave = tsUdfdResFuncs;
char* token;
while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
char func[TSDB_FUNC_NAME_LEN] = {0};
strncpy(func, token, sizeof(func));
char func[TSDB_FUNC_NAME_LEN+1] = {0};
strncpy(func, token, TSDB_FUNC_NAME_LEN);
taosArrayPush(global.residentFuncs, func);
}

View File

@ -217,13 +217,13 @@ alter_db_options(A) ::= alter_db_options(B) alter_db_option(C).
%type alter_db_option { SAlterOption }
%destructor alter_db_option { }
//alter_db_option(A) ::= BUFFER NK_INTEGER(B). { A.type = DB_OPTION_BUFFER; A.val = B; }
alter_db_option(A) ::= BUFFER NK_INTEGER(B). { A.type = DB_OPTION_BUFFER; A.val = B; }
alter_db_option(A) ::= CACHEMODEL NK_STRING(B). { A.type = DB_OPTION_CACHEMODEL; A.val = B; }
alter_db_option(A) ::= CACHESIZE NK_INTEGER(B). { A.type = DB_OPTION_CACHESIZE; A.val = B; }
alter_db_option(A) ::= WAL_FSYNC_PERIOD NK_INTEGER(B). { A.type = DB_OPTION_FSYNC; A.val = B; }
alter_db_option(A) ::= KEEP integer_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; }
alter_db_option(A) ::= KEEP variable_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; }
//alter_db_option(A) ::= PAGES NK_INTEGER(B). { A.type = DB_OPTION_PAGES; A.val = B; }
alter_db_option(A) ::= PAGES NK_INTEGER(B). { A.type = DB_OPTION_PAGES; A.val = B; }
//alter_db_option(A) ::= REPLICA NK_INTEGER(B). { A.type = DB_OPTION_REPLICA; A.val = B; }
//alter_db_option(A) ::= STRICT NK_STRING(B). { A.type = DB_OPTION_STRICT; A.val = B; }
alter_db_option(A) ::= WAL_LEVEL NK_INTEGER(B). { A.type = DB_OPTION_WAL; A.val = B; }

File diff suppressed because it is too large Load Diff

View File

@ -79,12 +79,12 @@ TEST_F(ParserInitialATest, alterDnode) {
* alter_database_option ...
*
* alter_database_option: {
* BUFFER int_value -- todo: range [3, 16384], default 96, unit MB
* BUFFER int_value -- range [3, 16384], default 96, unit MB
* | CACHEMODEL {'none' | 'last_row' | 'last_value' | 'both'} -- default 'none'
* | CACHESIZE int_value -- range [1, 65536], default 1, unit MB
* | WAL_FSYNC_PERIOD int_value -- rang [0, 180000], default 3000, unit ms
* | KEEP {int_value | duration_value} -- rang [1, 365000], default 3650, unit day
* | PAGES int_value -- todo: rang [64, +oo), default 256, unit page
* | PAGES int_value -- rang [64, INT32_MAX], default 256, unit page
* | REPLICA int_value -- todo: enum 1, 3, default 1, unit replica
* | STRICT {'off' | 'on'} -- todo: default 'off'
* | WAL_LEVEL int_value -- enum 1, 2, default 1
@ -162,7 +162,19 @@ TEST_F(ParserInitialATest, alterDatabase) {
setAlterDbWal(1);
setAlterDbCacheModel(TSDB_CACHE_MODEL_LAST_ROW);
setAlterDbSstTrigger(16);
run("ALTER DATABASE test CACHEMODEL 'last_row' CACHESIZE 32 WAL_FSYNC_PERIOD 200 KEEP 10 WAL_LEVEL 1 STT_TRIGGER 16");
setAlterDbBuffer(16);
setAlterDbPages(128);
run("ALTER DATABASE test BUFFER 16 CACHEMODEL 'last_row' CACHESIZE 32 WAL_FSYNC_PERIOD 200 KEEP 10 PAGES 128 "
"WAL_LEVEL 1 STT_TRIGGER 16");
clearAlterDbReq();
initAlterDb("test");
setAlterDbBuffer(3);
run("ALTER DATABASE test BUFFER 3");
setAlterDbBuffer(64);
run("ALTER DATABASE test BUFFER 64");
setAlterDbBuffer(16384);
run("ALTER DATABASE test BUFFER 16384");
clearAlterDbReq();
initAlterDb("test");
@ -213,6 +225,15 @@ TEST_F(ParserInitialATest, alterDatabase) {
run("ALTER DATABASE test KEEP 14400m,2400h,1500d");
clearAlterDbReq();
initAlterDb("test");
setAlterDbPages(64);
run("ALTER DATABASE test PAGES 64");
setAlterDbPages(1024);
run("ALTER DATABASE test PAGES 1024");
setAlterDbPages(16384);
run("ALTER DATABASE test PAGES 16384");
clearAlterDbReq();
initAlterDb("test");
setAlterDbWal(1);
run("ALTER DATABASE test WAL_LEVEL 1");
@ -224,6 +245,8 @@ TEST_F(ParserInitialATest, alterDatabase) {
TEST_F(ParserInitialATest, alterDatabaseSemanticCheck) {
useDb("root", "test");
run("ALTER DATABASE test BUFFER 2", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test BUFFER 16385", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test CACHEMODEL 'other'", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test CACHESIZE 0", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test CACHESIZE 65537", TSDB_CODE_PAR_INVALID_DB_OPTION);
@ -234,6 +257,7 @@ TEST_F(ParserInitialATest, alterDatabaseSemanticCheck) {
run("ALTER DATABASE test KEEP 365001", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test KEEP 1000000000s", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test KEEP 1w", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test PAGES 63", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test WAL_LEVEL 0", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test WAL_LEVEL 3", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test STT_TRIGGER 0", TSDB_CODE_PAR_INVALID_DB_OPTION);

View File

@ -24,7 +24,7 @@ typedef struct SStateKey {
int64_t opNum;
} SStateKey;
static inline int SStateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SStateKey* pWin1 = (SStateKey*)pKey1;
SStateKey* pWin2 = (SStateKey*)pKey2;
@ -67,12 +67,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
}
// open state storage backend
if (tdbTbOpen("state.db", sizeof(SStateKey), -1, SStateKeyCmpr, pState->db, &pState->pStateDb) < 0) {
if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb) < 0) {
goto _err;
}
// todo refactor
if (tdbTbOpen("func.state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pFillStateDb) < 0) {
if (tdbTbOpen("func.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) {
goto _err;
}

View File

@ -36,6 +36,7 @@ int32_t tdbClose(TDB *pDb);
int32_t tdbBegin(TDB *pDb, TXN *pTxn);
int32_t tdbCommit(TDB *pDb, TXN *pTxn);
int32_t tdbAbort(TDB *pDb, TXN *pTxn);
int32_t tdbAlter(TDB *pDb, int pages);
// TTB
int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb);

View File

@ -97,6 +97,8 @@ int tdbClose(TDB *pDb) {
return 0;
}
int32_t tdbAlter(TDB *pDb, int pages) { return tdbPCacheAlter(pDb->pCache, pages); }
int32_t tdbBegin(TDB *pDb, TXN *pTxn) {
SPager *pPager;
int ret;

View File

@ -61,7 +61,11 @@ int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
pCache->szPage = pageSize;
pCache->nPages = cacheSize;
pCache->aPage = (SPage **)&pCache[1];
pCache->aPage = (SPage **)tdbOsCalloc(cacheSize, sizeof(SPage *));
if (pCache->aPage == NULL) {
tdbOsFree(pCache);
return -1;
}
if (tdbPCacheOpenImpl(pCache) < 0) {
tdbOsFree(pCache);
@ -75,11 +79,93 @@ int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
int tdbPCacheClose(SPCache *pCache) {
if (pCache) {
tdbPCacheCloseImpl(pCache);
tdbOsFree(pCache->aPage);
tdbOsFree(pCache);
}
return 0;
}
// TODO:
// if (pPage->id >= pCache->nPages) {
// free(pPage);
// pCache->aPage[pPage->id] = NULL;
// } else {
// add to free list
// }
static int tdbPCacheAlterImpl(SPCache *pCache, int32_t nPage) {
if (pCache->nPages == nPage) {
return 0;
} else if (pCache->nPages < nPage) {
SPage **aPage = tdbOsCalloc(nPage, sizeof(SPage *));
if (aPage == NULL) {
return -1;
}
for (int32_t iPage = pCache->nPages; iPage < nPage; iPage++) {
if (tdbPageCreate(pCache->szPage, &aPage[iPage], tdbDefaultMalloc, NULL) < 0) {
// TODO: handle error
return -1;
}
// pPage->pgid = 0;
aPage[iPage]->isAnchor = 0;
aPage[iPage]->isLocal = 1;
aPage[iPage]->nRef = 0;
aPage[iPage]->pHashNext = NULL;
aPage[iPage]->pLruNext = NULL;
aPage[iPage]->pLruPrev = NULL;
aPage[iPage]->pDirtyNext = NULL;
// add to local list
aPage[iPage]->id = iPage;
}
// add page to free list
for (int32_t iPage = pCache->nPages; iPage < nPage; iPage++) {
aPage[iPage]->pFreeNext = pCache->pFree;
pCache->pFree = aPage[iPage];
pCache->nFree++;
}
for (int32_t iPage = 0; iPage < pCache->nPage; iPage++) {
aPage[iPage] = pCache->aPage[iPage];
}
tdbOsFree(pCache->aPage);
pCache->aPage = aPage;
} else {
for (SPage **ppPage = &pCache->pFree; *ppPage;) {
int32_t iPage = (*ppPage)->id;
if (iPage >= nPage) {
SPage *pPage = *ppPage;
*ppPage = pPage->pFreeNext;
pCache->aPage[pPage->id] = NULL;
tdbPageDestroy(pPage, tdbDefaultFree, NULL);
pCache->nFree--;
} else {
ppPage = &(*ppPage)->pFreeNext;
}
}
}
pCache->nPages = nPage;
return 0;
}
int tdbPCacheAlter(SPCache *pCache, int32_t nPage) {
int ret = 0;
tdbPCacheLock(pCache);
ret = tdbPCacheAlterImpl(pCache, nPage);
tdbPCacheUnlock(pCache);
return ret;
}
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
SPage *pPage;
i32 nRef;
@ -310,8 +396,7 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
pCache->nFree = 0;
pCache->pFree = NULL;
for (int i = 0; i < pCache->nPages; i++) {
ret = tdbPageCreate(pCache->szPage, &pPage, tdbDefaultMalloc, NULL);
if (ret < 0) {
if (tdbPageCreate(pCache->szPage, &pPage, tdbDefaultMalloc, NULL) < 0) {
// TODO: handle error
return -1;
}

View File

@ -216,6 +216,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt);
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache);
int tdbPCacheClose(SPCache *pCache);
int tdbPCacheAlter(SPCache *pCache, int32_t nPage);
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn);
int tdbPCacheGetPageSize(SPCache *pCache);

View File

@ -622,4 +622,56 @@ if $data12 != 2 then
goto loop3
endi
sql create database test4 vgroups 1;
sql use test4;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from t1 where a > 5 interval(10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sleep 200
sql select * from streamt4;
# row 0
if $rows != 0 then
print =====rows=$rows
return -1
endi
sql insert into t1 values(1648791213000,6,2,3,1.0);
$loop_count = 0
loop4:
sleep 200
sql select * from streamt4;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop4
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
$loop_count = 0
loop5:
sleep 200
sql select * from streamt4;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 0 then
print =====rows=$rows
goto loop5
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -587,8 +587,6 @@ sleep 300
sql delete from st where ts = 1648791223000;
sql select * from test.streamt5;
$loop_count = 0
loop15:
@ -608,7 +606,6 @@ if $rows != 4 then
endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all

View File

@ -5,15 +5,15 @@ sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql select * from information_schema.ins_databases
sql create database test vgroups 1;
sql select * from information_schema.ins_databases;
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql use test;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
@ -48,8 +48,9 @@ if $loop_count == 10 then
return -1
endi
print step 0
sql select * from streamt
sql select * from streamt;
# row 0
if $data01 != 1 then
@ -97,7 +98,7 @@ endi
print step 1
sql select * from streamt2
sql select * from streamt2;
# row 0
if $data01 != 1 then
@ -239,6 +240,67 @@ if $data32 != 6 then
goto loop0
endi
print step 3.1
sql insert into t1 values(1648791216001,2,2,3,1.1);
$loop_count = 0
loop00:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt2;
# row 0
if $data01 != 1 then
print =====data01=$data01
goto loop00
endi
if $data02 != 1 then
print =====data02=$data02
goto loop00
endi
# row 1
if $data11 != 3 then
print =====data11=$data11
goto loop00
endi
if $data12 != 5 then
print =====data12=$data12
goto loop00
endi
# row 2
if $data21 != 3 then
print =====data21=$data21
goto loop00
endi
if $data22 != 7 then
print =====data22=$data22
goto loop00
endi
# row 3
if $data31 != 1 then
print =====data31=$data31
goto loop00
endi
if $data32 != 3 then
print =====data32=$data32
goto loop00
endi
print step 4
sql create database test1 vgroups 1
@ -513,6 +575,8 @@ endi
$loop_count = 0
print step 7
loop4:
sleep 100

View File

@ -40,7 +40,6 @@ typedef struct StrName {
struct StrName* next;
} StrName;
typedef struct STire {
char type; // see define TIRE_
STireNode root;
@ -57,7 +56,6 @@ typedef struct SMatchNode {
struct SMatchNode* next;
} SMatchNode;
typedef struct SMatch {
SMatchNode* head;
SMatchNode* tail; // append node to tail
@ -65,7 +63,6 @@ typedef struct SMatch {
char pre[MAX_WORD_LEN];
} SMatch;
// ----------- interface -------------
// create prefix search tree, return value call freeTire to free

View File

@ -25,7 +25,6 @@
//
#define UNION_ALL " union all "
// extern function
void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos);
void shellGetPrevCharSize(const char* str, int32_t pos, int32_t* size, int32_t* width);
@ -33,7 +32,6 @@ void shellShowOnScreen(SShellCmd *cmd);
void shellInsertChar(SShellCmd* cmd, char* c, int size);
bool appendAfterSelect(TAOS* con, SShellCmd* cmd, char* p, int32_t len);
typedef struct SAutoPtr {
STire* p;
int ref;
@ -58,7 +56,9 @@ typedef struct {
} SWords;
SWords shellCommands[] = {
{"alter database <db_name> <alter_db_options> <anyword> <alter_db_options> <anyword> <alter_db_options> <anyword> <alter_db_options> <anyword> <alter_db_options> <anyword> ;", 0, 0, NULL},
{"alter database <db_name> <alter_db_options> <anyword> <alter_db_options> <anyword> <alter_db_options> <anyword> "
"<alter_db_options> <anyword> <alter_db_options> <anyword> ;",
0, 0, NULL},
{"alter dnode <dnode_id> balance ", 0, 0, NULL},
{"alter dnode <dnode_id> resetlog;", 0, 0, NULL},
{"alter dnode <dnode_id> debugFlag 141;", 0, 0, NULL},
@ -80,7 +80,10 @@ SWords shellCommands[] = {
{"alter user <user_name> <user_actions> <anyword> ;", 0, 0, NULL},
// 20
{"create table <anyword> using <stb_name> tags(", 0, 0, NULL},
{"create database <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> ;", 0, 0, NULL},
{"create database <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> "
"<anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> "
"<db_options> <anyword> <db_options> <anyword> ;",
0, 0, NULL},
{"create dnode ", 0, 0, NULL},
{"create index ", 0, 0, NULL},
{"create mnode on dnode <dnode_id> ;", 0, 0, NULL},
@ -142,11 +145,11 @@ SWords shellCommands[] = {
// 80
{"show query <anyword> ;", 0, 0, NULL},
{"show qnodes;", 0, 0, NULL},
{"show snodes;", 0, 0, NULL},
{"show stables;", 0, 0, NULL},
{"show stables like ", 0, 0, NULL},
{"show streams;", 0, 0, NULL},
{"show scores;", 0, 0, NULL},
{"show snodes;", 0, 0, NULL},
{"show subscriptions;", 0, 0, NULL},
{"show tables;", 0, 0, NULL},
{"show tables like", 0, 0, NULL},
@ -169,126 +172,57 @@ SWords shellCommands[] = {
{"insert into <tb_name> file ", 0, 0, NULL},
{"trim database <db_name>", 0, 0, NULL},
{"use <db_name>", 0, 0, NULL},
{"quit", 0, 0, NULL}
};
{"quit", 0, 0, NULL}};
char* keywords[] = {
"and ",
"asc ",
"desc ",
"from ",
"fill(",
"limit ",
"where ",
"interval(",
"order by ",
"order by ",
"offset ",
"or ",
"group by ",
"now()",
"session(",
"sliding ",
"slimit ",
"soffset ",
"state_window(",
"today() ",
"union all select ",
"partition by "
};
"and ", "asc ", "desc ", "from ", "fill(", "limit ", "where ",
"interval(", "order by ", "order by ", "offset ", "or ", "group by ", "now()",
"session(", "sliding ", "slimit ", "soffset ", "state_window(", "today() ", "union all select ",
"partition by "};
char* functions[] = {
"count(",
"sum(",
"avg(",
"last(",
"last_row(",
"top(",
"interp(",
"max(",
"min(",
"now()",
"today()",
"percentile(",
"tail(",
"pow(",
"abs(",
"atan(",
"acos(",
"asin(",
"apercentile(",
"bottom(",
"cast(",
"ceil(",
"char_length(",
"cos(",
"concat(",
"concat_ws(",
"csum(",
"diff(",
"derivative(",
"elapsed(",
"first(",
"floor(",
"hyperloglog(",
"histogram(",
"irate(",
"leastsquares(",
"length(",
"log(",
"lower(",
"ltrim(",
"mavg(",
"mode(",
"tan(",
"round(",
"rtrim(",
"sample(",
"sin(",
"spread(",
"substr(",
"statecount(",
"stateduration(",
"stddev(",
"sqrt(",
"timediff(",
"timezone(",
"timetruncate(",
"twa(",
"to_unixtimestamp(",
"unique(",
"upper(",
"count(", "sum(",
"avg(", "last(",
"last_row(", "top(",
"interp(", "max(",
"min(", "now()",
"today()", "percentile(",
"tail(", "pow(",
"abs(", "atan(",
"acos(", "asin(",
"apercentile(", "bottom(",
"cast(", "ceil(",
"char_length(", "cos(",
"concat(", "concat_ws(",
"csum(", "diff(",
"derivative(", "elapsed(",
"first(", "floor(",
"hyperloglog(", "histogram(",
"irate(", "leastsquares(",
"length(", "log(",
"lower(", "ltrim(",
"mavg(", "mode(",
"tan(", "round(",
"rtrim(", "sample(",
"sin(", "spread(",
"substr(", "statecount(",
"stateduration(", "stddev(",
"sqrt(", "timediff(",
"timezone(", "timetruncate(",
"twa(", "to_unixtimestamp(",
"unique(", "upper(",
};
char* tb_actions[] = {
"add column ",
"modify column ",
"drop column ",
"rename column ",
"add tag ",
"modify tag ",
"drop tag ",
"rename tag ",
"set tag ",
"add column ", "modify column ", "drop column ", "rename column ", "add tag ",
"modify tag ", "drop tag ", "rename tag ", "set tag ",
};
char * user_actions[] = {
"pass ",
"enable ",
"sysinfo "
};
char* user_actions[] = {"pass ", "enable ", "sysinfo "};
char * tb_options[] = {
"comment ",
"watermark ",
"max_delay ",
"ttl ",
"rollup(",
"sma("
};
char* tb_options[] = {"comment ", "watermark ", "max_delay ", "ttl ", "rollup(", "sma("};
char * db_options[] = {
"keep ",
char* db_options[] = {"keep ",
"replica ",
"precision ",
"strict ",
@ -309,43 +243,22 @@ char * db_options[] = {
"wal_retention_period ",
"wal_roll_period ",
"wal_retention_size ",
"wal_segment_size "
};
"wal_segment_size "};
char * alter_db_options[] = {
"keep ",
"cachemodel ",
"cachesize ",
"wal_fsync_period ",
"wal_level "
};
char* alter_db_options[] = {"keep ", "cachemodel ", "cachesize ", "wal_fsync_period ", "wal_level "};
char * data_types[] = {
"timestamp",
"int",
"int unsigned",
"varchar(16)",
"float",
"double",
"binary(16)",
"nchar(16)",
"bigint",
"bigint unsigned",
"smallint",
"smallint unsigned",
"tinyint",
"tinyint unsigned",
"bool",
"json"
};
char* data_types[] = {"timestamp", "int",
"int unsigned", "varchar(16)",
"float", "double",
"binary(16)", "nchar(16)",
"bigint", "bigint unsigned",
"smallint", "smallint unsigned",
"tinyint", "tinyint unsigned",
"bool", "json"};
char * key_tags[] = {
"tags("
};
char* key_tags[] = {"tags("};
char * key_select[] = {
"select "
};
char* key_select[] = {"select "};
//
// ------- gobal variant define ---------
@ -356,7 +269,6 @@ int32_t curMatchIndex = -1; // current match shellCommands index
int32_t lastWordBytes = -1; // printShow last word length
bool waitAutoFill = false;
//
// ----------- global var array define -----------
//
@ -380,7 +292,6 @@ bool waitAutoFill = false;
#define WT_VAR_USERACTION 17
#define WT_VAR_KEYSELECT 18
#define WT_VAR_CNT 19
#define WT_FROM_DB_MAX 6 // max get content from db
@ -395,50 +306,23 @@ TdThreadMutex tiresMutex;
// save thread handle obtain var name from db server
TdThread* threads[WT_FROM_DB_CNT];
// obtain var name with sql from server
char varTypes[WT_VAR_CNT][64] = {
"<db_name>",
"<stb_name>",
"<tb_name>",
"<dnode_id>",
"<user_name>",
"<topic_name>",
"<stream_name>",
"<all_table>",
"<function>",
"<keyword>",
"<tb_actions>",
"<db_options>",
"<alter_db_options>",
"<data_types>",
"<key_tags>",
"<anyword>",
"<tb_options>",
"<user_actions>",
"<key_select>"
};
char varSqls[WT_FROM_DB_CNT][64] = {
"show databases;",
"show stables;",
"show tables;",
"show dnodes;",
"show users;",
"show topics;",
"show streams;"
};
char varTypes[WT_VAR_CNT][64] = {"<db_name>", "<stb_name>", "<tb_name>", "<dnode_id>", "<user_name>",
"<topic_name>", "<stream_name>", "<all_table>", "<function>", "<keyword>",
"<tb_actions>", "<db_options>", "<alter_db_options>", "<data_types>", "<key_tags>",
"<anyword>", "<tb_options>", "<user_actions>", "<key_select>"};
char varSqls[WT_FROM_DB_CNT][64] = {"show databases;", "show stables;", "show tables;", "show dnodes;",
"show users;", "show topics;", "show streams;"};
// var words current cursor, if user press any one key except tab, cursorVar can be reset to -1
int cursorVar = -1;
bool varMode = false; // enter var names list mode
TAOS* varCon = NULL;
SShellCmd* varCmd = NULL;
SMatch* lastMatch = NULL; // save last match result
int cntDel = 0; // delete byte count after next press tab
// show auto tab introduction
void printfIntroduction() {
printf(" **************************** How To Use TAB Key ********************************\n");
@ -459,7 +343,8 @@ void printfIntroduction() {
void showHelp() {
printf("\nThe following are supported commands for TDengine Command Line:");
printf("\n\
printf(
"\n\
----- A ----- \n\
alter database <db_name> <db_options> \n\
alter dnode <dnode_id> balance \n\
@ -574,7 +459,8 @@ void showHelp() {
printf("\n\n");
// define in getDuration() function
printf("\
printf(
"\
Timestamp expression Format:\n\
b - nanosecond \n\
u - microsecond \n\
@ -600,8 +486,7 @@ void showHelp() {
SWord* atWord(SWords* command, int32_t index) {
SWord* word = command->head;
for (int32_t i = 0; i < index; i++) {
if (word == NULL)
return NULL;
if (word == NULL) return NULL;
word = word->next;
}
@ -612,8 +497,7 @@ SWord * atWord(SWords * command, int32_t index) {
int wordType(const char* p, int32_t len) {
for (int i = 0; i < WT_VAR_CNT; i++) {
if (strncmp(p, varTypes[i], len) == 0)
return i;
if (strncmp(p, varTypes[i], len) == 0) return i;
}
return WT_TEXT;
}
@ -689,14 +573,12 @@ void freeCommand(SWords * command) {
SWord* tmp = word;
word = word->next;
// if malloc need free
if(tmp->free && tmp->word)
taosMemoryFree(tmp->word);
if (tmp->free && tmp->word) taosMemoryFree(tmp->word);
taosMemoryFree(tmp);
}
// if malloc need free
if(word->free && word->word)
taosMemoryFree(word->word);
if (word->free && word->word) taosMemoryFree(word->word);
taosMemoryFree(word);
}
@ -715,7 +597,6 @@ void GenerateVarType(int type, char** p, int count) {
// -------------------- shell auto ----------------
//
// init shell auto funciton , shell start call once
bool shellAutoInit() {
// command
@ -747,9 +628,7 @@ bool shellAutoInit() {
}
// set conn
void shellSetConn(TAOS* conn) {
varCon = conn;
}
void shellSetConn(TAOS* conn) { varCon = conn; }
// exit shell auto funciton, shell exit call once
void shellAutoExit() {
@ -790,8 +669,7 @@ void shellAutoExit() {
// ------------------- auto ptr for tires --------------------------
//
bool setNewAuotPtr(int type, STire* pNew) {
if (pNew == NULL)
return false;
if (pNew == NULL) return false;
taosThreadMutexLock(&tiresMutex);
STire* pOld = tires[type];
@ -846,8 +724,6 @@ void putBackAutoPtr(int type, STire* tire) {
return;
}
//
// ------------------- var Word --------------------------
//
@ -952,8 +828,7 @@ char* matchNextPrefix(STire* tire, char* pre) {
// save to lastMatch
if (match) {
if (lastMatch)
freeMatch(lastMatch);
if (lastMatch) freeMatch(lastMatch);
lastMatch = match;
}
}
@ -1011,8 +886,7 @@ char* tireSearchWord(int type, char* pre) {
if (type > WT_FROM_DB_MAX) {
// NOT FROM DB , tires[type] alwary not null
STire* tire = tires[type];
if (tire == NULL)
return NULL;
if (tire == NULL) return NULL;
return matchNextPrefix(tire, pre);
}
@ -1069,8 +943,7 @@ bool matchVarWord(SWord* word1, SWord* word2) {
str = tireSearchWord(WT_VAR_STABLE, pre);
if (str == NULL) {
str = tireSearchWord(WT_VAR_TABLE, pre);
if(str == NULL)
return false;
if (str == NULL) return false;
}
} else {
// OTHER
@ -1098,7 +971,6 @@ bool matchVarWord(SWord* word1, SWord* word2) {
// ------------------- match words --------------------------
//
// compare command cmd1 come from shellCommands , cmd2 come from user input
int32_t compareCommand(SWords* cmd1, SWords* cmd2) {
SWord* word1 = cmd1->head;
@ -1112,8 +984,7 @@ int32_t compareCommand(SWords * cmd1, SWords * cmd2) {
if (word1->type == WT_TEXT) {
// WT_TEXT match
if (word1->len == word2->len) {
if (strncasecmp(word1->word, word2->word, word1->len) != 0)
return -1;
if (strncasecmp(word1->word, word2->word, word1->len) != 0) return -1;
} else if (word1->len < word2->len) {
return -1;
} else {
@ -1172,8 +1043,7 @@ SWords * matchCommand(SWords * input, bool continueSearch) {
// compare
int32_t index = compareCommand(shellCommand, input);
if (index != -1) {
if (firstMatchIndex == -1)
firstMatchIndex = i;
if (firstMatchIndex == -1) firstMatchIndex = i;
curMatchIndex = i;
return &shellCommands[i];
}
@ -1226,8 +1096,7 @@ void printScreen(TAOS * con, SShellCmd * cmd, SWords * match) {
lastMatchIndex = firstMatchIndex;
lastWordBytes = word->len;
} else {
if (lastWordBytes == -1)
return ;
if (lastWordBytes == -1) return;
deleteCount(cmd, lastWordBytes);
SWord* word = MATCH_WORD(match);
@ -1242,7 +1111,6 @@ void printScreen(TAOS * con, SShellCmd * cmd, SWords * match) {
shellInsertChar(cmd, (char*)str, strLen);
}
// main key press tab , matched return true else false
bool firstMatchCommand(TAOS* con, SShellCmd* cmd) {
// parse command
@ -1319,8 +1187,7 @@ bool nextMatchCommand(TAOS * con, SShellCmd * cmd, SWords * firstMatch) {
match = matchCommand(input, false);
if (match == NULL) {
freeCommand(input);
if (input->source)
taosMemoryFree(input->source);
if (input->source) taosMemoryFree(input->source);
taosMemoryFree(input);
return false;
}
@ -1367,8 +1234,7 @@ bool fillTableName(TAOS * con, SShellCmd * cmd, char* pre) {
char* str = tireSearchWord(WT_VAR_STABLE, pre);
if (str == NULL) {
str = tireSearchWord(WT_VAR_TABLE, pre);
if(str == NULL)
return false;
if (str == NULL) return false;
}
// need insert part string
@ -1543,8 +1409,7 @@ int32_t searchAfterSelect(char* p, int32_t len) {
char* p1 = p + 7;
while (1) {
char* p2 = strstr(p1, "select ");
if(p2 == NULL)
break;
if (p2 == NULL) break;
p1 = p2 + 7;
}
@ -1557,13 +1422,13 @@ int32_t searchAfterSelect(char* p, int32_t len) {
}
char* as_pos_end = strstr(p, " as select ");
if (as_pos_end == NULL)
return -1;
if (as_pos_end == NULL) return -1;
as_pos_end += 11;
// create stream <stream_name> as select
if (strncasecmp(p, "create stream ", 14) == 0) {
return as_pos_end - p;;
return as_pos_end - p;
;
}
// create topic <topic_name> as select
@ -1600,8 +1465,7 @@ bool matchSelectQuery(TAOS * con, SShellCmd * cmd) {
char* sql_cp = strndup(p, len);
int32_t n = searchAfterSelect(sql_cp, len);
taosMemoryFree(sql_cp);
if(n == -1 || n > len)
return false;
if (n == -1 || n > len) return false;
p += n;
len -= n;
@ -1711,8 +1575,7 @@ bool matchOther(TAOS * con, SShellCmd * cmd) {
}
// too small
if(len < 8)
return false;
if (len < 8) return false;
// like 'from ( '
char* sql = strndup(p, len);
@ -1731,8 +1594,7 @@ bool matchOther(TAOS * con, SShellCmd * cmd) {
// find last ' from'
while (from) {
char* p1 = strstr(from + 5, " from");
if (p1 == NULL)
break;
if (p1 == NULL) break;
from = p1;
}
@ -1770,13 +1632,11 @@ bool matchOther(TAOS * con, SShellCmd * cmd) {
// INSERT
taosMemoryFree(sql);
return false;
}
// main key press tab
void pressTabKey(SShellCmd* cmd) {
// check
@ -1793,8 +1653,7 @@ void pressTabKey(SShellCmd * cmd) {
// manual match like create table st( ...
matched = matchCreateTable(varCon, cmd);
if (matched)
return ;
if (matched) return;
// shellCommands match
if (firstMatchIndex == -1) {
@ -1802,19 +1661,16 @@ void pressTabKey(SShellCmd * cmd) {
} else {
matched = nextMatchCommand(varCon, cmd, &shellCommands[firstMatchIndex]);
}
if (matched)
return ;
if (matched) return;
// NOT MATCHED ANYONE
// match other like '\G' ...
matched = matchOther(varCon, cmd);
if (matched)
return ;
if (matched) return;
// manual match like select * from ...
matched = matchSelectQuery(varCon, cmd);
if (matched)
return ;
if (matched) return;
return;
}
@ -2006,19 +1862,16 @@ bool dealDropCommand(char * sql) {
bool del = false;
// del in stable
STire* tire = tires[WT_VAR_STABLE];
if(tire)
del = deleteWord(tire, name);
if (tire) del = deleteWord(tire, name);
// del in table
if (!del) {
tire = tires[WT_VAR_TABLE];
if(tire)
del = deleteWord(tire, name);
if (tire) del = deleteWord(tire, name);
}
} else {
// OTHER TYPE
STire* tire = tires[type];
if(tire)
deleteWord(tire, name);
if (tire) deleteWord(tire, name);
}
taosThreadMutexUnlock(&tiresMutex);

View File

@ -32,8 +32,7 @@ STire* createTire(char type) {
// free tire node
void freeTireNode(STireNode* node) {
if (node == NULL)
return ;
if (node == NULL) return;
// nest free sub node on array d
if (node->d) {
@ -184,8 +183,7 @@ bool deleteFromTree(STire* tire, char* word, int len) {
}
}
if(nodes[m]->d == NULL)
break;
if (nodes[m]->d == NULL) break;
// move to next node
nodes = nodes[m]->d;
}
@ -255,8 +253,7 @@ void enumAllWords(STireNode** nodes, char* prefix, SMatch* match) {
addWordToMatch(match, word);
}
// nested call next layer
if (c->d)
enumAllWords(c->d, word, match);
if (c->d) enumAllWords(c->d, word, match);
}
}
}
@ -310,12 +307,10 @@ void matchPrefixFromTree(STire* tire, char* prefix, SMatch* match) {
}
// prefix is match to end char
if (c->d)
enumAllWords(c->d, prefix, root);
if (c->d) enumAllWords(c->d, prefix, root);
} else {
// move to next node continue match
if(c->d == NULL)
break;
if (c->d == NULL) break;
nodes = c->d;
}
}
@ -348,7 +343,6 @@ SMatch* matchPrefix(STire* tire, char* prefix, SMatch* match) {
return match;
}
// get all items from tires tree
void enumFromList(STire* tire, SMatch* match) {
StrName* item = tire->head;
@ -410,16 +404,13 @@ SMatch* enumAll(STire* tire) {
return match;
}
// free match result
void freeMatchNode(SMatchNode* node) {
// first free next
if (node->next)
freeMatchNode(node->next);
if (node->next) freeMatchNode(node->next);
// second free self
if (node->word)
taosMemoryFree(node->word);
if (node->word) taosMemoryFree(node->word);
taosMemoryFree(node);
}