Merge branch 'main' into enh/stream_buffer_param

This commit is contained in:
Liu Jicong 2023-02-13 09:48:02 +08:00
commit 0cd6bd3d86
17 changed files with 315 additions and 173 deletions

View File

@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG e04f39b GIT_TAG 22627d7
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -10,4 +10,4 @@
| 6 | taosdemo | This is an internal tool for testing Our JDBC-JNI, JDBC-RESTful, RESTful interfaces | | 6 | taosdemo | This is an internal tool for testing Our JDBC-JNI, JDBC-RESTful, RESTful interfaces |
more detail: https://docs.taosdata.com/reference/connector/java/ more detail: https://docs.taosdata.com/connector/java/

View File

@ -111,6 +111,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x012A) #define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x012A)
#define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B) #define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B)
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
#define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D)
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) // #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) //
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) // #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) //

View File

@ -25,6 +25,10 @@
#include "tref.h" #include "tref.h"
#include "ttimer.h" #include "ttimer.h"
static tb_uid_t processSuid(tb_uid_t suid, char* db){
return suid + MurmurHash3_32(db, strlen(db));
}
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t) { int8_t t) {
char* string = NULL; char* string = NULL;
@ -681,7 +685,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.numOfColumns = req.schemaRow.nCols; pReq.numOfColumns = req.schemaRow.nCols;
pReq.numOfTags = req.schemaTag.nCols; pReq.numOfTags = req.schemaTag.nCols;
pReq.commentLen = -1; pReq.commentLen = -1;
pReq.suid = req.suid; pReq.suid = processSuid(req.suid, pRequest->pDb);
pReq.source = TD_REQ_FROM_TAOX; pReq.source = TD_REQ_FROM_TAOX;
pReq.igExists = true; pReq.igExists = true;
@ -753,7 +757,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
// build drop stable // build drop stable
pReq.igNotExists = true; pReq.igNotExists = true;
pReq.source = TD_REQ_FROM_TAOX; pReq.source = TD_REQ_FROM_TAOX;
pReq.suid = req.suid; pReq.suid = processSuid(req.suid, pRequest->pDb);
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SName tableName = {0}; SName tableName = {0};
@ -871,6 +875,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
if (pCreateReq->type == TSDB_CHILD_TABLE) { if (pCreateReq->type == TSDB_CHILD_TABLE) {
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SName sName = {0}; SName sName = {0};
pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb);
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName); toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta); code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1008,6 +1013,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq; pDropReq = req.pReqs + iReq;
pDropReq->igNotExists = true; pDropReq->igNotExists = true;
pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);
SVgroupInfo pInfo = {0}; SVgroupInfo pInfo = {0};
SName pName = {0}; SName pName = {0};
@ -1922,6 +1928,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
SMqTaosxRspObj rspObj = {0}; SMqTaosxRspObj rspObj = {0};
SDecoder decoder = {0}; SDecoder decoder = {0};
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
void* schemaContent = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
@ -2008,27 +2015,49 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
SDecoder decoderTmp = {0}; SDecoder decoderTmp = {0};
SVCreateTbReq pCreateReq = {0}; SVCreateTbReq pCreateReq = {0};
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); do{
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
tDecoderClear(&decoderTmp); if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
taosMemoryFreeClear(pCreateReq.comment); code = TSDB_CODE_MSG_DECODE_ERROR;
taosArrayDestroy(pCreateReq.ctb.tagName); break;
goto end; }
}
ASSERT(pCreateReq.type == TSDB_CHILD_TABLE); if (strcmp(tbName, pCreateReq.name) != 0) {
if (strcmp(tbName, pCreateReq.name) == 0) { break;
schemaLen = *lenTmp; }
schemaData = *dataTmp;
pCreateReq.ctb.suid = processSuid(pCreateReq.ctb.suid, pRequest->pDb);
int32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, &pCreateReq, len, code);
if(code != 0) {
code = TSDB_CODE_MSG_ENCODE_ERROR;
break;
}
taosMemoryFree(schemaContent);
schemaContent = taosMemoryMalloc(len);
if(!schemaContent) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, schemaContent, len);
code = tEncodeSVCreateTbReq(&encoder, &pCreateReq);
if (code != 0) {
tEncoderClear(&encoder);
code = TSDB_CODE_MSG_ENCODE_ERROR;
break;
}
schemaLen = len;
schemaData = schemaContent;
strcpy(pName.tname, pCreateReq.ctb.stbName); strcpy(pName.tname, pCreateReq.ctb.stbName);
tDecoderClear(&decoderTmp); tEncoderClear(&encoder);
taosMemoryFreeClear(pCreateReq.comment); }while(0);
taosArrayDestroy(pCreateReq.ctb.tagName);
break;
}
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
taosMemoryFreeClear(pCreateReq.comment); taosMemoryFreeClear(pCreateReq.comment);
taosArrayDestroy(pCreateReq.ctb.tagName); taosArrayDestroy(pCreateReq.ctb.tagName);
if(code != 0) goto end;
if(schemaLen != 0) break;
} }
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
@ -2217,6 +2246,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
destroyRequest(pRequest); destroyRequest(pRequest);
taosHashCleanup(pVgHash); taosHashCleanup(pVgHash);
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
taosMemoryFree(schemaContent);
return code; return code;
} }

View File

@ -99,6 +99,38 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
int32_t numOfTables = p->numOfTables;
if (suid != 0) {
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, suid, -1, 1);
if (p->pSchema == NULL) {
taosMemoryFree(p);
tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
} else {
for (int32_t i = 0; i < numOfTables; ++i) {
uint64_t uid = p->pTableList[i].uid;
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, uid, -1, 1);
if (p->pSchema != NULL) {
break;
}
tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", uid, idstr);
}
// all queried tables have been dropped already, return immediately.
if (p->pSchema == NULL) {
taosMemoryFree(p);
tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void** pReader, const char* idstr) { uint64_t suid, void** pReader, const char* idstr) {
*pReader = NULL; *pReader = NULL;
@ -117,11 +149,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0];
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
p->pTableList = pTableIdList; p->pTableList = pTableIdList;
p->numOfTables = numOfTables; p->numOfTables = numOfTables;
int32_t code = setTableSchema(p, suid, idstr);
if (code != TSDB_CODE_SUCCESS) {
tsdbCacherowsReaderClose(p);
return code;
}
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES); p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
if (p->transferBuf == NULL) { if (p->transferBuf == NULL) {
tsdbCacherowsReaderClose(p); tsdbCacherowsReaderClose(p);

View File

@ -423,19 +423,6 @@ static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
return win; return win;
} }
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
int32_t rowLen = 0;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
rowLen += pCond->colList[i].bytes;
}
// make sure the output SSDataBlock size be less than 2MB.
const int32_t TWOMB = 2 * 1024 * 1024;
if ((*capacity) * rowLen > TWOMB) {
(*capacity) = TWOMB / rowLen;
}
}
// init file iterator // init file iterator
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) { static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
size_t numOfFileset = taosArrayGetSize(aDFileSet); size_t numOfFileset = taosArrayGetSize(aDFileSet);
@ -618,9 +605,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
goto _end; goto _end;
} }
// todo refactor.
limitOutputBufferSize(pCond, &pReader->capacity);
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
SBlockLoadSuppInfo* pSup = &pReader->suppInfo; SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg)); pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
@ -1611,9 +1595,9 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64 tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
" - %" PRId64 " %s", " - %" PRId64 ", uid:%"PRIu64", %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pReader->idStr); pBlockScanInfo->uid, pReader->idStr);
pReader->cost.buildmemBlock += elapsedTime; pReader->cost.buildmemBlock += elapsedTime;
return code; return code;
@ -1639,8 +1623,10 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return false; return false;
} }
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
SVersionRange* pVerRange) { SVersionRange* pVerRange) {
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1;
while (1) { while (1) {
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
if (!hasVal) { if (!hasVal) {
@ -1649,8 +1635,15 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBKEY k = TSDBROW_KEY(&row); TSDBKEY k = TSDBROW_KEY(&row);
if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, if (hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
pVerRange)) { pScanInfo->lastKey = k.ts;
} else {
// the qualifed ts may equal to k.ts, only a greater version one.
// here we need to fallback one step.
if (pScanInfo->lastKey == k.ts) {
pScanInfo->lastKey -= step;
}
return true; return true;
} }
} }
@ -2316,6 +2309,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
w.ekey = pScanInfo->lastKey + step; w.ekey = pScanInfo->lastKey + step;
} }
tsdbDebug("init last block reader, window:%"PRId64"-%"PRId64", uid:%"PRIu64", %s", w.skey, w.ekey, pScanInfo->uid, pReader->idStr);
int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
pLBlockReader->pInfo, false, pReader->idStr); pLBlockReader->pInfo, false, pReader->idStr);
@ -2755,18 +2749,6 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea
taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc); taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc);
} }
// reset the last del file index
static void resetScanBlockLastBlockDelIndex(SReaderStatus* pStatus, int32_t order) {
void* p = taosHashIterate(pStatus->pTableMap, NULL);
while (p != NULL) {
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
// reset the last del file index
pScanInfo->lastBlockDelIndex = getInitialDelIndex(pScanInfo->delSkyline, order);
p = taosHashIterate(pStatus->pTableMap, p);
}
}
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbReader* pReader) { static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
@ -3082,7 +3064,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// this file does not have data files, let's start check the last block file if exists // this file does not have data files, let's start check the last block file if exists
if (pBlockIter->numOfBlocks == 0) { if (pBlockIter->numOfBlocks == 0) {
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
goto _begin; goto _begin;
} }
} }
@ -3114,7 +3095,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// data blocks in current file are exhausted, let's try the next file now // data blocks in current file are exhausted, let's try the next file now
tBlockDataReset(&pReader->status.fileBlockData); tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order); resetDataBlockIterator(pBlockIter, pReader->order);
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
goto _begin; goto _begin;
} else { } else {
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
@ -3126,7 +3106,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// this file does not have blocks, let's start check the last block file // this file does not have blocks, let's start check the last block file
if (pBlockIter->numOfBlocks == 0) { if (pBlockIter->numOfBlocks == 0) {
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
goto _begin; goto _begin;
} }
} }
@ -3840,11 +3819,9 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
pCond->twindows.ekey -= 1; pCond->twindows.ekey -= 1;
} }
int32_t capacity = 0; int32_t capacity = pVnode->config.tsdbCfg.maxRows;
if (pResBlock == NULL) { if (pResBlock != NULL) {
capacity = 4096; blockDataEnsureCapacity(pResBlock, capacity);
} else {
capacity = pResBlock->info.capacity;
} }
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr); int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);

View File

@ -215,8 +215,15 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1;
taosArrayClear(pInfo->pUidList);
continue;
}
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);

View File

@ -781,6 +781,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
}
} }
SSDataBlock* result = doGroupedTableScan(pOperator); SSDataBlock* result = doGroupedTableScan(pOperator);
@ -884,7 +888,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); // blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -65,7 +65,7 @@ typedef struct SSysTableScanInfo {
SSDataBlock* pRes; SSDataBlock* pRes;
int64_t numOfBlocks; // extract basic running information. int64_t numOfBlocks; // extract basic running information.
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
SLimitInfo limitInfo;
int32_t tbnameSlotId; int32_t tbnameSlotId;
} SSysTableScanInfo; } SSysTableScanInfo;
@ -133,7 +133,6 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName); static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName);
static void destroySysScanOperator(void* param); static void destroySysScanOperator(void* param);
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code); static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code);
static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo);
static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse); static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse);
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable, static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
@ -351,7 +350,7 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name, static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode); void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode);
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
const char* name, SSDataBlock* pBlock); const char* name, SSDataBlock* pBlock);
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) { __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) { if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) {
@ -556,7 +555,7 @@ void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfR
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false);
doFilterResult(pInfo->pRes, pFilterInfo); doFilter(pInfo->pRes, pFilterInfo, NULL);
blockDataCleanup(dataBlock); blockDataCleanup(dataBlock);
} }
@ -975,7 +974,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
@ -991,7 +990,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
@ -1152,7 +1151,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
@ -1168,7 +1167,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
@ -1199,7 +1198,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database. // the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
if (pInfo->readHandle.mnd != NULL) { if (pInfo->readHandle.mnd != NULL) {
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity); buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
@ -1324,6 +1323,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
getDBNameFromCondition(pInfo->pCondition, dbName); getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
} }
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
pBlock = sysTableScanUserTables(pOperator); pBlock = sysTableScanUserTables(pOperator);
@ -1336,30 +1336,37 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
pBlock = sysTableScanFromMNode(pOperator, pInfo, name, pTaskInfo); pBlock = sysTableScanFromMNode(pOperator, pInfo, name, pTaskInfo);
} }
return sysTableScanFillTbName(pOperator, pInfo, name, pBlock); sysTableScanFillTbName(pOperator, pInfo, name, pBlock);
}
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
const char* name, SSDataBlock* pBlock) {
if (pBlock != NULL) { if (pBlock != NULL) {
if (pInfo->tbnameSlotId != -1) { bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId); if (limitReached) {
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0}; setOperatorCompleted(pOperator);
memcpy(varDataVal(varTbName), name, strlen(name));
varDataSetLen(varTbName, strlen(name));
for (int i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColumnInfoData, i, varTbName, NULL);
}
doFilterResult(pBlock, pOperator->exprSupp.pFilterInfo);
} }
}
if (pBlock && pBlock->info.rows != 0) { return pBlock->info.rows > 0? pBlock:NULL;
return pBlock;
} else { } else {
return NULL; return NULL;
} }
} }
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock) {
if (pBlock == NULL) {
return;
}
if (pInfo->tbnameSlotId != -1) {
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
memcpy(varDataVal(varTbName), name, strlen(name));
varDataSetLen(varTbName, strlen(name));
colDataAppendNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows);
}
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
}
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name, static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
@ -1423,7 +1430,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator); updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
// todo log the filter info // todo log the filter info
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes; return pInfo->pRes;
@ -1457,13 +1464,13 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->sysInfo = pScanPhyNode->sysInfo;
pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->showRewrite = pScanPhyNode->showRewrite;
pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->pCondition = pScanNode->node.pConditions; pInfo->pCondition = pScanNode->node.pConditions;
code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initLimitInfo(pScanPhyNode->scan.node.pLimit, pScanPhyNode->scan.node.pSlimit, &pInfo->limitInfo);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
@ -1556,15 +1563,6 @@ int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo) {
if (pFilterInfo == NULL) {
return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
}
doFilter(pDataBlock, pFilterInfo, NULL);
return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
}
static int32_t sysChkFilter__Comm(SNode* pNode) { static int32_t sysChkFilter__Comm(SNode* pNode) {
// impl // impl
SOperatorNode* pOper = (SOperatorNode*)pNode; SOperatorNode* pOper = (SOperatorNode*)pNode;

View File

@ -44,7 +44,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (pMeta->pTasks == NULL) { if (pMeta->pTasks == NULL) {
goto _err; goto _err;
} }

View File

@ -420,7 +420,13 @@ static void transHttpEnvInit() {
uv_loop_init(http->loop); uv_loop_init(http->loop);
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
if (NULL == http->asyncPool) {
taosMemoryFree(http->loop);
taosMemoryFree(http);
http = NULL;
return;
}
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
if (err != 0) { if (err != 0) {
taosMemoryFree(http->loop); taosMemoryFree(http->loop);

View File

@ -18,6 +18,7 @@
#include "taoserror.h" #include "taoserror.h"
#define PROCESS_ITEM 12 #define PROCESS_ITEM 12
#define UUIDLEN37 37
typedef struct { typedef struct {
uint64_t user; uint64_t user;
@ -830,7 +831,8 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
return 0; return 0;
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
uuid_t uuid = {0}; uuid_t uuid = {0};
char buf[37] = {0}; char buf[UUIDLEN37];
memset(buf, 0, UUIDLEN37);
uuid_generate(uuid); uuid_generate(uuid);
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
uuid_unparse_lower(uuid, buf); uuid_unparse_lower(uuid, buf);

View File

@ -88,6 +88,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_ENCODE_ERROR, "Msg encode error")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")

View File

@ -29,6 +29,7 @@ class TDSimClient:
self.testCluster = False self.testCluster = False
self.path = path self.path = path
self.cfgDict = { self.cfgDict = {
"fqdn": "localhost",
"numOfLogLines": "100000000", "numOfLogLines": "100000000",
"locale": "en_US.UTF-8", "locale": "en_US.UTF-8",
"charset": "UTF-8", "charset": "UTF-8",
@ -119,6 +120,7 @@ class TDDnode:
self.asan = False self.asan = False
self.remoteIP = "" self.remoteIP = ""
self.cfgDict = { self.cfgDict = {
"fqdn": "localhost",
"monitor": "0", "monitor": "0",
"maxShellConns": "30000", "maxShellConns": "30000",
"locale": "en_US.UTF-8", "locale": "en_US.UTF-8",

View File

@ -186,4 +186,10 @@ if $data11 != 4 then
return -1 return -1
endi endi
print ===============================================> TS-2613
sql select * from information_schema.ins_databases limit 1 offset 1;
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -194,6 +194,11 @@ class TDTestCase:
tdSql.checkData(0, 2, None) tdSql.checkData(0, 2, None)
tdSql.checkData(1, 1, 1) tdSql.checkData(1, 1, 1)
tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}') tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}')
tdSql.query("select * from information_schema.ins_tables where table_name = 'stt4'")
uid1 = tdSql.getData(0, 5)
uid2 = tdSql.getData(1, 5)
tdSql.checkNotEqual(uid1, uid2)
return return
def checkWal1Vgroup(self): def checkWal1Vgroup(self):

View File

@ -32,6 +32,8 @@ void shellShowOnScreen(SShellCmd* cmd);
void shellInsertChar(SShellCmd* cmd, char* c, int size); void shellInsertChar(SShellCmd* cmd, char* c, int size);
void shellInsertStr(SShellCmd* cmd, char* str, int size); void shellInsertStr(SShellCmd* cmd, char* str, int size);
bool appendAfterSelect(TAOS* con, SShellCmd* cmd, char* p, int32_t len); bool appendAfterSelect(TAOS* con, SShellCmd* cmd, char* p, int32_t len);
char* tireSearchWord(int type, char* pre);
bool updateTireValue(int type, bool autoFill) ;
typedef struct SAutoPtr { typedef struct SAutoPtr {
STire* p; STire* p;
@ -60,23 +62,22 @@ SWords shellCommands[] = {
{"alter database <db_name> <alter_db_options> <anyword> <alter_db_options> <anyword> <alter_db_options> <anyword> " {"alter database <db_name> <alter_db_options> <anyword> <alter_db_options> <anyword> <alter_db_options> <anyword> "
"<alter_db_options> <anyword> <alter_db_options> <anyword> ;", "<alter_db_options> <anyword> <alter_db_options> <anyword> ;",
0, 0, NULL}, 0, 0, NULL},
{"alter dnode <dnode_id> balance ", 0, 0, NULL}, {"alter dnode <dnode_id> \"resetlog\";", 0, 0, NULL},
{"alter dnode <dnode_id> resetlog;", 0, 0, NULL}, {"alter dnode <dnode_id> \"debugFlag\" \"141\";", 0, 0, NULL},
{"alter dnode <dnode_id> debugFlag 141;", 0, 0, NULL}, {"alter dnode <dnode_id> \"monitor\" \"0\";", 0, 0, NULL},
{"alter dnode <dnode_id> monitor 1;", 0, 0, NULL}, {"alter dnode <dnode_id> \"monitor\" \"1\";", 0, 0, NULL},
{"alter all dnodes monitor ", 0, 0, NULL}, {"alter all dnodes \"resetlog\";", 0, 0, NULL},
{"alter alldnodes balance ", 0, 0, NULL}, {"alter all dnodes \"debugFlag\" \"141\";", 0, 0, NULL},
{"alter alldnodes resetlog;", 0, 0, NULL}, {"alter all dnodes \"monitor\" \"0\";", 0, 0, NULL},
{"alter alldnodes debugFlag 141;", 0, 0, NULL}, {"alter all dnodes \"monitor\" \"1\";", 0, 0, NULL},
{"alter alldnodes monitor 1;", 0, 0, NULL},
{"alter table <tb_name> <tb_actions> <anyword> ;", 0, 0, NULL}, {"alter table <tb_name> <tb_actions> <anyword> ;", 0, 0, NULL},
{"alter table modify column", 0, 0, NULL}, {"alter table modify column", 0, 0, NULL},
{"alter local resetlog;", 0, 0, NULL}, {"alter local \"resetlog\";", 0, 0, NULL},
{"alter local DebugFlag 143;", 0, 0, NULL}, {"alter local \"DebugFlag\" \"143\";", 0, 0, NULL},
{"alter local cDebugFlag 143;", 0, 0, NULL}, {"alter local \"cDebugFlag\" \"143\";", 0, 0, NULL},
{"alter local uDebugFlag 143;", 0, 0, NULL}, {"alter local \"uDebugFlag\" \"143\";", 0, 0, NULL},
{"alter local rpcDebugFlag 143;", 0, 0, NULL}, {"alter local \"rpcDebugFlag\" \"143\";", 0, 0, NULL},
{"alter local tmrDebugFlag 143;", 0, 0, NULL}, {"alter local \"tmrDebugFlag\" \"143\";", 0, 0, NULL},
{"alter topic", 0, 0, NULL}, {"alter topic", 0, 0, NULL},
{"alter user <user_name> <user_actions> <anyword> ;", 0, 0, NULL}, {"alter user <user_name> <user_actions> <anyword> ;", 0, 0, NULL},
// 20 // 20
@ -108,6 +109,7 @@ SWords shellCommands[] = {
{"drop topic <topic_name> ;", 0, 0, NULL}, {"drop topic <topic_name> ;", 0, 0, NULL},
{"drop stream <stream_name> ;", 0, 0, NULL}, {"drop stream <stream_name> ;", 0, 0, NULL},
{"explain select", 0, 0, NULL}, // 44 append sub sql {"explain select", 0, 0, NULL}, // 44 append sub sql
{"flush database <db_name> ;", 0, 0, NULL},
{"help;", 0, 0, NULL}, {"help;", 0, 0, NULL},
{"grant all on <anyword> to <user_name> ;", 0, 0, NULL}, {"grant all on <anyword> to <user_name> ;", 0, 0, NULL},
{"grant read on <anyword> to <user_name> ;", 0, 0, NULL}, {"grant read on <anyword> to <user_name> ;", 0, 0, NULL},
@ -121,7 +123,6 @@ SWords shellCommands[] = {
{"revoke read on <anyword> from <user_name> ;", 0, 0, NULL}, {"revoke read on <anyword> from <user_name> ;", 0, 0, NULL},
{"revoke write on <anyword> from <user_name> ;", 0, 0, NULL}, {"revoke write on <anyword> from <user_name> ;", 0, 0, NULL},
{"select * from <all_table>", 0, 0, NULL}, {"select * from <all_table>", 0, 0, NULL},
{"select _block_dist() from <all_table> \\G;", 0, 0, NULL},
{"select client_version();", 0, 0, NULL}, {"select client_version();", 0, 0, NULL},
// 60 // 60
{"select current_user();", 0, 0, NULL}, {"select current_user();", 0, 0, NULL},
@ -247,7 +248,7 @@ char* db_options[] = {"keep ",
"wal_retention_size ", "wal_retention_size ",
"wal_segment_size "}; "wal_segment_size "};
char* alter_db_options[] = {"keep ", "cachemodel ", "cachesize ", "wal_fsync_period ", "wal_level "}; char* alter_db_options[] = {"cachemodel ", "replica ", "keep ", "cachesize ", "wal_fsync_period ", "wal_level "};
char* data_types[] = {"timestamp", "int", char* data_types[] = {"timestamp", "int",
"int unsigned", "varchar(16)", "int unsigned", "varchar(16)",
@ -262,6 +263,14 @@ char* key_tags[] = {"tags("};
char* key_select[] = {"select "}; char* key_select[] = {"select "};
char* key_systable[] = {
"ins_dnodes", "ins_mnodes", "ins_modules", "ins_qnodes", "ins_snodes", "ins_cluster",
"ins_databases", "ins_functions", "ins_indexes", "ins_stables", "ins_tables", "ins_tags",
"ins_users", "ins_grants", "ins_vgroups", "ins_configs", "ins_dnode_variables", "ins_topics",
"ins_subscriptions", "ins_streams", "ins_stream_tasks", "ins_vnodes", "ins_user_privileges", "perf_connections",
"perf_queries", "perf_consumers", "perf_trans", "perf_apps"};
// //
// ------- gobal variant define --------- // ------- gobal variant define ---------
// //
@ -293,8 +302,9 @@ bool waitAutoFill = false;
#define WT_VAR_TBOPTION 16 #define WT_VAR_TBOPTION 16
#define WT_VAR_USERACTION 17 #define WT_VAR_USERACTION 17
#define WT_VAR_KEYSELECT 18 #define WT_VAR_KEYSELECT 18
#define WT_VAR_SYSTABLE 19
#define WT_VAR_CNT 19 #define WT_VAR_CNT 20
#define WT_FROM_DB_MAX 6 // max get content from db #define WT_FROM_DB_MAX 6 // max get content from db
#define WT_FROM_DB_CNT (WT_FROM_DB_MAX + 1) #define WT_FROM_DB_CNT (WT_FROM_DB_MAX + 1)
@ -327,19 +337,19 @@ int cntDel = 0; // delete byte count after next press tab
// show auto tab introduction // show auto tab introduction
void printfIntroduction() { void printfIntroduction() {
printf(" ****************************** Tab Completion **********************************\n"); printf(" ****************************** Tab Completion *************************************\n");
printf(" * The TDengine CLI supports tab completion for a variety of items, *\n"); printf(" * The TDengine CLI supports tab completion for a variety of items, *\n");
printf(" * including database names, table names, function names and keywords. *\n"); printf(" * including database names, table names, function names and keywords. *\n");
printf(" * The full list of shortcut keys is as follows: *\n"); printf(" * The full list of shortcut keys is as follows: *\n");
printf(" * [ TAB ] ...... complete the current word *\n"); printf(" * [ TAB ] ...... complete the current word *\n");
printf(" * ...... if used on a blank line, display all valid commands *\n"); printf(" * ...... if used on a blank line, display all supported commands *\n");
printf(" * [ Ctrl + A ] ...... move cursor to the st[A]rt of the line *\n"); printf(" * [ Ctrl + A ] ...... move cursor to the st[A]rt of the line *\n");
printf(" * [ Ctrl + E ] ...... move cursor to the [E]nd of the line *\n"); printf(" * [ Ctrl + E ] ...... move cursor to the [E]nd of the line *\n");
printf(" * [ Ctrl + W ] ...... move cursor to the middle of the line *\n"); printf(" * [ Ctrl + W ] ...... move cursor to the middle of the line *\n");
printf(" * [ Ctrl + L ] ...... clear the entire screen *\n"); printf(" * [ Ctrl + L ] ...... clear the entire screen *\n");
printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n"); printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n");
printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n"); printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n");
printf(" **********************************************************************************\n\n"); printf(" *************************************************************************************\n\n");
} }
void showHelp() { void showHelp() {
@ -348,23 +358,24 @@ void showHelp() {
"\n\ "\n\
----- A ----- \n\ ----- A ----- \n\
alter database <db_name> <db_options> \n\ alter database <db_name> <db_options> \n\
alter dnode <dnode_id> balance \n\ alter dnode <dnode_id> 'resetlog';\n\
alter dnode <dnode_id> resetlog;\n\ alter dnode <dnode_id> 'monitor' '0';\n\
alter all dnodes monitor \n\ alter dnode <dnode_id> 'monitor' \"1\";\n\
alter alldnodes balance \n\ alter dnode <dnode_id> \"debugflag\" \"143\";\n\
alter alldnodes resetlog;\n\ alter all dnodes \"monitor\" \"0\";\n\
alter alldnodes debugFlag \n\ alter all dnodes \"monitor\" \"1\";\n\
alter alldnodes monitor \n\ alter all dnodes \"resetlog\";\n\
alter all dnodes \"debugFlag\" \n\
alter table <tb_name> <tb_actions> ;\n\ alter table <tb_name> <tb_actions> ;\n\
alter table modify column\n\ alter table modify column\n\
alter local resetlog;\n\ alter local \"resetlog\";\n\
alter local DebugFlag 143;\n\ alter local \"DebugFlag\" \"143\";\n\
alter topic\n\ alter topic\n\
alter user <user_name> <user_actions> ...\n\ alter user <user_name> <user_actions> ...\n\
----- C ----- \n\ ----- C ----- \n\
create table <tb_name> using <stb_name> tags ...\n\ create table <tb_name> using <stb_name> tags ...\n\
create database <db_name> <db_options> ...\n\ create database <db_name> <db_options> ...\n\
create dnode ...\n\ create dnode \"fqdn:port\"n\
create index ...\n\ create index ...\n\
create mnode on dnode <dnode_id> ;\n\ create mnode on dnode <dnode_id> ;\n\
create qnode on dnode <dnode_id> ;\n\ create qnode on dnode <dnode_id> ;\n\
@ -387,6 +398,8 @@ void showHelp() {
drop stream <stream_name> ;\n\ drop stream <stream_name> ;\n\
----- E ----- \n\ ----- E ----- \n\
explain select clause ...\n\ explain select clause ...\n\
----- F ----- \n\
flush database <db_name>;\n\
----- H ----- \n\ ----- H ----- \n\
help;\n\ help;\n\
----- I ----- \n\ ----- I ----- \n\
@ -409,7 +422,6 @@ void showHelp() {
revoke write on <priv_level> from <user_name> ;\n\ revoke write on <priv_level> from <user_name> ;\n\
----- S ----- \n\ ----- S ----- \n\
select * from <all_table> where ... \n\ select * from <all_table> where ... \n\
select _block_dist() from <all_table>;\n\
select client_version();\n\ select client_version();\n\
select current_user();\n\ select current_user();\n\
select database();\n\ select database();\n\
@ -619,12 +631,17 @@ bool shellAutoInit() {
GenerateVarType(WT_VAR_TBOPTION, tb_options, sizeof(tb_options) / sizeof(char*)); GenerateVarType(WT_VAR_TBOPTION, tb_options, sizeof(tb_options) / sizeof(char*));
GenerateVarType(WT_VAR_USERACTION, user_actions, sizeof(user_actions) / sizeof(char*)); GenerateVarType(WT_VAR_USERACTION, user_actions, sizeof(user_actions) / sizeof(char*));
GenerateVarType(WT_VAR_KEYSELECT, key_select, sizeof(key_select) / sizeof(char*)); GenerateVarType(WT_VAR_KEYSELECT, key_select, sizeof(key_select) / sizeof(char*));
GenerateVarType(WT_VAR_SYSTABLE, key_systable, sizeof(key_systable) / sizeof(char*));
return true; return true;
} }
// set conn // set conn
void shellSetConn(TAOS* conn) { varCon = conn; } void shellSetConn(TAOS* conn) {
varCon = conn;
// init database and stable
updateTireValue(WT_VAR_DBNAME, false);
}
// exit shell auto funciton, shell exit call once // exit shell auto funciton, shell exit call once
void shellAutoExit() { void shellAutoExit() {
@ -800,9 +817,42 @@ void* varObtainThread(void* param) {
return NULL; return NULL;
} }
// return true is need update value by async
bool updateTireValue(int type, bool autoFill) {
// TYPE CONTEXT GET FROM DB
taosThreadMutexLock(&tiresMutex);
// check need obtain from server
if (tires[type] == NULL) {
waitAutoFill = autoFill;
// need async obtain var names from db sever
if (threads[type] != NULL) {
if (taosThreadRunning(threads[type])) {
// thread running , need not obtain again, return
taosThreadMutexUnlock(&tiresMutex);
return NULL;
}
// destroy previous thread handle for new create thread handle
taosDestroyThread(threads[type]);
threads[type] = NULL;
}
// create new
void* param = taosMemoryMalloc(sizeof(int));
*((int*)param) = type;
threads[type] = taosCreateThread(varObtainThread, param);
taosThreadMutexUnlock(&tiresMutex);
return true;
}
taosThreadMutexUnlock(&tiresMutex);
return false;
}
// only match next one word from all match words, return valuue must free by caller // only match next one word from all match words, return valuue must free by caller
char* matchNextPrefix(STire* tire, char* pre) { char* matchNextPrefix(STire* tire, char* pre) {
SMatch* match = NULL; SMatch* match = NULL;
if(tire == NULL) return NULL;
// re-use last result // re-use last result
if (lastMatch) { if (lastMatch) {
@ -888,32 +938,9 @@ char* tireSearchWord(int type, char* pre) {
return matchNextPrefix(tire, pre); return matchNextPrefix(tire, pre);
} }
// TYPE CONTEXT GET FROM DB if(updateTireValue(type, true)) {
taosThreadMutexLock(&tiresMutex);
// check need obtain from server
if (tires[type] == NULL) {
waitAutoFill = true;
// need async obtain var names from db sever
if (threads[type] != NULL) {
if (taosThreadRunning(threads[type])) {
// thread running , need not obtain again, return
taosThreadMutexUnlock(&tiresMutex);
return NULL;
}
// destroy previous thread handle for new create thread handle
taosDestroyThread(threads[type]);
threads[type] = NULL;
}
// create new
void* param = taosMemoryMalloc(sizeof(int));
*((int*)param) = type;
threads[type] = taosCreateThread(varObtainThread, param);
taosThreadMutexUnlock(&tiresMutex);
return NULL; return NULL;
} }
taosThreadMutexUnlock(&tiresMutex);
// can obtain var names from local // can obtain var names from local
STire* tire = getAutoPtr(type); STire* tire = getAutoPtr(type);
@ -1116,6 +1143,7 @@ void printScreen(TAOS* con, SShellCmd* cmd, SWords* match) {
// main key press tab , matched return true else false // main key press tab , matched return true else false
bool firstMatchCommand(TAOS* con, SShellCmd* cmd) { bool firstMatchCommand(TAOS* con, SShellCmd* cmd) {
if(con == NULL || cmd == NULL) return false;
// parse command // parse command
SWords* input = (SWords*)taosMemoryMalloc(sizeof(SWords)); SWords* input = (SWords*)taosMemoryMalloc(sizeof(SWords));
memset(input, 0, sizeof(SWords)); memset(input, 0, sizeof(SWords));
@ -1660,6 +1688,41 @@ bool matchOther(TAOS* con, SShellCmd* cmd) {
return false; return false;
} }
// last match if nothing matched
bool matchEnd(TAOS* con, SShellCmd* cmd) {
// str dump
bool ret = false;
char* ps = strndup(cmd->command, cmd->commandSize);
char* last = lastWord(ps);
char* elast = strrchr(last, '.'); // find end last
if(elast) {
last = elast + 1;
}
// less one char can match
if(strlen(last) == 0 ) {
goto _return;
}
// match database
if(elast == NULL) {
// dot need not completed with dbname
if (fillWithType(con, cmd, last, WT_VAR_DBNAME)) {
ret = true;
goto _return;
}
}
if (fillWithType(con, cmd, last, WT_VAR_SYSTABLE)) {
ret = true;
goto _return;
}
_return:
taosMemoryFree(ps);
return ret;
}
// main key press tab // main key press tab
void pressTabKey(SShellCmd* cmd) { void pressTabKey(SShellCmd* cmd) {
// check // check
@ -1695,6 +1758,9 @@ void pressTabKey(SShellCmd* cmd) {
matched = matchSelectQuery(varCon, cmd); matched = matchSelectQuery(varCon, cmd);
if (matched) return; if (matched) return;
// match end
matched = matchEnd(varCon, cmd);
return; return;
} }
@ -1911,6 +1977,7 @@ void callbackAutoTab(char* sqlstr, TAOS* pSql, bool usedb) {
if (dealUseDB(sql)) { if (dealUseDB(sql)) {
// change to new db // change to new db
updateTireValue(WT_VAR_STABLE, false);
return; return;
} }