Merge pull request #10604 from taosdata/feature/index_oper
Feature/index oper
This commit is contained in:
commit
0595d9f1a6
|
@ -64,7 +64,6 @@ typedef struct SRpcInit {
|
||||||
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
|
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
|
||||||
int idleTime; // milliseconds, 0 means idle timer is disabled
|
int idleTime; // milliseconds, 0 means idle timer is disabled
|
||||||
|
|
||||||
bool noPool; // create conn pool or not
|
|
||||||
// the following is for client app ecurity only
|
// the following is for client app ecurity only
|
||||||
char *user; // user name
|
char *user; // user name
|
||||||
char spi; // security parameter index
|
char spi; // security parameter index
|
||||||
|
@ -86,7 +85,7 @@ typedef struct SRpcInit {
|
||||||
|
|
||||||
int32_t rpcInit();
|
int32_t rpcInit();
|
||||||
void rpcCleanup();
|
void rpcCleanup();
|
||||||
void *rpcOpen(const SRpcInit *pRpc);
|
void * rpcOpen(const SRpcInit *pRpc);
|
||||||
void rpcClose(void *);
|
void rpcClose(void *);
|
||||||
void * rpcMallocCont(int contLen);
|
void * rpcMallocCont(int contLen);
|
||||||
void rpcFreeCont(void *pCont);
|
void rpcFreeCont(void *pCont);
|
||||||
|
@ -99,6 +98,9 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp)
|
||||||
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
||||||
void rpcCancelRequest(int64_t rid);
|
void rpcCancelRequest(int64_t rid);
|
||||||
|
|
||||||
|
void rpcRefHandle(void *handle, int8_t type);
|
||||||
|
void rpcUnrefHandle(void *handle, int8_t type);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -72,8 +72,6 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
||||||
taosReleaseRef(clientConnRefPool, pTscObj->id);
|
taosReleaseRef(clientConnRefPool, pTscObj->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// todo close the transporter properly
|
// todo close the transporter properly
|
||||||
void closeTransporter(STscObj *pTscObj) {
|
void closeTransporter(STscObj *pTscObj) {
|
||||||
if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) {
|
if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) {
|
||||||
|
@ -241,6 +239,7 @@ void taos_init_imp(void) {
|
||||||
clientConnRefPool = taosOpenRef(200, destroyTscObj);
|
clientConnRefPool = taosOpenRef(200, destroyTscObj);
|
||||||
clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
|
clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
|
||||||
|
|
||||||
|
// transDestroyBuffer(&conn->readBuf);
|
||||||
taosGetAppName(appInfo.appName, NULL);
|
taosGetAppName(appInfo.appName, NULL);
|
||||||
pthread_mutex_init(&appInfo.mutex, NULL);
|
pthread_mutex_init(&appInfo.mutex, NULL);
|
||||||
|
|
||||||
|
|
|
@ -215,8 +215,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
|
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
|
||||||
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
|
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
|
||||||
|
|
||||||
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
||||||
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res);
|
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -230,12 +232,12 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
|
|
||||||
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
|
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
|
||||||
pRequest->body.resInfo.numOfRows = res.numOfRows;
|
pRequest->body.resInfo.numOfRows = res.numOfRows;
|
||||||
|
|
||||||
if (pRequest->body.queryJob != 0) {
|
if (pRequest->body.queryJob != 0) {
|
||||||
schedulerFreeJob(pRequest->body.queryJob);
|
schedulerFreeJob(pRequest->body.queryJob);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->code = res.code;
|
pRequest->code = res.code;
|
||||||
return pRequest->code;
|
return pRequest->code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,8 +25,8 @@
|
||||||
#include "dndMnode.h"
|
#include "dndMnode.h"
|
||||||
#include "dndVnodes.h"
|
#include "dndVnodes.h"
|
||||||
|
|
||||||
#define INTERNAL_USER "_dnd"
|
#define INTERNAL_USER "_dnd"
|
||||||
#define INTERNAL_CKEY "_key"
|
#define INTERNAL_CKEY "_key"
|
||||||
#define INTERNAL_SECRET "_pwd"
|
#define INTERNAL_SECRET "_pwd"
|
||||||
|
|
||||||
static void dndInitMsgFp(STransMgmt *pMgmt) {
|
static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
|
@ -156,7 +156,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
|
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
|
||||||
SDnode *pDnode = parent;
|
SDnode * pDnode = parent;
|
||||||
STransMgmt *pMgmt = &pDnode->tmgmt;
|
STransMgmt *pMgmt = &pDnode->tmgmt;
|
||||||
|
|
||||||
tmsg_t msgType = pRsp->msgType;
|
tmsg_t msgType = pRsp->msgType;
|
||||||
|
@ -194,7 +194,6 @@ static int32_t dndInitClient(SDnode *pDnode) {
|
||||||
rpcInit.ckey = INTERNAL_CKEY;
|
rpcInit.ckey = INTERNAL_CKEY;
|
||||||
rpcInit.spi = 1;
|
rpcInit.spi = 1;
|
||||||
rpcInit.parent = pDnode;
|
rpcInit.parent = pDnode;
|
||||||
rpcInit.noPool = true;
|
|
||||||
|
|
||||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||||
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
|
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
|
||||||
|
@ -220,7 +219,7 @@ static void dndCleanupClient(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
|
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
|
||||||
SDnode *pDnode = param;
|
SDnode * pDnode = param;
|
||||||
STransMgmt *pMgmt = &pDnode->tmgmt;
|
STransMgmt *pMgmt = &pDnode->tmgmt;
|
||||||
|
|
||||||
tmsg_t msgType = pReq->msgType;
|
tmsg_t msgType = pReq->msgType;
|
||||||
|
@ -314,7 +313,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
|
||||||
SAuthReq authReq = {0};
|
SAuthReq authReq = {0};
|
||||||
tstrncpy(authReq.user, user, TSDB_USER_LEN);
|
tstrncpy(authReq.user, user, TSDB_USER_LEN);
|
||||||
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
|
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
|
||||||
void *pReq = rpcMallocCont(contLen);
|
void * pReq = rpcMallocCont(contLen);
|
||||||
tSerializeSAuthReq(pReq, contLen, &authReq);
|
tSerializeSAuthReq(pReq, contLen, &authReq);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
|
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
|
||||||
|
|
|
@ -341,6 +341,8 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
||||||
// TODO: iterator mem and tidex
|
// TODO: iterator mem and tidex
|
||||||
STermValueType s = kTypeValue;
|
STermValueType s = kTypeValue;
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
SIdxTempResult* tr = sIdxTempResultCreate();
|
SIdxTempResult* tr = sIdxTempResultCreate();
|
||||||
if (0 == indexCacheSearch(cache, query, tr, &s)) {
|
if (0 == indexCacheSearch(cache, query, tr, &s)) {
|
||||||
if (s == kTypeDeletion) {
|
if (s == kTypeDeletion) {
|
||||||
|
@ -348,17 +350,23 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
||||||
// coloum already drop by other oper, no need to query tindex
|
// coloum already drop by other oper, no need to query tindex
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
st = taosGetTimestampUs();
|
||||||
if (0 != indexTFileSearch(sIdx->tindex, query, tr)) {
|
if (0 != indexTFileSearch(sIdx->tindex, query, tr)) {
|
||||||
indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
|
indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
int64_t tfCost = taosGetTimestampUs() - st;
|
||||||
|
indexInfo("tfile search cost: %" PRIu64 "us", tfCost);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
|
indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
int64_t cost = taosGetTimestampUs() - st;
|
||||||
|
indexInfo("search cost: %" PRIu64 "us", cost);
|
||||||
|
|
||||||
sIdxTempResultMergeTo(*result, tr);
|
sIdxTempResultMergeTo(*result, tr);
|
||||||
|
|
||||||
sIdxTempResultDestroy(tr);
|
sIdxTempResultDestroy(tr);
|
||||||
return 0;
|
return 0;
|
||||||
END:
|
END:
|
||||||
|
|
|
@ -276,7 +276,12 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SI
|
||||||
} else if (c->operaType == DEL_VALUE) {
|
} else if (c->operaType == DEL_VALUE) {
|
||||||
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
|
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
} else if (qtype == QUERY_PREFIX) {
|
||||||
|
} else if (qtype == QUERY_SUFFIX) {
|
||||||
|
} else if (qtype == QUERY_RANGE) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -284,6 +289,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SI
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
|
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -312,12 +318,14 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result
|
||||||
// continue search in imm
|
// continue search in imm
|
||||||
ret = indexQueryMem(imm, &ct, qtype, result, s);
|
ret = indexQueryMem(imm, &ct, qtype, result, s);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasJson) {
|
if (hasJson) {
|
||||||
tfree(p);
|
tfree(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
indexMemUnRef(mem);
|
indexMemUnRef(mem);
|
||||||
indexMemUnRef(imm);
|
indexMemUnRef(imm);
|
||||||
|
indexInfo("cache search, time cost %" PRIu64 "us", taosGetTimestampUs() - st);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
|
@ -189,8 +189,8 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul
|
||||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
|
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
|
||||||
EIndexQueryType qtype = query->qType;
|
EIndexQueryType qtype = query->qType;
|
||||||
|
|
||||||
SArray* result = taosArrayInit(16, sizeof(uint64_t));
|
// SArray* result = taosArrayInit(16, sizeof(uint64_t));
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
// refactor to callback later
|
// refactor to callback later
|
||||||
if (qtype == QUERY_TERM) {
|
if (qtype == QUERY_TERM) {
|
||||||
uint64_t offset;
|
uint64_t offset;
|
||||||
|
@ -200,11 +200,18 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul
|
||||||
p = indexPackJsonData(term);
|
p = indexPackJsonData(term);
|
||||||
sz = strlen(p);
|
sz = strlen(p);
|
||||||
}
|
}
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
FstSlice key = fstSliceCreate(p, sz);
|
FstSlice key = fstSliceCreate(p, sz);
|
||||||
if (fstGet(reader->fst, &key, &offset)) {
|
if (fstGet(reader->fst, &key, &offset)) {
|
||||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName,
|
int64_t et = taosGetTimestampUs();
|
||||||
term->colVal);
|
int64_t cost = et - st;
|
||||||
ret = tfileReaderLoadTableIds(reader, offset, result);
|
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us",
|
||||||
|
term->suid, term->colName, term->colVal, cost);
|
||||||
|
|
||||||
|
ret = tfileReaderLoadTableIds(reader, offset, tr->total);
|
||||||
|
cost = taosGetTimestampUs() - et;
|
||||||
|
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", term->suid,
|
||||||
|
term->colName, term->colVal, cost);
|
||||||
} else {
|
} else {
|
||||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName,
|
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName,
|
||||||
term->colVal);
|
term->colVal);
|
||||||
|
@ -225,8 +232,8 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul
|
||||||
}
|
}
|
||||||
tfileReaderUnRef(reader);
|
tfileReaderUnRef(reader);
|
||||||
|
|
||||||
taosArrayAddAll(tr->total, result);
|
// taosArrayAddAll(tr->total, result);
|
||||||
taosArrayDestroy(result);
|
// taosArrayDestroy(result);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -391,6 +398,7 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
IndexTFile* pTfile = tfile;
|
IndexTFile* pTfile = tfile;
|
||||||
|
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
|
@ -399,6 +407,8 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result
|
||||||
if (reader == NULL) {
|
if (reader == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
int64_t cost = taosGetTimestampUs() - st;
|
||||||
|
indexInfo("index tfile stage 1 cost: %" PRId64 "", cost);
|
||||||
|
|
||||||
return tfileReaderSearch(reader, query, result);
|
return tfileReaderSearch(reader, query, result);
|
||||||
}
|
}
|
||||||
|
|
|
@ -258,7 +258,7 @@ void checkFstCheckIterator() {
|
||||||
// prefix search
|
// prefix search
|
||||||
std::vector<uint64_t> result;
|
std::vector<uint64_t> result;
|
||||||
|
|
||||||
AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS);
|
AutomationCtx* ctx = automCtxCreate((void*)"H", AUTOMATION_PREFIX);
|
||||||
m->Search(ctx, result);
|
m->Search(ctx, result);
|
||||||
std::cout << "size: " << result.size() << std::endl;
|
std::cout << "size: " << result.size() << std::endl;
|
||||||
// assert(result.size() == count);
|
// assert(result.size() == count);
|
||||||
|
@ -328,11 +328,11 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) {
|
||||||
int main(int argc, char* argv[]) {
|
int main(int argc, char* argv[]) {
|
||||||
// tool to check all kind of fst test
|
// tool to check all kind of fst test
|
||||||
// if (argc > 1) { validateTFile(argv[1]); }
|
// if (argc > 1) { validateTFile(argv[1]); }
|
||||||
if (argc > 4) {
|
// if (argc > 4) {
|
||||||
// path suid colName ver
|
// path suid colName ver
|
||||||
iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
|
// iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
|
||||||
}
|
//}
|
||||||
// checkFstCheckIterator();
|
checkFstCheckIterator();
|
||||||
// checkFstLongTerm();
|
// checkFstLongTerm();
|
||||||
// checkFstPrefixSearch();
|
// checkFstPrefixSearch();
|
||||||
|
|
||||||
|
|
|
@ -768,7 +768,7 @@ class IndexObj {
|
||||||
int64_t s = taosGetTimestampUs();
|
int64_t s = taosGetTimestampUs();
|
||||||
if (Search(mq, result) == 0) {
|
if (Search(mq, result) == 0) {
|
||||||
int64_t e = taosGetTimestampUs();
|
int64_t e = taosGetTimestampUs();
|
||||||
std::cout << "search one successfully and time cost:" << e - s << "\tquery col:" << colName
|
std::cout << "search one successfully and time cost:" << e - s << "us\tquery col:" << colName
|
||||||
<< "\t val: " << colVal << "\t size:" << taosArrayGetSize(result) << std::endl;
|
<< "\t val: " << colVal << "\t size:" << taosArrayGetSize(result) << std::endl;
|
||||||
} else {
|
} else {
|
||||||
}
|
}
|
||||||
|
@ -1106,6 +1106,7 @@ TEST_F(IndexEnv2, testIndex_del) {
|
||||||
}
|
}
|
||||||
index->Del("tag10", "Hello", 12);
|
index->Del("tag10", "Hello", 12);
|
||||||
index->Del("tag10", "Hello", 11);
|
index->Del("tag10", "Hello", 11);
|
||||||
|
EXPECT_EQ(98, index->SearchOne("tag10", "Hello"));
|
||||||
|
|
||||||
index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000);
|
index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000);
|
||||||
index->Del("tag10", "Hello", 17);
|
index->Del("tag10", "Hello", 17);
|
||||||
|
|
|
@ -121,9 +121,8 @@ typedef struct {
|
||||||
} SRpcReqContext;
|
} SRpcReqContext;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SRpcInfo* pTransInst; // associated SRpcInfo
|
SEpSet epSet; // ip list provided by app
|
||||||
SEpSet epSet; // ip list provided by app
|
void* ahandle; // handle provided by app
|
||||||
void* ahandle; // handle provided by app
|
|
||||||
// struct SRpcConn* pConn; // pConn allocated
|
// struct SRpcConn* pConn; // pConn allocated
|
||||||
tmsg_t msgType; // message type
|
tmsg_t msgType; // message type
|
||||||
uint8_t* pCont; // content provided by app
|
uint8_t* pCont; // content provided by app
|
||||||
|
@ -242,8 +241,12 @@ int transDestroyBuffer(SConnBuffer* buf);
|
||||||
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
||||||
bool transReadComplete(SConnBuffer* connBuf);
|
bool transReadComplete(SConnBuffer* connBuf);
|
||||||
|
|
||||||
// int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen);
|
int transSetConnOption(uv_tcp_t* stream);
|
||||||
|
|
||||||
// int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool );
|
void transRefSrvHandle(void* handle);
|
||||||
|
void transUnrefSrvHandle(void* handle);
|
||||||
|
|
||||||
|
void transRefCliHandle(void* handle);
|
||||||
|
void transUnrefCliHandle(void* handle);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -54,7 +54,6 @@ typedef struct {
|
||||||
int8_t connType;
|
int8_t connType;
|
||||||
int64_t index;
|
int64_t index;
|
||||||
char label[TSDB_LABEL_LEN];
|
char label[TSDB_LABEL_LEN];
|
||||||
bool noPool; // pool or not
|
|
||||||
|
|
||||||
char user[TSDB_UNI_LEN]; // meter ID
|
char user[TSDB_UNI_LEN]; // meter ID
|
||||||
char spi; // security parameter index
|
char spi; // security parameter index
|
||||||
|
|
|
@ -64,7 +64,6 @@ typedef struct {
|
||||||
void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
|
void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
|
||||||
int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
|
int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
|
||||||
|
|
||||||
bool noPool;
|
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
void * parent;
|
void * parent;
|
||||||
void * idPool; // handle to ID pool
|
void * idPool; // handle to ID pool
|
||||||
|
|
|
@ -41,7 +41,6 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
pRpc->numOfThreads = pInit->numOfThreads;
|
pRpc->numOfThreads = pInit->numOfThreads;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRpc->noPool = pInit->noPool;
|
|
||||||
pRpc->connType = pInit->connType;
|
pRpc->connType = pInit->connType;
|
||||||
pRpc->idleTime = pInit->idleTime;
|
pRpc->idleTime = pInit->idleTime;
|
||||||
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||||
|
@ -122,4 +121,17 @@ void rpcCleanup(void) {
|
||||||
//
|
//
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
|
||||||
|
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle};
|
||||||
|
|
||||||
|
void rpcRefHandle(void* handle, int8_t type) {
|
||||||
|
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
|
||||||
|
(*taosRefHandle[type])(handle);
|
||||||
|
}
|
||||||
|
void rpcUnrefHandle(void* handle, int8_t type) {
|
||||||
|
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
|
||||||
|
(*taosUnRefHandle[type])(handle);
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -20,7 +20,10 @@
|
||||||
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
||||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||||
|
|
||||||
|
#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
|
||||||
|
|
||||||
typedef struct SCliConn {
|
typedef struct SCliConn {
|
||||||
|
T_REF_DECLARE()
|
||||||
uv_connect_t connReq;
|
uv_connect_t connReq;
|
||||||
uv_stream_t* stream;
|
uv_stream_t* stream;
|
||||||
uv_write_t* writeReq;
|
uv_write_t* writeReq;
|
||||||
|
@ -32,8 +35,7 @@ typedef struct SCliConn {
|
||||||
int8_t ctnRdCnt; // continue read count
|
int8_t ctnRdCnt; // continue read count
|
||||||
int hThrdIdx;
|
int hThrdIdx;
|
||||||
|
|
||||||
SRpcPush* push;
|
int persist; //
|
||||||
int persist; //
|
|
||||||
// spi configure
|
// spi configure
|
||||||
char spi;
|
char spi;
|
||||||
char secured;
|
char secured;
|
||||||
|
@ -41,6 +43,7 @@ typedef struct SCliConn {
|
||||||
// debug and log info
|
// debug and log info
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
struct sockaddr_in locaddr;
|
struct sockaddr_in locaddr;
|
||||||
|
|
||||||
} SCliConn;
|
} SCliConn;
|
||||||
|
|
||||||
typedef struct SCliMsg {
|
typedef struct SCliMsg {
|
||||||
|
@ -54,14 +57,17 @@ typedef struct SCliThrdObj {
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
// uv_async_t* cliAsync; //
|
// uv_async_t* cliAsync; //
|
||||||
SAsyncPool* asyncPool;
|
SAsyncPool* asyncPool;
|
||||||
uv_timer_t* timer;
|
uv_timer_t* timer;
|
||||||
void* pool; // conn pool
|
void* pool; // conn pool
|
||||||
|
|
||||||
|
// msg queue
|
||||||
queue msg;
|
queue msg;
|
||||||
pthread_mutex_t msgMtx;
|
pthread_mutex_t msgMtx;
|
||||||
uint64_t nextTimeout; // next timeout
|
|
||||||
void* pTransInst; //
|
uint64_t nextTimeout; // next timeout
|
||||||
bool quit;
|
void* pTransInst; //
|
||||||
|
bool quit;
|
||||||
} SCliThrdObj;
|
} SCliThrdObj;
|
||||||
|
|
||||||
typedef struct SClientObj {
|
typedef struct SClientObj {
|
||||||
|
@ -96,7 +102,7 @@ static void clientAsyncCb(uv_async_t* handle);
|
||||||
static void clientDestroy(uv_handle_t* handle);
|
static void clientDestroy(uv_handle_t* handle);
|
||||||
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
||||||
|
|
||||||
// process data read from server, auth/decompress etc later
|
// process data read from server, add decompress etc later
|
||||||
static void clientHandleResp(SCliConn* conn);
|
static void clientHandleResp(SCliConn* conn);
|
||||||
// handle except about conn
|
// handle except about conn
|
||||||
static void clientHandleExcept(SCliConn* conn);
|
static void clientHandleExcept(SCliConn* conn);
|
||||||
|
@ -104,9 +110,10 @@ static void clientHandleExcept(SCliConn* conn);
|
||||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
static void clientSendQuit(SCliThrdObj* thrd);
|
static void clientSendQuit(SCliThrdObj* thrd);
|
||||||
|
|
||||||
static void destroyUserdata(SRpcMsg* userdata);
|
static void destroyUserdata(SRpcMsg* userdata);
|
||||||
|
|
||||||
|
static int clientRBChoseIdx(SRpcInfo* pTransInst);
|
||||||
|
|
||||||
static void destroyCmsg(SCliMsg* cmsg);
|
static void destroyCmsg(SCliMsg* cmsg);
|
||||||
static void transDestroyConnCtx(STransConnCtx* ctx);
|
static void transDestroyConnCtx(STransConnCtx* ctx);
|
||||||
// thread obj
|
// thread obj
|
||||||
|
@ -115,10 +122,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
// thread
|
// thread
|
||||||
static void* clientThread(void* arg);
|
static void* clientThread(void* arg);
|
||||||
|
|
||||||
static void clientHandleResp(SCliConn* conn) {
|
static void* clientNotifyApp() {}
|
||||||
|
static void clientHandleResp(SCliConn* conn) {
|
||||||
SCliMsg* pMsg = conn->data;
|
SCliMsg* pMsg = conn->data;
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
SRpcInfo* pRpc = pCtx->pTransInst;
|
|
||||||
|
SCliThrdObj* pThrd = conn->hostThrd;
|
||||||
|
SRpcInfo* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
|
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
|
@ -134,56 +144,51 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
rpcMsg.msgType = pHead->msgType;
|
rpcMsg.msgType = pHead->msgType;
|
||||||
rpcMsg.ahandle = pCtx->ahandle;
|
rpcMsg.ahandle = pCtx->ahandle;
|
||||||
|
|
||||||
if (pRpc->pfp != NULL && (pRpc->pfp)(pRpc->parent, rpcMsg.msgType)) {
|
if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) {
|
||||||
rpcMsg.handle = conn;
|
rpcMsg.handle = conn;
|
||||||
conn->persist = 1;
|
conn->persist = 1;
|
||||||
tDebug("client conn %p persist by app", conn);
|
tDebug("client conn %p persist by app", conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pRpc->label, conn,
|
tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
||||||
TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||||
inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen);
|
inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen);
|
||||||
|
|
||||||
conn->secured = pHead->secured;
|
conn->secured = pHead->secured;
|
||||||
if (conn->push != NULL && conn->ctnRdCnt != 0) {
|
|
||||||
(*conn->push->callback)(conn->push->arg, &rpcMsg);
|
if (pCtx->pSem == NULL) {
|
||||||
conn->push = NULL;
|
tTrace("%s client conn %p handle resp", pTransInst->label, conn);
|
||||||
|
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
|
||||||
} else {
|
} else {
|
||||||
if (pCtx->pSem == NULL) {
|
tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn);
|
||||||
tTrace("%s client conn %p handle resp", pRpc->label, conn);
|
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
|
||||||
(pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
|
tsem_post(pCtx->pSem);
|
||||||
} else {
|
|
||||||
tTrace("%s client conn(sync) %p handle resp", pRpc->label, conn);
|
|
||||||
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
|
|
||||||
tsem_post(pCtx->pSem);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
conn->ctnRdCnt += 1;
|
conn->ctnRdCnt += 1;
|
||||||
|
|
||||||
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
|
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
|
||||||
|
|
||||||
SCliThrdObj* pThrd = conn->hostThrd;
|
|
||||||
|
|
||||||
// user owns conn->persist = 1
|
// user owns conn->persist = 1
|
||||||
if (conn->push == NULL && conn->persist == 0) {
|
if (conn->persist == 0) {
|
||||||
if (pRpc->noPool == true) {
|
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
||||||
} else {
|
|
||||||
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
destroyCmsg(conn->data);
|
destroyCmsg(conn->data);
|
||||||
conn->data = NULL;
|
conn->data = NULL;
|
||||||
|
|
||||||
// start thread's timer of conn pool if not active
|
// start thread's timer of conn pool if not active
|
||||||
if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
|
if (!uv_is_active((uv_handle_t*)pThrd->timer) && pTransInst->idleTime > 0) {
|
||||||
// uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
// uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void clientHandleExcept(SCliConn* pConn) {
|
static void clientHandleExcept(SCliConn* pConn) {
|
||||||
if (pConn->data == NULL && pConn->push == NULL) {
|
if (pConn->data == NULL) {
|
||||||
// handle conn except in conn pool
|
// handle conn except in conn pool
|
||||||
clientConnDestroy(pConn, true);
|
clientConnDestroy(pConn, true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
SCliThrdObj* pThrd = pConn->hostThrd;
|
||||||
|
SRpcInfo* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
SCliMsg* pMsg = pConn->data;
|
SCliMsg* pMsg = pConn->data;
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
|
||||||
|
@ -192,29 +197,19 @@ static void clientHandleExcept(SCliConn* pConn) {
|
||||||
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
rpcMsg.msgType = pMsg->msg.msgType + 1;
|
rpcMsg.msgType = pMsg->msg.msgType + 1;
|
||||||
|
|
||||||
if (pConn->push != NULL && pConn->ctnRdCnt != 0) {
|
if (pCtx->pSem == NULL) {
|
||||||
(*pConn->push->callback)(pConn->push->arg, &rpcMsg);
|
tTrace("%s client conn %p handle resp", pTransInst->label, pConn);
|
||||||
pConn->push = NULL;
|
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
|
||||||
} else {
|
} else {
|
||||||
if (pCtx->pSem == NULL) {
|
tTrace("%s client conn(sync) %p handle resp", pTransInst->label, pConn);
|
||||||
(pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
|
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
|
||||||
} else {
|
tsem_post(pCtx->pSem);
|
||||||
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
|
|
||||||
tsem_post(pCtx->pSem);
|
|
||||||
}
|
|
||||||
if (pConn->push != NULL) {
|
|
||||||
(*pConn->push->callback)(pConn->push->arg, &rpcMsg);
|
|
||||||
}
|
|
||||||
pConn->push = NULL;
|
|
||||||
}
|
}
|
||||||
tTrace("%s client conn %p start to destroy", pCtx->pTransInst->label, pConn);
|
destroyCmsg(pConn->data);
|
||||||
if (pConn->push == NULL) {
|
pConn->data = NULL;
|
||||||
destroyCmsg(pConn->data);
|
|
||||||
pConn->data = NULL;
|
tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
}
|
|
||||||
// transDestroyConnCtx(pCtx);
|
|
||||||
clientConnDestroy(pConn, true);
|
clientConnDestroy(pConn, true);
|
||||||
pConn->ctnRdCnt += 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientTimeoutCb(uv_timer_t* handle) {
|
static void clientTimeoutCb(uv_timer_t* handle) {
|
||||||
|
@ -316,17 +311,14 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
||||||
if (nread > 0) {
|
if (nread > 0) {
|
||||||
pBuf->len += nread;
|
pBuf->len += nread;
|
||||||
if (transReadComplete(pBuf)) {
|
if (transReadComplete(pBuf)) {
|
||||||
tTrace("client conn %p read complete", conn);
|
tTrace("%s client conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
||||||
clientHandleResp(conn);
|
clientHandleResp(conn);
|
||||||
} else {
|
} else {
|
||||||
tTrace("client conn %p read partial packet, continue to read", conn);
|
tTrace("%s client conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (nread == UV_EOF) {
|
|
||||||
tError("client conn %p read error: %s", conn, uv_err_name(nread));
|
|
||||||
clientHandleExcept(conn);
|
|
||||||
}
|
|
||||||
assert(nread <= 0);
|
assert(nread <= 0);
|
||||||
if (nread == 0) {
|
if (nread == 0) {
|
||||||
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
|
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
|
||||||
|
@ -335,18 +327,16 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
tError("client conn %p read error: %s", conn, uv_err_name(nread));
|
tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
|
||||||
clientHandleExcept(conn);
|
clientHandleExcept(conn);
|
||||||
}
|
}
|
||||||
// tDebug("Read error %s\n", uv_err_name(nread));
|
|
||||||
// uv_close((uv_handle_t*)handle, clientDestroy);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientConnDestroy(SCliConn* conn, bool clear) {
|
static void clientConnDestroy(SCliConn* conn, bool clear) {
|
||||||
//
|
//
|
||||||
conn->ref--;
|
conn->ref--;
|
||||||
if (conn->ref == 0) {
|
if (conn->ref == 0) {
|
||||||
tTrace("client conn %p remove from conn pool", conn);
|
tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||||
QUEUE_REMOVE(&conn->conn);
|
QUEUE_REMOVE(&conn->conn);
|
||||||
if (clear) {
|
if (clear) {
|
||||||
uv_close((uv_handle_t*)conn->stream, clientDestroy);
|
uv_close((uv_handle_t*)conn->stream, clientDestroy);
|
||||||
|
@ -355,20 +345,18 @@ static void clientConnDestroy(SCliConn* conn, bool clear) {
|
||||||
}
|
}
|
||||||
static void clientDestroy(uv_handle_t* handle) {
|
static void clientDestroy(uv_handle_t* handle) {
|
||||||
SCliConn* conn = handle->data;
|
SCliConn* conn = handle->data;
|
||||||
// transDestroyBuffer(&conn->readBuf);
|
|
||||||
|
|
||||||
free(conn->stream);
|
free(conn->stream);
|
||||||
free(conn->writeReq);
|
free(conn->writeReq);
|
||||||
tTrace("client conn %p destroy successfully", conn);
|
tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||||
free(conn);
|
free(conn);
|
||||||
|
|
||||||
// clientConnDestroy(conn, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientWriteCb(uv_write_t* req, int status) {
|
static void clientWriteCb(uv_write_t* req, int status) {
|
||||||
SCliConn* pConn = req->data;
|
SCliConn* pConn = req->data;
|
||||||
|
|
||||||
if (status == 0) {
|
if (status == 0) {
|
||||||
tTrace("client conn %p data already was written out", pConn);
|
tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
SCliMsg* pMsg = pConn->data;
|
SCliMsg* pMsg = pConn->data;
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
// handle
|
// handle
|
||||||
|
@ -376,18 +364,19 @@ static void clientWriteCb(uv_write_t* req, int status) {
|
||||||
}
|
}
|
||||||
destroyUserdata(&pMsg->msg);
|
destroyUserdata(&pMsg->msg);
|
||||||
} else {
|
} else {
|
||||||
tError("client conn %p failed to write: %s", pConn, uv_err_name(status));
|
tError("%s client conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
|
||||||
clientHandleExcept(pConn);
|
clientHandleExcept(pConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SCliThrdObj* pThrd = pConn->hostThrd;
|
|
||||||
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
|
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientWrite(SCliConn* pConn) {
|
static void clientWrite(SCliConn* pConn) {
|
||||||
SCliMsg* pCliMsg = pConn->data;
|
SCliMsg* pCliMsg = pConn->data;
|
||||||
STransConnCtx* pCtx = pCliMsg->ctx;
|
STransConnCtx* pCtx = pCliMsg->ctx;
|
||||||
SRpcInfo* pTransInst = pCtx->pTransInst;
|
|
||||||
|
SCliThrdObj* pThrd = pConn->hostThrd;
|
||||||
|
SRpcInfo* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
|
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
|
||||||
|
|
||||||
|
@ -416,20 +405,18 @@ static void clientWrite(SCliConn* pConn) {
|
||||||
pHead->msgType = pMsg->msgType;
|
pHead->msgType = pMsg->msgType;
|
||||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||||
|
|
||||||
// if (pHead->msgType == TDMT_VND_QUERY || pHead->msgType == TDMT_VND_)
|
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType),
|
tDebug("%s client conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||||
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
|
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||||
ntohs(pConn->locaddr.sin_port));
|
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||||
|
|
||||||
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
|
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
|
||||||
}
|
}
|
||||||
static void clientConnCb(uv_connect_t* req, int status) {
|
static void clientConnCb(uv_connect_t* req, int status) {
|
||||||
// impl later
|
// impl later
|
||||||
SCliConn* pConn = req->data;
|
SCliConn* pConn = req->data;
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
// tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
|
tError("%s client conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
|
||||||
tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status));
|
|
||||||
clientHandleExcept(pConn);
|
clientHandleExcept(pConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -439,7 +426,7 @@ static void clientConnCb(uv_connect_t* req, int status) {
|
||||||
addrlen = sizeof(pConn->locaddr);
|
addrlen = sizeof(pConn->locaddr);
|
||||||
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
|
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
|
||||||
|
|
||||||
tTrace("client conn %p connect to server successfully", pConn);
|
tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
|
|
||||||
assert(pConn->stream == req->handle);
|
assert(pConn->stream == req->handle);
|
||||||
clientWrite(pConn);
|
clientWrite(pConn);
|
||||||
|
@ -458,25 +445,20 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
uint64_t et = taosGetTimestampUs();
|
uint64_t et = taosGetTimestampUs();
|
||||||
uint64_t el = et - pMsg->st;
|
uint64_t el = et - pMsg->st;
|
||||||
tTrace("client msg tran time cost: %" PRIu64 "us", el);
|
tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el);
|
||||||
et = taosGetTimestampUs();
|
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
SRpcInfo* pTransInst = pThrd->pTransInst;
|
||||||
|
SCliConn* conn = NULL;
|
||||||
|
|
||||||
SCliConn* conn = NULL;
|
if (pMsg->msg.handle != NULL) {
|
||||||
if (pMsg->msg.handle == NULL) {
|
|
||||||
if (pCtx->pTransInst->noPool == true) {
|
|
||||||
} else {
|
|
||||||
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
|
||||||
}
|
|
||||||
if (conn != NULL) {
|
|
||||||
tTrace("client conn %p get from conn pool", conn);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
conn = (SCliConn*)(pMsg->msg.handle);
|
conn = (SCliConn*)(pMsg->msg.handle);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
tTrace("client conn %p reused", conn);
|
tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
||||||
|
if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
|
@ -489,7 +471,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
clientWrite(conn);
|
clientWrite(conn);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
conn = calloc(1, sizeof(SCliConn));
|
conn = calloc(1, sizeof(SCliConn));
|
||||||
conn->ref++;
|
conn->ref++;
|
||||||
|
@ -497,12 +478,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
|
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
|
||||||
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
|
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
|
||||||
conn->stream->data = conn;
|
conn->stream->data = conn;
|
||||||
uv_tcp_nodelay((uv_tcp_t*)conn->stream, 1);
|
|
||||||
int ret = uv_tcp_keepalive((uv_tcp_t*)conn->stream, 1, 1);
|
|
||||||
if (ret) {
|
|
||||||
tTrace("client conn %p failed to set keepalive, %s", conn, uv_err_name(ret));
|
|
||||||
}
|
|
||||||
// write req handle
|
|
||||||
conn->writeReq = malloc(sizeof(uv_write_t));
|
conn->writeReq = malloc(sizeof(uv_write_t));
|
||||||
conn->writeReq->data = conn;
|
conn->writeReq->data = conn;
|
||||||
|
|
||||||
|
@ -512,17 +488,17 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
conn->hostThrd = pThrd;
|
conn->hostThrd = pThrd;
|
||||||
|
|
||||||
// conn->push = pMsg->msg.push;
|
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
|
||||||
// conn->ctnRdCnt = 0;
|
if (ret) {
|
||||||
|
tError("%s client conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
|
||||||
|
}
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
||||||
// handle error in callback if fail to connect
|
// handle error in callback if fail to connect
|
||||||
tTrace("client conn %p try to connect to %s:%d", conn, pMsg->ctx->ip, pMsg->ctx->port);
|
tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
|
||||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
|
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
conn->push = pMsg->msg.push;
|
|
||||||
conn->ctnRdCnt = 0;
|
conn->ctnRdCnt = 0;
|
||||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||||
}
|
}
|
||||||
|
@ -548,7 +524,6 @@ static void clientAsyncCb(uv_async_t* handle) {
|
||||||
} else {
|
} else {
|
||||||
clientHandleReq(pMsg, pThrd);
|
clientHandleReq(pMsg, pThrd);
|
||||||
}
|
}
|
||||||
// clientHandleReq(pMsg, pThrd);
|
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
if (count >= 2) {
|
if (count >= 2) {
|
||||||
|
@ -643,8 +618,6 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
|
||||||
static void clientSendQuit(SCliThrdObj* thrd) {
|
static void clientSendQuit(SCliThrdObj* thrd) {
|
||||||
// cli can stop gracefully
|
// cli can stop gracefully
|
||||||
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
|
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
|
||||||
msg->ctx = NULL; //
|
|
||||||
|
|
||||||
transSendAsync(thrd->asyncPool, &msg->q);
|
transSendAsync(thrd->asyncPool, &msg->q);
|
||||||
}
|
}
|
||||||
void taosCloseClient(void* arg) {
|
void taosCloseClient(void* arg) {
|
||||||
|
@ -656,37 +629,53 @@ void taosCloseClient(void* arg) {
|
||||||
free(cli->pThreadObj);
|
free(cli->pThreadObj);
|
||||||
free(cli);
|
free(cli);
|
||||||
}
|
}
|
||||||
static int clientRBChoseIdx(SRpcInfo* pRpc) {
|
static int clientRBChoseIdx(SRpcInfo* pTransInst) {
|
||||||
int64_t index = pRpc->index;
|
int64_t index = pTransInst->index;
|
||||||
if (pRpc->index++ >= pRpc->numOfThreads) {
|
if (pTransInst->index++ >= pTransInst->numOfThreads) {
|
||||||
pRpc->index = 0;
|
pTransInst->index = 0;
|
||||||
}
|
}
|
||||||
return index % pRpc->numOfThreads;
|
return index % pTransInst->numOfThreads;
|
||||||
|
}
|
||||||
|
void transRefCliHandle(void* handle) {
|
||||||
|
if (handle == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int ref = T_REF_INC((SCliConn*)handle);
|
||||||
|
UNUSED(ref);
|
||||||
|
}
|
||||||
|
void transUnrefCliHandle(void* handle) {
|
||||||
|
if (handle == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int ref = T_REF_DEC((SCliConn*)handle);
|
||||||
|
if (ref == 0) {
|
||||||
|
}
|
||||||
|
|
||||||
|
// unref cli handle
|
||||||
}
|
}
|
||||||
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||||
// impl later
|
// impl later
|
||||||
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
||||||
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
||||||
|
|
||||||
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
SRpcInfo* pTransInst = (SRpcInfo*)shandle;
|
||||||
|
|
||||||
int index = CONN_HOST_THREAD_INDEX(pMsg->handle);
|
int index = CONN_HOST_THREAD_INDEX(pMsg->handle);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
index = clientRBChoseIdx(pRpc);
|
index = clientRBChoseIdx(pTransInst);
|
||||||
}
|
}
|
||||||
int32_t flen = 0;
|
int32_t flen = 0;
|
||||||
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
|
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
|
||||||
// imp later
|
// imp later
|
||||||
}
|
}
|
||||||
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
||||||
pCtx->pTransInst = (SRpcInfo*)shandle;
|
|
||||||
pCtx->ahandle = pMsg->ahandle;
|
pCtx->ahandle = pMsg->ahandle;
|
||||||
pCtx->msgType = pMsg->msgType;
|
pCtx->msgType = pMsg->msgType;
|
||||||
pCtx->ip = strdup(ip);
|
pCtx->ip = strdup(ip);
|
||||||
pCtx->port = port;
|
pCtx->port = port;
|
||||||
pCtx->hThrdIdx = index;
|
pCtx->hThrdIdx = index;
|
||||||
|
|
||||||
assert(pRpc->connType == TAOS_CONN_CLIENT);
|
assert(pTransInst->connType == TAOS_CONN_CLIENT);
|
||||||
// atomic or not
|
// atomic or not
|
||||||
|
|
||||||
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
|
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
|
||||||
|
@ -694,7 +683,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
|
||||||
cliMsg->msg = *pMsg;
|
cliMsg->msg = *pMsg;
|
||||||
cliMsg->st = taosGetTimestampUs();
|
cliMsg->st = taosGetTimestampUs();
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index];
|
SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -702,15 +691,14 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
|
||||||
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
||||||
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
||||||
|
|
||||||
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
SRpcInfo* pTransInst = (SRpcInfo*)shandle;
|
||||||
|
|
||||||
int index = CONN_HOST_THREAD_INDEX(pReq->handle);
|
int index = CONN_HOST_THREAD_INDEX(pReq->handle);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
index = clientRBChoseIdx(pRpc);
|
index = clientRBChoseIdx(pTransInst);
|
||||||
}
|
}
|
||||||
|
|
||||||
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
||||||
pCtx->pTransInst = (SRpcInfo*)shandle;
|
|
||||||
pCtx->ahandle = pReq->ahandle;
|
pCtx->ahandle = pReq->ahandle;
|
||||||
pCtx->msgType = pReq->msgType;
|
pCtx->msgType = pReq->msgType;
|
||||||
pCtx->ip = strdup(ip);
|
pCtx->ip = strdup(ip);
|
||||||
|
@ -725,7 +713,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
|
||||||
cliMsg->msg = *pReq;
|
cliMsg->msg = *pReq;
|
||||||
cliMsg->st = taosGetTimestampUs();
|
cliMsg->st = taosGetTimestampUs();
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index];
|
SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||||
tsem_t* pSem = pCtx->pSem;
|
tsem_t* pSem = pCtx->pSem;
|
||||||
tsem_wait(pSem);
|
tsem_wait(pSem);
|
||||||
|
|
|
@ -257,6 +257,12 @@ int transDestroyBuffer(SConnBuffer* buf) {
|
||||||
transClearBuffer(buf);
|
transClearBuffer(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int transSetConnOption(uv_tcp_t* stream) {
|
||||||
|
uv_tcp_nodelay(stream, 1);
|
||||||
|
int ret = uv_tcp_keepalive(stream, 5, 5);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
|
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
|
||||||
SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
|
SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
|
||||||
pool->index = 0;
|
pool->index = 0;
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "transComm.h"
|
#include "transComm.h"
|
||||||
|
|
||||||
typedef struct SSrvConn {
|
typedef struct SSrvConn {
|
||||||
|
T_REF_DECLARE()
|
||||||
uv_tcp_t* pTcp;
|
uv_tcp_t* pTcp;
|
||||||
uv_write_t* pWriter;
|
uv_write_t* pWriter;
|
||||||
uv_timer_t* pTimer;
|
uv_timer_t* pTimer;
|
||||||
|
@ -32,11 +33,11 @@ typedef struct SSrvConn {
|
||||||
void* ahandle; //
|
void* ahandle; //
|
||||||
void* hostThrd;
|
void* hostThrd;
|
||||||
SArray* srvMsgs;
|
SArray* srvMsgs;
|
||||||
// void* pSrvMsg;
|
|
||||||
|
bool broken; // conn broken;
|
||||||
|
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
struct sockaddr_in locaddr;
|
struct sockaddr_in locaddr;
|
||||||
|
|
||||||
// SRpcMsg sendMsg;
|
// SRpcMsg sendMsg;
|
||||||
// del later
|
// del later
|
||||||
char secured;
|
char secured;
|
||||||
|
@ -64,19 +65,23 @@ typedef struct SWorkThrdObj {
|
||||||
queue conn;
|
queue conn;
|
||||||
pthread_mutex_t msgMtx;
|
pthread_mutex_t msgMtx;
|
||||||
void* pTransInst;
|
void* pTransInst;
|
||||||
|
bool stop;
|
||||||
} SWorkThrdObj;
|
} SWorkThrdObj;
|
||||||
|
|
||||||
typedef struct SServerObj {
|
typedef struct SServerObj {
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
uv_tcp_t server;
|
uv_tcp_t server;
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
|
|
||||||
|
// work thread info
|
||||||
int workerIdx;
|
int workerIdx;
|
||||||
int numOfThreads;
|
int numOfThreads;
|
||||||
SWorkThrdObj** pThreadObj;
|
SWorkThrdObj** pThreadObj;
|
||||||
uv_pipe_t** pipe;
|
|
||||||
uint32_t ip;
|
uv_pipe_t** pipe;
|
||||||
uint32_t port;
|
uint32_t ip;
|
||||||
uv_async_t* pAcceptAsync; // just to quit from from accept thread
|
uint32_t port;
|
||||||
|
uv_async_t* pAcceptAsync; // just to quit from from accept thread
|
||||||
} SServerObj;
|
} SServerObj;
|
||||||
|
|
||||||
static const char* notify = "a";
|
static const char* notify = "a";
|
||||||
|
@ -202,7 +207,6 @@ static void uvHandleReq(SSrvConn* pConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pConn->inType = pHead->msgType;
|
pConn->inType = pHead->msgType;
|
||||||
// assert(transIsReq(pHead->msgType));
|
|
||||||
|
|
||||||
SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
|
SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
|
@ -226,7 +230,8 @@ static void uvHandleReq(SSrvConn* pConn) {
|
||||||
rpcMsg.handle = pConn;
|
rpcMsg.handle = pConn;
|
||||||
|
|
||||||
transClearBuffer(&pConn->readBuf);
|
transClearBuffer(&pConn->readBuf);
|
||||||
pConn->ref++;
|
|
||||||
|
transRefSrvHandle(pConn);
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType),
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType),
|
||||||
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
|
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
|
||||||
ntohs(pConn->locaddr.sin_port), rpcMsg.contLen);
|
ntohs(pConn->locaddr.sin_port), rpcMsg.contLen);
|
||||||
|
@ -251,23 +256,20 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (nread == UV_EOF) {
|
|
||||||
tError("server conn %p read error: %s", conn, uv_err_name(nread));
|
|
||||||
if (conn->ref > 1) {
|
|
||||||
conn->ref++; // ref > 1 signed that write is in progress
|
|
||||||
}
|
|
||||||
destroyConn(conn, true);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (nread == 0) {
|
if (nread == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (nread < 0 || nread != UV_EOF) {
|
|
||||||
if (conn->ref > 1) {
|
tError("server conn %p read error: %s", conn, uv_err_name(nread));
|
||||||
conn->ref++; // ref > 1 signed that write is in progress
|
if (nread < 0 || nread == UV_EOF) {
|
||||||
}
|
conn->broken = true;
|
||||||
tError("server conn %p read error: %s", conn, uv_err_name(nread));
|
transUnrefSrvHandle(conn);
|
||||||
destroyConn(conn, true);
|
|
||||||
|
// if (conn->ref > 1) {
|
||||||
|
// conn->ref++; // ref > 1 signed that write is in progress
|
||||||
|
//}
|
||||||
|
// tError("server conn %p read error: %s", conn, uv_err_name(nread));
|
||||||
|
// destroyConn(conn, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
|
@ -300,10 +302,9 @@ void uvOnWriteCb(uv_write_t* req, int status) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
|
tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
|
||||||
//
|
conn->broken = false;
|
||||||
destroyConn(conn, true);
|
transUnrefSrvHandle(conn);
|
||||||
}
|
}
|
||||||
// opt
|
|
||||||
}
|
}
|
||||||
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
|
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
|
||||||
if (status == 0) {
|
if (status == 0) {
|
||||||
|
@ -349,15 +350,18 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) {
|
||||||
|
|
||||||
SSrvConn* pConn = smsg->pConn;
|
SSrvConn* pConn = smsg->pConn;
|
||||||
uv_timer_stop(pConn->pTimer);
|
uv_timer_stop(pConn->pTimer);
|
||||||
|
|
||||||
// pConn->pSrvMsg = smsg;
|
|
||||||
// conn->pWriter->data = smsg;
|
|
||||||
uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
|
uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
|
||||||
}
|
}
|
||||||
static void uvStartSendResp(SSrvMsg* smsg) {
|
static void uvStartSendResp(SSrvMsg* smsg) {
|
||||||
// impl
|
// impl
|
||||||
SSrvConn* pConn = smsg->pConn;
|
SSrvConn* pConn = smsg->pConn;
|
||||||
pConn->ref--; //
|
|
||||||
|
if (pConn->broken == true) {
|
||||||
|
transUnrefSrvHandle(pConn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
transUnrefSrvHandle(pConn);
|
||||||
|
|
||||||
if (taosArrayGetSize(pConn->srvMsgs) > 0) {
|
if (taosArrayGetSize(pConn->srvMsgs) > 0) {
|
||||||
tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr),
|
tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr),
|
||||||
ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||||
|
@ -382,7 +386,7 @@ static void destroyAllConn(SWorkThrdObj* pThrd) {
|
||||||
QUEUE_INIT(h);
|
QUEUE_INIT(h);
|
||||||
|
|
||||||
SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
|
SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
|
||||||
destroyConn(c, true);
|
transUnrefSrvHandle(c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void uvWorkerAsyncCb(uv_async_t* handle) {
|
void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
|
@ -390,11 +394,11 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
SWorkThrdObj* pThrd = item->pThrd;
|
SWorkThrdObj* pThrd = item->pThrd;
|
||||||
SSrvConn* conn = NULL;
|
SSrvConn* conn = NULL;
|
||||||
queue wq;
|
queue wq;
|
||||||
|
|
||||||
// batch process to avoid to lock/unlock frequently
|
// batch process to avoid to lock/unlock frequently
|
||||||
pthread_mutex_lock(&item->mtx);
|
pthread_mutex_lock(&item->mtx);
|
||||||
QUEUE_MOVE(&item->qmsg, &wq);
|
QUEUE_MOVE(&item->qmsg, &wq);
|
||||||
pthread_mutex_unlock(&item->mtx);
|
pthread_mutex_unlock(&item->mtx);
|
||||||
// pthread_mutex_unlock(&mtx);
|
|
||||||
|
|
||||||
while (!QUEUE_IS_EMPTY(&wq)) {
|
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||||
queue* head = QUEUE_HEAD(&wq);
|
queue* head = QUEUE_HEAD(&wq);
|
||||||
|
@ -407,11 +411,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
}
|
}
|
||||||
if (msg->pConn == NULL) {
|
if (msg->pConn == NULL) {
|
||||||
free(msg);
|
free(msg);
|
||||||
|
bool noConn = QUEUE_IS_EMPTY(&pThrd->conn);
|
||||||
destroyAllConn(pThrd);
|
if (noConn == true) {
|
||||||
|
uv_loop_close(pThrd->loop);
|
||||||
uv_loop_close(pThrd->loop);
|
uv_stop(pThrd->loop);
|
||||||
uv_stop(pThrd->loop);
|
} else {
|
||||||
|
destroyAllConn(pThrd);
|
||||||
|
uv_loop_close(pThrd->loop);
|
||||||
|
pThrd->stop = true;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
uvStartSendResp(msg);
|
uvStartSendResp(msg);
|
||||||
}
|
}
|
||||||
|
@ -419,12 +427,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
}
|
}
|
||||||
static void uvAcceptAsyncCb(uv_async_t* async) {
|
static void uvAcceptAsyncCb(uv_async_t* async) {
|
||||||
SServerObj* srv = async->data;
|
SServerObj* srv = async->data;
|
||||||
|
tDebug("close server port %d", srv->port);
|
||||||
uv_close((uv_handle_t*)&srv->server, NULL);
|
uv_close((uv_handle_t*)&srv->server, NULL);
|
||||||
uv_stop(srv->loop);
|
uv_stop(srv->loop);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void uvShutDownCb(uv_shutdown_t* req, int status) {
|
static void uvShutDownCb(uv_shutdown_t* req, int status) {
|
||||||
tDebug("conn failed to shut down: %s", uv_err_name(status));
|
if (status != 0) {
|
||||||
|
tDebug("conn failed to shut down: %s", uv_err_name(status));
|
||||||
|
}
|
||||||
uv_close((uv_handle_t*)req->handle, uvDestroyConn);
|
uv_close((uv_handle_t*)req->handle, uvDestroyConn);
|
||||||
free(req);
|
free(req);
|
||||||
}
|
}
|
||||||
|
@ -493,13 +504,11 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
uv_tcp_init(pThrd->loop, pConn->pTcp);
|
uv_tcp_init(pThrd->loop, pConn->pTcp);
|
||||||
pConn->pTcp->data = pConn;
|
pConn->pTcp->data = pConn;
|
||||||
|
|
||||||
// uv_tcp_nodelay(pConn->pTcp, 1);
|
|
||||||
// uv_tcp_keepalive(pConn->pTcp, 1, 1);
|
|
||||||
|
|
||||||
// init write request, just
|
|
||||||
pConn->pWriter = calloc(1, sizeof(uv_write_t));
|
pConn->pWriter = calloc(1, sizeof(uv_write_t));
|
||||||
pConn->pWriter->data = pConn;
|
pConn->pWriter->data = pConn;
|
||||||
|
|
||||||
|
transSetConnOption((uv_tcp_t*)pConn->pTcp);
|
||||||
|
|
||||||
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
|
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
|
||||||
uv_os_fd_t fd;
|
uv_os_fd_t fd;
|
||||||
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
|
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
|
||||||
|
@ -508,14 +517,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
int addrlen = sizeof(pConn->addr);
|
int addrlen = sizeof(pConn->addr);
|
||||||
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
|
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
|
||||||
tError("server conn %p failed to get peer info", pConn);
|
tError("server conn %p failed to get peer info", pConn);
|
||||||
destroyConn(pConn, true);
|
transUnrefSrvHandle(pConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
addrlen = sizeof(pConn->locaddr);
|
addrlen = sizeof(pConn->locaddr);
|
||||||
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
|
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
|
||||||
tError("server conn %p failed to get local info", pConn);
|
tError("server conn %p failed to get local info", pConn);
|
||||||
destroyConn(pConn, true);
|
transUnrefSrvHandle(pConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -523,7 +532,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
tDebug("failed to create new connection");
|
tDebug("failed to create new connection");
|
||||||
destroyConn(pConn, true);
|
transUnrefSrvHandle(pConn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -599,7 +608,10 @@ static SSrvConn* createConn(void* hThrd) {
|
||||||
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
|
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
|
||||||
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
|
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
|
||||||
tTrace("conn %p created", pConn);
|
tTrace("conn %p created", pConn);
|
||||||
++pConn->ref;
|
|
||||||
|
pConn->broken = false;
|
||||||
|
|
||||||
|
transRefSrvHandle(pConn);
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -607,10 +619,6 @@ static void destroyConn(SSrvConn* conn, bool clear) {
|
||||||
if (conn == NULL) {
|
if (conn == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
tTrace("server conn %p try to destroy, ref: %d", conn, conn->ref);
|
|
||||||
if (--conn->ref > 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
transDestroyBuffer(&conn->readBuf);
|
transDestroyBuffer(&conn->readBuf);
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) {
|
for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) {
|
||||||
|
@ -618,25 +626,26 @@ static void destroyConn(SSrvConn* conn, bool clear) {
|
||||||
destroySmsg(msg);
|
destroySmsg(msg);
|
||||||
}
|
}
|
||||||
conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
|
conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
|
||||||
QUEUE_REMOVE(&conn->queue);
|
|
||||||
|
|
||||||
if (clear) {
|
if (clear) {
|
||||||
tTrace("try to destroy conn %p", conn);
|
tTrace("try to destroy conn %p", conn);
|
||||||
uv_tcp_close_reset(conn->pTcp, uvDestroyConn);
|
uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
|
||||||
// uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
|
uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
|
||||||
// uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
|
|
||||||
// uv_unref((uv_handle_t*)conn->pTcp);
|
|
||||||
// uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void uvDestroyConn(uv_handle_t* handle) {
|
static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
SSrvConn* conn = handle->data;
|
SSrvConn* conn = handle->data;
|
||||||
|
SWorkThrdObj* thrd = conn->hostThrd;
|
||||||
|
|
||||||
tDebug("server conn %p destroy", conn);
|
tDebug("server conn %p destroy", conn);
|
||||||
uv_timer_stop(conn->pTimer);
|
uv_timer_stop(conn->pTimer);
|
||||||
// free(conn->pTimer);
|
QUEUE_REMOVE(&conn->queue);
|
||||||
free(conn->pTcp);
|
free(conn->pTcp);
|
||||||
free(conn->pWriter);
|
free(conn->pWriter);
|
||||||
free(conn);
|
free(conn);
|
||||||
|
|
||||||
|
if (thrd->stop && QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
|
uv_stop(thrd->loop);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
|
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
|
||||||
STransMsgHead* pHead = (STransMsgHead*)msg;
|
STransMsgHead* pHead = (STransMsgHead*)msg;
|
||||||
|
@ -671,6 +680,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
|
|
||||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
|
SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
|
||||||
|
thrd->stop = false;
|
||||||
srv->pThreadObj[i] = thrd;
|
srv->pThreadObj[i] = thrd;
|
||||||
|
|
||||||
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
|
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
|
||||||
|
@ -720,8 +730,6 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
pthread_join(pThrd->thread, NULL);
|
pthread_join(pThrd->thread, NULL);
|
||||||
free(pThrd->loop);
|
free(pThrd->loop);
|
||||||
transDestroyAsyncPool(pThrd->asyncPool);
|
transDestroyAsyncPool(pThrd->asyncPool);
|
||||||
|
|
||||||
// free(pThrd->workerAsync);
|
|
||||||
free(pThrd);
|
free(pThrd);
|
||||||
}
|
}
|
||||||
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
|
@ -755,6 +763,28 @@ void taosCloseServer(void* arg) {
|
||||||
free(srv);
|
free(srv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void transRefSrvHandle(void* handle) {
|
||||||
|
if (handle == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
SSrvConn* conn = handle;
|
||||||
|
|
||||||
|
int ref = T_REF_INC((SSrvConn*)handle);
|
||||||
|
UNUSED(ref);
|
||||||
|
}
|
||||||
|
|
||||||
|
void transUnrefSrvHandle(void* handle) {
|
||||||
|
if (handle == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int ref = T_REF_DEC((SSrvConn*)handle);
|
||||||
|
tDebug("handle %p ref count: %d", handle, ref);
|
||||||
|
|
||||||
|
if (ref == 0) {
|
||||||
|
destroyConn((SSrvConn*)handle, true);
|
||||||
|
}
|
||||||
|
// unref srv handle
|
||||||
|
}
|
||||||
void rpcSendResponse(const SRpcMsg* pMsg) {
|
void rpcSendResponse(const SRpcMsg* pMsg) {
|
||||||
if (pMsg->handle == NULL) {
|
if (pMsg->handle == NULL) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -124,6 +124,7 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.ckey = "key";
|
rpcInit.ckey = "key";
|
||||||
rpcInit.spi = 1;
|
rpcInit.spi = 1;
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcDebugFlag = 143;
|
||||||
|
|
||||||
for (int i = 1; i < argc; ++i) {
|
for (int i = 1; i < argc; ++i) {
|
||||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
|
|
|
@ -125,6 +125,8 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.idleTime = 2 * 1500;
|
rpcInit.idleTime = 2 * 1500;
|
||||||
rpcInit.afp = retrieveAuthInfo;
|
rpcInit.afp = retrieveAuthInfo;
|
||||||
|
|
||||||
|
rpcDebugFlag = 143;
|
||||||
|
|
||||||
for (int i = 1; i < argc; ++i) {
|
for (int i = 1; i < argc; ++i) {
|
||||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
rpcInit.localPort = atoi(argv[++i]);
|
rpcInit.localPort = atoi(argv[++i]);
|
||||||
|
|
Loading…
Reference in New Issue