fix: avoid invalid read/write
This commit is contained in:
parent
9888d086c2
commit
76b9a5acee
|
@ -63,11 +63,6 @@ typedef struct SRpcMsg {
|
||||||
} SRpcMsg;
|
} SRpcMsg;
|
||||||
|
|
||||||
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
|
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
|
||||||
typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
|
|
||||||
///
|
|
||||||
// // SRpcMsg code
|
|
||||||
// REDIERE,
|
|
||||||
// NOT READY, EpSet
|
|
||||||
typedef bool (*RpcRfp)(int32_t code);
|
typedef bool (*RpcRfp)(int32_t code);
|
||||||
|
|
||||||
typedef struct SRpcInit {
|
typedef struct SRpcInit {
|
||||||
|
@ -80,18 +75,11 @@ typedef struct SRpcInit {
|
||||||
int idleTime; // milliseconds, 0 means idle timer is disabled
|
int idleTime; // milliseconds, 0 means idle timer is disabled
|
||||||
|
|
||||||
// the following is for client app ecurity only
|
// the following is for client app ecurity only
|
||||||
char *user; // user name
|
char *user; // user name
|
||||||
char spi; // security parameter index
|
|
||||||
char encrypt; // encrypt algorithm
|
|
||||||
char *secret; // key for authentication
|
|
||||||
char *ckey; // ciphering key
|
|
||||||
|
|
||||||
// call back to process incoming msg, code shall be ignored by server app
|
// call back to process incoming msg, code shall be ignored by server app
|
||||||
RpcCfp cfp;
|
RpcCfp cfp;
|
||||||
|
|
||||||
// call back to retrieve the client auth info, for server app only
|
|
||||||
RpcAfp afp;
|
|
||||||
|
|
||||||
// user defined retry func
|
// user defined retry func
|
||||||
RpcRfp rfp;
|
RpcRfp rfp;
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,7 @@ static void registerRequest(SRequestObj *pRequest) {
|
||||||
static void deregisterRequest(SRequestObj *pRequest) {
|
static void deregisterRequest(SRequestObj *pRequest) {
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
|
|
||||||
STscObj *pTscObj = pRequest->pTscObj;
|
STscObj * pTscObj = pRequest->pTscObj;
|
||||||
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
|
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||||
|
|
||||||
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
|
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
|
||||||
|
@ -91,7 +91,6 @@ static bool clientRpcRfp(int32_t code) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// TODO refactor
|
// TODO refactor
|
||||||
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
||||||
SRpcInit rpcInit;
|
SRpcInit rpcInit;
|
||||||
|
@ -105,10 +104,6 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcInit.user = (char *)user;
|
rpcInit.user = (char *)user;
|
||||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
rpcInit.ckey = "key";
|
|
||||||
rpcInit.spi = 1;
|
|
||||||
rpcInit.secret = (char *)auth;
|
|
||||||
|
|
||||||
void *pDnodeConn = rpcOpen(&rpcInit);
|
void *pDnodeConn = rpcOpen(&rpcInit);
|
||||||
if (pDnodeConn == NULL) {
|
if (pDnodeConn == NULL) {
|
||||||
tscError("failed to init connection to server");
|
tscError("failed to init connection to server");
|
||||||
|
@ -318,7 +313,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SConfig *pCfg = taosGetCfg();
|
SConfig * pCfg = taosGetCfg();
|
||||||
SConfigItem *pItem = NULL;
|
SConfigItem *pItem = NULL;
|
||||||
|
|
||||||
switch (option) {
|
switch (option) {
|
||||||
|
|
|
@ -291,7 +291,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
|
|
||||||
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
||||||
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
|
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
|
||||||
pRequest->metric.start, &res);
|
pRequest->metric.start, &res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (pRequest->body.queryJob != 0) {
|
if (pRequest->body.queryJob != 0) {
|
||||||
schedulerFreeJob(pRequest->body.queryJob);
|
schedulerFreeJob(pRequest->body.queryJob);
|
||||||
|
@ -339,12 +339,11 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
|
for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
|
||||||
SSubmitBlkRsp *blk = pRsp->pBlocks + i;
|
SSubmitBlkRsp* blk = pRsp->pBlocks + i;
|
||||||
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
|
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
|
||||||
taosArrayPush(pArray, &tbSver);
|
taosArrayPush(pArray, &tbSver);
|
||||||
}
|
}
|
||||||
} else if (TDMT_VND_QUERY == pRequest->type) {
|
} else if (TDMT_VND_QUERY == pRequest->type) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SCatalog* pCatalog = NULL;
|
SCatalog* pCatalog = NULL;
|
||||||
|
@ -369,7 +368,6 @@ void freeRequestRes(SRequestObj* pRequest, void* res) {
|
||||||
if (TDMT_VND_SUBMIT == pRequest->type) {
|
if (TDMT_VND_SUBMIT == pRequest->type) {
|
||||||
tFreeSSubmitRsp((SSubmitRsp*)res);
|
tFreeSSubmitRsp((SSubmitRsp*)res);
|
||||||
} else if (TDMT_VND_QUERY == pRequest->type) {
|
} else if (TDMT_VND_QUERY == pRequest->type) {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1022,7 +1020,6 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
|
||||||
SRpcInit rpcInit = {0};
|
SRpcInit rpcInit = {0};
|
||||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||||
|
|
||||||
taosEncryptPass_c((uint8_t*)("_pwd"), strlen("_pwd"), pass);
|
|
||||||
rpcInit.label = "CHK";
|
rpcInit.label = "CHK";
|
||||||
rpcInit.numOfThreads = 1;
|
rpcInit.numOfThreads = 1;
|
||||||
rpcInit.cfp = NULL;
|
rpcInit.cfp = NULL;
|
||||||
|
@ -1030,9 +1027,6 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
rpcInit.user = "_dnd";
|
rpcInit.user = "_dnd";
|
||||||
rpcInit.ckey = "_key";
|
|
||||||
rpcInit.spi = 1;
|
|
||||||
rpcInit.secret = pass;
|
|
||||||
|
|
||||||
clientRpc = rpcOpen(&rpcInit);
|
clientRpc = rpcOpen(&rpcInit);
|
||||||
if (clientRpc == NULL) {
|
if (clientRpc == NULL) {
|
||||||
|
|
|
@ -49,9 +49,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans * pTrans = &pDnode->trans;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SRpcMsg *pMsg = NULL;
|
SRpcMsg * pMsg = NULL;
|
||||||
bool needRelease = false;
|
bool needRelease = false;
|
||||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
|
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
|
||||||
SMgmtWrapper *pWrapper = NULL;
|
SMgmtWrapper *pWrapper = NULL;
|
||||||
|
@ -179,11 +179,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||||
|
|
||||||
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||||
SArray *pArray = (*pWrapper->func.getHandlesFp)();
|
SArray * pArray = (*pWrapper->func.getHandlesFp)();
|
||||||
if (pArray == NULL) return -1;
|
if (pArray == NULL) return -1;
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
|
SMgmtHandle * pMgmt = taosArrayGet(pArray, i);
|
||||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
|
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
|
||||||
if (pMgmt->needCheckVgId) {
|
if (pMgmt->needCheckVgId) {
|
||||||
pHandle->needCheckVgId = pMgmt->needCheckVgId;
|
pHandle->needCheckVgId = pMgmt->needCheckVgId;
|
||||||
|
@ -318,15 +318,9 @@ int32_t dmInitClient(SDnode *pDnode) {
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
rpcInit.user = INTERNAL_USER;
|
rpcInit.user = INTERNAL_USER;
|
||||||
rpcInit.ckey = INTERNAL_CKEY;
|
|
||||||
rpcInit.spi = 1;
|
|
||||||
rpcInit.parent = pDnode;
|
rpcInit.parent = pDnode;
|
||||||
rpcInit.rfp = rpcRfp;
|
rpcInit.rfp = rpcRfp;
|
||||||
|
|
||||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
|
||||||
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
|
|
||||||
rpcInit.secret = pass;
|
|
||||||
|
|
||||||
pTrans->clientRpc = rpcOpen(&rpcInit);
|
pTrans->clientRpc = rpcOpen(&rpcInit);
|
||||||
if (pTrans->clientRpc == NULL) {
|
if (pTrans->clientRpc == NULL) {
|
||||||
dError("failed to init dnode rpc client");
|
dError("failed to init dnode rpc client");
|
||||||
|
|
|
@ -48,10 +48,10 @@ void TestClient::DoInit() {
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcInit.idleTime = 30 * 1000;
|
rpcInit.idleTime = 30 * 1000;
|
||||||
rpcInit.user = (char*)this->user;
|
rpcInit.user = (char*)this->user;
|
||||||
rpcInit.ckey = (char*)"key";
|
// rpcInit.ckey = (char*)"key";
|
||||||
rpcInit.parent = this;
|
rpcInit.parent = this;
|
||||||
rpcInit.secret = (char*)secretEncrypt;
|
// rpcInit.secret = (char*)secretEncrypt;
|
||||||
rpcInit.spi = 1;
|
// rpcInit.spi = 1;
|
||||||
|
|
||||||
clientRpc = rpcOpen(&rpcInit);
|
clientRpc = rpcOpen(&rpcInit);
|
||||||
ASSERT(clientRpc);
|
ASSERT(clientRpc);
|
||||||
|
|
|
@ -27,16 +27,16 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
typedef struct SUdfdContext {
|
typedef struct SUdfdContext {
|
||||||
uv_loop_t *loop;
|
uv_loop_t * loop;
|
||||||
uv_pipe_t ctrlPipe;
|
uv_pipe_t ctrlPipe;
|
||||||
uv_signal_t intrSignal;
|
uv_signal_t intrSignal;
|
||||||
char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
|
char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
|
||||||
uv_pipe_t listeningPipe;
|
uv_pipe_t listeningPipe;
|
||||||
|
|
||||||
void *clientRpc;
|
void * clientRpc;
|
||||||
SCorEpSet mgmtEp;
|
SCorEpSet mgmtEp;
|
||||||
uv_mutex_t udfsMutex;
|
uv_mutex_t udfsMutex;
|
||||||
SHashObj *udfsHash;
|
SHashObj * udfsHash;
|
||||||
|
|
||||||
bool printVersion;
|
bool printVersion;
|
||||||
} SUdfdContext;
|
} SUdfdContext;
|
||||||
|
@ -45,7 +45,7 @@ SUdfdContext global;
|
||||||
|
|
||||||
typedef struct SUdfdUvConn {
|
typedef struct SUdfdUvConn {
|
||||||
uv_stream_t *client;
|
uv_stream_t *client;
|
||||||
char *inputBuf;
|
char * inputBuf;
|
||||||
int32_t inputLen;
|
int32_t inputLen;
|
||||||
int32_t inputCap;
|
int32_t inputCap;
|
||||||
int32_t inputTotal;
|
int32_t inputTotal;
|
||||||
|
@ -65,25 +65,25 @@ typedef struct SUdf {
|
||||||
uv_mutex_t lock;
|
uv_mutex_t lock;
|
||||||
uv_cond_t condReady;
|
uv_cond_t condReady;
|
||||||
|
|
||||||
char name[TSDB_FUNC_NAME_LEN];
|
char name[TSDB_FUNC_NAME_LEN];
|
||||||
int8_t funcType;
|
int8_t funcType;
|
||||||
int8_t scriptType;
|
int8_t scriptType;
|
||||||
int8_t outputType;
|
int8_t outputType;
|
||||||
int32_t outputLen;
|
int32_t outputLen;
|
||||||
int32_t bufSize;
|
int32_t bufSize;
|
||||||
|
|
||||||
char path[PATH_MAX];
|
char path[PATH_MAX];
|
||||||
|
|
||||||
uv_lib_t lib;
|
uv_lib_t lib;
|
||||||
|
|
||||||
TUdfScalarProcFunc scalarProcFunc;
|
TUdfScalarProcFunc scalarProcFunc;
|
||||||
|
|
||||||
TUdfAggStartFunc aggStartFunc;
|
TUdfAggStartFunc aggStartFunc;
|
||||||
TUdfAggProcessFunc aggProcFunc;
|
TUdfAggProcessFunc aggProcFunc;
|
||||||
TUdfAggFinishFunc aggFinishFunc;
|
TUdfAggFinishFunc aggFinishFunc;
|
||||||
|
|
||||||
TUdfInitFunc initFunc;
|
TUdfInitFunc initFunc;
|
||||||
TUdfDestroyFunc destroyFunc;
|
TUdfDestroyFunc destroyFunc;
|
||||||
} SUdf;
|
} SUdf;
|
||||||
|
|
||||||
// TODO: add private udf structure.
|
// TODO: add private udf structure.
|
||||||
|
@ -98,9 +98,9 @@ typedef enum EUdfdRpcReqRspType {
|
||||||
|
|
||||||
typedef struct SUdfdRpcSendRecvInfo {
|
typedef struct SUdfdRpcSendRecvInfo {
|
||||||
EUdfdRpcReqRspType rpcType;
|
EUdfdRpcReqRspType rpcType;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
void* param;
|
void * param;
|
||||||
uv_sem_t resultSem;
|
uv_sem_t resultSem;
|
||||||
} SUdfdRpcSendRecvInfo;
|
} SUdfdRpcSendRecvInfo;
|
||||||
|
|
||||||
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
@ -136,7 +136,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
|
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
|
||||||
|
|
||||||
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
|
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
|
||||||
SUdf* udf = msgInfo->param;
|
SUdf * udf = msgInfo->param;
|
||||||
udf->funcType = pFuncInfo->funcType;
|
udf->funcType = pFuncInfo->funcType;
|
||||||
udf->scriptType = pFuncInfo->scriptType;
|
udf->scriptType = pFuncInfo->scriptType;
|
||||||
udf->outputType = pFuncInfo->outputType;
|
udf->outputType = pFuncInfo->outputType;
|
||||||
|
@ -145,7 +145,8 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
|
||||||
char path[PATH_MAX] = {0};
|
char path[PATH_MAX] = {0};
|
||||||
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
|
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
|
||||||
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
TdFilePtr file =
|
||||||
|
taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
||||||
// TODO check for failure of flush to disk
|
// TODO check for failure of flush to disk
|
||||||
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
|
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
|
||||||
taosCloseFile(&file);
|
taosCloseFile(&file);
|
||||||
|
@ -168,11 +169,11 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
|
||||||
taosArrayPush(retrieveReq.pFuncNames, udfName);
|
taosArrayPush(retrieveReq.pFuncNames, udfName);
|
||||||
|
|
||||||
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
|
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
|
||||||
void *pReq = rpcMallocCont(contLen);
|
void * pReq = rpcMallocCont(contLen);
|
||||||
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
|
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
|
||||||
taosArrayDestroy(retrieveReq.pFuncNames);
|
taosArrayDestroy(retrieveReq.pFuncNames);
|
||||||
|
|
||||||
SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
||||||
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
|
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
|
||||||
msgInfo->param = udf;
|
msgInfo->param = udf;
|
||||||
uv_sem_init(&msgInfo->resultSem, 0);
|
uv_sem_init(&msgInfo->resultSem, 0);
|
||||||
|
@ -194,7 +195,7 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
|
||||||
int32_t udfdConnectToMnode() {
|
int32_t udfdConnectToMnode() {
|
||||||
SConnectReq connReq = {0};
|
SConnectReq connReq = {0};
|
||||||
connReq.connType = CONN_TYPE__UDFD;
|
connReq.connType = CONN_TYPE__UDFD;
|
||||||
tstrncpy(connReq.app, "udfd",sizeof(connReq.app));
|
tstrncpy(connReq.app, "udfd", sizeof(connReq.app));
|
||||||
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
|
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
|
||||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||||
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
|
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
|
||||||
|
@ -203,7 +204,7 @@ int32_t udfdConnectToMnode() {
|
||||||
connReq.startTime = htobe64(taosGetTimestampMs());
|
connReq.startTime = htobe64(taosGetTimestampMs());
|
||||||
|
|
||||||
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
|
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
|
||||||
void* pReq = rpcMallocCont(contLen);
|
void * pReq = rpcMallocCont(contLen);
|
||||||
tSerializeSConnectReq(pReq, contLen, &connReq);
|
tSerializeSConnectReq(pReq, contLen, &connReq);
|
||||||
|
|
||||||
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
||||||
|
@ -240,17 +241,17 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
|
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
|
||||||
}
|
}
|
||||||
|
|
||||||
char initFuncName[TSDB_FUNC_NAME_LEN+5] = {0};
|
char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
|
||||||
char *initSuffix = "_init";
|
char *initSuffix = "_init";
|
||||||
strcpy(initFuncName, udfName);
|
strcpy(initFuncName, udfName);
|
||||||
strncat(initFuncName, initSuffix, strlen(initSuffix));
|
strncat(initFuncName, initSuffix, strlen(initSuffix));
|
||||||
uv_dlsym(&udf->lib, initFuncName, (void**)(&udf->initFunc));
|
uv_dlsym(&udf->lib, initFuncName, (void **)(&udf->initFunc));
|
||||||
|
|
||||||
char destroyFuncName[TSDB_FUNC_NAME_LEN+5] = {0};
|
char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
|
||||||
char *destroySuffix = "_destroy";
|
char *destroySuffix = "_destroy";
|
||||||
strcpy(destroyFuncName, udfName);
|
strcpy(destroyFuncName, udfName);
|
||||||
strncat(destroyFuncName, destroySuffix, strlen(destroySuffix));
|
strncat(destroyFuncName, destroySuffix, strlen(destroySuffix));
|
||||||
uv_dlsym(&udf->lib, destroyFuncName, (void**)(&udf->destroyFunc));
|
uv_dlsym(&udf->lib, destroyFuncName, (void **)(&udf->destroyFunc));
|
||||||
|
|
||||||
if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
|
if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
|
||||||
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
||||||
|
@ -270,87 +271,86 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
|
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
|
||||||
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
|
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
|
||||||
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
|
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
|
||||||
//TODO: merge
|
// TODO: merge
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
// TODO: tracable id from client. connect, setup, call, teardown
|
// TODO: tracable id from client. connect, setup, call, teardown
|
||||||
fnInfo( "setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
|
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
|
||||||
SUdfSetupRequest *setup = &request->setup;
|
SUdfSetupRequest *setup = &request->setup;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SUdf *udf = NULL;
|
SUdf * udf = NULL;
|
||||||
uv_mutex_lock(&global.udfsMutex);
|
uv_mutex_lock(&global.udfsMutex);
|
||||||
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
|
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
|
||||||
if (udfInHash) {
|
if (udfInHash) {
|
||||||
++(*udfInHash)->refCount;
|
++(*udfInHash)->refCount;
|
||||||
udf = *udfInHash;
|
udf = *udfInHash;
|
||||||
uv_mutex_unlock(&global.udfsMutex);
|
uv_mutex_unlock(&global.udfsMutex);
|
||||||
} else {
|
} else {
|
||||||
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
||||||
udfNew->refCount = 1;
|
udfNew->refCount = 1;
|
||||||
udfNew->state = UDF_STATE_INIT;
|
udfNew->state = UDF_STATE_INIT;
|
||||||
|
|
||||||
uv_mutex_init(&udfNew->lock);
|
uv_mutex_init(&udfNew->lock);
|
||||||
uv_cond_init(&udfNew->condReady);
|
uv_cond_init(&udfNew->condReady);
|
||||||
udf = udfNew;
|
udf = udfNew;
|
||||||
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
|
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
|
||||||
uv_mutex_unlock(&global.udfsMutex);
|
uv_mutex_unlock(&global.udfsMutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
uv_mutex_lock(&udf->lock);
|
||||||
|
if (udf->state == UDF_STATE_INIT) {
|
||||||
|
udf->state = UDF_STATE_LOADING;
|
||||||
|
code = udfdLoadUdf(setup->udfName, udf);
|
||||||
|
if (udf->initFunc) {
|
||||||
|
udf->initFunc();
|
||||||
}
|
}
|
||||||
|
udf->state = UDF_STATE_READY;
|
||||||
uv_mutex_lock(&udf->lock);
|
uv_cond_broadcast(&udf->condReady);
|
||||||
if (udf->state == UDF_STATE_INIT) {
|
uv_mutex_unlock(&udf->lock);
|
||||||
udf->state = UDF_STATE_LOADING;
|
} else {
|
||||||
code = udfdLoadUdf(setup->udfName, udf);
|
while (udf->state != UDF_STATE_READY) {
|
||||||
if (udf->initFunc) {
|
uv_cond_wait(&udf->condReady, &udf->lock);
|
||||||
udf->initFunc();
|
|
||||||
}
|
|
||||||
udf->state = UDF_STATE_READY;
|
|
||||||
uv_cond_broadcast(&udf->condReady);
|
|
||||||
uv_mutex_unlock(&udf->lock);
|
|
||||||
} else {
|
|
||||||
while (udf->state != UDF_STATE_READY) {
|
|
||||||
uv_cond_wait(&udf->condReady, &udf->lock);
|
|
||||||
}
|
|
||||||
uv_mutex_unlock(&udf->lock);
|
|
||||||
}
|
}
|
||||||
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
|
uv_mutex_unlock(&udf->lock);
|
||||||
handle->udf = udf;
|
}
|
||||||
|
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
|
||||||
|
handle->udf = udf;
|
||||||
|
|
||||||
SUdfResponse rsp;
|
SUdfResponse rsp;
|
||||||
rsp.seqNum = request->seqNum;
|
rsp.seqNum = request->seqNum;
|
||||||
rsp.type = request->type;
|
rsp.type = request->type;
|
||||||
rsp.code = code;
|
rsp.code = code;
|
||||||
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
||||||
rsp.setupRsp.outputType = udf->outputType;
|
rsp.setupRsp.outputType = udf->outputType;
|
||||||
rsp.setupRsp.outputLen = udf->outputLen;
|
rsp.setupRsp.outputLen = udf->outputLen;
|
||||||
rsp.setupRsp.bufSize = udf->bufSize;
|
rsp.setupRsp.bufSize = udf->bufSize;
|
||||||
|
|
||||||
int32_t len = encodeUdfResponse(NULL, &rsp);
|
int32_t len = encodeUdfResponse(NULL, &rsp);
|
||||||
rsp.msgLen = len;
|
rsp.msgLen = len;
|
||||||
void *bufBegin = taosMemoryMalloc(len);
|
void *bufBegin = taosMemoryMalloc(len);
|
||||||
void *buf = bufBegin;
|
void *buf = bufBegin;
|
||||||
encodeUdfResponse(&buf, &rsp);
|
encodeUdfResponse(&buf, &rsp);
|
||||||
|
|
||||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||||
|
|
||||||
taosMemoryFree(uvUdf->input.base);
|
taosMemoryFree(uvUdf->input.base);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
SUdfCallRequest *call = &request->call;
|
SUdfCallRequest *call = &request->call;
|
||||||
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType,
|
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, call->udfHandle);
|
||||||
call->udfHandle);
|
SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle);
|
||||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
|
SUdf * udf = handle->udf;
|
||||||
SUdf *udf = handle->udf;
|
SUdfResponse response = {0};
|
||||||
SUdfResponse response = {0};
|
SUdfResponse * rsp = &response;
|
||||||
SUdfResponse *rsp = &response;
|
|
||||||
SUdfCallResponse *subRsp = &rsp->callRsp;
|
SUdfCallResponse *subRsp = &rsp->callRsp;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
switch(call->callType) {
|
switch (call->callType) {
|
||||||
case TSDB_UDF_CALL_SCALA_PROC: {
|
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||||
SUdfColumn output = {0};
|
SUdfColumn output = {0};
|
||||||
|
|
||||||
|
@ -363,9 +363,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_UDF_CALL_AGG_INIT: {
|
case TSDB_UDF_CALL_AGG_INIT: {
|
||||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
||||||
.bufLen= udf->bufSize,
|
|
||||||
.numOfResult = 0};
|
|
||||||
udf->aggStartFunc(&outBuf);
|
udf->aggStartFunc(&outBuf);
|
||||||
subRsp->resultBuf = outBuf;
|
subRsp->resultBuf = outBuf;
|
||||||
break;
|
break;
|
||||||
|
@ -373,9 +371,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
case TSDB_UDF_CALL_AGG_PROC: {
|
case TSDB_UDF_CALL_AGG_PROC: {
|
||||||
SUdfDataBlock input = {0};
|
SUdfDataBlock input = {0};
|
||||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
||||||
.bufLen= udf->bufSize,
|
|
||||||
.numOfResult = 0};
|
|
||||||
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
|
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
|
||||||
freeUdfInterBuf(&call->interBuf);
|
freeUdfInterBuf(&call->interBuf);
|
||||||
freeUdfDataDataBlock(&input);
|
freeUdfDataDataBlock(&input);
|
||||||
|
@ -384,9 +380,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_UDF_CALL_AGG_FIN: {
|
case TSDB_UDF_CALL_AGG_FIN: {
|
||||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
||||||
.bufLen= udf->bufSize,
|
|
||||||
.numOfResult = 0};
|
|
||||||
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
|
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
|
||||||
freeUdfInterBuf(&call->interBuf);
|
freeUdfInterBuf(&call->interBuf);
|
||||||
subRsp->resultBuf = outBuf;
|
subRsp->resultBuf = outBuf;
|
||||||
|
@ -429,20 +423,19 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(uvUdf->input.base);
|
taosMemoryFree(uvUdf->input.base);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
|
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
SUdfTeardownRequest *teardown = &request->teardown;
|
SUdfTeardownRequest *teardown = &request->teardown;
|
||||||
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
||||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
||||||
SUdf *udf = handle->udf;
|
SUdf * udf = handle->udf;
|
||||||
bool unloadUdf = false;
|
bool unloadUdf = false;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
uv_mutex_lock(&global.udfsMutex);
|
uv_mutex_lock(&global.udfsMutex);
|
||||||
udf->refCount--;
|
udf->refCount--;
|
||||||
|
@ -568,7 +561,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void udfdHandleRequest(SUdfdUvConn *conn) {
|
void udfdHandleRequest(SUdfdUvConn *conn) {
|
||||||
uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
|
uv_work_t * work = taosMemoryMalloc(sizeof(uv_work_t));
|
||||||
SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
|
SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
|
||||||
udfWork->client = conn->client;
|
udfWork->client = conn->client;
|
||||||
udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen);
|
udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen);
|
||||||
|
@ -653,11 +646,11 @@ static bool udfdRpcRfp(int32_t code) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
|
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
|
||||||
pEpSet->version = 0;
|
pEpSet->version = 0;
|
||||||
|
|
||||||
// init mnode ip set
|
// init mnode ip set
|
||||||
SEpSet* mgmtEpSet = &(pEpSet->epSet);
|
SEpSet *mgmtEpSet = &(pEpSet->epSet);
|
||||||
mgmtEpSet->numOfEps = 0;
|
mgmtEpSet->numOfEps = 0;
|
||||||
mgmtEpSet->inUse = 0;
|
mgmtEpSet->inUse = 0;
|
||||||
|
|
||||||
|
@ -694,7 +687,6 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t udfdOpenClientRpc() {
|
int32_t udfdOpenClientRpc() {
|
||||||
SRpcInit rpcInit = {0};
|
SRpcInit rpcInit = {0};
|
||||||
rpcInit.label = "UDFD";
|
rpcInit.label = "UDFD";
|
||||||
|
@ -704,15 +696,9 @@ int32_t udfdOpenClientRpc() {
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
rpcInit.user = TSDB_DEFAULT_USER;
|
rpcInit.user = TSDB_DEFAULT_USER;
|
||||||
rpcInit.ckey = "key";
|
|
||||||
rpcInit.spi = 1;
|
|
||||||
rpcInit.parent = &global;
|
rpcInit.parent = &global;
|
||||||
rpcInit.rfp = udfdRpcRfp;
|
rpcInit.rfp = udfdRpcRfp;
|
||||||
|
|
||||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
|
||||||
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
|
|
||||||
rpcInit.secret = pass;
|
|
||||||
|
|
||||||
global.clientRpc = rpcOpen(&rpcInit);
|
global.clientRpc = rpcOpen(&rpcInit);
|
||||||
if (global.clientRpc == NULL) {
|
if (global.clientRpc == NULL) {
|
||||||
fnError("failed to init dnode rpc client");
|
fnError("failed to init dnode rpc client");
|
||||||
|
@ -823,7 +809,7 @@ static int32_t udfdUvInit() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void udfdCloseWalkCb(uv_handle_t* handle, void* arg) {
|
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
|
||||||
if (!uv_is_closing(handle)) {
|
if (!uv_is_closing(handle)) {
|
||||||
uv_close(handle, NULL);
|
uv_close(handle, NULL);
|
||||||
}
|
}
|
||||||
|
@ -883,7 +869,7 @@ int main(int argc, char *argv[]) {
|
||||||
int32_t retryMnodeTimes = 0;
|
int32_t retryMnodeTimes = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
while (retryMnodeTimes++ < TSDB_MAX_REPLICA) {
|
while (retryMnodeTimes++ < TSDB_MAX_REPLICA) {
|
||||||
uv_sleep(500 * ( 1 << retryMnodeTimes));
|
uv_sleep(500 * (1 << retryMnodeTimes));
|
||||||
code = udfdConnectToMnode();
|
code = udfdConnectToMnode();
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -183,9 +183,6 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
|
||||||
rpcInit.sessions = 100;
|
rpcInit.sessions = 100;
|
||||||
rpcInit.idleTime = 100;
|
rpcInit.idleTime = 100;
|
||||||
rpcInit.user = "sync-io";
|
rpcInit.user = "sync-io";
|
||||||
rpcInit.secret = "sync-io";
|
|
||||||
rpcInit.ckey = "key";
|
|
||||||
rpcInit.spi = 0;
|
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
|
||||||
io->clientRpc = rpcOpen(&rpcInit);
|
io->clientRpc = rpcOpen(&rpcInit);
|
||||||
|
@ -206,7 +203,6 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
|
||||||
rpcInit.cfp = syncIOProcessRequest;
|
rpcInit.cfp = syncIOProcessRequest;
|
||||||
rpcInit.sessions = 1000;
|
rpcInit.sessions = 1000;
|
||||||
rpcInit.idleTime = 2 * 1500;
|
rpcInit.idleTime = 2 * 1500;
|
||||||
rpcInit.afp = syncIOAuth;
|
|
||||||
rpcInit.parent = io;
|
rpcInit.parent = io;
|
||||||
rpcInit.connType = TAOS_CONN_SERVER;
|
rpcInit.connType = TAOS_CONN_SERVER;
|
||||||
|
|
||||||
|
|
|
@ -52,23 +52,15 @@ typedef struct {
|
||||||
int idleTime; // milliseconds;
|
int idleTime; // milliseconds;
|
||||||
uint16_t localPort;
|
uint16_t localPort;
|
||||||
int8_t connType;
|
int8_t connType;
|
||||||
int64_t index;
|
|
||||||
char label[TSDB_LABEL_LEN];
|
char label[TSDB_LABEL_LEN];
|
||||||
|
char user[TSDB_UNI_LEN]; // meter ID
|
||||||
char user[TSDB_UNI_LEN]; // meter ID
|
|
||||||
char spi; // security parameter index
|
|
||||||
char encrypt; // encrypt algorithm
|
|
||||||
char secret[TSDB_PASSWORD_LEN]; // secret for the link
|
|
||||||
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
|
|
||||||
|
|
||||||
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
||||||
bool (*retry)(int32_t code);
|
bool (*retry)(int32_t code);
|
||||||
|
int index;
|
||||||
|
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
void* parent;
|
void* parent;
|
||||||
void* idPool; // handle to ID pool
|
|
||||||
void* tmrCtrl; // handle to timer
|
|
||||||
SHashObj* hash; // handle returned by hash utility
|
|
||||||
void* tcphandle; // returned handle from TCP initialization
|
void* tcphandle; // returned handle from TCP initialization
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
} SRpcInfo;
|
} SRpcInfo;
|
||||||
|
|
|
@ -69,9 +69,6 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
if (pInit->user) {
|
if (pInit->user) {
|
||||||
memcpy(pRpc->user, pInit->user, strlen(pInit->user));
|
memcpy(pRpc->user, pInit->user, strlen(pInit->user));
|
||||||
}
|
}
|
||||||
if (pInit->secret) {
|
|
||||||
memcpy(pRpc->secret, pInit->secret, strlen(pInit->secret));
|
|
||||||
}
|
|
||||||
return pRpc;
|
return pRpc;
|
||||||
}
|
}
|
||||||
void rpcClose(void* arg) {
|
void rpcClose(void* arg) {
|
||||||
|
|
|
@ -134,7 +134,6 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.cfp = processRequestMsg;
|
rpcInit.cfp = processRequestMsg;
|
||||||
rpcInit.sessions = 1000;
|
rpcInit.sessions = 1000;
|
||||||
rpcInit.idleTime = 2 * 1500;
|
rpcInit.idleTime = 2 * 1500;
|
||||||
rpcInit.afp = retrieveAuthInfo;
|
|
||||||
|
|
||||||
for (int i = 1; i < argc; ++i) {
|
for (int i = 1; i < argc; ++i) {
|
||||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
|
|
|
@ -118,9 +118,6 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.sessions = 100;
|
rpcInit.sessions = 100;
|
||||||
rpcInit.idleTime = 100;
|
rpcInit.idleTime = 100;
|
||||||
rpcInit.user = "michael";
|
rpcInit.user = "michael";
|
||||||
rpcInit.secret = secret;
|
|
||||||
rpcInit.ckey = "key";
|
|
||||||
rpcInit.spi = 1;
|
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcDebugFlag = 131;
|
rpcDebugFlag = 131;
|
||||||
|
|
||||||
|
@ -144,9 +141,7 @@ int main(int argc, char *argv[]) {
|
||||||
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
||||||
rpcInit.user = argv[++i];
|
rpcInit.user = argv[++i];
|
||||||
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
||||||
rpcInit.secret = argv[++i];
|
|
||||||
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
||||||
rpcInit.spi = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||||
rpcDebugFlag = atoi(argv[++i]);
|
rpcDebugFlag = atoi(argv[++i]);
|
||||||
} else {
|
} else {
|
||||||
|
@ -160,8 +155,6 @@ int main(int argc, char *argv[]) {
|
||||||
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||||
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
||||||
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||||
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
|
|
||||||
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
|
|
||||||
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||||
printf(" [-h help]: print out this help\n\n");
|
printf(" [-h help]: print out this help\n\n");
|
||||||
exit(0);
|
exit(0);
|
||||||
|
|
|
@ -123,7 +123,6 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.cfp = processRequestMsg;
|
rpcInit.cfp = processRequestMsg;
|
||||||
rpcInit.sessions = 1000;
|
rpcInit.sessions = 1000;
|
||||||
rpcInit.idleTime = 2 * 1500;
|
rpcInit.idleTime = 2 * 1500;
|
||||||
rpcInit.afp = retrieveAuthInfo;
|
|
||||||
|
|
||||||
rpcDebugFlag = 131;
|
rpcDebugFlag = 131;
|
||||||
|
|
||||||
|
|
|
@ -21,15 +21,15 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int index;
|
int index;
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
int num;
|
int num;
|
||||||
int numOfReqs;
|
int numOfReqs;
|
||||||
int msgSize;
|
int msgSize;
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
tsem_t * pOverSem;
|
tsem_t * pOverSem;
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
void * pRpc;
|
void * pRpc;
|
||||||
} SInfo;
|
} SInfo;
|
||||||
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SInfo *pInfo = (SInfo *)pMsg->info.ahandle;
|
SInfo *pInfo = (SInfo *)pMsg->info.ahandle;
|
||||||
|
@ -103,7 +103,7 @@ int main(int argc, char *argv[]) {
|
||||||
char secret[20] = "mypassword";
|
char secret[20] = "mypassword";
|
||||||
struct timeval systemTime;
|
struct timeval systemTime;
|
||||||
int64_t startTime, endTime;
|
int64_t startTime, endTime;
|
||||||
TdThreadAttr thattr;
|
TdThreadAttr thattr;
|
||||||
|
|
||||||
// server info
|
// server info
|
||||||
epSet.inUse = 0;
|
epSet.inUse = 0;
|
||||||
|
@ -119,9 +119,6 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.sessions = 100;
|
rpcInit.sessions = 100;
|
||||||
rpcInit.idleTime = 100;
|
rpcInit.idleTime = 100;
|
||||||
rpcInit.user = "michael";
|
rpcInit.user = "michael";
|
||||||
rpcInit.secret = secret;
|
|
||||||
rpcInit.ckey = "key";
|
|
||||||
rpcInit.spi = 1;
|
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
|
||||||
for (int i = 1; i < argc; ++i) {
|
for (int i = 1; i < argc; ++i) {
|
||||||
|
@ -144,9 +141,7 @@ int main(int argc, char *argv[]) {
|
||||||
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
||||||
rpcInit.user = argv[++i];
|
rpcInit.user = argv[++i];
|
||||||
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
||||||
rpcInit.secret = argv[++i];
|
|
||||||
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
||||||
rpcInit.spi = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||||
rpcDebugFlag = atoi(argv[++i]);
|
rpcDebugFlag = atoi(argv[++i]);
|
||||||
} else {
|
} else {
|
||||||
|
@ -160,8 +155,6 @@ int main(int argc, char *argv[]) {
|
||||||
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||||
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
||||||
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||||
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
|
|
||||||
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
|
|
||||||
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||||
printf(" [-h help]: print out this help\n\n");
|
printf(" [-h help]: print out this help\n\n");
|
||||||
exit(0);
|
exit(0);
|
||||||
|
|
|
@ -50,9 +50,6 @@ class Client {
|
||||||
rpcInit_.numOfThreads = nThread;
|
rpcInit_.numOfThreads = nThread;
|
||||||
rpcInit_.cfp = processResp;
|
rpcInit_.cfp = processResp;
|
||||||
rpcInit_.user = (char *)user;
|
rpcInit_.user = (char *)user;
|
||||||
rpcInit_.secret = (char *)secret;
|
|
||||||
rpcInit_.ckey = (char *)ckey;
|
|
||||||
rpcInit_.spi = 1;
|
|
||||||
rpcInit_.parent = this;
|
rpcInit_.parent = this;
|
||||||
rpcInit_.connType = TAOS_CONN_CLIENT;
|
rpcInit_.connType = TAOS_CONN_CLIENT;
|
||||||
this->transCli = rpcOpen(&rpcInit_);
|
this->transCli = rpcOpen(&rpcInit_);
|
||||||
|
@ -117,9 +114,6 @@ class Server {
|
||||||
rpcInit_.numOfThreads = 5;
|
rpcInit_.numOfThreads = 5;
|
||||||
rpcInit_.cfp = processReq;
|
rpcInit_.cfp = processReq;
|
||||||
rpcInit_.user = (char *)user;
|
rpcInit_.user = (char *)user;
|
||||||
rpcInit_.secret = (char *)secret;
|
|
||||||
rpcInit_.ckey = (char *)ckey;
|
|
||||||
rpcInit_.spi = 1;
|
|
||||||
rpcInit_.connType = TAOS_CONN_SERVER;
|
rpcInit_.connType = TAOS_CONN_SERVER;
|
||||||
}
|
}
|
||||||
void Start() {
|
void Start() {
|
||||||
|
|
|
@ -31,9 +31,6 @@ static void shellWorkAsClient() {
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
rpcInit.user = "_dnd";
|
rpcInit.user = "_dnd";
|
||||||
rpcInit.ckey = "_key";
|
|
||||||
rpcInit.spi = 1;
|
|
||||||
rpcInit.secret = pass;
|
|
||||||
|
|
||||||
clientRpc = rpcOpen(&rpcInit);
|
clientRpc = rpcOpen(&rpcInit);
|
||||||
if (clientRpc == NULL) {
|
if (clientRpc == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue