Merge branch '3.0' of github.com:taosdata/TDengine into szhou/feature/project-elimation
This commit is contained in:
commit
2c5fc8b537
|
@ -156,104 +156,105 @@
|
|||
#define TK_SNODES 138
|
||||
#define TK_CLUSTER 139
|
||||
#define TK_TRANSACTIONS 140
|
||||
#define TK_LIKE 141
|
||||
#define TK_INDEX 142
|
||||
#define TK_FULLTEXT 143
|
||||
#define TK_FUNCTION 144
|
||||
#define TK_INTERVAL 145
|
||||
#define TK_TOPIC 146
|
||||
#define TK_AS 147
|
||||
#define TK_CONSUMER 148
|
||||
#define TK_GROUP 149
|
||||
#define TK_DESC 150
|
||||
#define TK_DESCRIBE 151
|
||||
#define TK_RESET 152
|
||||
#define TK_QUERY 153
|
||||
#define TK_CACHE 154
|
||||
#define TK_EXPLAIN 155
|
||||
#define TK_ANALYZE 156
|
||||
#define TK_VERBOSE 157
|
||||
#define TK_NK_BOOL 158
|
||||
#define TK_RATIO 159
|
||||
#define TK_NK_FLOAT 160
|
||||
#define TK_COMPACT 161
|
||||
#define TK_VNODES 162
|
||||
#define TK_IN 163
|
||||
#define TK_OUTPUTTYPE 164
|
||||
#define TK_AGGREGATE 165
|
||||
#define TK_BUFSIZE 166
|
||||
#define TK_STREAM 167
|
||||
#define TK_INTO 168
|
||||
#define TK_TRIGGER 169
|
||||
#define TK_AT_ONCE 170
|
||||
#define TK_WINDOW_CLOSE 171
|
||||
#define TK_KILL 172
|
||||
#define TK_CONNECTION 173
|
||||
#define TK_TRANSACTION 174
|
||||
#define TK_BALANCE 175
|
||||
#define TK_VGROUP 176
|
||||
#define TK_MERGE 177
|
||||
#define TK_REDISTRIBUTE 178
|
||||
#define TK_SPLIT 179
|
||||
#define TK_SYNCDB 180
|
||||
#define TK_DELETE 181
|
||||
#define TK_NULL 182
|
||||
#define TK_NK_QUESTION 183
|
||||
#define TK_NK_ARROW 184
|
||||
#define TK_ROWTS 185
|
||||
#define TK_TBNAME 186
|
||||
#define TK_QSTARTTS 187
|
||||
#define TK_QENDTS 188
|
||||
#define TK_WSTARTTS 189
|
||||
#define TK_WENDTS 190
|
||||
#define TK_WDURATION 191
|
||||
#define TK_CAST 192
|
||||
#define TK_NOW 193
|
||||
#define TK_TODAY 194
|
||||
#define TK_TIMEZONE 195
|
||||
#define TK_COUNT 196
|
||||
#define TK_LAST_ROW 197
|
||||
#define TK_BETWEEN 198
|
||||
#define TK_IS 199
|
||||
#define TK_NK_LT 200
|
||||
#define TK_NK_GT 201
|
||||
#define TK_NK_LE 202
|
||||
#define TK_NK_GE 203
|
||||
#define TK_NK_NE 204
|
||||
#define TK_MATCH 205
|
||||
#define TK_NMATCH 206
|
||||
#define TK_CONTAINS 207
|
||||
#define TK_JOIN 208
|
||||
#define TK_INNER 209
|
||||
#define TK_SELECT 210
|
||||
#define TK_DISTINCT 211
|
||||
#define TK_WHERE 212
|
||||
#define TK_PARTITION 213
|
||||
#define TK_BY 214
|
||||
#define TK_SESSION 215
|
||||
#define TK_STATE_WINDOW 216
|
||||
#define TK_SLIDING 217
|
||||
#define TK_FILL 218
|
||||
#define TK_VALUE 219
|
||||
#define TK_NONE 220
|
||||
#define TK_PREV 221
|
||||
#define TK_LINEAR 222
|
||||
#define TK_NEXT 223
|
||||
#define TK_HAVING 224
|
||||
#define TK_ORDER 225
|
||||
#define TK_SLIMIT 226
|
||||
#define TK_SOFFSET 227
|
||||
#define TK_LIMIT 228
|
||||
#define TK_OFFSET 229
|
||||
#define TK_ASC 230
|
||||
#define TK_NULLS 231
|
||||
#define TK_ID 232
|
||||
#define TK_NK_BITNOT 233
|
||||
#define TK_INSERT 234
|
||||
#define TK_VALUES 235
|
||||
#define TK_IMPORT 236
|
||||
#define TK_NK_SEMI 237
|
||||
#define TK_FILE 238
|
||||
#define TK_DISTRIBUTED 141
|
||||
#define TK_LIKE 142
|
||||
#define TK_INDEX 143
|
||||
#define TK_FULLTEXT 144
|
||||
#define TK_FUNCTION 145
|
||||
#define TK_INTERVAL 146
|
||||
#define TK_TOPIC 147
|
||||
#define TK_AS 148
|
||||
#define TK_CONSUMER 149
|
||||
#define TK_GROUP 150
|
||||
#define TK_DESC 151
|
||||
#define TK_DESCRIBE 152
|
||||
#define TK_RESET 153
|
||||
#define TK_QUERY 154
|
||||
#define TK_CACHE 155
|
||||
#define TK_EXPLAIN 156
|
||||
#define TK_ANALYZE 157
|
||||
#define TK_VERBOSE 158
|
||||
#define TK_NK_BOOL 159
|
||||
#define TK_RATIO 160
|
||||
#define TK_NK_FLOAT 161
|
||||
#define TK_COMPACT 162
|
||||
#define TK_VNODES 163
|
||||
#define TK_IN 164
|
||||
#define TK_OUTPUTTYPE 165
|
||||
#define TK_AGGREGATE 166
|
||||
#define TK_BUFSIZE 167
|
||||
#define TK_STREAM 168
|
||||
#define TK_INTO 169
|
||||
#define TK_TRIGGER 170
|
||||
#define TK_AT_ONCE 171
|
||||
#define TK_WINDOW_CLOSE 172
|
||||
#define TK_KILL 173
|
||||
#define TK_CONNECTION 174
|
||||
#define TK_TRANSACTION 175
|
||||
#define TK_BALANCE 176
|
||||
#define TK_VGROUP 177
|
||||
#define TK_MERGE 178
|
||||
#define TK_REDISTRIBUTE 179
|
||||
#define TK_SPLIT 180
|
||||
#define TK_SYNCDB 181
|
||||
#define TK_DELETE 182
|
||||
#define TK_NULL 183
|
||||
#define TK_NK_QUESTION 184
|
||||
#define TK_NK_ARROW 185
|
||||
#define TK_ROWTS 186
|
||||
#define TK_TBNAME 187
|
||||
#define TK_QSTARTTS 188
|
||||
#define TK_QENDTS 189
|
||||
#define TK_WSTARTTS 190
|
||||
#define TK_WENDTS 191
|
||||
#define TK_WDURATION 192
|
||||
#define TK_CAST 193
|
||||
#define TK_NOW 194
|
||||
#define TK_TODAY 195
|
||||
#define TK_TIMEZONE 196
|
||||
#define TK_COUNT 197
|
||||
#define TK_LAST_ROW 198
|
||||
#define TK_BETWEEN 199
|
||||
#define TK_IS 200
|
||||
#define TK_NK_LT 201
|
||||
#define TK_NK_GT 202
|
||||
#define TK_NK_LE 203
|
||||
#define TK_NK_GE 204
|
||||
#define TK_NK_NE 205
|
||||
#define TK_MATCH 206
|
||||
#define TK_NMATCH 207
|
||||
#define TK_CONTAINS 208
|
||||
#define TK_JOIN 209
|
||||
#define TK_INNER 210
|
||||
#define TK_SELECT 211
|
||||
#define TK_DISTINCT 212
|
||||
#define TK_WHERE 213
|
||||
#define TK_PARTITION 214
|
||||
#define TK_BY 215
|
||||
#define TK_SESSION 216
|
||||
#define TK_STATE_WINDOW 217
|
||||
#define TK_SLIDING 218
|
||||
#define TK_FILL 219
|
||||
#define TK_VALUE 220
|
||||
#define TK_NONE 221
|
||||
#define TK_PREV 222
|
||||
#define TK_LINEAR 223
|
||||
#define TK_NEXT 224
|
||||
#define TK_HAVING 225
|
||||
#define TK_ORDER 226
|
||||
#define TK_SLIMIT 227
|
||||
#define TK_SOFFSET 228
|
||||
#define TK_LIMIT 229
|
||||
#define TK_OFFSET 230
|
||||
#define TK_ASC 231
|
||||
#define TK_NULLS 232
|
||||
#define TK_ID 233
|
||||
#define TK_NK_BITNOT 234
|
||||
#define TK_INSERT 235
|
||||
#define TK_VALUES 236
|
||||
#define TK_IMPORT 237
|
||||
#define TK_NK_SEMI 238
|
||||
#define TK_FILE 239
|
||||
|
||||
#define TK_NK_SPACE 300
|
||||
#define TK_NK_COMMENT 301
|
||||
|
|
|
@ -122,6 +122,7 @@ typedef enum EFunctionType {
|
|||
// internal function
|
||||
FUNCTION_TYPE_SELECT_VALUE,
|
||||
FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function
|
||||
FUNCTION_TYPE_BLOCK_DIST_INFO, // block distribution pseudo column function
|
||||
|
||||
// distributed splitting functions
|
||||
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
|
||||
|
|
|
@ -205,7 +205,8 @@ typedef struct SAlterDnodeStmt {
|
|||
typedef struct SShowStmt {
|
||||
ENodeType type;
|
||||
SNode* pDbName; // SValueNode
|
||||
SNode* pTbNamePattern; // SValueNode
|
||||
SNode* pTbName; // SValueNode
|
||||
EOperatorType tableCondType;
|
||||
} SShowStmt;
|
||||
|
||||
typedef struct SShowCreateDatabaseStmt {
|
||||
|
@ -221,6 +222,12 @@ typedef struct SShowCreateTableStmt {
|
|||
STableMeta* pMeta;
|
||||
} SShowCreateTableStmt;
|
||||
|
||||
typedef struct SShowTableDistributedStmt {
|
||||
ENodeType type;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
} SShowTableDistributedStmt;
|
||||
|
||||
typedef enum EIndexType { INDEX_TYPE_SMA = 1, INDEX_TYPE_FULLTEXT } EIndexType;
|
||||
|
||||
typedef struct SIndexOptions {
|
||||
|
|
|
@ -185,6 +185,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
|
||||
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
|
||||
QUERY_NODE_SHOW_TRANSACTIONS_STMT,
|
||||
QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT,
|
||||
QUERY_NODE_KILL_CONNECTION_STMT,
|
||||
QUERY_NODE_KILL_QUERY_STMT,
|
||||
QUERY_NODE_KILL_TRANSACTION_STMT,
|
||||
|
@ -214,6 +215,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
|
||||
|
|
|
@ -39,7 +39,8 @@ typedef enum EScanType {
|
|||
SCAN_TYPE_TABLE,
|
||||
SCAN_TYPE_SYSTEM_TABLE,
|
||||
SCAN_TYPE_STREAM,
|
||||
SCAN_TYPE_TABLE_MERGE
|
||||
SCAN_TYPE_TABLE_MERGE,
|
||||
SCAN_TYPE_BLOCK_INFO
|
||||
} EScanType;
|
||||
|
||||
typedef struct SScanLogicNode {
|
||||
|
@ -247,6 +248,7 @@ typedef struct SScanPhysiNode {
|
|||
} SScanPhysiNode;
|
||||
|
||||
typedef SScanPhysiNode STagScanPhysiNode;
|
||||
typedef SScanPhysiNode SBlockDistScanPhysiNode;
|
||||
|
||||
typedef struct SSystemTableScanPhysiNode {
|
||||
SScanPhysiNode scan;
|
||||
|
|
|
@ -23,6 +23,7 @@ extern "C" {
|
|||
#include <stdint.h>
|
||||
#include "taosdef.h"
|
||||
#include "tmsg.h"
|
||||
#include "ttrace.h"
|
||||
|
||||
#define TAOS_CONN_SERVER 0
|
||||
#define TAOS_CONN_CLIENT 1
|
||||
|
@ -45,6 +46,8 @@ typedef struct SRpcHandleInfo {
|
|||
int64_t refId; // refid, used by server
|
||||
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
|
||||
int32_t persistHandle; // persist handle or not
|
||||
STraceId traceId;
|
||||
// int64_t traceId;
|
||||
|
||||
// app info
|
||||
void *ahandle; // app handle set by client
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#ifndef _TD_TRACE_H_
|
||||
#define _TD_TRACE_H_
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#pragma(push, 1)
|
||||
|
||||
typedef struct STraceId {
|
||||
int64_t rootId;
|
||||
int64_t msgId;
|
||||
} STraceId;
|
||||
|
||||
#pragma(pop)
|
||||
|
||||
#define TRACE_SET_ROOTID(traceId, root) \
|
||||
do { \
|
||||
(traceId)->rootId = root; \
|
||||
} while (0);
|
||||
|
||||
#define TRACE_GET_ROOTID(traceId) (traceId)->rootId
|
||||
|
||||
#define TRACE_SET_MSGID(traceId, mId) \
|
||||
do { \
|
||||
(traceId)->msgId = mId; \
|
||||
} while (0)
|
||||
|
||||
#define TRACE_GET_MSGID(traceId) (traceId)->msgId
|
||||
|
||||
#define TRACE_TO_STR(traceId, buf) \
|
||||
do { \
|
||||
sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", traceId->rootId, traceId->msgId); \
|
||||
} while (0)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -162,6 +162,7 @@ typedef struct SReqResultInfo {
|
|||
int32_t precision;
|
||||
bool convertUcs4;
|
||||
int32_t payloadLen;
|
||||
char* convertJson;
|
||||
} SReqResultInfo;
|
||||
|
||||
typedef struct SRequestSendRecvBody {
|
||||
|
@ -242,6 +243,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver
|
|||
taosMemoryFreeClear(msg->resInfo.pCol);
|
||||
taosMemoryFreeClear(msg->resInfo.length);
|
||||
taosMemoryFreeClear(msg->resInfo.convertBuf);
|
||||
taosMemoryFreeClear(msg->resInfo.convertJson);
|
||||
}
|
||||
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4, false);
|
||||
return &msg->resInfo;
|
||||
|
|
|
@ -212,6 +212,7 @@ void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
|
|||
taosMemoryFreeClear(pResInfo->pCol);
|
||||
taosMemoryFreeClear(pResInfo->fields);
|
||||
taosMemoryFreeClear(pResInfo->userFields);
|
||||
taosMemoryFreeClear(pResInfo->convertJson);
|
||||
|
||||
if (pResInfo->convertBuf != NULL) {
|
||||
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
|
||||
|
|
|
@ -1341,27 +1341,126 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
|
|||
|
||||
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
|
||||
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
|
||||
} else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) {
|
||||
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
pResultInfo->convertBuf[i] = p;
|
||||
int32_t len = 0;
|
||||
SResultColumn* pCol = &pResultInfo->pCol[i];
|
||||
for (int32_t j = 0; j < numOfRows; ++j) {
|
||||
if (pCol->offset[j] != -1) {
|
||||
char* pStart = pCol->offset[j] + pCol->pData;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t jsonInnerType = *pStart;
|
||||
char* jsonInnerData = pStart + CHAR_BYTES;
|
||||
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows){
|
||||
char* p = (char*)pResultInfo->pData;
|
||||
|
||||
int32_t len = sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
|
||||
int32_t* colLength = (int32_t*)(p + len);
|
||||
len += sizeof(int32_t) * numOfCols;
|
||||
|
||||
char* pStart = p + len;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
int32_t colLen = htonl(colLength[i]);
|
||||
|
||||
if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
|
||||
int32_t* offset = (int32_t*)pStart;
|
||||
int32_t lenTmp = numOfRows * sizeof(int32_t);
|
||||
len += lenTmp;
|
||||
pStart += lenTmp;
|
||||
|
||||
for (int32_t j = 0; j < numOfRows; ++j) {
|
||||
if (offset[j] == -1) {
|
||||
continue;
|
||||
}
|
||||
char* data = offset[j] + pStart;
|
||||
|
||||
int32_t jsonInnerType = *data;
|
||||
char* jsonInnerData = data + CHAR_BYTES;
|
||||
if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
|
||||
len += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
|
||||
} else if (jsonInnerType & TD_TAG_JSON) {
|
||||
len += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
|
||||
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
|
||||
len += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
|
||||
} else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
|
||||
len += (VARSTR_HEADER_SIZE + 32);
|
||||
} else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
|
||||
len += (VARSTR_HEADER_SIZE + 5);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
}
|
||||
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||
int32_t lenTmp = numOfRows * sizeof(int32_t);
|
||||
len += (lenTmp + colLen);
|
||||
pStart += lenTmp;
|
||||
} else {
|
||||
int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
|
||||
len += (lenTmp + colLen);
|
||||
pStart += lenTmp;
|
||||
}
|
||||
pStart += colLen;
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
|
||||
bool needConvert = false;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
|
||||
needConvert = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(!needConvert) return TSDB_CODE_SUCCESS;
|
||||
|
||||
char* p = (char*)pResultInfo->pData;
|
||||
int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows);
|
||||
|
||||
pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
|
||||
if(pResultInfo->convertJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
char* p1 = pResultInfo->convertJson;
|
||||
|
||||
int32_t len = sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
|
||||
memcpy(p1, p, len);
|
||||
|
||||
p += len;
|
||||
p1 += len;
|
||||
|
||||
len = sizeof(int32_t) * numOfCols;
|
||||
int32_t* colLength = (int32_t*)p;
|
||||
int32_t* colLength1 = (int32_t*)p1;
|
||||
memcpy(p1, p, len);
|
||||
p += len;
|
||||
p1 += len;
|
||||
|
||||
char* pStart = p;
|
||||
char* pStart1 = p1;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
int32_t colLen = htonl(colLength[i]);
|
||||
int32_t colLen1 = htonl(colLength1[i]);
|
||||
ASSERT(colLen < dataLen);
|
||||
|
||||
if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
|
||||
int32_t* offset = (int32_t*)pStart;
|
||||
int32_t* offset1 = (int32_t*)pStart1;
|
||||
len = numOfRows * sizeof(int32_t);
|
||||
memcpy(pStart1, pStart, len);
|
||||
pStart += len;
|
||||
pStart1 += len;
|
||||
|
||||
len = 0;
|
||||
for (int32_t j = 0; j < numOfRows; ++j) {
|
||||
if (offset[j] == -1) {
|
||||
continue;
|
||||
}
|
||||
char* data = offset[j] + pStart;
|
||||
|
||||
int32_t jsonInnerType = *data;
|
||||
char* jsonInnerData = data + CHAR_BYTES;
|
||||
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
|
||||
if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
|
||||
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
|
||||
varDataSetLen(dst, strlen(varDataVal(dst)));
|
||||
} else if (jsonInnerType == TD_TAG_JSON) {
|
||||
char* jsonString = parseTagDatatoJson(pStart);
|
||||
} else if (jsonInnerType & TD_TAG_JSON) {
|
||||
char* jsonString = parseTagDatatoJson(data);
|
||||
STR_TO_VARSTR(dst, jsonString);
|
||||
taosMemoryFree(jsonString);
|
||||
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
|
||||
|
@ -1385,26 +1484,31 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (len + varDataTLen(dst) > colLength[i]) {
|
||||
p = taosMemoryRealloc(pResultInfo->convertBuf[i], len + varDataTLen(dst));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pResultInfo->convertBuf[i] = p;
|
||||
}
|
||||
p = pResultInfo->convertBuf[i] + len;
|
||||
memcpy(p, dst, varDataTLen(dst));
|
||||
pCol->offset[j] = len;
|
||||
offset1[j]= len;
|
||||
memcpy(pStart1 + len, dst, varDataTLen(dst));
|
||||
len += varDataTLen(dst);
|
||||
}
|
||||
colLen1 = len;
|
||||
colLength1[i] = htonl(len);
|
||||
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||
len = numOfRows * sizeof(int32_t);
|
||||
memcpy(pStart1, pStart, len);
|
||||
pStart += len;
|
||||
pStart1 += len;
|
||||
memcpy(pStart1, pStart, colLen);
|
||||
} else {
|
||||
len = BitmapLen(pResultInfo->numOfRows);
|
||||
memcpy(pStart1, pStart, len);
|
||||
pStart += len;
|
||||
pStart1 += len;
|
||||
memcpy(pStart1, pStart, colLen);
|
||||
|
||||
}
|
||||
pStart += colLen;
|
||||
pStart1 += colLen1;
|
||||
}
|
||||
|
||||
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
|
||||
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
|
||||
}
|
||||
}
|
||||
|
||||
pResultInfo->pData = pResultInfo->convertJson;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1419,6 +1523,10 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
code = doConvertJson(pResultInfo, numOfCols, numOfRows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
char* p = (char*)pResultInfo->pData;
|
||||
|
||||
|
@ -1462,7 +1570,6 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
|
|||
pStart += colLength[i];
|
||||
}
|
||||
|
||||
// convert UCS4-LE encoded character to native multi-bytes character in current data block.
|
||||
if(convertUcs4){
|
||||
code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
|
||||
}
|
||||
|
|
|
@ -101,7 +101,8 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
|
|||
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SDnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
int32_t code = -1;
|
||||
dTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||
STraceId * trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_DND_CONFIG_DNODE:
|
||||
|
|
|
@ -48,7 +48,8 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
|
|||
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
int32_t code = -1;
|
||||
dTrace("msg:%p, get from mnode queue", pMsg);
|
||||
STraceId * trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, get from mnode queue", pMsg);
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_MON_MM_INFO:
|
||||
|
|
|
@ -31,7 +31,8 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
SVnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
int32_t code = -1;
|
||||
|
||||
dTrace("msg:%p, get from vnode-mgmt queue", pMsg);
|
||||
STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, get from vnode-mgmt queue", pMsg);
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_MON_VM_INFO:
|
||||
code = vmProcessGetMonitorInfoReq(pMgmt, pMsg);
|
||||
|
|
|
@ -40,7 +40,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
SMgmtWrapper *pWrapper = NULL;
|
||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
|
||||
|
||||
dTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
|
||||
STraceId *trace = &pRpc->info.traceId;
|
||||
dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
|
||||
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
|
||||
|
||||
if (pRpc->msgType == TDMT_DND_NET_TEST) {
|
||||
|
|
|
@ -34,13 +34,13 @@
|
|||
|
||||
#include "dnode.h"
|
||||
#include "mnode.h"
|
||||
#include "qnode.h"
|
||||
#include "monitor.h"
|
||||
#include "qnode.h"
|
||||
#include "sync.h"
|
||||
#include "wal.h"
|
||||
|
||||
#include "libs/function/function.h"
|
||||
|
||||
// clang-format off
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
@ -51,6 +51,7 @@ extern "C" {
|
|||
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
||||
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
||||
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
||||
#define dGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ",GTID: %s", __VA_ARGS__, buf);} while(0)
|
||||
|
||||
typedef enum {
|
||||
DNODE = 0,
|
||||
|
@ -184,3 +185,4 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
|
|||
#endif
|
||||
|
||||
#endif /*_TD_DM_INT_H_*/
|
||||
// clang-format on
|
||||
|
|
|
@ -40,6 +40,8 @@ extern "C" {
|
|||
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
||||
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
|
||||
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
|
||||
#define mGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0)
|
||||
|
||||
// clang-format on
|
||||
|
||||
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||
|
|
|
@ -551,7 +551,8 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
|
|||
if (mndCheckMsgContent(pMsg) != 0) return -1;
|
||||
if (mndCheckMnodeState(pMsg) != 0) return -1;
|
||||
|
||||
mTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
STraceId *trace = &pMsg->info.traceId;
|
||||
mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
int32_t code = (*fp)(pMsg);
|
||||
mndReleaseRpcRef(pMnode);
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
#include "sync.h"
|
||||
#include "syncTools.h"
|
||||
#include "ttrace.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -31,6 +32,7 @@ extern "C" {
|
|||
#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
|
||||
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define vGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param " GTID: %s", __VA_ARGS__, buf);} while(0)//#define vDye(...) do
|
||||
// clang-format on
|
||||
|
||||
// vnodeCfg.c
|
||||
|
|
|
@ -84,8 +84,8 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
vInfo("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);
|
||||
STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);
|
||||
SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};
|
||||
for (int32_t r = 0; r < req.replica; ++r) {
|
||||
SNodeInfo *pNode = &cfg.nodeInfo[r];
|
||||
|
@ -126,7 +126,8 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
|
||||
for (int32_t m = 0; m < numOfMsgs; m++) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
vTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
|
||||
STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
|
||||
|
||||
if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
|
||||
code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
|
||||
|
@ -149,10 +150,10 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
|
||||
}
|
||||
|
||||
vTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
|
||||
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
|
||||
newEpSet.inUse);
|
||||
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
|
||||
vTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
|
||||
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
|
||||
}
|
||||
|
||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
||||
|
@ -164,7 +165,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
|
||||
vTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
|
||||
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
@ -180,8 +181,9 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
vTrace("vgId:%d, msg:%p get from vnode-apply queue, index:%" PRId64 " type:%s handle:%p", vgId, pMsg,
|
||||
pMsg->info.conn.applyIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
|
||||
STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
||||
pMsg->info.handle);
|
||||
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
if (rsp.code == 0) {
|
||||
|
@ -196,7 +198,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
|
||||
vTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code);
|
||||
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
@ -218,8 +220,9 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
|
||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||
static int64_t vndTick = 0;
|
||||
STraceId * trace = &pMsg->info.traceId;
|
||||
if (++vndTick % 10 == 1) {
|
||||
vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||
vGTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||
}
|
||||
syncRpcMsgLog2(logBuf, pMsg);
|
||||
taosMemoryFree(syncNodeStr);
|
||||
|
@ -334,8 +337,9 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
|
|||
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
|
||||
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
||||
|
||||
vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " index:%" PRId64 " handle:%p",
|
||||
TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, cbMeta.index, rpcMsg.info.handle);
|
||||
STraceId *trace = (STraceId *)&pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
|
||||
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
|
||||
if (rpcMsg.info.handle != NULL) {
|
||||
tmsgSendRsp(&rpcMsg);
|
||||
}
|
||||
|
|
|
@ -1508,6 +1508,11 @@ static int32_t translateBlockDistFunc(SFunctionNode* pFunc, char* pErrBuf, int32
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateBlockDistInfoFunc(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
pFunc->node.resType = (SDataType){.bytes = 128, .type = TSDB_DATA_TYPE_VARCHAR};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool getBlockDistFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(STableBlockDistInfo);
|
||||
return true;
|
||||
|
@ -2520,6 +2525,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getBlockDistFuncEnv,
|
||||
.processFunc = blockDistFunction,
|
||||
.finalizeFunc = blockDistFinalize
|
||||
},
|
||||
{
|
||||
.name = "_block_dist_info",
|
||||
.type = FUNCTION_TYPE_BLOCK_DIST_INFO,
|
||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
|
||||
.translateFunc = translateBlockDistInfoFunc,
|
||||
}
|
||||
};
|
||||
// clang-format on
|
||||
|
|
|
@ -216,6 +216,8 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "PhysiSreamScan";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
|
||||
return "PhysiSystemTableScan";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||
return "PhysiBlockDistScan";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return "PhysiProject";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
|
||||
|
@ -3971,6 +3973,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
case QUERY_NODE_LOGIC_PLAN:
|
||||
return logicPlanToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||
return physiTagScanNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
|
||||
|
@ -4106,6 +4109,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
case QUERY_NODE_LOGIC_PLAN:
|
||||
return jsonToLogicPlan(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||
return jsonToPhysiTagScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
|
||||
|
|
|
@ -215,6 +215,8 @@ SNode* nodesMakeNode(ENodeType type) {
|
|||
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
|
||||
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
|
||||
return makeNode(type, sizeof(SShowCreateTableStmt));
|
||||
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
|
||||
return makeNode(type, sizeof(SShowTableDistributedStmt));
|
||||
case QUERY_NODE_KILL_QUERY_STMT:
|
||||
return makeNode(type, sizeof(SKillQueryStmt));
|
||||
case QUERY_NODE_KILL_TRANSACTION_STMT:
|
||||
|
@ -264,6 +266,8 @@ SNode* nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SStreamScanPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
|
||||
return makeNode(type, sizeof(SSystemTableScanPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||
return makeNode(type, sizeof(SBlockDistScanPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return makeNode(type, sizeof(SProjectPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
|
||||
|
@ -628,15 +632,20 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
case QUERY_NODE_SHOW_APPS_STMT:
|
||||
case QUERY_NODE_SHOW_SCORES_STMT:
|
||||
case QUERY_NODE_SHOW_VARIABLE_STMT:
|
||||
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
|
||||
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
|
||||
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
|
||||
case QUERY_NODE_SHOW_TRANSACTIONS_STMT: {
|
||||
SShowStmt* pStmt = (SShowStmt*)pNode;
|
||||
nodesDestroyNode(pStmt->pDbName);
|
||||
nodesDestroyNode(pStmt->pTbNamePattern);
|
||||
nodesDestroyNode(pStmt->pTbName);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
|
||||
taosMemoryFreeClear(((SShowCreateDatabaseStmt*)pNode)->pCfg);
|
||||
break;
|
||||
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
|
||||
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
|
||||
taosMemoryFreeClear(((SShowCreateTableStmt*)pNode)->pMeta);
|
||||
break;
|
||||
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT: // no pointer field
|
||||
case QUERY_NODE_KILL_CONNECTION_STMT: // no pointer field
|
||||
case QUERY_NODE_KILL_QUERY_STMT: // no pointer field
|
||||
case QUERY_NODE_KILL_TRANSACTION_STMT: // no pointer field
|
||||
|
@ -752,6 +761,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: {
|
||||
|
|
|
@ -151,9 +151,12 @@ SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int
|
|||
SToken* pNewColName);
|
||||
SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, SToken* pTagName, SNode* pVal);
|
||||
SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
|
||||
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pDbName, SNode* pTbNamePattern);
|
||||
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type);
|
||||
SNode* createShowStmtWithCond(SAstCreateContext* pCxt, ENodeType type, SNode* pDbName, SNode* pTbName,
|
||||
EOperatorType tableCondType);
|
||||
SNode* createShowCreateDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
|
||||
SNode* createShowCreateTableStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pRealTable);
|
||||
SNode* createShowTableDistributedStmt(SAstCreateContext* pCxt, SNode* pRealTable);
|
||||
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword);
|
||||
SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t alterType, const SToken* pVal);
|
||||
SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName);
|
||||
|
|
|
@ -350,34 +350,35 @@ col_name_list(A) ::= col_name_list(B) NK_COMMA col_name(C).
|
|||
col_name(A) ::= column_name(B). { A = createColumnNode(pCxt, NULL, &B); }
|
||||
|
||||
/************************************************ show ****************************************************************/
|
||||
cmd ::= SHOW DNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DNODES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW USERS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_USERS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW DATABASES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DATABASES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW db_name_cond_opt(A) TABLES like_pattern_opt(B). { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TABLES_STMT, A, B); }
|
||||
cmd ::= SHOW db_name_cond_opt(A) STABLES like_pattern_opt(B). { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_STABLES_STMT, A, B); }
|
||||
cmd ::= SHOW db_name_cond_opt(A) VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, A, NULL); }
|
||||
cmd ::= SHOW MNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MNODES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW MODULES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MODULES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW QNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QNODES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW FUNCTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_FUNCTIONS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW INDEXES FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_INDEXES_STMT, A, B); }
|
||||
cmd ::= SHOW STREAMS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_STREAMS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW DNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DNODES_STMT); }
|
||||
cmd ::= SHOW USERS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_USERS_STMT); }
|
||||
cmd ::= SHOW DATABASES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DATABASES_STMT); }
|
||||
cmd ::= SHOW db_name_cond_opt(A) TABLES like_pattern_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TABLES_STMT, A, B, OP_TYPE_LIKE); }
|
||||
cmd ::= SHOW db_name_cond_opt(A) STABLES like_pattern_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_STABLES_STMT, A, B, OP_TYPE_LIKE); }
|
||||
cmd ::= SHOW db_name_cond_opt(A) VGROUPS. { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, A, NULL, OP_TYPE_LIKE); }
|
||||
cmd ::= SHOW MNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MNODES_STMT); }
|
||||
cmd ::= SHOW MODULES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MODULES_STMT); }
|
||||
cmd ::= SHOW QNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QNODES_STMT); }
|
||||
cmd ::= SHOW FUNCTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_FUNCTIONS_STMT); }
|
||||
cmd ::= SHOW INDEXES FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_INDEXES_STMT, B, A, OP_TYPE_EQUAL); }
|
||||
cmd ::= SHOW STREAMS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_STREAMS_STMT); }
|
||||
cmd ::= SHOW ACCOUNTS. { pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_EXPRIE_STATEMENT); }
|
||||
cmd ::= SHOW APPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_APPS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW CONNECTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CONNECTIONS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW LICENCE. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_LICENCE_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW GRANTS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_LICENCE_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW APPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_APPS_STMT); }
|
||||
cmd ::= SHOW CONNECTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CONNECTIONS_STMT); }
|
||||
cmd ::= SHOW LICENCE. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_LICENCE_STMT); }
|
||||
cmd ::= SHOW GRANTS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_LICENCE_STMT); }
|
||||
cmd ::= SHOW CREATE DATABASE db_name(A). { pCxt->pRootNode = createShowCreateDatabaseStmt(pCxt, &A); }
|
||||
cmd ::= SHOW CREATE TABLE full_table_name(A). { pCxt->pRootNode = createShowCreateTableStmt(pCxt, QUERY_NODE_SHOW_CREATE_TABLE_STMT, A); }
|
||||
cmd ::= SHOW CREATE STABLE full_table_name(A). { pCxt->pRootNode = createShowCreateTableStmt(pCxt, QUERY_NODE_SHOW_CREATE_STABLE_STMT, A); }
|
||||
cmd ::= SHOW QUERIES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QUERIES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW SCORES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SCORES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW TOPICS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TOPICS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLE_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW BNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_BNODES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW CLUSTER. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CLUSTER_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW TRANSACTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TRANSACTIONS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW QUERIES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QUERIES_STMT); }
|
||||
cmd ::= SHOW SCORES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SCORES_STMT); }
|
||||
cmd ::= SHOW TOPICS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TOPICS_STMT); }
|
||||
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLE_STMT); }
|
||||
cmd ::= SHOW BNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_BNODES_STMT); }
|
||||
cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT); }
|
||||
cmd ::= SHOW CLUSTER. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CLUSTER_STMT); }
|
||||
cmd ::= SHOW TRANSACTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TRANSACTIONS_STMT); }
|
||||
cmd ::= SHOW TABLE DISTRIBUTED full_table_name(A). { pCxt->pRootNode = createShowTableDistributedStmt(pCxt, A); }
|
||||
|
||||
db_name_cond_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); }
|
||||
db_name_cond_opt(A) ::= db_name(B) NK_DOT. { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
|
||||
|
|
|
@ -1100,7 +1100,15 @@ static bool needDbShowStmt(ENodeType type) {
|
|||
QUERY_NODE_SHOW_VGROUPS_STMT == type;
|
||||
}
|
||||
|
||||
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pDbName, SNode* pTbNamePattern) {
|
||||
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
SShowStmt* pStmt = (SShowStmt*)nodesMakeNode(type);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createShowStmtWithCond(SAstCreateContext* pCxt, ENodeType type, SNode* pDbName, SNode* pTbName,
|
||||
EOperatorType tableCondType) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
if (needDbShowStmt(type) && NULL == pDbName && NULL == pCxt->pQueryCxt->db) {
|
||||
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "db not specified");
|
||||
|
@ -1110,7 +1118,8 @@ SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pDbName, S
|
|||
SShowStmt* pStmt = (SShowStmt*)nodesMakeNode(type);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
pStmt->pDbName = pDbName;
|
||||
pStmt->pTbNamePattern = pTbNamePattern;
|
||||
pStmt->pTbName = pTbName;
|
||||
pStmt->tableCondType = tableCondType;
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
|
@ -1131,6 +1140,17 @@ SNode* createShowCreateTableStmt(SAstCreateContext* pCxt, ENodeType type, SNode*
|
|||
CHECK_OUT_OF_MEM(pStmt);
|
||||
strcpy(pStmt->dbName, ((SRealTableNode*)pRealTable)->table.dbName);
|
||||
strcpy(pStmt->tableName, ((SRealTableNode*)pRealTable)->table.tableName);
|
||||
nodesDestroyNode(pRealTable);
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createShowTableDistributedStmt(SAstCreateContext* pCxt, SNode* pRealTable) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
SShowTableDistributedStmt* pStmt = (SShowTableDistributedStmt*)nodesMakeNode(QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
strcpy(pStmt->dbName, ((SRealTableNode*)pRealTable)->table.dbName);
|
||||
strcpy(pStmt->tableName, ((SRealTableNode*)pRealTable)->table.tableName);
|
||||
nodesDestroyNode(pRealTable);
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
|
|
|
@ -68,12 +68,12 @@ static SKeyword keywordTable[] = {
|
|||
{"CONTAINS", TK_CONTAINS},
|
||||
{"DATABASE", TK_DATABASE},
|
||||
{"DATABASES", TK_DATABASES},
|
||||
// {"DAYS", TK_DAYS},
|
||||
{"DBS", TK_DBS},
|
||||
{"DELETE", TK_DELETE},
|
||||
{"DESC", TK_DESC},
|
||||
{"DESCRIBE", TK_DESCRIBE},
|
||||
{"DISTINCT", TK_DISTINCT},
|
||||
{"DISTRIBUTED", TK_DISTRIBUTED},
|
||||
{"DNODE", TK_DNODE},
|
||||
{"DNODES", TK_DNODES},
|
||||
{"DOUBLE", TK_DOUBLE},
|
||||
|
@ -81,7 +81,6 @@ static SKeyword keywordTable[] = {
|
|||
{"DURATION", TK_DURATION},
|
||||
{"EXISTS", TK_EXISTS},
|
||||
{"EXPLAIN", TK_EXPLAIN},
|
||||
// {"FILE_FACTOR", TK_FILE_FACTOR},
|
||||
{"FILL", TK_FILL},
|
||||
{"FIRST", TK_FIRST},
|
||||
{"FLOAT", TK_FLOAT},
|
||||
|
|
|
@ -4534,28 +4534,36 @@ static const char* getSysTableName(ENodeType type) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t createSelectStmtForShow(ENodeType showType, SSelectStmt** pStmt) {
|
||||
static int32_t createSimpleSelectStmt(const char* pDb, const char* pTable, SSelectStmt** pStmt) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
|
||||
if (NULL == pSelect) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
sprintf(pSelect->stmtName, "%p", pSelect);
|
||||
|
||||
SRealTableNode* pTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE);
|
||||
if (NULL == pTable) {
|
||||
SRealTableNode* pRealTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE);
|
||||
if (NULL == pRealTable) {
|
||||
nodesDestroyNode((SNode*)pSelect);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(pTable->table.dbName, getSysDbName(showType));
|
||||
strcpy(pTable->table.tableName, getSysTableName(showType));
|
||||
strcpy(pTable->table.tableAlias, pTable->table.tableName);
|
||||
pSelect->pFromTable = (SNode*)pTable;
|
||||
strcpy(pRealTable->table.dbName, pDb);
|
||||
strcpy(pRealTable->table.tableName, pTable);
|
||||
strcpy(pRealTable->table.tableAlias, pTable);
|
||||
pSelect->pFromTable = (SNode*)pRealTable;
|
||||
|
||||
*pStmt = pSelect;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t createSelectStmtForShow(ENodeType showType, SSelectStmt** pStmt) {
|
||||
return createSimpleSelectStmt(getSysDbName(showType), getSysTableName(showType), pStmt);
|
||||
}
|
||||
|
||||
static int32_t createSelectStmtForShowTableDist(SShowTableDistributedStmt* pStmt, SSelectStmt** pOutput) {
|
||||
return createSimpleSelectStmt(pStmt->dbName, pStmt->tableName, pOutput);
|
||||
}
|
||||
|
||||
static int32_t createOperatorNode(EOperatorType opType, const char* pColName, SNode* pRight, SNode** pOp) {
|
||||
if (NULL == pRight) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -4609,7 +4617,7 @@ static int32_t createShowCondition(const SShowStmt* pShow, SSelectStmt* pSelect)
|
|||
SNode* pTbCond = NULL;
|
||||
if (TSDB_CODE_SUCCESS != createOperatorNode(OP_TYPE_EQUAL, "db_name", pShow->pDbName, &pDbCond) ||
|
||||
TSDB_CODE_SUCCESS !=
|
||||
createOperatorNode(OP_TYPE_LIKE, getTbNameColName(nodeType(pShow)), pShow->pTbNamePattern, &pTbCond)) {
|
||||
createOperatorNode(pShow->tableCondType, getTbNameColName(nodeType(pShow)), pShow->pTbName, &pTbCond)) {
|
||||
nodesDestroyNode(pDbCond);
|
||||
nodesDestroyNode(pTbCond);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -4646,6 +4654,46 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static SNode* createBlockDistInfoFunc() {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
strcpy(pFunc->functionName, "_block_dist_info");
|
||||
strcpy(pFunc->node.aliasName, "_block_dist_info");
|
||||
return (SNode*)pFunc;
|
||||
}
|
||||
|
||||
static SNode* createBlockDistFunc() {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
strcpy(pFunc->functionName, "_block_dist");
|
||||
strcpy(pFunc->node.aliasName, "_block_dist");
|
||||
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createBlockDistInfoFunc())) {
|
||||
nodesDestroyNode((SNode*)pFunc);
|
||||
return NULL;
|
||||
}
|
||||
return (SNode*)pFunc;
|
||||
}
|
||||
|
||||
static int32_t rewriteShowTableDist(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
SSelectStmt* pStmt = NULL;
|
||||
int32_t code = createSelectStmtForShowTableDist((SShowTableDistributedStmt*)pQuery->pRoot, &pStmt);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pStmt->pProjectionList, createBlockDistFunc());
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pQuery->showRewrite = true;
|
||||
nodesDestroyNode(pQuery->pRoot);
|
||||
pQuery->pRoot = (SNode*)pStmt;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
typedef struct SVgroupCreateTableBatch {
|
||||
SVCreateTbBatchReq req;
|
||||
SVgroupInfo info;
|
||||
|
@ -5584,6 +5632,9 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
case QUERY_NODE_SHOW_APPS_STMT:
|
||||
code = rewriteShow(pCxt, pQuery);
|
||||
break;
|
||||
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
|
||||
code = rewriteShowTableDist(pCxt, pQuery);
|
||||
break;
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
if (NULL == ((SCreateTableStmt*)pQuery->pRoot)->pTags) {
|
||||
code = rewriteCreateTable(pCxt, pQuery);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -156,7 +156,11 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
|
|||
|
||||
if (NULL == pScanCols) {
|
||||
// select count(*) from t
|
||||
return NULL == pScanPseudoCols ? SCAN_TYPE_TABLE : SCAN_TYPE_TAG;
|
||||
return NULL == pScanPseudoCols
|
||||
? SCAN_TYPE_TABLE
|
||||
: ((FUNCTION_TYPE_BLOCK_DIST_INFO == ((SFunctionNode*)nodesListGetNode(pScanPseudoCols, 0))->funcType)
|
||||
? SCAN_TYPE_BLOCK_INFO
|
||||
: SCAN_TYPE_TAG);
|
||||
}
|
||||
|
||||
if (TSDB_SYSTEM_TABLE == tableType) {
|
||||
|
|
|
@ -450,34 +450,37 @@ static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAdd
|
|||
pNodeAddr->epSet = vg->epSet;
|
||||
}
|
||||
|
||||
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
|
||||
SPhysiNode** pPhyNode) {
|
||||
STagScanPhysiNode* pTagScan =
|
||||
(STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
|
||||
if (NULL == pTagScan) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode);
|
||||
}
|
||||
|
||||
static ENodeType getScanOperatorType(EScanType scanType) {
|
||||
switch (scanType) {
|
||||
case SCAN_TYPE_TAG:
|
||||
return QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
||||
case SCAN_TYPE_TABLE:
|
||||
return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||
case SCAN_TYPE_STREAM:
|
||||
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||
case SCAN_TYPE_TABLE_MERGE:
|
||||
// return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||
return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
|
||||
case SCAN_TYPE_BLOCK_INFO:
|
||||
return QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||
}
|
||||
|
||||
static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
|
||||
SPhysiNode** pPhyNode) {
|
||||
SScanPhysiNode* pScan =
|
||||
(SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, getScanOperatorType(pScanLogicNode->scanType));
|
||||
if (NULL == pScan) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
|
||||
}
|
||||
|
||||
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
|
||||
SPhysiNode** pPhyNode) {
|
||||
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
|
||||
|
@ -558,7 +561,8 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
|
|||
SPhysiNode** pPhyNode) {
|
||||
switch (pScanLogicNode->scanType) {
|
||||
case SCAN_TYPE_TAG:
|
||||
return createTagScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
|
||||
case SCAN_TYPE_BLOCK_INFO:
|
||||
return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
|
||||
case SCAN_TYPE_TABLE:
|
||||
return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
|
||||
case SCAN_TYPE_SYSTEM_TABLE:
|
||||
|
|
|
@ -70,6 +70,10 @@ TEST_F(PlanOtherTest, show) {
|
|||
useDb("root", "test");
|
||||
|
||||
run("SHOW DATABASES");
|
||||
|
||||
run("SHOW TABLE DISTRIBUTED t1");
|
||||
|
||||
run("SHOW TABLE DISTRIBUTED st1");
|
||||
}
|
||||
|
||||
TEST_F(PlanOtherTest, delete) {
|
||||
|
|
|
@ -154,7 +154,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
|
|||
.info.persistHandle = persistHandle,
|
||||
.code = 0};
|
||||
assert(pInfo->fp != NULL);
|
||||
|
||||
TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
|
||||
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -220,5 +220,3 @@ void destroyQueryExecRes(SQueryExecRes* pRes) {
|
|||
qError("invalid exec result for request type %d", pRes->msgType);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -841,15 +841,6 @@ void syncNodeStart(SSyncNode* pSyncNode) {
|
|||
syncNodeBecomeLeader(pSyncNode, "one replica start");
|
||||
|
||||
// Raft 3.6.2 Committing entries from previous terms
|
||||
|
||||
// use this now
|
||||
syncNodeAppendNoop(pSyncNode);
|
||||
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
|
||||
|
||||
if (gRaftDetailLog) {
|
||||
syncNodeLog2("==state change become leader immediately==", pSyncNode);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1390,7 +1381,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
|
|||
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
|
||||
|
||||
do {
|
||||
char eventLog[128];
|
||||
char eventLog[256];
|
||||
snprintf(eventLog, sizeof(eventLog), "snapshot sender reset for %lu, newIndex:%d, %s:%d, %p",
|
||||
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
|
||||
syncNodeEventLog(pSyncNode, eventLog);
|
||||
|
@ -1405,7 +1396,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
|
|||
(pSyncNode->senders)[i]->replicaIndex = i;
|
||||
|
||||
do {
|
||||
char eventLog[128];
|
||||
char eventLog[256];
|
||||
snprintf(eventLog, sizeof(eventLog), "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d",
|
||||
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
|
||||
syncNodeEventLog(pSyncNode, eventLog);
|
||||
|
@ -1583,6 +1574,10 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// start heartbeat timer
|
||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
|
||||
// append noop
|
||||
syncNodeAppendNoop(pSyncNode);
|
||||
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
|
||||
|
||||
// trace log
|
||||
do {
|
||||
int32_t debugStrLen = strlen(debugStr);
|
||||
|
@ -1608,10 +1603,6 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
|||
|
||||
// Raft 3.6.2 Committing entries from previous terms
|
||||
|
||||
// use this now
|
||||
syncNodeAppendNoop(pSyncNode);
|
||||
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
|
||||
|
||||
// do not use this
|
||||
// syncNodeEqNoop(pSyncNode);
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ extern "C" {
|
|||
#include "transLog.h"
|
||||
#include "transportInt.h"
|
||||
#include "trpc.h"
|
||||
#include "ttrace.h"
|
||||
#include "tutil.h"
|
||||
|
||||
typedef void* queue[2];
|
||||
|
@ -140,6 +141,7 @@ typedef struct {
|
|||
char spi : 2;
|
||||
|
||||
char user[TSDB_UNI_LEN];
|
||||
STraceId traceId;
|
||||
uint64_t ahandle; // ahandle assigned by client
|
||||
uint32_t code; // del later
|
||||
uint32_t msgType;
|
||||
|
|
|
@ -22,6 +22,7 @@ extern "C" {
|
|||
|
||||
// clang-format off
|
||||
#include "tlog.h"
|
||||
#include "ttrace.h"
|
||||
|
||||
#define tFatal(...) do {if (rpcDebugFlag & DEBUG_FATAL){ taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }} while (0)
|
||||
#define tError(...)do { if (rpcDebugFlag & DEBUG_ERROR){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); } } while(0)
|
||||
|
@ -30,6 +31,10 @@ extern "C" {
|
|||
#define tDebug(...) do {if (rpcDebugFlag & DEBUG_DEBUG){ taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0)
|
||||
|
||||
//#define tTR(param, ...) do { char buf[40] = {0};TRACE_TO_STR(trace, buf);tTrace("TRID: %s "param, buf, __VA_ARGS__);} while(0)
|
||||
#define tGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0)
|
||||
|
||||
// clang-format on
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -163,6 +163,11 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
|
|||
transSetDefaultAddr(thandle, ip, fqdn);
|
||||
}
|
||||
|
||||
// void rpcSetMsgTraceId(SRpcMsg* pMsg, STraceId uid) {
|
||||
// SRpcHandleInfo* pInfo = &pMsg->info;
|
||||
// pInfo->traceId = uid;
|
||||
//}
|
||||
|
||||
int32_t rpcInit() {
|
||||
// impl later
|
||||
return 0;
|
||||
|
|
|
@ -48,8 +48,9 @@ typedef struct SCliMsg {
|
|||
STransConnCtx* ctx;
|
||||
STransMsg msg;
|
||||
queue q;
|
||||
uint64_t st;
|
||||
STransMsgType type;
|
||||
|
||||
uint64_t st;
|
||||
int sent; //(0: no send, 1: alread sent)
|
||||
} SCliMsg;
|
||||
|
||||
|
@ -167,7 +168,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
|
||||
} while (0)
|
||||
|
||||
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
||||
#define CONN_HOST_THREAD_IDX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
|
||||
|
@ -179,7 +180,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
conn->status = ConnRelease; \
|
||||
transClearBuffer(&conn->readBuf); \
|
||||
transFreeMsg(transContFromHead((char*)head)); \
|
||||
tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \
|
||||
tDebug("%s conn %p receive release request, ref: %d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); \
|
||||
if (T_REF_VAL_GET(conn) > 1) { \
|
||||
transUnrefCliHandle(conn); \
|
||||
} \
|
||||
|
@ -280,6 +281,7 @@ void cliHandleResp(SCliConn* conn) {
|
|||
transMsg.code = pHead->code;
|
||||
transMsg.msgType = pHead->msgType;
|
||||
transMsg.info.ahandle = NULL;
|
||||
transMsg.info.traceId = pHead->traceId;
|
||||
|
||||
SCliMsg* pMsg = NULL;
|
||||
STransConnCtx* pCtx = NULL;
|
||||
|
@ -293,27 +295,28 @@ void cliHandleResp(SCliConn* conn) {
|
|||
if (transMsg.info.ahandle == NULL) {
|
||||
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
|
||||
}
|
||||
tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.info.ahandle);
|
||||
tDebug("%s conn %p construct ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
|
||||
} else {
|
||||
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||
tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.info.ahandle);
|
||||
tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
|
||||
}
|
||||
} else {
|
||||
uint64_t ahandle = (uint64_t)pHead->ahandle;
|
||||
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
|
||||
if (pMsg == NULL) {
|
||||
transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
|
||||
tDebug("cli conn %p construct ahandle %p by %s, persist: 1", conn, transMsg.info.ahandle,
|
||||
TMSG_INFO(transMsg.msgType));
|
||||
tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
|
||||
transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
|
||||
if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
|
||||
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
|
||||
tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.info.ahandle);
|
||||
tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
|
||||
transMsg.info.ahandle);
|
||||
}
|
||||
} else {
|
||||
pCtx = pMsg ? pMsg->ctx : NULL;
|
||||
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||
tDebug("cli conn %p get ahandle %p, persist: 1", conn, transMsg.info.ahandle);
|
||||
tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
|
||||
}
|
||||
}
|
||||
// buf's mem alread translated to transMsg.pCont
|
||||
|
@ -321,26 +324,27 @@ void cliHandleResp(SCliConn* conn) {
|
|||
|
||||
if (!CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
transMsg.info.handle = conn;
|
||||
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||
}
|
||||
|
||||
tDebug("%s cli conn %p %s received from %s:%d, local info:%s:%d, msg size:%d code:0x%x", pTransInst->label, conn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||
taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code);
|
||||
// char buf[64] = {0};
|
||||
// TRACE_TO_STR(&transMsg.info.traceId, buf);
|
||||
STraceId* trace = &transMsg.info.traceId;
|
||||
tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d", conn, TMSG_INFO(pHead->msgType),
|
||||
taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->localAddr.sin_addr),
|
||||
ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code);
|
||||
|
||||
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
tTrace("except, server continue send while cli ignore it");
|
||||
tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn));
|
||||
// transUnrefCliHandle(conn);
|
||||
return;
|
||||
}
|
||||
if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
|
||||
tTrace("except, server continue send while cli ignore it");
|
||||
tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn));
|
||||
// transUnrefCliHandle(conn);
|
||||
return;
|
||||
}
|
||||
|
||||
if (cliAppCb(conn, &transMsg, pMsg) != 0) {
|
||||
tTrace("try to send req to next node");
|
||||
return;
|
||||
}
|
||||
destroyCmsg(pMsg);
|
||||
|
@ -363,7 +367,7 @@ void cliHandleResp(SCliConn* conn) {
|
|||
void cliHandleExcept(SCliConn* pConn) {
|
||||
if (transQueueEmpty(&pConn->cliMsgs)) {
|
||||
if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||
tTrace("%s cli conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
transUnrefCliHandle(pConn);
|
||||
return;
|
||||
}
|
||||
|
@ -386,11 +390,11 @@ void cliHandleExcept(SCliConn* pConn) {
|
|||
|
||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||
transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
||||
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
|
||||
tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
|
||||
TMSG_INFO(transMsg.msgType));
|
||||
if (transMsg.info.ahandle == NULL) {
|
||||
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
|
||||
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
transMsg.info.ahandle);
|
||||
}
|
||||
} else {
|
||||
|
@ -404,11 +408,10 @@ void cliHandleExcept(SCliConn* pConn) {
|
|||
}
|
||||
}
|
||||
if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
|
||||
tTrace("try to send req to next node");
|
||||
return;
|
||||
}
|
||||
destroyCmsg(pMsg);
|
||||
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
tTrace("%s conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
} while (!transQueueEmpty(&pConn->cliMsgs));
|
||||
transUnrefCliHandle(pConn);
|
||||
}
|
||||
|
@ -417,7 +420,7 @@ void cliTimeoutCb(uv_timer_t* handle) {
|
|||
SCliThrdObj* pThrd = handle->data;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
int64_t currentTime = pThrd->nextTimeout;
|
||||
tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pTransInst->label);
|
||||
tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label);
|
||||
|
||||
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
|
||||
while (p != NULL) {
|
||||
|
@ -492,7 +495,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
|
||||
char key[128] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
|
||||
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
||||
tTrace("%s conn %p added to conn pool, read buf cap: %d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
|
||||
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
// list already create before
|
||||
|
@ -515,10 +518,10 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
|||
if (nread > 0) {
|
||||
pBuf->len += nread;
|
||||
if (transReadComplete(pBuf)) {
|
||||
tTrace("%s cli conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
||||
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
||||
cliHandleResp(conn);
|
||||
} else {
|
||||
tTrace("%s cli conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
|
||||
tTrace("%s conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -528,11 +531,11 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
|||
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
|
||||
// nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
|
||||
// read(2).
|
||||
tTrace("%s cli conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
|
||||
tTrace("%s conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
|
||||
return;
|
||||
}
|
||||
if (nread < 0) {
|
||||
tError("%s cli conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
|
||||
tError("%s conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
|
||||
conn->broken = true;
|
||||
cliHandleExcept(conn);
|
||||
}
|
||||
|
@ -557,7 +560,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
|
|||
return conn;
|
||||
}
|
||||
static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||
tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||
|
||||
QUEUE_REMOVE(&conn->conn);
|
||||
if (clear) {
|
||||
|
@ -570,7 +573,7 @@ static void cliDestroy(uv_handle_t* handle) {
|
|||
taosMemoryFree(conn->stream);
|
||||
transCtxCleanup(&conn->ctx);
|
||||
transQueueDestroy(&conn->cliMsgs);
|
||||
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||
transDestroyBuffer(&conn->readBuf);
|
||||
taosMemoryFree(conn);
|
||||
}
|
||||
|
@ -597,14 +600,14 @@ static void cliSendCb(uv_write_t* req, int status) {
|
|||
SCliConn* pConn = req->data;
|
||||
|
||||
if (status == 0) {
|
||||
tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
} else {
|
||||
tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
|
||||
tError("%s conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
|
||||
cliHandleExcept(pConn);
|
||||
return;
|
||||
}
|
||||
if (cliHandleNoResp(pConn) == true) {
|
||||
tTrace("%s cli conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
return;
|
||||
}
|
||||
uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
|
||||
|
@ -640,11 +643,16 @@ void cliSend(SCliConn* pConn) {
|
|||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
||||
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
|
||||
pHead->traceId = pMsg->info.traceId;
|
||||
|
||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port));
|
||||
|
||||
// char buf[64] = {0};
|
||||
// TRACE_TO_STR(&pMsg->info.traceId, buf);
|
||||
STraceId* trace = &pMsg->info.traceId;
|
||||
tGTrace("conn %p %s is sent to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType),
|
||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
|
||||
ntohs(pConn->localAddr.sin_port));
|
||||
|
||||
if (pHead->persist == 1) {
|
||||
CONN_SET_PERSIST_BY_APP(pConn);
|
||||
|
@ -662,7 +670,7 @@ void cliConnCb(uv_connect_t* req, int status) {
|
|||
// impl later
|
||||
SCliConn* pConn = req->data;
|
||||
if (status != 0) {
|
||||
tError("%s cli conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
|
||||
tError("%s conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
|
||||
cliHandleExcept(pConn);
|
||||
return;
|
||||
}
|
||||
|
@ -672,7 +680,7 @@ void cliConnCb(uv_connect_t* req, int status) {
|
|||
addrlen = sizeof(pConn->localAddr);
|
||||
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->localAddr, &addrlen);
|
||||
|
||||
tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
assert(pConn->stream == req->handle);
|
||||
|
||||
cliSend(pConn);
|
||||
|
@ -691,7 +699,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
}
|
||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||
SCliConn* conn = pMsg->msg.info.handle;
|
||||
tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
|
||||
tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
|
||||
|
||||
if (T_REF_VAL_GET(conn) == 2) {
|
||||
transUnrefCliHandle(conn);
|
||||
|
@ -716,15 +724,15 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
if (pMsg->msg.info.handle != NULL) {
|
||||
conn = (SCliConn*)(pMsg->msg.info.handle);
|
||||
if (conn != NULL) {
|
||||
tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn);
|
||||
tTrace("%s conn %p reused", CONN_GET_INST_LABEL(conn), conn);
|
||||
}
|
||||
} else {
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
|
||||
if (conn != NULL) {
|
||||
tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||
tTrace("%s conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||
} else {
|
||||
tTrace("not found conn in conn pool %p", pThrd->pool);
|
||||
tTrace("%s not found conn in conn pool %p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
|
||||
}
|
||||
}
|
||||
return conn;
|
||||
|
@ -750,6 +758,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
|
||||
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
|
||||
|
||||
transPrintEpSet(&pCtx->epSet);
|
||||
|
||||
SCliConn* conn = cliGetConn(pMsg, pThrd);
|
||||
if (conn != NULL) {
|
||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||
|
@ -768,11 +778,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
|
||||
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
|
||||
if (ret) {
|
||||
tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
|
||||
tError("%s conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
|
||||
}
|
||||
int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT);
|
||||
if (fd == -1) {
|
||||
tTrace("%s cli conn %p failed to create socket", pTransInst->label, conn);
|
||||
tTrace("%s conn %p failed to create socket", pTransInst->label, conn);
|
||||
cliHandleExcept(conn);
|
||||
return;
|
||||
}
|
||||
|
@ -782,10 +792,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
addr.sin_family = AF_INET;
|
||||
addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
|
||||
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
||||
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
|
||||
tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
|
||||
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||
if (ret != 0) {
|
||||
tTrace("%s cli conn %p failed to connect to %s:%d, reason: %s", pTransInst->label, conn, conn->ip, conn->port,
|
||||
tTrace("%s conn %p failed to connect to %s:%d, reason: %s", pTransInst->label, conn, conn->ip, conn->port,
|
||||
uv_err_name(ret));
|
||||
cliHandleExcept(conn);
|
||||
return;
|
||||
|
@ -944,14 +954,13 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
|||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
if (pMsg == NULL || pMsg->ctx == NULL) {
|
||||
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
|
||||
tTrace("%s conn %p handle resp", pTransInst->label, pConn);
|
||||
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
||||
return 0;
|
||||
}
|
||||
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
SEpSet* pEpSet = &pCtx->epSet;
|
||||
transPrintEpSet(pEpSet);
|
||||
|
||||
if (pCtx->retryCount == 0) {
|
||||
pCtx->origEpSet = pCtx->epSet;
|
||||
|
@ -964,6 +973,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
|||
(pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pResp->code == TSDB_CODE_APP_NOT_READY ||
|
||||
pResp->code == TSDB_CODE_NODE_NOT_DEPLOYED || pResp->code == TSDB_CODE_SYN_NOT_LEADER)) {
|
||||
pMsg->sent = 0;
|
||||
tTrace("try to send req to next node");
|
||||
pMsg->st = taosGetTimestampUs();
|
||||
pCtx->retryCount += 1;
|
||||
if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||
|
@ -974,22 +984,27 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
|||
arg->param1 = pMsg;
|
||||
arg->param2 = pThrd;
|
||||
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||
tTrace("use local epset, current in use: %d, retry count:%d, limit: %d", pEpSet->inUse, pCtx->retryCount + 1,
|
||||
pEpSet->numOfEps * 3);
|
||||
transPrintEpSet(pEpSet);
|
||||
tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
|
||||
pCtx->retryCount + 1, pEpSet->numOfEps * 3);
|
||||
|
||||
transUnrefCliHandle(pConn);
|
||||
return -1;
|
||||
}
|
||||
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
|
||||
if (pResp->contLen == 0) {
|
||||
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
|
||||
tTrace("use local epset, current in use: %d, retry count:%d, limit: %d", pEpSet->inUse, pCtx->retryCount + 1,
|
||||
TRANS_RETRY_COUNT_LIMIT);
|
||||
transPrintEpSet(&pCtx->epSet);
|
||||
tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
|
||||
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
|
||||
} else {
|
||||
SEpSet epSet = {0};
|
||||
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
|
||||
pCtx->epSet = epSet;
|
||||
tTrace("use remote epset, current in use: %d, retry count:%d, limit: %d", pEpSet->inUse, pCtx->retryCount + 1,
|
||||
TRANS_RETRY_COUNT_LIMIT);
|
||||
|
||||
transPrintEpSet(&pCtx->epSet);
|
||||
tTrace("%s use remote epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
|
||||
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
|
||||
}
|
||||
addConnToPool(pThrd->pool, pConn);
|
||||
|
||||
|
@ -1001,17 +1016,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
STraceId* trace = &pResp->info.traceId;
|
||||
if (pCtx->pSem != NULL) {
|
||||
tTrace("%s cli conn %p(sync) handle resp", pTransInst->label, pConn);
|
||||
tGTrace("conn %p(sync) handle resp", pConn);
|
||||
if (pCtx->pRsp == NULL) {
|
||||
tTrace("%s cli conn %p(sync) failed to resp, ignore", pTransInst->label, pConn);
|
||||
tGTrace("conn %p(sync) failed to resp, ignore", pConn);
|
||||
} else {
|
||||
memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
|
||||
}
|
||||
tsem_post(pCtx->pSem);
|
||||
pCtx->pRsp = NULL;
|
||||
} else {
|
||||
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
|
||||
tGTrace("conn %p handle resp", pConn);
|
||||
if (pResp->code != 0 || pCtx->retryCount == 0 || transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) {
|
||||
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
||||
} else {
|
||||
|
@ -1039,6 +1055,7 @@ void transRefCliHandle(void* handle) {
|
|||
return;
|
||||
}
|
||||
int ref = T_REF_INC((SCliConn*)handle);
|
||||
tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
|
||||
UNUSED(ref);
|
||||
}
|
||||
void transUnrefCliHandle(void* handle) {
|
||||
|
@ -1046,7 +1063,7 @@ void transUnrefCliHandle(void* handle) {
|
|||
return;
|
||||
}
|
||||
int ref = T_REF_DEC((SCliConn*)handle);
|
||||
tDebug("%s cli conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
|
||||
tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
|
||||
if (ref == 0) {
|
||||
cliDestroyConn((SCliConn*)handle, true);
|
||||
}
|
||||
|
@ -1067,16 +1084,17 @@ void transReleaseCliHandle(void* handle) {
|
|||
|
||||
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
|
||||
STrans* pTransInst = (STrans*)shandle;
|
||||
int index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->info.handle);
|
||||
if (index == -1) {
|
||||
index = cliRBChoseIdx(pTransInst);
|
||||
int idx = CONN_HOST_THREAD_IDX((SCliConn*)pReq->info.handle);
|
||||
if (idx == -1) {
|
||||
idx = cliRBChoseIdx(pTransInst);
|
||||
}
|
||||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||
|
||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||
pCtx->epSet = *pEpSet;
|
||||
pCtx->ahandle = pReq->info.ahandle;
|
||||
pCtx->msgType = pReq->msgType;
|
||||
pCtx->hThrdIdx = index;
|
||||
pCtx->hThrdIdx = idx;
|
||||
|
||||
if (ctx != NULL) {
|
||||
pCtx->appCtx = *ctx;
|
||||
|
@ -1089,27 +1107,30 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
|
|||
cliMsg->st = taosGetTimestampUs();
|
||||
cliMsg->type = Normal;
|
||||
|
||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
|
||||
|
||||
tDebug("send request at thread:%d, threadID: %08" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->pid, pReq,
|
||||
STraceId* trace = &pReq->info.traceId;
|
||||
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", pTransInst->label, thrd->pid,
|
||||
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
|
||||
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
|
||||
}
|
||||
|
||||
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
|
||||
STrans* pTransInst = (STrans*)shandle;
|
||||
int index = CONN_HOST_THREAD_INDEX(pReq->info.handle);
|
||||
if (index == -1) {
|
||||
index = cliRBChoseIdx(pTransInst);
|
||||
int idx = CONN_HOST_THREAD_IDX(pReq->info.handle);
|
||||
if (idx == -1) {
|
||||
idx = cliRBChoseIdx(pTransInst);
|
||||
}
|
||||
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
|
||||
tsem_init(sem, 0, 0);
|
||||
|
||||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||
|
||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||
pCtx->epSet = *pEpSet;
|
||||
pCtx->ahandle = pReq->info.ahandle;
|
||||
pCtx->msgType = pReq->msgType;
|
||||
pCtx->hThrdIdx = index;
|
||||
pCtx->hThrdIdx = idx;
|
||||
pCtx->pSem = sem;
|
||||
pCtx->pRsp = pRsp;
|
||||
|
||||
|
@ -1119,8 +1140,10 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
|
|||
cliMsg->st = taosGetTimestampUs();
|
||||
cliMsg->type = Normal;
|
||||
|
||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||
tDebug("send request at thread:%d, threadID:%08" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->pid, pReq,
|
||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
|
||||
|
||||
STraceId* trace = &pReq->info.traceId;
|
||||
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", pTransInst->label, thrd->pid,
|
||||
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
|
||||
|
||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||
|
@ -1128,7 +1151,6 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
|
|||
tsem_destroy(sem);
|
||||
taosMemoryFree(sem);
|
||||
}
|
||||
|
||||
/*
|
||||
*
|
||||
**/
|
||||
|
@ -1151,7 +1173,7 @@ void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) {
|
|||
cliMsg->type = Update;
|
||||
|
||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
|
||||
tDebug("update epset at thread:%d, threadID:%08" PRId64 "", i, thrd->pid);
|
||||
tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid);
|
||||
|
||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||
}
|
||||
|
|
|
@ -381,7 +381,6 @@ static void transDQTimeout(uv_timer_t* timer) {
|
|||
HeapNode* minNode = heapMin(queue->heap);
|
||||
if (minNode == NULL) break;
|
||||
SDelayTask* task = container_of(minNode, SDelayTask, node);
|
||||
|
||||
if (task->execTime <= current) {
|
||||
heapRemove(queue->heap, minNode);
|
||||
task->func(task->arg);
|
||||
|
@ -444,7 +443,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_
|
|||
}
|
||||
}
|
||||
|
||||
tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs);
|
||||
tTrace("timer %p put task into delay queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs);
|
||||
heapInsert(queue->heap, &task->node);
|
||||
uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
|
||||
return 0;
|
||||
|
@ -455,11 +454,17 @@ void transPrintEpSet(SEpSet* pEpSet) {
|
|||
tTrace("NULL epset");
|
||||
return;
|
||||
}
|
||||
tTrace("epset begin inUse: %d", pEpSet->inUse);
|
||||
char buf[512] = {0};
|
||||
int len = snprintf(buf, sizeof(buf), "epset { ");
|
||||
for (int i = 0; i < pEpSet->numOfEps; i++) {
|
||||
tTrace("ip: %s, port: %d", pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||
if (i == pEpSet->numOfEps - 1) {
|
||||
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||
} else {
|
||||
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||
}
|
||||
tTrace("epset end");
|
||||
}
|
||||
len += snprintf(buf + len, sizeof(buf) - len, "}");
|
||||
tTrace("%s, inUse: %d", buf, pEpSet->inUse);
|
||||
}
|
||||
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
|
||||
if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) {
|
||||
|
|
|
@ -169,7 +169,7 @@ static bool addHandleToAcceptloop(void* arg);
|
|||
conn->status = ConnRelease; \
|
||||
transClearBuffer(&conn->readBuf); \
|
||||
transFreeMsg(transContFromHead((char*)head)); \
|
||||
tTrace("server conn %p received release request", conn); \
|
||||
tTrace("conn %p received release request", conn); \
|
||||
\
|
||||
STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \
|
||||
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
|
||||
|
@ -181,7 +181,7 @@ static bool addHandleToAcceptloop(void* arg);
|
|||
return; \
|
||||
} \
|
||||
if (conn->regArg.init) { \
|
||||
tTrace("server conn %p release, notify server app", conn); \
|
||||
tTrace("conn %p release, notify server app", conn); \
|
||||
STrans* pTransInst = conn->pTransInst; \
|
||||
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
|
||||
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
|
||||
|
@ -209,25 +209,25 @@ static bool addHandleToAcceptloop(void* arg);
|
|||
#define ASYNC_CHECK_HANDLE(exh1, refId) \
|
||||
do { \
|
||||
if (refId > 0) { \
|
||||
tTrace("server handle step1"); \
|
||||
tTrace("handle step1"); \
|
||||
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
|
||||
if (exh2 == NULL || refId != exh2->refId) { \
|
||||
tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
|
||||
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
|
||||
exh2 ? exh2->refId : 0, refId); \
|
||||
goto _return1; \
|
||||
} \
|
||||
} else if (refId == 0) { \
|
||||
tTrace("server handle step2"); \
|
||||
tTrace("handle step2"); \
|
||||
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
|
||||
if (exh2 == NULL || refId != exh2->refId) { \
|
||||
tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
|
||||
refId, exh2 ? exh2->refId : 0); \
|
||||
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, refId, \
|
||||
exh2 ? exh2->refId : 0); \
|
||||
goto _return1; \
|
||||
} else { \
|
||||
refId = exh1->refId; \
|
||||
} \
|
||||
} else if (refId < 0) { \
|
||||
tTrace("server handle step3"); \
|
||||
tTrace("handle step3"); \
|
||||
goto _return2; \
|
||||
} \
|
||||
} while (0)
|
||||
|
@ -270,26 +270,25 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
transMsg.msgType = pHead->msgType;
|
||||
transMsg.code = pHead->code;
|
||||
|
||||
transMsg.info.ahandle = (void*)pHead->ahandle;
|
||||
transMsg.info.handle = NULL;
|
||||
|
||||
// transDestroyBuffer(&pConn->readBuf);
|
||||
transClearBuffer(&pConn->readBuf);
|
||||
|
||||
pConn->inType = pHead->msgType;
|
||||
if (pConn->status == ConnNormal) {
|
||||
if (pHead->persist == 1) {
|
||||
pConn->status = ConnAcquire;
|
||||
transRefSrvHandle(pConn);
|
||||
tDebug("server conn %p acquired by server app", pConn);
|
||||
tDebug("conn %p acquired by server app", pConn);
|
||||
}
|
||||
}
|
||||
STraceId* trace = &pHead->traceId;
|
||||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||
transRefSrvHandle(pConn);
|
||||
tDebug("server conn %p %s received from %s:%d, local info:%s:%d, msg size:%d code:0x%x", pConn,
|
||||
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, transMsg.code);
|
||||
|
||||
tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
|
||||
ntohs(pConn->localAddr.sin_port), transMsg.contLen);
|
||||
} else {
|
||||
tDebug("server conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, resp:%d code:0x%x", pConn,
|
||||
tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", pConn,
|
||||
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp,
|
||||
transMsg.code);
|
||||
|
@ -300,11 +299,14 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
// 1. server application should not send resp on handle
|
||||
// 2. once send out data, cli conn released to conn pool immediately
|
||||
// 3. not mixed with persist
|
||||
|
||||
transMsg.info.ahandle = (void*)pHead->ahandle;
|
||||
transMsg.info.handle = (void*)transAcquireExHandle(refMgt, pConn->refId);
|
||||
transMsg.info.refId = pConn->refId;
|
||||
tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId);
|
||||
transMsg.info.traceId = pHead->traceId;
|
||||
|
||||
tGTrace("handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId);
|
||||
assert(transMsg.info.handle != NULL);
|
||||
|
||||
if (pHead->noResp == 1) {
|
||||
transMsg.info.refId = -1;
|
||||
}
|
||||
|
@ -328,12 +330,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|||
SConnBuffer* pBuf = &conn->readBuf;
|
||||
if (nread > 0) {
|
||||
pBuf->len += nread;
|
||||
tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
|
||||
tTrace("conn %p total read: %d, current read: %d", conn, pBuf->len, (int)nread);
|
||||
if (transReadComplete(pBuf)) {
|
||||
tTrace("server conn %p alread read complete packet", conn);
|
||||
tTrace("conn %p alread read complete packet", conn);
|
||||
uvHandleReq(conn);
|
||||
} else {
|
||||
tTrace("server %p read partial packet, continue to read", conn);
|
||||
tTrace("conn %p read partial packet, continue to read", conn);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -341,12 +343,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|||
return;
|
||||
}
|
||||
|
||||
tError("server conn %p read error: %s", conn, uv_err_name(nread));
|
||||
tError("conn %p read error: %s", conn, uv_err_name(nread));
|
||||
if (nread < 0) {
|
||||
conn->broken = true;
|
||||
if (conn->status == ConnAcquire) {
|
||||
if (conn->regArg.init) {
|
||||
tTrace("server conn %p broken, notify server app", conn);
|
||||
tTrace("conn %p broken, notify server app", conn);
|
||||
STrans* pTransInst = conn->pTransInst;
|
||||
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
|
||||
memset(&conn->regArg, 0, sizeof(conn->regArg));
|
||||
|
@ -363,14 +365,14 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
|
|||
void uvOnTimeoutCb(uv_timer_t* handle) {
|
||||
// opt
|
||||
SSvrConn* pConn = handle->data;
|
||||
tError("server conn %p time out", pConn);
|
||||
tError("conn %p time out", pConn);
|
||||
}
|
||||
|
||||
void uvOnSendCb(uv_write_t* req, int status) {
|
||||
SSvrConn* conn = req->data;
|
||||
// transClearBuffer(&conn->readBuf);
|
||||
if (status == 0) {
|
||||
tTrace("server conn %p data already was written on stream", conn);
|
||||
tTrace("conn %p data already was written on stream", conn);
|
||||
if (!transQueueEmpty(&conn->srvMsgs)) {
|
||||
SSvrMsg* msg = transQueuePop(&conn->srvMsgs);
|
||||
// if (msg->type == Release && conn->status != ConnNormal) {
|
||||
|
@ -407,7 +409,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
|
||||
tError("conn %p failed to write data, %s", conn, uv_err_name(status));
|
||||
conn->broken = true;
|
||||
transUnrefSrvHandle(conn);
|
||||
}
|
||||
|
@ -424,8 +426,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
|
|||
}
|
||||
|
||||
static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||
tTrace("server conn %p prepare to send resp", smsg->pConn);
|
||||
|
||||
SSvrConn* pConn = smsg->pConn;
|
||||
STransMsg* pMsg = &smsg->msg;
|
||||
if (pMsg->pCont == 0) {
|
||||
|
@ -434,6 +434,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
|||
}
|
||||
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
||||
pHead->ahandle = (uint64_t)pMsg->info.ahandle;
|
||||
pHead->traceId = pMsg->info.traceId;
|
||||
|
||||
if (pConn->status == ConnNormal) {
|
||||
pHead->msgType = pConn->inType + 1;
|
||||
|
@ -454,7 +455,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
|||
|
||||
char* msg = (char*)pHead;
|
||||
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
||||
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", pConn, TMSG_INFO(pHead->msgType),
|
||||
|
||||
STraceId* trace = &pMsg->info.traceId;
|
||||
tGTrace("conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", pConn, TMSG_INFO(pHead->msgType),
|
||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
|
||||
ntohs(pConn->localAddr.sin_port), len);
|
||||
pHead->msgLen = htonl(len);
|
||||
|
@ -545,7 +548,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
|||
int64_t refId = transMsg.info.refId;
|
||||
SExHandle* exh2 = transAcquireExHandle(refMgt, refId);
|
||||
if (exh2 == NULL || exh1 != exh2) {
|
||||
tTrace("server handle except msg %p, ignore it", exh1);
|
||||
tTrace("handle except msg %p, ignore it", exh1);
|
||||
transReleaseExHandle(refMgt, refId);
|
||||
destroySmsg(msg);
|
||||
continue;
|
||||
|
@ -583,18 +586,18 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
|
|||
static void uvWorkDoTask(uv_work_t* req) {
|
||||
// doing time-consumeing task
|
||||
// only auth conn currently, add more func later
|
||||
tTrace("server conn %p start to be processed in BG Thread", req->data);
|
||||
tTrace("conn %p start to be processed in BG Thread", req->data);
|
||||
return;
|
||||
}
|
||||
|
||||
static void uvWorkAfterTask(uv_work_t* req, int status) {
|
||||
if (status != 0) {
|
||||
tTrace("server conn %p failed to processed ", req->data);
|
||||
tTrace("conn %p failed to processed ", req->data);
|
||||
}
|
||||
// Done time-consumeing task
|
||||
// add more func later
|
||||
// this func called in main loop
|
||||
tTrace("server conn %p already processed ", req->data);
|
||||
tTrace("conn %p already processed ", req->data);
|
||||
taosMemoryFree(req);
|
||||
}
|
||||
|
||||
|
@ -629,7 +632,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
|||
}
|
||||
}
|
||||
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||
tTrace("server connection coming");
|
||||
tTrace("connection coming");
|
||||
if (nread < 0) {
|
||||
if (nread != UV_EOF) {
|
||||
tError("read error %s", uv_err_name(nread));
|
||||
|
@ -678,18 +681,18 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
|||
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
|
||||
uv_os_fd_t fd;
|
||||
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
|
||||
tTrace("server conn %p created, fd: %d", pConn, fd);
|
||||
tTrace("conn %p created, fd: %d", pConn, fd);
|
||||
|
||||
int addrlen = sizeof(pConn->addr);
|
||||
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
|
||||
tError("server conn %p failed to get peer info", pConn);
|
||||
tError("conn %p failed to get peer info", pConn);
|
||||
transUnrefSrvHandle(pConn);
|
||||
return;
|
||||
}
|
||||
|
||||
addrlen = sizeof(pConn->localAddr);
|
||||
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->localAddr, &addrlen)) {
|
||||
tError("server conn %p failed to get local info", pConn);
|
||||
tError("conn %p failed to get local info", pConn);
|
||||
transUnrefSrvHandle(pConn);
|
||||
return;
|
||||
}
|
||||
|
@ -798,7 +801,7 @@ static SSvrConn* createConn(void* hThrd) {
|
|||
|
||||
pConn->refId = exh->refId;
|
||||
transRefSrvHandle(pConn);
|
||||
tTrace("server handle %p, conn %p created, refId: %" PRId64 "", exh, pConn, pConn->refId);
|
||||
tTrace("handle %p, conn %p created, refId: %" PRId64 "", exh, pConn, pConn->refId);
|
||||
return pConn;
|
||||
}
|
||||
|
||||
|
@ -809,7 +812,7 @@ static void destroyConn(SSvrConn* conn, bool clear) {
|
|||
|
||||
transDestroyBuffer(&conn->readBuf);
|
||||
if (clear) {
|
||||
tTrace("server conn %p to be destroyed", conn);
|
||||
tTrace("conn %p to be destroyed", conn);
|
||||
// uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t));
|
||||
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
|
||||
// uv_close(conn->pTcp)
|
||||
|
@ -845,7 +848,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
|||
transReleaseExHandle(refMgt, conn->refId);
|
||||
transRemoveExHandle(refMgt, conn->refId);
|
||||
|
||||
tDebug("server conn %p destroy", conn);
|
||||
tDebug("conn %p destroy", conn);
|
||||
// uv_timer_stop(&conn->pTimer);
|
||||
transQueueDestroy(&conn->srvMsgs);
|
||||
|
||||
|
@ -974,18 +977,18 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) {
|
|||
uvStartSendRespInternal(msg);
|
||||
return;
|
||||
} else if (conn->status == ConnRelease || conn->status == ConnNormal) {
|
||||
tDebug("server conn %p already released, ignore release-msg", conn);
|
||||
tDebug("conn %p already released, ignore release-msg", conn);
|
||||
}
|
||||
destroySmsg(msg);
|
||||
}
|
||||
void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd) {
|
||||
// send msg to client
|
||||
tDebug("server conn %p start to send resp (2/2)", msg->pConn);
|
||||
tDebug("conn %p start to send resp (2/2)", msg->pConn);
|
||||
uvStartSendResp(msg);
|
||||
}
|
||||
void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) {
|
||||
SSvrConn* conn = msg->pConn;
|
||||
tDebug("server conn %p register brokenlink callback", conn);
|
||||
tDebug("conn %p register brokenlink callback", conn);
|
||||
if (conn->status == ConnAcquire) {
|
||||
if (!transQueuePush(&conn->srvMsgs, msg)) {
|
||||
return;
|
||||
|
@ -994,7 +997,7 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) {
|
|||
conn->regArg.notifyCount = 0;
|
||||
conn->regArg.init = 1;
|
||||
conn->regArg.msg = msg->msg;
|
||||
tDebug("server conn %p register brokenlink callback succ", conn);
|
||||
tDebug("conn %p register brokenlink callback succ", conn);
|
||||
|
||||
if (conn->broken) {
|
||||
STrans* pTransInst = conn->pTransInst;
|
||||
|
@ -1062,7 +1065,7 @@ void transRefSrvHandle(void* handle) {
|
|||
return;
|
||||
}
|
||||
int ref = T_REF_INC((SSvrConn*)handle);
|
||||
tDebug("server conn %p ref count: %d", handle, ref);
|
||||
tDebug("conn %p ref count: %d", handle, ref);
|
||||
}
|
||||
|
||||
void transUnrefSrvHandle(void* handle) {
|
||||
|
@ -1070,7 +1073,7 @@ void transUnrefSrvHandle(void* handle) {
|
|||
return;
|
||||
}
|
||||
int ref = T_REF_DEC((SSvrConn*)handle);
|
||||
tDebug("server conn %p ref count: %d", handle, ref);
|
||||
tDebug("conn %p ref count: %d", handle, ref);
|
||||
if (ref == 0) {
|
||||
destroyConn((SSvrConn*)handle, true);
|
||||
}
|
||||
|
@ -1091,16 +1094,16 @@ void transReleaseSrvHandle(void* handle) {
|
|||
m->msg = tmsg;
|
||||
m->type = Release;
|
||||
|
||||
tTrace("server conn %p start to release", exh->handle);
|
||||
tTrace("conn %p start to release", exh->handle);
|
||||
transSendAsync(pThrd->asyncPool, &m->q);
|
||||
transReleaseExHandle(refMgt, refId);
|
||||
return;
|
||||
_return1:
|
||||
tTrace("server handle %p failed to send to release handle", exh);
|
||||
tTrace("handle %p failed to send to release handle", exh);
|
||||
transReleaseExHandle(refMgt, refId);
|
||||
return;
|
||||
_return2:
|
||||
tTrace("server handle %p failed to send to release handle", exh);
|
||||
tTrace("handle %p failed to send to release handle", exh);
|
||||
return;
|
||||
}
|
||||
void transSendResponse(const STransMsg* msg) {
|
||||
|
@ -1118,17 +1121,19 @@ void transSendResponse(const STransMsg* msg) {
|
|||
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||
m->msg = tmsg;
|
||||
m->type = Normal;
|
||||
tDebug("server conn %p start to send resp (1/2)", exh->handle);
|
||||
|
||||
STraceId* trace = (STraceId*)&msg->info.traceId;
|
||||
tGTrace("conn %p start to send resp (1/2)", exh->handle);
|
||||
transSendAsync(pThrd->asyncPool, &m->q);
|
||||
transReleaseExHandle(refMgt, refId);
|
||||
return;
|
||||
_return1:
|
||||
tTrace("server handle %p failed to send resp", exh);
|
||||
tTrace("handle %p failed to send resp", exh);
|
||||
rpcFreeCont(msg->pCont);
|
||||
transReleaseExHandle(refMgt, refId);
|
||||
return;
|
||||
_return2:
|
||||
tTrace("server handle %p failed to send resp", exh);
|
||||
tTrace("handle %p failed to send resp", exh);
|
||||
rpcFreeCont(msg->pCont);
|
||||
return;
|
||||
}
|
||||
|
@ -1146,18 +1151,19 @@ void transRegisterMsg(const STransMsg* msg) {
|
|||
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||
m->msg = tmsg;
|
||||
m->type = Register;
|
||||
tTrace("server conn %p start to register brokenlink callback", exh->handle);
|
||||
|
||||
tTrace("conn %p start to register brokenlink callback", exh->handle);
|
||||
transSendAsync(pThrd->asyncPool, &m->q);
|
||||
transReleaseExHandle(refMgt, refId);
|
||||
return;
|
||||
|
||||
_return1:
|
||||
tTrace("server handle %p failed to send to register brokenlink", exh);
|
||||
tTrace("handle %p failed to register brokenlink", exh);
|
||||
rpcFreeCont(msg->pCont);
|
||||
transReleaseExHandle(refMgt, refId);
|
||||
return;
|
||||
_return2:
|
||||
tTrace("server handle %p failed to send to register brokenlink", exh);
|
||||
tTrace("handle %p failed to register brokenlink", exh);
|
||||
rpcFreeCont(msg->pCont);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include "ttrace.h"
|
||||
#include "taos.h"
|
||||
#include "thash.h"
|
||||
#include "tuuid.h"
|
||||
|
||||
// clang-format off
|
||||
//static TdThreadOnce init = PTHREAD_ONCE_INIT;
|
||||
//static void * ids = NULL;
|
||||
//static TdThreadMutex mtx;
|
||||
//
|
||||
//void traceInit() {
|
||||
// ids = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
// taosThreadMutexInit(&mtx, NULL);
|
||||
//}
|
||||
//void traceCreateEnv() {
|
||||
// taosThreadOnce(&init, traceInit);
|
||||
//}
|
||||
//void traceDestroyEnv() {
|
||||
// taosThreadMutexDestroy(&mtx);
|
||||
// taosHashCleanup(ids);
|
||||
//}
|
||||
//
|
||||
//STraceId traceInitId(STraceSubId *h, STraceSubId *l) {
|
||||
// STraceId id = *h;
|
||||
// id = ((id << 32) & 0xFFFFFFFF) | ((*l) & 0xFFFFFFFF);
|
||||
// return id;
|
||||
//}
|
||||
//void traceId2Str(STraceId *id, char *buf) {
|
||||
// int32_t f = (*id >> 32) & 0xFFFFFFFF;
|
||||
// int32_t s = (*id) & 0xFFFFFFFF;
|
||||
// sprintf(buf, "%d:%d", f, s);
|
||||
//}
|
||||
//
|
||||
//void traceSetSubId(STraceId *id, STraceSubId *subId) {
|
||||
// int32_t parent = ((*id >> 32) & 0xFFFFFFFF);
|
||||
// taosThreadMutexLock(&mtx);
|
||||
// taosHashPut(ids, subId, sizeof(*subId), &parent, sizeof(parent));
|
||||
// taosThreadMutexUnlock(&mtx);
|
||||
//}
|
||||
//
|
||||
//STraceSubId traceGetParentId(STraceId *id) {
|
||||
// int32_t parent = ((*id >> 32) & 0xFFFFFFFF);
|
||||
// taosThreadMutexLock(&mtx);
|
||||
// STraceSubId *p = taosHashGet(ids, (void *)&parent, sizeof(parent));
|
||||
// parent = *p;
|
||||
// taosThreadMutexUnlock(&mtx);
|
||||
//
|
||||
// return parent;
|
||||
//}
|
||||
//
|
||||
//STraceSubId traceGenSubId() {
|
||||
// return tGenIdPI32();
|
||||
//}
|
||||
//void traceSetRootId(STraceId *traceid, int64_t rootId) {
|
||||
// traceId->rootId = rootId;
|
||||
//}
|
||||
//int64_t traceGetRootId(STraceId *traceId);
|
||||
//
|
||||
//void traceSetMsgId(STraceId *traceid, int64_t msgId);
|
||||
//int64_t traceGetMsgId(STraceId *traceid);
|
||||
//
|
||||
//char *trace2Str(STraceId *id);
|
||||
//
|
||||
//void traceSetSubId(STraceId *id, int32_t *subId);
|
||||
// clang-format on
|
|
@ -1 +1 @@
|
|||
Subproject commit 9ce3f5c98ef95d9c7c596c4ed7302b0ed69a92b2
|
||||
Subproject commit 29926478edd87533a043f91c1a9ed0e27671e626
|
Loading…
Reference in New Issue