From c054ed889b8e163d569e0f77a915242e625b094b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 Aug 2022 15:26:34 +0800 Subject: [PATCH] other: merge 3.0 --- include/common/systable.h | 60 ++++++++-------- include/common/tdatablock.h | 1 + include/common/tgrant.h | 8 +-- include/common/tmsg.h | 12 ++-- include/libs/executor/executor.h | 7 ++ include/libs/nodes/cmdnodes.h | 5 +- include/libs/nodes/plannodes.h | 3 + include/libs/nodes/querynodes.h | 1 + include/libs/qcom/query.h | 1 + include/libs/stream/tstream.h | 119 +++++++++++++------------------ include/libs/sync/sync.h | 8 +-- include/os/osEnv.h | 2 + include/os/osFile.h | 1 + include/util/taoserror.h | 1 + tools/shell/src/shellCommand.c | 6 +- tools/shell/src/shellEngine.c | 31 +++++--- tools/shell/src/shellWebsocket.c | 34 +++++---- 17 files changed, 166 insertions(+), 134 deletions(-) diff --git a/include/common/systable.h b/include/common/systable.h index ca097cd8d2..ed2e6a46c3 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -23,38 +23,38 @@ extern "C" { #define TDENGINE_SYSTABLE_H #define TSDB_INFORMATION_SCHEMA_DB "information_schema" -#define TSDB_INS_TABLE_DNODES "dnodes" -#define TSDB_INS_TABLE_MNODES "mnodes" -#define TSDB_INS_TABLE_MODULES "modules" -#define TSDB_INS_TABLE_QNODES "qnodes" -#define TSDB_INS_TABLE_BNODES "bnodes" -#define TSDB_INS_TABLE_SNODES "snodes" -#define TSDB_INS_TABLE_CLUSTER "cluster" -#define TSDB_INS_TABLE_USER_DATABASES "user_databases" -#define TSDB_INS_TABLE_USER_FUNCTIONS "user_functions" -#define TSDB_INS_TABLE_USER_INDEXES "user_indexes" -#define TSDB_INS_TABLE_USER_STABLES "user_stables" -#define TSDB_INS_TABLE_USER_TABLES "user_tables" -#define TSDB_INS_TABLE_USER_TAGS "user_tags" -#define TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED "user_table_distributed" -#define TSDB_INS_TABLE_USER_USERS "user_users" -#define TSDB_INS_TABLE_LICENCES "grants" -#define TSDB_INS_TABLE_VGROUPS "vgroups" -#define TSDB_INS_TABLE_VNODES "vnodes" -#define TSDB_INS_TABLE_CONFIGS "configs" -#define TSDB_INS_TABLE_DNODE_VARIABLES "dnode_variables" +#define TSDB_INS_TABLE_DNODES "ins_dnodes" +#define TSDB_INS_TABLE_MNODES "ins_mnodes" +#define TSDB_INS_TABLE_MODULES "ins_modules" +#define TSDB_INS_TABLE_QNODES "ins_qnodes" +#define TSDB_INS_TABLE_BNODES "ins_bnodes" +#define TSDB_INS_TABLE_SNODES "ins_snodes" +#define TSDB_INS_TABLE_CLUSTER "ins_cluster" +#define TSDB_INS_TABLE_DATABASES "ins_databases" +#define TSDB_INS_TABLE_FUNCTIONS "ins_functions" +#define TSDB_INS_TABLE_INDEXES "ins_indexes" +#define TSDB_INS_TABLE_STABLES "ins_stables" +#define TSDB_INS_TABLE_TABLES "ins_tables" +#define TSDB_INS_TABLE_TAGS "ins_tags" +#define TSDB_INS_TABLE_TABLE_DISTRIBUTED "ins_table_distributed" +#define TSDB_INS_TABLE_USERS "ins_users" +#define TSDB_INS_TABLE_LICENCES "ins_grants" +#define TSDB_INS_TABLE_VGROUPS "ins_vgroups" +#define TSDB_INS_TABLE_VNODES "ins_vnodes" +#define TSDB_INS_TABLE_CONFIGS "ins_configs" +#define TSDB_INS_TABLE_DNODE_VARIABLES "ins_dnode_variables" #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" -#define TSDB_PERFS_TABLE_SMAS "smas" -#define TSDB_PERFS_TABLE_CONNECTIONS "connections" -#define TSDB_PERFS_TABLE_QUERIES "queries" -#define TSDB_PERFS_TABLE_TOPICS "topics" -#define TSDB_PERFS_TABLE_CONSUMERS "consumers" -#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions" -#define TSDB_PERFS_TABLE_OFFSETS "offsets" -#define TSDB_PERFS_TABLE_TRANS "trans" -#define TSDB_PERFS_TABLE_STREAMS "streams" -#define TSDB_PERFS_TABLE_APPS "apps" +#define TSDB_PERFS_TABLE_SMAS "perf_smas" +#define TSDB_PERFS_TABLE_CONNECTIONS "perf_connections" +#define TSDB_PERFS_TABLE_QUERIES "perf_queries" +#define TSDB_PERFS_TABLE_TOPICS "perf_topics" +#define TSDB_PERFS_TABLE_CONSUMERS "perf_consumers" +#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "perf_subscriptions" +#define TSDB_PERFS_TABLE_OFFSETS "perf_offsets" +#define TSDB_PERFS_TABLE_TRANS "perf_trans" +#define TSDB_PERFS_TABLE_STREAMS "perf_streams" +#define TSDB_PERFS_TABLE_APPS "perf_apps" typedef struct SSysDbTableSchema { const char* name; diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 22aac46560..7839859e8b 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -184,6 +184,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u int32_t getJsonValueLen(const char* data); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); +int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, diff --git a/include/common/tgrant.h b/include/common/tgrant.h index 6392fcf517..97fb773044 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -49,9 +49,9 @@ int32_t grantCheck(EGrantType grant); #ifndef GRANTS_CFG #define GRANTS_SCHEMA static const SSysDbTableSchema grantsSchema[] = { \ {.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ - {.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ - {.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "storage", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ @@ -59,8 +59,8 @@ int32_t grantCheck(EGrantType grant); {.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ - {.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ - {.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "cpu_cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "speed", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ } #define GRANT_CFG_ADD diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7eafc4c3d8..bfb80ec8f8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -227,8 +227,7 @@ typedef struct SSubmitBlk { int32_t sversion; // data schema version int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t schemaLen; // schema length, if length is 0, no schema exists - int16_t numOfRows; // total number of rows in current submit block - int16_t padding; // TODO just for padding here + int32_t numOfRows; // total number of rows in current submit block char data[]; } SSubmitBlk; @@ -256,7 +255,7 @@ typedef struct { int32_t sversion; // data schema version int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t schemaLen; // schema length, if length is 0, no schema exists - int16_t numOfRows; // total number of rows in current submit block + int32_t numOfRows; // total number of rows in current submit block // head of SSubmitBlk int32_t numOfBlocks; const void* pMsg; @@ -337,8 +336,10 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p } static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) { - taosMemoryFree(pSchemaWrapper->pSchema); - taosMemoryFree(pSchemaWrapper); + if (pSchemaWrapper) { + taosMemoryFree(pSchemaWrapper->pSchema); + taosMemoryFree(pSchemaWrapper); + } } static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) { @@ -2223,6 +2224,7 @@ typedef struct SAppClusterSummary { uint64_t insertBytes; // submit to tsdb since launched. uint64_t fetchBytes; + uint64_t numOfQueryReq; uint64_t queryElapsedTime; uint64_t numOfSlowQueries; uint64_t totalRequests; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e15708e357..ab33af6acf 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -76,6 +76,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n */ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); +/** + * @brief Cleanup SSDataBlock for StreamScanInfo + * + * @param tinfo + */ +void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo); + /** * Update the table id list, add or remove. * diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 32a53c9f4e..21d3fa92a8 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -253,7 +253,8 @@ typedef struct SShowCreateTableStmt { ENodeType type; char dbName[TSDB_DB_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN]; - void* pCfg; // STableCfg + void* pDbCfg; // SDbCfgInfo + void* pTableCfg; // STableCfg } SShowCreateTableStmt; typedef struct SShowTableDistributedStmt { @@ -282,6 +283,7 @@ typedef struct SCreateIndexStmt { ENodeType type; EIndexType indexType; bool ignoreExists; + char indexDbName[TSDB_DB_NAME_LEN]; char indexName[TSDB_INDEX_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN]; @@ -292,6 +294,7 @@ typedef struct SCreateIndexStmt { typedef struct SDropIndexStmt { ENodeType type; bool ignoreNotExists; + char indexDbName[TSDB_DB_NAME_LEN]; char indexName[TSDB_INDEX_NAME_LEN]; } SDropIndexStmt; diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index c06b4411e6..82235033e1 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -114,6 +114,7 @@ typedef struct SAggLogicNode { SNodeList* pAggFuncs; bool hasLastRow; bool hasTimeLineFunc; + bool onlyHasKeepOrderFunc; } SAggLogicNode; typedef struct SProjectLogicNode { @@ -555,6 +556,8 @@ typedef struct SQueryPlan { void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext); +const char* dataOrderStr(EDataOrderLevel order); + #ifdef __cplusplus } #endif diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index a452388052..f0a226db0a 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -275,6 +275,7 @@ typedef struct SSelectStmt { bool hasInterpFunc; bool hasLastRowFunc; bool hasTimeLineFunc; + bool onlyHasKeepOrderFunc; bool groupSort; } SSelectStmt; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index b62f822313..34d870397f 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -71,6 +71,7 @@ typedef struct SIndexMeta { typedef struct SExecResult { int32_t code; uint64_t numOfRows; + uint64_t numOfBytes; int32_t msgType; void* res; } SExecResult; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 240415b66b..103ca6a4f0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -46,9 +46,10 @@ enum { }; enum { - TASK_EXEC_STATUS__IDLE = 1, - TASK_EXEC_STATUS__EXECUTING, - TASK_EXEC_STATUS__CLOSING, + TASK_SCHED_STATUS__INACTIVE = 1, + TASK_SCHED_STATUS__WAITING, + TASK_SCHED_STATUS__ACTIVE, + TASK_SCHED_STATUS__FAILED, }; enum { @@ -65,6 +66,25 @@ enum { TASK_OUTPUT_STATUS__BLOCKED, }; +enum { + TASK_TRIGGER_STATUS__INACTIVE = 1, + TASK_TRIGGER_STATUS__ACTIVE, +}; + +enum { + TASK_LEVEL__SOURCE = 1, + TASK_LEVEL__AGG, + TASK_LEVEL__SINK, +}; + +enum { + TASK_OUTPUT__FIXED_DISPATCH = 1, + TASK_OUTPUT__SHUFFLE_DISPATCH, + TASK_OUTPUT__TABLE, + TASK_OUTPUT__SMA, + TASK_OUTPUT__FETCH, +}; + typedef struct { int8_t type; } SStreamQueueItem; @@ -201,41 +221,6 @@ typedef struct { int8_t reserved; } STaskSinkFetch; -enum { - TASK_SOURCE__SCAN = 1, - TASK_SOURCE__PIPE, - TASK_SOURCE__MERGE, -}; - -enum { - TASK_EXEC__NONE = 1, - TASK_EXEC__PIPE, - TASK_EXEC__MERGE, -}; - -enum { - TASK_DISPATCH__NONE = 1, - TASK_DISPATCH__FIXED, - TASK_DISPATCH__SHUFFLE, -}; - -enum { - TASK_SINK__NONE = 1, - TASK_SINK__TABLE, - TASK_SINK__SMA, - TASK_SINK__FETCH, -}; - -enum { - TASK_INPUT_TYPE__SUMBIT_BLOCK = 1, - TASK_INPUT_TYPE__DATA_BLOCK, -}; - -enum { - TASK_TRIGGER_STATUS__IN_ACTIVE = 1, - TASK_TRIGGER_STATUS__ACTIVE, -}; - typedef struct { int32_t nodeId; int32_t childId; @@ -248,28 +233,24 @@ typedef struct { typedef struct SStreamTask { int64_t streamId; int32_t taskId; - int8_t isDataScan; - int8_t execType; - int8_t sinkType; - int8_t dispatchType; - int8_t isStreamDistributed; + int8_t taskLevel; + int8_t outputType; int16_t dispatchMsgType; int8_t taskStatus; - int8_t execStatus; + int8_t schedStatus; // node info int32_t selfChildId; int32_t nodeId; SEpSet epSet; - // used for semi or single task, - // while final task should have processedVer for each child + // used for task source and sink, + // while task agg should have processedVer for each child int64_t recoverSnapVer; int64_t startVer; int64_t checkpointVer; int64_t processedVer; - // int32_t numOfVgroups; // children info SArray* childEpInfo; // SArray @@ -277,19 +258,13 @@ typedef struct SStreamTask { // exec STaskExec exec; - // TODO: unify sink and dispatch - - // local sink - union { - STaskSinkTb tbSink; - STaskSinkSma smaSink; - STaskSinkFetch fetchSink; - }; - - // remote dispatcher + // output union { STaskDispatcherFixedEp fixedEpDispatcher; STaskDispatcherShuffle shuffleDispatcher; + STaskSinkTb tbSink; + STaskSinkSma smaSink; + STaskSinkFetch fetchSink; }; int8_t inputStatus; @@ -303,9 +278,6 @@ typedef struct SStreamTask { int64_t triggerParam; void* timer; - // application storage - // void* ahandle; - // msg handle SMsgCb* pMsgCb; } SStreamTask; @@ -342,7 +314,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem } if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { - atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE); + atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); } #if 0 @@ -357,18 +329,15 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { } static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { - if (pTask->sinkType == TASK_SINK__TABLE) { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(pBlock); - } else if (pTask->sinkType == TASK_SINK__SMA) { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + } else if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(pBlock); } else { - ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); taosWriteQitem(pTask->outputQueue->queue, pBlock); } return 0; @@ -475,11 +444,10 @@ typedef struct { int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); -int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId); int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); -int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg); +int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp); int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp); @@ -487,6 +455,21 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); +int32_t streamTryExec(SStreamTask* pTask); +int32_t streamSchedExec(SStreamTask* pTask); + +typedef struct SStreamMeta SStreamMeta; + +SStreamMeta* streamMetaOpen(); +void streamMetaClose(SStreamMeta* streamMeta); + +int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); + +int32_t streamMetaBegin(SStreamMeta* pMeta); +int32_t streamMetaCommit(SStreamMeta* pMeta); +int32_t streamMetaRollBack(SStreamMeta* pMeta); + #ifdef __cplusplus } #endif diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index aec8a1f73e..d96a55c74c 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -28,10 +28,10 @@ extern bool gRaftDetailLog; #define SYNC_RESP_TTL_MS 10000000 -#define SYNC_MAX_BATCH_SIZE 500 -#define SYNC_INDEX_BEGIN 0 -#define SYNC_INDEX_INVALID -1 -#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF +#define SYNC_MAX_BATCH_SIZE 1 +#define SYNC_INDEX_BEGIN 0 +#define SYNC_INDEX_INVALID -1 +#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF typedef enum { SYNC_STRATEGY_NO_SNAPSHOT = 0, diff --git a/include/os/osEnv.h b/include/os/osEnv.h index a3f92a0b29..798bfc197e 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -49,6 +49,8 @@ void osDefaultInit(); void osUpdate(); void osCleanup(); bool osLogSpaceAvailable(); +bool osDataSpaceAvailable(); +bool osTempSpaceAvailable(); void osSetTimezone(const char *timezone); void osSetSystemLocale(const char *inLocale, const char *inCharSet); diff --git a/include/os/osFile.h b/include/os/osFile.h index 2f6a6ba480..21e3d2e6cf 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -54,6 +54,7 @@ typedef struct TdFile *TdFilePtr; #define TD_FILE_EXCL 0x0080 #define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions); +TdFilePtr taosCreateFile(const char *path, int32_t tdFileOptions); #define TD_FILE_ACCESS_EXIST_OK 0x1 #define TD_FILE_ACCESS_READ_OK 0x2 diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b15be12490..97a664d776 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -608,6 +608,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151) #define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152) #define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153) +#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/tools/shell/src/shellCommand.c b/tools/shell/src/shellCommand.c index 1af6ee4c06..d87e10fd08 100644 --- a/tools/shell/src/shellCommand.c +++ b/tools/shell/src/shellCommand.c @@ -525,7 +525,11 @@ int32_t shellReadCommand(char *command) { switch (c) { case 'A': // Up arrow hist_counter = (hist_counter + SHELL_MAX_HISTORY_SIZE - 1) % SHELL_MAX_HISTORY_SIZE; - shellResetCommand(&cmd, (pHistory->hist[hist_counter] == NULL) ? "" : pHistory->hist[hist_counter]); + if (pHistory->hist[hist_counter] == NULL) { + hist_counter = (hist_counter + SHELL_MAX_HISTORY_SIZE + 1) % SHELL_MAX_HISTORY_SIZE; + } else { + shellResetCommand(&cmd, pHistory->hist[hist_counter]); + } break; case 'B': // Down arrow if (hist_counter != pHistory->hend) { diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 4526ff2230..f0bda82172 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -22,7 +22,8 @@ static bool shellIsEmptyCommand(const char *cmd); static int32_t shellRunSingleCommand(char *command); -static int32_t shellRunCommand(char *command); +static void shellRecordCommandToHistory(char *command); +static int32_t shellRunCommand(char *command, bool recordHistory); static void shellRunSingleCommandImp(char *command); static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision); static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres); @@ -101,11 +102,7 @@ int32_t shellRunSingleCommand(char *command) { return 0; } -int32_t shellRunCommand(char *command) { - if (shellIsEmptyCommand(command)) { - return 0; - } - +void shellRecordCommandToHistory(char *command) { SShellHistory *pHistory = &shell.history; if (pHistory->hstart == pHistory->hend || pHistory->hist[(pHistory->hend + SHELL_MAX_HISTORY_SIZE - 1) % SHELL_MAX_HISTORY_SIZE] == NULL || @@ -120,6 +117,14 @@ int32_t shellRunCommand(char *command) { pHistory->hstart = (pHistory->hstart + 1) % SHELL_MAX_HISTORY_SIZE; } } +} + +int32_t shellRunCommand(char *command, bool recordHistory) { + if (shellIsEmptyCommand(command)) { + return 0; + } + + if (recordHistory) shellRecordCommandToHistory(command); char quote = 0, *cmd = command; for (char c = *command++; c != 0; c = *command++) { @@ -826,11 +831,15 @@ void shellSourceFile(const char *file) { size_t cmd_len = 0; char *line = NULL; char fullname[PATH_MAX] = {0}; + char sourceFileCommand[PATH_MAX + 8] = {0}; if (taosExpandDir(file, fullname, PATH_MAX) != 0) { tstrncpy(fullname, file, PATH_MAX); } + sprintf(sourceFileCommand, "source %s;",fullname); + shellRecordCommandToHistory(sourceFileCommand); + TdFilePtr pFile = taosOpenFile(fullname, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { fprintf(stderr, "failed to open file %s\r\n", fullname); @@ -853,9 +862,13 @@ void shellSourceFile(const char *file) { continue; } + if (line[read_len - 1] == '\r') { + line[read_len - 1] = ' '; + } + memcpy(cmd + cmd_len, line, read_len); printf("%s%s\r\n", shell.info.promptHeader, cmd); - shellRunCommand(cmd); + shellRunCommand(cmd, false); memset(cmd, 0, TSDB_MAX_ALLOWED_SQL_LEN); cmd_len = 0; } @@ -977,7 +990,7 @@ void *shellThreadLoop(void *arg) { } taosResetTerminalMode(); - } while (shellRunCommand(command) == 0); + } while (shellRunCommand(command, true) == 0); taosMemoryFreeClear(command); shellWriteHistory(); @@ -1019,7 +1032,7 @@ int32_t shellExecute() { if (pArgs->commands != NULL) { printf("%s%s\r\n", shell.info.promptHeader, pArgs->commands); char *cmd = strdup(pArgs->commands); - shellRunCommand(cmd); + shellRunCommand(cmd, true); taosMemoryFree(cmd); } diff --git a/tools/shell/src/shellWebsocket.c b/tools/shell/src/shellWebsocket.c index fee2325c34..2dcab04b3f 100644 --- a/tools/shell/src/shellWebsocket.c +++ b/tools/shell/src/shellWebsocket.c @@ -33,10 +33,11 @@ int shell_conn_ws_server(bool first) { return 0; } -static int horizontalPrintWebsocket(WS_RES* wres) { +static int horizontalPrintWebsocket(WS_RES* wres, double* execute_time) { const void* data = NULL; int rows; ws_fetch_block(wres, &data, &rows); + *execute_time += (double)(ws_take_timing(wres)/1E6); if (!rows) { return 0; } @@ -72,10 +73,11 @@ static int horizontalPrintWebsocket(WS_RES* wres) { return numOfRows; } -static int verticalPrintWebsocket(WS_RES* wres) { +static int verticalPrintWebsocket(WS_RES* wres, double* pexecute_time) { int rows = 0; const void* data = NULL; ws_fetch_block(wres, &data, &rows); + *pexecute_time += (double)(ws_take_timing(wres)/1E6); if (!rows) { return 0; } @@ -112,7 +114,7 @@ static int verticalPrintWebsocket(WS_RES* wres) { return numOfRows; } -static int dumpWebsocketToFile(const char* fname, WS_RES* wres) { +static int dumpWebsocketToFile(const char* fname, WS_RES* wres, double* pexecute_time) { char fullname[PATH_MAX] = {0}; if (taosExpandDir(fname, fullname, PATH_MAX) != 0) { tstrncpy(fullname, fname, PATH_MAX); @@ -127,6 +129,7 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) { int rows = 0; const void* data = NULL; ws_fetch_block(wres, &data, &rows); + *pexecute_time += (double)(ws_take_timing(wres)/1E6); if (!rows) { taosCloseFile(&pFile); return 0; @@ -162,14 +165,14 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) { return numOfRows; } -static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical) { +static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical, double* pexecute_time) { int numOfRows = 0; if (fname != NULL) { - numOfRows = dumpWebsocketToFile(fname, wres); + numOfRows = dumpWebsocketToFile(fname, wres, pexecute_time); } else if (vertical) { - numOfRows = verticalPrintWebsocket(wres); + numOfRows = verticalPrintWebsocket(wres, pexecute_time); } else { - numOfRows = horizontalPrintWebsocket(wres); + numOfRows = horizontalPrintWebsocket(wres, pexecute_time); } *error_no = ws_errno(wres); return numOfRows; @@ -225,6 +228,8 @@ void shellRunSingleCommandWebsocketImp(char *command) { return; } + double execute_time = ws_take_timing(res)/1E6; + if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { fprintf(stdout, "Database changed.\r\n\r\n"); fflush(stdout); @@ -236,22 +241,27 @@ void shellRunSingleCommandWebsocketImp(char *command) { if (ws_is_update_query(res)) { numOfRows = ws_affected_rows(res); et = taosGetTimestampUs(); - printf("Query Ok, %d of %d row(s) in database (%.6fs)\n", numOfRows, numOfRows, - (et - st)/1E6); + double total_time = (et - st)/1E3; + double net_time = total_time - (double)execute_time; + printf("Query Ok, %d of %d row(s) in database\n", numOfRows, numOfRows); + printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time); } else { int error_no = 0; - numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode); + numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode, &execute_time); if (numOfRows < 0) { ws_free_result(res); return; } et = taosGetTimestampUs(); + double total_time = (et - st) / 1E3; + double net_time = total_time - execute_time; if (error_no == 0 && !shell.stop_query) { - printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows, - (et - st)/1E6); + printf("Query OK, %d row(s) in set\n", numOfRows); + printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time); } else { printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows, (et - st)/1E6); + printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time); } } printf("\n");