Merge remote-tracking branch 'origin/3.0' into feature/config

This commit is contained in:
Shengliang Guan 2022-02-24 10:07:04 +08:00
commit b17f826756
11 changed files with 191 additions and 189 deletions

View File

@ -78,6 +78,9 @@ typedef struct SRpcInit {
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
// call back to keep conn or not
bool (*pfp)(void *parent, tmsg_t msgType);
void *parent; void *parent;
} SRpcInit; } SRpcInit;

View File

@ -20,17 +20,17 @@
extern "C" { extern "C" {
#endif #endif
#include "taos.h"
#include "common.h" #include "common.h"
#include "tmsg.h" #include "parser.h"
#include "query.h"
#include "taos.h"
#include "tdef.h" #include "tdef.h"
#include "tep.h" #include "tep.h"
#include "thash.h" #include "thash.h"
#include "tlist.h" #include "tlist.h"
#include "tmsg.h"
#include "tmsgtype.h" #include "tmsgtype.h"
#include "trpc.h" #include "trpc.h"
#include "query.h"
#include "parser.h"
#include "config.h" #include "config.h"
@ -48,12 +48,12 @@ extern "C" {
typedef struct SAppInstInfo SAppInstInfo; typedef struct SAppInstInfo SAppInstInfo;
typedef struct SHbConnInfo { typedef struct SHbConnInfo {
void *param; void* param;
SClientHbReq *req; SClientHbReq* req;
} SHbConnInfo; } SHbConnInfo;
typedef struct SAppHbMgr { typedef struct SAppHbMgr {
char *key; char* key;
// statistics // statistics
int32_t reportCnt; int32_t reportCnt;
int32_t connKeyCnt; int32_t connKeyCnt;
@ -68,11 +68,9 @@ typedef struct SAppHbMgr {
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo> SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
} SAppHbMgr; } SAppHbMgr;
typedef int32_t (*FHbRspHandle)(struct SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
typedef int32_t (*FHbRspHandle)(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp); typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req);
typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req);
typedef struct SClientHbMgr { typedef struct SClientHbMgr {
int8_t inited; int8_t inited;
@ -85,7 +83,6 @@ typedef struct SClientHbMgr {
FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX]; FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr; } SClientHbMgr;
typedef struct SQueryExecMetric { typedef struct SQueryExecMetric {
int64_t start; // start timestamp int64_t start; // start timestamp
int64_t parsed; // start to parse int64_t parsed; // start to parse
@ -107,26 +104,26 @@ typedef struct SInstanceSummary {
} SInstanceSummary; } SInstanceSummary;
typedef struct SHeartBeatInfo { typedef struct SHeartBeatInfo {
void *pTimer; // timer, used to send request msg to mnode void* pTimer; // timer, used to send request msg to mnode
} SHeartBeatInfo; } SHeartBeatInfo;
struct SAppInstInfo { struct SAppInstInfo {
int64_t numOfConns; int64_t numOfConns;
SCorEpSet mgmtEp; SCorEpSet mgmtEp;
SInstanceSummary summary; SInstanceSummary summary;
SList *pConnList; // STscObj linked list SList* pConnList; // STscObj linked list
int64_t clusterId; int64_t clusterId;
void *pTransporter; void* pTransporter;
struct SAppHbMgr *pAppHbMgr; struct SAppHbMgr* pAppHbMgr;
}; };
typedef struct SAppInfo { typedef struct SAppInfo {
int64_t startTime; int64_t startTime;
char appName[TSDB_APP_NAME_LEN]; char appName[TSDB_APP_NAME_LEN];
char *ep; char* ep;
int32_t pid; int32_t pid;
int32_t numOfThreads; int32_t numOfThreads;
SHashObj *pInstMap; SHashObj* pInstMap;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SAppInfo; } SAppInfo;
@ -141,7 +138,7 @@ typedef struct STscObj {
uint64_t id; // ref ID returned by taosAddRef uint64_t id; // ref ID returned by taosAddRef
pthread_mutex_t mutex; // used to protect the operation on db pthread_mutex_t mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo *pAppInfo; SAppInstInfo* pAppInfo;
} STscObj; } STscObj;
typedef struct SMqConsumer { typedef struct SMqConsumer {
@ -149,13 +146,13 @@ typedef struct SMqConsumer {
} SMqConsumer; } SMqConsumer;
typedef struct SReqResultInfo { typedef struct SReqResultInfo {
const char *pRspMsg; const char* pRspMsg;
const char *pData; const char* pData;
TAOS_FIELD *fields; TAOS_FIELD* fields;
uint32_t numOfCols; uint32_t numOfCols;
int32_t *length; int32_t* length;
TAOS_ROW row; TAOS_ROW row;
char **pCol; char** pCol;
uint32_t numOfRows; uint32_t numOfRows;
uint64_t totalRows; uint64_t totalRows;
uint32_t current; uint32_t current;
@ -165,7 +162,7 @@ typedef struct SReqResultInfo {
typedef struct SShowReqInfo { typedef struct SShowReqInfo {
int64_t execId; // showId/queryId int64_t execId; // showId/queryId
int32_t vgId; int32_t vgId;
SArray *pArray; // SArray<SVgroupInfo> SArray* pArray; // SArray<SVgroupInfo>
int32_t currentIndex; // current accessed vgroup index. int32_t currentIndex; // current accessed vgroup index.
} SShowReqInfo; } SShowReqInfo;
@ -174,8 +171,8 @@ typedef struct SRequestSendRecvBody {
void* fp; void* fp;
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
SDataBuf requestMsg; SDataBuf requestMsg;
struct SSchJob *pQueryJob; // query job, created according to sql query DAG. struct SSchJob* pQueryJob; // query job, created according to sql query DAG.
struct SQueryDag *pDag; // the query dag, generated according to the sql statement. struct SQueryDag* pDag; // the query dag, generated according to the sql statement.
SReqResultInfo resInfo; SReqResultInfo resInfo;
} SRequestSendRecvBody; } SRequestSendRecvBody;
@ -184,12 +181,12 @@ typedef struct SRequestSendRecvBody {
typedef struct SRequestObj { typedef struct SRequestObj {
uint64_t requestId; uint64_t requestId;
int32_t type; // request type int32_t type; // request type
STscObj *pTscObj; STscObj* pTscObj;
char *sqlstr; // sql string char* sqlstr; // sql string
int32_t sqlLen; int32_t sqlLen;
int64_t self; int64_t self;
char *msgBuf; char* msgBuf;
void *pInfo; // sql parse info, generated by parser module void* pInfo; // sql parse info, generated by parser module
int32_t code; int32_t code;
SQueryExecMetric metric; SQueryExecMetric metric;
SRequestSendRecvBody body; SRequestSendRecvBody body;
@ -205,34 +202,35 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
int taos_init(); int taos_init();
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo); void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo);
void destroyTscObj(void*pObj); void destroyTscObj(void* pObj);
uint64_t generateRequestId(); uint64_t generateRequestId();
void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest); void destroyRequest(SRequestObj* pRequest);
char *getDbOfConnection(STscObj* pObj); char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db); void setConnectionDB(STscObj* pTscObj, const char* db);
void taos_init_imp(void); void taos_init_imp(void);
int taos_options_imp(TSDB_OPTION option, const char *str); int taos_options_imp(TSDB_OPTION option, const char* str);
void* openTransporter(const char *user, const char *auth, int32_t numOfThreads); void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp(); void initMsgHandleFp();
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port); TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port);
void *doFetchRow(SRequestObj* pRequest); void* doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest);
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery); int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery);
@ -243,7 +241,7 @@ void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp); int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level // cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key); SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
void appHbMgrCleanup(void); void appHbMgrCleanup(void);
// conn level // conn level
@ -255,6 +253,7 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
// --- mq // --- mq
void hbMgrInitMqHbRspHandle(); void hbMgrInitMqHbRspHandle();
// config // config
int32_t tscInitLog(const char *cfgDir, const char *envFile, const char *apolloUrl); int32_t tscInitLog(const char *cfgDir, const char *envFile, const char *apolloUrl);
int32_t tscInitCfg(const char *cfgDir, const char *envFile, const char *apolloUrl); int32_t tscInitCfg(const char *cfgDir, const char *envFile, const char *apolloUrl);

View File

@ -13,15 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "catalog.h" #include "catalog.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "os.h"
#include "query.h" #include "query.h"
#include "scheduler.h" #include "scheduler.h"
#include "tmsg.h"
#include "tcache.h" #include "tcache.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "ttime.h" #include "ttime.h"
@ -37,7 +37,7 @@ int32_t clientConnRefPool = -1;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0; volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj* pRequest) { static void registerRequest(SRequestObj *pRequest) {
STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id); STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id);
assert(pTscObj != NULL); assert(pTscObj != NULL);
@ -51,44 +51,49 @@ static void registerRequest(SRequestObj* pRequest) {
int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1); int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1);
int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1); int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1);
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%"PRIx64, pRequest->self, tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); ", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
} }
} }
static void deregisterRequest(SRequestObj* pRequest) { static void deregisterRequest(SRequestObj *pRequest) {
assert(pRequest != NULL); assert(pRequest != NULL);
STscObj* pTscObj = pRequest->pTscObj; STscObj * pTscObj = pRequest->pTscObj;
SInstanceSummary* pActivity = &pTscObj->pAppInfo->summary; SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1); int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int64_t duration = taosGetTimestampMs() - pRequest->metric.start; int64_t duration = taosGetTimestampMs() - pRequest->metric.start;
tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", reqId:0x%"PRIx64" elapsed:%"PRIu64" ms, current:%d, app current:%d", pRequest->self, pTscObj->id, tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
pRequest->requestId, duration, num, currentInst); " ms, current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration, num, currentInst);
taosReleaseRef(clientConnRefPool, pTscObj->id); taosReleaseRef(clientConnRefPool, pTscObj->id);
} }
// todo close the transporter properly // todo close the transporter properly
void closeTransporter(STscObj* pTscObj) { void closeTransporter(STscObj *pTscObj) {
if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) { if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) {
return; return;
} }
tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id); tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id);
rpcClose(pTscObj->pAppInfo->pTransporter); rpcClose(pTscObj->pAppInfo->pTransporter);
} }
// TODO refactor // TODO refactor
void* openTransporter(const char *user, const char *auth, int32_t numOfThread) { void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "TSC"; rpcInit.label = "TSC";
rpcInit.numOfThreads = numOfThread; rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer; rpcInit.cfp = processMsgFromServer;
rpcInit.pfp = persistConnForSpecificMsg;
rpcInit.sessions = cfgGetItem(tscCfg, "maxConnections")->i32; rpcInit.sessions = cfgGetItem(tscCfg, "maxConnections")->i32;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user; rpcInit.user = (char *)user;
@ -97,7 +102,7 @@ void* openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.spi = 1; rpcInit.spi = 1;
rpcInit.secret = (char *)auth; rpcInit.secret = (char *)auth;
void* pDnodeConn = rpcOpen(&rpcInit); void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) { if (pDnodeConn == NULL) {
tscError("failed to init connection to server"); tscError("failed to init connection to server");
return NULL; return NULL;
@ -112,12 +117,12 @@ void destroyTscObj(void *pObj) {
SClientHbKey connKey = {.connId = pTscObj->connId, .hbType = pTscObj->connType}; SClientHbKey connKey = {.connId = pTscObj->connId, .hbType = pTscObj->connType};
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
tscDebug("connObj 0x%"PRIx64" destroyed, totalConn:%"PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
pthread_mutex_destroy(&pTscObj->mutex); pthread_mutex_destroy(&pTscObj->mutex);
tfree(pTscObj); tfree(pTscObj);
} }
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo) { void *createTscObj(const char *user, const char *auth, const char *db, SAppInstInfo *pAppInfo) {
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj)); STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
if (NULL == pObj) { if (NULL == pObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
@ -135,11 +140,11 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI
pthread_mutex_init(&pObj->mutex, NULL); pthread_mutex_init(&pObj->mutex, NULL);
pObj->id = taosAddRef(clientConnRefPool, pObj); pObj->id = taosAddRef(clientConnRefPool, pObj);
tscDebug("connObj created, 0x%"PRIx64, pObj->id); tscDebug("connObj created, 0x%" PRIx64, pObj->id);
return pObj; return pObj;
} }
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type) { void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) {
assert(pObj != NULL); assert(pObj != NULL);
SRequestObj *pRequest = (SRequestObj *)calloc(1, sizeof(SRequestObj)); SRequestObj *pRequest = (SRequestObj *)calloc(1, sizeof(SRequestObj));
@ -161,7 +166,7 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
return pRequest; return pRequest;
} }
static void doFreeReqResultInfo(SReqResultInfo* pResInfo) { static void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
tfree(pResInfo->pRspMsg); tfree(pResInfo->pRspMsg);
tfree(pResInfo->length); tfree(pResInfo->length);
tfree(pResInfo->row); tfree(pResInfo->row);
@ -169,9 +174,9 @@ static void doFreeReqResultInfo(SReqResultInfo* pResInfo) {
tfree(pResInfo->fields); tfree(pResInfo->fields);
} }
static void doDestroyRequest(void* p) { static void doDestroyRequest(void *p) {
assert(p != NULL); assert(p != NULL);
SRequestObj* pRequest = (SRequestObj*)p; SRequestObj *pRequest = (SRequestObj *)p;
assert(RID_VALID(pRequest->self)); assert(RID_VALID(pRequest->self));
@ -190,7 +195,7 @@ static void doDestroyRequest(void* p) {
tfree(pRequest); tfree(pRequest);
} }
void destroyRequest(SRequestObj* pRequest) { void destroyRequest(SRequestObj *pRequest) {
if (pRequest == NULL) { if (pRequest == NULL) {
return; return;
} }
@ -269,7 +274,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscInfo("set config file directory:%s", str); tscInfo("set config file directory:%s", str);
} else { } else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
} }
break; break;
@ -284,7 +290,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscInfo("set shellActivityTimer:%d", tsShellActivityTimer); tscInfo("set shellActivityTimer:%d", tsShellActivityTimer);
} else { } else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr); tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, str,
tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr);
} }
break; break;
@ -302,7 +309,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
char sep = '.'; char sep = '.';
if (strlen(tsLocale) == 0) { // locale does not set yet if (strlen(tsLocale) == 0) { // locale does not set yet
char* defaultLocale = setlocale(LC_CTYPE, ""); char *defaultLocale = setlocale(LC_CTYPE, "");
// The locale of the current OS does not be set correctly, so the default locale cannot be acquired. // The locale of the current OS does not be set correctly, so the default locale cannot be acquired.
// The launch of current system will abort soon. // The launch of current system will abort soon.
@ -352,7 +359,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
tscInfo("charset remains:%s", tsCharset); tscInfo("charset remains:%s", tsCharset);
} }
} else { } else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
} }
break; break;
} }
@ -382,7 +390,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
tscInfo("charset:%s not valid", str); tscInfo("charset:%s not valid", str);
} }
} else { } else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
} }
break; break;
@ -398,7 +407,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscDebug("timezone set:%s, input:%s by taos_options", tsTimezone, str); tscDebug("timezone set:%s, input:%s by taos_options", tsTimezone, str);
} else { } else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
} }
break; break;

View File

@ -363,7 +363,6 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
pMsgSendInfo->param = pRequest; pMsgSendInfo->param = pRequest;
SConnectReq connectReq = {0}; SConnectReq connectReq = {0};
STscObj* pObj = pRequest->pTscObj; STscObj* pObj = pRequest->pTscObj;
@ -391,7 +390,9 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
tfree(pMsgBody->msgInfo.pData); tfree(pMsgBody->msgInfo.pData);
tfree(pMsgBody); tfree(pMsgBody);
} }
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP;
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle; SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
assert(pMsg->ahandle != NULL); assert(pMsg->ahandle != NULL);

View File

@ -235,6 +235,7 @@ void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTran
FstState st = fstStateCreate(OneTrans); FstState st = fstStateCreate(OneTrans);
fstStateSetCommInput(&st, trn->inp); fstStateSetCommInput(&st, trn->inp);
bool null = false; bool null = false;
uint8_t inp = fstStateCommInput(&st, &null); uint8_t inp = fstStateCommInput(&st, &null);
if (null == true) { if (null == true) {
@ -936,6 +937,7 @@ FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) {
} }
void fstLastTransitionDestroy(FstLastTransition* trn) { free(trn); } void fstLastTransitionDestroy(FstLastTransition* trn) { free(trn); }
void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) { void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) {
FstLastTransition* trn = unNode->last; FstLastTransition* trn = unNode->last;
if (trn == NULL) { if (trn == NULL) {

View File

@ -16,24 +16,24 @@
#include "index_fst_automation.h" #include "index_fst_automation.h"
StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) { StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) {
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); StartWithStateValue* sv = calloc(1, sizeof(StartWithStateValue));
if (nsv == NULL) { if (sv == NULL) {
return NULL; return NULL;
} }
nsv->kind = kind; sv->kind = kind;
nsv->type = ty; sv->type = ty;
if (ty == FST_INT) { if (ty == FST_INT) {
nsv->val = *(int*)val; sv->val = *(int*)val;
} else if (ty == FST_CHAR) { } else if (ty == FST_CHAR) {
size_t len = strlen((char*)val); size_t len = strlen((char*)val);
nsv->ptr = (char*)calloc(1, len + 1); sv->ptr = (char*)calloc(1, len + 1);
memcpy(nsv->ptr, val, len); memcpy(sv->ptr, val, len);
} else if (ty == FST_ARRAY) { } else if (ty == FST_ARRAY) {
// TODO, // TODO,
// nsv->arr = taosArrayFromList() // nsv->arr = taosArrayFromList()
} }
return nsv; return sv;
} }
void startWithStateValueDestroy(void* val) { void startWithStateValueDestroy(void* val) {
StartWithStateValue* sv = (StartWithStateValue*)val; StartWithStateValue* sv = (StartWithStateValue*)val;
@ -146,11 +146,9 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
if (atype == AUTOMATION_ALWAYS) { if (atype == AUTOMATION_ALWAYS) {
int val = 0; int val = 0;
sv = startWithStateValueCreate(Running, FST_INT, &val); sv = startWithStateValueCreate(Running, FST_INT, &val);
ctx->stdata = (void*)sv;
} else if (atype == AUTOMATION_PREFIX) { } else if (atype == AUTOMATION_PREFIX) {
int val = 0; int val = 0;
sv = startWithStateValueCreate(Running, FST_INT, &val); sv = startWithStateValueCreate(Running, FST_INT, &val);
ctx->stdata = (void*)sv;
} else if (atype == AUTMMATION_MATCH) { } else if (atype == AUTMMATION_MATCH) {
} else { } else {
// add more search type // add more search type
@ -160,9 +158,8 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
if (data != NULL) { if (data != NULL) {
char* src = (char*)data; char* src = (char*)data;
size_t len = strlen(src); size_t len = strlen(src);
dst = (char*)malloc(len * sizeof(char) + 1); dst = (char*)calloc(1, len * sizeof(char) + 1);
memcpy(dst, src, len); memcpy(dst, src, len);
dst[len] = 0;
} }
ctx->data = dst; ctx->data = dst;

View File

@ -99,6 +99,7 @@ class FstReadMemory {
fstSliceDestroy(&skey); fstSliceDestroy(&skey);
return ok; return ok;
} }
bool GetWithTimeCostUs(const std::string& key, uint64_t* val, uint64_t* elapse) { bool GetWithTimeCostUs(const std::string& key, uint64_t* val, uint64_t* elapse) {
int64_t s = taosGetTimestampUs(); int64_t s = taosGetTimestampUs();
bool ok = this->Get(key, val); bool ok = this->Get(key, val);
@ -120,8 +121,6 @@ class FstReadMemory {
printf("key: %s, val: %" PRIu64 "\n", key.c_str(), (uint64_t)(rt->out.out)); printf("key: %s, val: %" PRIu64 "\n", key.c_str(), (uint64_t)(rt->out.out));
swsResultDestroy(rt); swsResultDestroy(rt);
} }
for (size_t i = 0; i < result.size(); i++) {
}
std::cout << std::endl; std::cout << std::endl;
return true; return true;
} }
@ -137,7 +136,7 @@ class FstReadMemory {
fstDestroy(_fst); fstDestroy(_fst);
fstSliceDestroy(&_s); fstSliceDestroy(&_s);
writerCtxDestroy(_wc, false); writerCtxDestroy(_wc, false);
tfCleanup(); // tfCleanup();
} }
private: private:
@ -196,6 +195,10 @@ class TFst {
} }
return fr->Get(k, v); return fr->Get(k, v);
} }
bool Search(AutomationCtx* ctx, std::vector<uint64_t>& result) {
// add more
return fr->Search(ctx, result);
}
private: private:
FstWriter* fw; FstWriter* fw;
@ -229,5 +232,9 @@ TEST_F(FstEnv, writeNormal) {
assert(fst->Get("a", &val) == false); assert(fst->Get("a", &val) == false);
assert(fst->Get("aa", &val) == true); assert(fst->Get("aa", &val) == true);
assert(val == 0); assert(val == 0);
std::vector<uint64_t> rlt;
AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS);
assert(fst->Search(ctx, rlt) == true);
} }
TEST_F(FstEnv, writeExcpet) {} TEST_F(FstEnv, WriteMillonrRecord) {}

View File

@ -66,6 +66,7 @@ typedef struct {
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
bool (*pfp)(void* parent, tmsg_t msgType);
int32_t refCount; int32_t refCount;
void* parent; void* parent;

View File

@ -29,7 +29,12 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->label) { if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1); tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1);
} }
// register callback handle
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp;
pRpc->pfp = pInit->pfp;
if (pInit->connType == TAOS_CONN_SERVER) { if (pInit->connType == TAOS_CONN_SERVER) {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
} else { } else {

View File

@ -134,8 +134,7 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP || if (pRpc->pfp != NULL && (pRpc->pfp)(pRpc->parent, rpcMsg.msgType)) {
rpcMsg.msgType == TDMT_VND_RES_READY_RSP) {
rpcMsg.handle = conn; rpcMsg.handle = conn;
conn->persist = 1; conn->persist = 1;
tDebug("client conn %p persist by app", conn); tDebug("client conn %p persist by app", conn);
@ -186,17 +185,12 @@ static void clientHandleExcept(SCliConn* pConn) {
return; return;
} }
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
tmsg_t msgType = TDMT_MND_CONNECT;
if (pMsg != NULL) {
msgType = pMsg->msg.msgType;
}
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcMsg.msgType = msgType + 1; rpcMsg.msgType = pMsg->msg.msgType + 1;
if (pConn->push != NULL && pConn->ctnRdCnt != 0) { if (pConn->push != NULL && pConn->ctnRdCnt != 0) {
(*pConn->push->callback)(pConn->push->arg, &rpcMsg); (*pConn->push->callback)(pConn->push->arg, &rpcMsg);
@ -445,7 +439,7 @@ static void clientConnCb(uv_connect_t* req, int status) {
addrlen = sizeof(pConn->locaddr); addrlen = sizeof(pConn->locaddr);
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
tTrace("client conn %p create", pConn); tTrace("client conn %p connect to server successfully", pConn);
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
clientWrite(pConn); clientWrite(pConn);
@ -524,6 +518,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
struct sockaddr_in addr; struct sockaddr_in addr;
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
// handle error in callback if fail to connect // handle error in callback if fail to connect
tTrace("client conn %p try to connect to %s:%d", conn, pMsg->ctx->ip, pMsg->ctx->port);
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
} }

View File

@ -413,11 +413,6 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
} else { } else {
uvStartSendResp(msg); uvStartSendResp(msg);
} }
// uv_buf_t wb;
// uvPrepareSendData(msg, &wb);
// uv_timer_stop(conn->pTimer);
// uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
} }
} }
static void uvAcceptAsyncCb(uv_async_t* async) { static void uvAcceptAsyncCb(uv_async_t* async) {
@ -490,7 +485,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
pConn->pTimer->data = pConn; pConn->pTimer->data = pConn;
pConn->hostThrd = pThrd; pConn->hostThrd = pThrd;
// pConn->pWorkerAsync = pThrd->workerAsync; // thread safty
// init client handle // init client handle
pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
@ -730,14 +724,9 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
} }
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
// pthread_mutex_lock(&pThrd->msgMtx);
// QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
// pthread_mutex_unlock(&pThrd->msgMtx);
tDebug("send quit msg to work thread"); tDebug("send quit msg to work thread");
transSendAsync(pThrd->asyncPool, &srvMsg->q); transSendAsync(pThrd->asyncPool, &srvMsg->q);
// uv_async_send(pThrd->workerAsync);
} }
void taosCloseServer(void* arg) { void taosCloseServer(void* arg) {
@ -774,19 +763,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
srvMsg->pConn = pConn; srvMsg->pConn = pConn;
srvMsg->msg = *pMsg; srvMsg->msg = *pMsg;
// pthread_mutex_lock(&pThrd->msgMtx);
// QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
// pthread_mutex_unlock(&pThrd->msgMtx);
tTrace("server conn %p start to send resp", pConn); tTrace("server conn %p start to send resp", pConn);
transSendAsync(pThrd->asyncPool, &srvMsg->q); transSendAsync(pThrd->asyncPool, &srvMsg->q);
// uv_async_send(pThrd->workerAsync);
} }
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) {
SSrvConn* pConn = thandle; SSrvConn* pConn = thandle;
// struct sockaddr* pPeerName = &pConn->peername;
struct sockaddr_in addr = pConn->addr; struct sockaddr_in addr = pConn->addr;
pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr); pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);